Skip to content

Commit

Permalink
DataSetInfo parser (#175)
Browse files Browse the repository at this point in the history
* Write DataSetInfo parser to and from JSON record

* Add DataSetInfo Test Cases

* Update JSONParser to also write query
  • Loading branch information
MoniMoledo authored Aug 23, 2016
1 parent 7a65c09 commit 58c310a
Show file tree
Hide file tree
Showing 9 changed files with 663 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package edu.uci.ics.cloudberry.zion.model.impl

import edu.uci.ics.cloudberry.zion.model.schema.{Query, Schema}
import edu.uci.ics.cloudberry.zion.model.datastore.JsonRequestException
import edu.uci.ics.cloudberry.zion.model.schema._
import org.joda.time.{DateTime, Interval}
import play.api.libs.functional.syntax._
import play.api.libs.json.Reads._
import play.api.libs.json._

case class Stats(createTime: DateTime,
lastModifyTime: DateTime,
Expand All @@ -13,3 +17,128 @@ case class DataSetInfo(name: String,
schema: Schema,
dataInterval: Interval,
stats: Stats)

/**
  * JSON (de)serialization for [[DataSetInfo]] and the types it is composed of.
  *
  * All custom `Reads` in this object report malformed input as a `JsError` instead of
  * throwing, so that [[DataSetInfo.parse]] can surface every validation problem as a
  * `JsonRequestException`.
  *
  * The implicit vals are declared so that each one appears after the formats it
  * eagerly depends on; keep that order when adding new ones.
  */
object DataSetInfo {

  import scala.util.Try

  /**
    * Parses a JSON record into a [[DataSetInfo]].
    *
    * @param json the JSON record to validate
    * @return the parsed data set description
    * @throws JsonRequestException if validation fails; the message is the JSON rendering
    *                              of the validation errors
    */
  def parse(json: JsValue): DataSetInfo = {
    json.validate[DataSetInfo] match {
      case JsSuccess(dataSetInfo, _) => dataSetInfo
      case e: JsError => throw JsonRequestException(JsError.toJson(e).toString())
    }
  }

  /** Serializes a [[DataSetInfo]] to its JSON record. */
  def write(dataSetInfo: DataSetInfo): JsValue = Json.toJson(dataSetInfo)

  /** Reads/writes a Joda `Interval` as an object with the keys "start" and "end". */
  implicit val intervalFormat: Format[Interval] = new Format[Interval] {
    override def reads(json: JsValue): JsResult[Interval] = {
      for {
        start <- (json \ "start").validate[DateTime]
        end <- (json \ "end").validate[DateTime]
        interval <- toInterval(start, end)
      } yield interval
    }

    // The Interval constructor rejects end < start with an IllegalArgumentException;
    // check up front so the problem is reported as a validation error instead.
    private def toInterval(start: DateTime, end: DateTime): JsResult[Interval] = {
      if (end.isBefore(start)) JsError(s"interval end $end is before start $start")
      else JsSuccess(new Interval(start.getMillis, end.getMillis))
    }

    override def writes(interval: Interval): JsValue = {
      JsObject(Seq("start" -> Json.toJson(interval.getStart), "end" -> Json.toJson(interval.getEnd)))
    }
  }

  // Used by the HierarchyField writer ("levels" -> Json.toJson(hierarchy.levels)) to turn
  // each (level, field) pair of a Seq[(String, String)] into a JSON object.
  implicit def tuple2Writes: Writes[Tuple2[String, String]] = Writes[(String, String)] {
    case (level, field) => Json.obj("level" -> level, "field" -> field)
  }

  /**
    * Converts the parsed "levels" array into (level, field) pairs.
    *
    * @param levelSeq one map per level, each expected to hold the keys "level" and "field"
    * @return the (level, field) pairs in the input order
    * @throws NoSuchElementException if a map lacks either key
    */
  def parseLevels(levelSeq: Seq[Map[String, String]]): Seq[(String, String)] = {
    levelSeq.map(levelMap => (levelMap("level"), levelMap("field")))
  }

  /** Reads/writes a schema [[Field]], dispatching on its "datatype" entry. */
  implicit val fieldFormat: Format[Field] = new Format[Field] {
    override def reads(json: JsValue): JsResult[Field] = {
      for {
        name <- (json \ "name").validate[String]
        isOptional <- (json \ "isOptional").validate[Boolean]
        typeName <- (json \ "datatype").validate[String]
        dataType <- parseDataType(typeName)
        field <- buildField(json, name, isOptional, dataType)
      } yield field
    }

    // DataType.withName throws for names it does not know; report that as a JsError.
    private def parseDataType(typeName: String): JsResult[DataType.Value] = {
      Try(DataType.withName(typeName)).toOption match {
        case Some(dataType) => JsSuccess(dataType)
        case None => JsError(s"field datatype invalid: $typeName")
      }
    }

    private def readInnerType(json: JsValue): JsResult[DataType.Value] = {
      (json \ "innerType").validate[String].flatMap(innerName => parseDataType(innerName))
    }

    // Validates that every level entry has both keys before delegating to parseLevels,
    // which would otherwise throw on a missing key.
    private def readLevels(json: JsValue): JsResult[Seq[(String, String)]] = {
      (json \ "levels").validate[Seq[Map[String, String]]].flatMap { levelSeq =>
        val wellFormed = levelSeq.forall(level => level.contains("level") && level.contains("field"))
        if (wellFormed) JsSuccess(parseLevels(levelSeq))
        else JsError("hierarchy levels must each contain \"level\" and \"field\"")
      }
    }

    private def buildField(json: JsValue,
                           name: String,
                           isOptional: Boolean,
                           dataType: DataType.Value): JsResult[Field] = {
      dataType match {
        case DataType.Number => JsSuccess(NumberField(name, isOptional))
        case DataType.Record =>
          //TODO think about Record type later
          JsError(s"field datatype not supported yet: $dataType")
        case DataType.Point => JsSuccess(PointField(name, isOptional))
        case DataType.Bag =>
          readInnerType(json).map(innerType => BagField(name, innerType, isOptional))
        case DataType.Boolean => JsSuccess(BooleanField(name, isOptional))
        case DataType.Hierarchy =>
          for {
            innerType <- readInnerType(json)
            levels <- readLevels(json)
          } yield HierarchyField(name, innerType, levels)
        case DataType.Text => JsSuccess(TextField(name, isOptional))
        case DataType.String => JsSuccess(StringField(name, isOptional))
        case DataType.Time => JsSuccess(TimeField(name, isOptional))
        case unknown => JsError(s"field datatype invalid: $unknown")
      }
    }

    override def writes(field: Field): JsValue = {
      // Entries shared by every field kind that is written as an object.
      val common: Seq[(String, JsValue)] = Seq(
        "name" -> JsString(field.name),
        "isOptional" -> JsBoolean(field.isOptional),
        "datatype" -> JsString(field.dataType.toString))
      field match {
        // NOTE(review): record fields are written as JSON null, which `reads` cannot
        // turn back into a field; revisit together with the Record TODO in `reads`.
        case _: RecordField => JsNull
        case bag: BagField =>
          JsObject(common :+ ("innerType" -> JsString(bag.innerType.toString)))
        case hierarchy: HierarchyField =>
          JsObject(common ++ Seq(
            "innerType" -> JsString(hierarchy.innerType.toString),
            "levels" -> Json.toJson(hierarchy.levels)))
        case _ => JsObject(common)
      }
    }
  }

  /** Reads/writes a Joda `DateTime` as a string in the `IQuery.TimeFormat` pattern. */
  implicit val datetimeFormat: Format[DateTime] = new Format[DateTime] {
    override def reads(json: JsValue): JsResult[DateTime] = {
      json.validate[String].flatMap { text =>
        // parseDateTime throws on text that does not match the pattern.
        Try(IQuery.TimeFormat.parseDateTime(text)).toOption match {
          case Some(dateTime) => JsSuccess(dateTime)
          case None => JsError(s"invalid datetime: $text")
        }
      }
    }

    override def writes(dateTime: DateTime): JsValue = JsString(dateTime.toString(IQuery.TimeFormat))
  }

  implicit val statsFormat: Format[Stats] = (
    (JsPath \ "createTime").format[DateTime] and
      (JsPath \ "lastModifyTime").format[DateTime] and
      (JsPath \ "lastReadTime").format[DateTime] and
      (JsPath \ "cardinality").format[Int]
    ) (Stats.apply, unlift(Stats.unapply))

  implicit val queryFormat: Format[Query] = JSONParser.queryFormat

  implicit val schemaFormat: Format[Schema] = (
    (JsPath \ "typeName").format[String] and
      (JsPath \ "dimension").format[Seq[Field]] and
      (JsPath \ "measurement").format[Seq[Field]] and
      (JsPath \ "primaryKey").format[Seq[String]] and
      (JsPath \ "timeField").format[String]
    ) (Schema.apply, unlift(Schema.unapply))

  // "createQuery" is optional (formatNullable); every other key is required.
  implicit val dataSetInfoFormat: Format[DataSetInfo] = (
    (JsPath \ "name").format[String] and
      (JsPath \ "createQuery").formatNullable[Query] and
      (JsPath \ "schema").format[Schema] and
      (JsPath \ "dataInterval").format[Interval] and
      (JsPath \ "stats").format[Stats]
    ) (DataSetInfo.apply, unlift(DataSetInfo.unapply))
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import edu.uci.ics.cloudberry.zion.model.datastore.{IJSONParser, JsonRequestExce
import edu.uci.ics.cloudberry.zion.model.schema.Relation.Relation
import edu.uci.ics.cloudberry.zion.model.schema._
import play.api.libs.functional.syntax._
import play.api.libs.json.Reads._
import play.api.libs.json._
import play.api.libs.json.{JsObject, _}

class JSONParser extends IJSONParser {

Expand All @@ -22,7 +21,7 @@ class JSONParser extends IJSONParser {
object JSONParser {
// Warning: the order of the implicit values matters. A value's dependencies must be initialized before it.

implicit val seqAnyValue: Reads[Seq[Any]] = new Reads[Seq[Any]] {
implicit val seqAnyValue: Format[Seq[Any]] = new Format[Seq[Any]] {
override def reads(json: JsValue): JsResult[Seq[Any]] = {
json.asOpt[JsArray] match {
case Some(array) =>
Expand Down Expand Up @@ -51,26 +50,46 @@ object JSONParser {
case None => JsSuccess(Seq.empty)
}
}

/**
  * Writes a heterogeneous value list as a JSON array.
  *
  * Supported elements are Boolean, String, Int, Double, Long and the statement types
  * handled below.
  *
  * @throws JsonRequestException for any element of another type
  */
override def writes(seq: Seq[Any]): JsValue = {
  JsArray(seq.map {
    case b: Boolean => JsBoolean(b)
    case s: String => JsString(s)
    case i: Int => JsNumber(i)
    case d: Double => JsNumber(d)
    case l: Long => JsNumber(l)
    case fs: FilterStatement => filterFormat.writes(fs)
    case by: ByStatement => byFormat.writes(by)
    case ags: AggregateStatement => aggFormat.writes(ags)
    case unS: UnnestStatement => unnestFormat.writes(unS)
    // Catch-all: the elements are `Any`, so restricting this case to JsValue would let
    // every other unsupported type escape as a scala.MatchError.
    case other => throw JsonRequestException(s"unknown data type: $other")
  })
}
}

implicit val transformFuncReads: Reads[TransformFunc] = new Reads[TransformFunc] {
/** Placeholder JSON format for TransformFunc: neither direction is implemented yet. */
implicit val transformFuncFormat: Format[TransformFunc] = new Format[TransformFunc] {
// Not implemented: `???` throws scala.NotImplementedError when this is invoked.
override def reads(json: JsValue): JsResult[TransformFunc] = ???

// Not implemented: `???` throws scala.NotImplementedError when this is invoked.
override def writes(transformFunc: TransformFunc): JsValue = ???
}

implicit val relationReads: Reads[Relation] = new Reads[Relation] {
implicit val relationFormat: Format[Relation] = new Format[Relation] {
override def reads(json: JsValue): JsResult[Relation] = {
try {
JsSuccess(Relation.withName(json.as[String]))
} catch {
case e: NoSuchElementException => JsError(s"unknown relation: $json")
}
}
}

// Writes the relation as a JSON string holding its `toString` value.
override def writes(relation: Relation): JsValue = JsString(relation.toString)
}

implicit val groupFuncReads: Reads[GroupFunc] = new Reads[GroupFunc] {
implicit val groupFuncFormat: Format[GroupFunc] = new Format[GroupFunc] {
override def reads(json: JsValue): JsResult[GroupFunc] = (json \ "name").as[String] match {
case GroupFunc.Bin => ???
case GroupFunc.Bin =>
val scale = (json \ "args" \ "scale").as[Int]
JsSuccess(Bin(scale))
case GroupFunc.Level =>
val level = (json \ "args" \ "level").as[String]
JsSuccess(Level(level))
Expand All @@ -89,83 +108,112 @@ object JSONParser {
case GroupFunc.GeoCellThousandth => JsSuccess(GeoCellThousandth)
case unknown: String => JsError(s"group function not found: $unknown")
}

/**
  * Writes a group function as {"name": ...}, plus a single-entry "args" object for the
  * functions that carry a parameter (Bin, Level, Interval).
  */
override def writes(groupFunc: GroupFunc): JsValue = {
  // Builds {"name": funcName, "args": {argKey: argValue}}.
  def withArgs(funcName: String, argKey: String, argValue: JsValue): JsValue =
    JsObject(Seq("name" -> JsString(funcName), "args" -> JsObject(Seq(argKey -> argValue))))

  groupFunc match {
    case bin: Bin => withArgs(bin.name, "scale", JsNumber(bin.scale))
    case level: Level => withArgs(level.name, "level", JsString(level.levelTag))
    case interval: Interval => withArgs(interval.name, "unit", JsString(interval.unit.toString))
    case _: GeoCellScale => JsObject(Seq("name" -> JsString(groupFunc.name)))
  }
}
}

implicit val aggFuncReads: Reads[AggregateFunc] = new Reads[AggregateFunc] {
implicit val aggFuncFormat: Format[AggregateFunc] = new Format[AggregateFunc] {
override def reads(json: JsValue): JsResult[AggregateFunc] = {
(json \ "name").as[String] match {
case AggregateFunc.Count => JsSuccess(Count)
case AggregateFunc.TopK => ???
case AggregateFunc.Sum => ???
case AggregateFunc.Sum => JsSuccess(Sum)
case AggregateFunc.Max => JsSuccess(Max)
case AggregateFunc.Min => JsSuccess(Min)
case AggregateFunc.Avg => ???
case AggregateFunc.Avg => JsSuccess(Avg)
case AggregateFunc.DistinctCount => ???
case unknown: String => JsError(s"unknown aggregation function: $unknown")
}
}

/** Writes an aggregate function as a one-entry object: {"name": <function name>}. */
override def writes(aggregateFunc: AggregateFunc): JsValue =
  Json.obj("name" -> aggregateFunc.name)
}

implicit val aggReads: Reads[AggregateStatement] = {
(JsPath \ "field").read[String] and
(JsPath \ "apply").read[AggregateFunc] and
(JsPath \ "as").read[String]
}.apply(AggregateStatement.apply _)

implicit val byReads: Reads[ByStatement] = {
(JsPath \ "field").read[String] and
(JsPath \ "apply").readNullable[GroupFunc] and
(JsPath \ "as").readNullable[String]
}.apply(ByStatement.apply _)

implicit val groupReads: Reads[GroupStatement] = {
(JsPath \ "by").read[Seq[ByStatement]] and
(JsPath \ "aggregate").read[Seq[AggregateStatement]]
}.apply(GroupStatement.apply _)

implicit val globalReads: Reads[GlobalAggregateStatement] = {
(JsPath \ "globalAggregate").read[AggregateStatement].map(GlobalAggregateStatement.apply)
/**
  * JSON format for AggregateStatement, built from three required keys:
  * "field" (String), "apply" (AggregateFunc) and "as" (String).
  */
implicit val aggFormat: Format[AggregateStatement] = (
(JsPath \ "field").format[String] and
(JsPath \ "apply").format[AggregateFunc] and
(JsPath \ "as").format[String]
) (AggregateStatement.apply, unlift(AggregateStatement.unapply))

/**
  * JSON format for ByStatement: "field" (String) is required, while "apply" (GroupFunc)
  * and "as" (String) are nullable and map to Option values.
  */
implicit val byFormat: Format[ByStatement] = (
(JsPath \ "field").format[String] and
(JsPath \ "apply").formatNullable[GroupFunc] and
(JsPath \ "as").formatNullable[String]
) (ByStatement.apply, unlift(ByStatement.unapply))

/**
  * JSON format for GroupStatement, built from two required arrays:
  * "by" (ByStatement entries) and "aggregate" (AggregateStatement entries).
  */
implicit val groupFormat: Format[GroupStatement] = (
(JsPath \ "by").format[Seq[ByStatement]] and
(JsPath \ "aggregate").format[Seq[AggregateStatement]]
) (GroupStatement.apply, unlift(GroupStatement.unapply))

/**
  * JSON format for GlobalAggregateStatement: a single required key "globalAggregate"
  * holding an AggregateStatement, wrapped/unwrapped through `inmap`.
  */
implicit val globalFormat: Format[GlobalAggregateStatement] = {
(JsPath \ "globalAggregate").format[AggregateStatement].inmap(GlobalAggregateStatement.apply, unlift(GlobalAggregateStatement.unapply))
}
implicit val selectReads: Reads[SelectStatement] = {
(JsPath \ "order").read[Seq[String]] and
(JsPath \ "limit").read[Int] and
(JsPath \ "offset").read[Int] and
(JsPath \ "field").readNullable[Seq[String]].map(_.getOrElse(Seq.empty))
}.apply(SelectStatement.apply _)

implicit val lookupReads: Reads[LookupStatement] = {
(JsPath \ "sourceKey").read[Seq[String]] and
(JsPath \ "dataset").read[String] and
(JsPath \ "lookupKey").read[Seq[String]] and
(JsPath \ "select").read[Seq[String]] and
(JsPath \ "as").read[Seq[String]]
}.apply(LookupStatement.apply _)

implicit val unnestReads: Reads[Seq[UnnestStatement]] = new Reads[Seq[UnnestStatement]] {
override def reads(json: JsValue): JsResult[Seq[UnnestStatement]] = {
/**
  * JSON format for SelectStatement: "order", "limit" and "offset" are required;
  * "field" is nullable.
  */
implicit val selectFormat: Format[SelectStatement] = (
(JsPath \ "order").format[Seq[String]] and
(JsPath \ "limit").format[Int] and
(JsPath \ "offset").format[Int] and
// Nullable "field": None is read as an empty Seq, and an empty Seq is mapped back to
// None when writing.
(JsPath \ "field").formatNullable[Seq[String]].inmap[Seq[String]](
o => o.getOrElse(Seq.empty[String]),
s => if (s.isEmpty) None else Some(s)
)
) (SelectStatement.apply, unlift(SelectStatement.unapply))

/**
  * JSON format for LookupStatement, built from five required keys: "sourceKey",
  * "dataset", "lookupKey", "select" and "as" ("dataset" is a String, the rest are
  * String arrays).
  */
implicit val lookupFormat: Format[LookupStatement] = (
(JsPath \ "sourceKey").format[Seq[String]] and
(JsPath \ "dataset").format[String] and
(JsPath \ "lookupKey").format[Seq[String]] and
(JsPath \ "select").format[Seq[String]] and
(JsPath \ "as").format[Seq[String]]
) (LookupStatement.apply, unlift(LookupStatement.unapply))

implicit val unnestFormat: Format[UnnestStatement] = new Format[UnnestStatement] {
override def reads(json: JsValue): JsResult[UnnestStatement] = {
JsSuccess(json.as[JsObject].value.map {
case (key, jsValue: JsValue) =>
UnnestStatement(key, jsValue.as[String])
}.toSeq)
}.head)
}

/** Writes an unnest statement as a one-entry object: {<fieldName>: <as>}. */
override def writes(unnestStatement: UnnestStatement): JsValue =
  Json.obj(unnestStatement.fieldName -> unnestStatement.as)
}

implicit val filterReads: Reads[FilterStatement] = {
(JsPath \ "field").read[String] and
(JsPath \ "apply").readNullable[TransformFunc] and
(JsPath \ "relation").read[Relation] and
(JsPath \ "values").read[Seq[Any]]
}.apply(FilterStatement.apply _)
/**
  * JSON format for FilterStatement: "field", "relation" and "values" are required;
  * "apply" (TransformFunc) is nullable. "values" is handled by the implicit
  * Format[Seq[Any]] in scope.
  */
implicit val filterFormat: Format[FilterStatement] = (
(JsPath \ "field").format[String] and
(JsPath \ "apply").formatNullable[TransformFunc] and
(JsPath \ "relation").format[Relation] and
(JsPath \ "values").format[Seq[Any]]
) (FilterStatement.apply, unlift(FilterStatement.unapply))

// TODO find better name for 'global'
implicit val queryReads: Reads[Query] = {
(JsPath \ "dataset").read[String] and
(JsPath \ "lookup").readNullable[Seq[LookupStatement]].map(_.getOrElse(Seq.empty)) and
(JsPath \ "filter").readNullable[Seq[FilterStatement]].map(_.getOrElse(Seq.empty)) and
(JsPath \ "unnest").readNullable[Seq[UnnestStatement]].map(_.getOrElse(Seq.empty)) and
(JsPath \ "group").readNullable[GroupStatement] and
(JsPath \ "select").readNullable[SelectStatement] and
(JsPath \ "global").readNullable[GlobalAggregateStatement]
}.apply(Query.apply _)
/**
  * JSON format for Query. Only "dataset" is required.
  *
  * "lookup", "filter" and "unnest" are nullable arrays: None is read as an empty Seq,
  * and an empty Seq is mapped back to None when writing. "group", "select" and
  * "global" are nullable and map to Option values.
  */
implicit val queryFormat: Format[Query] = (
(JsPath \ "dataset").format[String] and
(JsPath \ "lookup").formatNullable[Seq[LookupStatement]].inmap[Seq[LookupStatement]](
o => o.getOrElse(Seq.empty[LookupStatement]),
s => if (s.isEmpty) None else Some(s)
) and
(JsPath \ "filter").formatNullable[Seq[FilterStatement]].inmap[Seq[FilterStatement]](
o => o.getOrElse(Seq.empty[FilterStatement]),
s => if (s.isEmpty) None else Some(s)
) and
(JsPath \ "unnest").formatNullable[Seq[UnnestStatement]].inmap[Seq[UnnestStatement]](
o => o.getOrElse(Seq.empty[UnnestStatement]),
s => if (s.isEmpty) None else Some(s)
) and
(JsPath \ "group").formatNullable[GroupStatement] and
(JsPath \ "select").formatNullable[SelectStatement] and
(JsPath \ "global").formatNullable[GlobalAggregateStatement]
) (Query.apply, unlift(Query.unapply))

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ trait IQuery {
}

object IQuery {
val TimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
val TimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
}

case class Query(dataset: String,
Expand Down
Loading

0 comments on commit 58c310a

Please sign in to comment.