Skip to content

Commit

Permalink
Make scratchpad counters thread-safe
Browse files Browse the repository at this point in the history
- Same solution as used in python stdlib to name threads and asyncio tasks
  • Loading branch information
nfcampos committed Jan 23, 2025
1 parent c961392 commit c43a9a4
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
9 changes: 5 additions & 4 deletions libs/langgraph/langgraph/pregel/algo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools

Check notice on line 1 in libs/langgraph/langgraph/pregel/algo.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... fanout_to_subgraph_10x: Mean +- std dev: 66.1 ms +- 1.5 ms ......................................... fanout_to_subgraph_10x_sync: Mean +- std dev: 56.9 ms +- 0.8 ms ......................................... fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 81.1 ms +- 2.3 ms ......................................... fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 99.9 ms +- 2.7 ms ......................................... fanout_to_subgraph_100x: Mean +- std dev: 642 ms +- 15 ms ......................................... fanout_to_subgraph_100x_sync: Mean +- std dev: 550 ms +- 16 ms ......................................... fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 819 ms +- 33 ms ......................................... fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 997 ms +- 19 ms ......................................... react_agent_10x: Mean +- std dev: 30.9 ms +- 0.8 ms ......................................... react_agent_10x_sync: Mean +- std dev: 23.2 ms +- 0.2 ms ......................................... react_agent_10x_checkpoint: Mean +- std dev: 39.1 ms +- 0.9 ms ......................................... react_agent_10x_checkpoint_sync: Mean +- std dev: 36.5 ms +- 1.1 ms ......................................... react_agent_100x: Mean +- std dev: 342 ms +- 7 ms ......................................... react_agent_100x_sync: Mean +- std dev: 272 ms +- 2 ms ......................................... react_agent_100x_checkpoint: Mean +- std dev: 642 ms +- 7 ms ......................................... react_agent_100x_checkpoint_sync: Mean +- std dev: 621 ms +- 15 ms ......................................... wide_state_25x300: Mean +- std dev: 23.8 ms +- 0.4 ms ......................................... wide_state_25x300_sync: Mean +- std dev: 16.0 ms +- 0.3 ms ......................................... wide_state_25x300_checkpoint: Mean +- std dev: 255 ms +- 16 ms ......................................... wide_state_25x300_checkpoint_sync: Mean +- std dev: 248 ms +- 13 ms ......................................... wide_state_15x600: Mean +- std dev: 27.8 ms +- 0.5 ms ......................................... wide_state_15x600_sync: Mean +- std dev: 18.4 ms +- 0.2 ms ......................................... wide_state_15x600_checkpoint: Mean +- std dev: 434 ms +- 13 ms ......................................... wide_state_15x600_checkpoint_sync: Mean +- std dev: 429 ms +- 13 ms ......................................... wide_state_9x1200: Mean +- std dev: 27.7 ms +- 0.6 ms ......................................... wide_state_9x1200_sync: Mean +- std dev: 18.3 ms +- 0.2 ms ......................................... wide_state_9x1200_checkpoint: Mean +- std dev: 284 ms +- 14 ms ......................................... wide_state_9x1200_checkpoint_sync: Mean +- std dev: 279 ms +- 15 ms

