Demo refactor updates (#1658)
* Update first notebook

* Updates

* Fix all examples in remaining notebooks

* Quick

* Back to int

* Apply suggestions from code review

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>

* Apply suggestions from code review

Change combo_agg to as_list

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>
jjbrosnan and chipkent authored Dec 13, 2021
1 parent eb1d065 commit b030989
Showing 4 changed files with 51 additions and 35 deletions.
2 changes: 1 addition & 1 deletion demo/web/src/main/notebooks/00 The Deephaven IDE.md
@@ -44,7 +44,7 @@ static_table = newTable(
```python
from deephaven.TableTools import timeTable
import random
-updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = random.randint(0,100)").reverse()
+updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (int)random.randint(0,100)").reverse()
```


31 changes: 16 additions & 15 deletions demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
@@ -9,16 +9,16 @@ You can quickly see streaming data in a UI and do table operations, interactively
For example, you can listen to a Kafka stream of cryptocurrency trades sourced from their native exchanges (like the ones below, built using the [XChange library](https://github.com/knowm/XChange)).

```python
-from deephaven import KafkaTools as kt
+from deephaven import ConsumeKafka as ck

def get_trades_stream():
-    return kt.consumeToTable(
+    return ck.consumeToTable(
        { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
          'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
        'io.deephaven.crypto.kafka.TradesTopic',
-        key = kt.IGNORE,
-        value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
-        offsets=kt.ALL_PARTITIONS_SEEK_TO_END,
+        key = ck.IGNORE,
+        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
        table_type='append')

trades_stream = get_trades_stream()
@@ -29,9 +29,6 @@ trades_stream = get_trades_stream()
To keep the most recent ticks within view, you could sort the table descending by timestamp. Alternatively, you can reverse the table.

```python
-# Not doing this:
-# t = t.sortDescending("Timestamp")
-
trades_stream = trades_stream.reverse()
```
\
@@ -83,14 +80,18 @@ row_count_by_instrument = trades_stream_cleaner.countBy("Tot_Rows", "Instrument"
Counts are informative, but often you'll be interested in other aggregations. The script below shows both how to [bin data by time](https://deephaven.io/core/docs/reference/cheat-sheets/datetime-cheat-sheet/#downsampling-temporal-data-via-time-binning) and to [do multiple aggregations](https://deephaven.io/core/docs/how-to-guides/combined-aggregations/).

```python
-from deephaven import ComboAggregateFactory as caf
+from deephaven import Aggregation as agg, as_list
+
+agg_list = as_list([
+    agg.AggCount("Trade_Count"),
+    agg.AggSum("Total_Size = Size"),
+    agg.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
+    agg.AggMin("Low_Price = Price"),
+    agg.AggMax("High_Price = Price")
+])

multi_agg = trades_stream_cleaner.updateView("TimeBin = upperBin(KafkaTimestamp, MINUTE)")\
-    .by(caf.AggCombo(
-        caf.AggCount("Trade_Count"),
-        caf.AggSum("Total_Size = Size"),
-        caf.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
-        caf.AggMin("Low_Price = Price"),
-        caf.AggMax("High_Price = Price")), "TimeBin", "Instrument")\
+    .aggBy(agg_list, "TimeBin", "Instrument")\
    .sortDescending("TimeBin", "Trade_Count")\
    .formatColumnWhere("Instrument", "Instrument = `BTC/USD`", "CYAN")
```
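For intuition about what the time-binned multi-aggregation above computes, here is a plain-Python sketch (no Deephaven; the trade rows and the minute-rounding helper are hypothetical stand-ins):

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Hypothetical trades: (timestamp, instrument, size, price).
trades = [
    (datetime(2021, 12, 13, 9, 30, 15), "BTC/USD", 2.0, 48000.0),
    (datetime(2021, 12, 13, 9, 30, 45), "BTC/USD", 1.0, 48100.0),
    (datetime(2021, 12, 13, 9, 31, 5), "ETH/USD", 3.0, 3900.0),
]

def upper_bin(ts, minutes=1):
    # Round a timestamp up to the next minute boundary (upperBin-style).
    base = ts.replace(second=0, microsecond=0)
    return base + timedelta(minutes=minutes) if ts != base else base

# Group rows by (TimeBin, Instrument), mirroring the aggBy key columns.
groups = defaultdict(list)
for ts, instrument, size, price in trades:
    groups[(upper_bin(ts), instrument)].append((size, price))

# Compute all five aggregations per group, as agg_list does.
multi_agg = {}
for key, rows in groups.items():
    sizes = [s for s, _ in rows]
    prices = [p for _, p in rows]
    multi_agg[key] = {
        "Trade_Count": len(rows),
        "Total_Size": sum(sizes),
        "Avg_Size": sum(sizes) / len(sizes),
        "Avg_Price": sum(prices) / len(prices),
        "Low_Price": min(prices),
        "High_Price": max(prices),
    }
```

The real engine does this incrementally as new rows tick in; the sketch shows only the batch semantics of the result.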
33 changes: 22 additions & 11 deletions demo/web/src/main/notebooks/02 Stream and Batch Together.md
@@ -12,16 +12,16 @@ Below you’ll do calculations and aggregations on stream and batch data using i
First, hook up a Kafka stream. (This is the same script from the first notebook.) Our [how-to guide](https://deephaven.io/core/docs/how-to-guides/kafka-stream/) provides detail on the integration.

```python
-from deephaven import KafkaTools as kt
+from deephaven import ConsumeKafka as ck

def get_trades_stream():
-    return kt.consumeToTable(
+    return ck.consumeToTable(
        { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
          'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
        'io.deephaven.crypto.kafka.TradesTopic',
-        key = kt.IGNORE,
-        value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
-        offsets=kt.ALL_PARTITIONS_SEEK_TO_END,
+        key = ck.IGNORE,
+        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
        table_type='append')

trades_stream = get_trades_stream()
@@ -64,8 +64,8 @@ Let's return to our crypto data.
Read in a CSV of batch crypto data sourced on 09/22/2021.

```python
-from deephaven.TableTools import readCsv
-trades_batch_view = readCsv("/data/large/crypto/CryptoTrades_20210922.csv")
+from deephaven import read_csv
+trades_batch_view = read_csv("/data/large/crypto/CryptoTrades_20210922.csv")
```
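The shape of data `read_csv` loads can be sketched with the standard library alone (the inline rows below are hypothetical; the real file lives at the path shown in the diff):

```python
import csv
import io

# A tiny inline stand-in for the crypto trades CSV (hypothetical values).
csv_text = """Timestamp,Instrument,Price,Size
2021-09-22T09:30:00,BTC/USD,43000.5,0.25
2021-09-22T09:30:01,ETH/USD,3050.0,1.10
"""

# DictReader yields one dict per row, keyed by the header columns.
rows = list(csv.DictReader(io.StringIO(csv_text)))
prices = [float(r["Price"]) for r in rows]
```

Unlike this sketch, `read_csv` infers column types and produces a Deephaven table that participates in the same operations as streaming tables.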

\
@@ -93,15 +93,26 @@ The following scripts will demonstrate much the same with two examples:

```python
# the table decoration
-from deephaven.DBTimeUtils import formatDate
+from deephaven.DateTimeUtils import formatDate

add_column_streaming = trades_stream_view.updateView("Date = formatDate(KafkaTimestamp, TZ_NY)")
add_column_batch = trades_batch_view .updateView("Date = formatDate(Timestamp, TZ_NY)")

# the table aggregation
-from deephaven import ComboAggregateFactory as caf
-agg_streaming = add_column_streaming.by(caf.AggCombo(caf.AggFirst("Price"), caf.AggAvg("Avg_Price = Price")), "Date", "Exchange", "Instrument")
-agg_batch = add_column_batch .by(caf.AggCombo(caf.AggFirst("Price"), caf.AggAvg("Avg_Price = Price")), "Date", "Exchange", "Instrument")
+from deephaven import Aggregation as agg, as_list
+
+agg_list = as_list([
+    agg.AggFirst("Price"),
+    agg.AggAvg("Avg_Price = Price"),
+])
+
+agg_streaming = add_column_streaming.aggBy(
+    agg_list, "Date", "Exchange", "Instrument"
+)
+
+agg_batch = add_column_batch.aggBy(
+    agg_list, "Date", "Exchange", "Instrument"
+)
```

\
20 changes: 12 additions & 8 deletions demo/web/src/main/notebooks/03 Kafka Stream vs Append.md
@@ -17,7 +17,7 @@ Start by importing some requisite packages. There is documentation on [installin
[ComboAggregateFactory](https://deephaven.io/core/docs/reference/table-operations/group-and-aggregate/AggCombo/), [emptyTable](https://deephaven.io/core/docs/how-to-guides/empty-table/#related-documentation), and [merge](https://deephaven.io/core/docs/how-to-guides/merge-tables/#merge-tables).

```python
-from deephaven import KafkaTools as kt, ComboAggregateFactory as caf
+from deephaven import ConsumeKafka as ck, Aggregation as agg, combo_agg
from deephaven.TableTools import emptyTable, merge
```

@@ -32,12 +32,12 @@ This demo will demonstrate the impact of choices related to `offsets` and `table

```python
def get_trades(*, offsets, table_type):
-    return kt.consumeToTable(
+    return ck.consumeToTable(
        { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
          'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
        'io.deephaven.crypto.kafka.TradesTopic',
-        key = kt.IGNORE,
-        value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        key = ck.IGNORE,
+        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
        offsets=offsets,
        table_type=table_type)
```
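The bare `*` in `get_trades`'s signature makes `offsets` and `table_type` keyword-only, so callers can't accidentally swap them positionally. A minimal plain-Python illustration (the body is a hypothetical stand-in; the real function returns a Kafka-backed table):

```python
# Toy stand-in with the same keyword-only signature shape.
def get_trades(*, offsets, table_type):
    return {"offsets": offsets, "table_type": table_type}

t = get_trades(offsets="seek_to_end", table_type="append")

# Positional calls are rejected outright rather than silently misbound.
try:
    get_trades("seek_to_end", "append")
    keyword_only = False
except TypeError:
    keyword_only = True
```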
@@ -50,7 +50,7 @@ In this demo, imagine you want to start your Kafka feed "1 million events ago" (
Create a Deephaven table that listens to current records (-- i.e. crypto trades happening now).

```python
-latest_offset = get_trades(offsets=kt.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
+latest_offset = get_trades(offsets=ck.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
```

\
@@ -100,9 +100,13 @@ Define a [table aggregation function](https://deephaven.io/core/docs/reference/t

```python
def trades_agg(table):
-    return table.by(caf.AggCombo(caf.AggCount("Trade_Count"), caf.AggSum("Total_Size = Size")), "Exchange", "Instrument")\
-        .sort("Exchange", "Instrument")\
-        .formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
+    agg_list = combo_agg([
+        agg.AggCount("Trade_Count"),
+        agg.AggSum("Total_Size = Size"),
+    ])
+    return table.aggBy(agg_list, "Exchange", "Instrument").\
+        sort("Exchange", "Instrument").\
+        formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
```
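For intuition on the `table_type` choice this notebook compares, here is a toy plain-Python sketch (not Deephaven) of the two semantics: an append table accumulates every row ever consumed, while a stream table exposes only the rows from the latest update cycle.

```python
class AppendTable:
    # Toy model: keeps every row ever delivered.
    def __init__(self):
        self.rows = []

    def on_update(self, batch):
        self.rows.extend(batch)

class StreamTable:
    # Toy model: shows only the most recent update cycle's rows.
    def __init__(self):
        self.rows = []

    def on_update(self, batch):
        self.rows = list(batch)

append_table, stream_table = AppendTable(), StreamTable()
for batch in ([1, 2], [3], [4, 5]):
    append_table.on_update(batch)
    stream_table.on_update(batch)
```

This is why the append variant's row count grows without bound while the stream variant stays small, even though aggregations like `trades_agg` produce equivalent running totals over either.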

\
