Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSPEED-129] Add audit logging capability to CLAD #90

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions command_line_assistant/commands/query.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module to handle the query command."""

import getpass
from argparse import Namespace
from typing import Optional

Expand Down Expand Up @@ -119,6 +120,8 @@ def run(self) -> int:

input_query = Message()
input_query.message = query
# Get the current user
input_query.user = getpass.getuser()
output = "Nothing to see here..."

try:
Expand Down
23 changes: 23 additions & 0 deletions command_line_assistant/config/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ class LoggingSchema:

Attributes:
level (str): The level to log. Defaults to "INFO".
responses (bool): If the responses should be logged. Defaults to True.
question (bool): If the questions should be logged. Defaults to True.
users (dict[str, dict[str, bool]]): A dictionary of users and their logging preferences.
"""

level: str = "INFO"
responses: bool = True
question: bool = True
users: dict[str, dict[str, bool]] = dataclasses.field(default_factory=dict)

def __post_init__(self) -> None:
"""Post initialization method to normalize values
Expand All @@ -30,6 +36,23 @@ def __post_init__(self) -> None:

self.level = self.level.upper()

def should_log_for_user(self, username: str, log_type: str) -> bool:
"""Check if logging should be enabled for a specific user and log type.

Args:
username (str): The username to check
log_type (str): The type of log ('responses' or 'question')

Returns:
bool: Whether logging should be enabled for this user and log type
"""
# If user has specific settings, use those
if username in self.users:
return self.users[username].get(log_type, False)

# Otherwise fall back to global settings
return getattr(self, log_type, False)


@dataclasses.dataclass
class OutputSchema:
Expand Down
13 changes: 13 additions & 0 deletions command_line_assistant/dbus/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""D-Bus interfaces that defines and powers our commands."""

import logging

from dasbus.server.interface import dbus_interface
from dasbus.server.property import emits_properties_changed
from dasbus.server.template import InterfaceTemplate
Expand All @@ -14,6 +16,8 @@
from command_line_assistant.history.manager import HistoryManager
from command_line_assistant.history.plugins.local import LocalHistory

audit_logger = logging.getLogger("audit")


@dbus_interface(QUERY_IDENTIFIER.interface_name)
class QueryInterface(InterfaceTemplate):
Expand All @@ -26,6 +30,7 @@ def RetrieveAnswer(self) -> Structure:
Structure: The message output in format of a d-bus structure.
"""
query = self.implementation.query.message
user = self.implementation.query.user

# Submit query to backend
llm_response = submit(query, self.implementation.config)
Expand All @@ -39,6 +44,14 @@ def RetrieveAnswer(self) -> Structure:
current_history = manager.read()
manager.write(current_history, query, llm_response)

audit_logger.info(
"Query executed successfully.",
extra={
"user": user,
"query": query,
"response": llm_response,
},
)
# Return the data - okay
return Message.to_structure(message)

Expand Down
19 changes: 19 additions & 0 deletions command_line_assistant/dbus/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Message(DBusData):
def __init__(self) -> None:
"""Constructor of class."""
self._message: Str = ""
self._user: Str = ""
super().__init__()

@property
Expand All @@ -30,6 +31,24 @@ def message(self, value: Str) -> None:
"""
self._message = value

@property
def user(self) -> Str:
"""Property for internal user attribute.

Returns:
Str: Value of user
"""
return self._user

@user.setter
def user(self, value: Str) -> None:
"""Set a new user

Args:
value (Str): User to be set to the internal property
"""
self._user = value


class HistoryItem(DBusData):
"""Represents a single history item with query and response"""
Expand Down
101 changes: 99 additions & 2 deletions command_line_assistant/logger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Module for logging configuration."""

import copy
import json
import logging.config
from typing import Optional

from command_line_assistant.config import Config

Expand All @@ -13,20 +15,113 @@
"default": {
"format": "[%(asctime)s] [%(filename)s:%(lineno)d] %(levelname)s: %(message)s",
"datefmt": "%m/%d/%Y %I:%M:%S %p",
}
},
"audit": {
"()": "command_line_assistant.logger._create_audit_formatter",
"config": None, # Will be set in setup_logging
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
"stream": "ext://sys.stdout",
},
"audit_file": {
"class": "logging.FileHandler",
"filename": "/var/log/audit/command-line-assistant.log",
"formatter": "audit",
"mode": "a",
},
},
"loggers": {
"root": {"handlers": ["console"], "level": "DEBUG"},
"audit": {
"handlers": ["audit_file", "console"],
"level": "INFO",
"propagate": False,
},
},
"loggers": {"root": {"handlers": ["console"], "level": "DEBUG"}},
}
#: Define the dictionary configuration for the logger instance


class AuditFormatter(logging.Formatter):
"""Custom formatter that handles user-specific logging configuration."""

def __init__(
self, config: Config, fmt: Optional[str] = None, datefmt: Optional[str] = None
):
"""Initialize the formatter with config.

Args:
config (Config): The application configuration
fmt (Optional[str], optional): Format string. Defaults to None.
datefmt (Optional[str], optional): Date format string. Defaults to None.
"""
super().__init__(fmt, datefmt)
self._config = config

def format(self, record: logging.LogRecord) -> str:
"""Format the record based on user-specific settings.

Args:
record (logging.LogRecord): The log record to format

