Skip to content

Commit

Permalink
fix store gc and change job id format
Browse files Browse the repository at this point in the history
Signed-off-by: sagewe <wbwmat@gmail.com>
  • Loading branch information
sagewe committed Dec 11, 2023
1 parent d5577b0 commit 967fc5e
Show file tree
Hide file tree
Showing 18 changed files with 256 additions and 243 deletions.
176 changes: 84 additions & 92 deletions python/eggroll/computing/roll_pair/_gc.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,23 @@
import logging
import os
import queue
from threading import Thread
from threading import Thread, Lock

from eggroll.computing.tasks.store import StoreTypes
from eggroll.core.datastructure import create_simple_queue
from eggroll.core.meta_model import ErStore

L = logging.getLogger(__name__)


class WrappedDict:
    """Thin proxy that forwards part of the mapping protocol to a wrapped dict.

    The wrapper stores a reference to the dict it is given (it does not copy
    it), so every mutation made through the wrapper is visible on the
    original dict and vice versa.
    """

    def __init__(self, d):
        # Keep a reference, not a copy: writes go straight to the caller's dict.
        self.d = d

    def __getitem__(self, key):
        """Return ``d[key]``; raises ``KeyError`` like the wrapped dict."""
        return self.d[key]

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` in the wrapped dict."""
        self.d[key] = value

    def __contains__(self, key):
        """Return whether ``key`` is present in the wrapped dict."""
        return key in self.d

    def get(self, key, default=None):
        """Return ``d[key]`` if present, otherwise ``default``."""
        return self.d.get(key, default)

    def items(self):
        """Return the wrapped dict's live ``items()`` view."""
        return self.d.items()

    def pop(self, key, default=None):
        """Remove ``key`` and return its value, or ``default`` if it is absent.

        Unlike ``dict.pop(key)`` this never raises ``KeyError`` because a
        default is always supplied.
        """
        return self.d.pop(key, default)

    def __len__(self):
        return len(self.d)

    def __iter__(self):
        return iter(self.d)

    def __str__(self):
        return str(self.d)

    def __repr__(self):
        return repr(self.d)

    def __eq__(self, other):
        # Compares the wrapped dict with ``other`` directly, so a wrapper is
        # equal to a plain dict holding the same items.
        return self.d == other
ReferenceCountLock = Lock()


class GcRecorder(object):
def __init__(self, rpc):
super(GcRecorder, self).__init__()
self.should_stop = False
self.record_rpc = rpc
self._gc_recorder = dict()
self.leveldb_recorder = set()
self.gc_queue = create_simple_queue()
self._record_rpc = rpc
self._gc_recorder = {}
self._gc_worker_queue = create_simple_queue()
self._runtime_gc_stopped = False
if (
"EGGROLL_GC_DISABLE" in os.environ
and os.environ["EGGROLL_GC_DISABLE"] == "1"
Expand All @@ -66,76 +28,106 @@ def __init__(self, rpc):
)
else:
L.info("global GC enabled. starting GC thread")
self.gc_thread = Thread(target=self.run, daemon=True)
self.gc_thread = Thread(target=self._runtime_gc_worker, daemon=True)
self.gc_thread.start()

@property
def gc_recorder(self):
    """Return a new ``WrappedDict`` proxy over ``self._gc_recorder``.

    A fresh wrapper is created on every access; each wrapper is built
    from the same ``self._gc_recorder`` object.
    """
    return WrappedDict(self._gc_recorder)
def runtime_gc_stop(self):
    """Signal the runtime GC worker loop to exit.

    This only sets the ``_runtime_gc_stopped`` flag; it does not wait for
    the worker thread to finish.
    """
    L.debug("stop runtime gc thread")
    self._runtime_gc_stopped = True

def stop(self):
    """Set ``should_stop`` so the GC loop exits, and log that stop was requested."""
    self.should_stop = True
    L.info("GC: gc_util.stop called")
def _runtime_gc_worker(self):
"""
Infinite loop to retrieve store represented by namespace and name
from gc_queue that is expected to be destroyed, and destroy it.
def run(self):
This method is expected to be called in a thread and won't return until
self.should_stop is set to True.
"""
if (
"EGGROLL_GC_DISABLE" in os.environ
and os.environ["EGGROLL_GC_DISABLE"] == "1"
):
L.info(
L.warning(
"global GC disabled, "
"will not execute gc but only record temporary RollPair during the whole session"
)
return
while not self.should_stop:
while not self._runtime_gc_stopped:
try:
rp_namespace_name = self.gc_queue.get(block=True, timeout=0.5)
namespace_name_tuple = self._gc_worker_queue.get(
block=True, timeout=0.5
)
except queue.Empty:
continue
if not rp_namespace_name:
if not namespace_name_tuple:
continue
L.debug(f"GC thread destroying rp={rp_namespace_name}")
self.record_rpc.create_rp(
id=-1,
namespace=rp_namespace_name[0],
name=rp_namespace_name[1],
total_partitions=1,
store_type=StoreTypes.ROLLPAIR_IN_MEMORY,
key_serdes_type=0,
value_serdes_type=0,
partitioner_type=0,
options={},
no_gc=True,
).destroy()

L.info(f"GC should_stop={self.should_stop}, stopping GC thread")
if L.isEnabledFor(logging.DEBUG):
L.debug(
f"GC thread destroying store: namespace={namespace_name_tuple[0]}, name={namespace_name_tuple[1]}"
)

def record(self, er_store: ErStore):
store_type = er_store._store_locator._store_type
name = er_store._store_locator._name
namespace = er_store._store_locator._namespace
self._record_rpc.destroy_store(
name=namespace_name_tuple[1],
namespace=namespace_name_tuple[0],
store_type=StoreTypes.ROLLPAIR_IN_MEMORY,
)
L.info("GC thread stopped")

def increase_ref_count(self, er_store: ErStore):
store_type = er_store.store_locator.store_type
name = er_store.store_locator.name
namespace = er_store.store_locator.namespace
if store_type != StoreTypes.ROLLPAIR_IN_MEMORY:
return
else:
L.debug(
"GC recording in memory table namespace={}, name={}".format(
namespace, name
with ReferenceCountLock:
count = self._gc_recorder.get((namespace, name))
if count is None:
count = 0
count += 1
self._gc_recorder[(namespace, name)] = count
if L.isEnabledFor(logging.DEBUG):
L.debug(
f"GC increase ref count. namespace={namespace}, name={name}, count={count}"
)
)
count = self.gc_recorder.get((namespace, name))
if count is None:
count = 0
self.gc_recorder[(namespace, name)] = count + 1
L.debug(f"GC recorded count={len(self.gc_recorder)}")
L.debug(f"GC recorded count={len(self._gc_recorder)}")

def decrease_ref_count(self, er_store):
if er_store._store_locator._store_type != StoreTypes.ROLLPAIR_IN_MEMORY:
if er_store.store_locator.store_type != StoreTypes.ROLLPAIR_IN_MEMORY:
return
t_ns_n = (er_store._store_locator._namespace, er_store._store_locator._name)
ref_count = self.gc_recorder.get(t_ns_n)
record_count = 0 if ref_count is None or ref_count == 0 else (ref_count - 1)
self.gc_recorder[t_ns_n] = record_count
if record_count == 0 and t_ns_n in self.gc_recorder:
L.debug(f"GC put in queue. namespace={t_ns_n[0]}, name={t_ns_n[1]}")
self.gc_queue.put(t_ns_n)
self.gc_recorder.pop(t_ns_n)
name = er_store.store_locator.name
namespace = er_store.store_locator.namespace
with ReferenceCountLock:
ref_count = self._gc_recorder.get((namespace, name))
if ref_count is None:
ref_count = 0
else:
ref_count -= 1
self._gc_recorder[(namespace, name)] = ref_count
if L.isEnabledFor(logging.DEBUG):
L.debug(
f"GC decrease ref count. namespace={namespace}, name={name}, count={ref_count}"
)
if ref_count == 0:
self._gc_worker_queue.put((namespace, name))
self._gc_recorder.pop((namespace, name))

def flush(self):
    """Stop the runtime GC worker and destroy every store still recorded.

    Each ``(namespace, name)`` key left in ``self._gc_recorder`` is passed
    to ``self._record_rpc.destroy_store`` as an in-memory store. Entries
    are not removed from the recorder by this method.

    Raises:
        RuntimeError: if destroying a store fails. The original exception
            is chained as the cause, and the remaining stores are not
            attempted.
    """
    # stop gc thread
    self.runtime_gc_stop()

    if not self._gc_recorder:
        return

    # Iterate over a snapshot of the keys, as the original copy did,
    # presumably so a concurrent ref-count update cannot change the dict
    # mid-iteration. Only the keys are needed; the counts are irrelevant here.
    for namespace, name in list(self._gc_recorder):
        try:
            self._record_rpc.destroy_store(
                name=name,
                namespace=namespace,
                store_type=StoreTypes.ROLLPAIR_IN_MEMORY,
            )
        except Exception as e:
            raise RuntimeError(
                f"fail to destroy store with name={name}, namespace={namespace}"
            ) from e
55 changes: 30 additions & 25 deletions python/eggroll/computing/roll_pair/_roll_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,28 @@ def __init__(self, er_store: ErStore, rp_ctx: "RollPairContext", gc_enabled=None
self._ctx = rp_ctx
self._store = er_store
self._session_id = self.ctx.session_id
if gc_enabled is not None:
gc_enabled = rp_ctx.rpc_gc_enable
self.gc_enabled = gc_enabled

self._command_client = CommandClient(config=self.ctx.session.config)

# record store for gc
rp_ctx.gc_recorder.record(er_store)
# increase gc count only for in-memory store
if self._store.store_locator.store_type == StoreTypes.ROLLPAIR_IN_MEMORY:
rp_ctx.increase_store_gc_count(er_store)

self._is_destroyed = False
if gc_enabled is None:
gc_enabled = rp_ctx.is_rpc_gc_enabled
self._is_gc_enabled = gc_enabled

@property
def is_destroyed(self):
    """Whether this roll pair has been marked as destroyed."""
    return self._is_destroyed

@is_destroyed.setter
def is_destroyed(self, value):
    # Stored as-is; no validation or coercion is applied.
    self._is_destroyed = value

# check if roll pair is destroyed
self.destroyed = False
@property
def is_gc_enabled(self):
    """Read-only view of the ``_is_gc_enabled`` flag."""
    return self._is_gc_enabled

@property
def command_client(self):
Expand All @@ -71,29 +82,23 @@ def config(self):
def ctx(self):
return self._ctx

@property
def should_cleanup(self):
    """Whether the backing store's type is ``StoreTypes.ROLLPAIR_IN_MEMORY``."""
    return self._store.store_locator.store_type == StoreTypes.ROLLPAIR_IN_MEMORY

def __del__(self):
    """Decrease the store's GC reference count when this object is collected.

    Nothing is done when the session has already stopped, when the roll
    pair has been marked as destroyed, or when GC is disabled for it.
    Otherwise the context is asked to decrease the reference count of the
    backing store exactly once.
    """
    if self.ctx.session.is_stopped():
        # when session stopped, gc_recorder will be cleared, so we just return.
        # notice that, when this happens, log will be disabled, so we can't
        # log anything here.
        return
    if self.is_destroyed:
        L.info(f"store={self._store} has been marked as destroyed before")
        return
    if not self.is_gc_enabled:
        L.info(f"store={self._store} gc disabled, will not be cleaned up")
        return

    L.info(
        f"{self} is being cleaned up, store reference of {self._store} will be decreased"
    )
    # Decrease exactly once: a second decrement for the same object would
    # make the recorded count drop faster than the number of live references.
    self.ctx.decrease_store_gc_count(self._store)

def __repr__(self):
    """Return a debug string with the class name, backing store and object address."""
    return f"<{self.__class__.__name__}(_store={self._store}) at {hex(id(self))}>"
Expand Down Expand Up @@ -155,11 +160,11 @@ def take(self, num: int, options: dict = None):

@roll_pair_method_trace
def destroy(self):
    """Submit a ``Destroy`` task for this roll pair and mark it as destroyed.

    Returns:
        The response returned by ``tasks.Destroy.submit``.

    Raises:
        ValueError: if this roll pair was already marked as destroyed.
    """
    if self.is_destroyed:
        # No exception is being handled here, so log with error() rather
        # than exception(); the latter would append an empty traceback.
        L.error(f"store={self._store} has been destroyed before")
        raise ValueError(f"store:{self.get_store()} has been destroyed before")
    response = tasks.Destroy.submit(self)
    # Mark only after the submit call returned, so a failed submit leaves
    # the roll pair eligible for another destroy attempt.
    self.is_destroyed = True
    return response

def copy_as(self, name: str, namespace: str, store_type: str):
Expand Down
Loading

0 comments on commit 967fc5e

Please sign in to comment.