Skip to content

Commit

Permalink
Merge branch 'master' into elastic-ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
baiqiushi authored Oct 27, 2020
2 parents 4881aa4 + 86790a2 commit d3bcf93
Show file tree
Hide file tree
Showing 116 changed files with 245,578 additions and 178 deletions.
6 changes: 3 additions & 3 deletions cloudberry/neo/app/controllers/Cloudberry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ class Cloudberry @Inject()(val wsClient: WSClient,

def ws = WebSocket.accept[JsValue, JsValue] { request =>
ActorFlow.actorRef { out =>
RequestRouter.props(BerryClient.props(new JSONParser(), manager, new QueryPlanner(), config, out), config, request)
RequestRouter.props(BerryClient.props(new JSONParser(), manager, new QueryPlanner(config), config, out), config, request)
}
}

// A WebSocket for checking whether a query is solvable by view
def checkQuerySolvableByView = WebSocket.accept[JsValue, JsValue] { request =>
ActorFlow.actorRef { out =>
RequestRouter.props(ViewStatusClient.props(new JSONParser(), manager, new QueryPlanner(), config, out), config, request)
RequestRouter.props(ViewStatusClient.props(new JSONParser(), manager, new QueryPlanner(config), config, out), config, request)
}
}

Expand All @@ -108,7 +108,7 @@ class Cloudberry @Inject()(val wsClient: WSClient,
val source = Source.single(request.body)

val flow = Cloudberry.actorFlow[JsValue, JsValue]({ out =>
BerryClient.props(new JSONParser(), manager, new QueryPlanner(), config, out)
BerryClient.props(new JSONParser(), manager, new QueryPlanner(config), config, out)
}, BerryClient.Done)
val toStringFlow = Flow[JsValue].map(js => js.toString() + System.lineSeparator())
Ok.chunked((source via flow) via toStringFlow)
Expand Down
14 changes: 13 additions & 1 deletion cloudberry/neo/conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ akka {
}
}
}
test {
timefactor = 1.0
filter-leeway = 10s
single-expect-default = 10s
default-timeout = 10s

calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
}

