Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix minor formatting issues, typos, and unused imports. #82

Merged
merged 1 commit into from
Apr 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions infrastructure/beacon_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,10 @@ def __init__(self, addr, topo_file, config_file, path_policy_file):
self._state_synced = threading.Event()
# TODO(kormat): def zookeeper host/port in topology
self.zk = Zookeeper(
self.topology.isd_id, self.topology.ad_id,
"bs", self.addr.host_addr, ["localhost:2181"],
ensure_paths=(
self.ZK_INCOMING_PATH,
self.ZK_RECENT_PATH)
)
self.topology.isd_id, self.topology.ad_id,
"bs", self.addr.host_addr, ["localhost:2181"],
ensure_paths=(self.ZK_INCOMING_PATH,
self.ZK_RECENT_PATH))

def _get_if_rev_token(self, if_id):
"""
Expand Down Expand Up @@ -423,8 +421,8 @@ def read_shared_pcbs(self):
if not self._state_synced.is_set():
# Register that we can now accept and store PCBs in ZK
self.zk.join_party()
# Prime our path list with the previous propagation period's
# set of PCBs
# Prime our path list with the previous propagation
# period's set of PCBs
self._read_shared_entries(self.ZK_RECENT_PATH, "recent")
# Clear our cache of previously-seen shared entries
self._seen_entries.clear()
Expand All @@ -439,13 +437,13 @@ def _read_shared_entries(self, path, name):
"""
desc = "Fetching list of %s PCBs from shared path" % name
entries = self.zk.get_shared_entries(
path,
timed_desc=desc)
path,
timed_desc=desc)
desc = "Processing %s PCBs from shared path" % name
count = self._process_shared_pcbs(
path,
entries,
timed_desc=desc)
path,
entries,
timed_desc=desc)
return count

@timed(1.0)
Expand Down Expand Up @@ -483,7 +481,7 @@ def _process_shared_pcbs(self, path, entries):
break
except ZkNoNodeError:
logging.debug("Unable to retrieve PCB from shared path: "
"no such entry (%s/%s)" % (path, entry))
"no such entry (%s/%s)" % (path, entry))
continue
pcbs.append(PathSegment(raw=raw))
self.process_pcbs(pcbs)
Expand All @@ -498,9 +496,9 @@ def _move_shared_pcbs(self):
return False
try:
self.zk.move_shared_items(
self.ZK_INCOMING_PATH,
self.ZK_RECENT_PATH,
timed_desc="Moving PCBs from 'incoming' to 'recent'")
self.ZK_INCOMING_PATH,
self.ZK_RECENT_PATH,
timed_desc="Moving PCBs from 'incoming' to 'recent'")
except ZkConnectionLoss:
logging.warning("Connection dropped while moving shared items")
return False
Expand All @@ -523,7 +521,7 @@ def __init__(self, addr, topo_file, config_file, path_policy_file):
path_policy_file)
# Sanity check that we should indeed be a core beacon server.
assert self.topology.is_core_ad, "This shouldn't be a core BS!"
self.beacons = deque() # FIXME: Discuss with Lorenzo
self.beacons = deque() # FIXME: Discuss with Lorenzo
self.core_segments = deque() # FIXME: ditto

def propagate_core_pcb(self, pcb):
Expand Down Expand Up @@ -571,14 +569,14 @@ def handle_pcbs_propagation(self):
# Create beacon for downstream ADs.
downstream_pcb = PathSegment()
timestamp = int(time.time())
downstream_pcb.iof = InfoOpaqueField.from_values(OFT.TDC_XOVR,
False, timestamp, self.topology.isd_id)
downstream_pcb.iof = InfoOpaqueField.from_values(
OFT.TDC_XOVR, False, timestamp, self.topology.isd_id)
downstream_pcb.trcf = TRCField()
self.propagate_downstream_pcb(downstream_pcb)
# Create beacon for core ADs.
core_pcb = PathSegment()
core_pcb.iof = InfoOpaqueField.from_values(OFT.TDC_XOVR, False,
timestamp, self.topology.isd_id)
core_pcb.iof = InfoOpaqueField.from_values(
OFT.TDC_XOVR, False, timestamp, self.topology.isd_id)
core_pcb.trcf = TRCField()
count = self.propagate_core_pcb(core_pcb)
# Propagate received beacons. A core beacon server can only receive
Expand All @@ -603,7 +601,7 @@ def register_segments(self):
while True:
lock = self.zk.have_lock()
if not lock:
logging.debug("register_segements: waiting for lock")
logging.debug("register_segments: waiting for lock")
self.zk.wait_lock()
if not lock:
logging.debug("register_segments: have lock")
Expand Down Expand Up @@ -645,7 +643,7 @@ def process_pcbs(self, pcbs):
# that it hasn't been received before.
for ad in pcb.ads:
if (ad.pcbm.spcbf.isd_id == self.topology.isd_id and
ad.pcbm.ad_id == self.topology.ad_id):
ad.pcbm.ad_id == self.topology.ad_id):
count += 1
continue
self._try_to_verify_beacon(pcb)
Expand Down Expand Up @@ -912,7 +910,7 @@ def handle_pcbs_propagation(self):
"""
Main loop to propagate received beacons.
"""
# TODO: define function that dispaches the pcbs among the interfaces
# TODO: define function that dispatches the pcbs among the interfaces
master = False
while True:
# Wait until we have enough context to be a useful master
Expand Down
14 changes: 7 additions & 7 deletions lib/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@
import os.path
import logging
import threading
import time

from kazoo.client import (KazooClient, KazooState, KazooRetry)
from kazoo.handlers.threading import TimeoutError
from kazoo.exceptions import (LockTimeout, SessionExpiredError,
NoNodeError, ConnectionLoss)
from lib.thread import (kill_self, thread_safety_net)
from lib.util import (timed)
from lib.packet.pcb import PathSegment


class ZkConnectionLoss(Exception):
"""Connection to Zookeeper is lost"""
pass


class ZkNoNodeError(Exception):
"""A node doesn't exist"""
pass


class Zookeeper(object):
"""
A wrapper class for Zookeeper interfacing, using the `Kazoo python library
Expand Down Expand Up @@ -65,8 +66,8 @@ def __init__(self, isd_id, ad_id, srv_name, srv_id,

:param int isd_id: The ID of the current ISD.
:param int ad_id: The ID of the current AD.
:param str srv_name: Short description of the service. E.g. ``"bs"`` for
Beacon server.
:param str srv_name: Short description of the service. E.g. ``"bs"``
for Beacon server.
:param str srv_id: The ID of the service. E.g. host the service is
running on.
:param list zk_hosts: List of Zookeeper instances to connect to, in the
Expand Down Expand Up @@ -124,7 +125,7 @@ def __init__(self, isd_id, ad_id, srv_name, srv_id,
self._zk.start()
except TimeoutError:
logging.critical(
"Timed out connecting to Zookeeper on startup, exiting")
"Timed out connecting to Zookeeper on startup, exiting")
kill_self()

def _state_listener(self, new_state):
Expand Down Expand Up @@ -297,7 +298,7 @@ def release_lock(self):
if self.is_connected():
try:
self._zk_lock.release()
except (NoNodeError, ConnectionLoss, SessionExpiredError) as e:
except (NoNodeError, ConnectionLoss, SessionExpiredError):
pass
# Hack suggested by https://github.com/python-zk/kazoo/issues/2
self._zk_lock.is_acquired = False
Expand Down Expand Up @@ -425,4 +426,3 @@ def move_shared_items(self, src, dest):

logging.debug("Moved %d entries, deleted %d entries", moved, deleted)
return moved