Check notice on line 1 in libs/langgraph/langgraph/pregel/algo.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------+---------+-----------------------+ | Benchmark | main | changes | +=========================================+=========+=======================+ | fanout_to_subgraph_100x_checkpoint_sync | 991 ms | 997 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_sync | 18.3 ms | 18.4 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_9x1200 | 27.5 ms | 27.7 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | react_agent_10x | 30.6 ms | 30.9 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_checkpoint | 429 ms | 434 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300_sync | 15.8 ms | 16.0 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | react_agent_100x_checkpoint_sync | 615 ms | 621 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600 | 27.5 ms | 27.8 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_checkpoint | 809 ms | 819 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_15x600_checkpoint_sync | 424 ms | 429 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300 | 23.5 ms | 23.8 ms: 1.01x slower | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_checkpoint | 79.9 ms | 81.1 ms: 1.02x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_9x1200_checkpoint_sync | 274 ms | 279 ms: 1.02x slower | +-----------------------------------------+---------+-----------------------+ | wide_state_25x300_checkpoint | 250 ms | 255 ms: 1.02x slower | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_checkpoint_sync | 97.8 ms | 99.9 ms: 1.02x slower | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x_sync | 55.2 ms | 56.9 ms: 1.03x slower | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x | 621 ms | 642 ms: 1.03x slower | +-----------------------------------------+---------+-----------------------+ | fanout_to_subgraph_10x | 63.0 ms | 66.1 ms: 1.05x slower | +-----------------------------------------+---------+-----------------------+ | Geometric mean | (ref) | 1.01x slower | +-----------------------------------------+---------+-----------------------+ Benchmark hidden because not significant (10): react_agent_100x, react_agent_100x_checkpoint, react_agent_100x_sync, react_agent_10x_sync, wide_state_9x1200_sync, fanout_to_subgraph_100x_sync, react_agent_10x_checkpoint, react_agent_10x_checkpoint_sync, wide_state_25x300_checkpoint_sync, wide_state_9x1200_checkpoint
import itertools
import sys
from collections import defaultdict, deque
from functools import partial
Expand Down Expand Up @@ -767,12 +768,12 @@ def _scratchpad(
null_resume_write = next(
(w for w in pending_writes if w[0] == NULL_TASK_ID and w[1] == RESUME), None
)

# using itertools.count as an atomic counter (+= 1 is not thread-safe)
return PregelScratchpad(
# call
call_counter=0,
call_counter=itertools.count(0).__next__,
# interrupt
interrupt_counter=-1,
interrupt_counter=itertools.count(0).__next__,
resume=next(
(w[2] for w in pending_writes if w[0] == task_id and w[1] == RESUME), []
),
Expand All @@ -781,7 +782,7 @@ def _scratchpad(
if null_resume_write is not None
else lambda: None,
# subgraph
subgraph_counter=0,
subgraph_counter=itertools.count(0).__next__,
)


Expand Down
7 changes: 4 additions & 3 deletions libs/langgraph/langgraph/pregel/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,20 @@ def __init__(
if not self.config[CONF].get(CONFIG_KEY_DELEGATE) and isinstance(
scratchpad, PregelScratchpad
):
if scratchpad.subgraph_counter:
# if count is > 0, append to checkpoint_ns
# if count is 0, leave as is
if cnt := scratchpad.subgraph_counter():
self.config = patch_configurable(
self.config,
{
CONFIG_KEY_CHECKPOINT_NS: NS_SEP.join(
(
config[CONF][CONFIG_KEY_CHECKPOINT_NS],
str(scratchpad.subgraph_counter),
str(cnt),
)
)
},
)
scratchpad.subgraph_counter += 1
if not self.is_nested and config[CONF].get(CONFIG_KEY_CHECKPOINT_NS):
self.config = patch_configurable(
self.config,
Expand Down
12 changes: 6 additions & 6 deletions libs/langgraph/langgraph/pregel/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ def writer(
continue
# schedule the next task, if the callback returns one
wcall = calls[idx] if calls else None
cnt = scratchpad.call_counter
scratchpad.call_counter += 1
if next_task := self.schedule_task(task, cnt, wcall):
if next_task := self.schedule_task(
task, scratchpad.call_counter(), wcall
):
if fut := next(
(
f
Expand Down Expand Up @@ -331,9 +331,9 @@ def writer(
continue
# schedule the next task, if the callback returns one
wcall = calls[idx] if calls is not None else None
cnt = scratchpad.call_counter
scratchpad.call_counter += 1
if next_task := self.schedule_task(task, cnt, wcall):
if next_task := self.schedule_task(
task, scratchpad.call_counter(), wcall
):
# if the parent task was retried,
# the next task might already be running
if fut := next(
Expand Down
9 changes: 4 additions & 5 deletions libs/langgraph/langgraph/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,14 @@ def __init__(
@dataclasses.dataclass(**{**_DC_KWARGS, "frozen": False})
class PregelScratchpad:
# call
call_counter: int
call_counter: Callable[[], int]
# interrupt
interrupt_counter: int
interrupt_counter: Callable[[], int]
resume: list[Any]
null_resume: Optional[Any]
_consume_null_resume: Callable[[], None]
# subgraph
subgraph_counter: int
subgraph_counter: Callable[[], int]

def consume_null_resume(self) -> Any:
if self.null_resume is not None:
Expand Down Expand Up @@ -468,8 +468,7 @@ def node(state: State):
conf = get_config()["configurable"]
# track interrupt index
scratchpad: PregelScratchpad = conf[CONFIG_KEY_SCRATCHPAD]
scratchpad.interrupt_counter += 1
idx = scratchpad.interrupt_counter
idx = scratchpad.interrupt_counter()
# find previous resume values
if scratchpad.resume:
if idx < len(scratchpad.resume):
Expand Down

0 comments on commit c43a9a4

Please sign in to comment.