import os
import time
import numpy
import requests
from http import HTTPStatus
from predictor_api_client.common.exceptions import *
from predictor_api_client.common.logging import get_client_logger
from predictor_api_client.utils.data import DataWrapper
from predictor_api_client.utils.headers import (
get_header_with_authentication_credentials,
get_header_with_access_token,
get_header_with_refresh_token
)
[docs]class PredictorApiClient(object):
"""Class implementing the lightweight client app for the Predictor API"""
# Define the logging configuration
logging_configuration = {
"class": "logging.handlers.TimedRotatingFileHandler",
"kwargs": {
"when": "midnight",
"interval": 1,
"backupCount": 365,
"encoding": "utf8",
"filename": os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"..",
"..",
"logs",
time.strftime("%Y_%m_%d_client.log")
)
}
}
# Define the predicted data wrapper
data_wrapper = DataWrapper
# Define the request attributes
host = "http://127.0.0.1"
port = 5000
verify = True
timeout = 2
# Define the HTTP error codes to be handled by refreshing the access token
refresh_required_errors = [
HTTPStatus.UNAUTHORIZED,
HTTPStatus.UNPROCESSABLE_ENTITY
]
def __init__(self, host=None, port=None, verify=None, timeout=None, logging_configuration=None):
"""
Initializes the PredictorApiClient.
:param host: host (IP address), defaults to None
:type host: str, optional
:param port: port (port number), defaults to None
:type port: str, optional
:param verify: request verification, defaults to None
:type verify: bool, optional
:param timeout: timeout in seconds, defaults to None
:type timeout: int. optional
:param logging_configuration: logging configuration, defaults to None
:type logging_configuration: dict, optional
"""
# Set the client logger
self.logger = get_client_logger(logging_configuration or PredictorApiClient.logging_configuration)
# Set the basic attributes
self.host = host if host else PredictorApiClient.host
self.port = port if port else PredictorApiClient.port
self.verify = verify if verify else PredictorApiClient.verify
self.timeout = timeout if timeout else PredictorApiClient.timeout
# Set the internal attributes
self._username = None
self._password = None
self._access_token = None
self._refresh_token = None
# --------- #
# Endpoints #
# --------- #
[docs] def sign_up(self, username, password):
"""
Signs-up a new user in the predictor API.
:param username: username
:type username: str
:param password: password
:type password: str
:return: (data/error_info, status_code)
:rtype: tuple
"""
# Prepare the request body (username, password)
body = get_header_with_authentication_credentials(username, password)
# Sign-up a new user
try:
response = requests.post(url=self.signup_endpoint, json=body, verify=self.verify, timeout=self.timeout)
except requests.ConnectionError:
return {"message": "Connection error."}, HTTPStatus.NOT_FOUND
else:
return self._prepare_authentication_response(response)
[docs] def log_in(self, username, password):
"""
Logs-in an existing user in the predictor API.
:param username: username
:type username: str
:param password: password
:type password: str
:return: (data/error_info, status_code)
:rtype: tuple
"""
# Prepare the request body (username, password)
body = get_header_with_authentication_credentials(username, password)
# Log-in an existing user
try:
response = requests.post(url=self.login_endpoint, json=body, verify=self.verify, timeout=self.timeout)
except requests.ConnectionError:
return {"message": "Connection error."}, HTTPStatus.NOT_FOUND
else:
return self._prepare_authorization_response(response)
[docs] def refresh_access_token(self, refresh_token):
"""
Refreshes an access token in the predictor API.
:param refresh_token: refresh token
:type refresh_token: str
:return: (data/error_info, status_code)
:rtype: tuple
"""
# Prepare the request body (refresh token)
body = get_header_with_refresh_token(refresh_token)
# Refresh an access token
try:
response = requests.post(url=self.refresh_endpoint, headers=body, verify=self.verify, timeout=self.timeout)
except requests.ConnectionError:
return {"message": "Connection error."}, HTTPStatus.NOT_FOUND
else:
return self._prepare_authorization_response(response)
[docs] def predict(self, access_token, refresh_token, model_identifier, feature_values, feature_labels=None):
"""
Calls .predict(...) on a predictor (model) via the API.
:param access_token: access token
:type access_token: str, optional
:param refresh_token: refresh token
:type refresh_token: str
:param model_identifier: model identifier
:type model_identifier: str
:param feature_values: feature values
:type feature_values: numpy.array
:param feature_labels: feature labels, defaults to None
:type feature_labels: list, optional
:return: (data/error_info, status_code)
:rtype: tuple
"""
return self._predict(
endpoint=self.predict_endpoint,
access_token=access_token,
refresh_token=refresh_token,
model_identifier=model_identifier,
feature_values=feature_values,
feature_labels=feature_labels)
[docs] def predict_proba(self, access_token, refresh_token, model_identifier, feature_values, feature_labels=None):
"""
Calls .predict_proba(...) on a predictor (model) via the API.
:param access_token: access token
:type access_token: str, optional
:param refresh_token: refresh token
:type refresh_token: str
:param model_identifier: model identifier
:type model_identifier: str
:param feature_values: feature values
:type feature_values: numpy.array
:param feature_labels: feature labels, defaults to None
:type feature_labels: list, optional
:return: (data/error_info, status_code)
:rtype: tuple
"""
return self._predict(
endpoint=self.predict_proba_endpoint,
access_token=access_token,
refresh_token=refresh_token,
model_identifier=model_identifier,
feature_values=feature_values,
feature_labels=feature_labels)
def _predict(
self,
endpoint,
access_token,
refresh_token,
model_identifier,
feature_values,
feature_labels=None):
"""
Calls .<endpoint>(...) on a predictor (model) via the API.
:param endpoint: endpoint to be called
:type endpoint: method
:param access_token: access token
:type access_token: str
:param refresh_token: refresh token
:type refresh_token: str
:param model_identifier: model identifier
:type model_identifier: str
:param feature_values: feature values
:type feature_values: numpy.array
:param feature_labels: feature labels, defaults to None
:type feature_labels: list, optional
:return: (data/error_info, status_code)
:rtype: tuple
"""
# Validate the input arguments
if not model_identifier:
raise NoModelIdentifierForPredictionError(f"Missing: <model_identifier>")
if feature_values is None:
raise NoFeatureValuesForPredictionError(f"Missing: <feature_values>")
if not isinstance(model_identifier, str):
raise UnsupportedModelIdentifierForPredictionError("Unsupported type: <model_identifier>")
if not isinstance(feature_values, numpy.ndarray):
raise UnsupportedFeatureValuesForPredictionError("Unsupported type: <feature_values>")
if not isinstance(feature_labels, (list, tuple, type(None))):
raise UnsupportedFeatureLabelsForPredictionError("Unsupported type: <feature_labels>")
# Prepare the prediction data
data = self._prepare_prediction_data(
model_identifier=model_identifier,
feature_values=feature_values,
feature_labels=feature_labels)
# Prepare the refresh token necessity flag
needs_refresh = False
# Predict the class(es)/probabilit(y/ies) using an identified model
#
# 1. call the prediction endpoint
# 2. if access token refresh is required, refresh the access token
# 3. if the access token got refreshed, re-call the prediction endpoint again
# -------------------------------
# 1. Call the prediction endpoint
try:
response = requests.post(
url=endpoint,
json=data,
headers=get_header_with_access_token(access_token),
verify=self.verify,
timeout=self.timeout)
if response.status_code in self.refresh_required_errors:
needs_refresh = True
except (requests.ConnectionError, requests.exceptions.ChunkedEncodingError):
needs_refresh = True
else:
if not needs_refresh:
return self._prepare_prediction_response(data, response, endpoint)
# ----------------------------------
# 2. Handle the access token refresh
if needs_refresh:
response, status_code = self.refresh_access_token(refresh_token)
if status_code != HTTPStatus.OK:
return response, status_code
# ----------------------------------
# 3. Re-call the prediction endpoint
response = requests.post(
url=endpoint,
json=data,
headers=get_header_with_access_token(response.get("access_token")),
verify=self.verify,
timeout=self.timeout)
return self._prepare_prediction_response(data, response, endpoint)
# --------- #
# Utilities #
# --------- #
def _prepare_prediction_data(self, model_identifier, feature_values, feature_labels=None):
"""
Prepares the prediction data.
:param model_identifier: model identifier
:type model_identifier: str
:param feature_values: feature values
:type feature_values: numpy.array
:param feature_labels: feature labels, defaults to None
:type feature_labels: list, optional
:return: predicted data
:rtype: dict
"""
# Prepare the feature values and labels
feature_values = self.data_wrapper.wrap_data(feature_values)
feature_labels = feature_labels if feature_labels else []
# Return the prediction data
return {
"model": model_identifier,
"features": {
"values": feature_values,
"labels": feature_labels,
}
}
def _prepare_authentication_response(self, response):
"""
Prepares the authentication response.
:param response: API response
:type response: requests.Response
:return: prepared response data
:rtype: tuple
"""
# Extract the authentication-data and status code
data, state = response.json(), response.status_code
# Call the authorization hooks
self._update_username_hook(data.get("username"))
self._update_password_hook(data.get("password"))
# Return the authentication-data and status code
return data, state
def _prepare_authorization_response(self, response):
"""
Prepares the authorization response.
:param response: API response
:type response: requests.Response
:return: prepared response data
:rtype: tuple
"""
# Extract the authorization-data and status code
data, state = response.json(), response.status_code
# Call the authorization hooks
self._update_access_token_hook(data.get("access_token"))
self._update_refresh_token_hook(data.get("refresh_token"))
# Return the authorization-data and status code
return data, state
def _prepare_prediction_response(self, data, response, endpoint):
"""
Prepares the prediction response.
:param data: data for prediction
:type data: dict or str
:param response: API response
:type response: requests.Response
:param endpoint: endpoint to be called
:type endpoint: method
:return: prepared response data
:rtype: tuple
"""
# Log the prediction
self.logger.info(
f"./{endpoint[(endpoint.rfind('/') + 1):]} ({response.status_code}) "
f"data: {data}; "
f"response: {response.json()}")
# Return the prediction response
if response.status_code == HTTPStatus.OK:
return self.data_wrapper.unwrap_data(response.json().get("predicted")), response.status_code
return response.json(), response.status_code
# ----- #
# Hooks #
# ----- #
def _update_username_hook(self, username):
self.username = username
def _update_password_hook(self, password):
self.password = password
def _update_access_token_hook(self, access_token):
self.access_token = access_token
def _update_refresh_token_hook(self, refresh_token):
self.refresh_token = refresh_token
# ---------- #
# Properties #
# ---------- #
@property
def address(self):
return f"{self.host}:{self.port}"
@property
def signup_endpoint(self):
return f"{self.address}/signup"
@property
def login_endpoint(self):
return f"{self.address}/login"
@property
def refresh_endpoint(self):
return f"{self.address}/refresh"
@property
def predict_endpoint(self):
return f"{self.address}/predict"
@property
def predict_proba_endpoint(self):
return f"{self.address}/predict_proba"
@property
def username(self):
return self._username
@username.setter
def username(self, username):
self._username = username
@property
def password(self):
return self._password
@password.setter
def password(self, password):
self._password = password
@property
def access_token(self):
return self._access_token
@access_token.setter
def access_token(self, access_token):
self._access_token = access_token
@property
def refresh_token(self):
return self._refresh_token
@refresh_token.setter
def refresh_token(self, refresh_token):
self._refresh_token = refresh_token