Fix signature validation #83

Merged · 6 commits · Sep 1, 2023
17 changes: 9 additions & 8 deletions operator_service/admin_routes.py
@@ -4,7 +4,7 @@
import psycopg2
from flask import Blueprint, jsonify, request, Response
from flask_headers import headers
from kubernetes.client.rest import ApiException


from operator_service.config import Config
from operator_service.kubernetes_api import KubeAPI
@@ -16,6 +16,7 @@

config = Config()
logger = logging.getLogger(__name__)
kube_api = KubeAPI(config)


@adminpg_services.route("/pgsqlinit", methods=["POST"])
@@ -168,10 +169,10 @@ def get_compute_job_info():
return output, code
try:
job_id = request.args["jobId"]
api_response = KubeAPI(config).get_namespaced_custom_object(job_id)
api_response = kube_api.get_namespaced_custom_object(job_id)
logger.info(api_response)
return jsonify(api_response), 200
except ApiException as e:
except Exception as e:
logger.error(f"The jobId {job_id} is not registered in your namespace: {e}")
return f"The jobId {job_id} is not registered in your namespace.", 400

@@ -194,14 +195,14 @@ def list_compute_jobs():
logger.error(output)
return output, code
try:
api_response = KubeAPI(config).list_namespaced_custom_object()
api_response = kube_api.list_namespaced_custom_object()
result = list()
for i in api_response["items"]:
result.append(i["metadata"]["name"])
logger.info(api_response)
return jsonify(result), 200

except ApiException as e:
except Exception as e:
logger.error(
f"Exception when calling CustomObjectsApi->list_cluster_custom_object: {e}"
)
@@ -244,7 +245,7 @@ def get_logs():
logger.error(output)
return output, code
data = request.args
kube_api = KubeAPI(config)

try:
job_id = data.get("jobId")
component = data.get("component")
@@ -254,7 +255,7 @@ def get_logs():
f"Looking pods in ns {kube_api.namespace} with labels {label_selector}"
)
pod_response = kube_api.list_namespaced_pod(label_selector=label_selector)
except ApiException as e:
except Exception as e:
logger.error(
f"Exception when calling CustomObjectsApi->list_namespaced_pod: {e}"
)
@@ -279,7 +280,7 @@ def get_logs():
r.headers["Content-Type"] = "text/plain; charset=utf-8"
return r

except ApiException as e:
except Exception as e:
logger.error(
f"Exception when calling CustomObjectsApi->read_namespaced_pod_log: {e}"
)
11 changes: 9 additions & 2 deletions operator_service/data_store.py
@@ -170,8 +170,15 @@ def get_nonce_for_certain_provider(provider_address: str):
if not rows:
logger.info("nonce is null")
return []
logger.info(f"nonce found: {max([float(row[0]) for row in rows])}")
return max([float(row[0]) for row in rows])
values = []
for row in rows:
if isinstance(row[0], str) and len(row[0]) == 19:
values.append(int(row[0]))
else:
values.append(float(row[0]))

logger.info(f"nonce found: {max(values)}")
return max(values)
except (Exception, psycopg2.Error) as error:
logger.error(f"PG query error: {error}")
return
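Note on the data_store.py hunk above: 19-character nonces are now parsed with int() because IEEE-754 doubles represent integers exactly only up to 2**53, so nanosecond-scale nonces collide once coerced to float. A minimal sketch of the failure mode the new branch avoids (values are hypothetical, not taken from this PR):

```python
# Two adjacent 19-digit nonces, e.g. nanosecond timestamps stored as strings.
stored = "1693526400000000000"    # hypothetical nonce already in the database
incoming = "1693526400000000001"  # strictly newer nonce sent by the provider

# Both strings round to the same double, so the newer nonce looks stale.
print(float(incoming) <= float(stored))  # True  -> newer nonce wrongly rejected
print(int(incoming) <= int(stored))      # False -> correct ordering preserved
```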
27 changes: 16 additions & 11 deletions operator_service/kubernetes_api.py
@@ -1,15 +1,21 @@
import base64
from tempfile import NamedTemporaryFile
import os
import logging
from os import path

import kubernetes
from kubernetes import client

from operator_service.config import Config


logger = logging.getLogger(__name__)
# Configuration to connect to k8s.
if not path.exists("/.dockerenv"):
kubernetes.config.load_kube_config()
else:
try:
kubernetes.config.load_incluster_config()
except Exception as e:
logger.error("Failed to load kubernetes config")
exit(1)


class KubeAPI:
@@ -20,24 +26,23 @@ class KubeAPI:
def __init__(self, config=None):
if config is None:
config = Config()

self.group = config.group
self.version = config.version
self.namespace = config.namespace
self.plural = config.plural

def create_namespaced_custom_object(self, body):
return KubeAPI.api_customobject.create_namespaced_custom_object(
return self.api_customobject.create_namespaced_custom_object(
self.group, self.version, self.namespace, self.plural, body
)

def get_namespaced_custom_object(self, job_id):
return KubeAPI.api_customobject.get_namespaced_custom_object(
return self.api_customobject.get_namespaced_custom_object(
self.group, self.version, self.namespace, self.plural, job_id
)

def list_namespaced_custom_object(self):
return KubeAPI.api_customobject.list_namespaced_custom_object(
return self.api_customobject.list_namespaced_custom_object(
self.group,
self.version,
self.namespace,
@@ -47,7 +52,7 @@ def list_namespaced_custom_object(self):
def delete_namespaced_custom_object(
self, name, body, grace_period_seconds, orphan_dependents, propagation_policy
):
return KubeAPI.api_customobject.delete_namespaced_custom_object(
return self.api_customobject.delete_namespaced_custom_object(
self.group,
self.version,
self.namespace,
@@ -60,11 +65,11 @@ )
)

def read_namespaced_pod_log(self, name):
return KubeAPI.api_core.read_namespaced_pod_log(
return self.api_core.read_namespaced_pod_log(
name=name, namespace=self.namespace
)

def list_namespaced_pod(self, label_selector):
return KubeAPI.api_core.list_namespaced_pod(
return self.api_core.list_namespaced_pod(
self.namespace, label_selector=label_selector
)
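The kubernetes_api.py hunks switch every call from the class attributes (KubeAPI.api_customobject, KubeAPI.api_core) to the instance attributes, but the part of the class that creates those attributes is collapsed in this view. Below is a minimal sketch of how such attributes are typically wired up with the kubernetes client; the constructor details are an assumption, not the PR's exact code:

```python
from kubernetes import client


class KubeAPISketch:
    """Stripped-down stand-in for KubeAPI; attribute names follow the diff."""

    def __init__(self, namespace: str):
        self.namespace = namespace
        self.api_customobject = client.CustomObjectsApi()  # custom-resource calls
        self.api_core = client.CoreV1Api()                 # pods and pod logs

    def read_namespaced_pod_log(self, name: str) -> str:
        return self.api_core.read_namespaced_pod_log(
            name=name, namespace=self.namespace
        )
```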
54 changes: 13 additions & 41 deletions operator_service/routes.py
@@ -5,9 +5,8 @@

import kubernetes
from flask import Blueprint, request, Response
from kubernetes.client.rest import ApiException

from operator_service.config import Config

from operator_service.data_store import (
create_sql_job,
get_sql_status,
@@ -20,7 +19,6 @@
check_environment_exists,
get_job_by_provider_and_owner,
)
from operator_service.kubernetes_api import KubeAPI
from operator_service.utils import (
create_compute_job,
check_required_attributes,
@@ -40,14 +38,6 @@

standard_headers = {"Content-type": "application/json", "Connection": "close"}

# Configuration to connect to k8s.
if not path.exists("/.dockerenv"):
kubernetes.config.load_kube_config()
else:
kubernetes.config.load_incluster_config()

config = Config()


@services.route("/compute", methods=["POST"])
def start_compute_job():
@@ -175,12 +165,8 @@ def start_compute_job():
400,
headers=standard_headers,
)
except ApiException as e:
msg = f"Error getting the active jobs for initializing a compute job: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
except Exception as e:
msg = f"{e}"
msg = f"Error getting the active jobs for initializing a compute job: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)

@@ -222,9 +208,7 @@ def start_compute_job():
)
job_id = generate_new_id()
logger.info(f"Got job_id: {job_id}")
body = create_compute_job(
workflow, job_id, config.group, config.version, environment
)
body = create_compute_job(workflow, job_id, environment)
body["metadata"]["secret"] = generate_new_id()
logger.debug(f"Got body: {body}")
create_sql_job(
@@ -324,17 +308,14 @@ def stop_compute_job():

return Response(json.dumps(status_list), 200, headers=standard_headers)

except ApiException as e:
logger.error(f"Exception when stopping compute job: {e}")
except Exception as e:
msg = f"Exception when stopping compute job: {e}"
logger.error(msg)
return Response(
json.dumps({"error": f"Error stopping job: {e}"}),
json.dumps({"error": msg}),
400,
headers=standard_headers,
)
except Exception as e:
msg = f"{e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)


@services.route("/compute", methods=["DELETE"])
@@ -433,6 +414,9 @@ def get_compute_job_status():
sign_message = f"{owner}{job_id}"
else:
sign_message = f"{owner}"
logger.info(
f"route get status process signature\nproviderSignature: {data.get('providerSignature')}\nsign_message: {sign_message}\nnonce to compare: {nonce}"
)
msg, status, provider_address = process_provider_signature_validation(
data.get("providerSignature"), sign_message, nonce
)
@@ -449,12 +433,8 @@
headers=standard_headers,
)

except ApiException as e:
msg = f"Error getting the status: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400)
except Exception as e:
msg = f"{e}"
msg = f"Error getting the status: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)

@@ -475,12 +455,8 @@ def get_running_jobs():
try:
api_response = sanitize_response_for_provider(get_sql_running_jobs())
return Response(json.dumps(api_response), 200, headers=standard_headers)
except ApiException as e:
msg = f"Error getting the status: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
except Exception as e:
msg = f"{e}"
msg = f"Error getting running jobs: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)

@@ -578,12 +554,8 @@ def get_indexed_result():
request, requests_session, outputs[index]["url"], None
)

except ApiException as e:
msg = f"Error getting the status: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)
except Exception as e:
msg = f"{e}"
msg = f"Error getResult: {e}"
logger.error(msg)
return Response(json.dumps({"error": msg}), 400, headers=standard_headers)

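Throughout routes.py the dedicated ApiException handlers are folded into the existing except Exception blocks. Kubernetes API errors are still caught because ApiException derives from Exception; a quick illustration:

```python
from kubernetes.client.rest import ApiException

try:
    raise ApiException(status=404, reason="Not Found")
except Exception as e:  # also catches ApiException
    print(type(e).__name__, e.status, e.reason)  # ApiException 404 Not Found
```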
9 changes: 4 additions & 5 deletions operator_service/utils.py
@@ -7,7 +7,6 @@
from cgi import parse_header
from os import getenv

from kubernetes.client.rest import ApiException
from eth_keys import KeyAPI
from eth_keys.backends import NativeECCBackend
from flask import Response, request
@@ -31,9 +30,9 @@ def generate_new_id():
return uuid.uuid4().hex


def create_compute_job(workflow, execution_id, group, version, namespace):
def create_compute_job(workflow, execution_id, namespace):
execution = dict()
execution["apiVersion"] = group + "/" + version
execution["apiVersion"] = "v0.0.1"
execution["kind"] = "WorkFlow"
execution["metadata"] = dict()
execution["metadata"]["name"] = execution_id
@@ -94,7 +93,7 @@ def process_provider_signature_validation(signature, original_msg, nonce):

db_nonce = get_nonce_for_certain_provider(address)

if db_nonce and float(nonce) <= float(db_nonce):
if db_nonce and nonce <= db_nonce:
msg = (
f"Invalid signature expected nonce ({db_nonce}) > current nonce ({nonce})."
)
@@ -127,7 +126,7 @@ def get_list_of_allowed_providers():
logger.error("Failed loading ALLOWED_PROVIDERS")
return []
return config_allowed_list
except ApiException as e:
except Exception as e:
logging.error(
f'Exception when calling json.loads(os.getenv("ALLOWED_PROVIDERS")): {e}'
)
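For context on process_provider_signature_validation (the address recovery itself sits outside these hunks): a rough sketch of recovering a provider address from a signature with the eth_keys imports shown above. The prefix scheme, v-normalization, and helper name are assumptions for illustration and may not match the service's actual code:

```python
from eth_keys import KeyAPI
from eth_keys.backends import NativeECCBackend
from eth_utils import keccak

keys = KeyAPI(NativeECCBackend)


def recover_signer(signature_hex: str, message: str) -> str:
    """Hypothetical helper: recover the checksummed address that signed message."""
    sig_bytes = bytes.fromhex(signature_hex.removeprefix("0x"))
    # eth_keys expects the recovery id v in {0, 1}; wallets usually emit 27/28.
    if sig_bytes[64] >= 27:
        sig_bytes = sig_bytes[:64] + bytes([sig_bytes[64] - 27])
    # Assumption: the message is hashed with the EIP-191 personal-message prefix.
    prefixed = f"\x19Ethereum Signed Message:\n{len(message)}{message}".encode()
    signature = keys.Signature(signature_bytes=sig_bytes)
    public_key = signature.recover_public_key_from_msg_hash(keccak(prefixed))
    return public_key.to_checksum_address()
```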
8 changes: 4 additions & 4 deletions setup.py
@@ -23,10 +23,10 @@
"flask-swagger-ui==3.20.9",
"flask-headers",
"Jinja2>=2.10.1,<3.1",
"kubernetes==10.0.0",
"kubernetes==27.2.0",
"requests>=2.21.0",
"gunicorn==19.9.0",
"PyYAML==5.1",
"gunicorn==21.2.0",
"PyYAML>=5.4.1",
"pytz==2018.5",
"simplejson>=3.13.2",
"psycopg2>=2.8.4",
@@ -44,7 +44,7 @@
"twine==1.11.0",
"flake8",
"isort",
"black==22.1.0",
"black==22.3.0",
"click==8.0.4",
"pre-commit",
"licenseheaders",
2 changes: 0 additions & 2 deletions test/kube_mock.py
@@ -32,8 +32,6 @@ def create_namespaced_custom_object(self, body):
correct_body = create_compute_job(
processed_workflow,
FAKE_UUID,
config.group,
config.version,
config.namespace,
)
assert body == correct_body