Source code for descarteslabs.catalog.task

# Copyright 2018-2024 Descartes Labs.

import time
from concurrent.futures import TimeoutError

from strenum import StrEnum

from .catalog_base import _new_abstract_class
from .catalog_client import CatalogClient


[docs]class TaskState(StrEnum): """The state of a task. Attributes ---------- NEVERRAN : enum The operation was never invoked. RUNNING : enum The operation is in progress. SUCCEEDED : enum The operation was successfully completed. FAILED : enum The operation resulted in a failure and may not have been completed. """ NEVERRAN = "NONE" # The operation was never started RUNNING = "RUNNING" SUCCEEDED = "SUCCESS" FAILED = "FAILURE"
class TaskStatus(object): """A base class for the status of asynchronous jobs.""" _TERMINAL_STATES = [TaskState.NEVERRAN, TaskState.SUCCEEDED, TaskState.FAILED] _POLLING_INTERVAL = 60 # The following 2 attributes must be set correctly in any derived class _task_name = "task" # The name of the task as shown in __repr__() _url = "{}" # The url for getting the status of the task with the `id` passed in def __new__(cls, *args, **kwargs): return _new_abstract_class(cls, TaskStatus) def __init__( self, id=None, status=None, start_datetime=None, duration_in_seconds=None, errors=None, _client=None, **kwargs ): self.id = id self.start_datetime = start_datetime self.duration_in_seconds = duration_in_seconds self.errors = errors self._client = _client or CatalogClient.get_default_client() try: self.status = TaskState(status) except ValueError: pass def __repr__(self): status = self.status.value if self.status else "UNKNOWN" text = ["{} {} status: {}".format(self.id, self._task_name, status)] if self.start_datetime: text.append(" - started: {}".format(self.start_datetime)) if self.duration_in_seconds: text.append(" - took {:,.4f} seconds".format(self.duration_in_seconds)) if self.errors: text.append(" - {} errors reported:".format(len(self.errors))) for e in self.errors: text.append(" - {}".format(e)) return "\n".join(text) def reload(self): """Update the task information. Raises ------ ~descarteslabs.exceptions.ClientError or ~descarteslabs.exceptions.ServerError :ref:`Spurious exception <network_exceptions>` that can occur during a network request. """ r = self._client.session.get(self._url.format(self.id)) response = r.json() new_values = response["data"]["attributes"] self.status = TaskState(new_values.pop("status")) for key, value in new_values.items(): setattr(self, key, value) def wait_for_completion(self, timeout=None): """Wait for the task to complete. Parameters ---------- timeout : int, optional If specified, will wait up to specified number of seconds and will raise a :py:exc:`concurrent.futures.TimeoutError` if the task has not completed. Raises ------ :py:exc:`concurrent.futures.TimeoutError` If the specified timeout elapses and the task has not completed """ if self.status in self._TERMINAL_STATES: return if timeout: timeout = time.time() + timeout while True: self.reload() if self.status in self._TERMINAL_STATES: return if timeout: t = timeout - time.time() if t <= 0: raise TimeoutError() t = min(t, self._POLLING_INTERVAL) else: t = self._POLLING_INTERVAL time.sleep(t)