Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

LoopingCallService sync by default #3661

Merged
merged 4 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 3 additions & 28 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1548,12 +1548,9 @@ def block_node(


class DoWorkService(LoopingCallService):
_client = None # type: Client

def __init__(self, client: Client) -> None:
super().__init__(interval_seconds=1)
self._client = client
self._check_ts: Dict[Hashable, Any] = {}

def start(self):
super().start(now=False)
Expand Down Expand Up @@ -1582,17 +1579,8 @@ def _run(self):
except Exception:
logger.exception("ranking.sync_network failed")

def _time_for(self, key: Hashable, interval_seconds: float):
now = time.time()
if now >= self._check_ts.get(key, 0):
self._check_ts[key] = now + interval_seconds
return True
return False


class MonitoringPublisherService(LoopingCallService):
_task_server = None # type: TaskServer

def __init__(self,
task_server: TaskServer,
interval_seconds: int) -> None:
Expand Down Expand Up @@ -1635,19 +1623,13 @@ def _run(self):


class NetworkConnectionPublisherService(LoopingCallService):
_client = None # type: Client

def __init__(self,
client: Client,
interval_seconds: int) -> None:
super().__init__(interval_seconds)
self._client = client
self._last_value = self.poll()

def _run_async(self):
# Skip the async_run call and publish events in the main thread
self._run()

def _run(self):
current_value = self.poll()
if current_value == self._last_value:
Expand All @@ -1663,26 +1645,22 @@ def poll(self) -> Dict[str, Any]:


class TaskArchiverService(LoopingCallService):
_task_archiver = None # type: TaskArchiver

def __init__(self,
task_archiver: TaskArchiver) -> None:
super().__init__(interval_seconds=TASKARCHIVE_MAINTENANCE_INTERVAL)
super().__init__(interval_seconds=TASKARCHIVE_MAINTENANCE_INTERVAL,
run_in_thread=True)
self._task_archiver = task_archiver

def _run(self):
self._task_archiver.do_maintenance()


class ResourceCleanerService(LoopingCallService):
_client = None # type: Client
older_than_seconds = 0 # type: int

def __init__(self,
client: Client,
interval_seconds: int,
older_than_seconds: int) -> None:
super().__init__(interval_seconds)
super().__init__(interval_seconds, run_in_thread=True)
self._client = client
self.older_than_seconds = older_than_seconds

Expand All @@ -1694,8 +1672,6 @@ def _run(self):


class TaskCleanerService(LoopingCallService):
_client = None # type: Client

def __init__(self,
client: Client,
interval_seconds: int) -> None:
Expand All @@ -1707,7 +1683,6 @@ def _run(self):


class MaskUpdateService(LoopingCallService):

def __init__(
self,
requested_task_manager: 'RequestedTaskManager',
Expand Down
8 changes: 4 additions & 4 deletions golem/core/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ class LoopingCallService(IService):

This implementation uses LoopingCall from Twisted framework.
"""
__interval_seconds = 0 # type: int
_loopingCall = None # type: LoopingCall

def __init__(self, interval_seconds: int = 1):
def __init__(self, interval_seconds: int = 1, run_in_thread: bool = False) \
-> None:
self.__interval_seconds = interval_seconds
self._loopingCall = LoopingCall(self._run_async)
self._loopingCall = LoopingCall(
self._run_async if run_in_thread else self._run)

@property
def running(self) -> bool:
Expand Down
16 changes: 12 additions & 4 deletions golem/task/taskarchiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class TaskArchiver(object):
"""

def __init__(self, datadir=None, max_tasks=TASKARCHIVE_MAX_TASKS):
self._input_lock = threading.Lock()
self._input_tasks = []
self._input_statuses = []
self._archive_lock = threading.Lock()
Expand All @@ -50,14 +51,16 @@ def add_task(self, task_header):
"""Schedule a task to be archived.
:param task_header: Header of task to be archived
"""
self._input_tasks.append(ArchTask(task_header))
with self._input_lock:
self._input_tasks.append(ArchTask(task_header))

def add_support_status(self, uuid, support_status):
"""Schedule support status of a task to be archived.
:param uuid: Identifier of task the status belongs to
:param support_status: SupportStatus object denoting the status
"""
self._input_statuses.append((uuid, support_status))
with self._input_lock:
self._input_statuses.append((uuid, support_status))

def do_maintenance(self):
"""Updates information on unsupported task reasons and
Expand All @@ -66,8 +69,9 @@ def do_maintenance(self):
functions. Optimizes internal structures and, if needed, writes the
entire structure to a file.
"""
input_tasks, self._input_tasks = self._input_tasks, []
input_statuses, self._input_statuses = self._input_statuses, []
with self._input_lock:
input_tasks, self._input_tasks = self._input_tasks, []
input_statuses, self._input_statuses = self._input_statuses, []
with self._archive_lock:
ntasks_to_take = self._max_tasks - len(self._archive.tasks)
if ntasks_to_take < len(input_tasks):
Expand Down Expand Up @@ -139,6 +143,10 @@ def get_unsupport_reasons(self, last_n_days, today=None):
For each task we only take into consideration the most recent support
status of this task.
"""
with self._archive_lock:
return self._get_unsupport_reasons(last_n_days, today)

def _get_unsupport_reasons(self, last_n_days, today):
if not today:
today = datetime.datetime.now(pytz.utc)
today = today.replace(hour=0, minute=0, second=0, microsecond=0)
Expand Down
6 changes: 3 additions & 3 deletions tests/golem/core/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_service_invalid_stop():

class CountingService(LoopingCallService):
def __init__(self):
super(CountingService, self).__init__()
super().__init__(run_in_thread=True)
self.clock = Clock()
self.count = 0
# Mock the real clock.
Expand All @@ -54,7 +54,7 @@ def _run_async(self):

class AsyncCountingService(LoopingCallService):
def __init__(self):
super(AsyncCountingService, self).__init__()
super().__init__(run_in_thread=True)
self.count = 0

def _run(self):
Expand All @@ -63,7 +63,7 @@ def _run(self):

class ExceptionalService(LoopingCallService):
def __init__(self, delay):
super(ExceptionalService, self).__init__()
super().__init__(run_in_thread=True)
self.initial_delay = delay
self.delay = 0
self.clock = Clock()
Expand Down
20 changes: 0 additions & 20 deletions tests/golem/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,26 +561,6 @@ def raise_exc():
self.client.p2pservice.ping_peers.assert_called()
assert logger.exception.call_count == 4

@freeze_time("2018-01-01 00:00:00")
def test_time_for(self):
key = 'payments'
interval = 4.0

assert key not in self.do_work_service._check_ts
assert self.do_work_service._time_for(key, interval)
assert key in self.do_work_service._check_ts

next_check = self.do_work_service._check_ts[key]

with freeze_time("2018-01-01 00:00:01"):
assert not self.do_work_service._time_for(key, interval)
assert self.do_work_service._check_ts[key] == next_check

with freeze_time("2018-01-01 00:01:00"):
assert self.do_work_service._time_for(key, interval)
assert self.do_work_service._check_ts[
key] == time.time() + interval

@freeze_time("2018-01-01 00:00:00")
def test_intervals(self):
self.do_work_service._run()
Expand Down