Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out attributes into EventAttributes class #721

Open
wants to merge 2 commits into
base: rc-kueue
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions octue/cloud/emulators/_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,31 +405,28 @@ def ask(
# If the originator isn't provided, assume that this service revision is the originator.
originator = originator or self.id

attributes = make_minimal_dictionary(
datetime="2024-04-11T10:46:48.236064",
uuid="a9de11b1-e88f-43fa-b3a4-40a590c3443f",
question_uuid=question_uuid,
parent_question_uuid=parent_question_uuid,
originator_question_uuid=originator_question_uuid,
forward_logs=subscribe_to_logs,
save_diagnostics=save_diagnostics,
parent=self.id,
originator=originator,
sender=self.id,
sender_type=PARENT_SENDER_TYPE,
sender_sdk_version=parent_sdk_version,
recipient=service_id,
retry_count=retry_count,
cpus=cpus,
memory=memory,
ephemeral_storage=ephemeral_storage,
)

try:
self.children[service_id].answer(
MockMessage.from_primitive(
data=question,
attributes={
"datetime": "2024-04-11T10:46:48.236064",
"uuid": "a9de11b1-e88f-43fa-b3a4-40a590c3443f",
"question_uuid": question_uuid,
"parent_question_uuid": parent_question_uuid,
"originator_question_uuid": originator_question_uuid,
"forward_logs": subscribe_to_logs,
"save_diagnostics": save_diagnostics,
"parent": self.id,
"originator": originator,
"sender": self.id,
"sender_type": PARENT_SENDER_TYPE,
"sender_sdk_version": parent_sdk_version,
"recipient": service_id,
"retry_count": retry_count,
"cpus": cpus,
"memory": memory,
"ephemeral_storage": ephemeral_storage,
},
)
)
self.children[service_id].answer(MockMessage.from_primitive(data=question, attributes=attributes))
except Exception as e: # noqa
logger.exception(e)

Expand Down
95 changes: 95 additions & 0 deletions octue/cloud/events/attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import datetime as dt
import json
import uuid as uuid_library

from octue.cloud import LOCAL_SDK_VERSION
from octue.utils.dictionaries import make_minimal_dictionary

SENDER_TYPE_OPPOSITES = {"CHILD": "PARENT", "PARENT": "CHILD"}


class EventAttributes:
def __init__(
self,
originator_question_uuid,
parent,
originator,
sender,
sender_type,
recipient,
uuid=None,
datetime=None,
question_uuid=None,
parent_question_uuid=None,
sender_sdk_version=None,
retry_count=0,
forward_logs=True,
save_diagnostics=True,
cpus=None,
memory=None,
ephemeral_storage=None,
):
# Attributes for all event kinds.
self.uuid = uuid or str(uuid_library.uuid4())
self.datetime = datetime or dt.datetime.now(tz=dt.timezone.utc).isoformat()
self.question_uuid = question_uuid or str(uuid_library.uuid4())
self.parent_question_uuid = parent_question_uuid
self.originator_question_uuid = originator_question_uuid
self.parent = parent
self.originator = originator
self.sender = sender
self.sender_type = sender_type
self.sender_sdk_version = sender_sdk_version or LOCAL_SDK_VERSION
self.recipient = recipient
self.retry_count = int(retry_count)

# Question event attributes.
self.forward_logs = bool(forward_logs)
self.save_diagnostics = save_diagnostics
self.cpus = cpus
self.memory = memory
self.ephemeral_storage = ephemeral_storage

def make_response_attributes(self):
attributes = self.to_dict()
attributes["sender"] = self.recipient
attributes["sender_type"] = SENDER_TYPE_OPPOSITES[self.sender_type]
attributes["sender_sdk_version"] = LOCAL_SDK_VERSION
attributes["recipient"] = self.sender
return EventAttributes(**attributes)

def to_dict(self):
return make_minimal_dictionary(
uuid=self.uuid,
datetime=self.datetime,
question_uuid=self.question_uuid,
parent_question_uuid=self.parent_question_uuid,
originator_question_uuid=self.originator_question_uuid,
parent=self.parent,
originator=self.originator,
sender=self.sender,
sender_type=self.sender_type,
sender_sdk_version=self.sender_sdk_version,
recipient=self.recipient,
retry_count=self.retry_count,
forward_logs=self.forward_logs,
save_diagnostics=self.save_diagnostics,
cpus=self.cpus,
memory=self.memory,
ephemeral_storage=self.ephemeral_storage,
)

def to_serialised_attributes(self):
serialised_attributes = {}

for key, value in self.to_dict().items():
if isinstance(value, bool):
value = str(int(value))
elif isinstance(value, (int, float)):
value = str(value)
elif value is None:
value = json.dumps(value)

serialised_attributes[key] = value

return serialised_attributes
59 changes: 3 additions & 56 deletions octue/cloud/events/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datetime
import uuid

from octue.definitions import LOCAL_SDK_VERSION
from octue.cloud.events.attributes import EventAttributes
from octue.utils.dictionaries import make_minimal_dictionary


Expand All @@ -26,7 +25,7 @@ def make_question_event(
if not attributes:
question_uuid = question_uuid or str(uuid.uuid4())

attributes = make_attributes(
attributes = EventAttributes(
question_uuid=question_uuid,
sender=sender,
recipient=recipient,
Expand All @@ -37,57 +36,5 @@ def make_question_event(

return {
"event": make_minimal_dictionary(input_values=input_values, input_manifest=input_manifest, kind="question"),
"attributes": attributes,
"attributes": attributes.to_dict(),
}


def make_attributes(
sender,
sender_type,
recipient,
question_uuid=None,
parent_question_uuid=None,
originator_question_uuid=None,
parent=None,
originator=None,
retry_count=0,
forward_logs=None,
save_diagnostics=None,
cpus=None,
memory=None,
ephemeral_storage=None,
):
# If the originator isn't provided, assume that this service revision is the originator.
originator_question_uuid = originator_question_uuid or question_uuid
parent = parent or sender
originator = originator or sender

attributes = {
"uuid": str(uuid.uuid4()),
"datetime": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
"question_uuid": question_uuid or str(uuid.uuid4()),
"parent_question_uuid": parent_question_uuid,
"originator_question_uuid": originator_question_uuid,
"parent": parent,
"originator": originator,
"sender": sender,
"sender_type": sender_type,
"sender_sdk_version": LOCAL_SDK_VERSION,
"recipient": recipient,
"retry_count": int(retry_count),
}

if sender_type == "PARENT":
if forward_logs:
attributes["forward_logs"] = bool(forward_logs)

attributes.update(
make_minimal_dictionary(
save_diagnostics=save_diagnostics,
cpus=cpus,
memory=memory,
ephemeral_storage=ephemeral_storage,
)
)

return attributes
20 changes: 4 additions & 16 deletions octue/cloud/events/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,8 @@
"description": "The UUID of the question the event is related to.",
},
"parent_question_uuid": {
"oneOf": [
{
"type": "string",
"description": "The UUID of the question that triggered this question.",
},
{"type": "null", "description": "If this is the originating question."},
]
"type": "string",
"description": "If this isn't the originating question, the UUID of the question that triggered this question. If it is, don't provide this.",
},
"originator_question_uuid": {
"type": "string",
Expand Down Expand Up @@ -106,7 +101,6 @@
"datetime",
"uuid",
"question_uuid",
"parent_question_uuid",
"originator_question_uuid",
"forward_logs",
"save_diagnostics",
Expand Down Expand Up @@ -137,13 +131,8 @@
"description": "The UUID of the question the event is related to.",
},
"parent_question_uuid": {
"oneOf": [
{
"type": "string",
"description": "The UUID of the question that triggered this question.",
},
{"type": "null", "description": "If this is the originating question."},
]
"type": "string",
"description": "If this isn't the originating question, the UUID of the question that triggered this question. If it is, don't provide this.",
},
"originator_question_uuid": {
"type": "string",
Expand Down Expand Up @@ -188,7 +177,6 @@
"datetime",
"uuid",
"question_uuid",
"parent_question_uuid",
"originator_question_uuid",
"parent",
"originator",
Expand Down
44 changes: 6 additions & 38 deletions octue/cloud/pub_sub/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,14 @@ class GoogleCloudPubSubHandler(logging.Handler):
"""A log handler that publishes log records to a Google Cloud Pub/Sub topic.

:param callable event_emitter: the `_emit_event` method of the service that instantiated this instance
:param str question_uuid: the UUID of the question to handle log records for
:param str|None parent_question_uuid: the UUID of the question these log records are related to
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
:param str parent: the SRUID of the parent that asked the question these log records are related to
:param str originator: the SRUID of the service revision that triggered the tree of questions these log records are related to
:param str recipient: the SRUID of the service to send these log records to
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param octue.cloud.events.attributes.EventAttributes attributes:
:param float timeout: timeout in seconds for attempting to publish each log record
:return None:
"""

def __init__(
self,
event_emitter,
question_uuid,
parent_question_uuid,
originator_question_uuid,
parent,
originator,
recipient,
retry_count,
timeout=60,
*args,
**kwargs,
):
def __init__(self, event_emitter, attributes, timeout=60, *args, **kwargs):
super().__init__(*args, **kwargs)
self.question_uuid = question_uuid
self.parent_question_uuid = parent_question_uuid
self.originator_question_uuid = originator_question_uuid
self.parent = parent
self.originator = originator
self.recipient = recipient
self.retry_count = retry_count
self.attributes = attributes
self.timeout = timeout
self._emit_event = event_emitter

Expand All @@ -56,22 +31,15 @@ def emit(self, record):
"kind": "log_record",
"log_record": self._convert_log_record_to_primitives(record),
},
parent=self.parent,
originator=self.originator,
recipient=self.recipient,
retry_count=self.retry_count,
question_uuid=self.question_uuid,
parent_question_uuid=self.parent_question_uuid,
originator_question_uuid=self.originator_question_uuid,
# The sender type is repeated here as a string to avoid a circular import.
attributes={"sender_type": "CHILD"},
attributes=self.attributes,
wait=False,
)

except Exception: # noqa
self.handleError(record)

def _convert_log_record_to_primitives(self, log_record):
@staticmethod
def _convert_log_record_to_primitives(log_record):
"""Convert a log record to JSON-serialisable primitives by interpolating the args into the message, and
removing the exception info, which is potentially not JSON-serialisable. This is similar to the approach in
`logging.handlers.SocketHandler.makePickle`. Also strip any ANSI escape sequences from the message.
Expand Down
Loading