Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elasticsearch Ingestion] Add Elasticsearch ingestion pipeline #727

Merged
merged 13 commits into from
Dec 29, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package edu.uci.ics.cloudberry.noah

import java.io.File
import java.util.concurrent.Executors

import play.api.libs.json._
import edu.uci.ics.cloudberry.gnosis._
import edu.uci.ics.cloudberry.noah.adm.Tweet
import edu.uci.ics.cloudberry.util.Profile._
Expand All @@ -20,6 +20,7 @@ object TwitterJSONTagToADM {
var threadNumber = 2
var isDebug = false
val bufferSize = 100
var file = "ADM" // By default, generate ADM file.

val usage =
"""
Expand All @@ -36,14 +37,25 @@ object TwitterJSONTagToADM {
case "-city" :: value :: tail => shapeMap += CityLevel -> value; parseOption(tail)
case "-thread" :: value :: tail => threadNumber = value.toInt; parseOption(tail)
case "-debug" :: value :: tail => isDebug = true; parseOption(tail)
case "-fileFormat" :: value :: tail => file = value; parseOption(tail)
case option :: tail => System.err.println("unknown option:" + option); System.err.println(usage); System.exit(1);
}
}

def tagOneTweet(ln: String, usGeoGnosis: USGeoGnosis) = {
try {
val adm = Tweet.toADM(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true)
if (adm.length > 0) println(adm)
if (file.equals("ADM")) {
val adm = Tweet.toADM(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true)
if (adm.length > 0) println(adm)
} else {
val json = Tweet.toJSON(TwitterObjectFactory.createStatus(ln), usGeoGnosis, true)
if (json.length > 0) {
val obj = Json.parse(json).as[JsObject]
val twitterId = (obj \ "id").get.toString()
val indexStr = s"""{ "index": {"_id": "$twitterId" } }"""
println(indexStr + "\n" + json)
}
}
} catch {
case e: Throwable => {
if (isDebug) {
Expand Down Expand Up @@ -78,4 +90,4 @@ object TwitterJSONTagToADM {
buffer.foreach(tagOneTweet(_, usGeoGnosis))
thpool.shutdownNow()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,32 @@ public synchronized static String mkDateTimeConstructor(Date jdate) {
return "datetime(\"" + ADMDateFormat.format(jdate) + "T" + ADMTimeFormat.format(jdate) + "\")";
}

/**
 * Renders a date as {@code <date>T<time>} with no surrounding constructor text.
 *
 * The date part comes from {@code ADMDateFormat} and the time part from
 * {@code ADMTimeFormat}; the exact patterns are defined elsewhere in this class.
 * The method is synchronized on the class, like the sibling datetime helper.
 * NOTE(review): presumably because the shared formatters are not thread-safe — confirm.
 *
 * @param jdate the date to format
 * @return the formatted date, a literal {@code T}, then the formatted time
 */
public synchronized static String mkJSONDateTimeConstructor(Date jdate) {
    final String datePart = ADMDateFormat.format(jdate);
    final String timePart = ADMTimeFormat.format(jdate);
    return datePart + "T" + timePart;
}

public static Rectangle coordinates2Rectangle(GeoLocation[][] boundingBoxCoordinates){
if (boundingBoxCoordinates.length != 1 || boundingBoxCoordinates[0].length != 4) {
throw new IllegalArgumentException("unknown boundingBoxCoordinates");
}
// Twitter has historically used some weird formats: the box is still a rectangle, but its
// corners are not always given in (sw, se, ne, nw) order
double swLog = Collections.min(Arrays.asList(boundingBoxCoordinates[0][0].getLongitude(),
boundingBoxCoordinates[0][1].getLongitude(),
boundingBoxCoordinates[0][2].getLongitude(),
boundingBoxCoordinates[0][3].getLongitude()));
boundingBoxCoordinates[0][1].getLongitude(),
boundingBoxCoordinates[0][2].getLongitude(),
boundingBoxCoordinates[0][3].getLongitude()));
double swLat = Collections.min(Arrays.asList(boundingBoxCoordinates[0][0].getLatitude(),
boundingBoxCoordinates[0][1].getLatitude(),
boundingBoxCoordinates[0][2].getLatitude(),
boundingBoxCoordinates[0][3].getLatitude()));
boundingBoxCoordinates[0][1].getLatitude(),
boundingBoxCoordinates[0][2].getLatitude(),
boundingBoxCoordinates[0][3].getLatitude()));
double neLog = Collections.max(Arrays.asList(boundingBoxCoordinates[0][0].getLongitude(),
boundingBoxCoordinates[0][1].getLongitude(),
boundingBoxCoordinates[0][2].getLongitude(),
boundingBoxCoordinates[0][3].getLongitude()));
boundingBoxCoordinates[0][1].getLongitude(),
boundingBoxCoordinates[0][2].getLongitude(),
boundingBoxCoordinates[0][3].getLongitude()));
double neLat = Collections.max(Arrays.asList(boundingBoxCoordinates[0][0].getLatitude(),
boundingBoxCoordinates[0][1].getLatitude(),
boundingBoxCoordinates[0][2].getLatitude(),
boundingBoxCoordinates[0][3].getLatitude()));
boundingBoxCoordinates[0][1].getLatitude(),
boundingBoxCoordinates[0][2].getLatitude(),
boundingBoxCoordinates[0][3].getLatitude()));

// AsterixDB is unhappy with this kind of point "rectangular"
if (swLog == neLog && swLat == neLat){
Expand All @@ -71,22 +75,22 @@ public static Rectangle coordinates2Rectangle(GeoLocation[][] boundingBoxCoordin
}
if (swLog > neLog || swLat > neLat) {
throw new IllegalArgumentException(
"Not a good Rectangle: " + "sw:" + swLog + "," + swLat + ", ne:" + neLog + "," + neLat);
"Not a good Rectangle: " + "sw:" + swLog + "," + swLat + ", ne:" + neLog + "," + neLat);
}
return new Rectangle(swLog, swLat, neLog, neLat);
}

public static String mkRectangleConstructor(GeoLocation[][] boundingBoxCoordinates)
throws IllegalArgumentException {
throws IllegalArgumentException {
StringBuilder sb = new StringBuilder("rectangle");

Rectangle rectangle = coordinates2Rectangle(boundingBoxCoordinates);
sb.append("(\"").append(rectangle.swLog()).append(',')
.append(rectangle.swLat())
.append(' ')
.append(rectangle.neLog()).append(',')
.append(rectangle.neLat())
.append("\")");
.append(rectangle.swLat())
.append(' ')
.append(rectangle.neLog()).append(',')
.append(rectangle.neLat())
.append("\")");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,45 @@
package edu.uci.ics.cloudberry.noah.adm;

import edu.uci.ics.cloudberry.util.Rectangle;
import twitter4j.GeoLocation;

import static edu.uci.ics.cloudberry.noah.adm.ADM.coordinates2Rectangle;

public class Place {

public static String COUNTRY = "country";
public static String COUNTRY_CODE = "country_code";
public static String FULL_NAME = "full_name";
public static String ID = "id";
public static String NAME = "name";
public static String PLACE_TYPE = "place_type";
public static String BOUNDING_BOX = "bounding_box";
public static String COUNTRY = "country";
public static String COUNTRY_CODE = "country_code";
public static String FULL_NAME = "full_name";
public static String ID = "id";
public static String NAME = "name";
public static String PLACE_TYPE = "place_type";
public static String BOUNDING_BOX = "bounding_box";

public static String toADM(twitter4j.Place place) {
StringBuilder sb = new StringBuilder();
sb.append("{");
ADM.keyValueToSbWithComma(sb, COUNTRY, ADM.mkQuote(place.getCountry()));
ADM.keyValueToSbWithComma(sb, COUNTRY_CODE, ADM.mkQuote(place.getCountry()));
ADM.keyValueToSbWithComma(sb, FULL_NAME, ADM.mkQuote(place.getFullName()));
ADM.keyValueToSbWithComma(sb, ID, ADM.mkQuote(String.valueOf(place.getId())));
ADM.keyValueToSbWithComma(sb, NAME, ADM.mkQuote(place.getName()));
ADM.keyValueToSbWithComma(sb, PLACE_TYPE, ADM.mkQuote(place.getPlaceType()));
ADM.keyValueToSb(sb, BOUNDING_BOX, ADM.mkQuote(mkRectangleConstructor(place.getBoundingBoxCoordinates())));
sb.append("}");
return sb.toString();
}

public static String toADM(twitter4j.Place place) {
StringBuilder sb = new StringBuilder();
sb.append("{");
ADM.keyValueToSbWithComma(sb, COUNTRY, ADM.mkQuote(place.getCountry()));
ADM.keyValueToSbWithComma(sb, COUNTRY_CODE, ADM.mkQuote(place.getCountry()));
ADM.keyValueToSbWithComma(sb, FULL_NAME, ADM.mkQuote(place.getFullName()));
ADM.keyValueToSbWithComma(sb, ID, ADM.mkQuote(String.valueOf(place.getId())));
ADM.keyValueToSbWithComma(sb, NAME, ADM.mkQuote(place.getName()));
ADM.keyValueToSbWithComma(sb, PLACE_TYPE, ADM.mkQuote(place.getPlaceType()));
ADM.keyValueToSb(sb, BOUNDING_BOX, ADM.mkRectangleConstructor(place.getBoundingBoxCoordinates()));
sb.append("}");
return sb.toString();
}
private static String mkRectangleConstructor(GeoLocation[][] boundingBoxCoordinates)
throws IllegalArgumentException {
StringBuilder sb = new StringBuilder("LINESTRING");

}
Rectangle rectangle = coordinates2Rectangle(boundingBoxCoordinates);
sb.append("(").append(rectangle.swLog()).append(' ')
.append(rectangle.swLat())
.append(',')
.append(rectangle.neLog()).append(' ')
.append(rectangle.neLat())
.append(")");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
import edu.uci.ics.cloudberry.gnosis.USGeoGnosis;
import twitter4j.GeoLocation;
import twitter4j.Status;
import twitter4j.HashtagEntity;
import twitter4j.UserMentionEntity;

import java.text.SimpleDateFormat;
import java.util.Locale;

import static edu.uci.ics.cloudberry.noah.adm.ADM.*;

public class Tweet {
public static String CREATE_AT = "create_at";
Expand All @@ -21,6 +28,45 @@ public class Tweet {
public static String USER = "user";
public static String PLACE = "place";

/**
 * Serializes a tweet into a single JSON object string.
 *
 * Scalar fields are always written. Hashtags, user mentions, place, geo
 * coordinate and geo tag are written only when present. The user record is
 * always written last.
 *
 * @param status          the tweet to serialize
 * @param gnosis          geo lookup service handed to {@code geoTag}
 * @param requireGeoField when true, a tweet for which {@code geoTag} returns null is dropped
 * @return the JSON text, or the empty string when the tweet is dropped
 * @throws UnknownPlaceException propagated from {@code geoTag}
 */
public static String toJSON(Status status, USGeoGnosis gnosis, boolean requireGeoField) throws UnknownPlaceException{
    final String geoTags = geoTag(status, gnosis, requireGeoField);
    if (requireGeoField && geoTags == null) {
        return "";
    }

    final StringBuilder json = new StringBuilder("{");

    // Mandatory scalar fields: strings are quoted, numbers and booleans are written bare.
    ADM.keyValueToSbWithComma(json, CREATE_AT, ADM.mkQuote(ADM.mkJSONDateTimeConstructor(status.getCreatedAt())));
    ADM.keyValueToSbWithComma(json, ID, String.valueOf(status.getId()));
    ADM.keyValueToSbWithComma(json, TEXT, ADM.mkQuote(status.getText()));
    ADM.keyValueToSbWithComma(json, IN_REPLY_TO_STATUS, String.valueOf(status.getInReplyToStatusId()));
    ADM.keyValueToSbWithComma(json, IN_REPLY_TO_USER, String.valueOf(status.getInReplyToUserId()));
    ADM.keyValueToSbWithComma(json, FAVORITE_COUNT, String.valueOf(status.getFavoriteCount()));
    ADM.keyValueToSbWithComma(json, RETWEET_COUNT, String.valueOf(status.getRetweetCount()));
    ADM.keyValueToSbWithComma(json, LANG, ADM.mkQuote(status.getLang()));
    ADM.keyValueToSbWithComma(json, IS_RETWEET, String.valueOf(status.isRetweet()));

    // Optional collections: omitted entirely when empty.
    if (status.getHashtagEntities().length > 0) {
        ADM.keyValueToSbWithComma(json, HASHTAG, mkStringSet(status.getHashtagEntities()));
    }
    if (status.getUserMentionEntities().length > 0) {
        ADM.keyValueToSbWithComma(json, USER_MENTION, mkStringSet(status.getUserMentionEntities()));
    }

    final boolean hasPlace = status.getPlace() != null;
    if (hasPlace) {
        ADM.keyValueToSbWithComma(json, PLACE, Place.toADM(status.getPlace()));
    }

    // Prefer the tweet's exact location; for a "poi" place fall back to the
    // first corner of its bounding box.
    if (status.getGeoLocation() != null) {
        ADM.keyValueToSbWithComma(json, GEO_COORDINATE, ADM.mkQuote(mkPoint(status.getGeoLocation())));
    } else if (hasPlace && status.getPlace().getPlaceType().equals("poi")) {
        ADM.keyValueToSbWithComma(json, GEO_COORDINATE, ADM.mkQuote(mkPoint(status.getPlace().getBoundingBoxCoordinates()[0][0])));
    }

    if (geoTags != null) {
        ADM.keyValueToSbWithComma(json, GEO_TAG, geoTags);
    }

    // Last field: written without a trailing comma.
    ADM.keyValueToSb(json, USER, User.toJSON(status.getUser()));
    return json.append("}").toString();
}

public static String toADM(Status status, USGeoGnosis gnosis, boolean requireGeoField) throws UnknownPlaceException{
String geoTags = geoTag(status, gnosis, requireGeoField);
if (geoTags == null && requireGeoField)
Expand Down Expand Up @@ -59,6 +105,10 @@ public static String toADM(Status status, USGeoGnosis gnosis, boolean requireGeo
return sb.toString();
}

/**
 * Formats a location as {@code point(<longitude> <latitude>)}, longitude first.
 *
 * @param geoLocation the location to format
 * @return the point text
 */
private static String mkPoint(GeoLocation geoLocation) {
    final StringBuilder point = new StringBuilder("point(");
    point.append(geoLocation.getLongitude());
    point.append(" ");
    point.append(geoLocation.getLatitude());
    point.append(")");
    return point.toString();
}

public static String geoTag(Status status, USGeoGnosis gnosis, boolean requireGeoField) throws UnknownPlaceException{
StringBuilder sb = new StringBuilder();
if (textMatchPlace(sb, status, gnosis)) {
Expand Down Expand Up @@ -122,7 +172,7 @@ protected static boolean textMatchPlace(StringBuilder sb, Status status, USGeoGn
}
cityName = place.getFullName().substring(index + 1).trim();
info = gnosis.tagNeighborhood(cityName,
ADM.coordinates2Rectangle(place.getBoundingBoxCoordinates()));
ADM.coordinates2Rectangle(place.getBoundingBoxCoordinates()));
break;
case "poi": // a point
double longitude = (place.getBoundingBoxCoordinates())[0][0].getLongitude();
Expand All @@ -141,4 +191,30 @@ protected static boolean textMatchPlace(StringBuilder sb, Status status, USGeoGn
return true;
}

}
/**
 * Renders hashtags as a bracketed, comma-separated list of quoted hashtag texts.
 *
 * @param hashtagEntities the hashtags of a tweet; an empty array yields {@code []}
 * @return the list text, e.g. {@code [<quoted a>,<quoted b>]}
 */
private static String mkStringSet(HashtagEntity[] hashtagEntities) {
    final StringBuilder list = new StringBuilder("[");
    // Empty before the first element, "," before every later one.
    String separator = "";
    for (HashtagEntity tag : hashtagEntities) {
        list.append(separator).append(ADM.mkQuote(tag.getText()));
        separator = ",";
    }
    return list.append("]").toString();
}

/**
 * Renders user mentions as a bracketed, comma-separated list of unquoted user ids.
 *
 * @param userMentionEntities the user mentions of a tweet; an empty array yields {@code []}
 * @return the list text, e.g. {@code [123,456]}
 */
private static String mkStringSet(UserMentionEntity[] userMentionEntities) {
    final StringBuilder list = new StringBuilder("[");
    // Empty before the first element, "," before every later one.
    String separator = "";
    for (UserMentionEntity mention : userMentionEntities) {
        list.append(separator).append(mention.getId());
        separator = ",";
    }
    return list.append("]").toString();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package edu.uci.ics.cloudberry.noah.adm;

import java.text.SimpleDateFormat;
import java.util.Locale;

import static edu.uci.ics.cloudberry.noah.adm.ADM.ADMDateFormat;
import static edu.uci.ics.cloudberry.noah.adm.ADM.ADMTimeFormat;

public class User {

public static final String ID = "id";
Expand Down Expand Up @@ -33,4 +39,25 @@ public static String toADM(twitter4j.User user) {

return sb.toString();
}
}

/**
 * Serializes a twitter4j user into a single JSON object string.
 *
 * String-valued fields are passed through {@code ADM.mkQuote}; the id and the
 * counters are written bare via {@code String.valueOf}. The creation date is
 * formatted by {@code ADM.mkJSONDateTimeConstructor} and then quoted.
 *
 * @param user the user to serialize
 * @return the JSON text for the user
 */
public static String toJSON(twitter4j.User user) {
    final StringBuilder json = new StringBuilder("{");

    // Identity and profile.
    ADM.keyValueToSbWithComma(json, ID, String.valueOf(user.getId()));
    ADM.keyValueToSbWithComma(json, NAME, ADM.mkQuote(user.getName()));
    ADM.keyValueToSbWithComma(json, SCREEN_NAME, ADM.mkQuote(user.getScreenName()));
    ADM.keyValueToSbWithComma(json, PROFILE_IMAGE_URL, ADM.mkQuote(user.getProfileImageURL()));
    ADM.keyValueToSbWithComma(json, LANG, ADM.mkQuote(user.getLang()));
    ADM.keyValueToSbWithComma(json, LOCATION, ADM.mkQuote(user.getLocation()));
    ADM.keyValueToSbWithComma(json, CREATE_AT, ADM.mkQuote(ADM.mkJSONDateTimeConstructor(user.getCreatedAt())));
    ADM.keyValueToSbWithComma(json, DESCRIPTION, ADM.mkQuote(user.getDescription()));

    // Counters; the last field is written without a trailing comma.
    ADM.keyValueToSbWithComma(json, FOLLOWERS_COUNT, String.valueOf(user.getFollowersCount()));
    ADM.keyValueToSbWithComma(json, FRIENDS_COUNT, String.valueOf(user.getFriendsCount()));
    ADM.keyValueToSb(json, STATUS_COUNT, String.valueOf(user.getStatusesCount()));

    return json.append("}").toString();
}
}
28 changes: 28 additions & 0 deletions examples/twittermap/script/elasticGeoTag.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash -
#===============================================================================
#
#          FILE: elasticGeoTag.sh
#
#         USAGE: ./elasticGeoTag.sh [thread-count] < tweets-in > tagged-out
#                (reads stdin, writes stdout; thread-count defaults to 1)
#
#   DESCRIPTION: Runs TwitterJSONTagToADM from the "noah" sbt project with
#                -fileFormat "JSON". In that mode each emitted tweet is printed
#                as an { "index": {"_id": ...} } action line followed by the
#                tweet's JSON document.
#
#       OPTIONS: $1 - number of tagging threads (optional, default 1)
#  REQUIREMENTS: sbt; state/county/city JSON files under web/public/data
#          BUGS: ---
#         NOTES: ---
#        AUTHOR: Dayue Bai (dayueb@uci.edu), Baihao Wang (baihaow@uci.edu)
#  ORGANIZATION: ics.uci.edu
#       CREATED: 11/02/2019 21:29:00 PM PST
#      REVISION: ---
#===============================================================================

set -o nounset                              # Treat unset variables as an error

thread=${1:-1}

# The whole run-main command is ONE double-quoted argument to sbt. Inside double
# quotes bash deletes every backslash-newline pair, so each continued line must
# end with " \" (space before the backslash); otherwise adjacent options fuse
# together (e.g. "TwitterJSONTagToADM-state").
sbt -mem 2048 "project noah" --error 'set showSuccess := false' "run-main edu.uci.ics.cloudberry.noah.TwitterJSONTagToADM \
    -state web/public/data/state.json \
    -county web/public/data/county.json \
    -city web/public/data/city.json \
    -thread $thread \
    -fileFormat \"JSON\""
Loading