Compute¶
The Compute API provides scalable compute capabilities to parallelize your computations. It works by packaging your Python code and executing the code on nodes hosted by Descartes Labs in our cloud infrastructure. These nodes are able to access imagery at extremely high rates of throughput which, paired with horizontal scaling, allow you to execute computations over nearly any spatio-temporal scale.
All features described here require a recent version of the Descartes Labs Python client. See these instructions for installing the latest client.
Note
For information about API Quotas and limits see our Quotas & Limits page.
Basic Example¶
This basic example shows how to create a new Function and invoke it to schedule a single Job.
Note
All the following examples use Python 3.8. You may need to adapt these examples by changing the image argument to match your Python version. See Choosing Your Environment for the available images.
Note
The source for the Function must be available to Compute. These examples must be run by placing the code in a file and executing that file with Python.
from descarteslabs.compute import Function


def hello(i):
    import geopandas

    print(geopandas)
    return "hello {}".format(i)


print("creating function")
async_func = Function(
    hello,
    name="my-compute-hello",
    image="python3.8:latest",
    cpus=0.25,
    memory=512,
    maximum_concurrency=1,
    timeout=600,
    retry_count=0,
    requirements=[
        "geopandas==0.10.2",
    ],
)
async_func.save()

# invoke the function
print("submitting a job")
job = async_func(5)

# print the job result and logs
print("waiting for the job to complete")
job.wait_for_completion()
print(job.result())
print(job.log())
We define a Function called hello which prints out information about the geopandas package, and returns the string hello <argument>.

Then we generate a new Function using the Function class constructor, which specifies the entrypoint function hello, gives the Function a name, and specifies a Docker image that defines the environment in which the code will be executed.

Finally, we invoke the async function to create a Job. This submits the Job to the Function created by the Function call, and the reference to the Job is stored in the job variable. This also triggers instances to spin up on the backend to execute the Job. Instance management is handled in the background: instances are created or destroyed as needed to match the compute resources required by the jobs.
A few important features of the Compute API are highlighted by this example:

- You can pass any JSON-serializable argument to a Job, e.g. arguments of type str, dict, list, None, or any numeric data type (see the short sketch after this list).
- You can import non-standard Python packages in your entrypoint if the packages are specified as requirements or are already present in the image you’ve selected.
- You can access any logging or debugging information, including print statements executed inside your function, through the logs returned by job.log().
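For instance, continuing the example above, the same Function can be invoked with arguments of several JSON-serializable types (the specific values below are purely illustrative):

# Any JSON-serializable value can be passed as a Job argument.
job_int = async_func(5)
job_str = async_func("world")
job_dict = async_func({"name": "world", "count": 3})

for job in (job_int, job_str, job_dict):
    job.wait_for_completion()
    print(job.result())  # e.g. "hello 5", "hello world", ...
    print(job.log())     # includes anything printed inside the entrypoint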
Advanced Compute Usage¶
Advanced features of Compute allow you to
- organize your code using standard Python package and module conventions instead of writing all of your code inside a single function
- add Python dependencies and specify particular version requirements
- include data files that your function requires to run
We recommend that you use these features to improve the readability of your code and better control the environment your code executes in.
Python Package Example¶
This example shows the features available when using Python packages to organize your code.

This and the following examples require some example code. Download the example code. See scripts/complete_example.py.
from descarteslabs.compute import Function

print("creating function")
async_func = Function(
    "compute_examples.complete.simplify",
    name="my-complete-compute-example",
    image="python3.8:latest",
    cpus=0.25,
    memory=512,
    maximum_concurrency=1,
    timeout=600,
    retry_count=0,
    requirements=[
        "geopandas==0.10.2",
    ],
    include_modules=[
        "compute_examples",
    ],
)
async_func.save()

# invoking the function
print("submitting a job")
job = async_func(5)

# print the job result and logs
print("waiting for the job to complete")
job.wait_for_completion()
print(job.result())
print(job.log())
Instead of defining our entrypoint function in the deployment script, we’ve organized our code using common Python conventions. We’ve created a compute_examples.complete module which contains the simplify function. Additionally, we tell the Function to include this package, some additional data, and specific Python requirements for it to run successfully.

Including local packages (include_modules). Your function can make use of any local modules and packages. Specify them by the name you would use to import them. This includes cython module source files (with some restrictions, see the section on Cython Code). In this example the assumption is that there is a local directory compute_examples with a complete.py file that defines a simplify function. All submodules of the compute_examples package will be included.

Making Python dependencies available to your code (requirements). Your function can make use of any external Python dependencies that you specify as requirements. In this example, we specify a descarteslabs client version and a geopandas version. As long as you pick an image with your desired Python version (Python 3.8 in this case), you can upgrade or downgrade any of your other package dependencies as needed.

Including data files (include_data). You can include local data files that your function and included code can read. Wildcard patterns such as the * (asterisk), meaning any string, are supported. Your code must use the pkg_resources API to read data files (see below).
Code Organization¶
We suggest that you use customary ways of organizing the code for a Python project. A common way to organize your source repository looks like this:
myproject/
├── my_package/
|   ├── data/
|   |   └── my_data.txt
|   ├── __init__.py
|   ├── models.py
|   ├── utils.py
|   └── cython_module.pyx
├── scripts/
|   └── deploy_function.py
└── requirements.txt
- The project’s Python code is all contained within a package called my_package.
- Data is co-located with code within my_package so it can be referenced relative to the source code.
- A requirements file at the top level lists all the dependencies for the source code. The same requirements file can be given when creating a Function.
- A deploy_function.py script creates a new Function and kicks off jobs. It contains an entrypoint function (see below) which imports code from my_package to use.
This example follows some general guidelines. But you are not restricted to a single package and you can organize your code in any way you want, as long as you can put it together as a list of module names importable in your current local Python environment.
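As an illustration, a deployment script for this layout might look like the sketch below. This is only a sketch: the run_model helper, the data file pattern, and the specific Function parameters are assumptions for illustration, not part of any example repository.

# scripts/deploy_function.py (illustrative sketch)
from descarteslabs.compute import Function


def entrypoint(x):
    # Import inside the function so the bundled package is resolved on the worker.
    from my_package.models import run_model  # hypothetical helper in my_package

    return run_model(x)


async_func = Function(
    entrypoint,
    name="my-project-function",
    image="python3.8:latest",
    include_modules=["my_package"],
    include_data=["my_package/data/*.txt"],
    requirements="requirements.txt",
)
async_func.save()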
Entrypoint Function¶
You can specify an entrypoint function in two ways. The first is as a referenced function:
from descarteslabs.compute import Function
def f(x):
    from my_package import my_entrypoint

    return my_entrypoint(x)


async_func = Function(
    f,
    name='hello-world',
    image="python3.8",
    include_modules=[
        'my_package',
    ],
)
Alternatively, you can use a fully-qualified function name:
from descarteslabs.compute import Function
async_func = Function(
    'my_package.my_entrypoint',
    name='hello-world',
    image="python3.8",
    include_modules=[
        'my_package',
    ],
)
Some restrictions apply to one or both methods of passing an entrypoint function:

- *function references only* The function needs to be completely self-contained. Globals (variables defined in the top-level module namespace) cannot be referenced. Define any variables and constants within the function’s local scope. All modules it uses need to be imported within the function. The function can’t be decorated. See the sketch after this list.
- *fully-qualified function name* Any modules referenced in your packages and submodules need to be locally importable.
- You can only return JSON-serializable values from the function. If a function returns a value that cannot be JSON-serialized, your jobs will fail.
- You can only pass JSON-serializable arguments to the function, e.g. arguments of type str, dict, list, None, or any numeric data type.
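For example, a valid referenced function keeps all of its imports and constants inside the function body. The sketch below is illustrative only; the threshold value and the scaling logic are placeholders:

from descarteslabs.compute import Function


def scale_and_threshold(values):
    # Everything the function needs is imported and defined locally;
    # module-level globals would not be available when the job runs.
    import numpy as np

    THRESHOLD = 0.5
    arr = np.asarray(values, dtype=float)
    return (arr / arr.max() > THRESHOLD).tolist()


async_func = Function(
    scale_and_threshold,
    name="self-contained-example",
    image="python3.8:latest",
)
async_func.save()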
Python Dependencies¶
You can specify your Python dependencies in two ways. You can give a list of dependencies:
from descarteslabs.compute import Function
async_func = Function(
    requirements=[
        "descarteslabs[complete]>=2.0.0",
        "scikit-image==0.13.1",
        "scipy>=1.0.0",
    ],
    ...
)
If you already have your dependencies in a standard requirements file you can give a path (absolute or relative to the current working directory) to that:
from descarteslabs.compute import Function
async_func = Function(
    requirements="path/to/requirements.txt",
    ...
)
The dependency specification and requirements file use the same format you are used to from standard Python packaging tools such as pip. For exhaustive details on this see PEP 508 for dependency specification and the pip documentation on requirements files.
If you specify a different version for a requirement that already exists on the image, your specified version will take precedence over the existing version, allowing you to upgrade or downgrade dependencies as required.
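For example, the current images pin scipy==1.7.3; listing a newer scipy in your requirements would take precedence in your Function’s environment (the exact version below is only illustrative):

from descarteslabs.compute import Function

async_func = Function(
    "my_package.my_entrypoint",
    name="upgraded-scipy-example",
    image="python3.8:latest",
    include_modules=["my_package"],
    requirements=[
        "scipy>=1.10.0",  # overrides the scipy version baked into the image
    ],
)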
Cython Code¶
Cython extension modules can be included in your code in much the same way as regular Python modules. See compute_examples/scripts/cython_example.py. The source files (.pyx) will be compiled into extension modules (.so) during the build phase. However, there are a few restrictions:
- Source cython files in the working directory (where the deploy script is being run from) cannot be included. Instead, create a subdirectory, e.g. my_package, and import the cython module as my_package.cython_module as in the examples.
- Compute cannot directly execute a function from within a cython module as the Entrypoint Function. Instead of executing cython_example.fib, create a wrapper function in the deployment script that imports and executes cython_example.fib, and use the wrapper function as the entrypoint (see the sketch after this list).
- numpy.get_include() will be added to cythonize’s include_dirs argument to allow the compiler to find numpy header and library files. If you request a specific version of numpy in the Function requirements while using numpy in a cython module, the job may fail.
- Cython modules will be compiled using the default settings (except for adding the numpy include dirs, discussed above). Cython compiler options are not currently supported.
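A minimal sketch of such a wrapper entrypoint, assuming a my_package/cython_module.pyx that defines a fib function (the module and function names are placeholders):

from descarteslabs.compute import Function


def fib_entrypoint(n):
    # Import inside the wrapper so the compiled extension is resolved on the worker.
    from my_package.cython_module import fib

    return fib(n)


async_func = Function(
    fib_entrypoint,
    name="cython-fib-example",
    image="python3.8:latest",
    include_modules=["my_package"],
)
async_func.save()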
Life Cycle¶
When you submit a Function, it goes through several states:

- awaiting_bundle – The request was received but is waiting for the corresponding code to be uploaded.
- building – The code was received and a Function image is being created.
- build_failed – The Function image could not be created; see the next section for an explanation of Build Failures.
- running – The Function is ready to receive requests for jobs.
Build Failures¶
If you specify Python dependencies for your Function, they are installed with pip from PyPI into your image before the Function is run. There is a chance that this dependency build fails. Here are a few reasons why it might fail:
- You have a typo in your list of requirements and the package doesn’t exist
- A package version you request is not compatible with the environment (e.g. incompatible Python version)
- A package needs system libraries or tools to build that are not present in the environment
- The package fails to download from PyPI because of a transient network problem
- Data or code files you included are too large
If a problem occurs during the build, the Function will be in a “build failed” state and not accept jobs anymore.
Data Files¶
You can specify data files to be included as a list of patterns:
from descarteslabs.compute import Function
async_func = Function(
    include_data=[
        'my_package/data/*.txt',
        'my_package/data/image??.png',
        'my_package/data/document.rst',
    ],
    ...
)
This supports Unix-style pattern expansion as per the glob module in the Python standard library.
In your code you must read data files using the standard pkg_resources API - not by looking for and opening files directly:
import pkg_resources
import my_package
# Read a file as a string
text = pkg_resources.resource_string(my_package.__name__, "data/data.txt")
# Open a file as a file-like object
file_like = pkg_resources.resource_stream(my_package.__name__, "data/data.txt")
We reference data files relative to the package they are contained in. For example, the original inclusion path for the file referenced here would have been my_package/data/data.txt - in the package my_package. Colocate your data with your code in a package as much as possible.
The pkg_resources API is part of setuptools; read more details about it in its documentation.
Compute Best Practices¶
Make the Function idempotent and deterministic
The Compute service guarantees that every submitted job will run at least once. Because jobs run on scalable cloud infrastructure it is possible for a job to be preempted occasionally - this means a job can forcibly abort at any point in time. If this happens, it will be restarted from the beginning.
It follows that the compute function should be idempotent and (usually) deterministic: if it is aborted at any point and restarted it should still work, and it should produce the same result for the same input. If a job is long-running and produces an intermediate result (which is, for example, persisted to the storage service), it is good practice to check for the presence of the intermediate result before expensively producing it again. This saves time in case the previous run for the same input was preempted.
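A minimal sketch of this check-before-compute pattern follows. The file-based store is only a stand-in for durable storage (for example the Descartes Labs Storage service); /tmp on a Compute node is per-job and memory-backed, so a real job would persist intermediate results somewhere that outlives the job.

import json
import os

RESULT_DIR = "/tmp/results"  # stand-in for a durable store


def process_tile(tile_key):
    # Idempotent job body: skip the expensive step if a result already exists.
    os.makedirs(RESULT_DIR, exist_ok=True)
    path = os.path.join(RESULT_DIR, "{}.json".format(tile_key))

    if os.path.exists(path):
        # A previous (possibly preempted) run already produced this result.
        with open(path) as f:
            return json.load(f)

    result = {"tile": tile_key, "value": 42}  # placeholder for the real computation
    with open(path, "w") as f:
        json.dump(result, f)
    return result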
Make a job a small unit of work
A single job has low overhead and startup cost from the point of view of the Compute service. There is little penalty for having lots of small jobs, so it’s recommended to break up work into the smallest parallelizable unit per job. Jobs with a shorter runtime are also less likely to be preempted. A good typical job runtime is a few minutes, though it is possible to have a job run for hours.
The code in the function itself may have a high startup cost. A typical example is a function that needs to download a Tensorflow model over the network and load it into memory. In this case there may be a balance to strike between many jobs, each of which has the same model loading overhead, and fewer jobs that run several independent inputs against the Tensorflow model, amortizing some of the model loading cost. The right balance depends on your constraints on total runtime and cost.
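As an illustration of that trade-off, an entrypoint can accept a small batch of inputs so the one-time setup cost is paid once per job rather than once per input. The model and scoring logic below are placeholders, not part of any example package:

from descarteslabs.compute import Function


def classify_batch(scene_ids):
    # Expensive one-time setup, e.g. downloading and loading a model;
    # a trivial stand-in keeps this sketch self-contained.
    model = {"threshold": 0.5}

    results = []
    for scene_id in scene_ids:
        score = (hash(scene_id) % 100) / 100.0  # placeholder for real inference
        results.append({"scene": scene_id, "keep": score > model["threshold"]})
    return results


async_func = Function(
    classify_batch,
    name="batched-inference-example",
    image="python3.8:latest",
)
async_func.save()

# Each job handles a small batch, amortizing the setup cost across its inputs.
job = async_func(["scene-001", "scene-002", "scene-003"])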
Use job results
Each job produces a result when it completes. The result includes the return value of the compute function, any output written by the code to stdout/stderr, and - in case of a failure - details about raised exceptions. Results for a Function are persisted and can be queried through the Compute API.
Typically the outcome of a job is some new piece of data such as metrics, a classification or a geometry. If that data needs to be persisted and is easily JSON-serializable, the simplest solution is to return it from the compute function as the result. iter_results() can then iterate over all results, and result() retrieves individual results by job id. See Retries, reruns and job results for example code.
Use retries and reruns to handle failures
If a function is doing anything that may occasionally fail by raising an exception, for example network requests through the Raster API, it’s often a good idea not to do explicit error handling. Instead, a Function can handle occasional failures by giving a retry_count during Function creation (i.e., Function); if any uncaught exceptions are raised during the execution of a function it is retried this many times before it is finally considered a failure. This works particularly well if jobs are small, idempotent units of work as recommended above.
As an alternative or in addition to retries, a set of jobs can also be rerun through the client. rerun() reruns all jobs in a function that have failed. See Retries, reruns and job results for example code.
More Examples¶
Multiple Jobs Example¶
This example illustrates the more typical use case of submitting multiple jobs to a new function.
See scripts/multiple_jobs.py
import numpy as np

from descarteslabs.compute import Function, JobStatus

# create the Function
print("creating function")
async_func = Function(
    "compute_examples.basic.generate_random_image",
    name='my-job-random-image',
    image="python3.8",
    include_modules=[
        'compute_examples',
    ],
)
async_func.save()

# submit 20 jobs to the Function
print("submitting jobs")
jobs = async_func.map(range(20))

# print the shape of the image array returned by each job
print("starting to wait for job completions")
for job in jobs:
    job.wait_for_completion()
    if job.status == JobStatus.SUCCESS:
        # results are JSON lists; convert back to an array to inspect the shape
        print(np.asarray(job.result()).shape)
    else:
        print(job.log())
Here, we reference the "compute_examples.basic.generate_random_image" function, which generates a random image using numpy with the same number of bands as the value passed to the num_bands parameter.

This example highlights a few additional features of the Compute API:

- To submit jobs to the Function, we use the map() method to submit a job for each of the elements in the list. This is typically the most efficient way to submit jobs to a Function, particularly if the number of jobs is large. You are also able to submit jobs one at a time, e.g. within a for-loop.
- We use the wait_for_completion() method to wait for each job and retrieve its result as it completes. Within this loop, we also check the job status and print the logs of any failed job.
It’s important to note that the return value from the entrypoint function is converted to a list because return values must be JSON-serializable.
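For illustration, such an entrypoint might look like the sketch below; the array shape is an assumption and the real compute_examples code may differ:

import numpy as np


def generate_random_image(num_bands):
    # Random image with the requested number of bands.
    image = np.random.rand(num_bands, 256, 256)
    # tolist() makes the return value JSON-serializable for the job result.
    return image.tolist()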
Retries, reruns and job results¶
This example demonstrates how to use retries and reruns to make compute more robust, and how to make use of job results.
This is a function that takes a single Descartes Labs tile as input and returns a histogram of the pixel values of the NIR band of a Sentinel-2 mosaic around July 2022:
def nir_histogram(tile):
    from descarteslabs.catalog import Product, properties as p
    import numpy as np

    image_collection = (
        Product.get("esa:sentinel-2:l1c:v1").images()
        .intersects(tile)
        .filter(p.cloud_fraction < 0.2)
        .filter("2022-07-01" <= p.acquired < "2022-08-01")
        .sort("acquired")
        .limit(10)
    ).collect()
    tile_mosaic = image_collection.mosaic("nir", resolution=120)
    histogram, _ = np.histogram(
        tile_mosaic,
        bins=100,
        range=(0, 10000),
        density=False,
    )
    return histogram.tolist()
Each histogram is an array of 100 elements corresponding to pixel counts in 100 bins, evenly spaced from pixel values 0 to 10000. For example, the first bin is the total number of pixels in a tile that have values 0 to 100.
We can create a Function from this function and run it with tiles covering the state of New Mexico:
from descarteslabs.compute import Function
from descarteslabs.geo import DLTile

async_func = Function(
    nir_histogram,
    name="nir-histogram",
    image="python3.8",
    retry_count=3,
)
async_func.save()

nm_geom = {
    "type": "Polygon",
    "coordinates": [[
        [-109.039306640625, 37.00255267215955], [-109.039306640625, 31.3348710339506],
        [-108.21533203125, 31.344254455668054], [-108.19335937499999, 31.784216884487385],
        [-106.490478515625, 31.784216884487385], [-106.490478515625, 31.99875937194732],
        [-103.062744140625, 31.99875937194732], [-102.996826171875, 37.00255267215955],
        [-109.039306640625, 37.00255267215955]
    ]]
}

resolution = 10
tile_size = 2000
padding = 0
tiles = DLTile.from_shape(nm_geom, resolution, tile_size, padding)

async_func.map(tiles)
async_func.wait_for_completion()
Segmenting a large geographic area into tiles and processing one tile per job like this is a common pattern to parallelize work. This will kick off and wait for the completion of 867 jobs, each computing a histogram for one 2000x2000 pixel tile at full resolution of the Sentinel-2 NIR band (10m per pixel).
When creating the Function, we passed an argument retry_count=3. The nir_histogram function uses a raster call - there’s a small chance that this will raise an exception, e.g., because of network instability or exceeded rate/quota limits. Rather than doing explicit error handling in the function, we can rely on the retry feature of Compute. If a job raises an exception here it is retried 3 times before it is discarded as a failure. Using retries instead of explicit error handling is recommended if job runtimes are reasonably short.
In the very unlikely case that some jobs failed even with retries, there is a quick way to rerun all failed jobs:
async_func.rerun()
async_func.wait_for_completion()
rerun() requires a reference to the Function. In this case we take it from the previously created Function; alternatively, an existing Function can be retrieved by its id:
from descarteslabs.compute import Function
async_func = Function.get("<function-id>")
async_func.rerun()
async_func.wait_for_completion()
We broke up our geographic area into tiles so we can run a computation on high-resolution imagery without running into memory limits and to speed it up through parallelization. In the end we are after the aggregate computation across the whole area of the state of New Mexico. We returned the histograms for each tile from the function, so they are now stored as job results. We can retrieve and aggregate them:
import numpy as np

success_results = async_func.iter_results()

aggregated_histogram = np.zeros((100,))
for result in success_results:
    histogram_list = result
    aggregated_histogram += np.asarray(histogram_list)
iter_results() iterates over all completed jobs for a Function, optionally restricted by some search criteria. Here we only query for successful jobs. aggregated_histogram is now a numpy histogram across the whole state of New Mexico.
This shows how it is often natural to rely on job results when the return value of the function is the crucial outcome of a job - as opposed to cases where the return value is insignificant because the side effect of the job is what matters, such as the upload of a new raster scene into the catalog.
In-Memory file storage¶
You can use the directory located at /tmp for file storage while your job is running. This directory is a memory-backed filesystem and is a good place to write temporary files. Keep in mind that any data written to /tmp counts against the memory limit of your job. Use /tmp if you can tolerate the memory usage and you have a lot of IO or need high-performance IO.
See scripts/cache_example.py
from descarteslabs.compute import Function


def hello(i):
    from compute_examples.cache import hello

    # specify a file location in the cache to write files to
    return hello(i, "/tmp/geometry.wkt")


print("creating function")
async_func = Function(
    hello,
    name='my-compute-hello',
    image="python3.8",
    include_modules=[
        "compute_examples",
    ],
)
async_func.save()

# submit a job to the function
print("submitting a job")
job = async_func(5)

# print the job result and logs
print("waiting for the job to complete")
job.wait_for_completion()
print(job.result())
print(job.log())
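The cached-file pattern itself is simple: write to a path under /tmp, then read it back later in the same job. A sketch of what such a helper might do (the real compute_examples.cache.hello may differ):

def hello(i, cache_path):
    # Write an intermediate artifact to the memory-backed /tmp filesystem ...
    with open(cache_path, "w") as f:
        f.write("POINT ({} {})".format(i, i))

    # ... and read it back later in the same job.
    with open(cache_path) as f:
        geometry_wkt = f.read()

    return "hello {}: {}".format(i, geometry_wkt)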
Choosing Your Environment¶
The execution environment for your function in the cloud is defined by the Docker image you pick when creating the function. The images below are available and cover typical use cases.
Match your local Python version to the image you choose. Your function will be rejected or might not run successfully if there is a mismatch between your local Python version and the Python version in the image. A differing bug release version (the “x” in Python version “3.8.x”) is fine.
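To confirm which image to pick, compare your local interpreter version against the image’s Python version, for example:

import sys

# The major.minor version should match the image you select, e.g. "3.8" for python3.8:latest.
print("{}.{}".format(sys.version_info.major, sys.version_info.minor))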
Current Images¶
- Python 3.9, latest
  - Image: python3.9:master_ec20a887a6a964cf
  - Date: 05/02/2023
  - Python highlights: GDAL, numpy, pandas, scikit-image, scikit-learn, scipy, Tensorflow, PyTorch
  - Other libraries and tools: proj 7.2.0, GDAL 3.2.2
  - Packages: affine>=2.2.2 blosc>=1.10.6 cachetools>=3.1.1 cython==0.29.33 descarteslabs==1.12.1 dill>=0.3.6 dynaconf>=3.1.11 Fiona>=1.8.20,<1.9 geojson>=2.5.0 h5py==3.7.0 imagecodecs>=2021.5.20 Keras==2.11.0 lazy_object_proxy>=1.7.1 mercantile>=1.1.3 networkx==2.4 numpy>=1.21.6,<1.23.0;python_version<'3.8' numpy>=1.21.6;python_version>='3.8' and python_version<'3.11' numpy>=1.23.2;python_version>='3.11' opencv-python-headless==4.7.0.68 Pillow>=9.2.0 psutil==5.9.4 pytz>=2021.1 PyYAML==6.0.0 rasterio==1.2.10 requests>=2.28.1,<3 scikit-image==0.19.3 scikit-learn==0.24.2 scipy==1.7.3 setuptools>=65.6.3 shapely>=1.8.1 strenum>=0.4.8 tensorflow==2.11.0 tifffile==2021.4.8 torch==1.13.1+cpu torchvision==0.14.1+cpu tqdm>=4.32.1 xarray==0.20.2
- Python 3.8, latest
  - Image: python3.8:master_ec20a887a6a964cf
  - Date: 05/02/2023
  - Python highlights: GDAL, numpy, pandas, scikit-image, scikit-learn, scipy, Tensorflow, PyTorch
  - Other libraries and tools: proj 7.2.0, GDAL 3.2.2
  - Packages: affine>=2.2.2 blosc>=1.10.6 cachetools>=3.1.1 cython==0.29.33 descarteslabs==1.12.1 dill>=0.3.6 dynaconf>=3.1.11 Fiona>=1.8.20,<1.9 geojson>=2.5.0 h5py==3.7.0 imagecodecs>=2021.5.20 Keras==2.11.0 lazy_object_proxy>=1.7.1 mercantile>=1.1.3 networkx==2.4 numpy>=1.21.6,<1.23.0;python_version<'3.8' numpy>=1.21.6;python_version>='3.8' and python_version<'3.11' numpy>=1.23.2;python_version>='3.11' opencv-python-headless==4.7.0.68 Pillow>=9.2.0 psutil==5.9.4 pytz>=2021.1 PyYAML==6.0.0 rasterio==1.2.10 requests>=2.28.1,<3 scikit-image==0.19.3 scikit-learn==0.24.2 scipy==1.7.3 setuptools>=65.6.3 shapely>=1.8.1 strenum>=0.4.8 tensorflow==2.11.0 tifffile==2021.4.8 torch==1.13.1+cpu torchvision==0.14.1+cpu tqdm>=4.32.1 xarray==0.20.2