Skip to content

Commit

Permalink
feat(ingest): various logging improvements
Browse files Browse the repository at this point in the history
Follow up on #8015
  • Loading branch information
hsheth2 committed Aug 8, 2024
1 parent 840b150 commit 03fbdfe
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ def _write_done_callback(
record_urn = _get_urn(record_envelope)
if record_urn:
e.info["urn"] = record_urn
if workunit_id := record_envelope.metadata.get("workunit_id"):
e.info["workunit_id"] = workunit_id

if not self.treat_errors_as_warnings:
self.report.report_failure({"error": e.message, "info": e.info})
Expand Down
14 changes: 11 additions & 3 deletions metadata-ingestion/src/datahub/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def init_tracking(self) -> None:
if not self.enabled or self.mp is None or self.tracking_init is True:
return

logger.debug("Sending init Telemetry")
logger.debug("Sending init telemetry")
try:
self.mp.people_set(
self.client_id,
Expand All @@ -310,13 +310,21 @@ def ping(
if not self.enabled or self.mp is None:
return

properties = properties or {}

# send event
try:
logger.debug(f"Sending telemetry for {event_name}")
if event_name == "function-call":
logger.debug(
f"Sending telemetry for {event_name} {properties.get('function')}, status {properties.get('status')}"
)
else:
logger.debug(f"Sending telemetry for {event_name}")

properties = {
**_default_telemetry_properties(),
**self._server_props(server),
**(properties or {}),
**properties,
}
self.mp.track(self.client_id, event_name, properties)
except Exception as e:
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/utilities/logging_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
"acryl_datahub_cloud",
]
# Capacity of the in-memory log buffer, counted in lines.
IN_MEMORY_LOG_BUFFER_SIZE = 2000 # lines
# Longest line (in characters) stored in the in-memory log buffer before it is clipped.
IN_MEMORY_LOG_BUFFER_MAX_LINE_LENGTH = 2000 # characters


# Raw value of the NO_COLOR environment variable when it is set, otherwise False.
# NOTE(review): any non-empty string (including "0" or "false") is truthy here —
# confirm that matches how consumers of this flag interpret it.
NO_COLOR = os.environ.get("NO_COLOR", False)

Expand Down Expand Up @@ -159,6 +161,9 @@ def __init__(self, maxlen: Optional[int] = None) -> None:
self._buffer: Deque[str] = collections.deque(maxlen=maxlen)

def write(self, line: str) -> None:
    """Append ``line`` to the in-memory buffer, clipping overly long lines.

    Lines longer than ``IN_MEMORY_LOG_BUFFER_MAX_LINE_LENGTH`` characters are
    cut to that length and suffixed with ``"[truncated]"`` before being stored;
    shorter lines are stored unchanged.
    """
    limit = IN_MEMORY_LOG_BUFFER_MAX_LINE_LENGTH
    stored = line if len(line) <= limit else line[:limit] + "[truncated]"
    self._buffer.append(stored)

def clear(self) -> None:
Expand Down

0 comments on commit 03fbdfe

Please sign in to comment.