diff --git a/README.rst b/README.rst index a98458d7..39c0d50b 100644 --- a/README.rst +++ b/README.rst @@ -693,10 +693,91 @@ The first example shows how to use a client capabilities to predict stock price * sources - `stock-predictions.ipynb `_ -The second example shows how to use a client capabilities to realtime visualization via `hvPlot `_, `Streamz `_: +The second example shows how to use a client capabilities to realtime visualization via `hvPlot `_, `Streamz `_, `RxPY `_: * sources - `realtime-stream.ipynb `_ +.. code:: python + + from datetime import timedelta + from typing import List + + import hvplot.streamz + import pandas as pd + import rx + from rx import operators as ops + + from streamz.dataframe import Random, DataFrame + from streamz import Stream + from influxdb_client import InfluxDBClient + +.. code:: python + + def source_data(auto_refresh: int, query: str, sink: Stream): + rx \ + .interval(period=timedelta(seconds=auto_refresh)) \ + .pipe(ops.map(lambda start: f'from(bucket: "my-bucket") ' + f'|> range(start: -{auto_refresh}s, stop: now()) ' + f'{query}')) \ + .pipe(ops.map(lambda query: client.query_api().query_data_frame(query, data_frame_index=['_time']))) \ + .pipe(ops.map(lambda data_frame: data_frame.drop(columns=['result', 'table']))) \ + .subscribe(observer=lambda data_frame: sink.emit(data_frame), on_error=lambda error: print(error)) + pass + +.. code:: python + + client = InfluxDBClient(url='http://localhost:9999', token='my-token', org='my-org') + +.. code:: python + + cpu_query = '|> filter(fn: (r) => r._measurement == "cpu") ' \ + '|> filter(fn: (r) => r._field == "usage_user") ' \ + '|> filter(fn: (r) => r.cpu == "cpu-total") ' \ + '|> keep(columns: ["_time", "_value"])' + + + cpu_sink = Stream() + cpu_example = pd.DataFrame({'_value': []}, columns=['_value']) + cpu_df = DataFrame(cpu_sink, example=cpu_example) + + source_data(auto_refresh=5, sink=cpu_sink, query=cpu_query) + +.. code:: python + + mem_query = '|> filter(fn: (r) => r._measurement == "mem") ' \ + '|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "total" or r._field == "used") ' \ + '|> map(fn: (r) => ({ r with _value: r._value / 1024 / 1024 }))' \ + '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \ + '|> keep(columns: ["_time", "used", "total", "free", "available"])' + + mem_sink = Stream() + mem_example = pd.DataFrame({'used': [], 'total': [], 'free': [], 'available': []}, columns=['available', 'free', 'total', 'used']) + mem_df = DataFrame(mem_sink, example=mem_example) + + source_data(auto_refresh=5, sink=mem_sink, query=mem_query) + +.. code:: python + + from bokeh.models.formatters import DatetimeTickFormatter + + # Time formatter + formatter = DatetimeTickFormatter( + microseconds = ["%H:%M:%S"], + milliseconds = ["%H:%M:%S"], + seconds = ["%H:%M:%S"], + minsec = ["%H:%M:%S"], + minutes = ["%H:%M:%S"], + hourmin = ["%H:%M:%S"], + hours=["%H:%M:%S"], + days=["%H:%M:%S"], + months=["%H:%M:%S"], + years=["%H:%M:%S"], + ) + + cpu_df.hvplot(width=450, backlog=50, title='CPU % usage', xlabel='Time', ylabel='%', xformatter=formatter) +\ + mem_df.hvplot.line(width=450, backlog=50, title='Memory', xlabel='Time', ylabel='MiB', xformatter=formatter, legend='top_left') + +.. image:: docs/images/realtime-result.gif Advanced Usage