fix(api): logs #3272

Merged · 6 commits · Feb 2, 2025
2 changes: 2 additions & 0 deletions keep/api/config.py
@@ -93,8 +93,10 @@ def on_starting(server=None):
 
 def post_worker_init(worker):
     # We need to reinitialize logging in each worker because gunicorn forks the worker processes
+    print("Init logging in worker")
     logging.getLogger().handlers = []  # noqa
     keep.api.logging.setup_logging()  # noqa
+    print("Logging initialized in worker")
 
 
 post_worker_init = post_worker_init
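
For context, gunicorn invokes the post_worker_init hook once in each worker process after fork(), which is why the handlers inherited from the master are dropped and rebuilt here (plain print is used because logging is not configured yet at that point). A minimal, self-contained sketch of the same pattern; the config dict and messages are illustrative, not Keep's actual configuration:

# gunicorn.conf.py -- illustrative sketch, not Keep's actual config
import logging
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {"console": {"class": "logging.StreamHandler"}},
    "loggers": {"": {"handlers": ["console"], "level": "INFO"}},
}

def post_worker_init(worker):
    # Runs in the worker after fork(); handlers inherited from the master
    # would otherwise share file descriptors across workers.
    logging.getLogger().handlers = []
    logging.config.dictConfig(LOGGING_CONFIG)
    logging.getLogger(__name__).info("logging ready in worker pid=%s", worker.pid)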
58 changes: 34 additions & 24 deletions keep/api/core/db.py
@@ -806,32 +806,42 @@ def push_logs_to_db(log_entries):
     # avoid circular import
     from keep.api.logging import LOG_FORMAT, LOG_FORMAT_OPEN_TELEMETRY
 
+    db_log_entries = []
     if LOG_FORMAT == LOG_FORMAT_OPEN_TELEMETRY:
-        db_log_entries = [
-            WorkflowExecutionLog(
-                workflow_execution_id=log_entry["workflow_execution_id"],
-                timestamp=datetime.strptime(
-                    log_entry["asctime"], "%Y-%m-%d %H:%M:%S,%f"
-                ),
-                message=log_entry["message"][0:255],  # limit the message to 255 chars
-                context=json.loads(
-                    json.dumps(log_entry.get("context", {}), default=str)
-                ),  # workaround to serialize any object
-            )
-            for log_entry in log_entries
-        ]
+        for log_entry in log_entries:
+            try:
+                log_entry = WorkflowExecutionLog(
+                    workflow_execution_id=log_entry["workflow_execution_id"],
+                    timestamp=datetime.strptime(
+                        log_entry["asctime"], "%Y-%m-%d %H:%M:%S,%f"
+                    ),
+                    message=log_entry["message"][0:255],  # limit the message to 255 chars
+                    context=json.loads(
+                        json.dumps(log_entry.get("context", {}), default=str)
+                    ),  # workaround to serialize any object
+                )
+                db_log_entries.append(log_entry)
+            except Exception:
+                print("Failed to parse log entry - ", log_entry)
 
     else:
-        db_log_entries = [
-            WorkflowExecutionLog(
-                workflow_execution_id=log_entry["workflow_execution_id"],
-                timestamp=log_entry["created"],
-                message=log_entry["message"][0:255],  # limit the message to 255 chars
-                context=json.loads(
-                    json.dumps(log_entry.get("context", {}), default=str)
-                ),  # workaround to serialize any object
-            )
-            for log_entry in log_entries
-        ]
+        for log_entry in log_entries:
+            try:
+                log_entry = WorkflowExecutionLog(
+                    workflow_execution_id=log_entry["workflow_execution_id"],
+                    timestamp=log_entry["created"],
+                    message=log_entry["message"][0:255],  # limit the message to 255 chars
+                    context=json.loads(
+                        json.dumps(log_entry.get("context", {}), default=str)
+                    ),  # workaround to serialize any object
+                )
+                db_log_entries.append(log_entry)
+            except Exception:
+                print("Failed to parse log entry - ", log_entry)
 
     # Add the LogEntry instances to the database session
     with Session(engine) as session:
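
The json.loads(json.dumps(..., default=str)) round-trip above is worth a note: it coerces any non-JSON-serializable value in the context dict to its string form before the row is stored. A standalone sketch of both that trick and the asctime parsing used in the OTEL branch (values are made up):

import json
from datetime import datetime

# default=str turns datetimes, UUIDs, exceptions, etc. into strings,
# so the resulting dict is safe to persist in a JSON column.
context = {"started_at": datetime(2025, 2, 2, 12, 0, 0), "attempt": 1}
safe_context = json.loads(json.dumps(context, default=str))
# {'started_at': '2025-02-02 12:00:00', 'attempt': 1}

# The logging formatter's asctime puts a comma before the milliseconds,
# hence the "%S,%f" in the format string.
ts = datetime.strptime("2025-02-02 12:34:56,789", "%Y-%m-%d %H:%M:%S,%f")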
33 changes: 15 additions & 18 deletions keep/api/logging.py
@@ -43,6 +43,7 @@ def filter(self, record):
         if not workflow_id:
             return False
 
+        print("Adding workflow_id to log record")
         # Skip DEBUG logs unless debug mode is enabled
         if not getattr(thread, "workflow_debug", False) and record.levelname == "DEBUG":
             return False
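
This filter relies on workflow metadata being stashed on the current thread. A reduced sketch of that pattern; the class name is made up here, and the full filter in the PR does more than this:

import logging
import threading

class WorkflowContextFilter(logging.Filter):
    # Sketch: drop records emitted outside a workflow thread, and drop
    # DEBUG records unless the workflow was started in debug mode.
    def filter(self, record):
        thread = threading.current_thread()
        workflow_id = getattr(thread, "workflow_id", None)
        if not workflow_id:
            return False
        if record.levelname == "DEBUG" and not getattr(thread, "workflow_debug", False):
            return False
        record.workflow_id = workflow_id  # available to formatters as %(workflow_id)s
        return True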
@@ -97,9 +98,9 @@ def __init__(self, flush_interval: int = 2):
 
     def _timer_run(self):
         while not self._stop_event.is_set():
-            logging.getLogger(__name__).info("Timer running")
+            # logging.getLogger(__name__).info("Timer running")
             self.flush()
-            logging.getLogger(__name__).info("Timer sleeping")
+            # logging.getLogger(__name__).info("Timer sleeping")
             self._stop_event.wait(self.flush_interval)  # Wait but can be interrupted
 
     def close(self):
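
For reference, the lines commented out above sat inside a flush loop that a background thread runs every flush_interval seconds. A condensed sketch of that mechanism, with the buffer handling elided and the class name invented:

import threading

class BufferedHandlerSketch:
    # Condensed from the handler above: a daemon thread flushes on an
    # interval, and close() interrupts the wait via the stop event.
    def __init__(self, flush_interval: int = 2):
        self.flush_interval = flush_interval
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._timer_run, daemon=True)
        self._thread.start()

    def _timer_run(self):
        while not self._stop_event.is_set():
            self.flush()
            self._stop_event.wait(self.flush_interval)  # returns early on close()

    def flush(self):
        pass  # the real handler drains buffered log records to the DB here

    def close(self):
        self._stop_event.set()
        self._thread.join()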
@@ -459,22 +460,6 @@ def _log(
     )
 
 
-# MONKEY PATCHING http.client
-# See: https://stackoverflow.com/questions/58738195/python-http-request-and-debug-level-logging-to-the-log-file
-http_client_logger = logging.getLogger("http.client")
-http_client_logger.setLevel(logging.DEBUG)
-http.client.HTTPConnection.debuglevel = 1
-
-
-def print_to_log(*args):
-    http_client_logger.debug(" ".join(args))
-
-
-# monkey-patch a `print` global into the http.client module; all calls to
-# print() in that module will then use our print_to_log implementation
-http.client.print = print_to_log
-
-
 def setup_logging():
     # Add file handler if KEEP_LOG_FILE is set
     if KEEP_LOG_FILE:
@@ -489,3 +474,15 @@ def setup_logging():
             CONFIG["loggers"][""]["handlers"].append("file")
 
     logging.config.dictConfig(CONFIG)
+    # MONKEY PATCHING http.client
+    # See: https://stackoverflow.com/questions/58738195/python-http-request-and-debug-level-logging-to-the-log-file
+    http_client_logger = logging.getLogger("http.client")
+    http_client_logger.setLevel(logging.DEBUG)
+    http.client.HTTPConnection.debuglevel = 1
+
+    def print_to_log(*args):
+        http_client_logger.debug(" ".join(args))
+
+    # monkey-patch a `print` global into the http.client module; all calls to
+    # print() in that module will then use our print_to_log implementation
+    http.client.print = print_to_log
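
Moving the monkey patch from import time into setup_logging() means http.client tracing only switches on once logging is actually configured. A self-contained sketch of what the patch does once applied; the target URL is a placeholder:

import http.client
import logging

logging.basicConfig(level=logging.DEBUG)
http_client_logger = logging.getLogger("http.client")

def print_to_log(*args):
    http_client_logger.debug(" ".join(args))

# http.client emits its debug trace through bare print() calls; injecting
# a module-level `print` reroutes lines like "send: ..." and "reply: ..."
# into the logger instead of stdout.
http.client.HTTPConnection.debuglevel = 1
http.client.print = print_to_log

conn = http.client.HTTPConnection("example.com")
conn.request("GET", "/")
print(conn.getresponse().status)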