Skip to content

Commit

Permalink
feat: realtime jupyter example (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Nov 13, 2019
1 parent 2af15b7 commit 0bdbc86
Showing 1 changed file with 82 additions and 1 deletion.
83 changes: 82 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -693,10 +693,91 @@ The first example shows how to use a client capabilities to predict stock price

* sources - `stock-predictions.ipynb <https://github.com/influxdata/influxdb-client-python/blob/master/notebooks/stock-predictions.ipynb>`_

The second example shows how to use a client capabilities to realtime visualization via `hvPlot <https://hvplot.pyviz.org>`_, `Streamz <https://streamz.readthedocs.io/en/latest/>`_:
The second example shows how to use a client capabilities to realtime visualization via `hvPlot <https://hvplot.pyviz.org>`_, `Streamz <https://streamz.readthedocs.io/en/latest/>`_, `RxPY <https://rxpy.readthedocs.io/en/latest/>`_:

* sources - `realtime-stream.ipynb <https://github.com/influxdata/influxdb-client-python/blob/master/notebooks/realtime-stream.ipynb>`_

.. code:: python
from datetime import timedelta
from typing import List
import hvplot.streamz
import pandas as pd
import rx
from rx import operators as ops
from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient
.. code:: python
def source_data(auto_refresh: int, query: str, sink: Stream):
rx \
.interval(period=timedelta(seconds=auto_refresh)) \
.pipe(ops.map(lambda start: f'from(bucket: "my-bucket") '
f'|> range(start: -{auto_refresh}s, stop: now()) '
f'{query}')) \
.pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \
.pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \
.subscribe(observer=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error))
pass
.. code:: python
client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org')
.. code:: python
cpu_query = '|> filter(fn: (r) => r._measurement == "cpu") ' \
'|> filter(fn: (r) => r._field == "usage_user") ' \
'|> filter(fn: (r) => r.cpu == "cpu-total") ' \
'|> keep(columns: ["_time", "_value"])'
cpu_sink = Stream()
cpu_example = pd.DataFrame({'_value': []}, columns=['_value'])
cpu_df = DataFrame(cpu_sink, example=cpu_example)
source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query)
.. code:: python
mem_query = '|> filter(fn: (r) => r._measurement == "mem") ' \
'|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "total" or r._field == "used") ' \
'|> map(fn: (r) => ({ r with _value: r._value / 1024 / 1024 }))' \
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
'|> keep(columns: ["_time", "used", "total", "free", "available"])'
mem_sink = Stream()
mem_example = pd.DataFrame({'used': [], 'total': [], 'free': [], 'available': []}, columns=['available', 'free', 'total', 'used'])
mem_df = DataFrame(mem_sink, example=mem_example)
source_data(auto_refresh=5, sink=mem_sink, query=mem_query)
.. code:: python
from bokeh.models.formatters import DatetimeTickFormatter
# Time formatter
formatter = DatetimeTickFormatter(
microseconds = ["%H:%M:%S"],
milliseconds = ["%H:%M:%S"],
seconds = ["%H:%M:%S"],
minsec = ["%H:%M:%S"],
minutes = ["%H:%M:%S"],
hourmin = ["%H:%M:%S"],
hours=["%H:%M:%S"],
days=["%H:%M:%S"],
months=["%H:%M:%S"],
years=["%H:%M:%S"],
)
cpu_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter) +\
mem_df.hvplot.line(width=450, backlog=50, title='Memory', xlabel='Time', ylabel='MiB', xformatter=formatter, legend='top_left')
.. image:: docs/images/realtime-result.gif


Advanced Usage
Expand Down

0 comments on commit 0bdbc86

Please sign in to comment.