From 9a526c9da53659f9a60e183420c782e6d07069eb Mon Sep 17 00:00:00 2001 From: dayuebai Date: Mon, 11 Nov 2019 01:12:39 -0800 Subject: [PATCH 1/5] Add Elasticsearch ingestion pipeline --- .../cloudberry/noah/TwitterJSONTagToADM.scala | 20 ++- .../edu/uci/ics/cloudberry/noah/adm/ADM.java | 42 ++--- .../uci/ics/cloudberry/noah/adm/Place.java | 58 ++++--- .../uci/ics/cloudberry/noah/adm/Tweet.java | 80 +++++++++- .../edu/uci/ics/cloudberry/noah/adm/User.java | 29 +++- examples/twittermap/script/elasticGeoTag.sh | 28 ++++ .../twittermap/script/ingestElasticData.py | 48 ++++++ .../script/ingestTweetToElasticCluster.sh | 149 ++++++++++++++++++ 8 files changed, 407 insertions(+), 47 deletions(-) create mode 100755 examples/twittermap/script/elasticGeoTag.sh create mode 100644 examples/twittermap/script/ingestElasticData.py create mode 100755 examples/twittermap/script/ingestTweetToElasticCluster.sh diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala index 7993951f6..565cc18c2 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala @@ -2,7 +2,7 @@ package edu.uci.ics.cloudberry.noah import java.io.File import java.util.concurrent.Executors - +import play.api.libs.json._ import edu.uci.ics.cloudberry.gnosis._ import edu.uci.ics.cloudberry.noah.adm.Tweet import edu.uci.ics.cloudberry.util.Profile._ @@ -20,6 +20,7 @@ object TwitterJSONTagToADM { var threadNumber = 2 var isDebug = false val bufferSize = 100 + var file = "ADM" // By default, generate ADM file. val usage = """ @@ -36,14 +37,25 @@ object TwitterJSONTagToADM { case "-city" :: value :: tail => shapeMap += CityLevel -> value; parseOption(tail) case "-thread" :: value :: tail => threadNumber = value.toInt; parseOption(tail) case "-debug" :: value :: tail => isDebug = true; parseOption(tail) + case "-fileFormat" :: value :: tail => file = value; parseOption(tail) case option :: tail => System.err.println("unknown option:" + option); System.err.println(usage); System.exit(1); } } def tagOneTweet(ln: String, usGeoGnosis: USGeoGnosis) = { try { - val adm = Tweet.toADM(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) - if (adm.length > 0) println(adm) + if (file.equals("ADM")) { + val adm = Tweet.toADM(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) + if (adm.length > 0) println(adm) + } else { + val json = Tweet.toJSON(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) + if (json.length > 0) { + val obj = Json.parse(json).as[JsObject] + val twitterId = (obj \ "id").get.toString() + val indexStr = s"""{ "index": {"_id": "$twitterId" } }""" + println(indexStr + "\n" + json) + } + } } catch { case e: Throwable => { if (isDebug) { @@ -78,4 +90,4 @@ object TwitterJSONTagToADM { buffer.foreach(tagOneTweet(_, usGeoGnosis)) thpool.shutdownNow() } -} +} \ No newline at end of file diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/ADM.java b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/ADM.java index 761c059ca..91290c625 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/ADM.java +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/ADM.java @@ -41,6 +41,10 @@ public synchronized static String mkDateTimeConstructor(Date jdate) { return "datetime(\"" + ADMDateFormat.format(jdate) + "T" + ADMTimeFormat.format(jdate) + "\")"; } + public synchronized static String mkJSONDateTimeConstructor(Date jdate) { + return ADMDateFormat.format(jdate) + "T" + ADMTimeFormat.format(jdate); + } + public static Rectangle coordinates2Rectangle(GeoLocation[][] boundingBoxCoordinates){ if (boundingBoxCoordinates.length != 1 || boundingBoxCoordinates[0].length != 4) { throw new IllegalArgumentException("unknown boundingBoxCoordinates"); @@ -48,21 +52,21 @@ public static Rectangle coordinates2Rectangle(GeoLocation[][] boundingBoxCoordin // Twitter has some wield format historically, though it still rectangle, but it is not always // in (sw, se, ne,nw) order double swLog = Collections.min(Arrays.asList(boundingBoxCoordinates[0][0].getLongitude(), - boundingBoxCoordinates[0][1].getLongitude(), - boundingBoxCoordinates[0][2].getLongitude(), - boundingBoxCoordinates[0][3].getLongitude())); + boundingBoxCoordinates[0][1].getLongitude(), + boundingBoxCoordinates[0][2].getLongitude(), + boundingBoxCoordinates[0][3].getLongitude())); double swLat = Collections.min(Arrays.asList(boundingBoxCoordinates[0][0].getLatitude(), - boundingBoxCoordinates[0][1].getLatitude(), - boundingBoxCoordinates[0][2].getLatitude(), - boundingBoxCoordinates[0][3].getLatitude())); + boundingBoxCoordinates[0][1].getLatitude(), + boundingBoxCoordinates[0][2].getLatitude(), + boundingBoxCoordinates[0][3].getLatitude())); double neLog = Collections.max(Arrays.asList(boundingBoxCoordinates[0][0].getLongitude(), - boundingBoxCoordinates[0][1].getLongitude(), - boundingBoxCoordinates[0][2].getLongitude(), - boundingBoxCoordinates[0][3].getLongitude())); + boundingBoxCoordinates[0][1].getLongitude(), + boundingBoxCoordinates[0][2].getLongitude(), + boundingBoxCoordinates[0][3].getLongitude())); double neLat = Collections.max(Arrays.asList(boundingBoxCoordinates[0][0].getLatitude(), - boundingBoxCoordinates[0][1].getLatitude(), - boundingBoxCoordinates[0][2].getLatitude(), - boundingBoxCoordinates[0][3].getLatitude())); + boundingBoxCoordinates[0][1].getLatitude(), + boundingBoxCoordinates[0][2].getLatitude(), + boundingBoxCoordinates[0][3].getLatitude())); // AsterixDB is unhappy with this kind of point "rectangular" if (swLog == neLog && swLat == neLat){ @@ -71,22 +75,22 @@ public static Rectangle coordinates2Rectangle(GeoLocation[][] boundingBoxCoordin } if (swLog > neLog || swLat > neLat) { throw new IllegalArgumentException( - "Not a good Rectangle: " + "sw:" + swLog + "," + swLat + ", ne:" + neLog + "," + neLat); + "Not a good Rectangle: " + "sw:" + swLog + "," + swLat + ", ne:" + neLog + "," + neLat); } return new Rectangle(swLog, swLat, neLog, neLat); } public static String mkRectangleConstructor(GeoLocation[][] boundingBoxCoordinates) - throws IllegalArgumentException { + throws IllegalArgumentException { StringBuilder sb = new StringBuilder("rectangle"); Rectangle rectangle = coordinates2Rectangle(boundingBoxCoordinates); sb.append("(\"").append(rectangle.swLog()).append(',') - .append(rectangle.swLat()) - .append(' ') - .append(rectangle.neLog()).append(',') - .append(rectangle.neLat()) - .append("\")"); + .append(rectangle.swLat()) + .append(' ') + .append(rectangle.neLog()).append(',') + .append(rectangle.neLat()) + .append("\")"); return sb.toString(); } diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Place.java b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Place.java index 43d190397..7d14a016a 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Place.java +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Place.java @@ -1,29 +1,45 @@ package edu.uci.ics.cloudberry.noah.adm; +import edu.uci.ics.cloudberry.util.Rectangle; import twitter4j.GeoLocation; +import static edu.uci.ics.cloudberry.noah.adm.ADM.coordinates2Rectangle; + public class Place { - public static String COUNTRY = "country"; - public static String COUNTRY_CODE = "country_code"; - public static String FULL_NAME = "full_name"; - public static String ID = "id"; - public static String NAME = "name"; - public static String PLACE_TYPE = "place_type"; - public static String BOUNDING_BOX = "bounding_box"; + public static String COUNTRY = "country"; + public static String COUNTRY_CODE = "country_code"; + public static String FULL_NAME = "full_name"; + public static String ID = "id"; + public static String NAME = "name"; + public static String PLACE_TYPE = "place_type"; + public static String BOUNDING_BOX = "bounding_box"; + + public static String toADM(twitter4j.Place place) { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + ADM.keyValueToSbWithComma(sb, COUNTRY, ADM.mkQuote(place.getCountry())); + ADM.keyValueToSbWithComma(sb, COUNTRY_CODE, ADM.mkQuote(place.getCountry())); + ADM.keyValueToSbWithComma(sb, FULL_NAME, ADM.mkQuote(place.getFullName())); + ADM.keyValueToSbWithComma(sb, ID, ADM.mkQuote(String.valueOf(place.getId()))); + ADM.keyValueToSbWithComma(sb, NAME, ADM.mkQuote(place.getName())); + ADM.keyValueToSbWithComma(sb, PLACE_TYPE, ADM.mkQuote(place.getPlaceType())); + ADM.keyValueToSb(sb, BOUNDING_BOX, ADM.mkQuote(mkRectangleConstructor(place.getBoundingBoxCoordinates()))); + sb.append("}"); + return sb.toString(); + } - public static String toADM(twitter4j.Place place) { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - ADM.keyValueToSbWithComma(sb, COUNTRY, ADM.mkQuote(place.getCountry())); - ADM.keyValueToSbWithComma(sb, COUNTRY_CODE, ADM.mkQuote(place.getCountry())); - ADM.keyValueToSbWithComma(sb, FULL_NAME, ADM.mkQuote(place.getFullName())); - ADM.keyValueToSbWithComma(sb, ID, ADM.mkQuote(String.valueOf(place.getId()))); - ADM.keyValueToSbWithComma(sb, NAME, ADM.mkQuote(place.getName())); - ADM.keyValueToSbWithComma(sb, PLACE_TYPE, ADM.mkQuote(place.getPlaceType())); - ADM.keyValueToSb(sb, BOUNDING_BOX, ADM.mkRectangleConstructor(place.getBoundingBoxCoordinates())); - sb.append("}"); - return sb.toString(); - } + private static String mkRectangleConstructor(GeoLocation[][] boundingBoxCoordinates) + throws IllegalArgumentException { + StringBuilder sb = new StringBuilder("LINESTRING"); -} + Rectangle rectangle = coordinates2Rectangle(boundingBoxCoordinates); + sb.append("(").append(rectangle.swLog()).append(' ') + .append(rectangle.swLat()) + .append(',') + .append(rectangle.neLog()).append(' ') + .append(rectangle.neLat()) + .append(")"); + return sb.toString(); + } +} \ No newline at end of file diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Tweet.java b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Tweet.java index 5ce6eb9b1..405115de1 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Tweet.java +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/Tweet.java @@ -3,6 +3,13 @@ import edu.uci.ics.cloudberry.gnosis.USGeoGnosis; import twitter4j.GeoLocation; import twitter4j.Status; +import twitter4j.HashtagEntity; +import twitter4j.UserMentionEntity; + +import java.text.SimpleDateFormat; +import java.util.Locale; + +import static edu.uci.ics.cloudberry.noah.adm.ADM.*; public class Tweet { public static String CREATE_AT = "create_at"; @@ -21,6 +28,45 @@ public class Tweet { public static String USER = "user"; public static String PLACE = "place"; + public static String toJSON(Status status, USGeoGnosis gnosis, boolean requireGeoField) throws UnknownPlaceException{ + String geoTags = geoTag(status, gnosis, requireGeoField); + if (geoTags == null && requireGeoField) + return ""; + StringBuilder sb = new StringBuilder(); + sb.append("{"); + + ADM.keyValueToSbWithComma(sb, CREATE_AT, ADM.mkQuote(ADM.mkJSONDateTimeConstructor(status.getCreatedAt()))); + ADM.keyValueToSbWithComma(sb, ID, String.valueOf(status.getId())); + ADM.keyValueToSbWithComma(sb, TEXT, ADM.mkQuote(status.getText())); + ADM.keyValueToSbWithComma(sb, IN_REPLY_TO_STATUS, String.valueOf(status.getInReplyToStatusId())); + ADM.keyValueToSbWithComma(sb, IN_REPLY_TO_USER, String.valueOf(status.getInReplyToUserId())); + ADM.keyValueToSbWithComma(sb, FAVORITE_COUNT, String.valueOf(status.getFavoriteCount())); + ADM.keyValueToSbWithComma(sb, RETWEET_COUNT, String.valueOf(status.getRetweetCount())); + ADM.keyValueToSbWithComma(sb, LANG, ADM.mkQuote(status.getLang())); + ADM.keyValueToSbWithComma(sb, IS_RETWEET, String.valueOf(status.isRetweet())); + + if (status.getHashtagEntities().length > 0) { + ADM.keyValueToSbWithComma(sb, HASHTAG, mkStringSet(status.getHashtagEntities())); + } + if (status.getUserMentionEntities().length > 0) { + ADM.keyValueToSbWithComma(sb, USER_MENTION, mkStringSet(status.getUserMentionEntities())); + } + if (status.getPlace() != null) { + ADM.keyValueToSbWithComma(sb, PLACE, Place.toADM(status.getPlace())); + } + if (status.getGeoLocation() != null) { + ADM.keyValueToSbWithComma(sb, GEO_COORDINATE, ADM.mkQuote(mkPoint(status.getGeoLocation()))); + } else if (status.getPlace() != null && status.getPlace().getPlaceType().equals("poi")) { + ADM.keyValueToSbWithComma(sb, GEO_COORDINATE, ADM.mkQuote(mkPoint(status.getPlace().getBoundingBoxCoordinates()[0][0]))); + } + if(geoTags != null){ + ADM.keyValueToSbWithComma(sb, GEO_TAG, geoTags); + } + ADM.keyValueToSb(sb, USER, User.toJSON(status.getUser())); + sb.append("}"); + return sb.toString(); + } + public static String toADM(Status status, USGeoGnosis gnosis, boolean requireGeoField) throws UnknownPlaceException{ String geoTags = geoTag(status, gnosis, requireGeoField); if (geoTags == null && requireGeoField) @@ -59,6 +105,10 @@ public static String toADM(Status status, USGeoGnosis gnosis, boolean requireGeo return sb.toString(); } + private static String mkPoint(GeoLocation geoLocation) { + return "point(" + geoLocation.getLongitude() + " " + geoLocation.getLatitude() + ")"; + } + public static String geoTag(Status status, USGeoGnosis gnosis, boolean requireGeoField) throws UnknownPlaceException{ StringBuilder sb = new StringBuilder(); if (textMatchPlace(sb, status, gnosis)) { @@ -122,7 +172,7 @@ protected static boolean textMatchPlace(StringBuilder sb, Status status, USGeoGn } cityName = place.getFullName().substring(index + 1).trim(); info = gnosis.tagNeighborhood(cityName, - ADM.coordinates2Rectangle(place.getBoundingBoxCoordinates())); + ADM.coordinates2Rectangle(place.getBoundingBoxCoordinates())); break; case "poi": // a point double longitude = (place.getBoundingBoxCoordinates())[0][0].getLongitude(); @@ -141,4 +191,30 @@ protected static boolean textMatchPlace(StringBuilder sb, Status status, USGeoGn return true; } -} + private static String mkStringSet(HashtagEntity[] hashtagEntities) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < hashtagEntities.length; i++) { + if (i > 0) { + sb.append(','); + } + sb.append(ADM.mkQuote(hashtagEntities[i].getText())); + } + sb.append("]"); + return sb.toString(); + } + + private static String mkStringSet(UserMentionEntity[] userMentionEntities) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < userMentionEntities.length; i++) { + if (i > 0) { + sb.append(','); + } + sb.append(userMentionEntities[i].getId()); + } + sb.append("]"); + return sb.toString(); + } + +} \ No newline at end of file diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/User.java b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/User.java index 328c9f2de..3e27783e6 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/User.java +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/adm/User.java @@ -1,5 +1,11 @@ package edu.uci.ics.cloudberry.noah.adm; +import java.text.SimpleDateFormat; +import java.util.Locale; + +import static edu.uci.ics.cloudberry.noah.adm.ADM.ADMDateFormat; +import static edu.uci.ics.cloudberry.noah.adm.ADM.ADMTimeFormat; + public class User { public static final String ID = "id"; @@ -33,4 +39,25 @@ public static String toADM(twitter4j.User user) { return sb.toString(); } -} + + public static String toJSON(twitter4j.User user) { + + StringBuilder sb = new StringBuilder(); + sb.append("{"); + ADM.keyValueToSbWithComma(sb, ID, String.valueOf(user.getId())); + ADM.keyValueToSbWithComma(sb, NAME, ADM.mkQuote(user.getName())); + ADM.keyValueToSbWithComma(sb, SCREEN_NAME, ADM.mkQuote(user.getScreenName())); + ADM.keyValueToSbWithComma(sb, PROFILE_IMAGE_URL, ADM.mkQuote(user.getProfileImageURL())); + ADM.keyValueToSbWithComma(sb, LANG, ADM.mkQuote(user.getLang())); + ADM.keyValueToSbWithComma(sb, LOCATION, ADM.mkQuote(user.getLocation())); + + ADM.keyValueToSbWithComma(sb, CREATE_AT, ADM.mkQuote(ADM.mkJSONDateTimeConstructor(user.getCreatedAt()))); + ADM.keyValueToSbWithComma(sb, DESCRIPTION, ADM.mkQuote(user.getDescription())); + ADM.keyValueToSbWithComma(sb, FOLLOWERS_COUNT, String.valueOf(user.getFollowersCount())); + ADM.keyValueToSbWithComma(sb, FRIENDS_COUNT, String.valueOf(user.getFriendsCount())); + ADM.keyValueToSb(sb, STATUS_COUNT, String.valueOf(user.getStatusesCount())); + sb.append("}"); + + return sb.toString(); + } +} \ No newline at end of file diff --git a/examples/twittermap/script/elasticGeoTag.sh b/examples/twittermap/script/elasticGeoTag.sh new file mode 100755 index 000000000..37447f8f7 --- /dev/null +++ b/examples/twittermap/script/elasticGeoTag.sh @@ -0,0 +1,28 @@ +#!/bin/bash - +#=============================================================================== +# +# FILE: elasticGeoTag.sh +# +# USAGE: ./elasticGeoTag.sh < read stdin > write stdout +# +# DESCRIPTION: +# +# OPTIONS: --- +# REQUIREMENTS: --- +# BUGS: --- +# NOTES: --- +# AUTHOR: Dayue Bai (dayueb@uci.edu), Baihao Wang (baihaow@uci.edu) +# ORGANIZATION: ics.uci.edu +# CREATED: 11/02/2019 21:29:00 PM PST +# REVISION: --- +#=============================================================================== + +set -o nounset # Treat unset variables as an error + +thread=${1:-1} +sbt -mem 2048 "project noah" --error 'set showSuccess := false' "run-main edu.uci.ics.cloudberry.noah.TwitterJSONTagToADM\ + -state web/public/data/state.json\ + -county web/public/data/county.json \ + -city web/public/data/city.json \ + -thread $thread \ + -fileFormat \"JSON\"" diff --git a/examples/twittermap/script/ingestElasticData.py b/examples/twittermap/script/ingestElasticData.py new file mode 100644 index 000000000..c15451c0f --- /dev/null +++ b/examples/twittermap/script/ingestElasticData.py @@ -0,0 +1,48 @@ +#=============================================================================== +# +# FILE: ingestElasticData.py +# +# USAGE: used in elasticGeoTag.sh +# +# DESCRIPTION: ingest data into Elasticsearch cluster +# +# OPTIONS: --- +# REQUIREMENTS: --- +# BUGS: --- +# NOTES: --- +# AUTHOR: Dayue Bai (dayueb@uci.edu), Baihao Wang (baihaow@uci.edu) +# ORGANIZATION: ics.uci.edu +# CREATED: 11/02/2019 21:29:00 PM PST +# REVISION: --- +#=============================================================================== + +import sys + +print("[info]Checking Python interpreter version...\n[info]Make sure to use Python 3.0+") +if sys.version_info.major >= 3: + print("[info]Passed!") + from urllib import request +else: + raise Exception("[error]Must be using Python 3.0+") + +COUNTER = 0 +BUFFER_SIZE_LIMIT = 40000 +URL = "http://localhost:9200/twitter.ds_tweet/_doc/_bulk" +HEADERS = {"Content-type": "application/json"} +buffer = [] + +for tweet in sys.stdin: + COUNTER += 1 + buffer.append(tweet) + + if COUNTER >= BUFFER_SIZE_LIMIT: + COUNTER = 0 + data = ("".join(buffer)).encode("utf-8") + buffer = [] + req = request.Request(URL, data=data, headers=HEADERS) + res = request.urlopen(req) + +if buffer: + data = ("".join(buffer)).encode("utf-8") + req = request.Request(URL, data=data, headers=HEADERS) + res = request.urlopen(req) \ No newline at end of file diff --git a/examples/twittermap/script/ingestTweetToElasticCluster.sh b/examples/twittermap/script/ingestTweetToElasticCluster.sh new file mode 100755 index 000000000..3cad9f463 --- /dev/null +++ b/examples/twittermap/script/ingestTweetToElasticCluster.sh @@ -0,0 +1,149 @@ +#!/bin/bash - +#=============================================================================== +# +# FILE: ingestTweetToElasticCluster.sh +# +# DESCRIPTION: Ingest the twitter data data to Elasticsearch cluster +# +# OPTIONS: +# REQUIREMENTS: --- +# BUGS: --- +# NOTES: --- +# AUTHOR: Dayue Bai (dayueb@uci.edu), Baihao Wang (baihaow@uci.edu) +# ORGANIZATION: ics.uci.edu +# CREATED: 11/02/2019 21:29:00 PM PST +# REVISION: --- +#=============================================================================== + +set -o nounset # Treat unset variables as an error + +printf "[info] Creating an index named twitter.ds_tweet with the following schema in Elasticsearch...\n\n" +curl -X PUT "localhost:9200/twitter.ds_tweet" -H 'Content-Type: application/json' -d' +{ + "mappings" : { + "_doc" : { + "properties" : { + "create_at" : {"type": "date", "format": "strict_date_time"}, + "text": {"type": "text", "fields": {"keyword": {"type": "keyword","ignore_above": 256}}}, + "id": {"type" : "long"}, + "hashtags": {"type": "text", "fields": {"keyword": {"type": "keyword","ignore_above": 256}}}, + "in_reply_to_status": {"type" : "object", "enabled": false}, + "in_reply_to_user": {"type" : "object", "enabled": false}, + "favorite_count": {"type" : "object", "enabled": false}, + "lang": {"type" : "object", "enabled": false}, + "is_retweet": {"type" : "object", "enabled": false}, + "coordinate": {"type" : "object", "enabled": false}, + "user_mentions": {"type" : "object", "enabled": false}, + "user.id": {"type" : "object", "enabled": false}, + "user.name": {"type" : "object", "enabled": false}, + "user.screen_name": {"type" : "object", "enabled": false}, + "user.lang": {"type" : "object", "enabled": false}, + "user.location": {"type" : "object", "enabled": false}, + "user.profile_image_url": {"type" : "object", "enabled": false}, + "user.create_at" : {"type": "date", "format": "strict_date_time"}, + "user.description": {"type" : "object", "enabled": false}, + "user.followers_count": {"type": "object", "enabled": false}, + "user.friends_count": {"type": "object", "enabled": false}, + "user.statues_count": {"type": "object", "enabled": false}, + "place.country": {"type": "object", "enabled": false}, + "place.country_code": {"type": "object", "enabled": false}, + "place.bounding_box": {"type" : "object", "enabled": false}, + "place.full_name": {"type": "object", "enabled": false}, + "place.id": {"type": "object", "enabled": false}, + "place.name": {"type": "object", "enabled": false}, + "place.place_type": {"type": "object", "enabled": false}, + "geo_tag.stateName": {"type" : "object", "enabled": false}, + "geo_tag.countyName": {"type" : "object", "enabled": false}, + "geo_tag.cityName": {"type" : "object", "enabled": false}, + "geo_tag.stateID": {"type": "long"}, + "geo_tag.countyID": {"type": "long"}, + "geo_tag.cityID": {"type": "long"} + } + } + }, + "settings": { + "index": { + "max_result_window": 2147483647, + "number_of_replicas": 0, + "number_of_shards": 4 + } + } +} +' + +printf "\n\n[info] Creating a template named twitter the following schema in Elasticsearch...\nThis template is used for creating view table in Cloudberry...\n\n" +curl -X PUT "localhost:9200/_template/twitter" -H 'Content-Type: application/json' -d' +{ + "index_patterns": ["twitter.ds_tweet_*"], + "mappings" : { + "_doc" : { + "properties" : { + "create_at" : {"type": "date", "format": "strict_date_time"}, + "text": {"type": "text", "fields": {"keyword": {"type": "keyword","ignore_above": 256}}}, + "id": {"type" : "long"}, + "hashtags": {"type": "text", "fields": {"keyword": {"type": "keyword","ignore_above": 256}}}, + "in_reply_to_status": {"type" : "object", "enabled": false}, + "in_reply_to_user": {"type" : "object", "enabled": false}, + "favorite_count": {"type" : "object", "enabled": false}, + "lang": {"type" : "object", "enabled": false}, + "is_retweet": {"type" : "object", "enabled": false}, + "coordinate": {"type" : "object", "enabled": false}, + "user_mentions": {"type" : "object", "enabled": false}, + "user.id": {"type" : "object", "enabled": false}, + "user.name": {"type" : "object", "enabled": false}, + "user.screen_name": {"type" : "object", "enabled": false}, + "user.lang": {"type" : "object", "enabled": false}, + "user.location": {"type" : "object", "enabled": false}, + "user.profile_image_url": {"type" : "object", "enabled": false}, + "user.create_at" : {"type": "date", "format": "strict_date_time"}, + "user.description": {"type" : "object", "enabled": false}, + "user.followers_count": {"type": "object", "enabled": false}, + "user.friends_count": {"type": "object", "enabled": false}, + "user.statues_count": {"type": "object", "enabled": false}, + "place.country": {"type": "object", "enabled": false}, + "place.country_code": {"type": "object", "enabled": false}, + "place.bounding_box": {"type" : "object", "enabled": false}, + "place.full_name": {"type": "object", "enabled": false}, + "place.id": {"type": "object", "enabled": false}, + "place.name": {"type": "object", "enabled": false}, + "place.place_type": {"type": "object", "enabled": false}, + "geo_tag.stateName": {"type" : "object", "enabled": false}, + "geo_tag.countyName": {"type" : "object", "enabled": false}, + "geo_tag.cityName": {"type" : "object", "enabled": false}, + "geo_tag.stateID": {"type": "long"}, + "geo_tag.countyID": {"type": "long"}, + "geo_tag.cityID": {"type": "long"} + } + } + }, + "settings": { + "index": { + "max_result_window": 2147483647, + "number_of_replicas": 0, + "number_of_shards": 4, + "refresh_interval": "10s" + } + } +} +' + +printf "\n\n[info] Showing high-level information about indices in the Elasticsearch cluster BEFORE ingesting data...\n\n" +curl -X GET "localhost:9200/_cat/indices?v" + +printf "\n[info] Showing information about templates in the Elasticsearch cluster...\n\n" +curl -X GET "localhost:9200/_cat/templates?v&s=name&pretty" + +printf "\n[info] Compiling geo tag modules...\n\n" +sbt "project noah" assembly + +printf "\n\n[info] Start to ingest tweets...\n\n" +# The first argument after "./geotag.sh" means the number of threads used to ingest data. Feel free to change it to the number of threads your local machine has. +# Run the following command under path: ~/quick-start/cloudberry/examples/twittermap/ +# Need to use Python 3.x in the following command. +gunzip -c ./script/sample.json.gz | ./script/elasticGeoTag.sh 4 2>&1 | python3 ./script/ingestElasticData.py + +sleep 2 # Waiting for Elasticsearch indexing process +printf "\n\n[info] Showing high-level information about indices in Elasticsearch cluster AFTER ingesting data...\n\n" +curl -X GET "localhost:9200/_cat/indices?v" + +printf "\n\n[success] Finish ingesting tweets" From 98acfab0dd1d4d8570f2a7d6825c55d6f25790eb Mon Sep 17 00:00:00 2001 From: dayuebai Date: Sat, 16 Nov 2019 21:41:15 -0800 Subject: [PATCH 2/5] Add whitespaces --- examples/twittermap/script/ingestElasticData.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/twittermap/script/ingestElasticData.py b/examples/twittermap/script/ingestElasticData.py index c15451c0f..18057580e 100644 --- a/examples/twittermap/script/ingestElasticData.py +++ b/examples/twittermap/script/ingestElasticData.py @@ -18,12 +18,12 @@ import sys -print("[info]Checking Python interpreter version...\n[info]Make sure to use Python 3.0+") if sys.version_info.major >= 3: - print("[info]Passed!") + print("[info] Checked Python interpreter version...\n[info]Make sure to use Python 3.0+") + print("[info] Passed!") from urllib import request else: - raise Exception("[error]Must be using Python 3.0+") + raise Exception("[error] Must be using Python 3.0+") COUNTER = 0 BUFFER_SIZE_LIMIT = 40000 From 80f7dc5567ff0267b3ae5f6aec60e2bf321d099a Mon Sep 17 00:00:00 2001 From: cdvr <25713361+cdvr@users.noreply.github.com> Date: Thu, 5 Dec 2019 21:25:45 -0800 Subject: [PATCH 3/5] add counter for new doc id in elasticsearch --- .../uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala index 565cc18c2..3b18554eb 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala @@ -21,7 +21,7 @@ object TwitterJSONTagToADM { var isDebug = false val bufferSize = 100 var file = "ADM" // By default, generate ADM file. - + var counter = 1 val usage = """ |Usage: USHierarchyBuilder -state /path/to/state.json -county /path/to/county.json -city /path/to/city.json @@ -42,7 +42,7 @@ object TwitterJSONTagToADM { } } - def tagOneTweet(ln: String, usGeoGnosis: USGeoGnosis) = { + def tagOneTweet(ln: String, usGeoGnosis: USGeoGnosis) = synchronized { try { if (file.equals("ADM")) { val adm = Tweet.toADM(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) @@ -50,9 +50,8 @@ object TwitterJSONTagToADM { } else { val json = Tweet.toJSON(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) if (json.length > 0) { - val obj = Json.parse(json).as[JsObject] - val twitterId = (obj \ "id").get.toString() - val indexStr = s"""{ "index": {"_id": "$twitterId" } }""" + val indexStr = s"""{ "index": {"_id": "$counter" } }""" + counter += 1 println(indexStr + "\n" + json) } } From 70f68a9f8378fe88540836bb87224aa72383bac3 Mon Sep 17 00:00:00 2001 From: dayuebai Date: Thu, 5 Dec 2019 22:25:52 -0800 Subject: [PATCH 4/5] Change indentation --- examples/twittermap/script/ingestElasticData.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/twittermap/script/ingestElasticData.py b/examples/twittermap/script/ingestElasticData.py index 18057580e..f190351d1 100644 --- a/examples/twittermap/script/ingestElasticData.py +++ b/examples/twittermap/script/ingestElasticData.py @@ -19,7 +19,7 @@ import sys if sys.version_info.major >= 3: - print("[info] Checked Python interpreter version...\n[info]Make sure to use Python 3.0+") + print("[info] Checked Python interpreter version...\n[info]Make sure to use Python 3.0+") print("[info] Passed!") from urllib import request else: @@ -45,4 +45,4 @@ if buffer: data = ("".join(buffer)).encode("utf-8") req = request.Request(URL, data=data, headers=HEADERS) - res = request.urlopen(req) \ No newline at end of file + res = request.urlopen(req) From d678d5a468fc4a4a85da6f7a34eda0f4ca33a8f7 Mon Sep 17 00:00:00 2001 From: dayuebai Date: Sat, 7 Dec 2019 22:12:53 -0800 Subject: [PATCH 5/5] revert config file changes --- .../edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala index 3b18554eb..5e52af468 100644 --- a/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala +++ b/examples/twittermap/noah/src/main/scala/edu/uci/ics/cloudberry/noah/TwitterJSONTagToADM.scala @@ -21,7 +21,7 @@ object TwitterJSONTagToADM { var isDebug = false val bufferSize = 100 var file = "ADM" // By default, generate ADM file. - var counter = 1 + val usage = """ |Usage: USHierarchyBuilder -state /path/to/state.json -county /path/to/county.json -city /path/to/city.json @@ -42,7 +42,7 @@ object TwitterJSONTagToADM { } } - def tagOneTweet(ln: String, usGeoGnosis: USGeoGnosis) = synchronized { + def tagOneTweet(ln: String, usGeoGnosis: USGeoGnosis) = { try { if (file.equals("ADM")) { val adm = Tweet.toADM(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) @@ -50,9 +50,7 @@ object TwitterJSONTagToADM { } else { val json = Tweet.toJSON(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true) if (json.length > 0) { - val indexStr = s"""{ "index": {"_id": "$counter" } }""" - counter += 1 - println(indexStr + "\n" + json) + println("""{ "index": {} }""" + "\n" + json) } } } catch {