Skip to content

Commit

Permalink
Fix a nested nexus action in stormservice deletion (#1876)
Browse files Browse the repository at this point in the history
* For service deletion, spin the del event storm commmand off in a schedcoro so it is out of line with the bottom half of the nexus function. This changes the del event semantics by executing a del handler without the presence of the supporting package, but breaks a nested nexus loop action.

* Ensure that cull is passed in the coreQueueGets call, since it was missed

* Temporarily pin nbconvert until jupyter/nbconvert#1369 is resolved.
  • Loading branch information
vEpiphyte authored Sep 10, 2020
1 parent 01eb0dd commit bec60e7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 13 deletions.
1 change: 1 addition & 0 deletions requirements_doc.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-r requirements_dev.txt
nbconvert==5.6.1
sphinx>=1.8.2,<2.0.0
jupyter>=1.0.0,<2.0.0
hide-code>=0.5.2,<0.5.3
Expand Down
10 changes: 9 additions & 1 deletion synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,8 @@ async def _runStormSvcAdd(self, iden):
await self.svchive.set(iden, sdef)

async def runStormSvcEvent(self, iden, name):
assert name in ('add', 'del')

sdef = self.svchive.get(iden)
if sdef is None:
mesg = f'No storm service with iden: {iden}'
Expand All @@ -1576,7 +1578,13 @@ async def runStormSvcEvent(self, iden, name):
evnt = sdef.get('evts', {}).get(name, {}).get('storm')
if evnt is None:
return
await s_common.aspin(self.storm(evnt, opts={'vars': {'cmdconf': {'svciden': iden}}}))

opts = {'vars': {'cmdconf': {'svciden': iden}}}
coro = s_common.aspin(self.storm(evnt, opts=opts))
if name == 'add':
await coro
else:
self.schedCoro(coro)

async def _setStormSvc(self, sdef):

Expand Down
4 changes: 2 additions & 2 deletions synapse/lib/stormtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,13 +1444,13 @@ async def _methQueueGets(self, offs=0, wait=True, cull=False, size=None):
if size is not None:
size = await toint(size)

todo = s_common.todo('coreQueueGets', self.name, offs, wait=wait, size=size)
todo = s_common.todo('coreQueueGets', self.name, offs, cull=cull, wait=wait, size=size)
gatekeys = self._getGateKeys('get')

async for item in self.runt.dyniter('cortex', todo, gatekeys=gatekeys):
yield item

async def _methQueuePuts(self, items, wait=False):
async def _methQueuePuts(self, items):
todo = s_common.todo('coreQueuePuts', self.name, items)
gatekeys = self._getGateKeys('put')
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
Expand Down
21 changes: 11 additions & 10 deletions synapse/tests/test_lib_stormsvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class RealService(s_stormsvc.StormSvc):
'storm': '$lib.queue.add(vertex)',
},
'del': {
'storm': '$lib.queue.del(vertex)',
'storm': '$que=$lib.queue.get(vertex) $que.put(done)',
},
}

Expand Down Expand Up @@ -611,9 +611,10 @@ async def test_storm_svcs(self):
# make sure stormcmd got deleted
self.none(core.getStormCmd('ohhai'))

# ensure fini ran
queue = core.multiqueue.list()
self.len(0, queue)
# ensure del event ran
q = 'for ($o, $m) in $lib.queue.get(vertex).gets(wait=10) {return (($o, $m))}'
retn = await core.callStorm(q)
self.eq(retn, (0, 'done'))

# specifically call teardown
for svc in core.getStormSvcs():
Expand Down Expand Up @@ -833,11 +834,11 @@ async def test_storm_svc_mirror(self):

# Make sure it got removed from both
self.none(core00.getStormCmd('ohhai'))
queue = core00.multiqueue.list()
self.len(0, queue)
self.notin('foo.bar', core00.stormmods)
q = 'for ($o, $m) in $lib.queue.get(vertex).gets(wait=10) {return (($o, $m))}'
retn = await core00.callStorm(q)
self.eq(retn, (0, 'done'))

self.none(core01.getStormCmd('ohhai'))
queue = core01.multiqueue.list()
self.len(0, queue)
self.notin('foo.bar', core01.stormmods)
q = 'for ($o, $m) in $lib.queue.get(vertex).gets(wait=10) {return (($o, $m))}'
retn = await core01.callStorm(q)
self.eq(retn, (0, 'done'))

0 comments on commit bec60e7

Please sign in to comment.