Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the regression for stream-table join. Table should always read … #495

Merged

Conversation

hjafarpour
Copy link
Contributor

…from the beginning of the topic.

@apurvam
Copy link
Contributor

apurvam commented Nov 30, 2017

When was the regression introduced? what's the effect? That the join is empty? Something else?

@apurvam
Copy link
Contributor

apurvam commented Nov 30, 2017

This also seems like a good time to add a unit test :)

@hjafarpour
Copy link
Contributor Author

The regression was introduced when we were refactoring the code. When we join stream-table, the table always should be read from the beginning otherwise the join will return null values for the right hand side.
I also fixed the integration test for the join to enforce this.

@hjafarpour
Copy link
Contributor Author

retest this please

final SchemaKStream schemaKStream = right.buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props);
Map<String, Object> joinTableProps = new HashMap<>();
joinTableProps.putAll(props);
joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "EARLIEST");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should "EARLIEST" be "earliest"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but in KSQL it's case insensitive. Regardless, I think it's better to use the correct one. Changed it to earliest :)

Copy link

@bluemonk3y bluemonk3y left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expose the property and LGTM

final SchemaKStream schemaKStream = right.buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props);
Map<String, Object> joinTableProps = new HashMap<>();
joinTableProps.putAll(props);
joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "EARLIEST");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably be better to expose this as a KSQLConfig property - and default it to EARLIEST. There might be some edge cases where tables are massive and cannot be bootstrapped etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto.offset.reset is configurable for KSQL, however, this one is a special case for Stream-Table join and is required to always be earliest and if we have it as latest the join would be incorrect.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats what I mean - this is a special case and we dont always know what people will do with the system. It is possible that there will be an instance where the hardcoded 'earliest' is not wanted. exposing a new property 'table.auto.offset.reset' would work around this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean but in this case earliest is the only correct option and setting it to anything else will result in incorrect query results. So even if we have table.auto.offset.reset it's value cannot be changed and should always be earliest even when we have the table change topic containing huge number of rows.

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch and for adding the unit test. I left a question on the test itself, but it looks good to me otherwise.

("EARLIEST")) {
return mock(SchemaKTable.class);
} else {
throw new KsqlException("auto.offset.reset should be set to EARLIEST.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if this regresses again, this exception will be thrown in the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the Stream-table join changes the table auto.offset.reset this will catch it.

@hjafarpour hjafarpour merged commit a02f0ae into confluentinc:4.0.x Dec 1, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants