From fea80a59bcb680960e8d616a3842cc8910e14dc8 Mon Sep 17 00:00:00 2001 From: Stephen Shirley Date: Fri, 17 Apr 2015 18:11:21 +0200 Subject: [PATCH 1/2] Major zookeeper support update; Switch from events to polling. Using ZK watches meant events were being constantly generated, this change switches over to polling the incoming ZK shared path every half second. Also: - Add a lot more error handling, especially for corner-cases. - Make the log output a lot less chatty - Added time checks around some of the heavy-weight ZK operations - Show a warning if we're failing to keep our propagation/registration intervals --- infrastructure/beacon_server.py | 220 ++++++++++++++++++-------------- lib/util.py | 42 ++++++ lib/zookeeper.py | 56 ++++---- 3 files changed, 202 insertions(+), 116 deletions(-) diff --git a/infrastructure/beacon_server.py b/infrastructure/beacon_server.py index 58f41096ba..631292b7c9 100644 --- a/infrastructure/beacon_server.py +++ b/infrastructure/beacon_server.py @@ -37,7 +37,7 @@ from lib.path_store import PathPolicy, PathStoreRecord, PathStore from lib.util import (read_file, write_file, get_cert_chain_file_path, get_sig_key_file_path, get_trc_file_path, init_logging, log_exception, - trace) + trace, timed, sleep_interval) from lib.thread import thread_safety_net from lib.zookeeper import (Zookeeper, ZkConnectionLoss, ZkNoNodeError) from Crypto import Random @@ -91,14 +91,10 @@ def __init__(self, addr, topo_file, config_file, path_policy_file): # Set when we have connected and read the existing recent and incoming # PCBs self._state_synced = threading.Event() - # Set anytime the connection status changes - self._connection_event = threading.Semaphore(value=0) # TODO(kormat): def zookeeper host/port in topology self.zk = Zookeeper( self.topology.isd_id, self.topology.ad_id, "bs", self.addr.host_addr, ["localhost:2181"], - on_connect=self._on_connect, - on_disconnect=self._on_disconnect, ensure_paths=( self.ZK_INCOMING_PATH, self.ZK_RECENT_PATH) @@ -174,7 +170,6 @@ def store_pcb(self, beacon): logging.debug("Unable to store PCB in shared path: " "no connection to ZK") return - logging.debug("PCB stored") def process_pcbs(self, pcbs): """ @@ -405,22 +400,6 @@ def handle_unverified_beacons(self): pcb = self.unverified_beacons.popleft() self._try_to_verify_beacon(pcb) - def _on_connect(self): - """ - Callback given to lib.zookeeper to call everytime we connect to - Zookeeper - """ - logging.debug("Beaconserver: zk connected") - self._connection_event.release() - - def _on_disconnect(self): - """ - Callback given to lib.zookeeper to call everytime we disconnect from - Zookeeper - """ - logging.debug("Beaconserver: zk disconnected") - self._connection_event.release() - @thread_safety_net("read_shared_pcbs") def read_shared_pcbs(self): """ @@ -434,76 +413,102 @@ def read_shared_pcbs(self): ZK path. """ while True: - self._connection_event.acquire() - logging.debug("ZK connection event happened") - self._state_synced.clear() if not self.zk.is_connected(): - continue + self._state_synced.clear() + self.zk.wait_connected() + else: + time.sleep(0.5) try: - # Register that we can now accept and store PCBs in ZK - self.zk.join_party() - # Prime our path list with the previous propagation period's - # set of PCBs - logging.debug("Load recent PCBs from shared path: started") - recent_entries = self.zk.get_shared_entries(self.ZK_RECENT_PATH) - self._process_shared_pcbs(self.ZK_RECENT_PATH, recent_entries) - logging.debug("Load recent PCBs from shared path: complete") - # Clear our cache of previously-seen shared entries - self._seen_entries.clear() - # Get notified anytime a new PCB appears in the shared - # 'incoming' path. Our callback will be called immediately with - # the current entries. - self.zk.watch_children( - self.ZK_INCOMING_PATH, - lambda entries: self._process_shared_pcbs( - self.ZK_INCOMING_PATH, - entries)) + if not self._state_synced.is_set(): + # Register that we can now accept and store PCBs in ZK + self.zk.join_party() + # Prime our path list with the previous propagation period's + # set of PCBs + self._read_shared_entries(self.ZK_RECENT_PATH, "recent") + # Clear our cache of previously-seen shared entries + self._seen_entries.clear() + self._read_shared_entries(self.ZK_INCOMING_PATH, "incoming") except ZkConnectionLoss: continue + self._state_synced.set() + def _read_shared_entries(self, path, name): + """ + Read all entries from the specified path and return them + """ + desc = "Fetching list of %s PCBs from shared path" % name + entries = self.zk.get_shared_entries( + path, + timed_desc=desc) + desc = "Processing %s PCBs from shared path" % name + count = self._process_shared_pcbs( + path, + entries, + timed_desc=desc) + return count + + @timed(1.0) def _process_shared_pcbs(self, path, entries): """ Retrieve new shared beacons send them for local processing. """ # Limit the number of entries we try handle # TODO(kormat): move constant to proper place - max_entries = 50 - entries = entries[:max_entries] + max_entries = 20 if path == self.ZK_INCOMING_PATH: # Only read new incoming PCBS entries_set = set(entries) new_set = entries_set - self._seen_entries - logging.debug("_process_shared_pcbs() called with %d entries" % - len(entries_set)) - logging.debug("_process_shared_pcbs() new entries: %s" % - sorted(new_set)) + if len(new_set) > 0: + logging.debug("Processing %d new (out of %d total) " + "shared PCBs", len(new_set), len(entries_set)) # Remove old entries that are no longer in the shared path self._seen_entries = new_set - self._seen_entries | entries_set - new_entries = list(new_set) + new_entries = list(new_set)[:max_entries] else: # Otherwise we're dealing with the recent pcbs, and we want to - # just read them all. - new_entries = entries + # just read a reasonable number. + new_entries = entries[:max_entries] # TODO(kormat): move constant to proper place chunk_size = 10 + pcbs = [] for i in range(0, len(new_entries), chunk_size): - pcbs = [] for entry in new_entries[i:i+chunk_size]: try: raw = self.zk.get_shared_item(path, entry) except ZkConnectionLoss: logging.warning("Unable to retrieve PCB from shared path: " "no connection to ZK") - return + break except ZkNoNodeError: - logging.info("Unable to retrieve PCB from shared path: " + logging.debug("Unable to retrieve PCB from shared path: " "no such entry (%s/%s)" % (path, entry)) continue pcbs.append(PathSegment(raw=raw)) - self.process_pcbs(pcbs) - # We've just caught up on all incoming PCBs - if path == self.ZK_INCOMING_PATH: - self._state_synced.set() + self.process_pcbs(pcbs) + return len(pcbs) + + def _move_shared_pcbs(self): + """ + Move new shared beacons to the 'recent' shared path. + """ + # Check to make sure we didn't just disconnect or reconnect + if not (self._state_synced.is_set() and self.zk.have_lock()): + return False + try: + self.zk.move_shared_items( + self.ZK_INCOMING_PATH, + self.ZK_RECENT_PATH, + timed_desc="Moving PCBs from 'incoming' to 'recent'") + except ZkConnectionLoss: + logging.warning("Connection dropped while moving shared items") + return False + except ZkNoNodeError: + logging.error("Item not found when moving shared items, " + "could mean there are multiple masters. " + "Will drop lock just in case.") + return False + return True class CoreBeaconServer(BeaconServer): """ @@ -526,6 +531,7 @@ def propagate_core_pcb(self, pcb): """ assert isinstance(pcb, PathSegment) ingress_if = pcb.trcf.if_id + count = 0 for core_router in self.topology.routing_edge_routers: new_pcb = copy.deepcopy(pcb) egress_if = core_router.interface.if_id @@ -537,22 +543,30 @@ def propagate_core_pcb(self, pcb): beacon = PathConstructionBeacon.from_values(self.addr.get_isd_ad(), dst, new_pcb) self.send(beacon, core_router.addr) - logging.info("Core PCB propagated!") + count += 1 + return count @thread_safety_net("handle_pcbs_propagation") def handle_pcbs_propagation(self): """ Generates a new beacon or gets ready to forward the one received. """ + master = False while True: # Wait until we have enough context to be a useful master # candidate. self._state_synced.wait() - logging.debug("handle_pcbs_propagation() Trying to become master") + if not master: + logging.debug("Trying to become master") if not self.zk.get_lock(): - logging.debug("handle_pcbs_propagation() Not Master, spinning") + if master: + logging.debug("No longer master") + master = False continue - logging.debug("handle_pcbs_propagation() Am master, propagating") + if not master: + logging.debug("Became master") + master = True + start_propagation = time.time() # Create beacon for downstream ADs. downstream_pcb = PathSegment() timestamp = int(time.time()) @@ -565,20 +579,19 @@ def handle_pcbs_propagation(self): core_pcb.iof = InfoOpaqueField.from_values(OFT.TDC_XOVR, False, timestamp, self.topology.isd_id) core_pcb.trcf = TRCField() - self.propagate_core_pcb(core_pcb) + count = self.propagate_core_pcb(core_pcb) # Propagate received beacons. A core beacon server can only receive # beacons from other core beacon servers. while self.beacons: pcb = self.beacons.popleft() - self.propagate_core_pcb(pcb) - # Clear old entries - try: - self.zk.move_shared_items(self.ZK_INCOMING_PATH, - self.ZK_RECENT_PATH) - except ZkConnectionLoss: - logging.warning("Connection dropped while moving shared items") + count += self.propagate_core_pcb(pcb) + logging.info("Propagated %d Core PCBs", count) + if not self._move_shared_pcbs(): + self.zk.release_lock() + master = False continue - time.sleep(self.config.propagation_time) + sleep_interval(start_propagation, self.config.propagation_time, + "PCB propagation") @thread_safety_net("register_segments") def register_segments(self): @@ -587,11 +600,17 @@ def register_segments(self): "register_segments") return while True: - logging.debug("register_segments() waiting for lock") + lock = self.zk.have_lock() + if not lock: + logging.debug("register_segements: waiting for lock") self.zk.wait_lock() - logging.debug("register_segments() have lock") + if not lock: + logging.debug("register_segments: have lock") + lock = True + start_registration = time.time() self.register_core_segments() - time.sleep(self.config.registration_time) + sleep_interval(start_registration, self.config.registration_time, + "Path registration") def register_core_segment(self, pcb): """ @@ -613,23 +632,24 @@ def register_core_segment(self, pcb): self.topology.path_servers[0].addr) pkt = PathMgmtPacket.from_values(PMT.RECORDS, records, None, self.addr.get_isd_ad(), dst) - logging.debug("Registering core path with local PS.") self.send(pkt, dst.host_addr) def process_pcbs(self, pcbs): """ Processes new beacons and appends them to beacon list. """ - logging.info("PCBs received: %d", len(pcbs)) + count = 0 for pcb in pcbs: # Before we append the PCB for further processing we need to check # that it hasn't been received before. for ad in pcb.ads: if (ad.pcbm.spcbf.isd_id == self.topology.isd_id and ad.pcbm.ad_id == self.topology.ad_id): - logging.debug("Core Segment PCB already seen. Dropping...") + count += 1 continue self._try_to_verify_beacon(pcb) + if count: + logging.debug("Dropped %d previously seen Core Segment PCBs", count) def _check_certs_trc(self, isd_id, ad_id, cert_chain_version, trc_version, if_id): @@ -652,13 +672,15 @@ def register_core_segments(self): """ Register the core segment between core ADs. """ + count = 0 while self.core_segments: new_pcb = copy.deepcopy(self.core_segments.popleft()) ad_marking = self._create_ad_marking(new_pcb.trcf.if_id, 0) new_pcb.add_ad(ad_marking) new_pcb.segment_id = self._get_segment_rev_token(new_pcb) self.register_core_segment(new_pcb) - logging.info("Core path registered") + count += 1 + logging.info("Registered %d Core paths", count) class LocalBeaconServer(BeaconServer): @@ -764,18 +786,23 @@ def register_segments(self): "leaving register_segments") return while True: - logging.debug("register_segments() waiting for lock") + lock = self.zk.have_lock() + if not lock: + logging.debug("register_segements: waiting for lock") self.zk.wait_lock() - logging.debug("register_segments() have lock") + if not lock: + logging.debug("register_segments: have lock") + lock = True + start_registration = time.time() self.register_up_segments() self.register_down_segments() - time.sleep(self.config.registration_time) + sleep_interval(start_registration, self.config.registration_time, + "Path registration") def process_pcbs(self, pcbs): """ Processes new beacons and appends them to beacon list. """ - logging.info("PCBs received: %d", len(pcbs)) for pcb in pcbs: if self._check_filters(pcb): self._try_to_verify_beacon(pcb) @@ -885,26 +912,31 @@ def handle_pcbs_propagation(self): Main loop to propagate received beacons. """ # TODO: define function that dispaches the pcbs among the interfaces + master = False while True: # Wait until we have enough context to be a useful master # candidate. self._state_synced.wait() - logging.debug("handle_pcbs_propagation() Trying to become master") + if not master: + logging.debug("Trying to become master") if not self.zk.get_lock(): - logging.debug("handle_pcbs_propagation() Not Master, spinning") + if master: + logging.debug("No longer master") + master = False continue - logging.debug("handle_pcbs_propagation() Am master, propagating") + if not master: + logging.debug("Became master") + master = True + start_propagation = time.time() best_segments = self.beacons.get_best_segments() for pcb in best_segments: self.propagate_downstream_pcb(pcb) - # Clear old entries - try: - self.zk.move_shared_items(self.ZK_INCOMING_PATH, - self.ZK_RECENT_PATH) - except ZkConnectionLoss: - logging.warning("Connection dropped while moving shared items") + if not self._move_shared_pcbs(): + self.zk.release_lock() + master = False continue - time.sleep(self.config.propagation_time) + sleep_interval(start_propagation, self.config.propagation_time, + "PCB propagation") def register_up_segments(self): """ diff --git a/lib/util.py b/lib/util.py index 1d3770c53f..13c9ada348 100644 --- a/lib/util.py +++ b/lib/util.py @@ -195,3 +195,45 @@ def trace(): path = os.path.join(TRACE_DIR, "%s.trace.html" % os.environ['SUPERVISOR_PROCESS_NAME']) trace_start(path) + +def timed(limit): + """ + Decorator to measure to execution time of a function, and log a warning if + it takes too long. The wrapped function takes an optional `timed_desc` + string parameter which is printed as part of the warning. If `timed_desc` + isn't passed in, then the wrapped function's path is printed instead. + + :param float limit: If the wrapped function takes more than `limit` + seconds, log a warning. + """ + def wrap(f): + def wrapper(*args, timed_desc=None, **kwargs): + start = time.time() + ret = f(*args, **kwargs) + elapsed = time.time() - start + if elapsed > limit: + if not timed_desc: + timed_desc = "Call to %s.%s" % (f.__module__, f.__name__) + logging.warning("%s took too long: %.3fs", timed_desc, elapsed) + return ret + return wrapper + return wrap + +def sleep_interval(start, interval, desc): + """ + Sleep until the `interval` seconds have elapsed since `start`. + + If the interval is already over, log a warning with `desc` at the start. + + :param float start: Time (in seconds since the Epoch) the current interval + started. + :param float interval: Length (in seconds) of an interval. + :param string desc: Description of the operation. + """ + now = time.time() + delay = start + interval - now + if delay < 0: + logging.warning("%s took too long: %.3fs (should have been <= %.3fs)", + desc, now - start, interval) + delay = 0 + time.sleep(delay) diff --git a/lib/zookeeper.py b/lib/zookeeper.py index a55b655414..83362cfb63 100644 --- a/lib/zookeeper.py +++ b/lib/zookeeper.py @@ -26,6 +26,7 @@ from kazoo.exceptions import (LockTimeout, SessionExpiredError, NoNodeError, ConnectionLoss) from lib.thread import (kill_self, thread_safety_net) +from lib.util import (timed) from lib.packet.pcb import PathSegment class ZkConnectionLoss(Exception): @@ -205,11 +206,11 @@ def is_connected(self): """ return self._connected.is_set() - def wait_connected(self): + def wait_connected(self, timeout=None): """ Wait until there is a connection to Zookeeper. """ - self._connected.wait() + return self._connected.wait(timeout=timeout) def join_party(self): """ @@ -265,37 +266,43 @@ def get_lock(self, timeout=60.0): """ if self._zk_lock is None: # First-time setup. - logging.debug("get_lock: init lock") + #logging.debug("get_lock: init lock") lock_path = os.path.join(self._prefix, "lock") self._zk_lock = self._zk.Lock(lock_path, self._srv_id) if not self.is_connected(): - self._lock.clear() - # Hack suggested by https://github.com/python-zk/kazoo/issues/2 - self._zk_lock.is_acquired = False - logging.debug("get_lock: not connected") + self.release_lock() + #logging.debug("get_lock: not connected") return False - if self._lock.is_set(): + elif self._lock.is_set(): # We already have the lock - logging.debug("get_lock: already have lock") + #logging.debug("get_lock: already have lock") return True - else: - # Hack suggested by https://github.com/python-zk/kazoo/issues/2 - self._zk_lock.is_acquired = False try: - logging.debug("get_lock: try acquire lock") + #logging.debug("get_lock: try acquire lock") if self._zk_lock.acquire(timeout=timeout): - logging.debug("get_lock: acquired lock") + #logging.debug("get_lock: acquired lock") self._lock.set() else: - logging.debug("get_lock: failed to acquire lock") + #logging.debug("get_lock: failed to acquire lock") + pass except (LockTimeout, ConnectionLoss, SessionExpiredError) as e: - logging.debug("get_lock: exception acquiring lock: %s", e) + #logging.debug("get_lock: exception acquiring lock: %s", e) pass - ret = self._have_lock() - logging.debug("get_lock: do we have the lock? %s", ret) + ret = self.have_lock() + #logging.debug("get_lock: do we have the lock? %s", ret) return ret - def _have_lock(self): + def release_lock(self): + self._lock.clear() + if self.is_connected(): + try: + self._zk_lock.release() + except (NoNodeError, ConnectionLoss, SessionExpiredError) as e: + pass + # Hack suggested by https://github.com/python-zk/kazoo/issues/2 + self._zk_lock.is_acquired = False + + def have_lock(self): """ Check if we currently hold the lock """ @@ -348,6 +355,7 @@ def get_shared_item(self, path, entry): raise ZkConnectionLoss return data + @timed(1.0) def get_shared_entries(self, path): """ List the items in a shared path. @@ -368,6 +376,7 @@ def get_shared_entries(self, path): raise ZkConnectionLoss return entries + @timed(1.0) def move_shared_items(self, src, dest): """ Move items from one shared path to another @@ -401,7 +410,6 @@ def move_shared_items(self, src, dest): trans.delete("%s/%s" % (src, entry)) moved += 1 trans.commit() - logging.debug("Moved %d entries", moved) # Second, delete all pre-existing dest entries deleted = 0 for i in range(0, len(dest_entries), chunk_size): @@ -410,7 +418,11 @@ def move_shared_items(self, src, dest): trans.delete("%s/%s" % (dest, entry)) deleted += 1 trans.commit() - logging.debug("Deleted %d entries", deleted) - except ConnectionLoss: + except (ConnectionLoss, SessionExpiredError): raise ZkConnectionLoss + except NoNodeError: + raise ZkNoNodeError + + logging.debug("Moved %d entries, deleted %d entries", moved, deleted) + return moved From 5be78454d2196eb453f7df08560cb7dbba835d4c Mon Sep 17 00:00:00 2001 From: Stephen Shirley Date: Fri, 24 Apr 2015 18:09:48 +0200 Subject: [PATCH 2/2] Handle session expiry more consistently --- lib/zookeeper.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/zookeeper.py b/lib/zookeeper.py index 83362cfb63..0518ad7c88 100644 --- a/lib/zookeeper.py +++ b/lib/zookeeper.py @@ -230,7 +230,7 @@ def join_party(self): self._party = self._zk.Party(party_path, self._srv_id) try: self._party.join() - except ConnectionLoss: + except (ConnectionLoss, SessionExpiredError): raise ZkConnectionLoss logging.debug("Joined party, members are: %s", list(self._party)) @@ -250,7 +250,7 @@ def watch_children(self, path, func): try: self._zk.exists(path) self._zk.ChildrenWatch(path, func=func, allow_session_lost=False) - except ConnectionLoss: + except (ConnectionLoss, SessionExpiredError): raise ZkConnectionLoss def get_lock(self, timeout=60.0): @@ -372,7 +372,7 @@ def get_shared_entries(self, path): path = os.path.join(self._prefix, path) try: entries = self._zk.get_children(path) - except ConnectionLoss: + except (ConnectionLoss, SessionExpiredError): raise ZkConnectionLoss return entries