From e4e63c7b5b91273b3aae04fda59cc5a21c848de2 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Dec 2023 14:24:57 -0800 Subject: [PATCH] fix: add lock to flow control (#899) --- google/cloud/bigtable/batcher.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 8f0cabadd..f9b85386d 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -114,6 +114,7 @@ def __init__( self.inflight_size = 0 self.event = threading.Event() self.event.set() + self._lock = threading.Lock() def is_blocked(self): """Returns True if: @@ -132,8 +133,9 @@ def control_flow(self, batch_info): Calculate the resources used by this batch """ - self.inflight_mutations += batch_info.mutations_count - self.inflight_size += batch_info.mutations_size + with self._lock: + self.inflight_mutations += batch_info.mutations_count + self.inflight_size += batch_info.mutations_size self.set_flow_control_status() def wait(self): @@ -158,8 +160,9 @@ def release(self, batch_info): Release the resources. Decrement the row size to allow enqueued mutations to be run. """ - self.inflight_mutations -= batch_info.mutations_count - self.inflight_size -= batch_info.mutations_size + with self._lock: + self.inflight_mutations -= batch_info.mutations_count + self.inflight_size -= batch_info.mutations_size self.set_flow_control_status()