From c57c54251d90960e8059580ddac11111de9e62d7 Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Thu, 2 Dec 2021 12:13:52 -0500
Subject: [PATCH 1/7] Update first notebook

---
 demo/web/src/main/notebooks/00 The Deephaven IDE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/demo/web/src/main/notebooks/00 The Deephaven IDE.md b/demo/web/src/main/notebooks/00 The Deephaven IDE.md
index 807433c7fcd..e3d6d36ede0 100644
--- a/demo/web/src/main/notebooks/00 The Deephaven IDE.md
+++ b/demo/web/src/main/notebooks/00 The Deephaven IDE.md
@@ -44,7 +44,7 @@ static_table = newTable(
 ```python
 from deephaven.TableTools import timeTable
 import random
-updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = random.randint(0,100)").reverse()
+updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (int)random.randint(0,100)").reverse()
 ```

From f1f7f72a450a2604992d902f4990399b7c9785d6 Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Thu, 2 Dec 2021 14:37:57 -0500
Subject: [PATCH 2/7] Updates

---
 .../notebooks/01 Tables, Updates, and the Engine.md | 13 +++++--------
 .../main/notebooks/02 Stream and Batch Together.md | 10 +++++-----
 2 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
index 523ebac7a44..f5eb40b4391 100644
--- a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
+++ b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
@@ -9,16 +9,16 @@ You can quickly see streaming data in a UI and do table operations, interactivel
 For example, you can listen to a Kafka stream of cryptocurrency trades sourced from their native exchanges (like the ones below, built using the [XChange library](https://github.com/knowm/XChange)).
 
 ```python
-from deephaven import KafkaTools as kt
+from deephaven import ConsumeKafka as ck
 
 def get_trades_stream():
-    return kt.consumeToTable(
+    return ck.consumeToTable(
         { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
           'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
         'io.deephaven.crypto.kafka.TradesTopic',
-        key = kt.IGNORE,
-        value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
-        offsets=kt.ALL_PARTITIONS_SEEK_TO_END,
+        key = ck.IGNORE,
+        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
         table_type='append')
 
 trades_stream = get_trades_stream()
@@ -29,9 +29,6 @@
 To keep the most recent ticks within view, you could sort the table descending by timestamp. Alternatively, you can reverse the table.
 
 ```python
-# Not doing this:
-# t = t.sortDescending("Timestamp")
-
 trades_stream = trades_stream.reverse()
 ```
 \
diff --git a/demo/web/src/main/notebooks/02 Stream and Batch Together.md b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
index 7579c0f3e3f..af094247b85 100644
--- a/demo/web/src/main/notebooks/02 Stream and Batch Together.md
+++ b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
@@ -12,16 +12,16 @@ Below you’ll do calculations and aggregations on stream and batch data using i
 First, hook up a Kafka stream. (This is the same script from the first notebook.)
 Our [how-to guide](https://deephaven.io/core/docs/how-to-guides/kafka-stream/) provides detail on the integration.
 
 ```python
-from deephaven import KafkaTools as kt
+from deephaven import ConsumeKafka as ck
 
 def get_trades_stream():
-    return kt.consumeToTable(
+    return ck.consumeToTable(
         { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
           'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
         'io.deephaven.crypto.kafka.TradesTopic',
-        key = kt.IGNORE,
-        value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
-        offsets=kt.ALL_PARTITIONS_SEEK_TO_END,
+        key = ck.IGNORE,
+        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
         table_type='append')
 
 trades_stream = get_trades_stream()

From d40a0b5cfc8aba5179028811e8747fc73a50a528 Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Mon, 6 Dec 2021 15:09:10 -0500
Subject: [PATCH 3/7] Fix all examples in remaining notebooks

---
 .../01 Tables, Updates, and the Engine.md | 19 +++++++++------
 .../notebooks/02 Stream and Batch Together.md | 23 ++++++++++++++-----
 .../notebooks/03 Kafka Stream vs Append.md | 20 +++++++++-------
 3 files changed, 41 insertions(+), 21 deletions(-)

diff --git a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
index f5eb40b4391..c51bc6dd072 100644
--- a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
+++ b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
@@ -80,14 +80,19 @@ row_count_by_instrument = trades_stream_cleaner.countBy("Tot_Rows", "Instrument"
 Counts are informative, but often you'll be interested in other aggregations. The script below shows both how to [bin data by time](https://deephaven.io/core/docs/reference/cheat-sheets/datetime-cheat-sheet/#downsampling-temporal-data-via-time-binning) and to [do multiple aggregations](https://deephaven.io/core/docs/how-to-guides/combined-aggregations/).
 
 ```python
-from deephaven import ComboAggregateFactory as caf
+from deephaven import Aggregation as agg
+from deephaven import Aggregation.combo_agg as combo_agg
+
+agg_list = combo_agg([
+    agg.AggCount("Trade_Count"),
+    agg.AggSum("Total_Size = Size"),
+    agg.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
+    agg.AggMin("Low_Price = Price"),
+    agg.AggMax("High_Price = Price")
+])
+
 multi_agg = trades_stream_cleaner.updateView("TimeBin = upperBin(KafkaTimestamp, MINUTE)")\
-    .by(caf.AggCombo(
-        caf.AggCount("Trade_Count"),
-        caf.AggSum("Total_Size = Size"),
-        caf.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
-        caf.AggMin("Low_Price = Price"),
-        caf.AggMax("High_Price = Price")),"TimeBin", "Instrument")\
+    .aggBy(agg_list, "TimeBin", "Instrument")\
     .sortDescending("TimeBin", "Trade_Count")\
     .formatColumnWhere("Instrument", "Instrument = `BTC/USD`", "CYAN")
 ```
diff --git a/demo/web/src/main/notebooks/02 Stream and Batch Together.md b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
index af094247b85..6eea7890e21 100644
--- a/demo/web/src/main/notebooks/02 Stream and Batch Together.md
+++ b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
@@ -64,8 +64,8 @@ Let's return to our crypto data.
 Read in a CSV of batch crypto data sourced on 09/22/2021.
 
 ```python
-from deephaven.TableTools import readCsv
-trades_batch_view = readCsv("/data/large/crypto/CryptoTrades_20210922.csv")
+from deephaven import csv
+trades_batch_view = csv.read("/data/large/crypto/CryptoTrades_20210922.csv")
 ```
 \
@@ -93,15 +93,26 @@ The following scripts will demonstrate much the same with two examples:
 ```python
 # the table decoration
-from deephaven.DBTimeUtils import formatDate
+from deephaven.DateTimeUtils import formatDate
 add_column_streaming = trades_stream_view.updateView("Date = formatDate(KafkaTimestamp, TZ_NY)")
 add_column_batch = trades_batch_view .updateView("Date = formatDate(Timestamp, TZ_NY)")
 
 # the table aggregation
-from deephaven import ComboAggregateFactory as caf
-agg_streaming = add_column_streaming.by(caf.AggCombo(caf.AggFirst("Price"), caf.AggAvg("Avg_Price = Price")), "Date", "Exchange", "Instrument")
-agg_batch = add_column_batch .by(caf.AggCombo(caf.AggFirst("Price"), caf.AggAvg("Avg_Price = Price")), "Date", "Exchange", "Instrument")
+from deephaven import Aggregation as agg, combo_agg
+
+agg_list = combo_agg([
+    agg.AggFirst("Price"),
+    agg.AggAvg("Avg_Price = Price"),
+])
+
+agg_streaming = add_column_streaming.aggBy(
+    agg_list, "Date", "Exchange", "Instrument"
+)
+
+agg_batch = add_column_batch.aggBy(
+    agg_list, "Date", "Exchange", "Instrument"
+)
 ```
 \
diff --git a/demo/web/src/main/notebooks/03 Kafka Stream vs Append.md b/demo/web/src/main/notebooks/03 Kafka Stream vs Append.md
index 950535e4d08..4a6ec1cbfbb 100644
--- a/demo/web/src/main/notebooks/03 Kafka Stream vs Append.md
+++ b/demo/web/src/main/notebooks/03 Kafka Stream vs Append.md
@@ -17,7 +17,7 @@ Start by importing some requisite packages. There is documentation on [installin
 [ComboAggregateFactory](https://deephaven.io/core/docs/reference/table-operations/group-and-aggregate/AggCombo/), [emptyTable](https://deephaven.io/core/docs/how-to-guides/empty-table/#related-documentation), and [merge](https://deephaven.io/core/docs/how-to-guides/merge-tables/#merge-tables).
 
 ```python
-from deephaven import KafkaTools as kt, ComboAggregateFactory as caf
+from deephaven import ConsumeKafka as ck, Aggregation as agg, combo_agg
 from deephaven.TableTools import emptyTable, merge
 ```
@@ -32,12 +32,12 @@ This demo will demonstrate the impact of choices related to `offsets` and `table
 ```python
 def get_trades(*, offsets, table_type):
-    return kt.consumeToTable(
+    return ck.consumeToTable(
         { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
           'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
         'io.deephaven.crypto.kafka.TradesTopic',
-        key = kt.IGNORE,
-        value = kt.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
+        key = ck.IGNORE,
+        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
         offsets=offsets,
         table_type=table_type)
 ```
@@ -50,7 +50,7 @@ In this demo, imagine you want to start your Kafka feed "1 million events ago" (
 Create a Deephaven table that listens to current records (-- i.e. crypto trades happening now).
 
 ```python
-latest_offset = get_trades(offsets=kt.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
+latest_offset = get_trades(offsets=ck.ALL_PARTITIONS_SEEK_TO_END, table_type='stream')
 ```
 \
@@ -100,9 +100,13 @@ Define a [table aggregation function](https://deephaven.io/core/docs/reference/t
 ```python
 def trades_agg(table):
-    return table.by(caf.AggCombo(caf.AggCount("Trade_Count"),caf.AggSum("Total_Size = Size")),"Exchange", "Instrument")\
-        .sort("Exchange", "Instrument")\
-        .formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
+    agg_list = combo_agg([
+        agg.AggCount("Trade_Count"),
+        agg.AggSum("Total_Size = Size"),
+    ])
+    return table.aggBy(agg_list, "Exchange", "Instrument").\
+        sort("Exchange", "Instrument").\
+        formatColumnWhere("Instrument", "Instrument.startsWith(`BTC`)", "IVORY")
 ```
 \

From 1bd0f42f8f45fb5858f9715a2daa1cc92d7fef7d Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Wed, 8 Dec 2021 14:44:48 -0500
Subject: [PATCH 4/7] Quick

---
 demo/web/src/main/notebooks/00 The Deephaven IDE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/demo/web/src/main/notebooks/00 The Deephaven IDE.md b/demo/web/src/main/notebooks/00 The Deephaven IDE.md
index e3d6d36ede0..6a169f56a0a 100644
--- a/demo/web/src/main/notebooks/00 The Deephaven IDE.md
+++ b/demo/web/src/main/notebooks/00 The Deephaven IDE.md
@@ -44,7 +44,7 @@ static_table = newTable(
 ```python
 from deephaven.TableTools import timeTable
 import random
-updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (int)random.randint(0,100)").reverse()
+updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (byte)random.randint(0,100)").reverse()
 ```

From a36e14338d568ae76a30ceb633997078513e6a50 Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Wed, 8 Dec 2021 14:45:13 -0500
Subject: [PATCH 5/7] Back to int

---
 demo/web/src/main/notebooks/00 The Deephaven IDE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/demo/web/src/main/notebooks/00 The Deephaven IDE.md b/demo/web/src/main/notebooks/00 The Deephaven IDE.md
index 6a169f56a0a..f10114de8e5 100644
--- a/demo/web/src/main/notebooks/00 The Deephaven IDE.md
+++ b/demo/web/src/main/notebooks/00 The Deephaven IDE.md
@@ -44,7 +44,7 @@ static_table = newTable(
 ```python
 from deephaven.TableTools import timeTable
 import random
-updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (byte)random.randint(0,100)").reverse()
+updating_table = timeTable('00:00:00.400').updateView("Row = i", "Some_Int = (int)random.randint(0,100)").reverse()
 ```

From 0c9149d1593f9db73b3f0bdfaebea5e28b893219 Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Thu, 9 Dec 2021 13:29:50 -0500
Subject: [PATCH 6/7] Apply suggestions from code review

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>
---
 .../src/main/notebooks/01 Tables, Updates, and the Engine.md | 3 +--
 demo/web/src/main/notebooks/02 Stream and Batch Together.md | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
index c51bc6dd072..5535b6244ce 100644
--- a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
+++ b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
@@ -80,8 +80,7 @@ row_count_by_instrument = trades_stream_cleaner.countBy("Tot_Rows", "Instrument"
 Counts are informative, but often you'll be interested in other aggregations. The script below shows both how to [bin data by time](https://deephaven.io/core/docs/reference/cheat-sheets/datetime-cheat-sheet/#downsampling-temporal-data-via-time-binning) and to [do multiple aggregations](https://deephaven.io/core/docs/how-to-guides/combined-aggregations/).
 
 ```python
-from deephaven import Aggregation as agg
-from deephaven import Aggregation.combo_agg as combo_agg
+from deephaven import Aggregation as agg, combo_agg
 
 agg_list = combo_agg([
     agg.AggCount("Trade_Count"),
diff --git a/demo/web/src/main/notebooks/02 Stream and Batch Together.md b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
index 6eea7890e21..85b4520b783 100644
--- a/demo/web/src/main/notebooks/02 Stream and Batch Together.md
+++ b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
@@ -64,8 +64,8 @@ Let's return to our crypto data.
 Read in a CSV of batch crypto data sourced on 09/22/2021.
 
 ```python
-from deephaven import csv
-trades_batch_view = csv.read("/data/large/crypto/CryptoTrades_20210922.csv")
+from deephaven import read_csv
+trades_batch_view = read_csv("/data/large/crypto/CryptoTrades_20210922.csv")
 ```
 \

From 94f55725fd0e9bc5b60b5004865d46bc6a5bc492 Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Thu, 9 Dec 2021 16:33:05 -0500
Subject: [PATCH 7/7] Apply suggestions from code review

Change combo_agg to as_list

Co-authored-by: Chip Kent <5250374+chipkent@users.noreply.github.com>
---
 .../src/main/notebooks/01 Tables, Updates, and the Engine.md | 4 ++--
 demo/web/src/main/notebooks/02 Stream and Batch Together.md | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
index 5535b6244ce..7c0e0f2466e 100644
--- a/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
+++ b/demo/web/src/main/notebooks/01 Tables, Updates, and the Engine.md
@@ -80,9 +80,9 @@ row_count_by_instrument = trades_stream_cleaner.countBy("Tot_Rows", "Instrument"
 Counts are informative, but often you'll be interested in other aggregations. The script below shows both how to [bin data by time](https://deephaven.io/core/docs/reference/cheat-sheets/datetime-cheat-sheet/#downsampling-temporal-data-via-time-binning) and to [do multiple aggregations](https://deephaven.io/core/docs/how-to-guides/combined-aggregations/).
 
 ```python
-from deephaven import Aggregation as agg, combo_agg
+from deephaven import Aggregation as agg, as_list
 
-agg_list = combo_agg([
+agg_list = as_list([
     agg.AggCount("Trade_Count"),
     agg.AggSum("Total_Size = Size"),
     agg.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
     agg.AggMin("Low_Price = Price"),
     agg.AggMax("High_Price = Price")
 ])
diff --git a/demo/web/src/main/notebooks/02 Stream and Batch Together.md b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
index 85b4520b783..421319eb7a1 100644
--- a/demo/web/src/main/notebooks/02 Stream and Batch Together.md
+++ b/demo/web/src/main/notebooks/02 Stream and Batch Together.md
@@ -99,9 +99,9 @@ add_column_streaming = trades_stream_view.updateView("Date = formatDate(KafkaTim
 add_column_batch = trades_batch_view .updateView("Date = formatDate(Timestamp, TZ_NY)")
 
 # the table aggregation
-from deephaven import Aggregation as agg, combo_agg
+from deephaven import Aggregation as agg, as_list
 
-agg_list = combo_agg([
+agg_list = as_list([
     agg.AggFirst("Price"),
     agg.AggAvg("Avg_Price = Price"),
 ])
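
Taken together, patches 2, 3, 6, and 7 move the demo notebooks off the legacy `KafkaTools`, `ComboAggregateFactory`, `DBTimeUtils`, and `TableTools.readCsv` imports and onto `ConsumeKafka`, `Aggregation` with `as_list`, `DateTimeUtils`, and `read_csv`. The snippet below is a minimal reference sketch of the end state of that API usage, assembled only from the added lines above; the demo broker addresses, topic, Avro schema name, and column names are carried over from the notebooks as assumptions, and the block is not itself part of the patch series.

```python
# Consolidated end state of the notebook scripts after this series,
# using the legacy Deephaven Python API exactly as it appears in the + lines above.
from deephaven import ConsumeKafka as ck
from deephaven import Aggregation as agg, as_list

def get_trades_stream():
    # Broker, schema registry, topic, and Avro schema names are taken
    # from the demo notebooks above.
    return ck.consumeToTable(
        { 'bootstrap.servers' : 'demo-kafka.c.deephaven-oss.internal:9092',
          'schema.registry.url' : 'http://demo-kafka.c.deephaven-oss.internal:8081' },
        'io.deephaven.crypto.kafka.TradesTopic',
        key = ck.IGNORE,
        value = ck.avro('io.deephaven.crypto.kafka.TradesTopic-io.deephaven.crypto.Trade'),
        offsets=ck.ALL_PARTITIONS_SEEK_TO_END,
        table_type='append')

# Newest trades first, as in notebook 01.
trades_stream = get_trades_stream().reverse()

# One combined aggregation pass, replacing ComboAggregateFactory.AggCombo.
# Assumes the consumed table exposes Exchange, Instrument, Size, and Price
# columns, as the notebooks do.
agg_list = as_list([
    agg.AggCount("Trade_Count"),
    agg.AggSum("Total_Size = Size"),
    agg.AggAvg("Avg_Size = Size", "Avg_Price = Price"),
    agg.AggMin("Low_Price = Price"),
    agg.AggMax("High_Price = Price")
])

trades_by_instrument = trades_stream.aggBy(agg_list, "Exchange", "Instrument")\
    .sort("Exchange", "Instrument")
```

Building the aggregation as a plain list of `Agg*` descriptors and handing it to `aggBy` is the shape the review in patches 6 and 7 settles on, with `combo_agg` renamed to `as_list`.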