play {
Expand All @@ -79,6 +89,8 @@ actor {
user.timeout = "500 seconds"
}

view.enable = true

view.update.interval = "30 minutes"

view.meta.flush.interval = "30 minutes"
Expand Down Expand Up @@ -108,4 +120,4 @@ asterixdb.view.meta.name = "viewMeta"
# For example, when the resolution is extremely high in the TwitterMap,
# the frontend may send queries that contain a lot of cityIDs where the size
# can easily exceed the default message size limit of 65536 bytes.
play.websocket.buffer.limit = 5M
play.websocket.buffer.limit = 20M
49 changes: 49 additions & 0 deletions cloudberry/neo/conf/logback-debug.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<configuration>

<conversionRule conversionWord="coloredLevel" converterClass="play.api.libs.logback.ColoredLevel" />

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/application.log</file>
<encoder>
<pattern>%date [%level] from %logger in %thread - %message%n%xException</pattern>
</encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>%coloredLevel %logger{15} - %message%n%xException{10}</pattern>
</encoder>
</appender>

<appender name="CLIENT_FILE" class="ch.qos.logback.core.FileAppender">
<file>logs/client.log</file>
<encoder>
<pattern>%date [%level] - %message%n</pattern>
</encoder>
</appender>

<!-- additivity=false ensures client log data only goes to the client log -->
<logger name="client" level="INFO" additivity="false">
<appender-ref ref="CLIENT_FILE" />
</logger>
<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />
<logger name="akka" level="DEBUG" />

<!-- Off these ones as they are annoying, and anyway we manage configuration ourself -->
<logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.core.XmlConfigLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.lib.BackgroundThread" level="OFF" />
<logger name="com.gargoylesoftware.htmlunit.javascript" level="OFF" />
<logger name="io.netty" level="OFF"/>
<logger name="org.asynchttpclient" level="OFF"/>

<root level="DEBUG">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>

</configuration>
12 changes: 6 additions & 6 deletions cloudberry/neo/conf/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
<level>ERROR</level>
</filter>
<encoder>
<pattern>%coloredLevel %logger{15} - %message%n%xException{10}</pattern>
Expand All @@ -26,12 +26,12 @@
</appender>

<!-- additivity=false ensures client log data only goes to the client log -->
<logger name="client" level="INFO" additivity="false">
<logger name="client" level="ERROR" additivity="false">
<appender-ref ref="CLIENT_FILE" />
</logger>
<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />
<logger name="akka" level="DEBUG" />
<logger name="play" level="ERROR" />
<logger name="application" level="ERROR" />
<logger name="akka" level="ERROR" />

<!-- Off these ones as they are annoying, and anyway we manage configuration ourself -->
<logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
Expand All @@ -41,7 +41,7 @@
<logger name="io.netty" level="OFF"/>
<logger name="org.asynchttpclient" level="OFF"/>

<root level="DEBUG">
<root level="ERROR">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
</root>
Expand Down
6 changes: 5 additions & 1 deletion cloudberry/neo/conf/production.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ actor {
user.timeout = "500 seconds"
}

view.enable = true

view.update.interval = "30 minutes"

view.meta.flush.interval = "30 minutes"
Expand All @@ -87,8 +89,10 @@ berry.firstquery.gap = "2 days"
berry.query.gap = "1 day"

asterixdb.url = "http://localhost:19002/query/service"

#acceptable values: "AQL" or "SQLPP" or "sparksql" or "SQL"
asterixdb.lang = SQLPP

asterixdb.view.meta.name = "viewMeta"

# WebSocket message size limit
Expand All @@ -97,4 +101,4 @@ asterixdb.view.meta.name = "viewMeta"
# For example, when the resolution is extremely high in the TwitterMap,
# the frontend may send queries that contain a lot of cityIDs where the size
# can easily exceed the default message size limit of 65536 bytes.
play.websocket.buffer.limit = 5M
play.websocket.buffer.limit = 20M
2 changes: 1 addition & 1 deletion cloudberry/neo/conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ POST /admin/deregister controllers.Cloudberry.deregiste

# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)
GET /favicon.ico controllers.Assets.at(path="/public/images", file="favicon.ico")
GET /favicon.ico controllers.Assets.at(path="/public/images", file="favicon.ico")
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,4 @@ class RESTSolver(val dataManager: ActorRef,
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Reporter(out: ActorRef)(implicit val ec: ExecutionContext) extends Actor w

override def receive: Actor.Receive = commonReceive orElse {
case result: PartialResult =>
queue.enqueue(result)
queue.enqueue(result)
case TimeToReport => {
if (queue.isEmpty) {
timer.cancel()
Expand All @@ -36,8 +36,8 @@ class Reporter(out: ActorRef)(implicit val ec: ExecutionContext) extends Actor w
}

private def hungry(since: DateTime): Actor.Receive = commonReceive orElse {
case r: PartialResult =>
out ! Json.toJson(r.content)
case result: PartialResult =>
out ! Json.toJson(result.content)
val delay = new TInterval(since, DateTime.now())
log.warning(s"delayed ${delay.toDurationMillis / 1000.0} seconds ")
timer = context.system.scheduler.schedule(limit, limit, self, TimeToReport)
Expand All @@ -57,9 +57,9 @@ class Reporter(out: ActorRef)(implicit val ec: ExecutionContext) extends Actor w
if (queue.nonEmpty) {
/*
Logistic Here is when query finished, but there are still some results in queue
we return them altogher.
we return them altogether.
*/
if(fin.returnDelta){
if (fin.returnDelta) {
queue.dequeueAll(deltaResult=>
{
out ! Json.toJson(deltaResult.content)
Expand All @@ -68,12 +68,12 @@ class Reporter(out: ActorRef)(implicit val ec: ExecutionContext) extends Actor w
)

}
else{
else {
out ! Json.toJson(queue.dequeueAll(_ => true).last.content)
}
//TODO remove this special DONE message
out ! fin.lastMsg // notifying the client the processing is done
}
// notifying the client the processing is done
out ! fin.lastMsg
timer.cancel()
context.become(receive)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class Config(config: Configuration) {

val UserTimeOut = config.getString("actor.user.timeout").map(parseTimePair).getOrElse(60 seconds)

val viewMaintenanceEnable = config.getBoolean("view.enable").getOrElse(true)

val ViewUpdateInterval = config.getString("view.update.interval").map(parseTimePair).getOrElse(60 minutes)

val ViewMetaFlushInterval = config.getString("view.meta.flush.interval").map(parseTimePair).getOrElse(60 minutes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class AsterixSQLPPConn(url: String, wSClient: WSClient)(implicit ec: ExecutionCo
}

protected def params(query: String): Map[String, Seq[String]] = {
Map("statement" -> Seq(query), "mode" -> Seq("synchronous"), "include-results" -> Seq("true"))
Map("statement" -> Seq(query), "mode" -> Seq("immediate"), "include-results" -> Seq("true"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ package edu.uci.ics.cloudberry.zion.model.impl

import java.security.MessageDigest

import edu.uci.ics.cloudberry.zion.common.Config
import edu.uci.ics.cloudberry.zion.model.schema._
import org.joda.time.{DateTime, Interval}
import play.api.libs.json._

class QueryPlanner {
class QueryPlanner (val config: Config) {

import QueryPlanner._

def makePlan(query: Query, source: DataSetInfo, views: Seq[DataSetInfo]): (Seq[Query], IMerger) = {

//TODO currently only get the best one
val bestView = selectBestView(findMatchedViews(query, source, views))
splitQuery(query, source, bestView)
config.viewMaintenanceEnable match {
case true =>
//TODO currently only get the best one
val bestView = selectBestView(findMatchedViews(query, source, views))
splitQuery(query, source, bestView)
case false =>
(Seq(query), Unioner)
}
}

// Return whether there is matched views for a query, and it is used by the ViewStatusClient
Expand All @@ -28,19 +34,23 @@ class QueryPlanner {
}

def suggestNewView(query: Query, source: DataSetInfo, views: Seq[DataSetInfo]): Seq[CreateView] = {
//TODO currently only suggest the keyword subset views
if (views.exists(v => v.createQueryOpt.exists(vq => vq.canSolve(query, source.schema)))) {
Seq.empty[CreateView]
} else {
val keywordFilters = query.filter.filter(f => f.field.dataType == DataType.Text)
keywordFilters.flatMap { kwFilter =>
kwFilter.values.map { wordAny =>
val word = wordAny.asInstanceOf[String]
val wordFilter = FilterStatement(kwFilter.field, None, Relation.contains, Seq(word))
val wordQuery = Query(query.dataset, Seq.empty, Seq.empty, Seq(wordFilter), Seq.empty, None, None)
CreateView(getViewKey(query.dataset, word), wordQuery)
config.viewMaintenanceEnable match {
case true =>
//TODO currently only suggest the keyword subset views
if (views.exists(v => v.createQueryOpt.exists(vq => vq.canSolve(query, source.schema)))) {
Seq.empty[CreateView]
} else {
val keywordFilters = query.filter.filter(f => f.field.dataType == DataType.Text)
keywordFilters.flatMap { kwFilter =>
kwFilter.values.map { wordAny =>
val word = wordAny.asInstanceOf[String]
val wordFilter = FilterStatement(kwFilter.field, None, Relation.contains, Seq(word))
val wordQuery = Query(query.dataset, Seq.empty, Seq.empty, Seq(wordFilter), Seq.empty, None, None)
CreateView(getViewKey(query.dataset, word), wordQuery)
}
}
}
}
case false => Seq.empty[CreateView]
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package edu.uci.ics.cloudberry.zion.model.impl

import edu.uci.ics.cloudberry.zion.common.Config
import edu.uci.ics.cloudberry.zion.model.schema._
import org.joda.time.DateTime
import org.specs2.mutable.Specification
Expand All @@ -16,7 +17,7 @@ class QueryPlannerTest extends Specification {
val queryTag = new Query(TwitterDataSet, Seq.empty, Seq.empty, filter, Seq(unnestHashTag), Some(groupTag), Some(selectTop10Tag))
val querySample = new Query(TwitterDataSet, Seq.empty, Seq.empty, filter, Seq.empty, None, Some(selectRecent))

val planner = new QueryPlanner
val planner = new QueryPlanner(Config.Default)

val zikaFullStats = Stats(sourceInterval.getStart, sourceInterval.getEnd, sourceInterval.getEnd, 50)
val zikaFullYearViewInfo = DataSetInfo("zika", Some(zikaCreateQuery), twitterSchema, sourceInterval, zikaFullStats)
Expand Down
26 changes: 26 additions & 0 deletions examples/coronavirustwittermap/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
logs
project/project
project/target
bin
target
tmp
.history
.sbtserver
.sbtserver.*
dist
/.idea
/*.iml
/out
/.idea_modules
.classpath
.project
/RUNNING_PID
.settings
/project/*-shim.sbt
/activator-sbt-*-shim.sbt
*.iml
OutputFolderPromedmail/
.DS_Store
.cache-main
.cache-tests
.pydevproject
26 changes: 26 additions & 0 deletions examples/coronavirustwittermap/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Coronavirus TwitterMap Setup

Follow the quick-start instructions here: [(II) Setup Twittermap on your Local Machine](https://github.com/ISG-ICS/cloudberry/wiki/quick-start#ii-setup-twittermap-on-your-local-machine)

The different steps are as follows.

1) In **Step 2.3: Download and ingest the synthetic sample tweets (about 100K) data into AsterixDB.**

Replace **(1) Download the synthetic sample tweets (about 100K) data:** with the following commands,
```bash
cd ~/quick-start/cloudberry/examples/twittermap/script/
wget http://cloudberry.ics.uci.edu/img/coronavirus-tweets/sample.adm.gz
```
**_Note: this will download coronavirus-related sample tweets._**

2) In **Step 2.4: Start the TwitterMap Web server (in port 9001):**

Use the following command to start CoronavirusTwitterMap,
```bash
cd ~/quick-start/cloudberry/examples/coronavirustwittermap/
sbt "project web" "run 9001"
```

Now you should be able to see Coronavirus TwitterMap frontend by visiting: [http://localhost:9001](http://localhost:9001).

**_Note: all other information and troubleshooting steps are the same as in the quick-start guide here:_** [(II) Setup Twittermap on your Local Machine](https://github.com/ISG-ICS/cloudberry/wiki/quick-start#ii-setup-twittermap-on-your-local-machine)
Loading

0 comments on commit d3bcf93

Please sign in to comment.