Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elasticsearch Ingestion] Add Elasticsearch ingestion pipeline #727

Merged
merged 13 commits into from
Dec 29, 2020

Conversation

tahoe01
Copy link
Contributor

@tahoe01 tahoe01 commented Nov 11, 2019

No description provided.

@tahoe01 tahoe01 requested review from baiqiushi and byhow November 11, 2019 09:19
@tahoe01
Copy link
Contributor Author

tahoe01 commented Nov 11, 2019

@baiqiushi Hi Qiushi, @cdvr and I decided to use raw tweets data (retrieved from Twitter API) as sample data. When you test this PR, please replace the file: sample.json.zip at path: cloudberry.ics.uci.edu/img/ with the one I sent to you.

@codecov-io
Copy link

codecov-io commented Nov 11, 2019

Codecov Report

Merging #727 (d3bcf93) into master (6ba0c7c) will decrease coverage by 0.08%.
The diff coverage is 14.28%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #727      +/-   ##
==========================================
- Coverage   63.99%   63.91%   -0.09%     
==========================================
  Files          75       75              
  Lines        4069     4076       +7     
  Branches      349      355       +6     
==========================================
+ Hits         2604     2605       +1     
- Misses       1465     1471       +6     
Impacted Files Coverage Δ
...edu/uci/ics/cloudberry/zion/actor/RESTSolver.scala 100.00% <ø> (ø)
...es/twittermap/web/app/actor/TwitterMapPigeon.scala 0.00% <0.00%> (ø)
...ap/web/app/controllers/TwitterMapApplication.scala 0.00% <0.00%> (ø)
...websocket/TwitterMapServerToCloudBerrySocket.scala 0.00% <0.00%> (ø)
...wittermap/web/app/websocket/WebSocketFactory.scala 0.00% <0.00%> (ø)
.../uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala 27.50% <44.44%> (-1.92%) ⬇️
...a/edu/uci/ics/cloudberry/zion/actor/Reporter.scala 80.64% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 86790a2...d3bcf93. Read the comment docs.

Copy link
Contributor

@byhow byhow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@baiqiushi
Copy link
Collaborator

Needs verification, and then merge.

@baiqiushi
Copy link
Collaborator

I tested it again following the wiki guide.

Still as before, I got the following error while running ./script/ingestTweetToElasticCluster.sh.

[info] Start to ingest tweets...

[info] Checked Python interpreter version...
[info]Make sure to use Python 3.0+
[info] Passed!
Traceback (most recent call last):
  File "./script/ingestElasticData.py", line 43, in <module>
    res = request.urlopen(req)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 222, in urlopen
    return opener.open(url, data, timeout)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 531, in open
    response = meth(req, response)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 641, in http_response
    'http', request, response, code, msg, hdrs)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 569, in error
    return self._call_chain(*args)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 503, in _call_chain
    result = func(*args)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 649, in http_error_default
    raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 500: Internal Server Error


[info] Showing high-level information about indices in Elasticsearch cluster AFTER ingesting data...

health status index            uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   twitter.ds_tweet EO7Yk1o5T_avjPMCFQDMpg   4   0          0            0       920b           920b


[success] Finish ingesting tweetsMacBook-Pro:twittermap white$ 

On the elasticsearch side, it gives this error:

[2020-11-08T21:40:12,012][INFO ][o.e.c.m.MetaDataIndexTemplateService] [QP0DVi2] adding template [twitter] for index patterns [twitter.ds_tweet_*]
[2020-11-08T21:42:04,328][WARN ][r.suppressed             ] [QP0DVi2] path: /twitter.ds_tweet/_doc/_bulk, params: {index=twitter.ds_tweet, type=_doc}
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Use': was expecting ('true', 'false' or 'null')
 at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@6df9fc31; line: 1, column: 5]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) ~[jackson-core-2.8.11.jar:2.8.11]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) ~[jackson-core-2.8.11.jar:2.8.11]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528) ~[jackson-core-2.8.11.jar:2.8.11]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) ~[jackson-core-2.8.11.jar:2.8.11]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) ~[jackson-core-2.8.11.jar:2.8.11]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) ~[jackson-core-2.8.11.jar:2.8.11]
	at org.elasticsearch.common.xcontent.json.JsonXContentParser.nextToken(JsonXContentParser.java:52) ~[elasticsearch-x-content-6.7.2.jar:6.7.2]
	at org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:335) ~[elasticsearch-6.7.2.jar:6.7.2]
	at org.elasticsearch.rest.action.document.RestBulkAction.prepareRequest(RestBulkAction.java:94) ~[elasticsearch-6.7.2.jar:6.7.2]
	at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:91) ~[elasticsearch-6.7.2.jar:6.7.2]
	at org.elasticsearch.xpack.security.rest.SecurityRestFilter.handleRequest(SecurityRestFilter.java:87) ~[?:?]
	at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:240) [elasticsearch-6.7.2.jar:6.7.2]
	at org.elasticsearch.rest.RestController.tryAllHandlers(RestController.java:336) [elasticsearch-6.7.2.jar:6.7.2]
	at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:174) [elasticsearch-6.7.2.jar:6.7.2]
	at org.elasticsearch.http.netty4.Netty4HttpServerTransport.dispatchRequest(Netty4HttpServerTransport.java:551) [transport-netty4-client-6.7.2.jar:6.7.2]
	at org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:137) [transport-netty4-client-6.7.2.jar:6.7.2]
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler.channelRead(HttpPipeliningHandler.java:68) [transport-netty4-client-6.7.2.jar:6.7.2]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:556) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:510) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-common-4.1.32.Final.jar:4.1.32.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

@byhow
Copy link
Contributor

byhow commented Nov 10, 2020

@baiqiushi I am able to ingest the data fine. Although I am hitting a UnsupportedDataTypeException in the activation jar when building the geotag module, the data is inserted correctly with the count of 73348 as stated in the wiki. The mapping and template are created as well.

image

image

@baiqiushi
Copy link
Collaborator

@duckino Thanks! Then how should we proceed with this? Any idea with the errors that I encountered?

@baiqiushi
Copy link
Collaborator

I re-did the verification using a brand new Mac Book Pro with Catalina 10.15.7. It worked as expected using OpenJDK 8 and sbt-0.13. The corresponding adjustment notes are added into the quick-start and elasticsearch-adapter wiki pages.
This PR can be merged now.

@tahoe01
Copy link
Contributor Author

tahoe01 commented Dec 28, 2020

Thanks 👍🏼

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants