Source code for featurizer_api_client.client

import os
import time
import numpy
import requests
from http import HTTPStatus
from featurizer_api_client.common.exceptions import *
from featurizer_api_client.common.logging import get_client_logger
from featurizer_api_client.utils.data import DataWrapper
from featurizer_api_client.utils.headers import (
    get_header_with_authentication_credentials,
    get_header_with_access_token,
    get_header_with_refresh_token
)


[docs]class FeaturizerApiClient(object): """Class implementing the lightweight client app for the Featurizer API""" # Define the logging configuration logging_configuration = { "class": "logging.handlers.TimedRotatingFileHandler", "kwargs": { "when": "midnight", "interval": 1, "backupCount": 365, "encoding": "utf8", "filename": os.path.join( os.path.dirname(os.path.realpath(__file__)), "..", "..", "logs", time.strftime("%Y_%m_%d_client.log") ) } } # Define the featurized data wrapper data_wrapper = DataWrapper # Define the request attributes host = "http://127.0.0.1" port = 5000 verify = True timeout = 2 # Define the HTTP error codes to be handled by refreshing the access token refresh_required_errors = [ HTTPStatus.UNAUTHORIZED, HTTPStatus.UNPROCESSABLE_ENTITY ] def __init__(self, host=None, port=None, verify=None, timeout=None, logging_configuration=None): """ Initializes the FeaturizerApiClient. :param host: host (IP address), defaults to None :type host: str, optional :param port: port (port number), defaults to None :type port: str, optional :param verify: request verification, defaults to None :type verify: bool, optional :param timeout: timeout in seconds, defaults to None :type timeout: int. optional :param logging_configuration: logging configuration, defaults to None :type logging_configuration: dict, optional """ # Set the client logger self.logger = get_client_logger(logging_configuration or FeaturizerApiClient.logging_configuration) # Set the basic attributes self.host = host if host else FeaturizerApiClient.host self.port = port if port else FeaturizerApiClient.port self.verify = verify if verify else FeaturizerApiClient.verify self.timeout = timeout if timeout else FeaturizerApiClient.timeout # Set the internal attributes self._username = None self._password = None self._access_token = None self._refresh_token = None # --------- # # Endpoints # # --------- #
[docs] def sign_up(self, username, password): """ Signs-up a new user in the featurizer API. :param username: username :type username: str :param password: password :type password: str :return: (data/error_info, status_code) :rtype: tuple """ # Prepare the request body (username, password) body = get_header_with_authentication_credentials(username, password) # Sign-up a new user try: response = requests.post(url=self.signup_endpoint, json=body, verify=self.verify, timeout=self.timeout) except requests.ConnectionError: return {"message": "Connection error."}, HTTPStatus.NOT_FOUND else: return self._prepare_authentication_response(response)
[docs] def log_in(self, username, password): """ Logs-in an existing user in the featurizer API. :param username: username :type username: str :param password: password :type password: str :return: (data/error_info, status_code) :rtype: tuple """ # Prepare the request body (username, password) body = get_header_with_authentication_credentials(username, password) # Log-in an existing user try: response = requests.post(url=self.login_endpoint, json=body, verify=self.verify, timeout=self.timeout) except requests.ConnectionError: return {"message": "Connection error."}, HTTPStatus.NOT_FOUND else: return self._prepare_authorization_response(response)
[docs] def refresh_access_token(self, refresh_token): """ Refreshes an access token in the featurizer API. :param refresh_token: refresh token :type refresh_token: str :return: (data/error_info, status_code) :rtype: tuple """ # Prepare the request body (refresh token) body = get_header_with_refresh_token(refresh_token) # Refresh an access token try: response = requests.post(url=self.refresh_endpoint, headers=body, verify=self.verify, timeout=self.timeout) except requests.ConnectionError: return {"message": "Connection error."}, HTTPStatus.NOT_FOUND else: return self._prepare_authorization_response(response)
[docs] def featurize( self, access_token, refresh_token, features_pipeline, sample_values, sample_labels=None, extractor_configuration=None): """ Calls .featurize(...) on an injected featurization library via the API. :param access_token: access token :type access_token: str :param refresh_token: refresh token :type refresh_token: str :param features_pipeline: features pipeline :type features_pipeline: list of dicts :param sample_values: sample values :type sample_values: numpy.array :param sample_labels: sample labels, defaults to None :type sample_labels: list, optional :param extractor_configuration: extractor config, defaults to None :type extractor_configuration: dict, optional :return: (data/error_info, status_code) :rtype: tuple """ # Validate the input arguments if not features_pipeline: raise NoFeaturesPipelineForFeaturizationError(f"Missing: <features_pipeline>") if sample_values is None: raise NoSampleValuesForFeaturizationError(f"Missing: <sample_values>") if not isinstance(features_pipeline, (list, tuple)): raise UnsupportedSampleValuesForFeaturizationError("Unsupported type: <features_pipeline>") if not isinstance(sample_values, numpy.ndarray): raise UnsupportedSampleValuesForFeaturizationError("Unsupported type: <sample_values>") if not isinstance(sample_labels, (list, tuple, type(None))): raise UnsupportedSampleLabelsForFeaturizationError("Unsupported type: <sample_labels>") # Prepare the featurization data data = self._prepare_featurization_data( features_pipeline=features_pipeline, sample_values=sample_values, sample_labels=sample_labels, extractor_configuration=extractor_configuration) # Prepare the refresh token necessity flag needs_refresh = False # Featurize the data using an injected featurization library # # 1. call the featurization endpoint # 2. if access token refresh is required, refresh the access token # 3. if the access token got refreshed, re-call the featurization endpoint again # ---------------------------------- # 1. Call the featurization endpoint try: response = requests.post( url=self.featurize_endpoint, json=data, headers=get_header_with_access_token(access_token), verify=self.verify, timeout=self.timeout) if response.status_code in self.refresh_required_errors: needs_refresh = True except (requests.ConnectionError, requests.exceptions.ChunkedEncodingError): needs_refresh = True else: if not needs_refresh: return self._prepare_featurization_response(data, response) # ---------------------------------- # 2. Handle the access token refresh if needs_refresh: response, status_code = self.refresh_access_token(refresh_token) if status_code != HTTPStatus.OK: return response, status_code # ------------------------------------- # 3. Re-call the featurization endpoint response = requests.post( url=self.featurize_endpoint, json=data, headers=get_header_with_access_token(response.get("access_token")), verify=self.verify, timeout=self.timeout) return self._prepare_featurization_response(data, response)
# --------- # # Utilities # # --------- # def _prepare_featurization_data( self, features_pipeline, sample_values, sample_labels=None, extractor_configuration=None): """ Prepares the featurization data. :param features_pipeline: features pipeline :type features_pipeline: list of dicts :param sample_values: sample values :type sample_values: numpy.array :param sample_labels: sample labels :type sample_labels: list, optional :param extractor_configuration: feature extractor configuration :type extractor_configuration: dict, optional :return: featurization data :rtype: dict """ # Prepare the sample values and labels sample_values = self.data_wrapper.wrap_data(sample_values) sample_labels = sample_labels if sample_labels else [] # Prepare the extractor configuration extractor_configuration = extractor_configuration if extractor_configuration else {} # Return the featurization data return { "samples": { "values": sample_values, "labels": sample_labels, }, "features": { "pipeline": features_pipeline }, "extractor_configuration": extractor_configuration } def _prepare_authentication_response(self, response): """ Prepares the authentication response. :param response: API response :type response: requests.Response :return: prepared response data :rtype: tuple """ # Extract the authentication-data and status code data, state = response.json(), response.status_code # Call the authorization hooks self._update_username_hook(data.get("username")) self._update_password_hook(data.get("password")) # Return the authentication-data and status code return data, state def _prepare_authorization_response(self, response): """ Prepares the authorization response. :param response: API response :type response: requests.Response :return: prepared response data :rtype: tuple """ # Extract the authorization-data and status code data, state = response.json(), response.status_code # Call the authorization hooks self._update_access_token_hook(data.get("access_token")) self._update_refresh_token_hook(data.get("refresh_token")) # Return the authorization-data and status code return data, state def _prepare_featurization_response(self, data, response): """ Prepares the featurization response. :param data: data for featurization :type data: dict or str :param response: API response :type response: requests.Response :return: prepared response data :rtype: tuple """ # Log the featurization self.logger.info(f"./featurize ({response.status_code}) data: {data}; response: {response.json()}") # Prepare the featurization response if response.status_code == HTTPStatus.OK: features = response.json().get("features") features = { "values": self.data_wrapper.unwrap_data(features.get("values")), "labels": features.get("labels") } else: features = response.json() # Return the featurization response return features, response.status_code # ----- # # Hooks # # ----- # def _update_username_hook(self, username): self.username = username def _update_password_hook(self, password): self.password = password def _update_access_token_hook(self, access_token): self.access_token = access_token def _update_refresh_token_hook(self, refresh_token): self.refresh_token = refresh_token # ---------- # # Properties # # ---------- # @property def address(self): return f"{self.host}:{self.port}" @property def signup_endpoint(self): return f"{self.address}/signup" @property def login_endpoint(self): return f"{self.address}/login" @property def refresh_endpoint(self): return f"{self.address}/refresh" @property def featurize_endpoint(self): return f"{self.address}/featurize" @property def username(self): return self._username @username.setter def username(self, username): self._username = username @property def password(self): return self._password @password.setter def password(self, password): self._password = password @property def access_token(self): return self._access_token @access_token.setter def access_token(self, access_token): self._access_token = access_token @property def refresh_token(self): return self._refresh_token @refresh_token.setter def refresh_token(self, refresh_token): self._refresh_token = refresh_token