From b419c7fc5189beced60b33f4e068fcb71c2318dd Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Thu, 7 Nov 2019 13:01:52 +0100 Subject: [PATCH 1/2] Environment wrappers refactoring Signed-off-by: Adam Wierzbicki --- golem/envs/__init__.py | 8 +- golem/envs/auto_setup.py | 192 ---------------------------- golem/envs/wrappers/__init__.py | 129 +++++++++++++++++++ golem/envs/wrappers/auto_setup.py | 129 +++++++++++++++++++ golem/task/envmanager.py | 2 +- tests/golem/envs/test_auto_setup.py | 5 +- 6 files changed, 264 insertions(+), 201 deletions(-) delete mode 100644 golem/envs/auto_setup.py create mode 100644 golem/envs/wrappers/__init__.py create mode 100644 golem/envs/wrappers/auto_setup.py diff --git a/golem/envs/__init__.py b/golem/envs/__init__.py index f8c94e6796..883644ecd7 100644 --- a/golem/envs/__init__.py +++ b/golem/envs/__init__.py @@ -403,6 +403,7 @@ def supported(cls) -> EnvSupportStatus: """ Is the Environment supported on this machine? """ raise NotImplementedError + @abstractmethod def status(self) -> EnvStatus: """ Get current status of the Environment. """ raise NotImplementedError @@ -423,9 +424,8 @@ def run_benchmark(self) -> Deferred: """ Get the general performance score for this environment. """ raise NotImplementedError - @classmethod @abstractmethod - def parse_prerequisites(cls, prerequisites_dict: Dict[str, Any]) \ + def parse_prerequisites(self, prerequisites_dict: Dict[str, Any]) \ -> Prerequisites: """ Build Prerequisites struct from supplied dictionary. Returned value is of appropriate type for calling install_prerequisites(). """ @@ -438,9 +438,8 @@ def install_prerequisites(self, prerequisites: Prerequisites) -> Deferred: Returns boolean indicating whether installation was successful. """ raise NotImplementedError - @classmethod @abstractmethod - def parse_config(cls, config_dict: Dict[str, Any]) -> EnvConfig: + def parse_config(self, config_dict: Dict[str, Any]) -> EnvConfig: """ Build config struct from supplied dictionary. Returned value is of appropriate type for calling update_config(). """ raise NotImplementedError @@ -455,6 +454,7 @@ def update_config(self, config: EnvConfig) -> None: """ Update configuration. Assumes current status is 'DISABLED'. """ raise NotImplementedError + @abstractmethod def listen( self, event_type: EnvEventType, diff --git a/golem/envs/auto_setup.py b/golem/envs/auto_setup.py deleted file mode 100644 index 1f73968666..0000000000 --- a/golem/envs/auto_setup.py +++ /dev/null @@ -1,192 +0,0 @@ -from typing import ( - Any, - Callable, - Dict, - Optional, - Tuple -) - -from twisted.internet.defer import ( - Deferred, - DeferredLock, - inlineCallbacks -) - -from golem.envs import ( - CounterId, - CounterUsage, - EnvConfig, - EnvEventListener, - EnvEventType, - Environment, - EnvStatus, - EnvSupportStatus, - Prerequisites, - Runtime, - RuntimeEventListener, - RuntimeEventType, - RuntimeInput, - RuntimeOutput, - RuntimePayload, - RuntimeStatus -) - - -class RuntimeSetupWrapper(Runtime): - - def __init__( - self, - runtime: Runtime, - start_usage: Callable[[], Deferred], - end_usage: Callable[[], Deferred] - ) -> None: - self._runtime = runtime - self._start_usage = start_usage - self._end_usage = end_usage - - @inlineCallbacks - def prepare(self) -> Deferred: - yield self._start_usage() - yield self._runtime.prepare() - - @inlineCallbacks - def clean_up(self) -> Deferred: - yield self._runtime.clean_up() - yield self._end_usage() - - def start(self) -> Deferred: - return self._runtime.start() - - def wait_until_stopped(self) -> Deferred: - return self._runtime.wait_until_stopped() - - def stop(self) -> Deferred: - return self._runtime.stop() - - def status(self) -> RuntimeStatus: - return self._runtime.status() - - def stdin(self, encoding: Optional[str] = None) -> RuntimeInput: - return self._runtime.stdin(encoding) - - def stdout(self, encoding: Optional[str] = None) -> RuntimeOutput: - return self._runtime.stdout(encoding) - - def stderr(self, encoding: Optional[str] = None) -> RuntimeOutput: - return self._runtime.stderr(encoding) - - def get_port_mapping(self, port: int) -> Tuple[str, int]: - return self._runtime.get_port_mapping(port) - - def usage_counters(self) -> Dict[CounterId, CounterUsage]: - return self._runtime.usage_counters() - - def listen( - self, - event_type: RuntimeEventType, - listener: RuntimeEventListener - ) -> None: - self._runtime.listen(event_type, listener) - - -def auto_setup( - env: Environment, - start_usage: Callable[[Environment], Deferred], - end_usage: Callable[[Environment], Deferred] -) -> Environment: - - class EnvSetupWrapper(Environment): - - def __init__(self) -> None: - self._num_users = 0 - self._lock = DeferredLock() - - @inlineCallbacks - def _start_usage(self) -> Deferred: - yield self._lock.acquire() - try: - if self._num_users == 0: - yield start_usage(env) - self._num_users += 1 - finally: - self._lock.release() - - @inlineCallbacks - def _end_usage(self) -> Deferred: - yield self._lock.acquire() - try: - self._num_users -= 1 - if self._num_users == 0: - yield end_usage(env) - finally: - self._lock.release() - - @classmethod - def supported(cls) -> EnvSupportStatus: - return env.supported() - - def status(self) -> EnvStatus: - return env.status() - - def prepare(self) -> Deferred: - raise AttributeError('prepare and clean_up not supported') - - def clean_up(self) -> Deferred: - raise AttributeError('prepare and clean_up not supported') - - @inlineCallbacks - def run_benchmark(self) -> Deferred: - yield self._start_usage() - try: - return (yield env.run_benchmark()) - finally: - yield self._end_usage() - - @classmethod - def parse_prerequisites( - cls, - prerequisites_dict: Dict[str, Any] - ) -> Prerequisites: - return env.parse_prerequisites(prerequisites_dict) - - @inlineCallbacks - def install_prerequisites( - self, - prerequisites: Prerequisites - ) -> Deferred: - yield self._start_usage() - try: - return (yield env.install_prerequisites(prerequisites)) - finally: - yield self._end_usage() - - @classmethod - def parse_config(cls, config_dict: Dict[str, Any]) -> EnvConfig: - return env.parse_config(config_dict) - - def config(self) -> EnvConfig: - return env.config() - - def update_config(self, config: EnvConfig) -> None: - env.update_config(config) - - def listen( - self, - event_type: EnvEventType, - listener: EnvEventListener - ) -> None: - env.listen(event_type, listener) - - def runtime( - self, - payload: RuntimePayload, - config: Optional[EnvConfig] = None - ) -> Runtime: - runtime = env.runtime(payload, config) - return RuntimeSetupWrapper( - runtime=runtime, - start_usage=self._start_usage, - end_usage=self._end_usage - ) - - return EnvSetupWrapper() diff --git a/golem/envs/wrappers/__init__.py b/golem/envs/wrappers/__init__.py new file mode 100644 index 0000000000..d0d608d6cc --- /dev/null +++ b/golem/envs/wrappers/__init__.py @@ -0,0 +1,129 @@ +from typing import ( + Any, + Dict, + Optional, + Tuple +) + +from twisted.internet.defer import Deferred + +from golem.envs import ( + CounterId, + CounterUsage, + EnvConfig, + EnvEventListener, + EnvEventType, + Environment, + EnvStatus, + EnvSupportStatus, + Prerequisites, + Runtime, + RuntimeEventListener, + RuntimeEventType, + RuntimeInput, + RuntimeOutput, + RuntimePayload, + RuntimeStatus +) + + +class RuntimeWrapper(Runtime): + """ A no-op wrapper which proxies all calls to the wrapped Runtime. + Base class for implementing other wrappers. """ + + def __init__(self, runtime: Runtime) -> None: + self._runtime = runtime + + def prepare(self) -> Deferred: + return self._runtime.prepare() + + def clean_up(self) -> Deferred: + return self._runtime.clean_up() + + def start(self) -> Deferred: + return self._runtime.start() + + def wait_until_stopped(self) -> Deferred: + return self._runtime.wait_until_stopped() + + def stop(self) -> Deferred: + return self._runtime.stop() + + def status(self) -> RuntimeStatus: + return self._runtime.status() + + def stdin(self, encoding: Optional[str] = None) -> RuntimeInput: + return self._runtime.stdin(encoding) + + def stdout(self, encoding: Optional[str] = None) -> RuntimeOutput: + return self._runtime.stdout(encoding) + + def stderr(self, encoding: Optional[str] = None) -> RuntimeOutput: + return self._runtime.stderr(encoding) + + def get_port_mapping(self, port: int) -> Tuple[str, int]: + return self._runtime.get_port_mapping(port) + + def usage_counters(self) -> Dict[CounterId, CounterUsage]: + return self._runtime.usage_counters() + + def listen( + self, + event_type: RuntimeEventType, + listener: RuntimeEventListener + ) -> None: + self._runtime.listen(event_type, listener) + + +class EnvironmentWrapper(Environment): + + def __init__(self, env: Environment) -> None: + self._env = env + + @classmethod + def supported(cls) -> EnvSupportStatus: + # This method should not be called on a wrapped environment. + raise AttributeError('Method not supported on a wrapped environment') + + def status(self) -> EnvStatus: + return self._env.status() + + def prepare(self) -> Deferred: + return self._env.prepare() + + def clean_up(self) -> Deferred: + return self._env.clean_up() + + def run_benchmark(self) -> Deferred: + return self._env.run_benchmark() + + def parse_prerequisites( + self, prerequisites_dict: Dict[str, Any] + ) -> Prerequisites: + return self._env.parse_prerequisites(prerequisites_dict) + + def install_prerequisites(self, prerequisites: Prerequisites) -> Deferred: + return self._env.install_prerequisites(prerequisites) + + def parse_config(self, config_dict: Dict[str, Any]) -> EnvConfig: + return self._env.parse_config(config_dict) + + def config(self) -> EnvConfig: + return self._env.config() + + def update_config(self, config: EnvConfig) -> None: + self._env.update_config(config) + + def listen( + self, + event_type: EnvEventType, + listener: EnvEventListener + ) -> None: + self._env.listen(event_type, listener) + + def runtime( + self, + payload: RuntimePayload, + config: Optional[EnvConfig] = None + ) -> Runtime: + return self._env.runtime(payload, config) diff --git a/golem/envs/wrappers/auto_setup.py b/golem/envs/wrappers/auto_setup.py new file mode 100644 index 0000000000..0557262c90 --- /dev/null +++ b/golem/envs/wrappers/auto_setup.py @@ -0,0 +1,129 @@ +import functools +from typing import Callable, Optional + +from twisted.internet.defer import ( + Deferred, + DeferredLock, + inlineCallbacks +) + +from golem.envs import ( + EnvConfig, + Environment, + Prerequisites, + Runtime, + RuntimePayload, +) +from . import EnvironmentWrapper, RuntimeWrapper + + +class RuntimeSetupWrapper(RuntimeWrapper): + + def __init__( + self, + runtime: Runtime, + start_usage: Callable[[], Deferred], + end_usage: Callable[[], Deferred] + ) -> None: + super().__init__(runtime) + self._start_usage = start_usage + self._end_usage = end_usage + + @inlineCallbacks + def prepare(self) -> Deferred: + yield self._start_usage() + yield super().prepare() + + @inlineCallbacks + def clean_up(self) -> Deferred: + yield super().clean_up() + yield self._end_usage() + + +class EnvSetupWrapper(EnvironmentWrapper): + + def __init__( + self, + env: Environment, + start_usage: Callable[[], Deferred], + end_usage: Callable[[], Deferred] + ) -> None: + super().__init__(env) + self._num_users = 0 + self._lock = DeferredLock() + self._start_usage = start_usage + self._end_usage = end_usage + + @inlineCallbacks + def _prepare_runtime(self) -> Deferred: + yield self._lock.acquire() + try: + if self._num_users == 0: + yield self._start_usage() + self._num_users += 1 + finally: + self._lock.release() + + @inlineCallbacks + def _clean_up_runtime(self) -> Deferred: + yield self._lock.acquire() + try: + self._num_users -= 1 + if self._num_users == 0: + yield self._end_usage() + finally: + self._lock.release() + + def prepare(self) -> Deferred: + raise AttributeError('prepare and clean_up not supported') + + def clean_up(self) -> Deferred: + raise AttributeError('prepare and clean_up not supported') + + @inlineCallbacks + def run_benchmark(self) -> Deferred: + yield self._prepare_runtime() + try: + return (yield self._env.run_benchmark()) + finally: + yield self._clean_up_runtime() + + @inlineCallbacks + def install_prerequisites( + self, + prerequisites: Prerequisites + ) -> Deferred: + yield self._prepare_runtime() + try: + return (yield self._env.install_prerequisites(prerequisites)) + finally: + yield self._clean_up_runtime() + + def runtime( + self, + payload: RuntimePayload, + config: Optional[EnvConfig] = None + ) -> Runtime: + runtime = self._env.runtime(payload, config) + return RuntimeSetupWrapper( + runtime=runtime, + start_usage=self._prepare_runtime, + end_usage=self._clean_up_runtime + ) + + +def auto_setup( + env: Environment, + start_usage: Callable[[Environment], Deferred], + end_usage: Callable[[Environment], Deferred] +) -> Environment: + """ Wrap given environment so that it automatically calls start_usage when + it's needed and end_usage when it's no longer needed. By 'needed' we + mean there are active Runtime objects created by this environment, or + benchmark is running, or runtime prerequisites are being installed. """ + + return EnvSetupWrapper( + env=env, + start_usage=functools.partial(start_usage, env), + end_usage=functools.partial(end_usage, env) + ) diff --git a/golem/task/envmanager.py b/golem/task/envmanager.py index f6f099e0b7..ddc3a226dc 100644 --- a/golem/task/envmanager.py +++ b/golem/task/envmanager.py @@ -6,7 +6,7 @@ from twisted.internet.defer import Deferred, inlineCallbacks, DeferredLock from golem.envs import BenchmarkResult, EnvId, Environment, EnvMetadata -from golem.envs.auto_setup import auto_setup +from golem.envs.wrappers.auto_setup import auto_setup from golem.model import Performance, EnvConfiguration from golem.task.task_api import TaskApiPayloadBuilder diff --git a/tests/golem/envs/test_auto_setup.py b/tests/golem/envs/test_auto_setup.py index 7b0446d496..db810ec5b1 100644 --- a/tests/golem/envs/test_auto_setup.py +++ b/tests/golem/envs/test_auto_setup.py @@ -12,7 +12,7 @@ RuntimeEventType, RuntimePayload ) -from golem.envs.auto_setup import auto_setup +from golem.envs.wrappers.auto_setup import auto_setup class TestAutoSetup(TwistedTestCase): @@ -31,9 +31,6 @@ def setUp(self): self.master_mock.attach_mock(self.start_usage, 'start_usage') self.master_mock.attach_mock(self.end_usage, 'end_usage') - def test_supported(self): - self.assertEqual(self.wrapped_env.supported(), self.env.supported()) - def test_parse_prerequisites(self): prereq_dict = {'key': 'value'} prereq = self.wrapped_env.parse_prerequisites(prereq_dict) From f57f12c6b6aeb639278c740a8e269473249e0e51 Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Thu, 7 Nov 2019 17:45:20 +0100 Subject: [PATCH 2/2] Environment wrapper for dumping output logs Signed-off-by: Adam Wierzbicki --- golem/core/common.py | 20 ++-- golem/envs/__init__.py | 12 ++- golem/envs/docker/cpu.py | 7 +- golem/envs/wrappers/__init__.py | 4 + golem/envs/wrappers/auto_setup.py | 2 +- golem/envs/wrappers/dump_logs.py | 98 ++++++++++++++++++++ golem/task/envmanager.py | 30 +++++- golem/task/task_api/__init__.py | 5 +- golem/task/taskserver.py | 5 +- scripts/task_api_tests/basic_integration.py | 4 +- tests/golem/envs/localhost.py | 23 ++++- tests/golem/task/server/test_queue.py | 2 +- tests/golem/task/test_appbenchmarkmanager.py | 2 +- tests/golem/task/test_envmanager.py | 36 ++++++- tests/golem/task/test_taskkeeper.py | 14 +-- 15 files changed, 233 insertions(+), 31 deletions(-) create mode 100644 golem/envs/wrappers/dump_logs.py diff --git a/golem/core/common.py b/golem/core/common.py index f2289cb00b..3741452e8c 100644 --- a/golem/core/common.py +++ b/golem/core/common.py @@ -7,7 +7,8 @@ from calendar import timegm from datetime import datetime from functools import wraps -from typing import Any, Callable, cast, List, TypeVar +from pathlib import Path +from typing import Any, Callable, cast, List, TypeVar, Optional import pytz @@ -238,6 +239,12 @@ def wrapper(*args, **kwargs): return decorator +def get_log_dir(data_dir: Optional[str] = None) -> Path: + if data_dir is None: + data_dir = simpleenv.get_local_datadir("default") + return Path(data_dir) / 'logs' + + # pylint: disable=too-many-branches,too-many-locals def config_logging( suffix='', @@ -252,9 +259,7 @@ def config_logging( except ImportError: from loggingconfig import LOGGING - if datadir is None: - datadir = simpleenv.get_local_datadir("default") - logdir_path = os.path.join(datadir, 'logs') + logdir_path = get_log_dir(datadir) for formatter in LOGGING.get('formatters', {}).values(): formatter['format'] = f"{formatter_prefix}{formatter['format']}" @@ -284,14 +289,11 @@ def config_logging( LOGGING['loggers']['twisted']['level'] = 'WARNING' try: - if not os.path.exists(logdir_path): - os.makedirs(logdir_path) + logdir_path.mkdir(parents=True, exist_ok=True) logging.config.dictConfig(LOGGING) except (ValueError, PermissionError) as e: - sys.stderr.write( - "Can't configure logging in: {} Got: {}\n".format(logdir_path, e) - ) + sys.stderr.write(f"Can't configure logging in {logdir_path} Got: {e}\n") return # Avoid consequent errors logging.captureWarnings(True) diff --git a/golem/envs/__init__.py b/golem/envs/__init__.py index 883644ecd7..a743f4f866 100644 --- a/golem/envs/__init__.py +++ b/golem/envs/__init__.py @@ -20,6 +20,7 @@ CounterUsage = Any EnvId = str +RuntimeId = str class RuntimeEventType(Enum): @@ -143,7 +144,10 @@ def __exit__(self, *_, **__) -> None: self.close() -class RuntimeOutput(Iterable[Union[str, bytes]], ABC): +RuntimeOutput = Iterable[Union[str, bytes]] + + +class RuntimeOutputBase(RuntimeOutput, ABC): """ A handle for reading output (either stdout or stderr) from a running Runtime. Yielded items are output lines. Output could be either raw (bytes) or decoded (str). """ @@ -161,6 +165,12 @@ class Runtime(ABC): """ A runnable object representing some particular computation. Tied to a particular Environment that was used to create this object. """ + @abstractmethod + def id(self) -> Optional[RuntimeId]: + """ Get unique identifier of this Runtime. Might not be available if the + Runtime is not yet prepared. """ + raise NotImplementedError + @abstractmethod def prepare(self) -> Deferred: """ Prepare the Runtime to be started. Assumes current status is diff --git a/golem/envs/docker/cpu.py b/golem/envs/docker/cpu.py index ce7d933c67..e2ed396113 100644 --- a/golem/envs/docker/cpu.py +++ b/golem/envs/docker/cpu.py @@ -34,8 +34,10 @@ EnvSupportStatus, Prerequisites, RuntimeBase, + RuntimeId, RuntimeInput, RuntimeOutput, + RuntimeOutputBase, RuntimePayload, RuntimeStatus, BenchmarkResult) @@ -74,7 +76,7 @@ def from_dict(cls, data: Dict[str, Any]) -> 'DockerCPUConfig': return cls(work_dirs=work_dirs, **data) -class DockerOutput(RuntimeOutput): +class DockerOutput(RuntimeOutputBase): def __init__( self, raw_output: Iterable[bytes], encoding: Optional[str] = None @@ -231,6 +233,9 @@ def _update_status_loop(self) -> None: self._logger.info("Runtime is no longer running. " "Stopping status update thread.") + def id(self) -> Optional[RuntimeId]: + return self._container_id + def prepare(self) -> Deferred: self._change_status( from_status=RuntimeStatus.CREATED, diff --git a/golem/envs/wrappers/__init__.py b/golem/envs/wrappers/__init__.py index d0d608d6cc..673827b344 100644 --- a/golem/envs/wrappers/__init__.py +++ b/golem/envs/wrappers/__init__.py @@ -20,6 +20,7 @@ Runtime, RuntimeEventListener, RuntimeEventType, + RuntimeId, RuntimeInput, RuntimeOutput, RuntimePayload, @@ -34,6 +35,9 @@ class RuntimeWrapper(Runtime): def __init__(self, runtime: Runtime) -> None: self._runtime = runtime + def id(self) -> Optional[RuntimeId]: + return self._runtime.id() + def prepare(self) -> Deferred: return self._runtime.prepare() diff --git a/golem/envs/wrappers/auto_setup.py b/golem/envs/wrappers/auto_setup.py index 0557262c90..8140755bf4 100644 --- a/golem/envs/wrappers/auto_setup.py +++ b/golem/envs/wrappers/auto_setup.py @@ -14,7 +14,7 @@ Runtime, RuntimePayload, ) -from . import EnvironmentWrapper, RuntimeWrapper +from golem.envs.wrappers import EnvironmentWrapper, RuntimeWrapper class RuntimeSetupWrapper(RuntimeWrapper): diff --git a/golem/envs/wrappers/dump_logs.py b/golem/envs/wrappers/dump_logs.py new file mode 100644 index 0000000000..4017e7cc0d --- /dev/null +++ b/golem/envs/wrappers/dump_logs.py @@ -0,0 +1,98 @@ +import logging +from pathlib import Path +from threading import Thread +from typing import Optional + +from twisted.internet.defer import Deferred, inlineCallbacks + +from golem.envs import ( + EnvConfig, + Environment, + Runtime, + RuntimeOutput, + RuntimePayload, +) +from golem.envs.wrappers import EnvironmentWrapper, RuntimeWrapper + +logger = logging.getLogger(__name__) + + +class RuntimeLogsWrapper(RuntimeWrapper): + + def __init__( + self, + runtime: Runtime, + logs_dir: Path, + encoding: str = 'utf-8' + ) -> None: + super().__init__(runtime) + self._logs_dir = logs_dir + self._encoding = encoding + self._stdout_thread: Optional[Thread] = None + self._stderr_thread: Optional[Thread] = None + + def _dump_output(self, output: RuntimeOutput, path: Path) -> None: + logger.info('Dumping runtime output to %r', path) + with path.open(mode='w', encoding=self._encoding) as file: + file.writelines(output) + + @inlineCallbacks + def prepare(self) -> Deferred: + yield super().prepare() + stdout_file = self._logs_dir / f'{self._runtime.id()}_stdout.txt' + stderr_file = self._logs_dir / f'{self._runtime.id()}_stderr.txt' + stdout = self._runtime.stdout(self._encoding) + stderr = self._runtime.stderr(self._encoding) + self._stdout_thread = Thread( + target=self._dump_output, + args=(stdout, stdout_file)) + self._stderr_thread = Thread( + target=self._dump_output, + args=(stderr, stderr_file)) + self._stdout_thread.start() + self._stderr_thread.start() + + @inlineCallbacks + def clean_up(self) -> Deferred: + assert self._stdout_thread is not None + assert self._stderr_thread is not None + yield super().clean_up() + self._stdout_thread.join(5) + if self._stdout_thread.is_alive(): + logger.warning('Cannot join stdout thread') + self._stderr_thread.join(5) + if self._stderr_thread.is_alive(): + logger.warning('Cannot join stderr thread') + + +class EnvironmentLogsWrapper(EnvironmentWrapper): + + def __init__( + self, + env: Environment, + logs_dir: Path, + encoding: str = 'utf-8' + ) -> None: + super().__init__(env) + self._logs_dir = logs_dir + self._encoding = encoding + + def runtime( + self, + payload: RuntimePayload, + config: Optional[EnvConfig] = None + ) -> Runtime: + runtime = super().runtime(payload, config) + return RuntimeLogsWrapper(runtime, self._logs_dir, self._encoding) + + +def dump_logs( + env: Environment, + logs_dir: Path, + encoding: str = 'utf-8' +) -> Environment: + return EnvironmentLogsWrapper( + env=env, + logs_dir=logs_dir, + encoding=encoding + ) diff --git a/golem/task/envmanager.py b/golem/task/envmanager.py index ddc3a226dc..dc26d53699 100644 --- a/golem/task/envmanager.py +++ b/golem/task/envmanager.py @@ -1,12 +1,18 @@ import logging +from pathlib import Path from typing import Dict, List, Type, Optional from dataclasses import dataclass from peewee import PeeweeException from twisted.internet.defer import Deferred, inlineCallbacks, DeferredLock -from golem.envs import BenchmarkResult, EnvId, Environment, EnvMetadata -from golem.envs.wrappers.auto_setup import auto_setup +from golem.envs import ( + BenchmarkResult, + EnvId, + Environment, + EnvMetadata, +) +from golem.envs.wrappers import auto_setup, dump_logs from golem.model import Performance, EnvConfiguration from golem.task.task_api import TaskApiPayloadBuilder @@ -23,7 +29,8 @@ class EnvEntry: metadata: EnvMetadata payload_builder: Type[TaskApiPayloadBuilder] - def __init__(self): + def __init__(self, runtime_logs_dir: Path) -> None: + self._runtime_logs_dir = runtime_logs_dir self._envs: Dict[EnvId, EnvironmentManager.EnvEntry] = {} self._state = EnvStates() self._running_benchmark: bool = False @@ -67,7 +74,22 @@ def register_env( """ Register an Environment (i.e. make it visible to manager). """ if metadata.id in self._envs: raise ValueError(f"Environment '{metadata.id}' already registered.") - wrapped_env = auto_setup(env, self._start_usage, self._end_usage) + + # Apply automatic setup wrapper + wrapped_env = auto_setup.auto_setup( + env=env, + start_usage=self._start_usage, + end_usage=self._end_usage + ) + + # Apply runtime logs wrapper + logs_dir = self._runtime_logs_dir / metadata.id + logs_dir.mkdir(parents=True, exist_ok=True) + wrapped_env = dump_logs.dump_logs( + env=wrapped_env, + logs_dir=logs_dir + ) + self._envs[metadata.id] = EnvironmentManager.EnvEntry( instance=wrapped_env, metadata=metadata, diff --git a/golem/task/task_api/__init__.py b/golem/task/task_api/__init__.py index e36cad33f9..c003460680 100644 --- a/golem/task/task_api/__init__.py +++ b/golem/task/task_api/__init__.py @@ -51,7 +51,10 @@ async def start(self, command: str, port: int) -> Tuple[str, int]: ) self._runtime = self._env.runtime(runtime_payload) loop = asyncio.get_event_loop() - await self._runtime.prepare().asFuture(loop) + d = self._runtime.prepare() + f = d.asFuture(loop) + await f + # await self._runtime.prepare().asFuture(loop) await self._runtime.start().asFuture(loop) return self._runtime.get_port_mapping(port) diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index eae4029bee..1c35e945af 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -37,7 +37,7 @@ from golem.apps import manager as app_manager from golem.apps.default import save_built_in_app_definitions from golem.clientconfigdescriptor import ClientConfigDescriptor -from golem.core.common import short_node_id, deadline_to_timeout +from golem.core.common import short_node_id, deadline_to_timeout, get_log_dir from golem.core.deferred import ( asyncio_main_loop, deferred_from_future, @@ -129,7 +129,8 @@ def __init__(self, Path(self.get_task_computer_root()).mkdir(parents=True, exist_ok=True) - new_env_manager = EnvironmentManager() + runtime_logs_dir = get_log_dir(client.datadir) + new_env_manager = EnvironmentManager(runtime_logs_dir) register_built_in_repositories() register_environments( work_dir=self.get_task_computer_root(), diff --git a/scripts/task_api_tests/basic_integration.py b/scripts/task_api_tests/basic_integration.py index 456c64988b..b459a72be0 100644 --- a/scripts/task_api_tests/basic_integration.py +++ b/scripts/task_api_tests/basic_integration.py @@ -38,7 +38,9 @@ async def test_task( app_manager.register_app(app_definition) app_manager.set_enabled(app_definition.id, True) - env_manager = envmanager.EnvironmentManager() + runtime_logs_dir = work_dir / 'runtime_logs' + runtime_logs_dir.mkdir() + env_manager = envmanager.EnvironmentManager(runtime_logs_dir) register_built_in_repositories() register_environments( work_dir=str(work_dir), diff --git a/tests/golem/envs/localhost.py b/tests/golem/envs/localhost.py index 3a3a8c6717..30f46b177b 100644 --- a/tests/golem/envs/localhost.py +++ b/tests/golem/envs/localhost.py @@ -2,12 +2,14 @@ import logging import multiprocessing import signal +import uuid from pathlib import Path from typing import Optional, Dict, Any, Tuple, List, Awaitable, Callable import dill from dataclasses import dataclass, asdict from golem_task_api import RequestorAppHandler, ProviderAppHandler, entrypoint +from golem_task_api.dirutils import RequestorTaskDir from golem_task_api.enums import VerifyResult from golem_task_api.structs import Subtask, Task from twisted.internet import defer, threads @@ -24,6 +26,7 @@ Prerequisites, Runtime, RuntimeBase, + RuntimeId, RuntimeInput, RuntimeOutput, RuntimePayload @@ -72,6 +75,7 @@ class LocalhostPayload(RuntimePayload): command: str shared_dir: Path prerequisites: LocalhostPrerequisites + runtime_id: Optional[RuntimeId] = None class LocalhostPayloadBuilder(TaskApiPayloadBuilder): @@ -142,6 +146,16 @@ async def compute( return await self._prereq.compute( # type: ignore subtask_id, subtask_params) + async def abort_task(self, task_work_dir: RequestorTaskDir) -> None: + pass + + async def abort_subtask( + self, + task_work_dir: RequestorTaskDir, + subtask_id: str + ) -> None: + pass + class LocalhostRuntime(RuntimeBase): @@ -150,6 +164,8 @@ def __init__( payload: LocalhostPayload, ) -> None: super().__init__(logger) + self._id = payload.runtime_id or str(uuid.uuid4()) + # From docs: Start a fresh python interpreter process. Unnecessary # file descriptors and handles from the parent process will not # be inherited. @@ -161,6 +177,9 @@ def __init__( ) self._shutdown_deferred: Optional[defer.Deferred] = None + def id(self) -> Optional[RuntimeId]: + return self._id + def prepare(self) -> defer.Deferred: self._prepared() return defer.succeed(None) @@ -219,10 +238,10 @@ def stdin(self, encoding: Optional[str] = None) -> RuntimeInput: raise NotImplementedError def stdout(self, encoding: Optional[str] = None) -> RuntimeOutput: - raise NotImplementedError + return [] def stderr(self, encoding: Optional[str] = None) -> RuntimeOutput: - raise NotImplementedError + return [] def get_port_mapping(self, port: int) -> Tuple[str, int]: return '127.0.0.1', port diff --git a/tests/golem/task/server/test_queue.py b/tests/golem/task/server/test_queue.py index bb2dd3214f..9d952f98e4 100644 --- a/tests/golem/task/server/test_queue.py +++ b/tests/golem/task/server/test_queue.py @@ -25,7 +25,7 @@ def setUp(self): self.server.client = self.client self.server.task_keeper = taskkeeper.TaskHeaderKeeper( old_env_manager=self.client.environments_manager, - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=self.client.node, min_price=0 ) diff --git a/tests/golem/task/test_appbenchmarkmanager.py b/tests/golem/task/test_appbenchmarkmanager.py index a480e87cb3..d3aca452f2 100644 --- a/tests/golem/task/test_appbenchmarkmanager.py +++ b/tests/golem/task/test_appbenchmarkmanager.py @@ -33,7 +33,7 @@ def setup_env(self, env_id): @pytest.fixture(autouse=True) def setup_method(self, pytest_database_fixture, tmpdir, event_loop): # noqa # pylint: disable=attribute-defined-outside-init - self.env_manager = EnvironmentManager() + self.env_manager = EnvironmentManager(Path(tmpdir)) self.app_benchmark_manager = AppBenchmarkManager( env_manager=self.env_manager, root_path=Path(tmpdir)) diff --git a/tests/golem/task/test_envmanager.py b/tests/golem/task/test_envmanager.py index ce49ea9ad7..ea5e53349a 100644 --- a/tests/golem/task/test_envmanager.py +++ b/tests/golem/task/test_envmanager.py @@ -13,7 +13,7 @@ class EnvManagerBaseTest(DatabaseFixture): def setUp(self): super().setUp() - self.manager = EnvironmentManager() + self.manager = EnvironmentManager(self.new_path) def register_env(self, env_id): env = MagicMock(spec=Environment) @@ -125,6 +125,40 @@ def test_auto_setup(self): env2.prepare.assert_called_once() +class TestRuntimeLogs( # pylint: disable=too-many-ancestors + EnvManagerBaseTest, + TwistedTestCase +): + + @defer.inlineCallbacks + def test_runtime_logs(self): + env_id = 'env' + runtime_id = 'runtime' + stdout = ['ąąą\n', 'bbb\n', 'ććć\n'] + stderr = ['ddd\n', 'ęęę\n', 'fff\n'] + + env, *_ = self.register_env(env_id) + env.runtime().id.return_value = runtime_id + env.runtime().stdout.return_value = stdout + env.runtime().stderr.return_value = stderr + + wrapped_env = self.manager.environment("env") + runtime = wrapped_env.runtime(Mock()) + + yield runtime.prepare() + yield runtime.clean_up() + + stdout_path = self.new_path / env_id / f'{runtime_id}_stdout.txt' + self.assertTrue(stdout_path.exists()) + with stdout_path.open(mode='r', encoding='utf-8') as file: + self.assertEqual(list(file), stdout) + + stderr_path = self.new_path / env_id / f'{runtime_id}_stderr.txt' + self.assertTrue(stderr_path.exists()) + with stderr_path.open(mode='r', encoding='utf-8') as file: + self.assertEqual(list(file), stderr) + + class TestEnvironmentManagerDB( # pylint: disable=too-many-ancestors EnvManagerBaseTest, TwistedTestCase diff --git a/tests/golem/task/test_taskkeeper.py b/tests/golem/task/test_taskkeeper.py index 0f1cece84f..6c65d5ab2e 100644 --- a/tests/golem/task/test_taskkeeper.py +++ b/tests/golem/task/test_taskkeeper.py @@ -45,11 +45,13 @@ def async_run(request, success=None, error=None): success(result) -class TestTaskHeaderKeeperIsSupported(LogTestCase): +class TestTaskHeaderKeeperIsSupported(TempDirFixture, LogTestCase): + def setUp(self) -> None: + super().setUp() self.tk = TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10.0) self.tk.old_env_manager.environments = {} @@ -123,12 +125,12 @@ def test_mask_mismatch(self): self.assertIn(UnsupportReason.MASK_MISMATCH, supported.desc) -class TaskHeaderKeeperBase(LogTestCase): +class TaskHeaderKeeperBase(TempDirFixture, LogTestCase): def setUp(self): super().setUp() self.thk = taskkeeper.TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10.0, ) @@ -140,7 +142,7 @@ def setUp(self): self.tar = mock.Mock(spec=taskarchiver.TaskArchiver) self.thk = TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10.0, task_archiver=self.tar, @@ -314,7 +316,7 @@ def test_check_max_tasks_per_owner(freezer, self): tk = TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10, max_tasks_per_requestor=10)