Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Fix config cannot be updated #5171

Merged
merged 2 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions golem/envs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from copy import deepcopy
from enum import Enum
from logging import Logger, getLogger
from threading import RLock
from threading import Lock, RLock

from typing import Any, Callable, Dict, List, Optional, NamedTuple, Union, \
Sequence, Iterable, ContextManager, Set, Tuple, TYPE_CHECKING
Sequence, Iterable, ContextManager, Set, Tuple, Type, TYPE_CHECKING

from dataclasses import dataclass, field
from twisted.internet.threads import deferToThread
Expand Down Expand Up @@ -599,3 +599,50 @@ def listen(
listener: EnvEventListener
) -> None:
self._event_listeners.setdefault(event_type, set()).add(listener)


def delayed_config(cls: Type[Environment]) -> Type[Environment]:
"""
This class decorator allows to save config update and apply it, when env is
disabled.
Mutex prevents the following scenario
Thread 1 Thread 2
call apply_next_config
call update_config
status is disabled, so calls
super().update_config
cls.update_config
with _next_config
"""
# FIXME workaround https://github.com/python/mypy/issues/5865
cls2: Any = cls

class DelayedConfigWrapper(cls2):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._next_config: Optional[EnvConfig] = None
self._config_lock = Lock()

def apply_next_config(_):
with self._config_lock:
if self._next_config is None:
return
self._logger.debug("Applying saved config")
config = self._next_config
self._next_config = None
cls.update_config(self, config)

self.listen(EnvEventType.DISABLED, apply_next_config)

def update_config(self, new_config: EnvConfig) -> None:
with self._config_lock:
if self._status == EnvStatus.DISABLED:
self._logger.debug("Config applied immediately")
super().update_config(new_config)
return

self._logger.debug("Config saved for later")
self._next_config = new_config

return DelayedConfigWrapper
4 changes: 3 additions & 1 deletion golem/envs/docker/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
RuntimePayload,
RuntimeStatus,
UsageCounter,
UsageCounterValues
UsageCounterValues,
delayed_config,
)
from golem.envs.docker import DockerRuntimePayload, DockerPrerequisites
from golem.envs.docker.whitelist import Whitelist
Expand Down Expand Up @@ -496,6 +497,7 @@ def usage_counter_values(self) -> UsageCounterValues:
return deepcopy(self._counters)


@delayed_config
class DockerCPUEnvironment(EnvironmentBase):

MIN_MEMORY_MB: ClassVar[int] = 1024
Expand Down
18 changes: 5 additions & 13 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
)
self._new_computer = NewTaskComputer(
env_manager=env_manager,
work_dir=task_server.get_task_computer_root(),
work_dir=Path(task_server.get_task_computer_root()),
stats_keeper=self.stats
)

