Support avro types for nullable fields generated by Connect. #592

Merged

hjafarpour
Contributor

With this PR, KSQL can handle the union field types that Kafka Connect generates for nullable fields. For instance, here is an example of the type Connect creates:
"name": "table",
"type": [
"null",
"string"
],
KSQL will use string as the type of this field.
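
The rule described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: it assumes a union is represented as a plain list of Avro type names instead of Avro `Schema` objects, and the class `NullableUnionSketch` and method `resolve` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch: picks the non-null branch of a Connect-style nullable union.
final class NullableUnionSketch {

    // Accepts unions like ["null", "string"] or ["string", "null"] and returns
    // the non-null type name. Any other shape is rejected.
    public static String resolve(List<String> unionTypes) {
        if (unionTypes.size() == 2 && unionTypes.contains("null")) {
            for (String type : unionTypes) {
                if (!"null".equals(type)) {
                    return type;
                }
            }
        }
        throw new IllegalArgumentException("Unsupported union: " + unionTypes);
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of("null", "string"))); // prints "string"
        System.out.println(resolve(List.of("double", "null"))); // prints "double"
    }
}
```

The order of the two branches does not matter in this sketch; whether the real implementation depends on the position of `null` is not something this example tries to answer.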

@hjafarpour hjafarpour self-assigned this Jan 4, 2018
return getKSQLSchemaForAvroSchema(schemaList.get(0));
}
}
throw new KsqlException(String.format("Cannot find correct type for avro type: %s",
Contributor

In this case it would help to have a more informative error message about the restrictions we place on unions:
(at most 2 types, one must be null)

assertThat("Incorrect field schema.", schema.fields().get(7).schema(), equalTo(Schema
.STRING_SCHEMA));
}

Contributor

Add some negative cases for union validation?

@rmoff
Contributor

rmoff commented Jan 5, 2018

I've tested this against two Kafka Connect-sourced topics.
Works fine for data coming from our JDBC Source connector.
Fails for data coming from Oracle's GoldenGate connector:

ksql> CREATE STREAM LOGON WITH (KAFKA_TOPIC='ora-ogg-SOE2-LOGON-avro', VALUE_FORMAT='AVRO');
io.confluent.ksql.parser.exception.ParseFailedException: line 2:2: extraneous input 'TABLE' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
ksql>

My guess is that this is because the Avro schema has a column named table?

{
  "type": "record",
  "name": "LOGON",
  "namespace": "ORCL.SOE2",
  "fields": [
    {
      "name": "table",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "op_type",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "op_ts",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "current_ts",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "pos",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "LOGON_ID",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "CUSTOMER_ID",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "LOGON_DATE",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "ORCL.SOE2.LOGON"
}
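
If the guess above is right, a small pre-check over the schema's field names could flag the problem before the CREATE STREAM statement is issued. The sketch below is hypothetical: `ReservedFieldNameCheck` and `findReserved` are made-up names, and the reserved-word set is supplied by the caller because the authoritative keyword list lives in the KSQL grammar. Only `TABLE` is taken from the error shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: reports field names that collide with reserved words.
final class ReservedFieldNameCheck {

    // reservedUpperCase is an assumption supplied by the caller (upper-cased keywords).
    public static List<String> findReserved(List<String> fieldNames, Set<String> reservedUpperCase) {
        List<String> hits = new ArrayList<>();
        for (String name : fieldNames) {
            // Compare case-insensitively, since the parser error reports 'TABLE'
            // for a field that the schema spells as "table".
            if (reservedUpperCase.contains(name.toUpperCase(Locale.ROOT))) {
                hits.add(name);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Field names copied from the LOGON schema above.
        List<String> logonFields = List.of("table", "op_type", "op_ts", "current_ts",
                "pos", "LOGON_ID", "CUSTOMER_ID", "LOGON_DATE");
        System.out.println(findReserved(logonFields, Set.of("TABLE"))); // prints "[table]"
    }
}
```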

@rmoff
Contributor

rmoff commented Jan 5, 2018

Fortunately, GoldenGate can be configured to drop the table field (gg.kafkaconnect.name.includeTableName=false). With this field dropped, KSQL 0.3 plus this PR works successfully!

ksql> CREATE STREAM LOGON WITH (KAFKA_TOPIC='ora-ogg2-SOE2-LOGON-avro', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------
ksql> select * from logon;
1515151793394 | @178724_31809_2000-11-08 23:08:51 | I | 2017-09-13 14:51:36.000000 | 2018-01-05 06:29:53.221000 | 00000000010105263863 | 178724.0 | 31809.0 | 2000-11-08 23:08:51
1515151793394 | @178725_91808_2009-06-29 02:38:11 | I | 2017-09-13 14:51:36.000000 | 2018-01-05 06:29:53.394000 | 00000000010105264035 | 178725.0 | 91808.0 | 2009-06-29 02:38:11
1515151793395 | @178726_78742_2007-11-06 15:29:38 | I | 2017-09-13 14:51:36.000000 | 2018-01-05 06:29:53.394001 | 00000000010105264167 | 178726.0 | 78742.0 | 2007-11-06 15:29:38

Contributor

@dguy dguy left a comment

You probably want to add a test that fails if the union type isn't as expected, i.e., there is no null, it doesn't have exactly 2 fields, etc.
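
The negative cases suggested here might look roughly like the sketch below. It is an assumption-laden illustration rather than the test added in this PR: unions are modeled as plain lists of type names, `UnionValidationSketch` and `violation` are hypothetical names, the messages are invented, and "exactly two types, one of them null" is taken as the restriction.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical validator: describes which union restriction is violated, if any.
final class UnionValidationSketch {

    public static Optional<String> violation(List<String> unionTypes) {
        if (unionTypes.size() != 2) {
            return Optional.of("union must have exactly two types, found " + unionTypes.size());
        }
        if (!unionTypes.contains("null")) {
            return Optional.of("one of the union types must be null");
        }
        if (unionTypes.get(0).equals(unionTypes.get(1))) {
            return Optional.of("union must contain one non-null type");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // One accepted shape followed by three negative cases.
        List<List<String>> cases = List.of(
                List.of("null", "string"),
                List.of("int", "string"),
                List.of("null", "int", "string"),
                List.of("null", "null"));
        for (List<String> unionTypes : cases) {
            System.out.println(unionTypes + " -> " + violation(unionTypes).orElse("ok"));
        }
    }
}
```

Returning the violated restriction as text keeps each negative case tied to a specific message, which also fits the earlier request for a more informative error.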

@hjafarpour
Contributor Author

@dguy @rodesai I made your suggested changes.

Contributor

@rodesai rodesai left a comment

LGTM

Schema schema = SerDeUtil.getSchemaFromAvro(avroSchemaStr);
fail();
} catch (KsqlException ksqlException) {
assertThat("", ksqlException.getMessage(), equalTo("Union type cannot have more than two "
Contributor

If you aren't going to provide a message in the assertThat(..), you can omit the first string param, i.e., just use assertThat(ksqlException.getMessage(), equalTo("....")).

Contributor

@dguy dguy left a comment

LGTM

@hjafarpour hjafarpour merged commit 2c20654 into confluentinc:master Jan 10, 2018
satybald pushed a commit to satybald/ksql that referenced this pull request Jan 17, 2018
…ntinc#592)

* Support avro types for nullable fields  generated by Connect.

* Applied review feedback.

* Applied review feedback.