Skip to content

Commit

Permalink
Handle non-fqsn for derivs and don't put brokername in
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Apr 11, 2022
1 parent d081348 commit 1aae686
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,10 @@ def diff_history(

async def allocate_persistent_feed(
bus: _FeedsBus,

brokername: str,
symbol: str,

loglevel: str,
start_stream: bool = True,

Expand All @@ -398,6 +400,7 @@ async def allocate_persistent_feed(
- a real-time streaming task which connec
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
Expand Down Expand Up @@ -454,7 +457,10 @@ async def allocate_persistent_feed(
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg

# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
Expand All @@ -468,13 +474,14 @@ async def allocate_persistent_feed(
await some_data_ready.wait()

# append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}'
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'

generic_first_quotes = {
bsym: first_quote,
acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote,
}

bus.feeds[symbol] = bus.feeds[fqsn] = (
bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg,
generic_first_quotes,
)
Expand Down Expand Up @@ -525,7 +532,7 @@ async def open_feed_bus(

ctx: tractor.Context,
brokername: str,
symbol: str,
symbol: str, # normally expected to the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
Expand All @@ -547,7 +554,9 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename

bus = get_feed_bus(brokername)

Expand All @@ -557,7 +566,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`.
# will persist for this `brokerd`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
Expand Down Expand Up @@ -586,7 +595,7 @@ async def open_feed_bus(
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
assert bus.feeds[bfqsn]

# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'
Expand Down

0 comments on commit 1aae686

Please sign in to comment.