diff --git a/operator_service/admin_routes.py b/operator_service/admin_routes.py
index 3d66516..45659ff 100644
--- a/operator_service/admin_routes.py
+++ b/operator_service/admin_routes.py
@@ -4,7 +4,7 @@
 import psycopg2
 from flask import Blueprint, jsonify, request, Response
 from flask_headers import headers
-from kubernetes.client.rest import ApiException
+
 from operator_service.config import Config
 from operator_service.kubernetes_api import KubeAPI
 
@@ -16,6 +16,7 @@
 
 config = Config()
 logger = logging.getLogger(__name__)
+kube_api = KubeAPI(config)
 
 
 @adminpg_services.route("/pgsqlinit", methods=["POST"])
@@ -168,10 +169,10 @@ def get_compute_job_info():
         return output, code
     try:
         job_id = request.args["jobId"]
-        api_response = KubeAPI(config).get_namespaced_custom_object(job_id)
+        api_response = kube_api.get_namespaced_custom_object(job_id)
         logger.info(api_response)
         return jsonify(api_response), 200
-    except ApiException as e:
+    except Exception as e:
         logger.error(f"The jobId {job_id} is not registered in your namespace: {e}")
         return f"The jobId {job_id} is not registered in your namespace.", 400
 
@@ -194,14 +195,14 @@ def list_compute_jobs():
         logger.error(output)
         return output, code
     try:
-        api_response = KubeAPI(config).list_namespaced_custom_object()
+        api_response = kube_api.list_namespaced_custom_object()
         result = list()
         for i in api_response["items"]:
             result.append(i["metadata"]["name"])
 
         logger.info(api_response)
         return jsonify(result), 200
-    except ApiException as e:
+    except Exception as e:
         logger.error(
             f"Exception when calling CustomObjectsApi->list_cluster_custom_object: {e}"
         )
@@ -244,7 +245,7 @@ def get_logs():
         logger.error(output)
         return output, code
     data = request.args
-    kube_api = KubeAPI(config)
+
     try:
         job_id = data.get("jobId")
         component = data.get("component")
@@ -254,7 +255,7 @@ def get_logs():
             f"Looking pods in ns {kube_api.namespace} with labels {label_selector}"
         )
         pod_response = kube_api.list_namespaced_pod(label_selector=label_selector)
-    except ApiException as e:
+    except Exception as e:
         logger.error(
             f"Exception when calling CustomObjectsApi->list_namespaced_pod: {e}"
         )
@@ -279,7 +280,7 @@ def get_logs():
            r.headers["Content-Type"] = "text/plain; charset=utf-8"
            return r
 
-    except ApiException as e:
+    except Exception as e:
         logger.error(
             f"Exception when calling CustomObjectsApi->read_namespaced_pod_log: {e}"
         )
diff --git a/operator_service/data_store.py b/operator_service/data_store.py
index 43dd097..11551c0 100644
--- a/operator_service/data_store.py
+++ b/operator_service/data_store.py
@@ -170,8 +170,15 @@ def get_nonce_for_certain_provider(provider_address: str):
         if not rows:
             logger.info("nonce is null")
             return []
-        logger.info(f"nonce found: {max([float(row[0]) for row in rows])}")
-        return max([float(row[0]) for row in rows])
+        values = []
+        for row in rows:
+            if isinstance(row[0], str) and len(row[0]) == 19:
+                values.append(int(row[0]))
+            else:
+                values.append(float(row[0]))
+
+        logger.info(f"nonce found: {max(values)}")
+        return max(values)
     except (Exception, psycopg2.Error) as error:
         logger.error(f"PG query error: {error}")
         return
diff --git a/operator_service/kubernetes_api.py b/operator_service/kubernetes_api.py
index 9245235..390e66c 100644
--- a/operator_service/kubernetes_api.py
+++ b/operator_service/kubernetes_api.py
@@ -1,15 +1,21 @@
+import base64
+from tempfile import NamedTemporaryFile
+import os
+import logging
 from os import path
-
 import kubernetes
 from kubernetes import client
 
 from operator_service.config import Config
+
+logger = logging.getLogger(__name__)
 
 # Configuration to connect to k8s.
-if not path.exists("/.dockerenv"):
-    kubernetes.config.load_kube_config()
-else:
+try:
     kubernetes.config.load_incluster_config()
+except Exception as e:
+    logger.error("Failed to load kubernetes config")
+    exit(1)
 
 
 class KubeAPI:
@@ -20,24 +26,23 @@ class KubeAPI:
     def __init__(self, config=None):
         if config is None:
             config = Config()
-
         self.group = config.group
         self.version = config.version
         self.namespace = config.namespace
         self.plural = config.plural
 
     def create_namespaced_custom_object(self, body):
-        return KubeAPI.api_customobject.create_namespaced_custom_object(
+        return self.api_customobject.create_namespaced_custom_object(
             self.group, self.version, self.namespace, self.plural, body
         )
 
     def get_namespaced_custom_object(self, job_id):
-        return KubeAPI.api_customobject.get_namespaced_custom_object(
+        return self.api_customobject.get_namespaced_custom_object(
             self.group, self.version, self.namespace, self.plural, job_id
         )
 
     def list_namespaced_custom_object(self):
-        return KubeAPI.api_customobject.list_namespaced_custom_object(
+        return self.api_customobject.list_namespaced_custom_object(
             self.group,
             self.version,
             self.namespace,
@@ -47,7 +52,7 @@ def list_namespaced_custom_object():
     def delete_namespaced_custom_object(
         self, name, body, grace_period_seconds, orphan_dependents, propagation_policy
     ):
-        return KubeAPI.api_customobject.delete_namespaced_custom_object(
+        return self.api_customobject.delete_namespaced_custom_object(
             self.group,
             self.version,
             self.namespace,
@@ -60,11 +65,11 @@ def delete_namespaced_custom_object(
         )
 
     def read_namespaced_pod_log(self, name):
-        return KubeAPI.api_core.read_namespaced_pod_log(
+        return self.api_core.read_namespaced_pod_log(
             name=name, namespace=self.namespace
         )
 
     def list_namespaced_pod(self, label_selector):
-        return KubeAPI.api_core.list_namespaced_pod(
+        return self.api_core.list_namespaced_pod(
             self.namespace, label_selector=label_selector
         )
diff --git a/operator_service/routes.py b/operator_service/routes.py
index bc15d3b..db63001 100644
--- a/operator_service/routes.py
+++ b/operator_service/routes.py
@@ -5,9 +5,8 @@
 
 import kubernetes
 from flask import Blueprint, request, Response
-from kubernetes.client.rest import ApiException
 
-from operator_service.config import Config
+
 from operator_service.data_store import (
     create_sql_job,
     get_sql_status,
@@ -20,7 +19,6 @@
     check_environment_exists,
     get_job_by_provider_and_owner,
 )
-from operator_service.kubernetes_api import KubeAPI
 from operator_service.utils import (
     create_compute_job,
     check_required_attributes,
@@ -40,14 +38,6 @@
 
 standard_headers = {"Content-type": "application/json", "Connection": "close"}
 
-# Configuration to connect to k8s.
-if not path.exists("/.dockerenv"):
-    kubernetes.config.load_kube_config()
-else:
-    kubernetes.config.load_incluster_config()
-
-config = Config()
-
 
 @services.route("/compute", methods=["POST"])
 def start_compute_job():
@@ -175,12 +165,8 @@ def start_compute_job():
                 400,
                 headers=standard_headers,
             )
-    except ApiException as e:
-        msg = f"Error getting the active jobs for initializing a compute job: {e}"
-        logger.error(msg)
-        return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
     except Exception as e:
-        msg = f"{e}"
+        msg = f"Error getting the active jobs for initializing a compute job: {e}"
         logger.error(msg)
         return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
 
@@ -222,9 +208,7 @@ def start_compute_job():
     )
     job_id = generate_new_id()
     logger.info(f"Got job_id: {job_id}")
-    body = create_compute_job(
-        workflow, job_id, config.group, config.version, environment
-    )
+    body = create_compute_job(workflow, job_id, environment)
     body["metadata"]["secret"] = generate_new_id()
     logger.debug(f"Got body: {body}")
     create_sql_job(
@@ -324,17 +308,14 @@ def stop_compute_job():
 
         return Response(json.dumps(status_list), 200, headers=standard_headers)
 
-    except ApiException as e:
-        logger.error(f"Exception when stopping compute job: {e}")
+    except Exception as e:
+        msg = f"Exception when stopping compute job: {e}"
+        logger.error(msg)
         return Response(
-            json.dumps({"error": f"Error stopping job: {e}"}),
+            json.dumps({"error": msg}),
             400,
             headers=standard_headers,
         )
-    except Exception as e:
-        msg = f"{e}"
-        logger.error(msg)
-        return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
 
 
 @services.route("/compute", methods=["DELETE"])
@@ -433,6 +414,9 @@ def get_compute_job_status():
             sign_message = f"{owner}{job_id}"
         else:
             sign_message = f"{owner}"
+        logger.info(
+            f"route get status process signature\nproviderSignature: {data.get('providerSignature')}\nsign_message: {sign_message}\nnonce to compare: {nonce}"
+        )
         msg, status, provider_address = process_provider_signature_validation(
             data.get("providerSignature"), sign_message, nonce
         )
@@ -449,12 +433,8 @@ def get_compute_job_status():
             headers=standard_headers,
         )
 
-    except ApiException as e:
-        msg = f"Error getting the status: {e}"
-        logger.error(msg)
-        return Response(json.dumps({"error": msg}), 400)
     except Exception as e:
-        msg = f"{e}"
+        msg = f"Error getting the status: {e}"
         logger.error(msg)
         return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
 
@@ -475,12 +455,8 @@ def get_running_jobs():
     try:
         api_response = sanitize_response_for_provider(get_sql_running_jobs())
         return Response(json.dumps(api_response), 200, headers=standard_headers)
-    except ApiException as e:
-        msg = f"Error getting the status: {e}"
-        logger.error(msg)
-        return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
     except Exception as e:
-        msg = f"{e}"
+        msg = f"Error getting running jobs: {e}"
         logger.error(msg)
         return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
 
@@ -578,12 +554,8 @@ def get_indexed_result():
             request, requests_session, outputs[index]["url"], None
         )
 
-    except ApiException as e:
-        msg = f"Error getting the status: {e}"
-        logger.error(msg)
-        return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
     except Exception as e:
-        msg = f"{e}"
+        msg = f"Error getResult: {e}"
         logger.error(msg)
         return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
 
diff --git a/operator_service/utils.py b/operator_service/utils.py
index 130e440..2ebf888 100644
--- a/operator_service/utils.py
+++ b/operator_service/utils.py
@@ -7,7 +7,6 @@
 from cgi import parse_header
 from os import getenv
 
-from kubernetes.client.rest import ApiException
 from eth_keys import KeyAPI
 from eth_keys.backends import NativeECCBackend
 from flask import Response, request
@@ -31,9 +30,9 @@ def generate_new_id():
     return uuid.uuid4().hex
 
 
-def create_compute_job(workflow, execution_id, group, version, namespace):
+def create_compute_job(workflow, execution_id, namespace):
     execution = dict()
-    execution["apiVersion"] = group + "/" + version
+    execution["apiVersion"] = "v0.0.1"
     execution["kind"] = "WorkFlow"
     execution["metadata"] = dict()
     execution["metadata"]["name"] = execution_id
@@ -94,7 +93,7 @@ def process_provider_signature_validation(signature, original_msg, nonce):
 
     db_nonce = get_nonce_for_certain_provider(address)
 
-    if db_nonce and float(nonce) <= float(db_nonce):
+    if db_nonce and nonce <= db_nonce:
         msg = (
             f"Invalid signature expected nonce ({db_nonce}) > current nonce ({nonce})."
         )
@@ -127,7 +126,7 @@ def get_list_of_allowed_providers():
             logger.error("Failed loading ALLOWED_PROVIDERS")
             return []
         return config_allowed_list
-    except ApiException as e:
+    except Exception as e:
         logging.error(
             f'Exception when calling json.loads(os.getenv("ALLOWED_PROVIDERS")): {e}'
         )
diff --git a/setup.py b/setup.py
index b5f2d42..05a82a0 100644
--- a/setup.py
+++ b/setup.py
@@ -23,10 +23,10 @@
     "flask-swagger-ui==3.20.9",
     "flask-headers",
     "Jinja2>=2.10.1,<3.1",
-    "kubernetes==10.0.0",
+    "kubernetes==27.2.0",
     "requests>=2.21.0",
-    "gunicorn==19.9.0",
-    "PyYAML==5.1",
+    "gunicorn==21.2.0",
+    "PyYAML>=5.4.1",
     "pytz==2018.5",
     "simplejson>=3.13.2",
     "psycopg2>=2.8.4",
@@ -44,7 +44,7 @@
     "twine==1.11.0",
     "flake8",
     "isort",
-    "black==22.1.0",
+    "black==22.3.0",
     "click==8.0.4",
     "pre-commit",
     "licenseheaders",
diff --git a/test/kube_mock.py b/test/kube_mock.py
index 56914ee..d7798a0 100644
--- a/test/kube_mock.py
+++ b/test/kube_mock.py
@@ -32,8 +32,6 @@ def create_namespaced_custom_object(self, body):
         correct_body = create_compute_job(
             processed_workflow,
             FAKE_UUID,
-            config.group,
-            config.version,
             config.namespace,
         )
         assert body == correct_body
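
Note (not part of the patch): the data_store.py hunk replaces the old "cast every stored nonce to float" logic with a per-row rule. A minimal standalone sketch of that rule, assuming nonce values come back from Postgres as strings; the function name normalize_nonce and the sample values are hypothetical and only illustrate the branch behaviour.

# Sketch of the nonce-normalization rule introduced above:
# a 19-character string is parsed as an int (keeping full precision on
# large, timestamp-style nonces), anything else falls back to float
# as before.
def normalize_nonce(raw):
    if isinstance(raw, str) and len(raw) == 19:
        return int(raw)
    return float(raw)


if __name__ == "__main__":
    # 19-digit string -> int, no float rounding of the large value
    print(normalize_nonce("1693305600000000000"))  # 1693305600000000000
    # shorter value -> float, matching the previous behaviour
    print(normalize_nonce("42"))  # 42.0
    # max() still works across the mixed int/float results
    print(max(normalize_nonce(v) for v in ["42", "1693305600000000000"]))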