-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Elasticsearch Ingestion] Add Elasticsearch ingestion pipeline #727
Conversation
@baiqiushi Hi Qiushi, @cdvr and I decided to use raw tweets data (retrieved from Twitter API) as sample data. When you test this PR, please replace the file: |
Codecov Report
@@ Coverage Diff @@
## master #727 +/- ##
==========================================
- Coverage 63.99% 63.91% -0.09%
==========================================
Files 75 75
Lines 4069 4076 +7
Branches 349 355 +6
==========================================
+ Hits 2604 2605 +1
- Misses 1465 1471 +6
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
…berry into elastic-ingestion
examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala
Outdated
Show resolved
Hide resolved
59a2e6f
to
d678d5a
Compare
Needs verification, and then merge. |
I tested it again following the wiki guide. Still as before, I got the following error while running [info] Start to ingest tweets...
[info] Checked Python interpreter version...
[info]Make sure to use Python 3.0+
[info] Passed!
Traceback (most recent call last):
File "./script/ingestElasticData.py", line 43, in <module>
res = request.urlopen(req)
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 222, in urlopen
return opener.open(url, data, timeout)
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 531, in open
response = meth(req, response)
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 641, in http_response
'http', request, response, code, msg, hdrs)
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 569, in error
return self._call_chain(*args)
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 503, in _call_chain
result = func(*args)
File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 649, in http_error_default
raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 500: Internal Server Error
[info] Showing high-level information about indices in Elasticsearch cluster AFTER ingesting data...
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open twitter.ds_tweet EO7Yk1o5T_avjPMCFQDMpg 4 0 0 0 920b 920b
[success] Finish ingesting tweetsMacBook-Pro:twittermap white$ On the elasticsearch side, it gives this error: [2020-11-08T21:40:12,012][INFO ][o.e.c.m.MetaDataIndexTemplateService] [QP0DVi2] adding template [twitter] for index patterns [twitter.ds_tweet_*]
[2020-11-08T21:42:04,328][WARN ][r.suppressed ] [QP0DVi2] path: /twitter.ds_tweet/_doc/_bulk, params: {index=twitter.ds_tweet, type=_doc}
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Use': was expecting ('true', 'false' or 'null')
at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@6df9fc31; line: 1, column: 5]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) ~[jackson-core-2.8.11.jar:2.8.11]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) ~[jackson-core-2.8.11.jar:2.8.11]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3528) ~[jackson-core-2.8.11.jar:2.8.11]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) ~[jackson-core-2.8.11.jar:2.8.11]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) ~[jackson-core-2.8.11.jar:2.8.11]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) ~[jackson-core-2.8.11.jar:2.8.11]
at org.elasticsearch.common.xcontent.json.JsonXContentParser.nextToken(JsonXContentParser.java:52) ~[elasticsearch-x-content-6.7.2.jar:6.7.2]
at org.elasticsearch.action.bulk.BulkRequest.add(BulkRequest.java:335) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.rest.action.document.RestBulkAction.prepareRequest(RestBulkAction.java:94) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:91) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.xpack.security.rest.SecurityRestFilter.handleRequest(SecurityRestFilter.java:87) ~[?:?]
at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:240) [elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.rest.RestController.tryAllHandlers(RestController.java:336) [elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:174) [elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.http.netty4.Netty4HttpServerTransport.dispatchRequest(Netty4HttpServerTransport.java:551) [transport-netty4-client-6.7.2.jar:6.7.2]
at org.elasticsearch.http.netty4.Netty4HttpRequestHandler.channelRead0(Netty4HttpRequestHandler.java:137) [transport-netty4-client-6.7.2.jar:6.7.2]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler.channelRead(HttpPipeliningHandler.java:68) [transport-netty4-client-6.7.2.jar:6.7.2]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:556) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:510) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) [netty-transport-4.1.32.Final.jar:4.1.32.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-common-4.1.32.Final.jar:4.1.32.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191] |
@baiqiushi I am able to ingest the data fine. Although I am hitting a |
@duckino Thanks! Then how should we proceed with this? Any idea with the errors that I encountered? |
I re-did the verification using a brand new Mac Book Pro with Catalina 10.15.7. It worked as expected using OpenJDK 8 and sbt-0.13. The corresponding adjustment notes are added into the |
Thanks 👍🏼 |
No description provided.