# Copyright 2018-2024 Descartes Labs.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import TimeoutError
import itertools
import requests.exceptions
import time
import urllib3.exceptions
import warnings
from strenum import StrEnum
from descarteslabs.exceptions import ServerError
from .catalog_base import CatalogObjectBase, check_deleted, check_derived, hybridmethod
from .attributes import (
Attribute,
CatalogObjectReference,
Timestamp,
EnumAttribute,
MappingAttribute,
ListAttribute,
TypedAttribute,
)
from .image import Image
from .search import Search
[docs]class ImageUploadType(StrEnum):
"""The type of upload data.
Attributes
----------
NDARRAY : enum
An multidimensional, homogeneous array of fixed-size items representing one
or more images.
FILE : enum
A file on disk containing one or more images.
"""
NDARRAY = "ndarray"
FILE = "file"
[docs]class OverviewResampler(StrEnum):
"""Allowed overview resampler algorithms.
Attributes
----------
NEAREST : enum
Applies a nearest neighbour (simple sampling) resampler
AVERAGE : enum
Computes the average of all non-NODATA contributing pixels.
GAUSS : enum
Applies a Gaussian kernel before computing the overview, which can lead to
better results than simple averaging in e.g case of sharp edges with high
contrast or noisy patterns.
CUBIC : enum
Applies a cubic convolution kernel.
CUBICSPLINE : enum
Applies a B-Spline convolution kernel.
LANCZOS : enum
Applies a Lanczos windowed sinc convolution kernel.
AVERAGE_MP : enum
Averages complex data in mag/phase space.
AVERAGE_MAGPHASE : enum
average_magphase
MODE : enum
Selects the value which appears most often of all the sampled points.
"""
NEAREST = "nearest"
AVERAGE = "average"
GAUSS = "gauss"
CUBIC = "cubic"
CUBICSPLINE = "cubicspline"
LANCZOS = "lanczos"
AVERAGE_MP = "average_mp"
AVERAGE_MAGPHASE = "average_magphase"
MODE = "mode"
[docs]class ImageUploadStatus(StrEnum):
"""The status of the image upload operation.
Attributes
----------
TRANSFERRING : enum
Upload has been initiated and file(s) are being transfered from
the client to the platform.
PENDING : enum
The files were transfered to the platform, and are waiting for processing to begin.
RUNNING : enum
The processing step is currently running.
SUCCESS : enum
The upload processing completed successfully and the new image is available.
FAILURE : enum
The upload failed; error information is available.
CANCELED : enum
The upload was canceled by the user prior to completion.
"""
TRANSFERRING = "transferring"
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILURE = "failure"
CANCELED = "canceled"
[docs]class ImageUploadEventType(StrEnum):
"""The type of the image upload event.
Attributes
----------
QUEUE : enum
The transfer of the file(s) was completed, and the upload processing
request has been issued.
CANCEL : enum
The user has requested that the upload be canceled. If processing is
already underway, it will continue.
RUN : enum
The processing step is starting.
COMPLETE : enum
All processing has completed. The upload status will reflect
success, failure, or cancellation.
ERROR : enum
An error has been detected, but the operation may continue or be
retried.
TIMEOUT : enum
The upload operation has timed out, and will be retried.
LOG : enum
The event contains logging output.
USAGE : enum
The event contains process resource usage information.
"""
QUEUE = "queue"
CANCEL = "cancel"
RUN = "run"
COMPLETE = "complete"
ERROR = "error"
TIMEOUT = "timeout"
LOG = "log"
USAGE = "usage"
[docs]class ImageUploadEventSeverity(StrEnum):
"""The severity of an image upload event.
The severity values duplicate the standard python logging package
level names and have the same meaning.
Attributes
----------
CRITICAL : enum
Critical (error) event.
ERROR : enum
Error event.
WARNING : enum
Warning event.
INFO : enum
Informational event.
DEBUG : enum
Debug event.
"""
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
INFO = "INFO"
DEBUG = "DEBUG"
class ImageUploadOptions(MappingAttribute):
    """Settings that control how an upload is processed.

    Attributes
    ----------
    upload_type : str or ImageUploadType
        Type of upload job, see `ImageUploadType`.
    image_files : list(str)
        File basenames of the uploaded files.
    overviews : list(int)
        Overview generation control, only used when `upload_type` is
        `ImageUploadType.NDARRAY`.
    overview_resampler : str or OverviewResampler
        Overview resampler method, only used when `upload_type` is
        `ImageUploadType.NDARRAY`.
    upload_size : int
        When `upload_type` is `ImageUploadType.NDARRAY`, the total size of the
        array in bytes.
    """

    upload_type = EnumAttribute(ImageUploadType)
    image_files = ListAttribute(Attribute)
    overviews = ListAttribute(Attribute)
    overview_resampler = EnumAttribute(OverviewResampler)
    upload_size = Attribute()

    # Deliberately omitted from the docstring: worker_tag exists for
    # development and testing only and should not be used by ordinary clients.
    worker_tag = Attribute()
