[docs]classAuth:"""Client used to authenticate with all Descartes Labs service APIs."""RETRY_CONFIG=Retry(total=5,backoff_factor=random.uniform(1,10),allowed_methods=frozenset(["GET","POST"]),status_forcelist=[429,500,502,503,504],)AUTHORIZATION_ERROR=("No valid authentication info found{}. ""See https://docs.descarteslabs.com/authentication.html.")KEY_CLIENT_ID="client_id"KEY_CLIENT_SECRET="client_secret"KEY_REFRESH_TOKEN="refresh_token"KEY_SCOPE="scope"KEY_GRANT_TYPE="grant_type"KEY_TARGET="target"KEY_API_TYPE="api_type"KEY_JWT_TOKEN="jwt_token"KEY_ALT_JWT_TOKEN="JWT_TOKEN"# The various prefixes that can be used in Catalog ACLs.ACL_PREFIX_USER="user:"# Followed by the user's sha1 hashACL_PREFIX_EMAIL="email:"# Followed by the user's emailACL_PREFIX_GROUP="group:"# Followed by a lowercase groupACL_PREFIX_ORG="org:"# Followed by a lowercase org nameACL_PREFIX_ACCESS="access-id:"# Followed by the purchase-specific access id# Note that the access-id, including the prefix `access_id:`, is matched against# a group with the same name. In other words `group:access-id:<access-id>` will# match against `access-id:<access-id>` (assuming the `<access_id>` is identical).# these match the values in descarteslabs/common/services/python_auth/groups.pyORG_ADMIN_SUFFIX=":org-admin"RESOURCE_ADMIN_SUFFIX=":resource-admin"# These are cache keys for caching various data in the object's __dict__.# These are scrubbed out with `_clear_cache()` when retrieving a new token.KEY_PAYLOAD="_payload"KEY_ALL_ACL_SUBJECTS="_aas"KEY_ALL_ACL_SUBJECTS_AS_SET="_aasas"KEY_ALL_OWNER_ACL_SUBJECTS="_aoas"KEY_ALL_OWNER_ACL_SUBJECTS_AS_SET="_aoasas"__attrs__=["domain","scope","leeway","token_info_path","client_id","client_secret","refresh_token","_token","_namespace","RETRY_CONFIG",]_default_token_info_path=object()# Just any unique object_instance=None# the default Auth instancedef__init__(self,domain=None,scope=None,leeway=500,token_info_path=_default_token_info_path,client_id=None,client_secret=None,jwt_token=None,refresh_token=None,retries=None,_suppress_warning=False,):"""Retrieves a JWT access token from a client id and refresh token for cli usage. By default and without arguments the credentials are retrieved from a config file named ``token_info.json``. This file can be created by running ``descarteslabs auth login`` from the command line. You can change the default location by setting the environment variable ``DESCARTESLABS_TOKEN_INFO_PATH``. Make sure you do this **before** running ``descarteslabs auth login`` so the credentials will be saved to the file specified in the environment variable, and when still set when instantiating this class, the credentials will be read from that file. To use a short-lived access token that will not be refreshed, either set the environment variable ``DESCARTESLABS_TOKEN`` or use the ``jwt_token`` parameter. To use a long-lived refresh token that will be refreshed, either set the environment variables ``DESCARTESLABS_CLIENT_ID`` and ``DESCARTESLABS_CLIENT_SECRET`` or use the parameters ``client_id`` and ``client_secret``. This will retrieve an access token which will be cached between instances for the same combination of client id and client secret. If in addition to the client id and client secret you also specify a valid short-lived access token, it will be used until it expires. Note that the environment variable ``DESCARTESLABS_REFRESH_TOKEN`` is identical to ``DESCARTESLABS_CLIENT_SECRET`` and the parameter ``refresh_token`` is identical to ``client_secret``. Use one or the other but not both. Although discouraged, it is possible to set one value as environment variable, and pass the other value in as parameter. For example, one could set the environment variable ``DESCARTESLABS_CLIENT_ID`` and only pass in the parameter ``client_secret``. If you also specify a ``token_info_path`` that indicates which file to read the credentials from. If used by itself, it works the same as ``DESCARTESLABS_TOKEN_INFO_PATH`` and assuming the file exists and contains valid credentials, you could switch between accounts this way. If you specify the ``token_info_path`` together with an additional client id and client secret (whether retrieved through environment variables or given using parameters), the given credentials will be written to the given file. If this file already exists and contains matching credentials, it will be used to retrieve the short-lived access token and refreshes it when it expires. If the file already exists and contains conflicting credentials, it will be overwritten with the new credentials. Parameters ---------- domain : str, default ``descarteslabs.config.get_settings().IAM_URL`` The domain used for the credentials. You should normally never change this. scope : list(str), optional The JWT access token fields to be included. You should normally never have to use this. leeway : int, default 500 The leeway is given in seconds and is used as a safety cushion for the expiration. If the expiration falls within the leeway, the JWT access token will be renewed. token_info_path : str, default ``~/.descarteslabs/token_info.json`` Path to a JSON file holding the credentials. If not set and credentials are provided through environment variables or through parameters, this parameter will **not** be used. However, if no credentials are provided through environment variables or through parameters, it will default to ``~/.descarteslabs/token_info.json`` and credentials will be retrieved from that file if present. If explicitly set to ``None``, credentials will never be retrieved from file and **must** be provided through environment variables or parameters. client_id : str, optional The JWT client id. If provided it will take precedence over the corresponding environment variable, or the credentials retrieved through the file specified in ``token_info_path``. If this parameter is provided, you **must** either provide a ``client_secret`` or ``refresh_token`` (but not both). Access tokens retrieved this way will be cached without revealing the client secret. client_secret : str, optional The refresh token used to retrieve short-lived access tokens. If provided it will take precedence over the corresponding environment variable, or the credentials retrieved through the file specified in ``token_info_path``. If this parameter is provided, you **must** also provide a client id either as a parameter or through an environment variable. Access tokens retrieved this way will be cached without revealing the client secret. jwt_token : str, optional A short-lived JWT access token. If valid and used without other parameters, it will be used for access. If used with a client id, the access token must match or it will be discarded. If the access token is discarded either because it expired or didn't match the given client id, and no client secret has been given, no new access token can be retrieved and access will be denied. If used with both client id and client secret, the token will be cached and updated as needed without revealing the client secret. refresh_token : str, optional Identical to the ``client_secret``. You can only specify one or the other, or if specified both, they must match. The refresh token takes precedence over the client secret. retries : Retry or int, optional The number of retries and backoff policy; by default 5 retries with a random backoff policy between 1 and 10 seconds. Raises ------ UserWarning In case the refresh token and client secret differ. In case the defailt or given ``token_info_path`` cannot be found. In case no credentials can be found. Examples -------- >>> import descarteslabs >>> # Use default credentials obtained through 'descarteslabs auth login' >>> auth = descarteslabs.auth.Auth() >>> # Your Descartes Labs user id >>> auth.namespace # doctest: +SKIP 'a54d88e06612d820bc3be72877c74f257b561b19' >>> auth = descarteslabs.auth.Auth( ... client_id="some-client-id", ... client_secret="some-client-secret", ... ) >>> auth.namespace # doctest: +SKIP '67f21eb1040f978fe1da32e5e33501d0f4a604ac' >>> """# The logic here is murky and changed over time. Initially, the logic would# retrieve *any* of the information from *any* of the sources. This resulted in# the `token_info.json` being overwritten when you would use a different refresh# token set in the environment or passed in. This was changed to make a# distinction between data that is provided through the environment or as# arguments, versus the data that is retrieved from `token_info.json`. This still# allows arbitrary combinations of data provided through the environment and# passed in as arguments.# In addition there are duplicate keys and arguments, which makes things even# more unnecessarily complicated. For backward compatibility reasons we keep it# as-is. Overall the core information consists of:# client_id: The oauth application id.# client_secret: Same as refresh_token.# refresh_token: The oauth application refresh token. Refresh token has# precedence over client_secret.# _token: The short-lived jwt id token that can be generated from the# refresh token if present.self.token_info_path=token_info_pathiftoken_info_pathisAuth._default_token_info_path:token_info_path=Noneself.token_info_path=os.environ.get(DESCARTESLABS_TOKEN_INFO_PATH,DEFAULT_TOKEN_INFO_PATH)token_info={}# First determine if we are getting our info from the args or environmentself.client_id=next((xforxin(client_id,os.environ.get(DESCARTESLABS_CLIENT_ID),os.environ.get("CLIENT_ID"),)ifxisnotNone),None,)self.client_secret=next((xforxin(client_secret,os.environ.get(DESCARTESLABS_CLIENT_SECRET),os.environ.get("CLIENT_SECRET"),)ifxisnotNone),None,)self.refresh_token=next((xforxin(refresh_token,os.environ.get(DESCARTESLABS_REFRESH_TOKEN),)ifxisnotNone),None,)self._token=next((xforxin(jwt_token,os.environ.get(DESCARTESLABS_TOKEN),)ifxisnotNone),None,)# Make sure self.refresh_token is setifnotself.refresh_token:self.refresh_token=self.client_secretifself.client_idorself.refresh_tokenorself._token:# Information is provided through the environment or as argumentiftoken_info_path:# Explicit token_info.json file; see if we can use it...ifos.path.exists(self.token_info_path):token_info=self._read_token_info(self.token_info_path)if(notself._tokenandself.client_id==token_info.get(self.KEY_CLIENT_ID)andself.refresh_token==token_info.get(self.KEY_REFRESH_TOKEN)):self._token=token_info.get(self.KEY_JWT_TOKEN)elifself.refresh_tokenandself.token_info_path:# Make the saved JWT token file unique to the refresh tokentoken=self.refresh_tokentoken_sha1=sha1(token.encode("utf-8")).hexdigest()self.token_info_path=os.path.join(DEFAULT_TOKEN_INFO_DIR,f"{JWT_TOKEN_PREFIX}{token_sha1}.json")ifself._token:self._write_token_info(self.token_info_path,{self.KEY_JWT_TOKEN:self._token})else:self._token=self._read_token_info(self.token_info_path,suppress_warning=True).get(self.KEY_JWT_TOKEN)elifself.token_info_path:# All information comes from the cached token_info.json filetoken_info=self._read_token_info(self.token_info_path,_suppress_warning)self.client_id=token_info.get(self.KEY_CLIENT_ID)self.client_secret=token_info.get(self.KEY_CLIENT_SECRET)self.refresh_token=token_info.get(self.KEY_REFRESH_TOKEN)self._token=next((xforxin(token_info.get(self.KEY_ALT_JWT_TOKEN),token_info.get(self.KEY_JWT_TOKEN),)ifxisnotNone),None,)# The refresh token and client secret should be identical if both setif(self.client_secretandself.refresh_tokenandself.client_secret!=self.refresh_token):warnings.warn("Authentication token mismatch: both the client secret and the ""refresh token are provided but differ in value; ""the refresh token will be used for authentication.",stacklevel=2,)# Make sure they're identical. Refresh token has precedence.ifself.refresh_token:self.client_secret=self.refresh_tokenelifself.client_secret:self.refresh_token=self.client_secretself.scope=next((xforxin(scope,token_info.get(self.KEY_SCOPE))ifxisnotNone),None)# Verify that the token is valid; otherwise clear itifself._token:try:payload=self._get_payload(self._token)exceptAuthError:self._token=Noneelse:ifself._token_expired(payload)or(self.client_idandpayload.get("aud")!=self.client_id):self._token=Noneifnot_suppress_warningandnot(self._tokenor(self.client_idandself.refresh_token)):# Won't authn if we don't have a token or a client_id/refresh_token pairwarnings.warn(self.AUTHORIZATION_ERROR.format(""),stacklevel=2)self._namespace=NoneifretriesisNone:retries=self.RETRY_CONFIGself._retry_config=retriesself._init_session()self.leeway=leewayifdomainisNone:domain=get_default_domain()self.domain=domain
[docs]@classmethoddeffrom_environment_or_token_json(cls,**kwargs):"""Creates an Auth object from the given arguments. Creates an Auth object from the given arguments, environment variables, or stored credentials. See :py:class:`Auth` for details. """returnAuth(**kwargs)
def_init_session(self):# Sessions can't be shared across threads or processes because the underlying# SSL connection pool can't be shared. We create them thread-local to avoid# intractable exceptions when users naively share clients e.g. when using# multiprocessing.self._session=ThreadLocalWrapper(self.build_session)def_token_expired(self,payload,leeway=0):exp=payload.get("exp")ifexpisnotNone:now=(datetime.datetime.now(datetime.timezone.utc)-datetime.datetime(1970,1,1,tzinfo=datetime.timezone.utc)).total_seconds()returnnow+leeway>expreturnTrue# Must have exp@propertydeftoken(self):"""Gets the short-lived JWT access token. Returns ------- str The JWT token string. Raises ------ AuthError Raised when incomplete credentials were provided. OauthError Raised when a token cannot be obtained or refreshed. """ifself._tokenisNone:self._get_token()else:# might have token but could be close to expirationpayload=self._get_payload(self._token)ifself._token_expired(payload,self.leeway):try:self._get_token()exceptAuthErrorase:# Unable to refresh, raise if truly expiredifself._token_expired(payload):raiseereturnself._token@propertydefpayload(self):"""Gets the token payload. Returns ------- dict Dictionary containing the fields specified by scope, which may include: .. highlight:: none :: name: The name of the user. groups: Groups to which the user belongs. org: The organization to which the user belongs. email: The email address of the user. email_verified: True if the user's email has been verified. sub: The user identifier. exp: The expiration time of the token, in seconds since the start of the unix epoch. Raises ------ AuthError Raised when incomplete credentials were provided. OauthError Raised when a token cannot be obtained or refreshed. """payload=self.__dict__.get(self.KEY_PAYLOAD)ifpayloadisNone:payload=self._get_payload(self.token)# doctor custom claimsifDESCARTESLABS_CUSTOM_CLAIM_PREFIX:forkeyinlist(payload.keys()):ifkey.startswith(DESCARTESLABS_CUSTOM_CLAIM_PREFIX):payload[key[len(DESCARTESLABS_CUSTOM_CLAIM_PREFIX):]]=(payload.pop(key))self.__dict__[self.KEY_PAYLOAD]=payloadreturnpayload@staticmethoddef_get_payload(token):ifisinstance(token,str):token=token.encode("utf-8")try:# Anything that goes wrong here means it's a bad tokenclaims=token.split(b".")[1]returnjson.loads(base64url_decode(claims).decode("utf-8"))exceptExceptionase:raiseAuthError("Unable to read token {}: {}".format(token,e))@propertydefsession(self):returnself._session.get()defbuild_session(self):session=Session(self.domain,retries=self._retry_config)# local testing will not have necessary certsifself.domain.startswith("https://dev.localhost"):session.verify=Falsereturnsession
[docs]@staticmethoddefget_default_auth():"""Retrieve the default Auth. This Auth is used whenever you don't explicitly set the Auth when creating clients, etc. """ifAuth._instanceisNone:Auth._instance=Auth()returnAuth._instance
[docs]@staticmethoddefset_default_auth(auth):"""Change the default Auth to the given Auth. This is the Auth that will be used whenever you don't explicitly set the Auth when creating clients, etc. """Auth._instance=auth
@staticmethoddef_read_token_info(path,suppress_warning=False):ifos.environ.get("DESCARTESLABS_NO_JWT_CACHE","").lower()=="true":return{}try:withopen(path)asfp:returnjson.load(fp)exceptExceptionase:ifnotsuppress_warning:warnings.warn("Unable to read token_info from {} with error {}.".format(path,str(e)),stacklevel=3,)return{}@staticmethoddef_write_token_info(path,token_info):token_info_directory=os.path.dirname(path)temp_prefix=".{}.".format(os.path.basename(path))fd=Nonetemp_path=Nonesuppress_warning=Falsetry:ifAuth.KEY_JWT_TOKENintoken_info:token=token_info[Auth.KEY_JWT_TOKEN]ifisinstance(token,bytes):token_info[Auth.KEY_JWT_TOKEN]=token.decode("utf-8")makedirs_if_not_exists(token_info_directory)fd,temp_path=tempfile.mkstemp(prefix=temp_prefix,dir=token_info_directory)ifJWT_TOKEN_PREFIXinpath:token_info={Auth.KEY_JWT_TOKEN:token_info[Auth.KEY_JWT_TOKEN]}suppress_warning=Truetry:withos.fdopen(fd,"w+")asfp:json.dump(token_info,fp)finally:fd=None# Closed nowos.chmod(temp_path,stat.S_IRUSR|stat.S_IWUSR)try:os.rename(temp_path,path)exceptFileExistsError:# On windows remove the file firstos.remove(path)os.rename(temp_path,path)exceptExceptionase:ifnotsuppress_warning:warnings.warn("Failed to save token: {}".format(e),stacklevel=3,)finally:iffdisnotNone:os.close(fd)iftemp_pathisnotNoneandos.path.exists(temp_path):os.remove(temp_path)def_get_token(self,timeout=100):ifself.client_idisNone:raiseAuthError(self.AUTHORIZATION_ERROR.format(" (no client_id)"))ifself.client_secretisNoneandself.refresh_tokenisNone:raiseAuthError(self.AUTHORIZATION_ERROR.format(" (no client_secret or refresh_token)"))ifself.client_idinLEGACY_DELEGATION_CLIENT_IDS:ifself.scopeisNone:scope=["openid","name","groups","org","email"]else:scope=self.scopeparams={self.KEY_SCOPE:" ".join(scope),self.KEY_CLIENT_ID:self.client_id,self.KEY_GRANT_TYPE:"urn:ietf:params:oauth:grant-type:jwt-bearer",self.KEY_TARGET:self.client_id,self.KEY_API_TYPE:"app",self.KEY_REFRESH_TOKEN:self.refresh_token,}else:params={self.KEY_CLIENT_ID:self.client_id,self.KEY_GRANT_TYPE:"refresh_token",self.KEY_REFRESH_TOKEN:self.refresh_token,}ifself.scopeisnotNone:params[self.KEY_SCOPE]=" ".join(self.scope)r=self.session.post("/token",json=params,timeout=timeout)ifr.status_code!=200:raiseOauthError("Could not retrieve token: {}".format(r.text.strip()))data=r.json()access_token=data.get("access_token")id_token=data.get("id_token")# TODO(justin) remove legacy id_token usageifaccess_tokenisnotNone:self._token=access_tokenelifid_tokenisnotNone:self._token=id_tokenelse:raiseOauthError("Could not retrieve token")# clear out payload and subjects cacheself._clear_cache()token_info={}# Read the token from the token_info_path, and save it againifself.token_info_path:token_info=self._read_token_info(self.token_info_path,suppress_warning=True)if(token_info.get(self.KEY_CLIENT_ID)!=self.client_idortoken_info.get(self.KEY_CLIENT_SECRET)!=self.client_secret):# Not matching; better rewrite!token_info={self.KEY_CLIENT_ID:self.client_id,self.KEY_CLIENT_SECRET:self.client_secret,self.KEY_REFRESH_TOKEN:self.refresh_token,}token_info[self.KEY_JWT_TOKEN]=self._tokentoken_info.pop(self.KEY_ALT_JWT_TOKEN,None)# Remove alt keyself._write_token_info(self.token_info_path,token_info)@propertydefnamespace(self):"""Gets the user namespace (the Descartes Labs user id). Returns ------- str The user namespace. Raises ------ AuthError Raised when incomplete credentials were provided. OauthError Raised when a token cannot be obtained or refreshed. """namespace=self._namespaceifnamespaceisNone:namespace=self.payload.get("userid")ifnotnamespace:# legacy, compute it on the flynamespace=sha1(self.payload["sub"].encode("utf-8")).hexdigest()self._namespace=namespacereturnnamespace@propertydefall_acl_subjects(self):""" A list of all ACL subjects identifying this user (the user itself, the org, the groups) which can be used in ACL queries. """subjects=self.__dict__.get(self.KEY_ALL_ACL_SUBJECTS)ifsubjectsisNone:subjects=[self.ACL_PREFIX_USER+self.namespace]ifemail:=self.payload.get("email"):subjects.append(self.ACL_PREFIX_EMAIL+email.lower())iforg:=self.payload.get("org"):subjects.append(self.ACL_PREFIX_ORG+org)subjects+=[self.ACL_PREFIX_GROUP+groupforgroupinself._active_groups()]self.__dict__[self.KEY_ALL_ACL_SUBJECTS]=subjectsreturnsubjects@propertydefall_acl_subjects_as_set(self):subjects_as_set=self.__dict__.get(self.KEY_ALL_ACL_SUBJECTS_AS_SET)ifsubjects_as_setisNone:subjects_as_set=set(self.all_acl_subjects)self.__dict__[self.KEY_ALL_ACL_SUBJECTS_AS_SET]=subjects_as_setreturnsubjects_as_set@propertydefall_owner_acl_subjects(self):""" A list of ACL subjects identifying this user (the user itself, the org, org admin and catalog admins) which can be used in owner ACL queries. """subjects=self.__dict__.get(self.KEY_ALL_OWNER_ACL_SUBJECTS)ifsubjectsisNone:subjects=[self.ACL_PREFIX_USER+self.namespace]subjects.extend([self.ACL_PREFIX_ORG+orgfororginself.get_org_admins()iforg])subjects.extend([self.ACL_PREFIX_ACCESS+access_idforaccess_idinself.get_resource_admins()ifaccess_id])self.__dict__[self.KEY_ALL_OWNER_ACL_SUBJECTS]=subjectsreturnsubjects@propertydefall_owner_acl_subjects_as_set(self):subjects_as_set=self.__dict__.get(self.KEY_ALL_OWNER_ACL_SUBJECTS_AS_SET)ifsubjects_as_setisNone:subjects_as_set=set(self.all_owner_acl_subjects)self.__dict__[self.KEY_ALL_OWNER_ACL_SUBJECTS_AS_SET]=subjects_as_setreturnsubjects_as_setdefget_org_admins(self):# This retrieves the value of the org to be added if the user has one or# more org-admin groups, otherwise the empty list.return[group[:-len(self.ORG_ADMIN_SUFFIX)]forgroupinself.payload.get("groups",[])ifgroup.endswith(self.ORG_ADMIN_SUFFIX)]defget_resource_admins(self):# This retrieves the value of the access-id to be added if the user has one or# more resource-admin groups, otherwise the empty list.return[group[:-len(self.RESOURCE_ADMIN_SUFFIX)]forgroupinself.payload.get("groups",[])ifgroup.endswith(self.RESOURCE_ADMIN_SUFFIX)]def_active_groups(self):""" Attempts to filter groups to just the ones that are currently valid for this user. If they have a colon, the prefix leading up to the colon must be the user's current org, otherwise the user should not actually have rights with this group. """org=self.payload.get("org")forgroupinself.payload.get("groups",[]):parts=group.split(":")iflen(parts)==1:yieldgroupeliforgandparts[0]==org:yieldgroupdef_clear_cache(self):forkeyin(self.KEY_PAYLOAD,self.KEY_ALL_ACL_SUBJECTS,self.KEY_ALL_ACL_SUBJECTS_AS_SET,self.KEY_ALL_OWNER_ACL_SUBJECTS,self.KEY_ALL_OWNER_ACL_SUBJECTS_AS_SET,):ifkeyinself.__dict__:delself.__dict__[key]self._namespace=Nonedef__getstate__(self):returndict((attr,getattr(self,attr))forattrinself.__attrs__)def__setstate__(self,state):forname,valueinstate.items():setattr(self,name,value)self._init_session()