From 8529e123c25fe0ed9c134dafa0119db6834899b5 Mon Sep 17 00:00:00 2001
From: "Michael G. Noll"
Date: Tue, 19 Dec 2017 09:23:21 +0100
Subject: [PATCH 1/4] Consistent uppercasing of properties in the WITH clause

---
 docs/examples.md | 54 ++++++++++++++++++++++++------------------------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/docs/examples.md b/docs/examples.md
index 13b563af8530..5050849f09f7 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -36,8 +36,8 @@ CREATE STREAM pageviews \
   (viewtime BIGINT, \
    userid VARCHAR, \
    pageid VARCHAR) \
-  WITH (kafka_topic='pageviews-topic', \
-        value_format='DELIMITED');
+  WITH (KAFKA_TOPIC='pageviews-topic', \
+        VALUE_FORMAT='DELIMITED');
 ```

 The above statement creates a stream with three columns on the Kafka topic named `pageviews-topic`. You should also
@@ -54,9 +54,9 @@ CREATE STREAM pageviews \
   (viewtime BIGINT, \
    userid VARCHAR, \
    pageid VARCHAR) \
-  WITH (kafka_topic='pageviews-topic', \
-        value_format='DELIMITED', \
-        key='pageid');
+  WITH (KAFKA_TOPIC='pageviews-topic', \
+        VALUE_FORMAT='DELIMITED', \
+        KEY='pageid');
 ```

 **Associating Kafka message timestamps:**
@@ -71,10 +71,10 @@ CREATE STREAM pageviews \
   (viewtime BIGINT, \
    userid VARCHAR, \
    pageid VARCHAR) \
-  WITH (kafka_topic='pageviews-topic', \
-        value_format='DELIMITED', \
-        key='pageid', \
-        timestamp='viewtime');
+  WITH (KAFKA_TOPIC='pageviews-topic', \
+        VALUE_FORMAT='DELIMITED', \
+        KEY='pageid', \
+        TIMESTAMP='viewtime');
 ```

@@ -92,8 +92,8 @@ CREATE TABLE users \
    userid VARCHAR, \
    interests array<VARCHAR>, \
    contact_info map<VARCHAR, VARCHAR>) \
-  WITH (kafka_topic='users-topic', \
-        value_format='JSON');
+  WITH (KAFKA_TOPIC='users-topic', \
+        VALUE_FORMAT='JSON');
 ```

 As you can see, the above table has, in addition to columns with primitive data types, a column of `array` type and another
@@ -121,9 +121,9 @@ The following statement will generate a new stream, `pageviews_transformed` with

 ```sql
 CREATE STREAM pageviews_transformed \
-  WITH (timestamp='viewtime', \
-        partitions=5, \
-        value_format='JSON') AS \
+  WITH (TIMESTAMP='viewtime', \
+        PARTITIONS=5, \
+        VALUE_FORMAT='JSON') AS \
   SELECT viewtime, \
          userid, \
          pageid, \
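
As an aside to the statements being edited above (not part of the patch): a minimal sketch of why the `TIMESTAMP` property matters downstream, assuming the `pageviews_transformed` stream defined in this patch and the windowing syntax KSQL offered at the time.

```sql
-- A sketch, assuming the pageviews_transformed stream above: because the
-- stream was created WITH (TIMESTAMP='viewtime', ...), this tumbling window
-- is computed over the event time carried in viewtime, not over the time
-- at which each record happens to arrive.
SELECT pageid, \
       COUNT(*) \
FROM pageviews_transformed \
WINDOW TUMBLING (SIZE 1 MINUTE) \
GROUP BY pageid;
```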
@@ -136,9 +136,9 @@ Use a `[ WHERE condition ]` clause to select a subset of data. If you want to r

 ```sql
 CREATE STREAM pageviews_transformed_priority_1 \
-  WITH (timestamp='viewtime', \
-        partitions=5, \
-        value_format='JSON') AS \
+  WITH (TIMESTAMP='viewtime', \
+        PARTITIONS=5, \
+        VALUE_FORMAT='JSON') AS \
   SELECT viewtime, \
          userid, \
          pageid, \
@@ -150,9 +150,9 @@ CREATE STREAM pageviews_transformed_priority_1 \

 ```sql
 CREATE STREAM pageviews_transformed_priority_2 \
-  WITH (timestamp='viewtime', \
-        partitions=5, \
-        value_format='JSON') AS \
+  WITH (TIMESTAMP='viewtime', \
+        PARTITIONS=5, \
+        VALUE_FORMAT='JSON') AS \
   SELECT viewtime, \
          userid, \
          pageid, \
@@ -328,8 +328,8 @@ Example: Create a new stream `pageviews` by reading from a Kafka topic with Avro

 ```sql
 CREATE STREAM pageviews
-  WITH (kafka_topic='pageviews-avro-topic',
-        value_format='AVRO');
+  WITH (KAFKA_TOPIC='pageviews-avro-topic',
+        VALUE_FORMAT='AVRO');
 ```

 Note how in the above example you don't need to define any columns or data types in the CREATE
@@ -344,8 +344,8 @@ Example: Create a new stream `pageviews_reduced`, similar to the previous exampl

 ```sql
 CREATE STREAM pageviews_reduced (viewtime BIGINT, pageid VARCHAR)
-  WITH (kafka_topic='pageviews-avro-topic',
-        value_format='AVRO');
+  WITH (KAFKA_TOPIC='pageviews-avro-topic',
+        VALUE_FORMAT='AVRO');
 ```

 KSQL allows you to work with streams and tables regardless of their underlying data format. This
@@ -356,14 +356,14 @@ Example: Convert a JSON stream into an Avro stream.

 ```sql
 CREATE STREAM pageviews_json (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
-  WITH (kafka_topic='pageviews-json-topic', value_format='JSON');
+  WITH (KAFKA_TOPIC='pageviews-json-topic', VALUE_FORMAT='JSON');

 CREATE STREAM pageviews_avro
-  WITH (value_format = 'AVRO') AS
+  WITH (VALUE_FORMAT = 'AVRO') AS
   SELECT * FROM pageviews_json;
 ```

-  Note how you only need to set `value_format` to Avro to achieve the data conversion. Also, KSQL
+  Note how you only need to set `VALUE_FORMAT` to Avro to achieve the data conversion. Also, KSQL
   will automatically generate an appropriate Avro schema for the new `pageviews_avro` stream,
   and it will also register the schema with Confluent Schema Registry.

From 1fb3a5c626e15de15d0a098eda8656684f27184a Mon Sep 17 00:00:00 2001
From: "Michael G. Noll"
Date: Tue, 19 Dec 2017 09:26:59 +0100
Subject: [PATCH 2/4] Correct formatting and indentation

---
 docs/examples.md | 54 +++++++++++++++++++++++-------------------------
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git a/docs/examples.md b/docs/examples.md
index 5050849f09f7..39f7adb119d2 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -294,35 +294,33 @@ interaction with the schema registry.

 Currently KSQL supports Avro data in the values of Kafka messages:

-|                 | Message Key       | Message Value                |
-|-----------------|-------------------|------------------------------|
-| Avro format     | Not supported yet | Supported (read and write)   |
+|              | Message Key       | Message Value              |
+|--------------|-------------------|----------------------------|
+| Avro format  | Not supported yet | Supported (read and write) |

 What is not supported yet:
-
 * Message keys in Avro format. Message keys in KSQL are always interpreted as STRING format,
-which means KSQL will ignore any Avro schemas that have been registered for message keys.
-
+  which means KSQL will ignore any Avro schemas that have been registered for message keys.
 * Avro schemas with nested fields because KSQL does not yet support nested columns.
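
An illustrative aside on the key limitation listed above (not part of the patch): even when message values are Avro, the message key surfaces only through KSQL's implicit STRING-typed `ROWKEY` column. A minimal sketch, assuming the `pageviews` stream created from the Avro topic earlier:

```sql
-- A sketch, assuming the pageviews stream from the Avro examples above:
-- ROWKEY is KSQL's implicit STRING key column, so any Avro schema that was
-- registered for the key side of the topic is ignored when reading it.
SELECT ROWKEY, pageid, viewtime
FROM pageviews
LIMIT 3;
```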

 #### Configuring KSQL for Avro

-  To use Avro in KSQL you must configure the schema registry's API endpoint via setting the
-  `ksql.schema.registry.url` config variable in the `properties` file that you use to start KSQL
-  (default: `http://localhost:8081`).
+To use Avro in KSQL you must configure the schema registry's API endpoint via setting the `ksql.schema.registry.url`
+config variable in the KSQL configuration file that you use to start KSQL (default: `http://localhost:8081`).

-  #### Using Avro in KSQL
-  First you must ensure that:
+#### Using Avro in KSQL
+
+First you must ensure that:

 1. Confluent Schema Registry is up and running.

 2. `ksql.schema.registry.url` is set correctly in KSQL (see previous section).

-  Then you can use `CREATE STREAM` and `CREATE TABLE` statements to read from Kafka topics with
-  Avro-formatted data and `CREATE STREAM AS` and `CREATE TABLE AS` statements to write Avro-formatted data into Kafka topics.
+Then you can use `CREATE STREAM` and `CREATE TABLE` statements to read from Kafka topics with Avro-formatted data and
+`CREATE STREAM AS` and `CREATE TABLE AS` statements to write Avro-formatted data into Kafka topics.

 Example: Create a new stream `pageviews` by reading from a Kafka topic with Avro-formatted messages.
@@ -332,15 +330,15 @@ CREATE STREAM pageviews
         VALUE_FORMAT='AVRO');
 ```

-  Note how in the above example you don't need to define any columns or data types in the CREATE
-  statement because KSQL will automatically infer this information from the latest registered Avro
-  schema for topic `pageviews-avro-topic` (i.e., the latest schema at the time the statement is first executed).
+Note how in the above example you don't need to define any columns or data types in the CREATE
+statement because KSQL will automatically infer this information from the latest registered Avro
+schema for topic `pageviews-avro-topic` (i.e., the latest schema at the time the statement is first executed).

-  If you want to create a STREAM or TABLE with only a subset of all the available fields in the
-  Avro schema, then you must explicitly define the columns and data types.
+If you want to create a STREAM or TABLE with only a subset of all the available fields in the
+Avro schema, then you must explicitly define the columns and data types.

 Example: Create a new stream `pageviews_reduced`, similar to the previous example, but with only a
-  few of all the available fields in the Avro data (here, only the two columns `viewtime` and `pageid` are picked).
+few of all the available fields in the Avro data (here, only the two columns `viewtime` and `pageid` are picked).

 ```sql
 CREATE STREAM pageviews_reduced (viewtime BIGINT, pageid VARCHAR)
@@ -348,9 +346,9 @@ CREATE STREAM pageviews_reduced (viewtime BIGINT, pageid VARCHAR)
         VALUE_FORMAT='AVRO');
 ```

-  KSQL allows you to work with streams and tables regardless of their underlying data format. This
-  means that you can easily mix and match streams and tables with different data formats
-  (e.g. join a stream backed by Avro data with a table backed by JSON data) and also convert easily between data formats.
+KSQL allows you to work with streams and tables regardless of their underlying data format. This
+means that you can easily mix and match streams and tables with different data formats
+(e.g. join a stream backed by Avro data with a table backed by JSON data) and also convert easily between data formats.

 Example: Convert a JSON stream into an Avro stream.
@@ -363,9 +361,9 @@
   SELECT * FROM pageviews_json;
 ```

-  Note how you only need to set `VALUE_FORMAT` to Avro to achieve the data conversion. Also, KSQL
-  will automatically generate an appropriate Avro schema for the new `pageviews_avro` stream,
-  and it will also register the schema with Confluent Schema Registry.
+Note how you only need to set `VALUE_FORMAT` to Avro to achieve the data conversion. Also, KSQL
+will automatically generate an appropriate Avro schema for the new `pageviews_avro` stream,
+and it will also register the schema with Confluent Schema Registry.

 ## Configuring KSQL

@@ -391,10 +389,10 @@ Common configuration properties that you might want to change from their default
     The default value in KSQL is `10000000` (~ 10 MB);
 - fail.on.deserialization.error:
-    When set to false (the default), any errors that occur when deserializing a record will result in the the error being logged and the record being dropped.
+    When set to false (the default), any errors that occur when deserializing a record will result in the error being
+    logged and the record being dropped.
     If you wish to halt processing on deserialization errors you should set this to true.
-
-
+

 ## Running KSQL

From 7b05097188a6e15784203e303e30c03d84e2ce7a Mon Sep 17 00:00:00 2001
From: "Michael G. Noll"
Date: Tue, 19 Dec 2017 09:27:16 +0100
Subject: [PATCH 3/4] Clarify that you should not use SET to configure the
 schema registry endpoint

---
 docs/examples.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/examples.md b/docs/examples.md
index 39f7adb119d2..0edd42c0ae59 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -309,6 +309,7 @@ What is not supported yet:

 To use Avro in KSQL you must configure the schema registry's API endpoint via setting the `ksql.schema.registry.url`
 config variable in the KSQL configuration file that you use to start KSQL (default: `http://localhost:8081`).
+You should not use `SET` to configure the registry endpoint.

 #### Using Avro in KSQL

From c54a95d8a564190f3bcde02edaa5e5c79b98ebc6 Mon Sep 17 00:00:00 2001
From: "Michael G. Noll"
Date: Tue, 19 Dec 2017 09:30:55 +0100
Subject: [PATCH 4/4] Clarify Avro configuration section

---
 docs/examples.md | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/docs/examples.md b/docs/examples.md
index 0edd42c0ae59..13d3245ef948 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -307,9 +307,9 @@ What is not supported yet:

 #### Configuring KSQL for Avro

-To use Avro in KSQL you must configure the schema registry's API endpoint via setting the `ksql.schema.registry.url`
-config variable in the KSQL configuration file that you use to start KSQL (default: `http://localhost:8081`).
-You should not use `SET` to configure the registry endpoint.
+You must configure the API endpoint of Confluent Schema Registry by setting `ksql.schema.registry.url`
+(default: `http://localhost:8081`) in the KSQL configuration file that you use to start KSQL.
+You *should not* use `SET` to configure the registry endpoint.

 #### Using Avro in KSQL

@@ -317,7 +317,6 @@ First you must ensure that:

 1. Confluent Schema Registry is up and running.
-
 2. `ksql.schema.registry.url` is set correctly in KSQL (see previous section).

 Then you can use `CREATE STREAM` and `CREATE TABLE` statements to read from Kafka topics with Avro-formatted data and
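
Rounding out the Avro examples that these patches touch: a minimal sketch of the write side (not part of the patches), assuming the `pageviews` stream from the earlier examples, a running Confluent Schema Registry, and `ksql.schema.registry.url` set in the KSQL configuration file as described above. The table and column names here are illustrative only.

```sql
-- A sketch, assuming the pageviews stream above: derive an Avro-backed table.
-- As with pageviews_avro earlier, KSQL would generate an Avro schema for the
-- new table's output topic and register it with Confluent Schema Registry.
CREATE TABLE pageviews_per_user_avro
  WITH (VALUE_FORMAT='AVRO') AS
  SELECT userid, COUNT(*) AS total_views
  FROM pageviews
  GROUP BY userid;
```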