# Copyright 2018-2024 Descartes Labs.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
from strenum import StrEnum
from descarteslabs.exceptions import NotFoundError
from ..client.services.service import ThirdPartyService
from ..common.collection import Collection
from ..common.property_filtering import Properties
from .attributes import (
DocumentState,
EnumAttribute,
GeometryAttribute,
StorageState,
Timestamp,
TypedAttribute,
parse_iso_datetime,
)
from .blob_download import BlobDownload
from .catalog_base import (
AuthCatalogObject,
CatalogClient,
check_deleted,
check_derived,
hybridmethod,
UnsavedObjectError,
)
from .search import AggregateDateField, GeoSearch, SummarySearchMixin
from .task import TaskStatus
# Module-level Properties instance for building property filter expressions
# used with blob searches (see ..common.property_filtering).
properties = Properties()
class StorageType(StrEnum):
"""The storage type for a blob.
Attributes
----------
COMPUTE : enum
Compute service job results.
DATA : enum
Arbitrary user-managed data. This type may be uploaded by users.
DYNCOMP : enum
Saved Dynamic Compute objects. This type may be uploaded by users.
LOGS : enum
Compute service job log output (text files).
"""
COMPUTE = "compute"
DATA = "data"
DYNCOMP = "dyncomp"
LOGS = "logs"
class BlobSummaryResult(object):
    """
    The readonly data returned by :py:meth:`SummarySearch.summary` or
    :py:meth:`SummarySearch.summary_interval`.

    Attributes
    ----------
    count : int
        Number of blobs in the summary.
    bytes : int
        Total number of bytes of data across all blobs in the summary.
    namespaces : list(str)
        List of namespace IDs for the blobs included in the summary.
    interval_start: datetime
        For interval summaries only, a datetime representing the start of
        the interval period.
    """

    def __init__(
        self, count=None, bytes=None, namespaces=None, interval_start=None, **kwargs
    ):
        self.count = count
        self.bytes = bytes
        self.namespaces = namespaces
        # interval_start arrives as an ISO-8601 string; expose it as a datetime.
        if interval_start:
            self.interval_start = parse_iso_datetime(interval_start)
        else:
            self.interval_start = None

    def __repr__(self):
        lines = ["\nSummary for {} blobs:".format(self.count)]
        lines.append(" - Total bytes: {:,}".format(self.bytes))
        if self.namespaces:
            lines.append(" - Namespaces: {}".format(", ".join(self.namespaces)))
        if self.interval_start:
            lines.append(" - Interval start: {}".format(self.interval_start))
        return "\n".join(lines)
class BlobSearch(SummarySearchMixin, GeoSearch):
    """A search request that iterates over its search results for blobs.

    The `BlobSearch` is identical to `Search` but with two additional summary
    methods: :py:meth:`summary` and :py:meth:`summary_interval`.
    """

    # Result type returned by the summary methods.
    SummaryResult = BlobSummaryResult
    # Date field used for interval aggregation when none is specified.
    DEFAULT_AGGREGATE_DATE_FIELD = AggregateDateField.CREATED
