Minor docs improvements #548

Merged: 4 commits, Dec 19, 2017
docs/examples.md (107 changes: 53 additions & 54 deletions)
@@ -36,8 +36,8 @@ CREATE STREAM pageviews \
(viewtime BIGINT, \
userid VARCHAR, \
pageid VARCHAR) \
-WITH (kafka_topic='pageviews-topic', \
-value_format='DELIMITED');
+WITH (KAFKA_TOPIC='pageviews-topic', \
> **Contributor Author:** We also switched e.g. KSQL decks to use uppercase for properties in the WITH clause.

+VALUE_FORMAT='DELIMITED');
```

The above statement creates a stream with three columns on the Kafka topic that is named `pageviews`. You should also
@@ -54,9 +54,9 @@ CREATE STREAM pageviews \
(viewtime BIGINT, \
userid VARCHAR, \
pageid VARCHAR) \
-WITH (kafka_topic='pageviews-topic', \
-value_format='DELIMITED', \
-key='pageid');
+WITH (KAFKA_TOPIC='pageviews-topic', \
+VALUE_FORMAT='DELIMITED', \
+KEY='pageid');
```

**Associating Kafka message timestamps:**
@@ -71,10 +71,10 @@ CREATE STREAM pageviews \
(viewtime BIGINT, \
userid VARCHAR, \
pageid VARCHAR) \
-WITH (kafka_topic='pageviews-topic', \
-value_format='DELIMITED', \
-key='pageid', \
-timestamp='viewtime');
+WITH (KAFKA_TOPIC='pageviews-topic', \
+VALUE_FORMAT='DELIMITED', \
+KEY='pageid', \
+TIMESTAMP='viewtime');
```


@@ -92,8 +92,8 @@ CREATE TABLE users \
userid VARCHAR, \
interests array<VARCHAR>, \
contact_info map<VARCHAR, VARCHAR>) \
-WITH (kafka_topic='users-topic', \
-value_format='JSON');
+WITH (KAFKA_TOPIC='users-topic', \
+VALUE_FORMAT='JSON');
```

As you can see the above table has, next to columns with primitive data types, a column of `array` type and another
@@ -121,9 +121,9 @@ The following statement will generate a new stream, `pageviews_transformed` with

```sql
CREATE STREAM pageviews_transformed \
-WITH (timestamp='viewtime', \
-partitions=5, \
-value_format='JSON') AS \
+WITH (TIMESTAMP='viewtime', \
+PARTITIONS=5, \
+VALUE_FORMAT='JSON') AS \
SELECT viewtime, \
userid, \
pageid, \
@@ -136,9 +136,9 @@ Use a `[ WHERE condition ]` clause to select a subset of data. If you want to r

```sql
CREATE STREAM pageviews_transformed_priority_1 \
-WITH (timestamp='viewtime', \
-partitions=5, \
-value_format='JSON') AS \
+WITH (TIMESTAMP='viewtime', \
+PARTITIONS=5, \
+VALUE_FORMAT='JSON') AS \
SELECT viewtime, \
userid, \
pageid, \
@@ -150,9 +150,9 @@ CREATE STREAM pageviews_transformed_priority_2 \

```sql
CREATE STREAM pageviews_transformed_priority_2 \
-WITH (timestamp='viewtime', \
-partitions=5, \
-value_format='JSON') AS \
+WITH (TIMESTAMP='viewtime', \
+PARTITIONS=5, \
+VALUE_FORMAT='JSON') AS \
SELECT viewtime, \
userid, \
pageid, \
@@ -294,78 +294,77 @@ interaction with the schema registry.

Currently KSQL supports Avro data in the values of Kafka messages:

-| | Message Key | Message Value |
-|-----------------|-------------------|------------------------------|
-| Avro format | Not supported yet | Supported (read and write) |
+| | Message Key | Message Value |
+|--------------|-------------------|----------------------------|
+| Avro format | Not supported yet | Supported (read and write) |

What is not supported yet:


* Message keys in Avro format. Message keys in KSQL are always interpreted as STRING format,
-which means KSQL will ignore any Avro schemas that have been registered for message keys.
-
+  which means KSQL will ignore any Avro schemas that have been registered for message keys.
* Avro schemas with nested fields because KSQL does not yet support nested columns.


#### Configuring KSQL for Avro

-To use Avro in KSQL you must configure the schema registry's API endpoint via setting the
-`ksql.schema.registry.url` config variable in the `properties` file that you use to start KSQL
-(default: `http://localhost:8081`).
+To use Avro in KSQL you must configure the schema registry's API endpoint via setting the `ksql.schema.registry.url`
+config variable in the KSQL configuration file that you use to start KSQL (default: `http://localhost:8081`).
+You should not use `SET` to configure the registry endpoint.
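
For illustration, a minimal sketch of the corresponding entry in the KSQL configuration file (the property name and default come from the text above; the file layout is an assumption):

```properties
# Point KSQL at the Confluent Schema Registry REST endpoint so that
# Avro values can be read and written. The value shown is the
# documented default.
ksql.schema.registry.url=http://localhost:8081
```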

-#### Using Avro in KSQL
-
-First you must ensure that:
+#### Using Avro in KSQL
+
+First you must ensure that:

1. Confluent Schema Registry is up and running.

2. `ksql.schema.registry.url` is set correctly in KSQL (see previous section).

-Then you can use `CREATE STREAM` and `CREATE TABLE` statements to read from Kafka topics with
-Avro-formatted data and `CREATE STREAM AS` and `CREATE TABLE AS` statements to write Avro-formatted data into Kafka topics.
+Then you can use `CREATE STREAM` and `CREATE TABLE` statements to read from Kafka topics with Avro-formatted data and
+`CREATE STREAM AS` and `CREATE TABLE AS` statements to write Avro-formatted data into Kafka topics.

Example: Create a new stream `pageviews` by reading from a Kafka topic with Avro-formatted messages.

```sql
CREATE STREAM pageviews
-WITH (kafka_topic='pageviews-avro-topic',
-value_format='AVRO');
+WITH (KAFKA_TOPIC='pageviews-avro-topic',
+VALUE_FORMAT='AVRO');
```

-Note how in the above example you don't need to define any columns or data types in the CREATE
-statement because KSQL will automatically infer this information from the latest registered Avro
-schema for topic `pageviews-avro-topic` (i.e., the latest schema at the time the statement is first executed).
+Note how in the above example you don't need to define any columns or data types in the CREATE
+statement because KSQL will automatically infer this information from the latest registered Avro
+schema for topic `pageviews-avro-topic` (i.e., the latest schema at the time the statement is first executed).
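
For illustration, the columns that KSQL inferred could then be inspected from the KSQL CLI, e.g.:

```sql
-- Illustrative: list the columns and data types that KSQL inferred
-- from the latest registered Avro schema for pageviews-avro-topic.
DESCRIBE pageviews;
```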

-If you want to create a STREAM or TABLE with only a subset of all the available fields in the
-Avro schema, then you must explicitly define the columns and data types.
+If you want to create a STREAM or TABLE with only a subset of all the available fields in the
+Avro schema, then you must explicitly define the columns and data types.

Example: Create a new stream `pageviews_reduced`, similar to the previous example, but with only a
-few of all the available fields in the Avro data (here, only the two columns `viewtime` and `pageid` are picked).
+few of all the available fields in the Avro data (here, only the two columns `viewtime` and `pageid` are picked).

```sql
CREATE STREAM pageviews_reduced (viewtime BIGINT, pageid VARCHAR)
-WITH (kafka_topic='pageviews-avro-topic',
-value_format='AVRO');
+WITH (KAFKA_TOPIC='pageviews-avro-topic',
+VALUE_FORMAT='AVRO');
```

-KSQL allows you to work with streams and tables regardless of their underlying data format. This
-means that you can easily mix and match streams and tables with different data formats
-(e.g. join a stream backed by Avro data with a table backed by JSON data) and also convert easily between data formats.
+KSQL allows you to work with streams and tables regardless of their underlying data format. This
+means that you can easily mix and match streams and tables with different data formats
+(e.g. join a stream backed by Avro data with a table backed by JSON data) and also convert easily between data formats.

Example: Convert a JSON stream into an Avro stream.

```sql
CREATE STREAM pageviews_json (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
-WITH (kafka_topic='pageviews-json-topic', value_format='JSON');
+WITH (KAFKA_TOPIC='pageviews-json-topic', VALUE_FORMAT='JSON');

CREATE STREAM pageviews_avro
-WITH (value_format = 'AVRO') AS
+WITH (VALUE_FORMAT = 'AVRO') AS
SELECT * FROM pageviews_json;
```

-Note how you only need to set `value_format` to Avro to achieve the data conversion. Also, KSQL
-will automatically generate an appropriate Avro schema for the new `pageviews_avro` stream,
-and it will also register the schema with Confluent Schema Registry.
+Note how you only need to set `VALUE_FORMAT` to Avro to achieve the data conversion. Also, KSQL
+will automatically generate an appropriate Avro schema for the new `pageviews_avro` stream,
+and it will also register the schema with Confluent Schema Registry.


## Configuring KSQL
@@ -391,10 +390,10 @@ Common configuration properties that you might want to change from their default
The default value in KSQL is `10000000` (~ 10 MB);

- fail.on.deserialization.error:
-When set to false (the default), any errors that occur when deserializing a record will result in the the error being logged and the record being dropped.
+When set to false (the default), any errors that occur when deserializing a record will result in the error being
+logged and the record being dropped.
If you wish to halt processing on deserialization errors you should set this to true.
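
For illustration, opting in to halting on bad records might look as follows in the KSQL configuration file (the property name comes from the text above; the file layout is an assumption):

```properties
# Halt query processing on deserialization errors instead of logging
# and dropping the offending record (the default is false).
fail.on.deserialization.error=true
```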




## Running KSQL
