Skip to content

Commit

Permalink
fix message distribution to API processes
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-mangin committed Dec 12, 2014
1 parent a1d1c82 commit 5595b09
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ Version explained:
minor : increase on risk of code breakage during a major release
bug : increase on bug or incremental changes

Version 3.4.5
* Fix: improper distribution of events to process workers
reported by: Tim Epkes

Version 3.4.4
* Fix: bug with IPv4 / ipv6 handling
* Fix: better peer isolation when parsing messages
Expand Down
3 changes: 3 additions & 0 deletions lib/exabgp/configuration/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,9 @@ def _multi_process (self,scope,tokens):
name = tokens[0] if len(tokens) >= 1 else 'conf-only-%s' % str(time.time())[-6:]
self.process.setdefault(name,{})['neighbor'] = scope[-1]['peer-address'] if 'peer-address' in scope[-1] else '*'

for key in ['neighbor-changes', 'receive-refresh', 'receive-notifications', 'receive-parsed', 'receive-operational', 'receive-updates']:
self.process[name][key] = scope[-1].pop(key,False)

run = scope[-1].pop('process-run','')
if run:
if len(tokens) != 1:
Expand Down
46 changes: 27 additions & 19 deletions lib/exabgp/reactor/api/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def __init__ (self,reactor):

def clean (self):
self._process = {}
self._api = {}
self._api_encoder = {}
self._encoder = {}
self._events= {}
self._neighbor_process = {}
self._broken = []
self._respawning = {}
Expand All @@ -79,7 +79,7 @@ def terminate (self):
for process in list(self._process):
if not self.silence:
try:
self.write(process,self._api_encoder[process].shutdown())
self.write(process,self._encoder[process].shutdown())
except ProcessError:
pass
self.silence = True
Expand All @@ -94,6 +94,12 @@ def terminate (self):
self.clean()

def _start (self,process):
events = self.reactor.configuration.process[process]
for event in events:
if event in ('run','encoder'):
continue
self._events.setdefault(process,[]).append(event)

try:
if process in self._process:
self.logger.processes("process already running")
Expand All @@ -109,7 +115,7 @@ def _start (self,process):
run = self.reactor.configuration.process[process].get('run','')
if run:
api = self.reactor.configuration.process[process]['encoder']
self._api_encoder[process] = JSON('3.4.0',self.highres) if api == 'json' else Text('3.3.2')
self._encoder[process] = JSON('3.4.0',self.highres) if api == 'json' else Text('3.3.2')

self._process[process] = subprocess.Popen(run,
stdin=subprocess.PIPE,
Expand Down Expand Up @@ -237,54 +243,56 @@ def _notify (self,peer,event):
neighbor = peer.neighbor.peer_address
for process in self._neighbor_process.get(neighbor,[]):
if process in self._process:
yield process
if event in self._events[process]:
yield process
for process in self._neighbor_process.get('*',[]):
if process in self._process:
yield process
if event in self._events[process]:
yield process

def reset (self,peer):
if self.silence: return
for process in self._notify(peer,'*'):
data = self._api_encoder[process].reset(peer)
data = self._encoder[process].reset(peer)
if data:
self.write(process,data,peer)

def increase (self,peer):
if self.silence: return
for process in self._notify(peer,'*'):
data = self._api_encoder[process].increase(peer)
data = self._encoder[process].increase(peer)
if data:
self.write(process,data,peer)

def up (self,peer):
if self.silence: return
for process in self._notify(peer,'neighbor-changes'):
self.write(process,self._api_encoder[process].up(peer),peer)
self.write(process,self._encoder[process].up(peer),peer)

def connected (self,peer):
if self.silence: return
for process in self._notify(peer,'neighbor-changes'):
self.write(process,self._api_encoder[process].connected(peer),peer)
self.write(process,self._encoder[process].connected(peer),peer)

def down (self,peer,reason):
if self.silence: return
for process in self._notify(peer,'neighbor-changes'):
self.write(process,self._api_encoder[process].down(peer,reason),peer)
self.write(process,self._encoder[process].down(peer,reason),peer)

def receive (self,peer,category,header,body):
if self.silence: return
for process in self._notify(peer,'receive-packets'):
self.write(process,self._api_encoder[process].receive(peer,category,header,body),peer)
self.write(process,self._encoder[process].receive(peer,category,header,body),peer)

def send (self,peer,category,header,body):
if self.silence: return
for process in self._notify(peer,'send-packets'):
self.write(process,self._api_encoder[process].send(peer,category,header,body),peer)
self.write(process,self._encoder[process].send(peer,category,header,body),peer)

def notification (self,peer,code,subcode,data):
if self.silence: return
for process in self._notify(peer,'neighbor-changes'):
self.write(process,self._api_encoder[process].notification(peer,code,subcode,data),peer)
self.write(process,self._encoder[process].notification(peer,code,subcode,data),peer)

def message (self,message_id,peer,message,header,*body):
self._dispatch[message_id](self,peer,message,header,*body)
Expand All @@ -303,28 +311,28 @@ def wrap (*args):
def _open (self,peer,open_msg,header,body,direction='received'):
if self.silence: return
for process in self._notify(peer,'receive-opens'):
self.write(process,self._api_encoder[process].open(peer,direction,open_msg,header,body),peer)
self.write(process,self._encoder[process].open(peer,direction,open_msg,header,body),peer)

@register_process(Message.ID.KEEPALIVE,_dispatch)
def _keepalive (self,peer,category,header,body):
if self.silence: return
for process in self._notify(peer,'receive-keepalives'):
self.write(process,self._api_encoder[process].keepalive(peer,header,body),peer)
self.write(process,self._encoder[process].keepalive(peer,header,body),peer)

@register_process(Message.ID.UPDATE,_dispatch)
def _update (self,peer,update,header,body):
if self.silence: return
for process in self._notify(peer,'receive-updates'):
self.write(process,self._api_encoder[process].update(peer,update,header,body),peer)
self.write(process,self._encoder[process].update(peer,update,header,body),peer)

@register_process(Message.ID.ROUTE_REFRESH,_dispatch)
def _refresh (self,peer,refresh,header,body):
if self.silence: return
for process in self._notify(peer,'receive-refresh'):
self.write(process,self._api_encoder[process].refresh(peer,refresh,header,body),peer)
self.write(process,self._encoder[process].refresh(peer,refresh,header,body),peer)

@register_process(Message.ID.OPERATIONAL,_dispatch)
def _operational (self,peer,operational,header,body):
if self.silence: return
for process in self._notify(peer,'receive-operational'):
self.write(process,self._api_encoder[process].operational(peer,operational.category,operational,header,body),peer)
self.write(process,self._encoder[process].operational(peer,operational.category,operational,header,body),peer)

0 comments on commit 5595b09

Please sign in to comment.