class Blob(AuthCatalogObject):
    """A stored blob (arbitrary bytes) that can be searched and retrieved.

    Instantiating a blob indicates that you want to create a *new* Descartes Labs
    storage blob. If you instead want to retrieve an existing blob use
    `Blob.get() <descarteslabs.catalog.Blob.get>`.
    You can also use `Blob.search() <descarteslabs.catalog.Blob.search>`.
    Also see the example for :py:meth:`~descarteslabs.catalog.Blob.upload`.

    Parameters
    ----------
    client : CatalogClient, optional
        A `CatalogClient` instance to use for requests to the Descartes Labs catalog.
        The :py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will
        be used if not set.
    kwargs : dict
        With the exception of readonly attributes (`created`, `modified`) and with
        the exception of properties (`ATTRIBUTES`, `is_modified`, and `state`), any
        attribute listed below can also be used as a keyword argument. Also see
        `~Blob.ATTRIBUTES`.
    """

    _doc_type = "storage"
    _url = "/storage"
    # _collection_type set below due to circular problems
    _url_client = ThirdPartyService()

    # Blob Attributes
    namespace = TypedAttribute(
        str,
        doc="""str: The namespace of this blob.

        All blobs are stored and indexed under a namespace. Namespaces are allowed
        a restricted alphabet (``a-zA-Z0-9:._-``), and must begin with the user's
        org name, or their unique user hash if the user has no org. The required
        prefix is separated from the rest of the namespace name (if any) by a ``:``.
        If not provided, the namespace will default to the users org (if any) and
        the unique user hash. The combined length of the ``namespace`` and the
        ``name`` cannot exceed 979 bytes.

        *Searchable, sortable*.
        """,
    )
    name = TypedAttribute(
        str,
        doc="""str: The name of this blob.

        All blobs are stored and indexed by name. Names are allowed
        a restricted alphabet (``a-zA-Z0-9:._/-``), but may not begin or end with a
        ``/``. The combined length of the ``namespace`` and the ``name`` cannot exceed
        979 bytes.

        The ``/`` is intended to be used like a directory in a pathname to allow for
        prefix search operations, but otherwise has no special meaning.

        *Searchable, sortable*.
        """,
    )
    storage_state = EnumAttribute(
        StorageState,
        doc="""str or StorageState: Storage state of the blob.

        The state is `~StorageState.AVAILABLE` if the data is available and can be
        retrieved, `~StorageState.REMOTE` if the data is not currently available.

        *Filterable, sortable*.
        """,
    )
    storage_type = EnumAttribute(
        StorageType,
        doc="""str or StorageType: Storage type of the blob.

        `~StorageType.DATA` is managed by end users (e.g. via
        :py:meth:`descarteslabs.catalog.Blob.upload`.
        Other types are generated and managed by various components of the platform.

        *Filterable, sortable*.
        """,
    )
    description = TypedAttribute(
        str,
        doc="""str, optional: A description with further details on this blob.

        The description can be up to 80,000 characters and is used by
        :py:meth:`Search.find_text`.

        *Searchable*
        """,
    )
    geometry = GeometryAttribute(
        doc="""str or shapely.geometry.base.BaseGeometry, optional: Geometry representing the location for the blob.

        *Filterable*
        (use :py:meth:`BlobSearch.intersects
        <descarteslabs.catalog.BlobSearch.intersects>` to search based on geometry)
        """
    )
    expires = Timestamp(
        doc="""str or datetime, optional: Timestamp when the blob should be expired and deleted.

        *Filterable, sortable*.
        """
    )
    href = TypedAttribute(
        str,
        doc="""str, optional: Storage location for the blob.

        This attribute may not be set by the end user.
        """,
    )
    size_bytes = TypedAttribute(
        int,
        doc="""int, optional: Size of the blob in bytes.

        *Filterable, sortable*.
        """,
    )
    hash = TypedAttribute(
        str, doc="""str, optional: Content hash (MD5) for the blob."""
    )