Note:
This method is called by the logging framework to format the log message.

Example:
This is how it will look like in the audit.log file::

>>> # In case the query and response are disabled for the current user.
>>> {"timestamp":"2025-01-03T11:26:37.%fZ","user":"my-user","message":"Query executed successfully.","query":null,"response":null}

>>> # In case the query and response are enabled for the current user.
>>> {"timestamp":"2025-01-03T11:26:37.%fZ","user":"my-user","message":"Query executed successfully.","query":"My query!","response":"My super response"}

Returns:
str: The formatted log message
"""
# Basic structure that will always be included
data = {
"timestamp": self.formatTime(record, self.datefmt),
"user": getattr(record, "user", "unknown"),
"message": record.getMessage(),
}

is_query_enabled = hasattr(
record, "query"
) and self._config.logging.should_log_for_user(data["user"], "question")
# Add query if enabled for user
data["query"] = record.query if is_query_enabled else None # type: ignore

is_response_enabled = hasattr(
record, "response"
) and self._config.logging.should_log_for_user(data["user"], "responses")
# Add response if enabled for user
data["response"] = record.response if is_response_enabled else None # type: ignore

# separators will remove whitespace between items
# ensure_ascii will properly handle unicode characters.
return json.dumps(data, separators=(",", ":"), ensure_ascii=False)


def _create_audit_formatter(config: Config) -> AuditFormatter:
"""Internal method to create a new audit formatter instance.

Args:
config (Config): The application configuration

Returns:
AuditFormatter: The new audit formatter instance.
"""

fmt = '{"timestamp": "%(asctime)s", "user": "%(user)s", "message": "%(message)s"%(query)s%(response)s}'
datefmt = "%Y-%m-%dT%H:%M:%S.%fZ"
return AuditFormatter(config=config, fmt=fmt, datefmt=datefmt)


def setup_logging(config: Config):
"""Setup basic logging functionality"

Expand All @@ -36,5 +131,7 @@ def setup_logging(config: Config):

logging_configuration = copy.deepcopy(LOGGING_CONFIG_DICTIONARY)
logging_configuration["loggers"]["root"]["level"] = config.logging.level
# Set the config in the formatter
logging_configuration["formatters"]["audit"]["config"] = config

logging.config.dictConfig(logging_configuration)
5 changes: 5 additions & 0 deletions data/development/config/command_line_assistant/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ verify_ssl = false

[logging]
level = "DEBUG"
responses = false # Global setting - don't log responses by default
question = false # Global setting - don't log questions by default

# User-specific settings
# users.admin = { responses = true, question = true }
12 changes: 9 additions & 3 deletions tests/commands/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def mock_dbus_service(mock_proxy):
# Setup default mock response
mock_output = Message()
mock_output.message = "default mock response"
mock_output.user = "mock"
mock_proxy.RetrieveAnswer = lambda: Message.to_structure(mock_output)

yield mock_proxy
Expand Down Expand Up @@ -53,15 +54,19 @@ def test_query_command_run(mock_dbus_service, test_input, expected_output, capsy
# Setup mock response for this specific test
mock_output = Message()
mock_output.message = expected_output
mock_output.user = "mock"
mock_dbus_service.RetrieveAnswer = lambda: Message.to_structure(mock_output)

# Create and run command
command = QueryCommand(test_input, None)
command.run()
with patch("command_line_assistant.commands.query.getpass.getuser") as mock_getuser:
mock_getuser.return_value = "mock"
# Create and run command
command = QueryCommand(test_input, None)
command.run()

# Verify ProcessQuery was called with correct input
expected_input = Message()
expected_input.message = test_input
expected_input.user = "mock"
mock_dbus_service.ProcessQuery.assert_called_once_with(
Message.to_structure(expected_input)
)
Expand All @@ -76,6 +81,7 @@ def test_query_command_empty_response(mock_dbus_service, capsys):
# Setup empty response
mock_output = Message()
mock_output.message = ""
mock_output.user = "mock"
mock_dbus_service.RetrieveAnswer = lambda: Message.to_structure(mock_output)

command = QueryCommand("test query", None)
Expand Down
21 changes: 14 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import logging
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest

Expand All @@ -13,20 +14,26 @@
OutputSchema,
)
from command_line_assistant.config.schemas import AuthSchema
from command_line_assistant.logger import LOGGING_CONFIG_DICTIONARY


@pytest.fixture(autouse=True)
def setup_logger(request):
def setup_logger(tmp_path, request):
# This makes it so we can skip this using @pytest.mark.noautofixtures
if "noautofixtures" in request.keywords:
return

logger.setup_logging(config.Config(logging=config.LoggingSchema(level="DEBUG")))
logging_configuration = copy.deepcopy(LOGGING_CONFIG_DICTIONARY)
logging_configuration["handlers"]["audit_file"]["filename"] = tmp_path / "audit.log"
with patch(
"command_line_assistant.logger.LOGGING_CONFIG_DICTIONARY", logging_configuration
):
logger.setup_logging(config.Config(logging=config.LoggingSchema(level="DEBUG")))

# get root logger
root_logger = logging.getLogger()
for handler in root_logger.handlers:
root_logger.removeHandler(handler)
# get root logger
root_logger = logging.getLogger()
for handler in root_logger.handlers:
root_logger.removeHandler(handler)


@pytest.fixture
Expand Down
Loading
Loading