class ImageUploadEvent(MappingAttribute):
    """A single event recorded during the life-cycle of an image upload.

    An event is recorded at each change in upload status, and whenever
    responsibility for the upload passes between different subsystems
    (referred to here as "components"). The `ImageUpload` object itself only
    provides the current status and the time at which it was reached; the
    events record the circumstances of each status change as it occurred.

    A typical upload, once complete, will have four events with the following
    `event_type`:

    * `ImageUploadEventType.QUEUE`
    * `ImageUploadEventType.RUN`
    * `ImageUploadEventType.USAGE`
    * `ImageUploadEventType.COMPLETE`
    """

    _doc_type = "image_upload_event"

    id = Attribute(readonly=True, doc="str: Unique id for the event.")

    event_datetime = Timestamp(
        readonly=True,
        doc="datetime: The time at which the event occurred.",
    )

    component = Attribute(
        readonly=True,
        doc=(
            "str: The component which generated the event.\n"
            "The value of this field depends on the internal details of how images are\n"
            "uploaded, but is useful to support personnel for understanding where a failure\n"
            "may have occurred.\n"
        ),
    )

    component_id = Attribute(
        readonly=True,
        doc=(
            "str: The unique identifier for the component instance which generated the event.\n"
            "This identifier is useful to support personnel for tracking down any errors\n"
            "which may have occurred.\n"
        ),
    )

    event_type = EnumAttribute(
        ImageUploadEventType,
        readonly=True,
        doc="ImageUploadEventType: The type of the event.",
    )

    severity = EnumAttribute(
        ImageUploadEventSeverity,
        readonly=True,
        doc="ImageUploadEventSeverity: The severity of the event.",
    )

    message = Attribute(
        readonly=True,
        doc="str: Any message associated with the event.",
    )