Expand Down Expand Up @@ -263,10 +263,8 @@ def change_config(
config_desc: 'ClientConfigDescriptor',
in_background: bool = True
) -> defer.Deferred:
work_dir = Path(self._task_server.get_task_computer_root())
yield self._new_computer.change_config(
config_desc=config_desc,
work_dir=work_dir)
self._new_computer.change_config(
config_desc=config_desc)
return (yield self._old_computer.change_config(
config_desc=config_desc,
in_background=in_background))
Expand Down Expand Up @@ -480,13 +478,9 @@ def get_current_computing_env(self) -> 'Optional[EnvId]':
def change_config(
self,
config_desc: 'ClientConfigDescriptor',
work_dir: Path
) -> defer.Deferred:
assert not self._is_computing()
self._work_dir = work_dir

) -> None:
config_dict = dict(
work_dirs=[work_dir],
work_dirs=[self._work_dir],
cpu_count=config_desc.num_cores,
memory_mb=scale_memory(
config_desc.max_memory_size,
Expand All @@ -505,8 +499,6 @@ def change_config(
# TODO: GPU options in config_dict
docker_gpu.update_config(DockerGPUConfig(**config_dict))

return defer.succeed(None)

def quit(self):
if self.has_assigned_task():
self.task_interrupted()
Expand Down
43 changes: 43 additions & 0 deletions scripts/node_integration_tests/nodes/provider/configure_or_die.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python

import logging
from unittest.mock import patch

from twisted.internet.defer import inlineCallbacks

from golemapp import main

from golem.client import Client
from golem.task.taskserver import TaskServer


def on_exception():
logging.critical("#### Integration test failed ####")


client_change_config_orig = Client.change_config


def client_change_config(self: Client, *args, **kwargs):
try:
client_change_config_orig(self, *args, **kwargs)
except: # noqa pylint: disable=broad-except
on_exception()


task_server_change_config_orig = TaskServer.change_config


@inlineCallbacks
def task_server_change_config(self: TaskServer, *args, **kwargs):
try:
yield task_server_change_config_orig(self, *args, **kwargs)
except: # noqa pylint: disable=broad-except
on_exception()


with patch("golem.client.Client.change_config",
client_change_config), \
patch("golem.task.taskserver.TaskServer.change_config",
task_server_change_config):
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import time
from functools import partial

from scripts.node_integration_tests import helpers
from ...test_config_base import NodeId
from ..task_api.playbook import Playbook as BasePlaybook


class Playbook(BasePlaybook):
def wait_for_computing_task(self):
def on_success(result):
state = result['provider_state']
print(f"provider state: {state}")
if state['status'] == 'Computing':
self.next()
else:
time.sleep(10)

def on_error(_):
print(f"failed getting provider stats")
self.fail()
return self.call(NodeId.provider, 'comp.tasks.stats',
on_success=on_success, on_error=on_error)

def ui_stop(self, node_id: NodeId):
def on_success(_):
print(f"stopped {node_id.value}")
self.next()

def on_error(_):
print(f"stopping {node_id.value} failed")
self.fail()
return self.call(node_id, 'ui.stop', on_success=on_success,
on_error=on_error)

def change_config(self, node_id: NodeId):
opts = {
"node_name": "a new name",
}

def on_success(_):
print(f"reconfigured {node_id.value}")
time.sleep(10) # give time for async operations to process
self.next()

def on_error(_):
print(f"reconfiguring {node_id.value} failed")
self.fail()

return self.call(node_id, 'env.opts.update', opts,
on_success=on_success, on_error=on_error)

def check_if_test_failed(self, node_id: NodeId):
test_failed = bool(helpers.search_output(
self.output_queues[node_id],
".*#### Integration test failed ####.*"))

if test_failed:
self.fail("found failure marker in log")

print("no failure marker found in log")
self.next()

steps = BasePlaybook.initial_steps + (
BasePlaybook.step_enable_app,
BasePlaybook.step_create_task,
BasePlaybook.step_get_task_id,
BasePlaybook.step_get_task_status,
wait_for_computing_task,
partial(ui_stop, node_id=NodeId.provider),
partial(change_config, node_id=NodeId.provider),
partial(check_if_test_failed, node_id=NodeId.provider),
Copy link
Contributor

@shadeofblue shadeofblue Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we somehow detect that the configuration has been completed? (instead of waiting the 10 seconds)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we should, but in the current "fire and forget" implementation model, it's not easy to achieve. I tried.

)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ...test_config_base import NodeId

from ..task_api.test_config import TestConfig as TestConfigBase


class TestConfig(TestConfigBase):
def __init__(self):
super().__init__()
self.nodes[NodeId.provider].script = 'provider/configure_or_die'
5 changes: 0 additions & 5 deletions tests/golem/envs/docker/cpu/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,6 @@ def test_wrong_type(self):
with self.assertRaises(AssertionError):
self.env.update_config(object())

def test_enabled_status(self):
self.env._status = EnvStatus.ENABLED
with self.assertRaises(ValueError):
self.env.update_config(Mock(spec=DockerCPUConfig))

@patch_env('_validate_config', side_effect=ValueError)
def test_invalid_config(self, validate):
config = Mock(spec=DockerCPUConfig)
Expand Down
91 changes: 90 additions & 1 deletion tests/golem/envs/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
from unittest import TestCase
from unittest.mock import Mock, patch

from dataclasses import dataclass

from twisted.internet import defer

from golem.envs import (
EnvConfig,
EnvEvent,
EnvEventType,
EnvironmentBase,
EnvStatus,
Prerequisites
Prerequisites,
delayed_config,
)


Expand Down Expand Up @@ -121,3 +126,87 @@ def test_re_register(self):
self.assertEqual(self.env._event_listeners, {
EnvEventType.ERROR_OCCURRED: {listener}
})


@dataclass
class MyConfig(EnvConfig):
i: int

def to_dict(self) -> dict:
pass

@staticmethod
def from_dict(data):
pass


@delayed_config
class MyEnv(EnvironmentBase):
def __init__(self, config: MyConfig) -> None:
super().__init__()
self._config = config

def update_config(self, config: EnvConfig) -> None:
assert isinstance(config, MyConfig)
self._logger.debug("dupa %r", self._event_listeners)
if self._status != EnvStatus.DISABLED:
raise ValueError
self._config = config
self._config_updated(config)

def config(self) -> MyConfig:
return self._config

@classmethod
def supported(cls):
raise NotImplementedError

def prepare(self):
raise NotImplementedError

def clean_up(self):
raise NotImplementedError

def run_benchmark(self):
raise NotImplementedError

def parse_prerequisites(self, prerequisites_dict):
raise NotImplementedError

def install_prerequisites(self, prerequisites):
raise NotImplementedError

def parse_config(self, config_dict):
raise NotImplementedError

def supported_usage_counters(self):
raise NotImplementedError

def runtime(self, payload, config=None):
raise NotImplementedError


def execute(f, *args, **kwargs):
try:
return defer.succeed(f(*args, **kwargs))
except Exception as exc: # pylint: disable=broad-except
return defer.fail(exc)


@patch('golem.envs.deferToThread', execute)
class TestDelayedConfig(TestCase):

def setUp(self) -> None:
config = MyConfig(i=1)
self.env = MyEnv(config)

def test_update_config_when_disabled(self):
self.env.update_config(MyConfig(i=2))
self.assertEqual(self.env.config().i, 2)

def test_update_config_when_enabled(self):
self.env._env_enabled()
self.env.update_config(MyConfig(i=2))
self.assertEqual(self.env.config().i, 1)
self.env._env_disabled()
self.assertEqual(self.env.config().i, 2)
Loading