support kafka -> structured streaming data process #43
Conversation
cb39a3e0 (Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
6587cbd6 (Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
53e37ee7 (Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
261677e2 (add fields; parse Kafka JSON data)
47eea8e5 (Merge branch 'master' of https://github.com/riverzzz/nebula-spark-utils)
24ae4250 (Merge pull request #166 from Nicole00/readme)
a8fd6623 (Merge branch 'master' into readme)
6c2700f4 (Merge pull request #164 from Thericecookers/master)
d1724aa5 (format)
a13c2415 (format)
6196e6d8 (add repo transfer note)
f41f41dc (Merge branch 'master' into master)
4cd75f07 (Merge pull request #165 from Nicole00/louvain)
8a837127 (Merge branch 'master' into louvain)
40dbe339 (fix louvain's result format)
138a1a11 (bugfix: Reverse edge has wrong partitionId)
4d184f1e (remove redundant pom dependencies to avoid errors)
93a8ff74 (support parsing Kafka data; support configuring offsets and pull limits; make streaming and offline import parsing compatible; pom config supports stream processing)
Thanks for your contribution~ That's exactly what I was going to do. Besides skipping the invalid data, can we extract the data pre-processing into a function?
reader.load()
  .select($"value".cast(StringType))
  .select($"value", json_tuple($"value", fields: _*))
  .toDF("value" :: fields: _*)
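For context, json_tuple extracts the given top-level fields from a JSON string column, one output column per field. A minimal pure-Scala sketch of the same idea (the JsonTupleSketch object and its regex-based jsonTuple helper are hypothetical, not Spark's implementation):

```scala
object JsonTupleSketch {
  // Toy stand-in for Spark's json_tuple: pulls top-level "key":"value"
  // string pairs out of a flat JSON object with a regex. Illustration
  // only: it handles neither nesting, escapes, nor non-string values.
  private val pair = """"([^"]+)"\s*:\s*"([^"]*)"""".r

  // Returns one Option per requested field, None when the field is absent,
  // mirroring how json_tuple yields null columns for missing fields.
  def jsonTuple(json: String, fields: String*): Seq[Option[String]] = {
    val kv = pair.findAllMatchIn(json).map(m => m.group(1) -> m.group(2)).toMap
    fields.map(kv.get)
  }
}
```

Note that the snippet above also keeps the raw value column next to the parsed fields, which is what the following comments debate.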
There's no need to keep the value field in the dataframe.
I suggest keeping it. If the data is invalid, knowing the source data will help fix the bug.
There's no print operation for the df, so how can we know the source data's format? The way is to debug the data; then can we use watch to get the value data?
In Processor.scala, the filter will print it. I don't know what watch means.
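The design discussed here separates validation from transformation: invalid rows are logged by the filter (so the raw source data stays visible for debugging) and only valid rows reach the map. A minimal pure-Scala sketch of that shape (StreamProcessSketch, isValid, and transform are hypothetical names, not the actual Processor.scala code):

```scala
object StreamProcessSketch {
  // Validation step: rows missing any required field are logged and
  // dropped by filter, instead of failing the stream.
  def isValid(row: Map[String, String], fields: Seq[String]): Boolean = {
    val ok = fields.forall(row.contains)
    if (!ok) println(s"invalid source row, skipping: $row")
    ok
  }

  // Transformation step: only runs on rows that passed the filter.
  def transform(row: Map[String, String], fields: Seq[String]): Seq[String] =
    fields.map(row(_))

  def process(rows: Seq[Map[String, String]], fields: Seq[String]): Seq[Seq[String]] =
    rows.filter(isValid(_, fields)).map(transform(_, fields))
}
```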
@@ -249,6 +249,7 @@ object Configs {
   private[this] val DEFAULT_LOCAL_PATH = None
   private[this] val DEFAULT_REMOTE_PATH = None
   private[this] val DEFAULT_STREAM_INTERVAL = 30
+  private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
Please format the code style.
@@ -162,16 +162,20 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
  *
  * @param server
  * @param topic
+ * @param startingOffsets
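Putting the diff above together with the DEFAULT_KAFKA_STARTINGOFFSETS = "latest" default added in Configs, the Kafka source config entry might look roughly like this sketch (the name KafkaSourceConfigEntrySketch, the maxOffsetsPerTrigger field, and the validation rules are assumptions, not the PR's actual code):

```scala
// Sketch of a Kafka source config entry with the new streaming options.
// startingOffsets defaults to "latest" (matching DEFAULT_KAFKA_STARTINGOFFSETS);
// maxOffsetsPerTrigger is optional, meaning no per-batch cap when unset.
case class KafkaSourceConfigEntrySketch(
    server: String,
    topic: String,
    startingOffsets: String = "latest",
    maxOffsetsPerTrigger: Option[Long] = None) {
  require(server.nonEmpty && topic.nonEmpty, "server and topic are required")
  require(Set("latest", "earliest").contains(startingOffsets) ||
            startingOffsets.trim.startsWith("{"),
          "startingOffsets must be latest, earliest, or a per-partition JSON spec")
}
```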
Please format the code style.
Removed the value data extract function.
Please rebase on the master branch and rerun the CI.
Codecov Report
@@ Coverage Diff @@
## master #43 +/- ##
============================================
- Coverage 28.10% 27.98% -0.12%
Complexity 6 6
============================================
Files 24 24
Lines 2117 2158 +41
Branches 396 404 +8
============================================
+ Hits 595 604 +9
- Misses 1415 1445 +30
- Partials 107 109 +2
Continue to review full report at Codecov.
LGTM! Great work, thanks again
Support automatically transforming Kafka JSON data to a DataFrame by fields, keeping field data types unchanged.
Support setting the Kafka startingOffsets and maxOffsetsPerTrigger options.
Split the data-check and transformation logic into filter and map; change the handling of erroneous streaming data so that it is only logged.
Add stream-processing dependencies to the pom and exclude unnecessary dependencies.
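The startingOffsets and maxOffsetsPerTrigger settings above correspond to option names in Spark's Kafka structured streaming source. A sketch of assembling those reader options (KafkaOptionsSketch and its wiring are illustrative; only the option keys are real Spark Kafka source options):

```scala
object KafkaOptionsSketch {
  // Builds the option map for a Spark Kafka streaming read.
  // maxOffsetsPerTrigger caps how many offsets each micro-batch pulls;
  // when unset, no cap option is emitted.
  def kafkaReadOptions(server: String,
                       topic: String,
                       startingOffsets: String = "latest",
                       maxOffsetsPerTrigger: Option[Long] = None): Map[String, String] = {
    val base = Map(
      "kafka.bootstrap.servers" -> server,
      "subscribe" -> topic,
      "startingOffsets" -> startingOffsets)
    maxOffsetsPerTrigger.fold(base)(n => base + ("maxOffsetsPerTrigger" -> n.toString))
  }
}
```

With a SparkSession in scope this would be consumed roughly as `spark.readStream.format("kafka").options(KafkaOptionsSketch.kafkaReadOptions(...)).load()`.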