
[SPARK-24630][SS] Support SQLStreaming in Spark #22575

Closed · wants to merge 9 commits

Conversation

jackylee-ch
Contributor

What changes were proposed in this pull request?

This patch proposes support for SQLStreaming in Spark. Please refer to SPARK-24630 for more details.

This patch supports:

  1. Creating stream tables, which can be used as Source and Sink in SQLStreaming:
    create table kafka_sql_test using kafka options( isStreaming = 'true', subscribe = 'topic', kafka.bootstrap.servers = 'localhost:9092')
  2. Adding the keyword 'STREAM' to SQL to support SQLStreaming queries:
    select stream * from kafka_sql_test
  3. Complex queries, which are all supported as long as both SQL and Structured Streaming support them (see the combined sketch after this list).
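
For illustration, a minimal end-to-end sketch of the proposed usage in Scala (table, topic, and server names are taken from the examples above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sqlstreaming-demo")
  .getOrCreate()

// 1. Register a Kafka-backed stream table; isStreaming marks it as usable as a
//    streaming Source or Sink.
spark.sql(
  """CREATE TABLE kafka_sql_test USING kafka OPTIONS(
    |  isStreaming = 'true',
    |  subscribe = 'topic',
    |  kafka.bootstrap.servers = 'localhost:9092')""".stripMargin)

// 2. The STREAM keyword turns this into a streaming query over the table.
spark.sql("SELECT STREAM * FROM kafka_sql_test")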

How was this patch tested?

Unit tests are added to verify SQLStreaming.

@WangTaoTheTonic
Contributor

ok to test

@WangTaoTheTonic
Contributor

Is this still a WIP?
Using an isStreaming tag in DDL to mark whether a table is streaming is brilliant; it keeps streaming tables compatible with batch SQL queries.
If possible, I think it is better not to introduce a STREAM keyword in DML. Maybe we can use properties of the tables participating in a query (like isStreaming) to decide whether to generate a StreamingRelation or a batch relation (see the sketch below). What do you think?
SQLStreaming is an important part of SS from my perspective, as it makes SS more complete and usable. Thanks for your work!
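
A minimal sketch of that idea (illustrative only: DataSource and StreamingRelation are Spark-internal APIs, and looking up an isStreaming table property is an assumption, not part of this patch):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation

// Resolve a table to a streaming or batch relation based on its isStreaming
// table property, instead of a STREAM keyword in the query.
def resolveRelation(spark: SparkSession, table: CatalogTable): LogicalPlan = {
  val isStreaming = table.properties.get("isStreaming").exists(_.toBoolean)
  if (isStreaming) {
    // Streaming source: wrap the table's data source in a StreamingRelation.
    StreamingRelation(
      DataSource(spark, className = table.provider.get, options = table.properties))
  } else {
    // Batch path: leave resolution to the regular analyzer rules.
    UnresolvedRelation(table.identifier)
  }
}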

@jackylee-ch jackylee-ch changed the title [SPARK-24630][SS][WIP] Support SQLStreaming in Spark [SPARK-24630][SS] Support SQLStreaming in Spark Oct 11, 2018
@jackylee-ch
Contributor Author

@WangTaoTheTonic
Adding the 'stream' keyword has two purposes:

  • Mark the entire SQL query as a stream query and generate the SQLStreaming plan tree.
  • Mark the table type as UnResolvedStreamRelation, so the table can be parsed as a StreamingRelation or another relation; this matters especially in stream-join-batch queries, such as Kafka join MySQL.

Besides, the keyword 'stream' makes it easier to express Structured Streaming in pure SQL.
A small example to show the importance of 'stream': read a stream from a Kafka stream table, and join MySQL to count user messages (a DataFrame-equivalent sketch follows this list):

  • with 'stream'

    • select stream kafka_sql_test.name, count(door) from kafka_sql_test inner join mysql_test on kafka_sql_test.name == mysql_test.name group by kafka_sql_test.name
      • This is treated as a streaming query using the console sink; kafka_sql_test is parsed as a StreamingRelation and mysql_test as a JDBCRelation, not a streaming relation.
    • insert into csv_sql_table select stream kafka_sql_test.name, count(door) from kafka_sql_test inner join mysql_test on kafka_sql_test.name == mysql_test.name group by kafka_sql_test.name
      • This is treated as a streaming query using the FileStream sink; kafka_sql_test is parsed as a StreamingRelation and mysql_test as a JDBCRelation, not a streaming relation.
  • without 'stream'

    • select kafka_sql_test.name, count(door) from kafka_sql_test inner join mysql_test on kafka_sql_test.name == mysql_test.name group by kafka_sql_test.name
      • This is treated as a batch query; kafka_sql_test is parsed as a KafkaRelation and mysql_test as a JDBCRelation.
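
For comparison, roughly the DataFrame-API equivalent of the 'with stream' query above (a sketch: the JDBC connection info and the parsing of the Kafka value into name/door columns are illustrative assumptions):

import org.apache.spark.sql.functions.count

val kafka = spark.readStream.format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()
  // The binary Kafka `value` would need real parsing (e.g. from_json) into
  // name/door columns; these casts are only placeholders.
  .selectExpr("CAST(key AS STRING) AS name", "CAST(value AS STRING) AS door")

val mysql = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/test") // assumed connection info
  .option("dbtable", "mysql_test")
  .load()

// Stream-batch join: the Kafka side stays a stream, the JDBC side is batch.
kafka.join(mysql, kafka("name") === mysql("name"))
  .groupBy(kafka("name"))
  .agg(count("door"))
  .writeStream.outputMode("complete").format("console").start()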

@WangTaoTheTonic
Contributor

What should we do if we want to join two Kafka streams and sink the result to another stream?

@jackylee-ch
Contributor Author

What should we do if we want to join two Kafka streams and sink the result to another stream?
insert into kafka_sql_out select stream t1.value from (select cast(value as string), timestamp as time1 from kafka_sql_in1) as t1 inner join (select cast(value as string), timestamp as time2 from kafka_sql_in2) as t2 on time1 >= time2 and time1 <= time2 + interval 10 seconds where t1.value == t2.value
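
Added for illustration: roughly the equivalent in the existing DataFrame API (a sketch; the topic names and checkpoint path are assumptions):

import org.apache.spark.sql.functions.expr

val in1 = spark.readStream.format("kafka")
  .option("subscribe", "in1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()
  .selectExpr("CAST(value AS STRING) AS value1", "timestamp AS time1")

val in2 = spark.readStream.format("kafka")
  .option("subscribe", "in2")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()
  .selectExpr("CAST(value AS STRING) AS value2", "timestamp AS time2")

// Stream-stream inner join with a time-range condition, sinking back to Kafka.
in1.join(in2, expr(
    "value1 = value2 AND time1 >= time2 AND time1 <= time2 + interval 10 seconds"))
  .selectExpr("value1 AS value")
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "kafka_sql_out")
  .option("checkpointLocation", "/tmp/kafka_sql_out_ckpt") // required by the Kafka sink
  .start()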

@jackylee-ch
Contributor Author

cc @xuanyuanking

@xuanyuanking
Member

As commented in https://issues.apache.org/jira/browse/SPARK-24630?focusedCommentId=16523064&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16523064, the current approach supports submitting a streaming job entirely in SQL, but it is mainly based on Hive table support; it needs more discussion for other data sources.

@cloud-fan
Contributor

Do we have a full story for streaming SQL? Is the STREAM keyword the only difference between streaming SQL and normal SQL?

also cc @tdas @zsxwing

@jackylee-ch
Contributor Author

Is the STREAM keyword the only difference between streaming SQL and normal SQL? How could users define a watermark with SQL?

Yes, the 'stream' keyword is the only difference from normal SQL.
We can use configuration to define the watermark (a sketch follows).
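
For example, a sketch of the configuration-based watermark idea (the conf key names here are hypothetical, not from this patch):

// Hypothetical configuration for defining the watermark of a streaming SQL
// query; the key names are illustrative, not part of this patch.
spark.conf.set("spark.sqlstreaming.watermark.column", "timestamp")
spark.conf.set("spark.sqlstreaming.watermark.delay", "10 seconds")

// Internally this could translate to the existing DataFrame API call:
//   df.withWatermark("timestamp", "10 seconds")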

@jackylee-ch
Contributor Author

jackylee-ch commented Oct 26, 2018

@WangTaoTheTonic @cloud-fan @xuanyuanking
I have removed the stream keyword. The Table API is now supported in SQLStreaming.

@WangTaoTheTonic
Contributor

Nice! I am looking forward to it.

@shijinkui
Contributor

@cloud-fan Hi, Wenchen. Is this ready to be merged? This PR is very useful and is exactly what I want and need.
Once Spark supports StreamSQL, it will be easier to develop streaming jobs.
Thanks.

@jackylee-ch
Contributor Author

@tdas @zsxwing @cloud-fan
Hi, are there any other questions blocking this patch from being merged?

@gvramana
Contributor

gvramana commented Nov 12, 2018

What should we do if we want to join two Kafka streams and sink the result to another stream?
insert into kafka_sql_out select stream t1.value from (select cast(value as string), timestamp as time1 from kafka_sql_in1) as t1 inner join (select cast(value as string), timestamp as time2 from kafka_sql_in2) as t2 on time1 >= time2 and time1 <= time2 + interval 10 seconds where t1.value == t2.value

Hi stczwd,
Currently the DataFrame API supports the writeStream.start() API to run a stream in the background, so that a query can be executed on that sink, and multiple stream-to-stream jobs can run in a single session.
How can this be achieved using INSERT INTO stream?
How can multiple streams with different properties be executed in the same session?

@jackylee-ch
Contributor Author

Currently the DataFrame API supports the writeStream.start() API to run a stream in the background, so that a query can be executed on that sink, and multiple stream-to-stream jobs can run in a single session.
How can this be achieved using INSERT INTO stream?
How can multiple streams with different properties be executed in the same session?

SQLStreaming does not support multiple streams. In our use cases, SQLStreaming is mostly used ad hoc; each case runs only one INSERT INTO stream.
Still, SQLStreaming can support multiple streams with the Table API:
spark.table("kafka_stream").groupBy("value").count().writeStream.outputMode("complete").format("console").start()

@jackylee-ch
Contributor Author

@cloud-fan @zsxwing @tdas @xuanyuanking
This patch has been open for a long time. Do you have any questions? Can it be merged?

@sujith71955
Contributor

sujith71955 commented Nov 26, 2018

@stczwd Can you provide a detailed design document for this PR, mentioning the scenarios handled and any constraints? That will give a complete picture of this PR. Thanks.

@jackylee-ch
Contributor Author

@sujith71955
Please refer to SPARK-24630 for more details.

@sujith71955
Contributor

[screenshot]

There is a DataSourceV2 community sync meetup tomorrow, coordinated by Ryan Blue; can we discuss this point there?

@sujith71955
Contributor

cc @koeninger

@jackylee-ch
Contributor Author

[screenshot]

I have removed the 'stream' keyword.

There is a DataSourceV2 community sync meetup tomorrow, coordinated by Ryan Blue; can we discuss this point there?

Yep, it's a good idea.

@sujith71955
Contributor

sujith71955 commented Nov 28, 2018 via email

@jackylee-ch
Contributor Author

Can you send a mail to Ryan Blue to add this SPIP topic to tomorrow's meeting? The meeting will be held tomorrow at 05:00 PM PST. If you confirm, we can also attend the meeting.

I have sent an email to Ryan Blue to attend this meeting.

@sujith71955
Contributor

Can you send a mail to Ryan Blue to add this SPIP topic to tomorrow's meeting? The meeting will be held tomorrow at 05:00 PM PST. If you confirm, we can also attend the meeting.

I have sent an email to Ryan Blue to attend this meeting.

I think you should also ask him to add your SPIP topic to tomorrow's discussion. The agenda has to be set in advance.

@jackylee-ch
Contributor Author

I have sent an email to Ryan Blue.

Can you send a mail to Ryan Blue to add this SPIP topic to tomorrow's meeting? The meeting will be held tomorrow at 05:00 PM PST. If you confirm, we can also attend the meeting.

I have sent an email to Ryan Blue to attend this meeting.

I think you should also ask him to add your SPIP topic to tomorrow's discussion. The agenda has to be set in advance.

Tomorrow's discussion mainly focuses on the DataSource V2 API; I don't think they will spend time discussing the SQL API. However, we can mention it while discussing the Catalog API.

@jackylee-ch
Contributor Author

@cloud-fan You can take a look at my changes here; I will update the design documentation later.
Thank you for your interest in SQLStreaming.

@yaooqinn
Member

@stczwd lgtm. nit: It would be very helpful to add some instructions to the online documentation.

@jackylee-ch
Contributor Author

@stczwd lgtm. nit: It would be very helpful to add some instructions to the online documentation.

@yaooqinn Thanks for the support. I have written a detailed design doc, which will be published soon.

private def parseTrigger(): Trigger = {
  // Convert the configured trigger interval (a time string) to milliseconds.
  val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
  Trigger.ProcessingTime(trigger, TimeUnit.MILLISECONDS)
}
Contributor

Continuous processing mode is supported now; do you plan to support it? If so, I think we can traverse the logical plan to find out whether this is a continuous query and create a ContinuousTrigger (a sketch follows).
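
A sketch of that suggestion against the method above (isContinuousQuery is a hypothetical predicate that would walk the plan and check that every source supports continuous processing; Utils and sqlConf come from the surrounding class):

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.streaming.Trigger

private def parseTrigger(plan: LogicalPlan): Trigger = {
  val intervalMs = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
  if (isContinuousQuery(plan)) {
    // Continuous processing: low-latency mode with a checkpoint interval.
    Trigger.Continuous(intervalMs, TimeUnit.MILLISECONDS)
  } else {
    // Default microbatch mode.
    Trigger.ProcessingTime(intervalMs, TimeUnit.MILLISECONDS)
  }
}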


@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

github-actions bot commented Jan 6, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 6, 2020
@github-actions github-actions bot closed this Jan 7, 2020