[docs] @classmethod
def namespace_id(cls, namespace_id, client=None):
"""Generate a fully namespaced id.
Parameters
----------
namespace_id : str or None
The unprefixed part of the id that you want prefixed.
client : CatalogClient, optional
A `CatalogClient` instance to use for requests to the Descartes Labs
catalog. The
:py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will
be used if not set.
Returns
-------
str
The fully namespaced id.
Example
-------
>>> namespace = Blob.namespace_id("myproject") # doctest: +SKIP
'myorg:myproject' # doctest: +SKIP
"""
if client is None:
client = CatalogClient.get_default_client()
org = client.auth.payload.get("org")
namespace = client.auth.namespace
if not namespace_id:
if org:
return f"{org}:{namespace}"
else:
return namespace
elif org:
if namespace_id == org or namespace_id.startswith(org + ":"):
return namespace_id
else:
return f"{org}:{namespace_id}"
elif namespace_id == namespace or namespace_id.startswith(namespace + ":"):
return namespace_id
else:
return f"{namespace}:{namespace_id}"
[docs] @classmethod
def get(
cls,
id=None,
storage_type=StorageType.DATA,
namespace=None,
name=None,
client=None,
request_params=None,
headers=None,
):
"""Get an existing Blob from the Descartes Labs catalog.
If the Blob is found, it will be returned in the
`~descarteslabs.catalog.DocumentState.SAVED` state. Subsequent changes will
put the instance in the `~descarteslabs.catalog.DocumentState.MODIFIED` state,
and you can use :py:meth:`save` to commit those changes and update the Descartes
Labs catalog object. Also see the example for :py:meth:`save`.
Exactly one of the ``id`` and ``name`` parameters must be specified. If ``name``
is specified, it is used together with the ``storage_type`` and ``namespace``
parameters to form the corresponding ``id``.
Parameters
----------
id : str, optional
The id of the object you are requesting. Required unless ``name`` is supplied.
May not be specified if ``name`` is specified.
storage_type : StorageType, optional
The storage type of the Blob you wish to retrieve. Defaults to ``data``. Ignored
unless ``name`` is specified.
namespace : str, optional
The namespace of the Blob you wish to retrieve. Defaults to the user's org name
(if any) plus the unique user hash. Ignored unless ``name`` is specified.
name : str, optional
The name of the Blob you wish to retrieve. Required if ``id`` is not specified.
May not be specified if ``id`` is specified.
client : CatalogClient, optional
A `CatalogClient` instance to use for requests to the Descartes Labs
catalog. The
:py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will
be used if not set.
Returns
-------
:py:class:`~descarteslabs.catalog.CatalogObject` or None
The object you requested, or ``None`` if an object with the given `id`
does not exist in the Descartes Labs catalog.
Raises
------
~descarteslabs.exceptions.ClientError or ~descarteslabs.exceptions.ServerError
:ref:`Spurious exception <network_exceptions>` that can occur during a
network request.
"""
if (not id and not name) or (id and name):
raise TypeError("Must specify exactly one of id or name parameters")
if not id:
id = f"{storage_type}/{Blob.namespace_id(namespace)}/{name}"
return super(cls, Blob).get(
id, client=client, request_params=request_params, headers=headers
)
[docs] @classmethod
def get_or_create(
cls,
id=None,
storage_type=StorageType.DATA,
namespace=None,
name=None,
client=None,
**kwargs,
):
"""Get an existing object from the Descartes Labs catalog or create a new object.
If the Descartes Labs catalog object is found, and the remainder of the
arguments do not differ from the values in the retrieved instance, it will be
returned in the `~descarteslabs.catalog.DocumentState.SAVED` state.
If the Descartes Labs catalog object is found, and the remainder of the
arguments update one or more values in the instance, it will be returned in
the `~descarteslabs.catalog.DocumentState.MODIFIED` state.
If the Descartes Labs catalog object is not found, it will be created and the
state will be `~descarteslabs.catalog.DocumentState.UNSAVED`. Also see the
example for :py:meth:`save`.
Parameters
----------
id : str, optional
The id of the object you are requesting. Required unless ``name`` is supplied.
May not be specified if ``name`` is specified.
storage_type : StorageType, optional
The storage type of the Blob you wish to retrieve. Defaults to ``data``. Ignored
unless ``name`` is specified.
namespace : str, optional
The namespace of the Blob you wish to retrieve. Defaults to the user's org name
(if any) plus the unique user hash. Ignored unless ``name`` is specified.
name : str, optional
The name of the Blob you wish to retrieve. Required if ``id`` is not specified.
May not be specified if ``id`` is specified.
client : CatalogClient, optional
A `CatalogClient` instance to use for requests to the Descartes Labs
catalog. The
:py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will
be used if not set.
kwargs : dict, optional
With the exception of readonly attributes (`created`, `modified`), any
attribute of a catalog object can be set as a keyword argument (Also see
`ATTRIBUTES`).
Returns
-------
:py:class:`~descarteslabs.catalog.CatalogObject`
The requested catalog object that was retrieved or created.
"""
if (not id and not name) or (id and name):
raise TypeError("Must specify exactly one of id or name parameters")
if not id:
namespace = cls.namespace_id(namespace)
id = f"{storage_type}/{namespace}/{name}"
kwargs["storage_type"] = storage_type
kwargs["namespace"] = namespace
kwargs["name"] = name
return super(cls, Blob).get_or_create(id, client=client, **kwargs)
[docs] @classmethod
def search(cls, client=None, request_params=None, headers=None):
"""A search query for all blobs.
Return an `~descarteslabs.catalog.BlobSearch` instance for searching
blobs in the Descartes Labs catalog. This instance extends the
:py:class:`~descarteslabs.catalog.Search` class with the
:py:meth:`~descarteslabs.catalog.BlobSearch.summary` and
:py:meth:`~descarteslabs.catalog.BlobSearch.summary_interval` methods
which return summary statistics about the blobs that match the search query.
Parameters
----------
client : :class:`CatalogClient`, optional
A `CatalogClient` instance to use for requests to the Descartes Labs
catalog.
Returns
-------
:class:`~descarteslabs.catalog.BlobSearch`
An instance of the `~descarteslabs.catalog.BlobSearch` class
Example
-------
>>> from descarteslabs.catalog import Blob
>>> search = Blob.search().limit(10)
>>> for result in search: # doctest: +SKIP
... print(result.name) # doctest: +SKIP
"""
return BlobSearch(
cls, client=client, request_params=request_params, headers=headers
)
[docs] @check_deleted
def upload(self, file):
"""Uploads storage blob from a file.
Uploads data from a file and creates the Blob.
The Blob must be in the state `~descarteslabs.catalog.DocumentState.UNSAVED`.
The `storage_state`, `storage_type`, `namespace`, and the `name` attributes,
must all be set. If either the `size_bytes` and the `hash` attributes are set,
they must agree with the actual file to be uploaded, and will be validated
during the upload process.
On return, the Blob object will be updated to reflect the full state of the
new blob.
Parameters
----------
file : str or io.IOBase
File or files to be uploaded. Can be string with path to the file in the
local filesystem, or a file-like object (``io.IOBase``). If a file like
object and already open, must be binary mode and readable. Open file-like
objects remain open on return and must be closed by the caller.
Returns
-------
Blob
The uploaded instance.
Raises
------
ValueError
If any improper arguments are supplied.
DeletedObjectError
If this blob was deleted.
"""
self.namespace = self.__class__.namespace_id(self.namespace)
if not self.name:
raise ValueError("name field required")
if not self.storage_state:
self.storage_state = StorageState.AVAILABLE
if not self.storage_type:
self.storage_type = StorageType.DATA
if self.state != DocumentState.UNSAVED:
raise ValueError(
"Blob {} has been saved. Please use an unsaved blob for uploading".format(
self.id
)
)
if isinstance(file, str):
file = io.open(file, "rb")
close = True
elif isinstance(file, io.IOBase):
close = file.closed
if close:
file = io.open(file.name, "rb")
elif not file.readable() or "b" not in file.mode:
raise ValueError("Invalid file is open but not readable or binary mode")
else:
raise ValueError("Invalid file value: must be string or IOBase")
try:
return self._do_upload(file)
finally:
if close:
file.close()
[docs] @check_deleted
def upload_data(self, data):
"""Uploads storage blob from a bytes or str.
Uploads data from a string or bytes and creates the Blob.
The Blob must be in the state `~descarteslabs.catalog.DocumentState.UNSAVED`.
The `storage_state`, `storage_type`, `namespace`, and the `name` attributes,
must all be set. If either the `size_bytes` and the `hash` attributes are set,
they must agree with the actual data to be uploaded, and will be validated
during the upload process.
On return, the Blob object will be updated to reflect the full state of the
new blob.
Parameters
----------
data : str or bytes
Data to be uploaded. A str will be default encoded to bytes.
Returns
-------
Blob
The uploaded instance.
Raises
------
ValueError
If any improper arguments are supplied.
DeletedObjectError
If this blob was deleted.
"""
self.namespace = self.__class__.namespace_id(self.namespace)
if not self.name:
raise ValueError("name field required")
if not self.storage_state:
self.storage_state = StorageState.AVAILABLE
if not self.storage_type:
self.storage_type = StorageType.DATA
if self.state != DocumentState.UNSAVED:
raise ValueError(
"Blob {} has been saved. Please use an unsaved blob for uploading".format(
self.id
)
)
if isinstance(data, str):
data = data.encode()
elif not isinstance(data, bytes):
raise ValueError("Invalid data value: must be string or bytes")
return self._do_upload(data)
# the upload implementation is broken out so it can be used from multiple methods
def _do_upload(self, src):
# import here for circular dependency
from .blob_upload import BlobUpload
# Request an upload url
upload = BlobUpload(client=self._client, storage=self)
upload.save()
headers = {}
headers["content-type"] = "application/octet-stream"
if upload.storage.size_bytes:
headers["content-length"] = str(upload.storage.size_bytes)
# This should work but it doesn't. The header must be the base64
# encoding of the 16 binary MD5 checksum bytes. But the value
# that is is checked against by S3 is the hex-ified version of the
# 16 binary bytes. So even though they mean the same thing,
# they miscompare at S3 and the file upload fails.
# if upload.storage.hash:
# headers["content-md5"] = upload.storage.hash
# do the upload
self._url_client.session.put(upload.resumable_url, data=src, headers=headers)
# save the blob
upload.storage.save(request_params={"upload_signature": upload.signature})
# replenish our state, like reload but no need to go to server.
# this will effectively wipe all current state & caching.
self._initialize(
saved=True,
**upload.storage._attributes,
)
return self
[docs] @check_deleted
def download(self, file, range=None):
"""Downloads storage blob to a file.
Downloads data from the blob to a file.
The Blob must be in the state `~descarteslabs.catalog.DocumentState.SAVED`.
Parameters
----------
file : str or io.IOBase
Where to write the downloaded blob. Can be string with path to the file in the
local filesystem, or an file opened for writing (``io.IOBase``). If a file like
object and already open, must be binary mode and writable. Open file-like
objects remain open on return and must be closed by the caller.
range : str or list, optional
Range(s) of blob to be downloaded. Can either be a string in the standard
HTTP Range header format (e.g. "bytes=0-99"), or a list or tuple containing
one or two integers (e.g. ``(0, 99)``), or a list or tuple of the same
(e.g. ``((0, 99), (200-299))``). A list or tuple of one integer implies
no upper bound; in this case the integer can be negative, indicating the
count back from the end of the blob.
Returns
-------
str
The name of the downloaded file.
Raises
------
ValueError
If any improper arguments are supplied.
DeletedObjectError
If this blob was deleted.
"""
if self.state != DocumentState.SAVED:
raise ValueError("Blob {} has not been saved".format(self.id))
if isinstance(file, str):
file = io.open(file, "wb")
elif isinstance(file, io.IOBase):
close = file.closed
if close:
file = io.open(file.name, "wb")
elif not file.writable() or "b" not in file.mode:
raise ValueError("Invalid file is open but not writable or binary mode")
else:
raise ValueError("Invalid file value: must be string or IOBase")
return self._do_download(dest=file, range=range)
[docs] @check_deleted
def data(self, range=None):
"""Downloads storage blob data.
Downloads data from the blob and returns as a bytes object.
The Blob must be in the state `~descarteslabs.catalog.DocumentState.SAVED`.
Parameters
----------
range : str or list, optional
Range(s) of blob to be downloaded. Can either be a string in the standard
HTTP Range header format (e.g. "bytes=0-99"), or a list or tuple containing
one or two integers (e.g. ``(0, 99)``), or a list or tuple of the same
(e.g. ``((0, 99), (200-299))``). A list or tuple of one integer implies
no upper bound; in this case the integer can be negative, indicating the
count back from the end of the blob.
Returns
-------
bytes
The data retrieved from the Blob.
Raises
------
ValueError
If any improper arguments are supplied.
DeletedObjectError
If this blob was deleted.
"""
if self.state != DocumentState.SAVED:
raise ValueError("Blob {} has not been saved".format(self.id))
return self._do_download(range=range)
[docs] @check_deleted
def iter_data(self, chunk_size=None, range=None):
"""Downloads storage blob data.
Downloads data from the blob and returns as an iterator (generator)
which will yield the data (as a bytes) in chunks. This enables the
processing of very large files.
The Blob must be in the state `~descarteslabs.catalog.DocumentState.SAVED`.
Parameters
----------
chunk_size : int, optional
Size of chunks over which to iterate. Default is whatever size chunks
are received.
range : str or list, optional
Range(s) of blob to be downloaded. Can either be a string in the standard
HTTP Range header format (e.g. "bytes=0-99"), or a list or tuple containing
one or two integers (e.g. ``(0, 99)``), or a list or tuple of the same
(e.g. ``((0, 99), (200-299))``). A list or tuple of one integer implies
no upper bound; in this case the integer can be negative, indicating the
count back from the end of the blob.
Returns
-------
generator
An iterator over the blob data.
Raises
------
ValueError
If any improper arguments are supplied.
DeletedObjectError
If this blob was deleted.
"""
if self.state != DocumentState.SAVED:
raise ValueError("Blob {} has not been saved".format(self.id))
def generator(response):
try:
yield from response.iter_content(chunk_size)
finally:
response.close()
return self._do_download(dest=generator, range=range)
[docs] @check_deleted
def iter_lines(self, decode_unicode=False, delimiter=None):
"""Downloads storage blob data.
Downloads data from the blob and returns as an iterator (generator)
which will yield the data as text lines. This enables the
processing of very large files.
The Blob must be in the state `~descarteslabs.catalog.DocumentState.SAVED`.
The data within the blob must represent encoded text.
.. note:: This method is not reentrant safe.
Parameters
----------
decode_unicode : bool, optional
If true, then decode unicode in the incoming data and return
strings. Default is to return bytes.
delimiter : str or byte, optional
Delimiter for lines. Type depends on setting of `decode_unicode`.
Default is to use default line break sequence.
Returns
-------
generator
An iterator over the blob byte or text lines, depending on
value of `decode_unicode`.
Raises
------
ValueError
If any improper arguments are supplied.
DeletedObjectError
If this blob was deleted.
"""
if self.state != DocumentState.SAVED:
raise ValueError("Blob {} has not been saved".format(self.id))
def generator(response):
if decode_unicode:
# response will always claim to be application/octet-stream
response.encoding = "utf-8"
try:
yield from response.iter_lines(
decode_unicode=decode_unicode, delimiter=delimiter
)
finally:
response.close()
return self._do_download(dest=generator)
[docs] @classmethod
def get_data(
cls,
id=None,
storage_type=StorageType.DATA,
namespace=None,
name=None,
client=None,
range=None,
stream=False,
chunk_size=None,
):
"""Downloads storage blob data.
Downloads data for a given blob id and returns as a bytes object.
Parameters
----------
id : str, optional
The id of the object you are requesting. Required unless ``name`` is supplied.
May not be specified if ``name`` is specified.
storage_type : StorageType, optional
The storage type of the Blob you wish to retrieve. Defaults to ``data``. Ignored
unless ``name`` is specified.
namespace : str, optional
The namespace of the Blob you wish to retrieve. Defaults to the user's org name
(if any) plus the unique user hash. Ignored unless ``name`` is specified.
name : str, optional
The name of the Blob you wish to retrieve. Required if ``id`` is not specified.
May not be specified if ``id`` is specified.
client : Client, optional
Client instance. If not given, the default client will be used.
range : str or list, optional
Range(s) of blob to be downloaded. Can either be a string in the standard
HTTP Range header format (e.g. "bytes=0-99"), or a list or tuple containing
one or two integers (e.g. ``(0, 99)``), or a list or tuple of the same
(e.g. ``((0, 99), (200-299))``). A list or tuple of one integer implies
no upper bound; in this case the integer can be negative, indicating the
count back from the end of the blob.
stream : bool, optional
If True, return a generator that will yield the data in chunks. Defaults to False.
chunk_size : int, optional
If stream is True, the size of chunks over which to stream. Default is whatever
chunks are received on the wire.
Returns
-------
bytes or generator
The data retrieved from the Blob. If stream is True, returned as an iterator
(generator) which will yeild the data in chunks.
Raises
------
ValueError
If any improper arguments are supplied.
NotFoundError
If the Blob does not exist.
DeletedObjectError
If this blob was deleted.
"""
if (not id and not name) or (id and name):
raise TypeError("Must specify exactly one of id or name parameters")
if not id:
id = f"{storage_type}/{cls.namespace_id(namespace)}/{name}"
dest = None
if stream:
def generator(response):
try:
yield from response.iter_content(chunk_size)
finally:
response.close()
dest = generator
return cls(id=id, client=client)._do_download(dest=dest, range=range)
[docs] @classmethod
def delete_many(
cls, ids, raise_on_missing=False, wait_for_completion=False, client=None
):
"""Delete many blobs from the Descartes Labs catalog.
Only those blobs that exist and are owned by the user will be deleted.
No errors will be raised for blobs that do not exist or are visible but
not owned by the user. If you need to know, compare the supplied list of
ids with the returned list of deleted ids.
All blobs to be deleted must belong to the same purchase.
Parameters
----------
ids : list(str)
A list of blob ids to delete.
raise_on_missing : bool, optional
If True, raise an exception if any of the blobs are not found, otherwise ignore
missing blobs. Defaults to False.
wait_for_completion : bool, optional
If True, wait for the deletion to complete before returning. Defaults to False.
client : CatalogClient, optional
A `CatalogClient` instance to use for requests to the Descartes Labs catalog.
The :py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will
be used if not set.
Returns
-------
list(str)
A list of the ids of the blobs that were successfully deleted.
Raises
------
~descarteslabs.exceptions.ClientError or ~descarteslabs.exceptions.ServerError
:ref:`Spurious exception <network_exceptions>` that can occur during a
network request.
"""
if client is None:
client = CatalogClient.get_default_client()
task_status = BlobDeletionTaskStatus.create(
ids=ids, raise_on_missing=raise_on_missing, client=client
)
if wait_for_completion:
task_status.wait_for_completion()
return task_status.ids
def _do_download(self, dest=None, range=None):
download = BlobDownload.get(id=self.id, client=self._client)
# BlobDownload.get() returns None if the blob does not exist
# raise a NotFoundError in this case
if not download:
raise NotFoundError("Blob {} does not exist".format(self.id))
headers = {}
if self.hash:
headers["if-match"] = self.hash
if range:
if isinstance(range, str):
range_str = range
elif isinstance(range, (list, tuple)) and all(
map(lambda x: isinstance(x, int), range)
):
if len(range) == 1:
range_str = f"bytes={range[0]}"
elif len(range) == 2:
range_str = f"bytes={range[0]}-{range[1]}"
else:
raise ValueError("invalid range value")
else:
raise ValueError("invalid range value")
headers["range"] = range_str
r = self._url_client.session.get(
download.resumable_url, headers=headers, stream=True
)
r.raise_for_status()
if callable(dest):
# generator will close response
return dest(r)
else:
try:
if dest is None:
return r.raw.read()
else:
for chunk in r.iter_content(1048576):
dest.write(chunk)
return dest.name
finally:
r.close()
@hybridmethod
@check_derived
def delete(cls, id, client=None):
"""Delete the catalog object with the given `id`.
Parameters
----------
id : str
The id of the object to be deleted.
client : CatalogClient, optional
A `CatalogClient` instance to use for requests to the Descartes Labs
catalog. The
:py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will
be used if not set.
Returns
-------
BlobDeletionTaskStatus
The status of the deletion task which can be used to wait for completion. ``None`` if the
object was not found.
Raises
------
ConflictError
If the object has related objects (bands, images) that exist.
~descarteslabs.exceptions.ClientError or ~descarteslabs.exceptions.ServerError
:ref:`Spurious exception <network_exceptions>` that can occur during a
network request.
Example
-------
>>> Image.delete('my-image-id') # doctest: +SKIP
There is also an instance ``delete`` method that can be used to delete a blob.
It accepts no parameters and also returns a ``BlobDeletionTaskStatus``. Once
deleted, you cannot use the blob and should release any references.
"""
if client is None:
client = CatalogClient.get_default_client()
try:
return BlobDeletionTaskStatus.create(
ids=[id], raise_on_missing=True, client=client
)
except NotFoundError:
return None
[docs] @delete.instancemethod
@check_deleted
def delete(self):
"""Delete this catalog object from the Descartes Labs catalog.
Once deleted, you cannot use the catalog object and should release any
references.
Returns
-------
BlobDeletionTaskStatus
The status of the deletion task which can be used to wait for completion.
Raises
------
DeletedObjectError
If this catalog object was already deleted.
UnsavedObjectError
If this catalog object is being deleted without having been saved.
~descarteslabs.exceptions.ClientError or ~descarteslabs.exceptions.ServerError
:ref:`Spurious exception <network_exceptions>` that can occur during a
network request.
"""
if self.state == DocumentState.UNSAVED:
raise UnsavedObjectError("You cannot delete an unsaved object.")
task_status = BlobDeletionTaskStatus.create(
ids=[self.id], raise_on_missing=True, client=self._client
)
self._deleted = True # non-200 will raise an exception
return task_status
class BlobCollection(Collection):
    """A typed `Collection` whose items are `Blob` objects."""

    _item_type = Blob


