Skip to content

Commit

Permalink
Merge pull request #252 from pikers/fspd_cluster
Browse files Browse the repository at this point in the history
Fspd cluster
  • Loading branch information
goodboy authored Jan 25, 2022
2 parents 1c85efc + 119ff0e commit 11544dc
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 250 deletions.
128 changes: 5 additions & 123 deletions piker/_cacheables.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,18 @@
Cacheing apis and toolz.
"""
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8

from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import (
asynccontextmanager,
)

import trio
from trio_typing import TaskStatus
import tractor
from tractor.trionics import maybe_open_context

from .brokers import get_brokermod
from .log import get_logger


T = TypeVar('T')
log = get_logger(__name__)


Expand Down Expand Up @@ -74,124 +62,18 @@ async def wrapper(*args):
return decorator


_cache: dict[str, 'Client'] = {} # noqa


class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None

@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> None:
async with mng as value:

_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))


@asynccontextmanager
async def maybe_open_ctx(

key: Hashable,
mngr: AsyncContextManager[T],

) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''

await cache.lock.acquire()

ctx_key = id(mngr)

value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value

except KeyError:
log.info(f'Allocating new resource for {key}')

# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.

# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n

# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln

ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())

value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()

yield False, value

finally:
cache.users -= 1

if cache.lock.locked():
cache.lock.release()

if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')

# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()


@asynccontextmanager
async def open_cached_client(
brokername: str,
) -> 'Client': # noqa
'''Get a cached broker client from the current actor's local vars.
'''
Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it.
'''
brokermod = get_brokermod(brokername)
async with maybe_open_ctx(
key=brokername,
mngr=brokermod.get_client(),
async with maybe_open_context(
acm_func=brokermod.get_client,
) as (cache_hit, client):
yield client
3 changes: 1 addition & 2 deletions piker/data/_sharedmem.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ def push(
return end

except ValueError as err:
# shoudl raise if diff detected
# should raise if diff detected
self.diff_err_fields(data)

raise err

def diff_err_fields(
Expand Down
43 changes: 27 additions & 16 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from typing import (
Any, Sequence,
AsyncIterator, Optional,
Awaitable, Callable,
)

import trio
Expand All @@ -37,7 +36,7 @@
from pydantic import BaseModel

from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx
from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
Expand Down Expand Up @@ -356,7 +355,10 @@ async def open_feed_bus(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
try:
bus._subscribers[symbol].remove(sub)
except ValueError:
log.warning(f'{sub} for {symbol} was already removed?')


@asynccontextmanager
Expand All @@ -368,12 +370,13 @@ async def open_sample_step_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_ctx(
key=delay_s,
mngr=portal.open_stream_from(
async with maybe_open_context(
acm_func=partial(
portal.open_stream_from,
iter_ohlc_periods,
delay_s=delay_s, # must be kwarg
),

kwargs={'delay_s': delay_s},
) as (cache_hit, istream):
if cache_hit:
# add a new broadcast subscription for the quote stream
Expand Down Expand Up @@ -520,7 +523,12 @@ async def open_feed(

) as (ctx, (init_msg, first_quotes)),

ctx.open_stream() as stream,
ctx.open_stream(
# XXX: be explicit about stream backpressure since we should
# **never** overrun on feeds being too fast, which will
# pretty much always happen with HFT XD
backpressure=True
) as stream,

):
# we can only read from shm
Expand Down Expand Up @@ -566,6 +574,7 @@ async def open_feed(

feed._max_sample_rate = max(ohlc_sample_rates)

# yield feed
try:
yield feed
finally:
Expand All @@ -590,17 +599,19 @@ async def maybe_open_feed(
'''
sym = symbols[0].lower()

async with maybe_open_ctx(
key=(brokername, sym),
mngr=open_feed(
brokername,
[sym],
loglevel=loglevel,
**kwargs,
),
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'brokername': brokername,
'symbols': [sym],
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
},
key=sym,
) as (cache_hit, feed):

if cache_hit:
print('USING CACHED FEED')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream:
Expand Down
5 changes: 4 additions & 1 deletion piker/fsp/_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ async def fsp_compute(
profiler(f'{func_name} pushed history')
profiler.finish()

# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)

# setup a respawn handle
with trio.CancelScope() as cs:
tracker = TaskTracker(trio.Event(), cs)
await ctx.started(index)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index')

Expand Down
80 changes: 80 additions & 0 deletions piker/trionics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
sugarz for trio/tractor conc peeps.
'''
from typing import AsyncContextManager
from typing import TypeVar
from contextlib import asynccontextmanager as acm

import trio


# A regular invariant generic type
T = TypeVar("T")


async def _enter_and_sleep(

mngr: AsyncContextManager[T],
to_yield: dict[int, T],
all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> T:
'''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
to_yield[id(mngr)] = value

if all(to_yield.values()):
all_entered.set()

# sleep until cancelled
await trio.sleep_forever()


@acm
async def async_enter_all(

*mngrs: list[AsyncContextManager[T]],

) -> tuple[T]:

to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)

all_entered = trio.Event()

async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_sleep,
mngr,
to_yield,
all_entered,
)

# deliver control once all managers have started up
await all_entered.wait()
yield tuple(to_yield.values())

# tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel()
7 changes: 4 additions & 3 deletions piker/ui/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ def _main(
piker_loglevel: str,
tractor_kwargs,
) -> None:
"""Sync entry point to start a chart app.
'''
Sync entry point to start a chart: a ``tractor`` + Qt runtime
entry point
"""
# ``tractor`` + Qt runtime entry point
'''
run_qtractor(
func=_async_main,
args=(sym, brokernames, piker_loglevel),
Expand Down
Loading

0 comments on commit 11544dc

Please sign in to comment.