- How to Start
- ksqlDB Syntax Reference
- Hello ksqlDB
- Basic Data 1
- Basic Data 2
- Basic Data 3
- Array
- Map
- Complex Data Types
- Stream & Table Key
- Commodity - First Step
- Row Key
- Notes From This Point Forward
- Commodity - Additional Requirements
- Commodity - Reward for Each Location
- Run Script File
- Commodity - Further Fraud Processing
- ksqldb REST API
- Feedback - Are We Good Enough?
- Feedback - Who Owns This Feedback?
- Feedback - Good Feedback or Bad Feedback?
- Feedback - Group Using Table
- Feedback - Overall Good or Bad
- Insert Data Using Ksql
- Customer - Web & Mobile
- Customer - Shopping Cart & Wishlist
- Pull Query
- Flash Sale
- Flash Sale - Timestamp
- Feedback - Average Rating
- Feedback - Detailed Rating
- Inventory - Sum & Subtract Records
- Inventory - Timestamp Extractor
- Inventory - Tumbling Time Window
- Inventory - Hopping Time Window
- Inner Join Stream / Stream
- Left Join Stream / Stream
- Outer Join Stream / Stream
- Inner Join Table / Table
- Left Join Table / Table
- Outer Join Table / Table
- Inner Join Stream / Table
- Left Join Stream / Table
- Stream / Table Co-partition
- User Defined Function
- User Defined Tabular Function
- User Defined Aggregation Function
- Avro on KsqlDB
- Writing Avro Schema
- Avro Json Conversion
- KsqlDB & Kafka Connect
- Java Client
----
## How to Start
[Back to top](/spring-kafka-bootcamp/ksqldb)

- Stop docker using `docker-compose down` (see here)
- Delete the subfolder `data` that sits at the same location as the docker compose scripts
- Start docker using `docker-compose up`, on file `docker-compose-full.yml` (see here); a consolidated shell sketch of these steps follows this list
- From eclipse, run all `kafka-stream-xxx` projects, except `kafka-stream-sample`
- Re-create kafka topics using this script
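A minimal shell sketch of the docker steps above (the `-d` detach flag and the relative paths are assumptions; adjust them to your checkout):

```bash
# stop the running containers
docker-compose down

# remove the `data` state folder that sits next to the compose scripts
rm -rf data

# start the full stack from the dedicated compose file
docker-compose -f docker-compose-full.yml up -d
```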
----
## ksqlDB Syntax Reference
[Back to top](/spring-kafka-bootcamp/ksqldb)

- The reference below is only a quick version of the ksqlDB syntax used in the course.
- Some examples for a quick start (more complete than the reference below, but still simple enough) are available here.
- For complete examples, see the ksqlDB website. Note that the documentation for each ksqlDB release might differ, so choose the appropriate version when you read the reference on the ksqlDB website.
Each ksqlDB statement must end with a semicolon (`;`).

A single-line ksql comment starts with a double dash (`--`):

```bash
-- This is a comment
```

A multi-line ksql comment goes between `/* ... */`:

```bash
/*
Multiple
comment
lines
*/
```
{::options parse_block_html="true" /}
<details><summary markdown="span">Click to expand!</summary>

Notes
- Topic name is case-sensitive (`myTopic` is different from `myTOPIC`)
- If the topic name contains non-alphanumeric characters, quote the topic name, e.g. between backticks (`` ` ``)

```bash
-- List all kafka topics
SHOW TOPICS;
-- Show messages in kafka topic, start from message received after the command executed
-- Topic name is alphanumeric
PRINT topicNameCaseSensitive;
-- Topic name contains non-alphanumeric characters, put topic name between backtick
PRINT `topic-name-with-some-dash`;
-- Topics below contain non-alphanumeric characters, so are written between backticks
-- Show messages in kafka topic, start from message received after the command executed, until 500 messages only
PRINT `topic-name-with-some-dash` LIMIT 500;
-- Show all messages in kafka topic
PRINT `topic-name-with-some-dash` FROM BEGINNING;
-- Show the first 1000 messages in kafka topic
PRINT `topic-name-with-some-dash` FROM BEGINNING LIMIT 1000;
```
</details>
{::options parse_block_html="false" /}
{::options parse_block_html="true" /}
<details><summary markdown="span">Click to expand!</summary>

```bash
-- List all ksqlDB streams
SHOW STREAMS;

-- List all ksqlDB tables
SHOW TABLES;
-- Describe kafka stream or table (summary)
DESCRIBE `stream-or-table-name`;

-- Describe kafka stream or table (detail)
DESCRIBE `stream-or-table-name` EXTENDED;
```
</details>
{::options parse_block_html="false" /}
----
## Hello ksqlDB
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>
```bash
-- Set the offset to earliest
SET 'auto.offset.reset'='earliest';
-- Print data in topic
PRINT `t-commodity-promotion`
FROM BEGINNING;
-- Create stream from underlying topic, with JSON value
CREATE STREAM `s-commodity-promotion` (
promotionCode VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-promotion',
VALUE_FORMAT = 'JSON'
);
-- Push query from stream
SELECT *
FROM `s-commodity-promotion`
EMIT CHANGES;
-- Push query with transformed data
SELECT UCASE(promotionCode) AS uppercasePromotionCode
FROM `s-commodity-promotion`
EMIT CHANGES;
-- Create stream from stream, publishing the transformed data into custom output topic
CREATE STREAM `s-commodity-promotion-uppercase`
WITH (
kafka_topic = 't-ksql-commodity-promotion-uppercase'
)
AS
SELECT UCASE(promotionCode) AS uppercasePromotionCode
FROM `s-commodity-promotion`
EMIT CHANGES;
-- Push query from the uppercase stream
SELECT *
FROM `s-commodity-promotion-uppercase`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
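Since `s-commodity-promotion-uppercase` was created with an explicit output topic, the transformed records can also be inspected straight from that topic:

```bash
PRINT `t-ksql-commodity-promotion-uppercase` FROM BEGINNING;
```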
----
## Basic Data 1
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Print data in topic
PRINT `t-ksql-basic-data-one` FROM BEGINNING;

-- Create stream
CREATE STREAM `s-basic-data-one` (
    `myString` STRING,
    `myFloat` DOUBLE,
    `myBoolean` BOOLEAN,
    `myInteger` INT,
    `myDouble` DOUBLE,
    `myBigDecimal` DECIMAL(30,18),
    `myLong` BIGINT,
    `myAnotherString` VARCHAR
)
WITH (
    KAFKA_TOPIC = 't-ksql-basic-data-one',
    VALUE_FORMAT = 'JSON'
);

-- Try to replace stream with different column order
CREATE OR REPLACE STREAM `s-basic-data-one` (
    `myBoolean` BOOLEAN,
    `myFloat` DOUBLE,
    `myDouble` DOUBLE,
    `myInteger` INT,
    `myLong` BIGINT,
    `myString` STRING,
    `myAnotherString` VARCHAR,
    `myBigDecimal` DECIMAL(30,18)
)
WITH (
    KAFKA_TOPIC = 't-ksql-basic-data-one',
    VALUE_FORMAT = 'JSON'
);

-- Drop stream without dropping topic
DROP STREAM IF EXISTS `s-basic-data-one`;

-- Drop stream and delete underlying topic (be careful, data will be lost)
DROP STREAM IF EXISTS `s-basic-data-one` DELETE TOPIC;

-- Set the offset to earliest
SET 'auto.offset.reset'='earliest';

-- Push query, up to 15 rows only
SELECT *
FROM `s-basic-data-one`
EMIT CHANGES
LIMIT 15;
```
</details>
{::options parse_block_html="false" /}
----
## Basic Data 2
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>
```bash
-- Print data in topic
PRINT `t-ksql-basic-data-two`
FROM BEGINNING;
-- Create stream with ksqldb date / time data type
CREATE OR REPLACE STREAM `s-basic-data-two` (
`myEpochDay` DATE,
`myMillisOfDay` TIME,
`myEpochMillis` TIMESTAMP
)
WITH (
KAFKA_TOPIC = 't-ksql-basic-data-two',
VALUE_FORMAT = 'JSON'
);
-- Push query
SELECT *
FROM `s-basic-data-two`
EMIT CHANGES;
-- Date / time functions sample
SELECT `myEpochDay`,
DATEADD(DAYS, 7, `myEpochDay`) AS `aWeekAfterMyEpochDay`,
`myMillisOfDay`,
TIMESUB(HOURS, 2, `myMillisOfDay`) AS `twoHoursBeforeMyMillisOfDay`,
`myEpochMillis`,
FORMAT_TIMESTAMP(`myEpochMillis`, 'dd-MMM-yyyy, HH:mm:ss Z', 'Asia/Jakarta') as `epochMillisAtJakartaTimezone`
FROM `s-basic-data-two`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
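The reverse conversions also exist: UNIX_DATE and UNIX_TIMESTAMP turn DATE and TIMESTAMP values back into epoch numbers. A short sketch against the same stream:

```bash
-- DATE back to epoch days, TIMESTAMP back to epoch millis
SELECT UNIX_DATE(`myEpochDay`) AS `epochDay`,
    UNIX_TIMESTAMP(`myEpochMillis`) AS `epochMillis`
FROM `s-basic-data-two`
EMIT CHANGES;
```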
----
## Basic Data 3
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Print data in topic
PRINT `t-ksql-basic-data-three`;

-- Create stream
CREATE OR REPLACE STREAM `s-basic-data-three` (
    `myLocalDate` VARCHAR,
    `myLocalDateCustomFormat` VARCHAR,
    `myLocalTime` VARCHAR,
    `myLocalTimeCustomFormat` VARCHAR,
    `myLocalDateTime` VARCHAR,
    `myLocalDateTimeCustomFormat` VARCHAR
)
WITH (
    KAFKA_TOPIC = 't-ksql-basic-data-three',
    VALUE_FORMAT = 'JSON'
);

-- Push query
SELECT *
FROM `s-basic-data-three`
EMIT CHANGES;

-- Test query (LocalDate)
SELECT `myLocalDate`,
    DATEADD(DAYS, 7, `myLocalDate`) AS `aWeekAfterMyLocalDate`,
    CONCAT('Prefix string- ', `myLocalDate`, ' -suffix String') AS `myLocalDateConcatString`,
    `myLocalDateCustomFormat`,
    DATEADD(DAYS, 7, `myLocalDateCustomFormat`) AS `aWeekAfterMyLocalDateCustomFormat`,
    CONCAT('Prefix string- ', `myLocalDateCustomFormat`, ' -suffix String') AS `myLocalDateCustomFormatConcatString`
FROM `s-basic-data-three`
EMIT CHANGES;

-- Test query (LocalTime)
SELECT `myLocalTime`,
    TIMEADD(HOURS, 3, `myLocalTime`) AS `3HoursAfterMyLocalTime`,
    CONCAT('Prefix string- ', `myLocalTime`, ' -suffix String') AS `myLocalTimeConcatString`,
    `myLocalTimeCustomFormat`,
    TIMEADD(HOURS, 3, `myLocalTimeCustomFormat`) AS `3HoursAfterMyLocalTimeCustomFormat`,
    CONCAT('Prefix string- ', `myLocalTimeCustomFormat`, ' -suffix String') AS `myLocalTimeCustomFormatConcatString`
FROM `s-basic-data-three`
EMIT CHANGES;

-- Test query (LocalDateTime)
SELECT `myLocalDateTime`,
    DATEADD(DAYS, 2, `myLocalDateTime`) AS `2DaysAfterMyLocalDateTime`,
    CONCAT('Prefix string- ', `myLocalDateTime`, ' -suffix String') AS `myLocalDateTimeConcatString`,
    `myLocalDateTimeCustomFormat`,
    DATEADD(DAYS, 2, `myLocalDateTimeCustomFormat`) AS `2DaysAfterMyLocalDateTimeCustomFormat`,
    CONCAT('Prefix string- ', `myLocalDateTimeCustomFormat`, ' -suffix String') AS `myLocalDateTimeCustomFormatConcatString`
FROM `s-basic-data-three`
EMIT CHANGES;

-- Parse date / time from string
SELECT PARSE_DATE(`myLocalDate`, 'yyyy-MM-dd') AS `parsedLocalDate`,
    PARSE_DATE(`myLocalDateCustomFormat`, 'dd MMM yyyy') AS `parsedLocalDateCustomFormat`,
    PARSE_TIME(`myLocalTime`, 'HH:mm:ss') AS `parsedLocalTime`,
    PARSE_TIME(`myLocalTimeCustomFormat`, 'hh:mm:ss a') AS `parsedLocalTimeCustomFormat`,
    PARSE_TIMESTAMP(`myLocalDateTime`, 'yyyy-MM-dd''T''HH:mm:ss') AS `parsedLocalDateTime`,
    PARSE_TIMESTAMP(`myLocalDateTimeCustomFormat`, 'dd-MMM-yyyy hh:mm:ss a') AS `parsedLocalDateTimeCustomFormat`
FROM `s-basic-data-three`
EMIT CHANGES;
-- Create new stream for parsed date / time
CREATE STREAM `s-basic-data-three-parsed`
AS
SELECT PARSE_DATE(`myLocalDate`, 'yyyy-MM-dd') AS `parsedLocalDate`,
    PARSE_DATE(`myLocalDateCustomFormat`, 'dd MMM yyyy') AS `parsedLocalDateCustomFormat`,
    PARSE_TIME(`myLocalTime`, 'HH:mm:ss') AS `parsedLocalTime`,
    PARSE_TIME(`myLocalTimeCustomFormat`, 'hh:mm:ss a') AS `parsedLocalTimeCustomFormat`,
    PARSE_TIMESTAMP(`myLocalDateTime`, 'yyyy-MM-dd''T''HH:mm:ss') AS `parsedLocalDateTime`,
    PARSE_TIMESTAMP(`myLocalDateTimeCustomFormat`, 'dd-MMM-yyyy hh:mm:ss a') AS `parsedLocalDateTimeCustomFormat`
FROM `s-basic-data-three`
EMIT CHANGES;

-- Describe stream
DESCRIBE `s-basic-data-three`;
DESCRIBE `s-basic-data-three-parsed`;
DESCRIBE `s-basic-data-three` EXTENDED;
DESCRIBE `s-basic-data-three-parsed` EXTENDED;

-- Test query from parsed stream (LocalDate)
SELECT `parsedLocalDate`,
    DATEADD(DAYS, 7, `parsedLocalDate`) AS `aWeekAfterParsedLocalDate`,
    `parsedLocalDateCustomFormat`,
    DATEADD(DAYS, 7, `parsedLocalDateCustomFormat`) AS `aWeekAfterParsedLocalDateCustomFormat`
FROM `s-basic-data-three-parsed`
EMIT CHANGES;

-- Test query from parsed stream (LocalTime)
SELECT `parsedLocalTime`,
    TIMEADD(HOURS, 3, `parsedLocalTime`) AS `3HoursAfterParsedLocalTime`,
    `parsedLocalTimeCustomFormat`,
    TIMEADD(HOURS, 3, `parsedLocalTimeCustomFormat`) AS `3HoursAfterParsedLocalTimeCustomFormat`
FROM `s-basic-data-three-parsed`
EMIT CHANGES;

-- Test query from parsed stream (LocalDateTime)
SELECT `parsedLocalDateTime`,
    TIMESTAMPADD(DAYS, 2, `parsedLocalDateTime`) AS `2DaysAfterParsedLocalDateTime`,
    `parsedLocalDateTimeCustomFormat`,
    TIMESTAMPADD(DAYS, 2, `parsedLocalDateTimeCustomFormat`) AS `2DaysAfterParsedLocalDateTimeCustomFormat`
FROM `s-basic-data-three-parsed`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
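Going the other direction, FORMAT_DATE, FORMAT_TIME, and FORMAT_TIMESTAMP render the parsed values back into strings. A sketch against the parsed stream:

```bash
-- Format date / time back to string
SELECT FORMAT_DATE(`parsedLocalDate`, 'dd MMM yyyy') AS `formattedLocalDate`,
    FORMAT_TIME(`parsedLocalTime`, 'hh:mm:ss a') AS `formattedLocalTime`,
    FORMAT_TIMESTAMP(`parsedLocalDateTime`, 'dd-MMM-yyyy hh:mm:ss a') AS `formattedLocalDateTime`
FROM `s-basic-data-three-parsed`
EMIT CHANGES;
```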
----
## Array
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>
```bash
-- Print data in topic
PRINT `t-ksql-basic-data-four`;
-- Create stream which contains array / list / set
CREATE STREAM `s-basic-data-four` (
`myStringArray` ARRAY<VARCHAR>,
`myIntegerList` ARRAY<INT>,
`myDoubleSet` ARRAY<DOUBLE>
) WITH (
KAFKA_TOPIC = 't-ksql-basic-data-four',
VALUE_FORMAT = 'JSON'
);
-- Push query
SELECT *
FROM `s-basic-data-four`
EMIT CHANGES;
-- Describe stream
DESCRIBE `s-basic-data-four`;
-- Test array functions
SELECT ARRAY_LENGTH(`myStringArray`) as `lengthMyStringArray`,
ARRAY_CONCAT(`myIntegerList`, ARRAY[999, 998, 997]) as `concatMyIntegerList`,
ARRAY_SORT(`myDoubleSet`, 'DESC') as `sortedDescMyDoubleSet`
FROM `s-basic-data-four`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Map
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Print data in topic
PRINT `t-ksql-basic-data-five`;

-- Create stream which contains maps
CREATE STREAM `s-basic-data-five` (
    `myMapAlpha` MAP<VARCHAR, VARCHAR>,
    `myMapBeta` MAP<VARCHAR, VARCHAR>
) WITH (
    KAFKA_TOPIC = 't-ksql-basic-data-five',
    VALUE_FORMAT = 'JSON'
);

-- Push query
SELECT *
FROM `s-basic-data-five`
EMIT CHANGES;

-- Describe stream
DESCRIBE `s-basic-data-five`;

-- Test map functions
SELECT MAP_VALUES(`myMapAlpha`) AS `valuesAtMyMapAlpha`,
    MAP_KEYS(`myMapBeta`) AS `keysAtMyMapBeta`
FROM `s-basic-data-five`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Complex Data Types
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>
```bash
-- Print data in topic
PRINT `t-ksql-basic-data-person`
FROM BEGINNING;
-- Create stream which contains complex data types
CREATE STREAM `s-basic-data-person` (
`firstName` VARCHAR,
`lastName` VARCHAR,
`birthDate` VARCHAR,
`contacts` MAP<VARCHAR, VARCHAR>,
`passport` STRUCT<
`number` VARCHAR,
`expiryDate` VARCHAR
>,
`addresses` ARRAY<
STRUCT<
`streetAddress` VARCHAR,
`country` VARCHAR,
`location` STRUCT<
`latitude` DOUBLE,
`longitude` DOUBLE
>
>
>
) WITH (
KAFKA_TOPIC = 't-ksql-basic-data-person',
VALUE_FORMAT = 'JSON'
);
-- Push query
SELECT *
FROM `s-basic-data-person`
EMIT CHANGES;
-- Accessing contact (MAP) by key
SELECT `contacts`['email'] AS `emailFromContactsMap`,
`contacts`['phoneHome'] AS `phoneHomeFromContactsMap`,
`contacts`['phoneWork'] AS `phoneWorkFromContactsMap`
FROM `s-basic-data-person`
EMIT CHANGES;
-- Accessing passport (STRUCT) by field name
SELECT `passport`->`number` AS `passportNumber`,
`passport`->`expiryDate` AS `passportExpiryDate`
FROM `s-basic-data-person`
EMIT CHANGES;
-- Convert each element in list addresses into one record
SELECT `firstName`, `lastName`,
EXPLODE(`addresses`) as `addressSingle`
FROM `s-basic-data-person`
EMIT CHANGES;
-- Convert each element in ARRAY addresses into one record, then access each field in address
SELECT `firstName`, `lastName`,
EXPLODE(`addresses`)->`streetAddress`,
EXPLODE(`addresses`)->`country`,
EXPLODE(`addresses`)->`location`
FROM `s-basic-data-person`
EMIT CHANGES;
-- Convert each element in ARRAY addresses into one record, then access each field in address
-- including latitude / longitude which actually field at STRUCT within STRUCT
SELECT `firstName`, `lastName`,
EXPLODE(`addresses`)->`streetAddress`,
EXPLODE(`addresses`)->`country`,
EXPLODE(`addresses`)->`location`->`latitude` AS `latitude`,
EXPLODE(`addresses`)->`location`->`longitude` AS `longitude`
FROM `s-basic-data-person`
EMIT CHANGES;
-- Create stream which each field at STRUCT converted as one column, and ARRAY exploded so each
-- element at ARRAY becomes one row. Also, date string converted to DATE data type
CREATE STREAM `s-basic-data-person-complete`
AS
SELECT `firstName`,
`lastName`,
PARSE_DATE(`birthDate`, 'yyyy-MM-dd') AS `birthDate`,
`contacts`,
`passport`->`number` AS `passportNumber`,
PARSE_DATE(`passport`->`expiryDate`,'yyyy-MM-dd') AS `passportExpiryDate`,
EXPLODE(`addresses`)->`streetAddress`,
EXPLODE(`addresses`)->`country`,
EXPLODE(`addresses`)->`location`->`latitude` AS `latitude`,
EXPLODE(`addresses`)->`location`->`longitude` AS `longitude`
FROM `s-basic-data-person`
EMIT CHANGES;
-- Describe the complete stream
DESCRIBE `s-basic-data-person-complete`;
-- Push query from complete stream
SELECT *
FROM `s-basic-data-person-complete`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Stream & Table Key
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Print country topic
PRINT `t-ksql-basic-data-country`
FROM BEGINNING;
-- Create stream country
CREATE STREAM `s-basic-data-country` (
`countryName` VARCHAR,
`currencyCode` VARCHAR,
`population` INT
) WITH (
KAFKA_TOPIC = 't-ksql-basic-data-country',
VALUE_FORMAT = 'JSON'
);
-- Describe stream
DESCRIBE `s-basic-data-country`;
-- Show data from stream country
SET 'auto.offset.reset'='earliest';
SELECT *
FROM `s-basic-data-country`
EMIT CHANGES;
-- Re-key by country name, create new stream
DROP STREAM IF EXISTS `s-basic-data-country-rekeyed`;
CREATE STREAM `s-basic-data-country-rekeyed`
AS
SELECT `countryName`, `currencyCode`, `population`
FROM `s-basic-data-country`
PARTITION BY `countryName`
EMIT CHANGES;
-- Describe rekeyed stream
DESCRIBE `s-basic-data-country-rekeyed`;
-- Re-key by country name, create new stream, preserve value
DROP STREAM IF EXISTS `s-basic-data-country-rekeyed`;
CREATE STREAM `s-basic-data-country-rekeyed`
AS
SELECT `countryName` AS `rowkey`, AS_VALUE(`countryName`) AS `countryName`, `currencyCode`, `population`
FROM `s-basic-data-country`
PARTITION BY `countryName`
EMIT CHANGES;
-- Show data from stream country (rekeyed)
SET 'auto.offset.reset'='earliest';
SELECT *
FROM `s-basic-data-country-rekeyed`
EMIT CHANGES;
-- Re-key by country name & currency code (JSON), create new stream
DROP STREAM IF EXISTS `s-basic-data-country-rekeyed-json`;
CREATE STREAM `s-basic-data-country-rekeyed-json`
WITH (
KEY_FORMAT = 'JSON'
)
AS
SELECT STRUCT(`countryName` := `countryName`, `currencyCode` := `currencyCode`) AS `jsonKey`,
AS_VALUE(`countryName`) AS `countryName`,
AS_VALUE(`currencyCode`) AS `currencyCode`,
`population`
FROM `s-basic-data-country`
PARTITION BY STRUCT(`countryName` := `countryName`, `currencyCode` := `currencyCode`)
EMIT CHANGES;
-- Show data from stream country (rekeyed)
SET 'auto.offset.reset'='earliest';
SELECT *
FROM `s-basic-data-country-rekeyed-json`
EMIT CHANGES;
-- Create table where key is country name and summarize population
DROP TABLE IF EXISTS `tbl-basic-data-country`;
CREATE TABLE `tbl-basic-data-country`
AS
SELECT `countryName`, SUM(`population`) AS `totalPopulation`
FROM `s-basic-data-country`
GROUP BY `countryName`
EMIT CHANGES;
-- Show data from table country
SET 'auto.offset.reset'='earliest';
SELECT *
FROM `tbl-basic-data-country`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
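Because `tbl-basic-data-country` is keyed by `countryName`, it also answers key lookups without EMIT CHANGES (a pull query sketch; the country value is just a placeholder):

```bash
SELECT *
FROM `tbl-basic-data-country`
WHERE `countryName` = 'Germany';
```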
----
## Commodity - First Step
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Create base commodity stream
CREATE STREAM `s-commodity-order` (
    `rowkey` VARCHAR KEY,
    `creditCardNumber` VARCHAR,
    `itemName` VARCHAR,
    `orderDateTime` VARCHAR,
    `orderLocation` VARCHAR,
    `orderNumber` VARCHAR,
    `price` INT,
    `quantity` INT
) WITH (
    KAFKA_TOPIC = 't-commodity-order',
    VALUE_FORMAT = 'JSON'
);

-- Describe stream
DESCRIBE `s-commodity-order`;

-- Push query
SELECT *
FROM `s-commodity-order`
EMIT CHANGES;

-- Mask credit card number
CREATE STREAM `s-commodity-order-masked`
AS
SELECT `rowkey`,
    MASK_LEFT(`creditCardNumber`, 12, '*', '*', '*', '*') AS `maskedCreditCardNumber`,
    `itemName`,
    `orderDateTime`,
    `orderLocation`,
    `orderNumber`,
    `price`,
    `quantity`
FROM `s-commodity-order`
EMIT CHANGES;

-- Push query
SELECT *
FROM `s-commodity-order-masked`
EMIT CHANGES;

-- Calculate total item amount to pattern output
CREATE STREAM `s-commodity-pattern-one`
AS
SELECT `rowkey`,
    `itemName`,
    `orderDateTime`,
    `orderLocation`,
    `orderNumber`,
    (`price` * `quantity`) AS `totalItemAmount`
FROM `s-commodity-order-masked`
EMIT CHANGES;

-- Push query from pattern
SELECT *
FROM `s-commodity-pattern-one`
EMIT CHANGES;

-- Filter only "large" quantity to reward output
CREATE STREAM `s-commodity-reward-one`
AS
SELECT `rowkey`,
    `itemName`,
    `orderDateTime`,
    `orderLocation`,
    `orderNumber`,
    `price`,
    `quantity`
FROM `s-commodity-order-masked`
WHERE `quantity` > 200
EMIT CHANGES;

-- Push query from reward
SELECT *
FROM `s-commodity-reward-one`
EMIT CHANGES;

-- Storage sink is just as is
CREATE STREAM `s-commodity-storage-one`
AS
SELECT *
FROM `s-commodity-order-masked`
EMIT CHANGES;

-- Push query from storage
SELECT *
FROM `s-commodity-storage-one`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Row Key
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>
```bash
-- Create stream with key from kafka record value
CREATE STREAM `s-commodity-order-key-from-value` (
`creditCardNumber` VARCHAR,
`itemName` VARCHAR,
`orderDateTime` VARCHAR,
`orderLocation` VARCHAR,
`orderNumber` VARCHAR KEY,
`price` INT,
`quantity` INT
) WITH (
KAFKA_TOPIC = 't-commodity-order',
VALUE_FORMAT = 'JSON'
);
-- Describe stream
DESCRIBE `s-commodity-order-key-from-value`;
```
</details>
{::options parse_block_html="false" /}
----
## Notes From This Point Forward
[Back to top](/spring-kafka-bootcamp/ksqldb)

From this point forward, the regular push query:

```bash
SELECT *
FROM `s-my-stream`
EMIT CHANGES;
```

and describe:

```bash
DESCRIBE `s-my-stream`;
```

will be omitted from the statement reference. Simply change `s-my-stream` above to the stream you want to push query / describe.

In the create statements, I might use `CREATE OR REPLACE` or `CREATE STREAM IF NOT EXISTS`, but they are optional.
----
## Commodity - Additional Requirements
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Create stream for plastic items
CREATE OR REPLACE STREAM `s-commodity-pattern-two-plastic`
AS
SELECT `rowkey`, `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, (`price` * `quantity`) as `totalItemAmount`
FROM `s-commodity-order-masked`
WHERE LCASE(`itemName`) LIKE 'plastic%'
EMIT CHANGES;
-- Create stream for non-plastic items
CREATE OR REPLACE STREAM `s-commodity-pattern-two-notplastic`
AS
SELECT `rowkey`, `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, (`price` * `quantity`) as `totalItemAmount`
FROM `s-commodity-order-masked`
WHERE LCASE(`itemName`) NOT LIKE 'plastic%'
EMIT CHANGES;
-- Create stream for large & not cheap items
CREATE OR REPLACE STREAM `s-commodity-reward-two`
AS
SELECT `rowkey`, `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, `price`, `quantity`
FROM `s-commodity-order-masked`
WHERE `quantity` > 200
AND `price` > 100
EMIT CHANGES;
-- Replace key for storage
CREATE OR REPLACE STREAM `s-commodity-storage-two`
AS
SELECT FROM_BYTES(
TO_BYTES(`orderNumber`, 'utf8'), 'base64'
) AS `base64Rowkey`,
`itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, `price`, `quantity`
FROM `s-commodity-order-masked`
PARTITION BY FROM_BYTES(
TO_BYTES(`orderNumber`, 'utf8'), 'base64'
)
EMIT CHANGES;
-- describe stream
DESCRIBE `s-commodity-storage-two`;
```
</details>
{::options parse_block_html="false" /}
----
## Commodity - Reward for Each Location
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Replace key for reward
CREATE OR REPLACE STREAM `s-commodity-reward-four`
AS
SELECT `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, `price`, `quantity`
FROM `s-commodity-order-masked`
PARTITION BY `orderLocation`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Run Script File
[Back to top](/spring-kafka-bootcamp/ksqldb)

- Copy the content below into a file (for example `commodity-sample.ksql`). The file name and extension are up to you.

{::options parse_block_html="true" /}
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Sample ksql script
DROP STREAM IF EXISTS `s-commodity-pattern-from-script`;
DROP STREAM IF EXISTS `s-commodity-reward-from-script`;
DROP STREAM IF EXISTS `s-commodity-storage-from-script`;
DROP STREAM IF EXISTS `s-commodity-order-from-script`;
CREATE STREAM `s-commodity-order-from-script` (
`creditCardNumber` VARCHAR,
`itemName` VARCHAR,
`orderDateTime` VARCHAR,
`orderLocation` VARCHAR,
`orderNumber` VARCHAR KEY,
`price` INT,
`quantity` INT
) WITH (
KAFKA_TOPIC = 't-commodity-order',
VALUE_FORMAT = 'JSON'
);
CREATE OR REPLACE STREAM `s-commodity-pattern-from-script`
AS
SELECT `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, (`price` * `quantity`) as `totalItemAmount`
FROM `s-commodity-order-from-script`
WHERE LCASE(`itemName`) LIKE 'wooden%'
OR LCASE(`itemName`) LIKE 'metal%'
EMIT CHANGES;
CREATE OR REPLACE STREAM `s-commodity-reward-from-script`
AS
SELECT `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, `price`, `quantity`
FROM `s-commodity-order-from-script`
WHERE `quantity` >= 100
AND `price` >= 500
EMIT CHANGES;
CREATE OR REPLACE STREAM `s-commodity-storage-from-script`
AS
SELECT `itemName`, `orderDateTime`, `orderLocation`,
`orderNumber`, `price`, `quantity`
FROM `s-commodity-order-from-script`
PARTITION BY `orderLocation`
EMIT CHANGES;
SET 'auto.offset.reset'='earliest';
```
</details>
{::options parse_block_html="false" /}
- Put the file `commodity-sample.ksql` under the folder `data/kafka-ksqldb/scripts`, located in the same folder as `docker-compose-full.yml`. The docker compose script binds that folder into the container at `/data/scripts`.
- Go to the ksql console
- Execute the script using `RUN SCRIPT '/data/scripts/commodity-sample.ksql';`
----
## Commodity - Further Fraud Processing
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
CREATE OR REPLACE STREAM `s-commodity-fraud-six`
AS
SELECT CONCAT( SUBSTRING(`orderLocation`, 1, 1), '***' ) as `rowkey`,
(`price` * `quantity`) as `totalValue`
FROM `s-commodity-order`
WHERE LCASE(`orderLocation`) LIKE 'c%'
PARTITION BY CONCAT( SUBSTRING(`orderLocation`, 1, 1), '***' )
EMIT CHANGES;
SELECT *
FROM `s-commodity-fraud-six`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## ksqldb REST API
[Back to top](/spring-kafka-bootcamp/ksqldb)

- Complete reference here
- The curl statements in this reference were generated using Postman on Windows, where each generated curl spans multiple lines. To run them properly, you might need to adjust the statement (e.g. on Windows, make the curl a single line).
- Alternatively, import the statement into Postman, or just use the Postman collection which is downloadable from the course.

Note for Push Query: a push query response is a streaming response. Postman currently does not stream responses. The curl statement will display data when run from command-line curl, but when you run the push query using Postman, it will not show any response body.
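Both endpoints also accept a `streamsProperties` object next to the `ksql` statement, which is handy when a query must read from the earliest offset. A sketch in the same style as the statements below:

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`s-commodity-feedback\` EMIT CHANGES;\",
\"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"}
}"
```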
----
## Feedback - Are We Good Enough?
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

Drop stream if exists (1)

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"DROP STREAM IF EXISTS \`s-commodity-feedback-one-good\`;\"
}"
```

Drop stream if exists (2)

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"DROP STREAM IF EXISTS \`s-commodity-feedback-word\`;\"
}"
```

Drop stream if exists (3)

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"DROP STREAM IF EXISTS \`s-commodity-feedback\`;\"
}"
```

Create base stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback\` (\`feedback\` VARCHAR, \`feedbackDateTime\` VARCHAR, \`location\` VARCHAR, \`rating\` INT) WITH (KAFKA_TOPIC = 't-commodity-feedback', VALUE_FORMAT = 'JSON');\"
}"
```

Check base stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"DESCRIBE \`s-commodity-feedback\`;\"
}"
```

Create distinct word stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-word\` AS SELECT EXPLODE( ARRAY_DISTINCT( REGEXP_SPLIT_TO_ARRAY( LCASE( REGEXP_REPLACE(\`feedback\`, '[^a-zA-Z ]', '')), '\\\\s+'))) AS \`word\`, \`feedbackDateTime\`, \`location\`, \`rating\` FROM \`s-commodity-feedback\` EMIT CHANGES;\"
}"
```

Push query word stream (run this on command-line curl to stream the response, as Postman does not support response streaming)

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`s-commodity-feedback-word\` EMIT CHANGES;\"
}"
```

Create good word stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-one-good\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('happy', 'good', 'helpful') EMIT CHANGES;\"
}"
```

Push query good word stream (run on command-line curl)

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`s-commodity-feedback-one-good\` EMIT CHANGES;\"
}"
```
</details>
{::options parse_block_html="false" /}
----
## Feedback - Who Owns This Feedback?
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

Create good word stream, using location as key

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-two-good\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('happy', 'good', 'helpful') PARTITION BY \`location\` EMIT CHANGES;\"
}"
```

Print stream

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"PRINT \`s-commodity-feedback-two-good\`;\"
}"
```
</details>
{::options parse_block_html="false" /}
----
## Feedback - Good Feedback or Bad Feedback?
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

Create bad word stream, using location as key

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-three-bad\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('angry', 'sad', 'bad') PARTITION BY \`location\` EMIT CHANGES;\"
}"
```

Print bad word stream

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"PRINT \`s-commodity-feedback-three-bad\`;\"
}"
```
</details>
{::options parse_block_html="false" /}
----
## Feedback - Group Using Table
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

Create good word stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-four-good\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('happy', 'good', 'helpful') PARTITION BY \`location\` EMIT CHANGES;\"
}"
```

Create table for count good words by location

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE TABLE \`tbl-commodity-feedback-four-good-count\` AS SELECT \`location\`, COUNT(\`word\`) AS \`countGoodWord\` FROM \`s-commodity-feedback-four-good\` GROUP BY \`location\` EMIT CHANGES;\"
}"
```

Push query good word table

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`tbl-commodity-feedback-four-good-count\` EMIT CHANGES;\"
}"
```

Create bad word stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-four-bad\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('angry', 'sad', 'bad') PARTITION BY \`location\` EMIT CHANGES;\"
}"
```

Create table for count bad words by location

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE TABLE \`tbl-commodity-feedback-four-bad-count\` AS SELECT \`location\`, COUNT(\`word\`) AS \`countBadWord\` FROM \`s-commodity-feedback-four-bad\` GROUP BY \`location\` EMIT CHANGES;\"
}"
```

Push query bad word table

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`tbl-commodity-feedback-four-bad-count\` EMIT CHANGES;\"
}"
```
</details>
{::options parse_block_html="false" /}
----
## Feedback - Overall Good or Bad
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

Create good word stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-six-good\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('happy', 'good', 'helpful') EMIT CHANGES;\"
}"
```

Create table for count good words

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE TABLE \`tbl-commodity-feedback-six-good-count-word\` AS SELECT \`word\`, COUNT(\`word\`) AS \`countGoodWord\` FROM \`s-commodity-feedback-six-good\` GROUP BY \`word\` EMIT CHANGES;\"
}"
```

Push query good word table

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`tbl-commodity-feedback-six-good-count-word\` EMIT CHANGES;\"
}"
```

Create bad word stream

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE STREAM \`s-commodity-feedback-six-bad\` AS SELECT * FROM \`s-commodity-feedback-word\` WHERE \`word\` IN ('angry', 'sad', 'bad') EMIT CHANGES;\"
}"
```

Create table for count bad words

```bash
curl --location --request POST "http://localhost:8088/ksql" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"CREATE OR REPLACE TABLE \`tbl-commodity-feedback-six-bad-count-word\` AS SELECT \`word\`, COUNT(\`word\`) AS \`countBadWord\` FROM \`s-commodity-feedback-six-bad\` GROUP BY \`word\` EMIT CHANGES;\"
}"
```

Push query bad word table

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`tbl-commodity-feedback-six-bad-count-word\` EMIT CHANGES;\"
}"
```
</details>
{::options parse_block_html="false" /}
----
## Insert Data Using Ksql
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- inserting basic data one
INSERT INTO `s-basic-data-one` (
`myBoolean`,
`myString`,
`myAnotherString`,
`myFloat`,
`myDouble`,
`myBigDecimal`,
`myInteger`,
`myLong`
) VALUES (
false,
'This is a string',
'And this is another string',
52.918,
58290.581047,
4421672.5001855,
1057,
2900175
);
-- inserting basic data two
INSERT INTO `s-basic-data-two` (
`myEpochDay`,
`myMillisOfDay`,
`myEpochMillis`
) VALUES (
FROM_DAYS(20967),
PARSE_TIME('18:47:15', 'HH:mm:ss'),
FROM_UNIXTIME(1678610274295)
);
-- inserting basic data three
INSERT INTO `s-basic-data-three` (
`myLocalDate`,
`myLocalTime`,
`myLocalDateTime`,
`myLocalDateCustomFormat`,
`myLocalTimeCustomFormat`,
`myLocalDateTimeCustomFormat`
) VALUES (
'2024-03-07',
'16:52:09',
'2028-11-26T19:44:16',
'27 Aug 2024',
'02:55:17 PM',
'19-Dec-2026 05:42:53 AM'
);
-- inserting basic data four (array of string)
INSERT INTO `s-basic-data-four` (
`myStringArray`
) VALUES (
ARRAY[
'Hello',
'from',
'ksqldb',
'I hope you like it',
'and enjoy the course'
]
);
-- inserting basic data four (list of integer)
INSERT INTO `s-basic-data-four` (
`myIntegerList`
) VALUES (
ARRAY[
1001, 1002, 1003, 1004, 1005, 1006
]
);
-- inserting basic data four (set of double)
INSERT INTO `s-basic-data-four` (
`myDoubleSet`
) VALUES (
ARRAY[
582.59, 1964.094, 287.296, 7933.04, 332.694
]
);
-- inserting basic data five (1)
INSERT INTO `s-basic-data-five` (
`myMapAlpha`
) VALUES (
MAP(
'973' := 'nine seven three',
'628' := 'six two eight',
'510' := 'five one zero'
)
);
-- inserting basic data five (2)
INSERT INTO `s-basic-data-five` (
`myMapAlpha`,
`myMapBeta`
) VALUES (
MAP(
'409' := 'four zero nine',
'152' := 'one five two',
'736' := 'seven three six',
'827' := 'eight two seven'
),
MAP(
'd2c1b963-c18c-4c6e-b85f-3ebc44b93cec' := 'The first element',
'4edf4394-fd33-4643-9ed8-f3354fe96c28' := 'The second element',
'720ecc9e-c81f-4fac-a4d5-752c1d3f3f4f' := 'The third element'
)
);
-- inserting person
INSERT INTO `s-basic-data-person` (
`firstName`,
`lastName`,
`birthDate`,
`contacts`,
`passport`,
`addresses`
) VALUES (
'Kate',
'Bishop',
'2002-11-25',
MAP(
'email' := 'kate.bishop@marvel.com',
'phone' := '999888777'
),
STRUCT(
`number` := 'MCU-PASS-957287759',
`expiryDate` := '2029-08-18'
),
ARRAY[
STRUCT(
`streetAddress` := 'Somewhere in New York',
`country` := 'USA',
`location` := STRUCT(
`latitude` := 40.830063426849705,
`longitude` := -74.14751581646931
)
),
STRUCT(
`streetAddress` := 'Tokyo, just there',
`country` := 'Japan',
`location` := STRUCT(
`latitude` := 35.734078460795104,
`longitude` := 139.62821562631277
)
)
]
);
```
</details>
{::options parse_block_html="false" /}
----
## Customer - Web & Mobile
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop streams (if exists)
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-customer-purchase-all`;
DROP STREAM IF EXISTS `s-commodity-customer-purchase-mobile`;
DROP STREAM IF EXISTS `s-commodity-customer-purchase-web`;
-- Create stream from topic mobile
CREATE STREAM `s-commodity-customer-purchase-mobile`(
`purchaseNumber` VARCHAR,
`purchaseAmount` INT,
`mobileAppVersion` VARCHAR,
`operatingSystem` VARCHAR,
`location` STRUCT<
`latitude` DOUBLE,
`longitude` DOUBLE
>
) WITH (
KAFKA_TOPIC = 't-commodity-customer-purchase-mobile',
VALUE_FORMAT = 'JSON'
);
-- Create stream from topic web
CREATE STREAM `s-commodity-customer-purchase-web`(
`purchaseNumber` VARCHAR,
`purchaseAmount` INT,
`browser` VARCHAR,
`operatingSystem` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-customer-purchase-web',
VALUE_FORMAT = 'JSON'
);
-- Create merged stream from topic mobile + web
CREATE STREAM `s-commodity-customer-purchase-all` (
`purchaseNumber` VARCHAR,
`purchaseAmount` INT,
`mobileAppVersion` VARCHAR,
`operatingSystem` VARCHAR,
`location` STRUCT<
`latitude` DOUBLE,
`longitude` DOUBLE
>,
`browser` VARCHAR,
`source` VARCHAR
) WITH (
KAFKA_TOPIC = 't-ksql-commodity-customer-purchase-all',
PARTITIONS = 2,
VALUE_FORMAT = 'JSON'
);
-- Insert into merged from stream mobile
INSERT INTO `s-commodity-customer-purchase-all`
SELECT `purchaseNumber`,
`purchaseAmount`,
`mobileAppVersion`,
`operatingSystem`,
`location`,
CAST(null AS VARCHAR) AS `browser`,
'mobile' AS `source`
FROM `s-commodity-customer-purchase-mobile`
EMIT CHANGES;
-- Insert into merged from stream web
INSERT INTO `s-commodity-customer-purchase-all`
SELECT `purchaseNumber`,
`purchaseAmount`,
CAST(null AS VARCHAR) AS `mobileAppVersion`,
`operatingSystem`,
CAST(null AS STRUCT<`latitude` DOUBLE, `longitude` DOUBLE>) AS `location`,
`browser`,
'web' AS `source`
FROM `s-commodity-customer-purchase-web`
EMIT CHANGES;
-- Push query from merged stream
SELECT *
FROM `s-commodity-customer-purchase-all`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
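As a quick sanity check of the merge, the records can be counted per `source` (a sketch, assuming both insert queries above are running):

```bash
SELECT `source`, COUNT(*) AS `purchaseCount`
FROM `s-commodity-customer-purchase-all`
GROUP BY `source`
EMIT CHANGES;
```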
----
## Customer - Shopping Cart & Wishlist
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream (if exists)
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-customer-preference-all`;
DROP TABLE IF EXISTS `tbl-commodity-customer-preference-shopping-cart`;
DROP TABLE IF EXISTS `tbl-commodity-customer-preference-wishlist`;
DROP TABLE IF EXISTS `tbl-commodity-customer-cogroup-shopping-cart`;
DROP TABLE IF EXISTS `tbl-commodity-customer-cogroup-wishlist`;
DROP STREAM IF EXISTS `s-commodity-customer-preference-wishlist`;
DROP STREAM IF EXISTS `s-commodity-customer-preference-shopping-cart`;
-- Create stream from topic shopping cart
CREATE STREAM `s-commodity-customer-preference-shopping-cart`(
`customerId` VARCHAR,
`itemName` VARCHAR,
`cartAmount` INT,
`cartDatetime` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-customer-preference-shopping-cart',
VALUE_FORMAT = 'JSON'
);
-- Create stream from topic wishlist
CREATE STREAM `s-commodity-customer-preference-wishlist`(
`customerId` VARCHAR,
`itemName` VARCHAR,
`wishlistDatetime` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-customer-preference-wishlist',
VALUE_FORMAT = 'JSON'
);
-- Create cogroup stream, taking latest cart date time for each item
CREATE TABLE `tbl-commodity-customer-cogroup-shopping-cart`
WITH (
KEY_FORMAT = 'JSON'
)
AS
SELECT `customerId`, `itemName`,
ARRAY_MAX(
COLLECT_LIST(`cartDatetime`)
) AS `latestCartDatetime`
FROM `s-commodity-customer-preference-shopping-cart`
GROUP BY `customerId`, `itemName`
EMIT CHANGES;
-- Create map of <item name, latest add to cart datetime>
CREATE TABLE `tbl-commodity-customer-preference-shopping-cart`
AS
SELECT `customerId`, AS_MAP ( COLLECT_LIST(`itemName`), COLLECT_LIST(`latestCartDatetime`)) AS `cartItems`
FROM `tbl-commodity-customer-cogroup-shopping-cart`
GROUP BY `customerId`
EMIT CHANGES;
-- Check the cogrouped & mapped shopping cart
SELECT *
FROM `tbl-commodity-customer-preference-shopping-cart`
EMIT CHANGES;
-- Create cogroup stream, taking latest wishlist date time for each item
CREATE TABLE `tbl-commodity-customer-cogroup-wishlist`
WITH (
KEY_FORMAT = 'JSON'
)
AS
SELECT `customerId`, `itemName`,
ARRAY_MAX(
COLLECT_LIST(`wishlistDatetime`)
) AS `latestWishlistDatetime`
FROM `s-commodity-customer-preference-wishlist`
GROUP BY `customerId`, `itemName`
EMIT CHANGES;
-- Create map of <item name, latest wishlist datetime>
CREATE TABLE `tbl-commodity-customer-preference-wishlist`
AS
SELECT `customerId`, AS_MAP ( COLLECT_LIST(`itemName`), COLLECT_LIST(`latestWishlistDatetime`)) AS `wishlistItems`
FROM `tbl-commodity-customer-cogroup-wishlist`
GROUP BY `customerId`
EMIT CHANGES;
-- Check the cogrouped & mapped wishlist
SELECT *
FROM `tbl-commodity-customer-preference-wishlist`
EMIT CHANGES;
-- create merged preference from shopping cart + wishlist
CREATE TABLE `tbl-commodity-customer-preference-all`
AS
SELECT `tbl-commodity-customer-preference-shopping-cart`.`customerId` AS `customerId`,
`cartItems`,
`wishlistItems`
FROM `tbl-commodity-customer-preference-shopping-cart`
JOIN `tbl-commodity-customer-preference-wishlist`
ON `tbl-commodity-customer-preference-shopping-cart`.`customerId` = `tbl-commodity-customer-preference-wishlist`.`customerId`
EMIT CHANGES;
-- Check the merged preference
SELECT *
FROM `tbl-commodity-customer-preference-all`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
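To see only the item names inside each map, MAP_KEYS works on the merged preference table as well (a sketch):

```bash
SELECT `customerId`,
    MAP_KEYS(`cartItems`) AS `itemsInCart`,
    MAP_KEYS(`wishlistItems`) AS `itemsInWishlist`
FROM `tbl-commodity-customer-preference-all`
EMIT CHANGES;
```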
----
## Pull Query
[Back to top](/spring-kafka-bootcamp/ksqldb)

See also the postman collection sample for pull query.

{::options parse_block_html="true" /}
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Pull query to stream (1)
SELECT `myBoolean`, `myDouble`, `myString`
FROM `s-basic-data-one`;
-- Pull query to stream (2)
SELECT *
FROM `s-basic-data-person`;
-- Pull query to table
SELECT *
FROM `tbl-commodity-customer-preference-all`
WHERE `customerId` = 'Linda';
```
</details>
{::options parse_block_html="false" /}
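The same pull query also works through the REST API `/query` endpoint; a sketch in the style of the earlier curl statements:

```bash
curl --location --request POST "http://localhost:8088/query" \
--header "Content-Type: application/json" \
--data-raw "{
\"ksql\": \"SELECT * FROM \`tbl-commodity-customer-preference-all\` WHERE \`customerId\` = 'Linda';\"
}"
```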
----
## Flash Sale
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-flashsale-vote-one-result`;
DROP TABLE IF EXISTS `tbl-commodity-flashsale-vote-user-item`;
DROP STREAM IF EXISTS `s-commodity-flashsale-vote-user-item`;
DROP STREAM IF EXISTS `s-commodity-flashsale-vote`;
-- Create stream from underlying topic
CREATE STREAM `s-commodity-flashsale-vote` (
`customerId` VARCHAR,
`itemName` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-flashsale-vote',
VALUE_FORMAT = 'JSON'
);
-- Create table to know latest user vote
CREATE TABLE `tbl-commodity-flashsale-vote-user-item`
AS
SELECT `customerId`, LATEST_BY_OFFSET(`itemName`) AS `itemName`
FROM `s-commodity-flashsale-vote`
GROUP BY `customerId`;
-- table for item and vote count, based on latest user vote
CREATE TABLE `tbl-commodity-flashsale-vote-one-result`
AS
SELECT `itemName`, COUNT(`customerId`) AS `votesCount`
FROM `tbl-commodity-flashsale-vote-user-item`
GROUP BY `itemName`
EMIT CHANGES;
-- push query
SELECT *
FROM `tbl-commodity-flashsale-vote-one-result`
EMIT CHANGES;
-- pull query
SELECT *
FROM `tbl-commodity-flashsale-vote-one-result`;
```
</details>
{::options parse_block_html="false" /}
----
## Flash Sale - Timestamp
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- describe extended
DESCRIBE `s-commodity-flashsale-vote` EXTENDED;
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-flashsale-vote-two-result`;
DROP TABLE IF EXISTS `tbl-commodity-flashsale-vote-user-item-timestamp`;
-- Create table to know latest user vote, on certain time range
-- adjust the rowtime parameter to the time you run the lesson
CREATE TABLE `tbl-commodity-flashsale-vote-user-item-timestamp`
AS
SELECT `customerId`, LATEST_BY_OFFSET(`itemName`) AS `itemName`
FROM `s-commodity-flashsale-vote`
WHERE rowtime >= '2024-09-06T22:00:00'
AND rowtime < '2024-09-06T23:00:00'
GROUP BY `customerId`;
-- table for item and vote count, based on latest user vote, on certain time range
CREATE TABLE `tbl-commodity-flashsale-vote-two-result`
AS
SELECT `itemName`, COUNT(`customerId`) AS `votesCount`
FROM `tbl-commodity-flashsale-vote-user-item-timestamp`
GROUP BY `itemName`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Feedback - Average Rating
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-feedback-rating-one`;
-- Create table for average rating by country
CREATE TABLE `tbl-commodity-feedback-rating-one`
AS
SELECT `location`, AVG(`rating`) as `averageRating`
FROM `s-commodity-feedback`
GROUP BY `location`
EMIT CHANGES;
-- Sample using HAVING
SET 'auto.offset.reset'='earliest';
SELECT `location`, AVG(`rating`) as `averageRating`
FROM `s-commodity-feedback`
GROUP BY `location`
HAVING AVG(`rating`) <= 3.5
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Feedback - Detailed Rating
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop table if exists
DROP TABLE IF EXISTS `tbl-commodity-feedback-rating-two`;
-- Create table for average rating and histogram
CREATE TABLE `tbl-commodity-feedback-rating-two`
AS
SELECT `location`,
AVG(`rating`) as `averageRating`,
HISTOGRAM( CAST(`rating` AS VARCHAR) ) as `histogramRating`
FROM `s-commodity-feedback`
GROUP BY `location`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Inventory - Sum & Subtract Records
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-inventory-total-two`;
DROP STREAM IF EXISTS `s-commodity-inventory-movement`;
DROP STREAM IF EXISTS `s-commodity-inventory`;
-- Create stream from underlying topic
CREATE STREAM `s-commodity-inventory` (
`item` VARCHAR,
`location` VARCHAR,
`quantity` INT,
`transactionTime` VARCHAR,
`type` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-inventory',
VALUE_FORMAT = 'JSON'
);
-- Create stream based on type (ADD is positive, REMOVE is negative)
CREATE STREAM `s-commodity-inventory-movement`
AS
SELECT `item`,
CASE
WHEN `type` = 'ADD' THEN `quantity`
WHEN `type` = 'REMOVE' THEN (-1 * `quantity`)
ELSE 0
END AS `quantity`
FROM `s-commodity-inventory`
EMIT CHANGES;
-- Create inventory table
CREATE TABLE `tbl-commodity-inventory-total-two`
AS
SELECT `item`, SUM(`quantity`) AS `totalQuantity`
FROM `s-commodity-inventory-movement`
GROUP BY `item`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Inventory - Timestamp Extractor
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- Push query showing rowtime (base stream)
SELECT `item`,
`location`,
`quantity`,
`type`,
`transactionTime`,
FORMAT_TIMESTAMP( FROM_UNIXTIME(rowtime), 'yyyy-MM-dd''T''HH:mm:ss') AS `defaultRowtime`
FROM `s-commodity-inventory`
EMIT CHANGES;
-- drop stream if exists
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-inventory-four`;
-- Create stream with custom timestamp
CREATE STREAM `s-commodity-inventory-four`
WITH (
TIMESTAMP = '`transactionTime`',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSZ'
)
AS
SELECT `item`,
`location`,
`quantity`,
`transactionTime`,
`type`
FROM `s-commodity-inventory`
EMIT CHANGES;
-- Push query showing rowtime (extracted from field)
SELECT `item`,
`location`,
`quantity`,
`type`,
`transactionTime`,
FORMAT_TIMESTAMP( FROM_UNIXTIME(rowtime), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `extractedTime`
FROM `s-commodity-inventory-four`
EMIT CHANGES;
-- describe stream
DESCRIBE `s-commodity-inventory-four` EXTENDED;
```
</details>
{::options parse_block_html="false" /}
----
## Inventory - Tumbling Time Window
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-inventory-total-five`;
DROP STREAM IF EXISTS `s-commodity-inventory-five-movement`;
-- Create stream with custom timestamp and quantity movement
CREATE STREAM `s-commodity-inventory-five-movement`
WITH (
TIMESTAMP = '`transactionTime`',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSZ'
)
AS
SELECT `item`,
CASE
WHEN `type` = 'ADD' THEN `quantity`
WHEN `type` = 'REMOVE' THEN (-1 * `quantity`)
ELSE 0
END AS `quantity`,
`transactionTime`
FROM `s-commodity-inventory`
EMIT CHANGES;
-- Tumbling window
CREATE TABLE `tbl-commodity-inventory-total-five`
AS
SELECT FORMAT_TIMESTAMP( FROM_UNIXTIME(windowstart), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `windowStartTime`,
FORMAT_TIMESTAMP( FROM_UNIXTIME(windowend), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `windowEndTime`,
`item`, SUM(`quantity`) `totalQuantity`
FROM `s-commodity-inventory-five-movement`
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY `item`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
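Windowed tables can also be pull-queried per window by filtering on the key plus WINDOWSTART (a sketch; the item name and the time range are placeholders):

```bash
SELECT *
FROM `tbl-commodity-inventory-total-five`
WHERE `item` = 'Wooden Chair'
    AND WINDOWSTART >= '2024-09-06T00:00:00'
    AND WINDOWSTART < '2024-09-07T00:00:00';
```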
----
## Inventory - Hopping Time Window
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-inventory-total-six`;
DROP STREAM IF EXISTS `s-commodity-inventory-six-movement`;
-- Create stream with custom timestamp and quantity movement
CREATE STREAM `s-commodity-inventory-six-movement`
WITH (
TIMESTAMP = '`transactionTime`',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSZ'
)
AS
SELECT `item`,
CASE
WHEN `type` = 'ADD' THEN `quantity`
WHEN `type` = 'REMOVE' THEN (-1 * `quantity`)
ELSE 0
END AS `quantity`,
`transactionTime`
FROM `s-commodity-inventory`
EMIT CHANGES;
-- Hopping window
CREATE TABLE `tbl-commodity-inventory-total-six`
AS
SELECT FORMAT_TIMESTAMP( FROM_UNIXTIME(windowstart), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `windowStartTime`,
FORMAT_TIMESTAMP( FROM_UNIXTIME(windowend), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `windowEndTime`,
`item`, SUM(`quantity`) `totalQuantity`
FROM `s-commodity-inventory-six-movement`
WINDOW HOPPING (SIZE 1 HOUR, ADVANCE BY 20 MINUTES)
GROUP BY `item`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Inventory - Session Time Window
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `tbl-commodity-inventory-total-seven`;
DROP STREAM IF EXISTS `s-commodity-inventory-seven-movement`;
-- Create stream with custom timestamp and quantity movement
CREATE STREAM `s-commodity-inventory-seven-movement`
WITH (
TIMESTAMP = '`transactionTime`',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSZ'
)
AS
SELECT `item`,
CASE
WHEN `type` = 'ADD' THEN `quantity`
WHEN `type` = 'REMOVE' THEN (-1 * `quantity`)
ELSE 0
END AS `quantity`,
`transactionTime`
FROM `s-commodity-inventory`
EMIT CHANGES;
-- Session window
CREATE TABLE `tbl-commodity-inventory-total-seven`
AS
SELECT FORMAT_TIMESTAMP( FROM_UNIXTIME(windowstart), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `windowStartTime`,
FORMAT_TIMESTAMP( FROM_UNIXTIME(windowend), 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `windowEndTime`,
`item`, SUM(`quantity`) `totalQuantity`
FROM `s-commodity-inventory-seven-movement`
WINDOW SESSION (30 MINUTES)
GROUP BY `item`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Inner Join Stream / Stream
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-join-order-payment-one`;
DROP STREAM IF EXISTS `s-commodity-online-order`;
DROP STREAM IF EXISTS `s-commodity-online-payment`;
-- Create stream online order
CREATE STREAM `s-commodity-online-order` (
`orderDateTime` VARCHAR,
`onlineOrderNumber` VARCHAR KEY,
`totalAmount` INT,
`username` VARCHAR
)
WITH (
TIMESTAMP = '`orderDateTime`',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSZ',
KAFKA_TOPIC = 't-commodity-online-order',
VALUE_FORMAT = 'JSON'
);
-- Create stream online payment
CREATE STREAM `s-commodity-online-payment` (
`paymentDateTime` VARCHAR,
`onlineOrderNumber` VARCHAR KEY,
`paymentMethod` VARCHAR,
`paymentNumber` VARCHAR
)
WITH (
TIMESTAMP = '`paymentDateTime`',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSZ',
KAFKA_TOPIC = 't-commodity-online-payment',
VALUE_FORMAT = 'JSON'
);
-- Inner join with no grace period
CREATE STREAM `s-commodity-join-order-payment-one`
AS
SELECT `s-commodity-online-order`.`onlineOrderNumber` AS `onlineOrderNumber`,
PARSE_TIMESTAMP(`s-commodity-online-order`.`orderDateTime`, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `orderDateTime`,
`s-commodity-online-order`.`totalAmount` AS `totalAmount`,
`s-commodity-online-order`.`username` AS `username`,
PARSE_TIMESTAMP(`s-commodity-online-payment`.`paymentDateTime`, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `paymentDateTime`,
`s-commodity-online-payment`.`paymentMethod` AS `paymentMethod`,
`s-commodity-online-payment`.`paymentNumber` AS `paymentNumber`
FROM `s-commodity-online-order`
INNER JOIN `s-commodity-online-payment`
WITHIN 1 HOUR GRACE PERIOD 0 MILLISECOND
ON `s-commodity-online-order`.`onlineOrderNumber` = `s-commodity-online-payment`.`onlineOrderNumber`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Left Join Stream / Stream
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-join-order-payment-two`;
-- Left join
CREATE STREAM `s-commodity-join-order-payment-two`
AS
SELECT `s-commodity-online-order`.`onlineOrderNumber` AS `onlineOrderNumber`,
PARSE_TIMESTAMP(`s-commodity-online-order`.`orderDateTime`, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `orderDateTime`,
`s-commodity-online-order`.`totalAmount` AS `totalAmount`,
`s-commodity-online-order`.`username` AS `username`,
PARSE_TIMESTAMP(`s-commodity-online-payment`.`paymentDateTime`, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `paymentDateTime`,
`s-commodity-online-payment`.`paymentMethod` AS `paymentMethod`,
`s-commodity-online-payment`.`paymentNumber` AS `paymentNumber`
FROM `s-commodity-online-order`
LEFT JOIN `s-commodity-online-payment`
WITHIN 1 HOUR GRACE PERIOD 0 MILLISECOND
ON `s-commodity-online-order`.`onlineOrderNumber` = `s-commodity-online-payment`.`onlineOrderNumber`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Outer Join Stream / Stream
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-join-order-payment-three`;
-- Full outer join
CREATE STREAM `s-commodity-join-order-payment-three`
AS
SELECT ROWKEY as `syntheticKey`,
`s-commodity-online-order`.`onlineOrderNumber` AS `onlineOrderNumber`,
PARSE_TIMESTAMP(`s-commodity-online-order`.`orderDateTime`, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `orderDateTime`,
`s-commodity-online-order`.`totalAmount` AS `totalAmount`,
`s-commodity-online-order`.`username` AS `username`,
PARSE_TIMESTAMP(`s-commodity-online-payment`.`paymentDateTime`, 'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS `paymentDateTime`,
`s-commodity-online-payment`.`paymentMethod` AS `paymentMethod`,
`s-commodity-online-payment`.`paymentNumber` AS `paymentNumber`
FROM `s-commodity-online-order`
FULL JOIN `s-commodity-online-payment`
WITHIN 1 HOUR
ON `s-commodity-online-order`.`onlineOrderNumber` = `s-commodity-online-payment`.`onlineOrderNumber`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Inner Join Table / Table
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `t-commodity-web-vote-one-result-color`;
DROP TABLE IF EXISTS `t-commodity-web-vote-one-result-layout`;
DROP TABLE IF EXISTS `tbl-commodity-web-vote-username-color`;
DROP TABLE IF EXISTS `tbl-commodity-web-vote-username-layout`;
DROP STREAM IF EXISTS `s-commodity-web-vote-color`;
DROP STREAM IF EXISTS `s-commodity-web-vote-layout`;
-- Create stream from underlying topic (color)
CREATE STREAM `s-commodity-web-vote-color` (
`username` VARCHAR,
`color` VARCHAR,
`voteDateTime` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-web-vote-color',
VALUE_FORMAT = 'JSON'
);
-- Create stream from underlying topic (layout)
CREATE STREAM `s-commodity-web-vote-layout` (
`username` VARCHAR,
`layout` VARCHAR,
`voteDateTime` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-web-vote-layout',
VALUE_FORMAT = 'JSON'
);
-- Create table to know latest user vote (color)
CREATE TABLE `tbl-commodity-web-vote-username-color`
AS
SELECT `username`, LATEST_BY_OFFSET(`color`) AS `color`
FROM `s-commodity-web-vote-color`
GROUP BY `username` EMIT CHANGES;
-- Create table to know latest user vote (layout)
CREATE TABLE `tbl-commodity-web-vote-username-layout`
AS
SELECT `username`, LATEST_BY_OFFSET(`layout`) AS `layout`
FROM `s-commodity-web-vote-layout`
GROUP BY `username` EMIT CHANGES;
-- table for item and vote count, based on latest user vote (color only)
CREATE TABLE `t-commodity-web-vote-one-result-color`
AS
SELECT `color`,
COUNT(`tbl-commodity-web-vote-username-color`.`username`) AS `votesCount`
FROM `tbl-commodity-web-vote-username-color`
INNER JOIN `tbl-commodity-web-vote-username-layout`
ON `tbl-commodity-web-vote-username-color`.`username` = `tbl-commodity-web-vote-username-layout`.`username`
GROUP BY `color`
EMIT CHANGES;
-- table for item and vote count, based on latest user vote (layout only)
CREATE TABLE `t-commodity-web-vote-one-result-layout`
AS
SELECT `layout`,
COUNT(`tbl-commodity-web-vote-username-layout`.`username`) AS `votesCount`
FROM `tbl-commodity-web-vote-username-color`
INNER JOIN `tbl-commodity-web-vote-username-layout`
ON `tbl-commodity-web-vote-username-color`.`username` = `tbl-commodity-web-vote-username-layout`.`username`
GROUP BY `layout`
EMIT CHANGES;
```
</details>
{::options parse_block_html="false" /}
----
## Left Join Table / Table
{::options parse_block_html="true" /}
[Back to top](/spring-kafka-bootcamp/ksqldb)
<details><summary markdown="span">Click to expand!</summary>

```bash
-- drop stream if exists
TERMINATE ALL;
DROP TABLE IF EXISTS `t-commodity-web-vote-two-result-color`;
DROP TABLE IF EXISTS `t-commodity-web-vote-two-result-layout`;
-- table for item and vote count, based on latest user vote (color only)
CREATE TABLE `t-commodity-web-vote-two-result-color`
AS
SELECT `color`,
COUNT(`tbl-commodity-web-vote-username-color`.`username`) AS `votesCount`
FROM `tbl-commodity-web-vote-username-color`
LEFT JOIN `tbl-commodity-web-vote-username-layout`
ON `tbl-commodity-web-vote-username-color`.`username` = `tbl-commodity-web-vote-username-layout`.`username`
GROUP BY `color`
EMIT CHANGES;
-- table for item and vote count, based on latest user vote (layout only)
CREATE TABLE `t-commodity-web-vote-two-result-layout`
AS
SELECT `layout`,
COUNT(`tbl-commodity-web-vote-username-layout`.`username`) AS `votesCount`
FROM `tbl-commodity-web-vote-username-color`
LEFT JOIN `tbl-commodity-web-vote-username-layout`
ON `tbl-commodity-web-vote-username-color`.`username` = `tbl-commodity-web-vote-username-layout`.`username`
GROUP BY `layout`
EMIT CHANGES;
{::options parse_block_html="false" /}
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop existing tables
TERMINATE ALL;
DROP TABLE IF EXISTS `t-commodity-web-vote-three-result-color`;
DROP TABLE IF EXISTS `t-commodity-web-vote-three-result-layout`;
-- table for item and vote count, based on latest user vote (color only)
CREATE TABLE `t-commodity-web-vote-three-result-color`
AS
SELECT `color`,
COUNT(`tbl-commodity-web-vote-username-color`.`username`) AS `votesCount`
FROM `tbl-commodity-web-vote-username-color`
FULL JOIN `tbl-commodity-web-vote-username-layout`
ON `tbl-commodity-web-vote-username-color`.`username` = `tbl-commodity-web-vote-username-layout`.`username`
GROUP BY `color`
EMIT CHANGES;
-- table for item and vote count, based on latest user vote (layout only)
CREATE TABLE `t-commodity-web-vote-three-result-layout`
AS
SELECT `layout`,
COUNT(`tbl-commodity-web-vote-username-layout`.`username`) AS `votesCount`
FROM `tbl-commodity-web-vote-username-color`
FULL JOIN `tbl-commodity-web-vote-username-layout`
ON `tbl-commodity-web-vote-username-color`.`username` = `tbl-commodity-web-vote-username-layout`.`username`
GROUP BY `layout`
EMIT CHANGES;
{::options parse_block_html="false" /}
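With the same vote data, the three variants differ only in which users survive the join: INNER keeps users present in both tables, LEFT keeps every user on the left (color) side, and FULL keeps users from either side. A made-up layout-only vote makes the difference visible:

```bash
SET 'auto.offset.reset'='earliest';

-- voted for layout only: invisible to the INNER JOIN results,
-- missing from the LEFT JOIN results (which drive from the color table),
-- but counted by the FULL JOIN layout result below
INSERT INTO `s-commodity-web-vote-layout` (`username`, `layout`, `voteDateTime`)
  VALUES ('carol', 'light', '2024-11-21T10:03:00.000+0700');

SELECT * FROM `t-commodity-web-vote-three-result-layout` EMIT CHANGES;
```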
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop existing streams & tables
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-premium-offer-one`;
DROP TABLE IF EXISTS `tbl-commodity-premium-user`;
DROP STREAM IF EXISTS `s-commodity-premium-purchase`;
DROP STREAM IF EXISTS `s-commodity-premium-user`;
-- Create stream from underlying topic (purchase)
CREATE STREAM `s-commodity-premium-purchase` (
`username` VARCHAR,
`purchaseNumber` VARCHAR,
`item` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-premium-purchase',
VALUE_FORMAT = 'JSON'
);
-- Create stream from underlying topic (user)
CREATE STREAM `s-commodity-premium-user` (
`username` VARCHAR,
`level` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-premium-user',
VALUE_FORMAT = 'JSON'
);
-- table for latest user level
CREATE TABLE `tbl-commodity-premium-user`
AS
SELECT `username`, LATEST_BY_OFFSET(`level`) AS `level`
FROM `s-commodity-premium-user`
GROUP BY `username`
EMIT CHANGES;
-- join stream / table, filter only 'gold' and 'diamond' users
CREATE STREAM `s-commodity-premium-offer-one`
AS
SELECT `s-commodity-premium-purchase`.`username` AS `username`,
`level`, `purchaseNumber`
FROM `s-commodity-premium-purchase`
INNER JOIN `tbl-commodity-premium-user`
ON `s-commodity-premium-purchase`.`username` = `tbl-commodity-premium-user`.`username`
WHERE LCASE(`level`) IN ('gold', 'diamond')
EMIT CHANGES;
{::options parse_block_html="false" /}
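A quick check of the stream / table join, with made-up users. Insert the levels first: a stream-table join only matches table rows that already exist when the purchase event arrives.

```bash
SET 'auto.offset.reset'='earliest';

-- levels first (LATEST_BY_OFFSET keeps the most recent one per user)
INSERT INTO `s-commodity-premium-user` (`username`, `level`) VALUES ('alice', 'gold');
INSERT INTO `s-commodity-premium-user` (`username`, `level`) VALUES ('bob', 'silver');

-- purchases: only alice's should reach the offer stream
INSERT INTO `s-commodity-premium-purchase` (`username`, `purchaseNumber`, `item`)
  VALUES ('alice', 'P-001', 'gold bar');
INSERT INTO `s-commodity-premium-purchase` (`username`, `purchaseNumber`, `item`)
  VALUES ('bob', 'P-002', 'silver coin');

SELECT * FROM `s-commodity-premium-offer-one` EMIT CHANGES;
```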
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop the existing stream
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-premium-offer-two`;
-- join stream / table, filter only 'gold' and 'diamond' users
CREATE STREAM `s-commodity-premium-offer-two`
AS
SELECT `s-commodity-premium-purchase`.`username` AS `username`,
`level`, `purchaseNumber`
FROM `s-commodity-premium-purchase`
LEFT JOIN `tbl-commodity-premium-user`
ON `s-commodity-premium-purchase`.`username` = `tbl-commodity-premium-user`.`username`
WHERE `level` IS NULL
OR LCASE(`level`) IN ('gold', 'diamond')
EMIT CHANGES;
{::options parse_block_html="false" /}
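The LEFT JOIN version also lets through purchases from users with no level record at all, via the `level` IS NULL condition. A made-up example:

```bash
-- no level record for this user: dropped by the INNER JOIN version,
-- kept by the LEFT JOIN version because `level` IS NULL
INSERT INTO `s-commodity-premium-purchase` (`username`, `purchaseNumber`, `item`)
  VALUES ('charlie', 'P-003', 'copper wire');

SELECT * FROM `s-commodity-premium-offer-two` EMIT CHANGES;
```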
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop existing streams & tables
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-subscription-offer-one`;
DROP STREAM IF EXISTS `s-commodity-subscription-offer-two`;
DROP TABLE IF EXISTS `tbl-commodity-subscription-user-repartition`;
DROP TABLE IF EXISTS `tbl-commodity-subscription-user`;
DROP STREAM IF EXISTS `s-commodity-subscription-user`;
DROP STREAM IF EXISTS `s-commodity-subscription-purchase`;
-- Create stream from underlying topic (user)
CREATE STREAM `s-commodity-subscription-user` (
`username` VARCHAR KEY,
`duration` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-subscription-user',
VALUE_FORMAT = 'JSON'
);
-- Create stream from underlying topic (purchase)
CREATE STREAM `s-commodity-subscription-purchase` (
`username` VARCHAR KEY,
`subscriptionNumber` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-subscription-purchase',
VALUE_FORMAT = 'JSON'
);
-- table for latest user subscription
CREATE TABLE `tbl-commodity-subscription-user`
AS
SELECT `username`, LATEST_BY_OFFSET(`duration`) AS `duration`
FROM `s-commodity-subscription-user`
GROUP BY `username`
EMIT CHANGES;
-- check partition count (5 on the stream)
DESCRIBE `s-commodity-subscription-purchase` EXTENDED;
-- check partition count (2 on the table)
DESCRIBE `tbl-commodity-subscription-user` EXTENDED;
-- join stream / table (different partition counts, so this will fail)
CREATE STREAM `s-commodity-subscription-offer-one`
AS
SELECT `s-commodity-subscription-purchase`.`username` AS `username`,
`subscriptionNumber`, `duration`
FROM `s-commodity-subscription-purchase`
INNER JOIN `tbl-commodity-subscription-user`
ON `s-commodity-subscription-purchase`.`username` = `tbl-commodity-subscription-user`.`username`
EMIT CHANGES;
-- re-partition table for latest user duration (5 partitions)
CREATE TABLE `tbl-commodity-subscription-user-repartition`
WITH (
PARTITIONS = 5
)
AS
SELECT `username`, LATEST_BY_OFFSET(`duration`) AS `duration`
FROM `s-commodity-subscription-user`
GROUP BY `username`
EMIT CHANGES;
-- check partition count (now 5 on the table)
DESCRIBE `tbl-commodity-subscription-user-repartition` EXTENDED;
-- join stream / re-partitioned table (same partition count)
CREATE STREAM `s-commodity-subscription-offer-two`
AS
SELECT `s-commodity-subscription-purchase`.`username` AS `username`,
`subscriptionNumber`, `duration`
FROM `s-commodity-subscription-purchase`
INNER JOIN `tbl-commodity-subscription-user-repartition`
ON `s-commodity-subscription-purchase`.`username` = `tbl-commodity-subscription-user-repartition`.`username`
EMIT CHANGES;
{::options parse_block_html="false" /}
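Once the partition counts match, the join can be verified end-to-end. A minimal sketch with made-up values (subscription first, then the purchase, for the same ordering reason as in the premium example):

```bash
SET 'auto.offset.reset'='earliest';

INSERT INTO `s-commodity-subscription-user` (`username`, `duration`)
  VALUES ('alice', 'TWELVE-MONTH');
INSERT INTO `s-commodity-subscription-purchase` (`username`, `subscriptionNumber`)
  VALUES ('alice', 'SUB-001');

SELECT * FROM `s-commodity-subscription-offer-two` EMIT CHANGES;
```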
{::options parse_block_html="true" /}
Click to expand!
-- show functions
SHOW FUNCTIONS;
-- describe function
DESCRIBE FUNCTION LOAN_INSTALLMENT;
-- terminate running queries, then drop the existing stream along with its topic
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-loan-request` DELETE TOPIC;
-- Create new stream and new topic
CREATE STREAM `s-commodity-loan-request` (
`username` VARCHAR,
`principalLoanAmount` DOUBLE,
`annualInterestRate` DOUBLE,
`loanPeriodMonth` INT
) WITH (
KAFKA_TOPIC = 't-commodity-loan-request',
PARTITIONS = 2,
VALUE_FORMAT = 'JSON'
);
-- insert data
INSERT INTO `s-commodity-loan-request` (
`username`,
`principalLoanAmount`,
`annualInterestRate`,
`loanPeriodMonth`
) VALUES (
'danny',
1000,
12,
12
);
INSERT INTO `s-commodity-loan-request` (
`username`,
`principalLoanAmount`,
`annualInterestRate`,
`loanPeriodMonth`
) VALUES (
'melvin',
1500,
10.5,
24
);
INSERT INTO `s-commodity-loan-request` (
`username`,
`principalLoanAmount`,
`annualInterestRate`,
`loanPeriodMonth`
) VALUES (
'thomas',
3500,
11.2,
36
);
-- try the UDF (make sure the UDF jar has already been deployed to ksqlDB)
SET 'auto.offset.reset'='earliest';
SELECT `username`, `principalLoanAmount`, `annualInterestRate`, `loanPeriodMonth`,
LOAN_INSTALLMENT(`principalLoanAmount`, `annualInterestRate`, `loanPeriodMonth`) AS `monthlyLoanInstallment`
FROM `s-commodity-loan-request`
EMIT CHANGES;
{::options parse_block_html="false" /}
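Once registered, a scalar UDF behaves like any built-in function, so it can also appear in a WHERE clause. A sketch with an arbitrary threshold:

```bash
-- filter on the UDF result (the threshold of 100 is arbitrary)
SELECT `username`,
       LOAN_INSTALLMENT(`principalLoanAmount`, `annualInterestRate`, `loanPeriodMonth`) AS `monthlyLoanInstallment`
FROM `s-commodity-loan-request`
WHERE LOAN_INSTALLMENT(`principalLoanAmount`, `annualInterestRate`, `loanPeriodMonth`) < 100
EMIT CHANGES;
```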
{::options parse_block_html="true" /}
Click to expand!
-- drop the existing stream along with its underlying topic
DROP STREAM IF EXISTS `s-commodity-loan-submission` DELETE TOPIC;
-- create stream with struct
CREATE STREAM `s-commodity-loan-submission` (
`loanSubmission` STRUCT<
`principalLoanAmount` DOUBLE,
`annualInterestRate` DOUBLE,
`loanPeriodMonth` INT,
`loanApprovedDate` VARCHAR
>
) WITH (
KAFKA_TOPIC = 't-commodity-loan-submission',
PARTITIONS = 2,
VALUE_FORMAT = 'JSON'
);
-- insert dummy data
INSERT INTO `s-commodity-loan-submission` (
`loanSubmission`
) VALUES (
STRUCT(
`principalLoanAmount` := 6000,
`annualInterestRate` := 11.5,
`loanPeriodMonth` := 24,
`loanApprovedDate` := '2024-11-21'
)
);
-- run push query with the tabular function
SELECT LOAN_INSTALLMENT_SCHEDULE(`loanSubmission`)
FROM `s-commodity-loan-submission`
EMIT CHANGES;
{::options parse_block_html="false" /}
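A tabular function can return several rows for a single input row. The built-in EXPLODE shows the same fan-out mechanics without needing the custom jar:

```bash
SET 'auto.offset.reset'='earliest';

-- one input row fans out into three output rows
SELECT EXPLODE(ARRAY[1, 2, 3]) AS `n`
FROM `s-commodity-loan-submission`
EMIT CHANGES;
```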
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop existing streams along with their topics
TERMINATE ALL;
DROP STREAM IF EXISTS `s-commodity-loan-payment-latency` DELETE TOPIC;
DROP STREAM IF EXISTS `s-commodity-loan-payment` DELETE TOPIC;
-- create stream for payment
CREATE STREAM `s-commodity-loan-payment` (
`loanNumber` VARCHAR,
`installmentDueDate` VARCHAR,
`installmentPaidDate` VARCHAR
) WITH (
KAFKA_TOPIC = 't-commodity-loan-payment',
PARTITIONS = 2,
VALUE_FORMAT = 'JSON'
);
-- create stream with payment latency. Positive latency means late payment (bad).
CREATE STREAM `s-commodity-loan-payment-latency`
AS
SELECT `loanNumber`,
`installmentDueDate`,
`installmentPaidDate`,
UNIX_DATE(PARSE_DATE(`installmentPaidDate`, 'yyyy-MM-dd')) -
UNIX_DATE(PARSE_DATE(`installmentDueDate`, 'yyyy-MM-dd')) AS `paymentLatency`
FROM `s-commodity-loan-payment`;
-- insert dummy data
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-111',
'2024-04-17',
'2024-04-15'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-111',
'2024-05-17',
'2024-05-05'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-111',
'2024-06-17',
'2024-06-09'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-111',
'2024-07-17',
'2024-07-17'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-111',
'2024-08-17',
'2024-08-15'
);
-- insert dummy data 2
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-222',
'2024-04-14',
'2024-04-15'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-222',
'2024-05-14',
'2024-05-05'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-222',
'2024-06-14',
'2024-06-19'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-222',
'2024-07-14',
'2024-07-22'
);
INSERT INTO `s-commodity-loan-payment` (
`loanNumber`,
`installmentDueDate`,
`installmentPaidDate`
) VALUES (
'LOAN-222',
'2024-08-14',
'2024-08-15'
);
-- run push query with the UDAF
SET 'auto.offset.reset'='earliest';
SELECT `loanNumber`, LOAN_RATING(`paymentLatency`) AS `loanRating`
FROM `s-commodity-loan-payment-latency`
GROUP BY `loanNumber`
EMIT CHANGES;
{::options parse_block_html="false" /}
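The UDAF sits alongside the built-in aggregates, so its input can be sanity-checked in the same query, for example with AVG:

```bash
SET 'auto.offset.reset'='earliest';

SELECT `loanNumber`,
       AVG(`paymentLatency`) AS `avgLatency`,
       LOAN_RATING(`paymentLatency`) AS `loanRating`
FROM `s-commodity-loan-payment-latency`
GROUP BY `loanNumber`
EMIT CHANGES;
```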
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop the existing stream
TERMINATE ALL;
DROP STREAM IF EXISTS `s-avro01`;
-- create stream from topic
CREATE STREAM `s-avro01`
WITH (
KAFKA_TOPIC = 'sc-avro01',
VALUE_FORMAT = 'AVRO'
);
-- describe stream
DESCRIBE `s-avro01`;
-- push query
SELECT *
FROM `s-avro01`
EMIT CHANGES;
-- insert some dummy data (this will fail)
INSERT INTO `s-avro01` (
fullName,
maritalStatus,
active
) VALUES (
'Clark Kent',
'MARRIED',
true
);
{::options parse_block_html="false" /}
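One likely cause of the failure above: ksqlDB upper-cases unquoted identifiers, so `fullName` becomes `FULLNAME` and no longer matches the case-sensitive columns derived from the Avro schema. Assuming that is the only problem, quoting the identifiers should let the insert through:

```bash
-- quoted identifiers keep their case
-- (assumes the registered Avro schema really defines these three fields)
INSERT INTO `s-avro01` (
  `fullName`, `maritalStatus`, `active`
) VALUES (
  'Clark Kent', 'MARRIED', true
);
```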
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop the existing stream along with its topic
TERMINATE ALL;
DROP STREAM IF EXISTS `s-avro-member` DELETE TOPIC;
-- create stream from topic
CREATE STREAM `s-avro-member` (
`email` VARCHAR,
`username` VARCHAR,
`birthDate` VARCHAR,
`membership` VARCHAR
)
WITH (
KAFKA_TOPIC = 'sc-avro-member',
PARTITIONS = 1,
VALUE_FORMAT = 'AVRO'
);
-- insert some dummy data
INSERT INTO `s-avro-member` (
`email`,
`username`,
`birthDate`,
`membership`
) VALUES (
'thor@asgard.com',
'god_of_thunder',
'1900-05-19',
'black'
);
INSERT INTO `s-avro-member` (
`email`,
`username`,
`birthDate`,
`membership`
) VALUES (
'loki@asgard.com',
'iamloki',
'1914-11-05',
'black'
);
INSERT INTO `s-avro-member` (
`email`,
`username`,
`birthDate`,
`membership`
) VALUES (
'kang@universe.com',
'kang.the.conqueror',
'1912-10-05',
'white'
);
INSERT INTO `s-avro-member` (
`email`,
`username`,
`birthDate`,
`membership`
) VALUES (
'zeus@olympus.com',
'therealgodofthunder',
'1852-01-05',
'white'
);
INSERT INTO `s-avro-member` (
`email`,
`username`,
`birthDate`,
`membership`
) VALUES (
'athena@olympus.com',
'prettybutdeadly',
'1922-08-25',
'blue'
);
-- query stream
SELECT *
FROM `s-avro-member`
EMIT CHANGES;
-- stream for black membership only
DROP STREAM IF EXISTS `s-avro-member-black`;
CREATE STREAM `s-avro-member-black`
WITH (
VALUE_FORMAT = 'AVRO'
)
AS
SELECT *
FROM `s-avro-member`
WHERE LCASE(`membership`) = 'black';
DESCRIBE `s-avro-member-black`;
-- table counting members per membership type
DROP TABLE IF EXISTS `tbl-avro-member-count`;
CREATE TABLE `tbl-avro-member-count`
WITH (
VALUE_FORMAT = 'AVRO'
)
AS
SELECT `membership`, COUNT(`email`) AS `countMember`
FROM `s-avro-member`
GROUP BY `membership`
EMIT CHANGES;
DESCRIBE `tbl-avro-member-count`;
{::options parse_block_html="false" /}
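Because the count table is created with CTAS over an aggregation, it is materialized, so it also supports pull queries keyed on the grouping column:

```bash
-- pull query: no EMIT CHANGES, returns the current state and terminates
SELECT * FROM `tbl-avro-member-count` WHERE `membership` = 'black';
```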
{::options parse_block_html="true" /}
Click to expand!
-- terminate running queries, then drop the existing stream along with its topic
TERMINATE ALL;
DROP STREAM IF EXISTS `s-avro-member-json` DELETE TOPIC;
-- create stream from topic
CREATE STREAM `s-avro-member-json`
WITH (
VALUE_FORMAT = 'JSON'
)
AS
SELECT *
FROM `s-avro-member`
EMIT CHANGES;
INSERT INTO `s-avro-member` (
`email`,
`username`,
`birthDate`,
`membership`
) VALUES (
'kara@dc.com',
'supergirl',
'1993-11-05',
'black'
);
-- create new json stream
CREATE STREAM `s-power-json` (
`power` VARCHAR,
`level` INT
) WITH (
VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 't-power-json',
PARTITIONS = 1
);
-- dummy data
INSERT INTO `s-power-json` (
`power`,
`level`
) VALUES (
'healing',
6
);
INSERT INTO `s-power-json` (
`power`,
`level`
) VALUES (
'energy projection',
8
);
INSERT INTO `s-power-json` (
`power`,
`level`
) VALUES (
'mind control',
7
);
-- create avro stream
CREATE STREAM `s-power-avro`
WITH (
VALUE_FORMAT = 'AVRO'
)
AS
SELECT *
FROM `s-power-json`
EMIT CHANGES;
{::options parse_block_html="false" /}
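To compare the two serializations on the wire, print both topics. When a CREATE STREAM ... AS SELECT omits KAFKA_TOPIC, the output topic name defaults to the stream name:

```bash
PRINT `t-power-json` FROM BEGINNING;
PRINT `s-power-avro` FROM BEGINNING;
```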
{::options parse_block_html="true" /}
Click to expand!
-- show existing connectors
SHOW CONNECTORS;
-- show connector detail
DESCRIBE CONNECTOR `source-spooldir-csv`;
-- create dummy source connector
CREATE SOURCE CONNECTOR `source-spooldir-dummy-csv`
WITH (
'connector.class'='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector',
'topic'='t-spooldir-csv-demo',
'input.file.pattern'='dummy-.*.csv',
'input.path'='/data/inputs',
'error.path'='/data/errors',
'finished.path'='/data/processed',
'schema.generation.enabled'='true',
'csv.first.row.as.header'='true',
'empty.poll.wait.ms'='10000'
);
-- create dummy sink connector
CREATE SINK CONNECTOR `sink-postgresql-dummy-csv`
WITH (
'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector',
'topics'='t-spooldir-csv-demo',
'confluent.topic.bootstrap.servers'='192.168.0.7:9092',
'connection.url'='jdbc:postgresql://192.168.0.7:5432/postgres',
'connection.user'='postgres',
'connection.password'='postgres',
'table.name.format'='kafka_employees',
'auto.create'='true',
'auto.evolve'='true',
'pk.mode'='record_value',
'pk.fields'='employee_id',
'insert.mode'='upsert'
);
-- drop dummy connectors
DROP CONNECTOR IF EXISTS `source-spooldir-dummy-csv`;
DROP CONNECTOR IF EXISTS `sink-postgresql-dummy-csv`;
{::options parse_block_html="false" /}
{::options parse_block_html="true" /}
Click to expand!
-- Set the offset to earliest
SET 'auto.offset.reset'='earliest';
-- Push query
SELECT *
FROM `s-java-client`
EMIT CHANGES;
{::options parse_block_html="false" /}