diff --git a/edsl/data/RemoteCacheSync.py b/edsl/data/RemoteCacheSync.py
index c96284cf..ccdaa157 100644
--- a/edsl/data/RemoteCacheSync.py
+++ b/edsl/data/RemoteCacheSync.py
@@ -2,14 +2,15 @@
 from dataclasses import dataclass
 from contextlib import AbstractContextManager
 from collections import UserList
+from logging import Logger, getLogger
+import logging
+from edsl.logging_config import setup_logging
 
 if TYPE_CHECKING:
     from .Cache import Cache
     from edsl.coop.coop import Coop
     from .CacheEntry import CacheEntry
 
-from logging import Logger
-
 
 class CacheKeyList(UserList):
     def __init__(self, data: List[str]):
@@ -69,6 +70,7 @@ def __init__(
         output_func: Callable,
         remote_cache: bool = True,
         remote_cache_description: str = "",
+        logger: Optional[Logger] = None,
     ):
         """
         Initializes a RemoteCacheSync object.
@@ -78,7 +80,7 @@ def __init__(
         :param output_func: Function for outputting messages
         :param remote_cache: Whether to enable remote cache synchronization
         :param remote_cache_description: Description for remote cache entries
-
+        :param logger: Optional logger instance. If not provided, creates a new one
         """
         self.coop = coop
         self.cache = cache
@@ -86,6 +88,10 @@ def __init__(
         self.remote_cache_enabled = remote_cache
         self.remote_cache_description = remote_cache_description
         self.initial_cache_keys = []
+        self.logger = logger or getLogger(__name__)
+        self.logger.info(
+            f"RemoteCacheSync initialized with cache size: {len(self.cache.keys())}"
+        )
 
     def __enter__(self) -> "RemoteCacheSync":
         if self.remote_cache_enabled:
@@ -112,18 +118,18 @@ def _sync_from_remote(self) -> None:
 
         missing_count = len(diff.client_missing_entries)
 
         if missing_count == 0:
-            # self._output("No new entries to add to local cache.")
+            self.logger.debug("No new entries to add to local cache.")
             return
 
-        # self._output(
-        #     f"Updating local cache with {missing_count:,} new "
-        #     f"{'entry' if missing_count == 1 else 'entries'} from remote..."
-        # )
+        self.logger.info(
+            f"Updating local cache with {missing_count:,} new "
+            f"{'entry' if missing_count == 1 else 'entries'} from remote..."
+        )
         self.cache.add_from_dict(
             {entry.key: entry for entry in diff.client_missing_entries}
         )
-        # self._output("Local cache updated!")
+        self.logger.info("Local cache updated!")
 
     def _get_entries_to_upload(self, diff: CacheDifference) -> CacheEntriesList:
         """Determines which entries need to be uploaded to remote cache."""
@@ -154,23 +160,23 @@ def _sync_to_remote(self) -> None:
 
         upload_count = len(entries_to_upload)
 
         if upload_count > 0:
-            # self._output(
-            #     f"Updating remote cache with {upload_count:,} new "
-            #     f"{'entry' if upload_count == 1 else 'entries'}..."
-            # )
+            self.logger.info(
+                f"Updating remote cache with {upload_count:,} new "
+                f"{'entry' if upload_count == 1 else 'entries'}..."
+            )
             self.coop.remote_cache_create_many(
                 entries_to_upload,
                 visibility="private",
                 description=self.remote_cache_description,
             )
-            # self._output("Remote cache updated!")
-        # else:
-        #     self._output("No new entries to add to remote cache.")
+            self.logger.info("Remote cache updated!")
+        else:
+            self.logger.debug("No new entries to add to remote cache.")
 
-        # self._output(
-        #     f"There are {len(self.cache.keys()):,} entries in the local cache."
-        # )
+        self.logger.info(
+            f"There are {len(self.cache.keys()):,} entries in the local cache."
+        )
 
 
 if __name__ == "__main__":
@@ -178,6 +184,9 @@ def _sync_to_remote(self) -> None:
 
     doctest.testmod()
 
+    setup_logging()  # Use default settings
+    # Or customize: setup_logging(log_dir="custom_logs", console_level=logging.DEBUG)
+
     from edsl.coop.coop import Coop
     from edsl.data.Cache import Cache
     from edsl.data.CacheEntry import CacheEntry
diff --git a/edsl/jobs/Jobs.py b/edsl/jobs/Jobs.py
index af93ef36..bbe8da5b 100644
--- a/edsl/jobs/Jobs.py
+++ b/edsl/jobs/Jobs.py
@@ -26,6 +26,8 @@
 from edsl.jobs.JobsChecks import JobsChecks
 from edsl.jobs.data_structures import RunEnvironment, RunParameters, RunConfig
 
+import logging
+
 if TYPE_CHECKING:
     from edsl.agents.Agent import Agent
     from edsl.agents.AgentList import AgentList
@@ -499,9 +501,14 @@ def _check_if_local_keys_ok(self):
         jc.check_api_keys()
 
     async def _execute_with_remote_cache(self, run_job_async: bool) -> Results:
-
+        logger = logging.getLogger(__name__)
         use_remote_cache = self.use_remote_cache()
 
+        if use_remote_cache:
+            logger.info("Remote cache enabled")
+        else:
+            logger.debug("Remote cache disabled")
+
         from edsl.coop.coop import Coop
         from edsl.jobs.runners.JobsRunnerAsyncio import JobsRunnerAsyncio
         from edsl.data.Cache import Cache
@@ -516,10 +523,12 @@ async def _execute_with_remote_cache(self, run_job_async: bool) -> Results:
             remote_cache_description=self.run_config.parameters.remote_cache_description,
         ):
             runner = JobsRunnerAsyncio(self, environment=self.run_config.environment)
+            logger.info("Starting job execution")
             if run_job_async:
                 results = await runner.run_async(self.run_config.parameters)
             else:
                 results = runner.run(self.run_config.parameters)
+            logger.info(f"Job completed with {len(results)} results")
         return results
 
     def _setup_and_check(self) -> Tuple[RunConfig, Optional[Results]]:
@@ -543,15 +552,21 @@ def num_interviews(self):
 
     def _run(self, config: RunConfig):
         "Shared code for run and run_async"
+        logger = logging.getLogger(__name__)
+        logger.info(f"Initializing job with {self.num_interviews} interviews")
+
         if config.environment.cache is not None:
+            logger.debug("Using provided cache")
             self.run_config.environment.cache = config.environment.cache
 
         if config.environment.bucket_collection is not None:
+            logger.debug("Using provided bucket collection")
             self.run_config.environment.bucket_collection = (
                 config.environment.bucket_collection
             )
 
         if config.environment.key_lookup is not None:
+            logger.debug("Using provided key lookup")
             self.run_config.environment.key_lookup = config.environment.key_lookup
 
         # replace the parameters with the ones from the config
@@ -560,6 +575,7 @@ def _run(self, config: RunConfig):
         self.replace_missing_objects()
 
         # try to run remotely first
+        logger.debug("Preparing job and checking remote execution options")
         self._prepare_to_run()
         self._check_if_remote_keys_ok()
 
@@ -567,22 +583,27 @@ def _run(self, config: RunConfig):
             self.run_config.environment.cache is None
             or self.run_config.environment.cache is True
         ):
+            logger.debug("Initializing default cache")
             from edsl.data.CacheHandler import CacheHandler
 
             self.run_config.environment.cache = CacheHandler().get_cache()
 
         if self.run_config.environment.cache is False:
+            logger.debug("Initializing cache with immediate_write=False")
             from edsl.data.Cache import Cache
 
             self.run_config.environment.cache = Cache(immediate_write=False)
 
         # first try to run the job remotely
         if results := self._remote_results():
+            logger.info("Job executed remotely")
             return results
 
+        logger.debug("Checking local API keys")
         self._check_if_local_keys_ok()
 
         if config.environment.bucket_collection is None:
+            logger.debug("Creating new bucket collection")
             self.run_config.environment.bucket_collection = (
                 self.create_bucket_collection()
             )
@@ -606,9 +627,12 @@ def run(self, *, config: RunConfig) -> "Results":
         :param bucket_collection: A BucketCollection object to track API calls
         :param key_lookup: A KeyLookup object to manage API keys
         """
+        logger = logging.getLogger(__name__)
+        logger.info("Starting synchronous job execution")
         self._run(config)
-
-        return asyncio.run(self._execute_with_remote_cache(run_job_async=False))
+        results = asyncio.run(self._execute_with_remote_cache(run_job_async=False))
+        logger.info("Synchronous job execution completed")
+        return results
 
     @with_config
     async def run_async(self, *, config: RunConfig) -> "Results":
@@ -629,9 +653,12 @@ async def run_async(self, *, config: RunConfig) -> "Results":
         :param bucket_collection: A BucketCollection object to track API calls
         :param key_lookup: A KeyLookup object to manage API keys
         """
+        logger = logging.getLogger(__name__)
+        logger.info("Starting asynchronous job execution")
         self._run(config)
-
-        return await self._execute_with_remote_cache(run_job_async=True)
+        results = await self._execute_with_remote_cache(run_job_async=True)
+        logger.info("Asynchronous job execution completed")
+        return results
 
     def __repr__(self) -> str:
         """Return an eval-able string representation of the Jobs instance."""
diff --git a/edsl/jobs/runners/JobsRunnerAsyncio.py b/edsl/jobs/runners/JobsRunnerAsyncio.py
index 26d0225f..37e0f9a6 100644
--- a/edsl/jobs/runners/JobsRunnerAsyncio.py
+++ b/edsl/jobs/runners/JobsRunnerAsyncio.py
@@ -4,6 +4,8 @@
 import threading
 import warnings
 from typing import TYPE_CHECKING
+from edsl.logging_config import setup_logging
+import logging
 
 from edsl.results.Results import Results
 from edsl.jobs.runners.JobsRunnerStatus import JobsRunnerStatus
@@ -24,6 +26,9 @@ class JobsRunnerAsyncio:
     """
 
     def __init__(self, jobs: "Jobs", environment: RunEnvironment):
+        setup_logging()
+        self.logger = logging.getLogger(__name__)
+        self.logger.info(f"Initializing JobsRunnerAsyncio with {len(jobs)} jobs")
         self.jobs = jobs
         self.environment = environment
 
diff --git a/edsl/logging_config.py b/edsl/logging_config.py
new file mode 100644
index 00000000..fa321cf4
--- /dev/null
+++ b/edsl/logging_config.py
@@ -0,0 +1,67 @@
+import logging
+from logging.handlers import RotatingFileHandler
+from pathlib import Path
+
+
+def setup_logging(
+    log_dir: str = "logs",
+    log_file: str = "edsl.log",
+    file_level: int = logging.DEBUG,
+    console_level: int = None,
+    max_bytes: int = 10 * 1024 * 1024,  # 10MB
+    backup_count: int = 5,
+) -> None:
+    """Configure logging for the EDSL framework.
+
+    Args:
+        log_dir: Directory where log files will be stored
+        log_file: Name of the log file
+        file_level: Logging level for file output
+        console_level: Logging level for console output. If None, no console output
+        max_bytes: Maximum size of each log file
+        backup_count: Number of backup files to keep
+    """
+    # Create logs directory if it doesn't exist
+    log_path = Path(log_dir)
+    log_path.mkdir(exist_ok=True)
+
+    # Configure the root logger
+    root_logger = logging.getLogger()
+
+    # Clear any existing handlers to avoid duplicate logs
+    root_logger.handlers.clear()
+
+    # Create file handler for all logs
+    file_handler = RotatingFileHandler(
+        log_path / log_file, maxBytes=max_bytes, backupCount=backup_count
+    )
+    file_handler.setLevel(file_level)
+
+    # Create formatter and add it to the handlers
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    )
+    file_handler.setFormatter(formatter)
+
+    # Add file handler to the root logger
+    root_logger.addHandler(file_handler)
+
+    # Only add console handler if console_level is specified
+    if console_level is not None:
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(console_level)
+        console_handler.setFormatter(formatter)
+        root_logger.addHandler(console_handler)
+
+    # Set overall logging level to file_level since we always want file logging
+    root_logger.setLevel(file_level)
+
+
+if __name__ == "__main__":
+    setup_logging()
+    logger = logging.getLogger(__name__)
+
+    logger.debug("This is a debug message")
+    logger.info("This is an info message")
+    logger.warning("This is a warning message")
+    logger.error("This is an error message")
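Usage note (not part of the patch): a minimal sketch of how the pieces above fit together. `setup_logging` and the new `logger` parameter on `RemoteCacheSync` come straight from the changes in this diff, as do the import paths; the argument-free `Coop()` and `Cache()` constructions and the `"edsl.remote_cache"` logger name are illustrative assumptions only.

```python
import logging

from edsl.logging_config import setup_logging
from edsl.data.RemoteCacheSync import RemoteCacheSync
from edsl.data.Cache import Cache
from edsl.coop.coop import Coop

# Rotating file log in ./logs/edsl.log (DEBUG and up), plus INFO and up on the console.
setup_logging(console_level=logging.INFO)

# Reuse a named logger rather than letting RemoteCacheSync fall back to getLogger(__name__).
sync_logger = logging.getLogger("edsl.remote_cache")

coop = Coop()    # assumed no-arg construction, for illustration
cache = Cache()  # assumed no-arg construction, for illustration

with RemoteCacheSync(
    coop=coop,
    cache=cache,
    output_func=print,  # still a required parameter; sync progress now goes to the logger instead
    remote_cache=True,
    remote_cache_description="example sync",
    logger=sync_logger,  # new optional parameter introduced in this diff
):
    pass  # run jobs here; entries are synced with the remote cache on enter/exit
```

Because `JobsRunnerAsyncio.__init__` now calls `setup_logging()` itself, calling it explicitly as above is only needed when non-default settings (such as a console handler or a custom log directory) are wanted.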