Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support kafka -> structured streaming data process #43

Merged
merged 4 commits into from
Dec 23, 2021
Merged

support kafka -> structured streaming data process #43

merged 4 commits into from
Dec 23, 2021

Conversation

riverzzz
Copy link
Contributor

support auto transfor kafka json data to df by flields,and keep field data type unchanged
support set kafka startingOffsets and maxOffsetsPerTrigger
split data check and transfer process logic,split it to filter and map;change error streaming data process logic,will only print log
pom add streaming process dependencies,exclusion unnecessary dependencies

cb39a3e0(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
6587cbd6(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
53e37ee7(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
261677e2(增加fields,解析kafkajson数据)
47eea8e5(Merge branch 'master' of https://github.com/riverzzz/nebula-spark-utils)
24ae4250(Merge pull request #166 from Nicole00/readme)
a8fd6623(Merge branch 'master' into readme)
6c2700f4(Merge pull request #164 from Thericecookers/master)
d1724aa5(format)
a13c2415(format)
6196e6d8(add repo transfer note)
f41f41dc(Merge branch 'master' into master)
4cd75f07(Merge pull request #165 from Nicole00/louvain)
8a837127(Merge branch 'master' into louvain)
40dbe339(fix louvain's result format)
138a1a11(bugfix: Reverse edge has wrong partitionId)
4d184f1e(删除多余的pom依赖避免报错)
93a8ff74(支持解析kafka数据,支持配置offset、拉取限制 流数据和离线导入解析兼容 pom配置支持流处理)
cb39a3e0(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
6587cbd6(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
53e37ee7(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
261677e2(增加fields,解析kafkajson数据)
47eea8e5(Merge branch 'master' of https://github.com/riverzzz/nebula-spark-utils)
24ae4250(Merge pull request #166 from Nicole00/readme)
a8fd6623(Merge branch 'master' into readme)
6c2700f4(Merge pull request #164 from Thericecookers/master)
d1724aa5(format)
a13c2415(format)
6196e6d8(add repo transfer note)
f41f41dc(Merge branch 'master' into master)
4cd75f07(Merge pull request #165 from Nicole00/louvain)
8a837127(Merge branch 'master' into louvain)
40dbe339(fix louvain's result format)
138a1a11(bugfix: Reverse edge has wrong partitionId)
4d184f1e(删除多余的pom依赖避免报错)
93a8ff74(支持解析kafka数据,支持配置offset、拉取限制 流数据和离线导入解析兼容 pom配置支持流处理)
@CLAassistant
Copy link

CLAassistant commented Dec 17, 2021

CLA assistant check
All committers have signed the CLA.

@Nicole00
Copy link
Contributor

Thanks for your contribution~ That's exactly what I'm going to do.

Besides to skip the invalid data, can we extract the data pre-process to a function?

reader.load()
.select($"value".cast(StringType))
.select($"value", json_tuple($"value", fields: _*))
.toDF("value" :: fields: _*)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to keep the value field in the dataframe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest keep it,if zhe data is invalid data,i will know source data to help fix bug

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no print operation for df, how can we know the source data's format?
the way is to debug the data, then can we use watch to get the value data ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in Processor.scala filter will print it
i don't know watch means

@@ -249,6 +249,7 @@ object Configs {
private[this] val DEFAULT_LOCAL_PATH = None
private[this] val DEFAULT_REMOTE_PATH = None
private[this] val DEFAULT_STREAM_INTERVAL = 30
private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format the code style please

@@ -162,16 +162,20 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
*
* @param server
* @param topic
* @param startingOffsets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format the code style please

remove value data
extract function
@Nicole00
Copy link
Contributor

please rebase the master code and rerun the ci please.

@codecov-commenter
Copy link

Codecov Report

Merging #43 (e3de154) into master (14f29ef) will decrease coverage by 0.11%.
The diff coverage is 25.20%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master      #43      +/-   ##
============================================
- Coverage     28.10%   27.98%   -0.12%     
  Complexity        6        6              
============================================
  Files            24       24              
  Lines          2117     2158      +41     
  Branches        396      404       +8     
============================================
+ Hits            595      604       +9     
- Misses         1415     1445      +30     
- Partials        107      109       +2     
Impacted Files Coverage Δ
...soft/nebula/exchange/processor/EdgeProcessor.scala 0.00% <0.00%> (ø)
...m/vesoft/nebula/exchange/processor/Processor.scala 63.07% <0.00%> (-1.50%) ⬇️
.../nebula/exchange/processor/VerticesProcessor.scala 0.00% <0.00%> (ø)
...t/nebula/exchange/reader/StreamingBaseReader.scala 0.00% <0.00%> (ø)
...la/com/vesoft/nebula/exchange/config/Configs.scala 63.89% <93.10%> (+0.24%) ⬆️
.../vesoft/nebula/exchange/config/SourceConfigs.scala 60.52% <100.00%> (+0.70%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 14f29ef...e3de154. Read the comment docs.

Copy link
Contributor

@Nicole00 Nicole00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Great work, thanks again

@Nicole00 Nicole00 merged commit d229bee into vesoft-inc:master Dec 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants