Source code for descarteslabs.compute.job

# Copyright 2018-2024 Descartes Labs.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import time
import warnings
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type

from strenum import StrEnum

from descarteslabs.exceptions import NotFoundError

from ..catalog import Blob, CatalogClient, DeletedObjectError, StorageType
from ..common.client import (
    Attribute,
    DatetimeAttribute,
    Document,
    DocumentState,
    ListAttribute,
    Search,
)
from .compute_client import ComputeClient
from .job_statistics import JobStatistics
from .result import Serializable

if TYPE_CHECKING:
    from .function import Function


class JobStatus(StrEnum):
    """The status of the Job."""

    # Statuses that are NOT part of ``terminal()``: the Job has not finished yet.
    PENDING = "pending"
    RUNNING = "running"
    CANCEL = "cancel"
    CANCELING = "canceling"
    # Statuses returned by ``terminal()``: the Job will not change status again
    # on its own.
    SUCCESS = "success"
    FAILURE = "failure"
    TIMEOUT = "timeout"
    CANCELED = "canceled"

    @classmethod
    def terminal(cls):
        """Return the statuses that mark a Job as finished.

        Returns
        -------
        list of JobStatus
            A new list containing SUCCESS, FAILURE, TIMEOUT and CANCELED,
            in that order.
        """
        finished = (cls.SUCCESS, cls.FAILURE, cls.TIMEOUT, cls.CANCELED)
        return list(finished)
class JobSearch(Search["Job"]):
    """A lazy search over Jobs that also supports bulk operations.

    The bulk operations send the serialized search (its filters and
    parameters) to the server, so they act on every Job the search matches.
    """

    def _post_for_jobs(self, path: str) -> List["Job"]:
        """POST the serialized search to ``path`` and wrap the returned Jobs."""
        response = self._client.session.post(
            path, json=self._serialize(json_encode=False)
        )
        return [Job(**job, client=self._client, saved=True) for job in response.json()]

    def cancel(self):
        """Cancel all Jobs matching this search.

        Returns
        -------
        List[Job]
            The Jobs returned by the server, as saved Job instances.
        """
        return self._post_for_jobs("/jobs/cancel")

    def rerun(self):
        """Rerun all Jobs matching this search.

        Returns
        -------
        List[Job]
            The Jobs returned by the server, as saved Job instances.
        """
        return self._post_for_jobs("/jobs/rerun")

    def delete(self, delete_results: bool = False):
        """Delete all Jobs matching this search.

        Parameters
        ----------
        delete_results : bool, default=False
            If set, the result blobs of the Jobs are deleted as well.

        Returns
        -------
        The decoded JSON body of the server response.
        """
        # Named ``payload`` (not ``json``) so the module-level ``json`` import
        # is not shadowed inside this method.
        payload = self._serialize(json_encode=False)
        payload["delete_results"] = delete_results
        response = self._client.session.post("/jobs/delete", json=payload)
        return response.json()