class ImageUpload(CatalogObjectBase):
    """The status object returned when you upload an image.

    Instances are returned by
    :py:meth:`~descarteslabs.catalog.Image.upload` or
    :py:meth:`~descarteslabs.catalog.Image.upload_ndarray`.
    """

    # Seconds to sleep between successive status polls in `wait_for_completion`.
    # Once the list is exhausted the last interval is repeated indefinitely.
    _POLLING_INTERVALS = [1, 1, 1, 1, 1, 5, 10, 10, 30, 60]

    # Statuses at which `wait_for_completion` stops polling.
    _TERMINAL_STATES = (
        ImageUploadStatus.SUCCESS,
        ImageUploadStatus.FAILURE,
        ImageUploadStatus.CANCELED,
    )

    # Maps the "type" of an included resource to the class used to build it
    # in `_load_related_objects`.
    _upload_model_classes = {ImageUploadEvent._doc_type: ImageUploadEvent}

    _doc_type = "image_upload"
    _url = "/uploads_v2"
    INCLUDE_EVENTS = "events"
    _default_includes = [INCLUDE_EVENTS]
    _no_inherit = True

    id = TypedAttribute(
        str,
        mutable=False,
        serializable=False,
        doc="str: Globally unique identifier for the upload.",
    )
    product_id = TypedAttribute(
        attribute_type=str,
        mutable=False,
        doc=(
            "str: Product id of the product for this imagery.\n"
            "The product id for the `~descarteslabs.catalog.Product` to which this imagery\n"
            "will be uploaded.\n"
            "*Filterable, sortable*.\n"
        ),
    )
    image_id = TypedAttribute(
        str,
        mutable=False,
        doc=(
            "str: Image id of the image for this imagery.\n"
            "The image id for the `~descarteslabs.catalog.Image` to which this imagery will\n"
            "be uploaded. This is identical to `image`.id.\n"
            "*Filterable*.\n"
        ),
    )
    image = CatalogObjectReference(
        Image,
        require_unsaved=True,
        mutable=False,
        serializable=True,
        sticky=True,
        doc=(
            "~descarteslabs.catalog.Image: Image instance with all desired metadata fields.\n"
            "Note that any values will override those determined from the image files\n"
            "themselves.\n"
        ),
    )
    image_upload_options = ImageUploadOptions(
        sticky=True,
        mutable=False,
        doc="ImageUploadOptions: Control of the upload process.",
    )
    user = Attribute(
        readonly=True,
        doc=(
            "str: The User ID of the user requesting the upload.\n"
            "*Filterable, sortable*.\n"
        ),
    )
    resumable_urls = ListAttribute(
        Attribute,
        readonly=True,
        doc=(
            "list(str): Upload URLs to which the client will transfer the file contents.\n"
            "This field is for internal use by the client only.\n"
        ),
    )
    status = EnumAttribute(
        ImageUploadStatus,
        doc=(
            "str or ImageUploadStatus: Current job status.\n"
            "To retrieve the latest status, use :py:meth:`reload`.\n"
            "*Filterable, sortable*.\n"
        ),
    )
    events = ListAttribute(
        ImageUploadEvent,
        readonly=True,
        doc="list(ImageUploadEvent): List of events pertaining to the upload process.",
    )

    def _initialize(
        self,
        id=None,
        saved=False,
        relationships=None,
        related_objects=None,
        deleted=False,
        **kwargs
    ):
        """Initialize the instance, resolving one-to-many relationships.

        Each entry of `relationships` is turned into a list-valued keyword
        argument of the same name before delegating to the base class.

        Parameters
        ----------
        id : str, optional
            Passed through to the base class.
        saved : bool, optional
            Passed through to the base class.
        relationships : dict, optional
            Maps a relationship name to a mapping whose ``"data"`` entry is an
            iterable of mappings with ``"type"`` and ``"id"`` keys.
        related_objects : dict, optional
            Related objects keyed by ``(type, id)`` tuples.
        deleted : bool, optional
            Passed through to the base class.
        **kwargs
            Remaining attribute values, passed through to the base class.
        """
        # CatalogObjectBase only supports many to one, we need the other direction
        if relationships and related_objects:
            for name, relationship in relationships.items():
                # We depend on our attribute name (e.g. "events") being the
                # same as the upstream relationship name. References that are
                # missing from `related_objects` resolve to None.
                kwargs[name] = [
                    related_objects.get((related["type"], related["id"]))
                    for related in relationship["data"]
                ]

        super(ImageUpload, self)._initialize(
            id=id, saved=saved, deleted=deleted, **kwargs
        )

    @classmethod
    def search(cls, client=None, includes=True):
        """A search query for all uploads.

        Return a `Search` instance for searching image uploads.

        Parameters
        ----------
        includes : bool
            Controls the inclusion of events. If True, includes these objects.
            If False, no events are included. Defaults to True.
        client : :class:`CatalogClient`, optional
            A `CatalogClient` instance to use for requests to the Descartes Labs
            catalog.

        Returns
        -------
        :class:`~descarteslabs.catalog.search.Search`
            An instance of the `Search` class

        Example
        -------
        >>> from descarteslabs.catalog import (
        ...     ImageUpload,
        ...     ImageUploadStatus,
        ...     properties as p,
        ... )
        >>> search = ImageUpload.search().filter(p.status == ImageUploadStatus.FAILURE)
        >>> for result in search: # doctest: +SKIP
        ...     print(result) # doctest: +SKIP
        """
        return Search(cls, client=client, includes=includes)

    def wait_for_completion(self, timeout=None, warn_transient_errors=True):
        """Wait for the upload to complete.

        The upload is reloaded repeatedly, sleeping between attempts, until
        its status is one of the terminal states.

        Parameters
        ----------
        timeout : int, optional
            If specified, will wait up to specified number of seconds and will raise
            a `concurrent.futures.TimeoutError` if the upload has not completed.
            A value of ``0`` performs a single status check.
        warn_transient_errors : bool, optional, default True
            Any transient errors while periodically checking upload status are suppressed.
            If True, those errors will be printed as warnings.

        Raises
        ------
        concurrent.futures.TimeoutError
            If the specified timeout elapses and the upload has not completed.
        """
        if self.status in self._TERMINAL_STATES:
            return

        # Compare against None so an explicit timeout of 0 is honored instead
        # of being treated as "wait forever". The monotonic clock keeps the
        # deadline unaffected by system clock adjustments.
        deadline = None if timeout is None else time.monotonic() + timeout

        intervals = itertools.chain(
            self._POLLING_INTERVALS, itertools.repeat(self._POLLING_INTERVALS[-1])
        )

        while True:
            try:
                self.reload()
            except (
                ServerError,
                urllib3.exceptions.MaxRetryError,
                requests.exceptions.RetryError,
                urllib3.exceptions.TimeoutError,
            ) as e:
                # If a reload fails, just try again on the next interval
                if warn_transient_errors:
                    warnings.warn(
                        "In wait_for_completion: error fetching status for ImageUpload {!r}; "
                        "will retry: {}".format(self.id, e)
                    )

            if self.status in self._TERMINAL_STATES:
                return

            interval = next(intervals)

            if deadline is None:
                delay = interval
            else:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError()
                # Never sleep past the deadline.
                delay = min(remaining, interval)

            time.sleep(delay)

    def reload(self):
        """Reload all attributes from the Descartes Labs catalog.

        Refresh the state of this upload object. The instance
        state must be in the `~descarteslabs.catalog.DocumentState.SAVED` state.

        If the status changes to ``ImageUploadStatus.SUCCESS`` then the `image`
        instance is also reloaded so that it contains the full state of the newly
        loaded image.

        Raises
        ------
        NotFoundError
            If the object no longer exists.
        ValueError
            If the catalog object is not in the ``SAVED`` state.
        DeletedObjectError
            If this catalog object was deleted.
        """
        oldstatus = self.status

        super(ImageUpload, self).reload()

        # Only act on the transition into SUCCESS, not on every later reload.
        if self.status == ImageUploadStatus.SUCCESS and oldstatus != self.status:
            # image is not in a saved state, so doctor it up
            self.image._saved = True
            self.image._clear_modified_attributes()
            self.image.reload()

    @check_deleted
    def cancel(self):
        """Cancel the upload if it is not yet completed.

        Note that if the upload process is already running, it
        cannot be canceled unless a retryable error occurs.

        Raises
        ------
        NotFoundError
            If the object no longer exists.
        ValueError
            If the catalog object is not in the ``SAVED`` state.
        DeletedObjectError
            If this catalog object was deleted.
        ConflictError
            If the upload has a current status which does not allow it to be canceled.
        """
        # Cancellation is requested by saving the upload with a CANCELED status.
        self.status = ImageUploadStatus.CANCELED
        self.save()

    @classmethod
    def _load_related_objects(cls, response, client):
        """Build the related objects found in the ``"included"`` part of a response.

        The relationships of the ImageUpload are not first-class CatalogObjects,
        so we need slightly different handling here.

        Parameters
        ----------
        response : dict
            Response whose optional ``"included"`` entry is a list of mappings
            with ``"type"``, ``"id"`` and ``"attributes"`` keys.
        client : object
            Not used by this implementation.

        Returns
        -------
        dict
            The instantiated related objects keyed by ``(type, id)`` tuples.
            Included items whose type has no entry in `_upload_model_classes`
            are skipped.
        """
        related_objects = {}

        for serialized in response.get("included") or ():
            # Use .get() so that an unknown type is skipped rather than
            # raising KeyError.
            model_class = cls._upload_model_classes.get(serialized["type"])
            if not model_class:
                continue

            related_objects[(serialized["type"], serialized["id"])] = model_class(
                validate=False, id=serialized["id"], **serialized["attributes"]
            )

        return related_objects

    @hybridmethod
    @check_derived
    def delete(cls, id, client=None):
        """You cannot delete an ImageUpload.

        Raises
        ------
        NotImplementedError
            This method is not supported for ImageUploads.
        """
        raise NotImplementedError("Deleting ImageUploads is not permitted")

    @delete.instancemethod
    @check_deleted
    def delete(self):
        """You cannot delete an ImageUpload.

        Raises
        ------
        NotImplementedError
            This method is not supported for ImageUploads.
        """
        raise NotImplementedError("Deleting ImageUploads is not permitted")