# Resolve the circular reference between Blob and its collection type.
Blob._collection_type = BlobCollection
class BlobDeletionTaskStatus(TaskStatus):
    """The asynchronous deletion task's status.

    Attributes
    ----------
    id : str
        The id of the object for which this task is running.
    status : TaskState
        The state of the task as explained in `TaskState`.
    start_datetime : datetime
        The date and time at which the task started running.
    duration_in_seconds : float
        The duration of the task.
    objects_deleted : int
        The number of objects that were deleted.
    errors : list
        In case the status is ``FAILED`` this will contain a list of errors
        that were encountered. In all other states this will not be set.
    ids : list
        The ids of the objects that were deleted.
    """

    _task_name = "delete task"
    _url = "/storage/delete/{}"

    def __init__(self, objects_deleted=None, ids=None, **kwargs):
        super().__init__(**kwargs)
        self.objects_deleted = objects_deleted
        self.ids = ids

    @classmethod
    def create(cls, ids, raise_on_missing, client):
        """Start a deletion task for ``ids``, returning its status (or None).

        TaskStatus objects are not catalog objects, so the request is issued
        manually rather than through the catalog object machinery.
        """
        payload = {
            "data": {
                "attributes": {
                    "ids": ids,
                    "raise_on_missing": raise_on_missing,
                },
                "type": "storage_delete",
            }
        }
        response = client.session.post("/storage/delete", json=payload)
        if response.status_code != 201:
            return None
        data = response.json()["data"]
        return BlobDeletionTaskStatus(
            id=data["id"], _client=client, **data["attributes"]
        )

    def __repr__(self):
        text = super().__repr__()
        if self.objects_deleted:
            text += "\n - {:,} objects deleted".format(self.objects_deleted)
        return text