class Job(Document):
    """A single invocation of a Function."""

    id: str = Attribute(
        str, filterable=True, readonly=True, sortable=True, doc="The ID of the Job."
    )
    function_id: str = Attribute(
        str,
        filterable=True,
        mutable=False,
        doc="The ID of the Function the Job belongs to.",
    )
    creation_date: datetime = DatetimeAttribute(
        filterable=True,
        readonly=True,
        sortable=True,
        doc="The date the Job was created.",
    )
    args: Optional[List] = Attribute(list, doc="The arguments provided to the Job.")
    error_reason: Optional[str] = Attribute(
        str,
        readonly=True,
        doc="The reason the Job failed.",
    )
    execution_count: Optional[int] = Attribute(
        int,
        filterable=True,
        readonly=True,
        sortable=True,
        doc="The number of attempts made to execute this job.",
    )
    exit_code: Optional[int] = Attribute(
        int,
        filterable=True,
        readonly=True,
        sortable=True,
        doc="The exit code of the Job.",
    )
    kwargs: Optional[Dict[str, Any]] = Attribute(
        dict, doc="The parameters provided to the Job."
    )
    environment: Optional[Dict[str, str]] = Attribute(
        dict, doc="The environment variables provided to the Job."
    )
    last_completion_date: Optional[datetime] = DatetimeAttribute(
        filterable=True,
        readonly=True,
        sortable=True,
        doc="The date the Job was last completed or canceled.",
    )
    last_execution_date: Optional[datetime] = DatetimeAttribute(
        filterable=True,
        readonly=True,
        sortable=True,
        doc="The date the Job was last executed.",
    )
    runtime: Optional[int] = Attribute(
        int,
        filterable=True,
        readonly=True,
        sortable=True,
        doc="The time it took the Job to complete.",
    )
    status: JobStatus = Attribute(
        JobStatus,
        filterable=True,
        readonly=True,
        sortable=True,
        doc="""The current status of the Job.

        The status may occasionally need to be refreshed by calling :py:meth:`Job.refresh`
        """,
    )
    statistics: Optional[JobStatistics] = Attribute(
        JobStatistics,
        readonly=True,
        doc="""The runtime utilization statistics for the Job.

        The statistics include the cpu, memory, and network usage of the Job.
        """,
    )
    tags: List[str] = ListAttribute(
        str,
        filterable=True,
        doc="A list of tags associated with the Job.",
    )

    # Lazy attributes: only populated when the request asks for
    # include=["timings"] (see ``get`` and ``refresh``).
    #
    # NOTE: the parenthesized ``doc`` values below must NOT end with a comma;
    # ``("a" "b",)`` is a one-element tuple, not a string.
    provisioning_time: Optional[int] = Attribute(
        int,
        readonly=True,
        doc=(
            "The time it took to provision the Job. This attribute will only be available "
            "if include='timings' is specified in the request by setting params."
        ),
    )
    pull_time: Optional[int] = Attribute(
        int,
        readonly=True,
        doc=(
            "The time it took to load the user code in the Job. This attribute will only"
            " be available if include='timings' is specified in the request by setting params."
        ),
    )

    def __init__(
        self,
        function_id: str,
        args: Optional[List] = None,
        kwargs: Optional[Dict] = None,
        client: ComputeClient = None,
        environment: Optional[Dict[str, str]] = None,
        **extra,
    ):
        """
        Parameters
        ----------
        function_id : str
            The id of the Function. A function must first be created to create a job.
        args : List, optional
            A list of positional arguments to pass to the function.
        kwargs : Dict, optional
            A dictionary of named arguments to pass to the function.
        environment : Dict[str, str], optional
            Environment variables to be set in the environment of the running Job.
            Will be merged with environment variables set on the Function, with
            the Job environment variables taking precedence.
        client: ComputeClient, optional
            The compute client to use for requests.
            If not set, the default client will be used.
        """
        self._client = client or ComputeClient.get_default_client()
        super().__init__(
            function_id=function_id,
            args=args,
            kwargs=kwargs,
            environment=environment,
            **extra,
        )

    # support use of jobs in sets
    def __hash__(self):
        return hash(self.id)

    # support use of jobs in sets
    def __eq__(self, other):
        if not isinstance(other, Job):
            # Let Python try the reflected comparison / fall back to identity.
            return NotImplemented
        if self.id is None or other.id is None:
            # A Job without an id has not been saved yet; comparing ids would
            # make every unsaved Job equal to every other unsaved Job.
            return self is other
        return self.id == other.id

    def _get_result_namespace(self) -> str:
        """Returns the namespace for the Job result blob."""
        namespace = self._client.get_namespace(self.function_id)
        if not namespace:
            # Fetching the function from the server will set the namespace
            # during hydration in Function.__init__
            namespace = self.function.namespace
        return namespace

    @property
    def function(self) -> "Function":
        """Returns the Function the Job belongs to.

        The Function is fetched using this Job's compute client.
        """
        # Local import: ``.function`` is only imported for type checking at
        # module level (presumably to avoid an import cycle).
        from .function import Function

        return Function.get(self.function_id, client=self._client)

    def _ensure_succeeded(self) -> None:
        """Raise ``ValueError`` unless the Job has completed successfully.

        If the cached status is not SUCCESS, the Job is refreshed once from the
        server first, since it may have finished in the meantime.
        """
        if self.status == JobStatus.SUCCESS:
            return
        self.refresh()
        if self.status == JobStatus.SUCCESS:
            return
        if self.status in JobStatus.terminal():
            raise ValueError(
                f"Job {self.id} has not completed successfully, status is {self.status}"
            )
        raise ValueError(
            f"Job {self.id} has not completed, status is {self.status}. "
            "Please wait for the job to complete."
        )

    @classmethod
    def get(cls, id, client: ComputeClient = None, **params) -> "Job":
        """Retrieves the Job by id.

        Parameters
        ----------
        id : str
            The id of the Job to fetch.
        client: ComputeClient, optional
            If set, the result will be retrieved using the configured client.
            Otherwise, the default client will be used.
        include : List[str], optional
            List of additional attributes to include in the response.
            Allowed values are:

            - "timings": Include additional debugging timing information about the Job.

        Example
        -------
        >>> from descarteslabs.compute import Job
        >>> job = Job.get(<job-id>)
        Job <job-id>: pending
        """
        client = client or ComputeClient.get_default_client()
        response = client.session.get(f"/jobs/{id}", params=params)
        return cls(**response.json(), client=client, saved=True)

    @classmethod
    def list(
        cls, page_size: int = 100, client: ComputeClient = None, **params
    ) -> JobSearch:
        """Retrieves an iterable of all jobs matching the given parameters.

        If you would like to filter Jobs, use :py:meth:`Job.search`.

        Parameters
        ----------
        page_size : int, default=100
            Maximum number of results per page.
        client: ComputeClient, optional
            If set, the result will be retrieved using the configured client.
            Otherwise, the default client will be used.

        Passing ``function_id`` or ``status`` as parameters is deprecated and
        emits a warning suggesting the equivalent :py:meth:`Job.search` filter.

        Example
        -------
        >>> from descarteslabs.compute import Job
        >>> jobs = Job.list()
        [Job <job-id1>: pending, Job <job-id2>: pending, Job <job-id3>: pending]
        """
        params = {"page_size": page_size, **params}
        search = cls.search(client=client).param(**params)

        # Deprecation: remove this in a future release
        deprecated = [name for name in ("function_id", "status") if name in params]
        if deprecated:
            examples = []
            if "function_id" in params:
                examples.append(f"Job.function_id == '{params['function_id']}'")
            if "status" in params:
                # Normalize to a list for the message only; ``params`` itself
                # is left untouched.
                statuses = params["status"]
                if not isinstance(statuses, list):
                    statuses = [statuses]
                examples.append(f"Job.status.in_({statuses})")
            names = " and ".join(f"`{name}`" for name in deprecated)
            noun = "parameter is" if len(deprecated) == 1 else "parameters are"
            filter_expr = " & ".join(examples)
            warnings.warn(
                f"The {names} {noun} deprecated. "
                f"Use `Job.search().filter({filter_expr})` instead.",
                stacklevel=2,
            )

        return search

    def cancel(self):
        """Cancels the Job.

        If the Job is already canceled or completed, this will do nothing.
        If the Job is still pending, it will be canceled immediately.
        If the job is running, it will be canceled as soon as possible.
        However, it may complete before the cancel request is processed.

        Raises
        ------
        ValueError
            If the Job is not in the SAVED state.
        """
        if self.state != DocumentState.SAVED:
            raise ValueError("Cannot cancel a Job that has not been saved")

        response = self._client.session.post(f"/jobs/{self.id}/cancel")
        self._load_from_remote(response.json())

    def delete(self, delete_result: bool = False):
        """Deletes the Job.

        Also deletes any job log blob for the job. Use `delete_result=True` to delete
        the job result blob as well.

        Parameters
        ----------
        delete_result : bool, False
            If set, the result of the job will also be deleted.

        Raises
        ------
        ValueError
            If the Job has never been saved.
        """
        if self.state == DocumentState.NEW:
            raise ValueError("Cannot delete a Job that has not been saved")

        self._client.session.delete(f"/jobs/{self.id}?delete_results={delete_result}")
        self._deleted = True

    def refresh(self, client: ComputeClient = None) -> None:
        """Update the Job instance with the latest information from the server.

        If timing information was previously loaded, it is requested again.

        Parameters
        ----------
        client: ComputeClient, optional
            If set, the result will be retrieved using the configured client.
            Otherwise, the Job's own client will be used.
        """
        client = client or self._client
        # ``is not None`` so that a legitimate timing value of 0 still keeps
        # the timings in the refreshed Job.
        if self.pull_time is not None or self.provisioning_time is not None:
            params = {"include": ["timings"]}
        else:
            params = {}

        response = client.session.get(f"/jobs/{self.id}", params=params)
        self._load_from_remote(response.json())

    def result(
        self,
        cast_type: Optional[Type[Serializable]] = None,
        catalog_client: CatalogClient = None,
    ):
        """Retrieves the result of the Job.

        Parameters
        ----------
        cast_type: Type[Serializable], None
            If set, the result will be deserialized to the given type.
        catalog_client: CatalogClient, None
            If set, the result will be retrieved using the configured catalog client.
            Otherwise, the default catalog client will be used.

        Returns
        -------
        The deserialized result (when `cast_type` is given), the JSON-decoded
        result when it is valid JSON, otherwise the raw blob data. Returns
        `None` when no result blob exists or it is empty.

        Raises
        ------
        ValueError
            When job has not completed successfully or when `cast_type` does not
            implement Serializable.
        """
        self._ensure_succeeded()

        if not catalog_client:
            catalog_client = self._client.catalog_client

        try:
            namespace = self._get_result_namespace()
            result = Blob.get_data(
                id=f"{StorageType.COMPUTE}/{namespace}/{self.function_id}/{self.id}",
                client=catalog_client,
            )
        except NotFoundError:
            # No result blob was stored for this Job.
            return None

        if not result:
            return None

        if cast_type:
            deserialize = getattr(cast_type, "deserialize", None)
            if deserialize and callable(deserialize):
                return deserialize(result)
            raise ValueError(f"Type {cast_type} must implement Serializable.")

        # Best effort: results that are not valid JSON are returned raw.
        try:
            return json.loads(result)
        except (TypeError, ValueError):
            return result

    def result_blob(
        self,
        catalog_client: CatalogClient = None,
    ):
        """Retrieves the Catalog Blob holding the result of the Job.

        If there is no result Blob, `None` will be returned.

        Parameters
        ----------
        catalog_client: CatalogClient, None
            If set, the result will be retrieved using the configured client.
            Otherwise, the default client will be used.

        Raises
        ------
        ValueError
            When job has not completed successfully.
        """
        self._ensure_succeeded()

        if not catalog_client:
            catalog_client = self._client.catalog_client

        return Blob.get(
            name=f"{self.function_id}/{self.id}",
            namespace=self._get_result_namespace(),
            storage_type=StorageType.COMPUTE,
            client=catalog_client,
        )

    @classmethod
    def search(cls, client: ComputeClient = None) -> JobSearch:
        """Creates a search for Jobs.

        The search is lazy and will be executed when the search is iterated over or
        :py:meth:`Search.collect` is called.

        Parameters
        ----------
        client: ComputeClient, optional
            If set, the result will be retrieved using the configured client.
            Otherwise, the default client will be used.

        Example
        -------
        >>> from descarteslabs.compute import Job, JobStatus
        >>> jobs: List[Job] = Job.search().filter(Job.status == JobStatus.SUCCESS).collect()
        Collection([Job <job-id1>: success, <job-id2>: success])
        """
        client = client or ComputeClient.get_default_client()
        return JobSearch(Job, client, url="/jobs")

    def wait_for_completion(self, timeout=None, interval=10):
        """Waits until the Job is completed.

        Parameters
        ----------
        timeout : int, default=None
            Maximum time to wait before timing out. If not set (None), the call
            will block until job completion. A timeout of 0 checks the status
            once and raises if the Job has not finished.
        interval : int, default=10
            Interval in seconds for how often to check if jobs have been completed.

        Raises
        ------
        TimeoutError
            If the Job has not reached a terminal status within `timeout` seconds.
        """
        # Monotonic clock: elapsed time is immune to wall-clock adjustments.
        start_time = time.monotonic()

        while True:
            self.refresh()
            if self.status in JobStatus.terminal():
                return

            if timeout is not None:
                remaining = timeout - (time.monotonic() - start_time)
                if remaining <= 0:
                    raise TimeoutError(
                        f"Job {self.id} did not complete before timeout!"
                    )
                # Never sleep past the deadline.
                delay = min(remaining, interval)
            else:
                delay = interval

            time.sleep(delay)

    def save(self):
        """Creates the Job if it does not already exist.

        If the job already exists, it will be updated on the server if modifications
        were made to the Job instance.

        Raises
        ------
        ValueError
            If the Job is in any state other than NEW, MODIFIED or SAVED.
        """
        if self.state == DocumentState.SAVED:
            return

        if self.state == DocumentState.MODIFIED:
            # Only send the attributes that changed.
            response = self._client.session.patch(
                f"/jobs/{self.id}", json=self.to_dict(only_modified=True)
            )
        elif self.state == DocumentState.NEW:
            response = self._client.session.post(
                "/jobs", json=self.to_dict(exclude_none=True)
            )
        else:
            raise ValueError(
                f"Unexpected Job state {self.state}. "
                f'Reload the job from the server: Job.get("{self.id}")'
            )

        self._load_from_remote(response.json())

    def iter_log(self, timestamps: bool = True):
        """Retrieves the log for the job, returning an iterator over the lines.

        Parameters
        ----------
        timestamps : bool, True
            If set, log timestamps will be included and converted to the users system
            timezone from UTC.

            You may consider disabling this if you use a structured logger.
        """
        return self._client.iter_log_lines(
            f"/jobs/{self.id}/log", timestamps=timestamps
        )

    def log(self, timestamps: bool = True):
        """Retrieves the log for the job, returning a string.

        As logs can potentially be unbounded, consider using :py:meth:`Job.iter_log`.

        Parameters
        ----------
        timestamps : bool, True
            If set, log timestamps will be included and converted to the users system
            timezone from UTC.

            You may consider disabling this if you use a structured logger.
        """
        return "\n".join(self.iter_log(timestamps=timestamps))