Adding logging #1477

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
47 changes: 28 additions & 19 deletions edsl/data/RemoteCacheSync.py
@@ -2,14 +2,15 @@
from dataclasses import dataclass
from contextlib import AbstractContextManager
from collections import UserList
from logging import Logger, getLogger
import logging
from edsl.logging_config import setup_logging

if TYPE_CHECKING:
from .Cache import Cache
from edsl.coop.coop import Coop
from .CacheEntry import CacheEntry

from logging import Logger


class CacheKeyList(UserList):
def __init__(self, data: List[str]):
@@ -69,6 +70,7 @@ def __init__(
output_func: Callable,
remote_cache: bool = True,
remote_cache_description: str = "",
logger: Optional[Logger] = None,
):
"""
Initializes a RemoteCacheSync object.
@@ -78,14 +80,18 @@ def __init__(
:param output_func: Function for outputting messages
:param remote_cache: Whether to enable remote cache synchronization
:param remote_cache_description: Description for remote cache entries

:param logger: Optional logger instance. If not provided, a new one is created
"""
self.coop = coop
self.cache = cache
self._output = output_func
self.remote_cache_enabled = remote_cache
self.remote_cache_description = remote_cache_description
self.initial_cache_keys = []
self.logger = logger or getLogger(__name__)
self.logger.info(
f"RemoteCacheSync initialized with cache size: {len(self.cache.keys())}"
)

def __enter__(self) -> "RemoteCacheSync":
if self.remote_cache_enabled:
@@ -112,18 +118,18 @@ def _sync_from_remote(self) -> None:
missing_count = len(diff.client_missing_entries)

if missing_count == 0:
# self._output("No new entries to add to local cache.")
self.logger.debug("No new entries to add to local cache.")
return

# self._output(
# f"Updating local cache with {missing_count:,} new "
# f"{'entry' if missing_count == 1 else 'entries'} from remote..."
# )
self.logger.info(
f"Updating local cache with {missing_count:,} new "
f"{'entry' if missing_count == 1 else 'entries'} from remote..."
)

self.cache.add_from_dict(
{entry.key: entry for entry in diff.client_missing_entries}
)
# self._output("Local cache updated!")
self.logger.info("Local cache updated!")

def _get_entries_to_upload(self, diff: CacheDifference) -> CacheEntriesList:
"""Determines which entries need to be uploaded to remote cache."""
@@ -154,30 +160,33 @@ def _sync_to_remote(self) -> None:
upload_count = len(entries_to_upload)

if upload_count > 0:
# self._output(
# f"Updating remote cache with {upload_count:,} new "
# f"{'entry' if upload_count == 1 else 'entries'}..."
# )
self.logger.info(
f"Updating remote cache with {upload_count:,} new "
f"{'entry' if upload_count == 1 else 'entries'}..."
)

self.coop.remote_cache_create_many(
entries_to_upload,
visibility="private",
description=self.remote_cache_description,
)
# self._output("Remote cache updated!")
# else:
# self._output("No new entries to add to remote cache.")
self.logger.info("Remote cache updated!")
else:
self.logger.debug("No new entries to add to remote cache.")

# self._output(
# f"There are {len(self.cache.keys()):,} entries in the local cache."
# )
self.logger.info(
f"There are {len(self.cache.keys()):,} entries in the local cache."
)


if __name__ == "__main__":
import doctest

doctest.testmod()

setup_logging() # Use default settings
# Or customize: setup_logging(log_dir="custom_logs", console_level=logging.DEBUG)

from edsl.coop.coop import Coop
from edsl.data.Cache import Cache
from edsl.data.CacheEntry import CacheEntry
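A brief usage sketch of the new logger parameter on RemoteCacheSync; the Coop()/Cache() construction and the "example sync" description are illustrative assumptions, not part of this PR:

import logging
from edsl.coop.coop import Coop
from edsl.data.Cache import Cache
from edsl.data.RemoteCacheSync import RemoteCacheSync

custom_logger = logging.getLogger("edsl.remote_cache_sync")

# Assumes Coop credentials are already configured in the environment.
with RemoteCacheSync(
    coop=Coop(),
    cache=Cache(),
    output_func=print,
    remote_cache=True,
    remote_cache_description="example sync",
    logger=custom_logger,  # when omitted, falls back to getLogger(__name__)
):
    pass  # run interviews here; cache entries are synced on enter and exit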
37 changes: 32 additions & 5 deletions edsl/jobs/Jobs.py
@@ -26,6 +26,8 @@
from edsl.jobs.JobsChecks import JobsChecks
from edsl.jobs.data_structures import RunEnvironment, RunParameters, RunConfig

import logging

if TYPE_CHECKING:
from edsl.agents.Agent import Agent
from edsl.agents.AgentList import AgentList
@@ -499,9 +501,14 @@ def _check_if_local_keys_ok(self):
jc.check_api_keys()

async def _execute_with_remote_cache(self, run_job_async: bool) -> Results:

logger = logging.getLogger(__name__)
use_remote_cache = self.use_remote_cache()

if use_remote_cache:
logger.info("Remote cache enabled")
else:
logger.debug("Remote cache disabled")

from edsl.coop.coop import Coop
from edsl.jobs.runners.JobsRunnerAsyncio import JobsRunnerAsyncio
from edsl.data.Cache import Cache
@@ -516,10 +523,12 @@ async def _execute_with_remote_cache(self, run_job_async: bool) -> Results:
remote_cache_description=self.run_config.parameters.remote_cache_description,
):
runner = JobsRunnerAsyncio(self, environment=self.run_config.environment)
logger.info("Starting job execution")
if run_job_async:
results = await runner.run_async(self.run_config.parameters)
else:
results = runner.run(self.run_config.parameters)
logger.info(f"Job completed with {len(results)} results")
return results

def _setup_and_check(self) -> Tuple[RunConfig, Optional[Results]]:
@@ -543,15 +552,21 @@ def num_interviews(self):

def _run(self, config: RunConfig):
"Shared code for run and run_async"
logger = logging.getLogger(__name__)
logger.info(f"Initializing job with {self.num_interviews} interviews")

if config.environment.cache is not None:
logger.debug("Using provided cache")
self.run_config.environment.cache = config.environment.cache

if config.environment.bucket_collection is not None:
logger.debug("Using provided bucket collection")
self.run_config.environment.bucket_collection = (
config.environment.bucket_collection
)

if config.environment.key_lookup is not None:
logger.debug("Using provided key lookup")
self.run_config.environment.key_lookup = config.environment.key_lookup

# replace the parameters with the ones from the config
@@ -560,29 +575,35 @@ def _run(self, config: RunConfig):
self.replace_missing_objects()

# try to run remotely first
logger.debug("Preparing job and checking remote execution options")
self._prepare_to_run()
self._check_if_remote_keys_ok()

if (
self.run_config.environment.cache is None
or self.run_config.environment.cache is True
):
logger.debug("Initializing default cache")
from edsl.data.CacheHandler import CacheHandler

self.run_config.environment.cache = CacheHandler().get_cache()

if self.run_config.environment.cache is False:
logger.debug("Initializing cache with immediate_write=False")
from edsl.data.Cache import Cache

self.run_config.environment.cache = Cache(immediate_write=False)

# first try to run the job remotely
if results := self._remote_results():
logger.info("Job executed remotely")
return results

logger.debug("Checking local API keys")
self._check_if_local_keys_ok()

if config.environment.bucket_collection is None:
logger.debug("Creating new bucket collection")
self.run_config.environment.bucket_collection = (
self.create_bucket_collection()
)
Expand All @@ -606,9 +627,12 @@ def run(self, *, config: RunConfig) -> "Results":
:param bucket_collection: A BucketCollection object to track API calls
:param key_lookup: A KeyLookup object to manage API keys
"""
logger = logging.getLogger(__name__)
logger.info("Starting synchronous job execution")
self._run(config)

return asyncio.run(self._execute_with_remote_cache(run_job_async=False))
results = asyncio.run(self._execute_with_remote_cache(run_job_async=False))
logger.info("Synchronous job execution completed")
return results

@with_config
async def run_async(self, *, config: RunConfig) -> "Results":
Expand All @@ -629,9 +653,12 @@ async def run_async(self, *, config: RunConfig) -> "Results":
:param bucket_collection: A BucketCollection object to track API calls
:param key_lookup: A KeyLookup object to manage API keys
"""
logger = logging.getLogger(__name__)
logger.info("Starting asynchronous job execution")
self._run(config)

return await self._execute_with_remote_cache(run_job_async=True)
results = await self._execute_with_remote_cache(run_job_async=True)
logger.info("Asynchronous job execution completed")
return results

def __repr__(self) -> str:
"""Return an eval-able string representation of the Jobs instance."""
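A minimal sketch of surfacing these new Jobs log records on the console; Jobs.example() and the bare run() call are assumptions for illustration only:

import logging
from edsl.logging_config import setup_logging
from edsl.jobs.Jobs import Jobs

# Write DEBUG and above to logs/edsl.log and echo INFO and above to the console.
setup_logging(console_level=logging.INFO)

job = Jobs.example()  # hypothetical constructor; build a Jobs instance however you normally would
results = job.run()   # logs "Starting synchronous job execution", "Initializing job with N interviews", etc.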
5 changes: 5 additions & 0 deletions edsl/jobs/runners/JobsRunnerAsyncio.py
@@ -4,6 +4,8 @@
import threading
import warnings
from typing import TYPE_CHECKING
from edsl.logging_config import setup_logging
import logging

from edsl.results.Results import Results
from edsl.jobs.runners.JobsRunnerStatus import JobsRunnerStatus
@@ -24,6 +26,9 @@ class JobsRunnerAsyncio:
"""

def __init__(self, jobs: "Jobs", environment: RunEnvironment):
setup_logging()
self.logger = logging.getLogger(__name__)
self.logger.info(f"Initializing JobsRunnerAsyncio with {len(jobs)} jobs")
self.jobs = jobs
self.environment = environment

67 changes: 67 additions & 0 deletions edsl/logging_config.py
@@ -0,0 +1,67 @@
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


def setup_logging(
log_dir: str = "logs",
log_file: str = "edsl.log",
file_level: int = logging.DEBUG,
console_level: int = None,
max_bytes: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5,
) -> None:
"""Configure logging for the EDSL framework.

Args:
log_dir: Directory where log files will be stored
log_file: Name of the log file
file_level: Logging level for file output
console_level: Logging level for console output. If None, no console output
max_bytes: Maximum size of each log file
backup_count: Number of backup files to keep
"""
# Create logs directory if it doesn't exist
log_path = Path(log_dir)
log_path.mkdir(exist_ok=True)

# Configure the root logger
root_logger = logging.getLogger()

# Clear any existing handlers to avoid duplicate logs
root_logger.handlers.clear()

# Create file handler for all logs
file_handler = RotatingFileHandler(
log_path / log_file, maxBytes=max_bytes, backupCount=backup_count
)
file_handler.setLevel(file_level)

# Create formatter and add it to the handlers
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setFormatter(formatter)

# Add file handler to the root logger
root_logger.addHandler(file_handler)

# Only add console handler if console_level is specified
if console_level is not None:
console_handler = logging.StreamHandler()
console_handler.setLevel(console_level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)

# Set overall logging level to file_level since we always want file logging
root_logger.setLevel(file_level)


if __name__ == "__main__":
setup_logging()
logger = logging.getLogger(__name__)

logger.debug("This is a debug message")
logger.info("This is an info message")
logger.warning("This is a warning message")
logger.error("This is an error message")