
Storage layer: initial marketstore tsdb support with async OHLCV history loading. #308

Merged May 11, 2022 (105 commits)

Conversation

goodboy (Contributor) commented May 8, 2022

Replaces #247 and #305 (merging the history from both) and instead adds a more formal "storage layer" for retrieving and storing large OHLCV series from all major backends. The work here is best experienced with the new incremental update patchset from #302, which shows the new graphics performance improvements at work, though this should work moderately well as is.


TL;DR:

  • adds a marketstore docker container supervisor actor which can be spawned by passing pikerd --tsdb -> this closes Supervise the store #143
    • this command must be run as root, but the root perms will be dropped (in pikerd) shortly after marketstored (the supervised container) is started
  • adds working async with open_history_client() support to ib, binance, and kraken, which allows pulling in and storing a large amount of history in the tsdb, marketstore.
    • this API expects the async context manager endpoint to deliver an awaitable that can be called with an input datetime range; new special exceptions (NoData, DataUnavailable) can be raised to signal that the backend can't deliver data for a given range (or at all).
  • async "batch" fetching of OHLC history using trimeter in the backends that can handle it (currently only binance).
    • includes a history "frame generator" system which delivers datetime ranges that are passed to the trimeter request scheduler and which dynamically adjusts the request-time index when a gap is detected.
  • adds a piker.data.marketstore.Storage API layer which allows async, high level operations. The intention is to eventually have this layer support more tsdb providers like arctic and techtonicdb. Currently the only backend is marketstore, with client-side operations implemented using our anyio-marketstore library:
    • loading / reading existing ohlc time series by fqsn, with appropriate request size limiting, via .read_ohlcv()
    • writing ohlcv series by fqsn, with appropriate limits, via .write_ohlcv()
    • deleting time series entries via Storage.delete_ts()
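The open_history_client() endpoint contract above can be sketched roughly as follows; everything here (the exception bodies, the frame size, the return shape) is illustrative rather than piker's exact internals:

```python
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Optional


class NoData(Exception):
    '''Backend has no data for the requested datetime range.'''


class DataUnavailable(Exception):
    '''Backend can't deliver data for this range (or at all).'''


@asynccontextmanager
async def open_history_client(fqsn: str):
    # hypothetical per-backend setup (auth, sessions, etc.) would go here

    async def get_ohlc(end_dt: Optional[datetime] = None):
        # a real backend would request a "frame" of bars ending at
        # ``end_dt`` (or "now" when None) and return the array plus
        # the datetime range it actually covers.
        if end_dt and end_dt < datetime(2000, 1, 1, tzinfo=timezone.utc):
            raise DataUnavailable(f'no bars prior to {end_dt}')

        end = end_dt or datetime.now(timezone.utc)
        start = end - timedelta(minutes=1000)
        bars = []  # stand-in for a numpy OHLCV array
        return bars, start, end

    yield get_ohlc
```

The caller then walks backwards in time by repeatedly awaiting `get_ohlc(end_dt)` with each frame's start as the next call's end, stopping on `DataUnavailable`.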
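The Storage read/write/delete surface described above can be illustrated with a toy in-memory stand-in; the real marketstore-backed signatures in piker.data.marketstore may differ, but the column schema below matches marketstore's usual Epoch/OHLCV layout:

```python
import numpy as np

ohlc_dtype = np.dtype([
    ('Epoch', 'i8'),
    ('Open', 'f4'), ('High', 'f4'), ('Low', 'f4'), ('Close', 'f4'),
    ('Volume', 'f4'),
])


class InMemStorage:
    '''Toy stand-in for the ``Storage`` layer showing the high level
    async op surface: read / write / delete time series by fqsn.'''

    def __init__(self):
        self._series: dict = {}

    async def read_ohlcv(self, fqsn: str, limit: int = 20_000) -> np.ndarray:
        arr = self._series.get(fqsn, np.empty(0, dtype=ohlc_dtype))
        return arr[-limit:]  # crude request-size limiting

    async def write_ohlcv(self, fqsn: str, ohlcv: np.ndarray) -> None:
        prev = self._series.get(fqsn)
        self._series[fqsn] = (
            np.concatenate([prev, ohlcv]) if prev is not None else ohlcv
        )

    async def delete_ts(self, fqsn: str) -> None:
        self._series.pop(fqsn, None)
```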

What this does not (yet) introduce:

  • real-time ingest from tick feeds to marketstore
    • this was originally planned but the shorter path to get graphics downsampling methods up and running was to first start with OHLC history ingest and display
  • a full storesh interactive repl for managing the tsdb; there is a minimal ipython embedding at the moment but it is nowhere near complete and we need a follow-up task-issue to finish it.

TODO:

  • ._ahab.py supervisor:
    • probably deliver back net-socket info over the ctx.started() call?
      • grpc socket
      • ws addr
    • figure out what to do with the mkts.yaml config?
      • we could push a template from code to the user dir?
      • or should we just always gen it from python?
    • cli support for running pikerd with the tsdb stuff spawned?
      • maybe a --tsdb or --data or something?
    • step by step pikerd --tsdb test list:
      • pikerd --tsdb should raise DockerNotStarted and appropriate perms error on no sudo (for now)
      • ctrl-c should kill container instance
  • general marketstore config and operation:
    • should we always push newly received history from backends which is not yet in the tsdb to it?
      • we could also just offer this as a config option? eventually the UI should offer manual controls for such things..

to be done as #313

  • what docs should we offer regarding saving / deleting history?

Follow up (to be written in new task-issues and implemented in coming PRs):

moved to #314

  • tick ingest support and an accompanying feed-style inter-actor API to pull feeds from ingestor re-broadcast system(s):
    • tick ingest to marketstore from brokerd feeds and experiment with techtonicdb schema (some tinkering was already done in this patchset by @guilledk but is unfinished).
    • tick-to-ohlcv sampling if it can be done with the aggregator plugin (got a feeling we'll need to write at least a 1Sec bucket in order for this to work, looking at the code).
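As a rough illustration of that tick-to-ohlcv sampling idea (done in plain Python rather than marketstore's aggregator plugin; names and tick shape are hypothetical), a toy 1Sec bucketing routine:

```python
from collections import OrderedDict


def ticks_to_1s_ohlcv(ticks):
    '''Bucket ``(epoch_seconds, price, size)`` ticks into 1-second
    OHLCV bars; returns ``{epoch: (open, high, low, close, volume)}``.'''
    bars = OrderedDict()
    for ts, price, size in ticks:
        epoch = int(ts)  # floor the timestamp to its containing second
        if epoch not in bars:
            # first tick in this second seeds all four price fields
            bars[epoch] = [price, price, price, price, size]
        else:
            o, h, l, c, v = bars[epoch]
            bars[epoch] = [o, max(h, price), min(l, price), price, v + size]
    return {k: tuple(v) for k, v in bars.items()}
```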

these moved to #312

  • we need a timeseries diffing and syncing system to validate newly captured histories from real-time runs (which are almost always slightly different from the history provided by the providers' dbs 🙄) as well as for catching history mis-writes / gaps which need to be edited / corrected when bugs arise
  • a REPL (with ipython) that allows interaction, edit, and general management of the tsdb for both the purposes of research and just plain old data mgmt.
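A first cut of the gap-detection half of that diffing system could look like the sketch below; a real version would also compare OHLC values over overlapping ranges, and the `period` default here is just an assumed 1-minute spacing:

```python
import numpy as np


def find_time_gaps(epochs: np.ndarray, period: int = 60):
    '''Return ``(start, end)`` epoch pairs where consecutive samples in
    a supposedly ``period``-spaced series are further apart than
    expected, i.e. the ranges that need backfilling or correction.'''
    deltas = np.diff(epochs)
    gap_idxs = np.where(deltas > period)[0]
    return [(int(epochs[i]), int(epochs[i + 1])) for i in gap_idxs]
```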

@goodboy goodboy changed the base branch from master to marketstore May 8, 2022 16:43
@goodboy goodboy changed the base branch from marketstore to master May 8, 2022 18:03
@goodboy goodboy changed the title Storage layer Storage layer: initial marketstore tsdb support with async OHLCV history loading. May 8, 2022

tractor.run(main)
# @cli.command()
goodboy (Contributor, Author):
ahh yeah we can probably drop this.

the way i'd like to deal with "tsdb management" is the new storesh repl (with embedded ipython) which will have a small interactive API for manual db tinkering.

goodboy (Contributor, Author):

actually gonna leave it commented in here just in case we decide std cli cmds for this are handy.

@@ -254,61 +254,6 @@ def iterfqsns(self) -> list[str]:
return keys


def from_df(
goodboy (Contributor, Author):

ahh yeah right!

we can drop this (and maybe pandas altogether for now) since i rewrote the ib ohlc frame parser to just cast directly to numpy.

goodboy (Contributor, Author) commented May 9, 2022

Ok so my plan forward on this is to try and wrap up the final few TODOs and make issues/tasks for all the follow-up stuff.

Bleh :facepalm:, the ``end_dt`` in scope is not the "earliest" frame's
`end_dt` in the async response queue.. Parse the queue's latest epoch
and use **that** to compare to the last pushed datetime index..

Add more detailed logging to help debug any (un)expected datetime index
gaps.
It seems once in a while a frame can get missed or dropped (at least
with binance?) so in those cases, when the number of in-flight requests
(erlangs) is already at max, we just manually request the missing frame
and presume things will work out XD

Further, discard out-of-order frames that are "from the future" and
somehow end up in the async queue once in a while. Not sure why this
happens but it seems thus far just discarding them is nbd.
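The "discard frames from the future" logic described in that commit boils down to something like this sketch; the frame tuple shape and function name are hypothetical, not piker's actual internals:

```python
def split_future_frames(frames, last_pushed_end: int):
    '''Partition ``(start_epoch, end_epoch, bars)`` frames pulled off
    the async response queue: keep only those that extend history
    *backwards* from ``last_pushed_end``; frames whose end is newer
    ("from the future") get discarded.'''
    kept, dropped = [], []
    for frame in frames:
        start, end, bars = frame
        (dropped if end > last_pushed_end else kept).append(frame)
    return kept, dropped
```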
We return a copy (since a view doesn't seem to work..) of the
(field filtered) shm array contents which is the same index-length as
the source data.

Further, fence off the resource tracker disable-hack into a helper
routine.
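The copy-vs-view issue mentioned in that commit is a known numpy structured-array wrinkle: since numpy 1.16, multi-field indexing returns a *view* into the parent buffer (keeping the parent's padded itemsize), so handing it off directly would alias the shm segment. A minimal demonstration, with `repack_fields` producing an owned, tightly-packed copy:

```python
import numpy as np
from numpy.lib import recfunctions as rfn

# toy shm-like structured array (field names are illustrative)
ohlc = np.array(
    [(1, 10.0, 11.0), (2, 10.5, 11.5)],
    dtype=[('time', 'i8'), ('open', 'f8'), ('close', 'f8')],
)

# multi-field indexing yields a view sharing the parent's memory and
# padded itemsize; mutating it would mutate the source buffer:
view = ohlc[['open', 'close']]

# repack into an owned copy with no padding before handing it off:
packed = rfn.repack_fields(view).copy()
```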
goodboy (Contributor, Author) commented May 10, 2022

Pushed a few more piker.data._sharedmem changes from #302 which fixed some shm pushing / slicing edge cases.

Successfully merging this pull request may close these issues: Supervise the store (#143).