Skip to content

Commit

Permalink
GEOMESA-976 Converter Ingest Job and InputFormat
Browse files Browse the repository at this point in the history
* Supports all converter types for local & mapreduce modes (including binary converters like avro)
* GEOMESA-1033 - Fix Guava version conflict with tools
* GEOMESA-1035 - Define SFT and converter config on command line as files or strings
* GEOMESA-968 - MapReduce counters
* Validated that converted SimpleFeatures have date and time
* Added process(is: InputStream) method to converters (also needed for Nifi)
* Support for multi-line and single-line json/xml documents
* GZ compression for HDFS files
* GZ, XZ, BZ compression support for local files
* Multi-threaded local ingest
* Changed counters to Longs

Signed-off-by: Andrew Hulbert <andrew.hulbert@ccri.com>
  • Loading branch information
jahhulbert-ccri authored and elahrvivaz committed Jan 14, 2016
1 parent 8dbd206 commit 1f34513
Show file tree
Hide file tree
Showing 48 changed files with 1,276 additions and 262 deletions.
8 changes: 4 additions & 4 deletions geomesa-convert/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -669,10 +669,10 @@ Let's say we want to convert our avro array of kvpairs into a simple feature. We
We can define a converter config to parse the avro:

converter = {
type = "avro"
schema = "/tmp/schema.avsc"
sft = "testsft"
id-field = "uuid()"
type = "avro"
schema-file = "/tmp/schema.avsc"
sft = "testsft"
id-field = "uuid()"
fields = [
{ name = "tobj", transform = "avroPath($1, '/content$type=DataObj')" },
{ name = "lat", transform = "avroPath($tobj, '/kvmap[$k=lat]/v')" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.locationtech.geomesa.convert.avro

import java.io.InputStream

import com.typesafe.config.Config
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
import org.locationtech.geomesa.convert.Transformers.Expr
import org.locationtech.geomesa.convert.{Field, SimpleFeatureConverter, SimpleFeatureConverterFactory, ToSimpleFeatureConverter}
import org.opengis.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.Transformers.{EvaluationContext, Expr}
import org.locationtech.geomesa.convert._
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

import scala.collection.JavaConversions._

Expand All @@ -23,13 +25,18 @@ class AvroSimpleFeatureConverterFactory extends SimpleFeatureConverterFactory[Ar
override def canProcess(conf: Config): Boolean = canProcessType(conf, "avro")

override def buildConverter(targetSFT: SimpleFeatureType, conf: Config): SimpleFeatureConverter[Array[Byte]] = {
val avroSchemaPath = conf.getString("schema")
val avroSchema = new org.apache.avro.Schema.Parser().parse(getClass.getResourceAsStream(avroSchemaPath))
val avroSchema =
if (conf.hasPath("schema-file")) {
new org.apache.avro.Schema.Parser().parse(getClass.getResourceAsStream(conf.getString("schema-file")))
} else {
new org.apache.avro.Schema.Parser().parse(conf.getString("schema"))
}

val reader = new GenericDatumReader[GenericRecord](avroSchema)
val fields = buildFields(conf.getConfigList("fields"))
val idBuilder = buildIdBuilder(conf.getString("id-field"))

new AvroSimpleFeatureConverter(avroSchema, reader, targetSFT, fields, idBuilder)
new AvroSimpleFeatureConverter(avroSchema, reader, targetSFT, fields, idBuilder, isValidating(conf))
}

}
Expand All @@ -38,7 +45,8 @@ class AvroSimpleFeatureConverter(avroSchema: Schema,
reader: GenericDatumReader[GenericRecord],
val targetSFT: SimpleFeatureType,
val inputFields: IndexedSeq[Field],
val idBuilder: Expr)
val idBuilder: Expr,
val validating: Boolean)
extends ToSimpleFeatureConverter[Array[Byte]] {

var decoder: BinaryDecoder = null
Expand All @@ -49,4 +57,52 @@ class AvroSimpleFeatureConverter(avroSchema: Schema,
Seq(Array(bytes, reader.read(recordReuse, decoder)))
}

override def process(is: InputStream, ec: EvaluationContext = createEvaluationContext()): Iterator[SimpleFeature] = {
decoder = DecoderFactory.get.binaryDecoder(is, null)

class FeatureItr extends Iterator[SimpleFeature] {
private var cur: SimpleFeature = null

override def hasNext: Boolean = {
if (cur == null) {
do { fetchNext() } while (cur == null && !decoder.isEnd)
cur != null
} else {
true
}
}

override def next(): SimpleFeature = {
hasNext
if (cur != null) {
val ret = cur
cur = null
ret
} else throw new NoSuchElementException
}

def fetchNext() = {
if (!decoder.isEnd) {
ec.counter.incLineCount()
val rec = reader.read(null, decoder)
try {
cur = convert(Array[Any](null, rec), ec)
if (cur != null) {
ec.counter.incSuccess()
} else {
ec.counter.incFailure()
}
}
catch {
case e: Exception =>
logger.warn(s"Failed to parse avro record '${rec.toString}'", e)
ec.counter.incFailure()
}
}
}
}

new FeatureItr
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
type-name = "testsft"
attributes = [
{ name = "dtg", type = "Date", index = "true", default = true }
{ name = "geom", type = "Point", index = "true", srid = 4326, default = true }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class AvroPathTest extends Specification with AvroUtils {
val result = avroPath.eval(gr1)
result.isDefined mustEqual true
val arr = result.get.asInstanceOf[GenericArray[GenericRecord]]
arr.length mustEqual 4
arr.length mustEqual 5
}

"filter arrays of records by a field predicate" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.locationtech.geomesa.convert.avro

import java.io.ByteArrayInputStream

import com.typesafe.config.ConfigFactory
import org.junit.runner.RunWith
import org.locationtech.geomesa.convert.SimpleFeatureConverters
Expand All @@ -18,17 +20,20 @@ import org.specs2.runner.JUnitRunner
@RunWith(classOf[JUnitRunner])
class AvroSimpleFeatureConverterTest extends Specification with AvroUtils {

sequential

"Avro2SimpleFeature should" should {

val conf = ConfigFactory.parseString(
"""
| converter = {
| type = "avro"
| schema = "/schema.avsc"
| sft = "testsft"
| id-field = "uuid()"
| type = "avro"
| schema-file = "/schema.avsc"
| sft = "testsft"
| id-field = "uuid()"
| fields = [
| { name = "tobj", transform = "avroPath($1, '/content$type=TObj')" },
| { name = "dtg", transform = "date('YYYY-MM-dd', avroPath($tobj, '/kvmap[$k=dtg]/v'))" },
| { name = "lat", transform = "avroPath($tobj, '/kvmap[$k=lat]/v')" },
| { name = "lon", transform = "avroPath($tobj, '/kvmap[$k=lon]/v')" },
| { name = "geom", transform = "point($lon, $lat)" }
Expand All @@ -39,8 +44,27 @@ class AvroSimpleFeatureConverterTest extends Specification with AvroUtils {
"properly convert a GenericRecord to a SimpleFeature" >> {
val sft = SimpleFeatureTypes.createType(ConfigFactory.load("sft_testsft.conf"))
val converter = SimpleFeatureConverters.build[Array[Byte]](sft, conf)
val sf = converter.processInput(Iterator.apply[Array[Byte]](bytes)).next()
sf.getAttributeCount must be equalTo 1
val ec = converter.createEvaluationContext()
val sf = converter.processInput(Iterator.apply[Array[Byte]](bytes), ec).next()
sf.getAttributeCount must be equalTo 2
sf.getAttribute("dtg") must not beNull

ec.counter.getFailure mustEqual 0L
ec.counter.getSuccess mustEqual 1L
ec.counter.getLineCount mustEqual 1L // only 1 record passed in itr
}

"properly convert an input stream" >> {
val sft = SimpleFeatureTypes.createType(ConfigFactory.load("sft_testsft.conf"))
val converter = SimpleFeatureConverters.build[Array[Byte]](sft, conf)
val ec = converter.createEvaluationContext()
val sf = converter.process(new ByteArrayInputStream(bytes), ec).next()
sf.getAttributeCount must be equalTo 2
sf.getAttribute("dtg") must not beNull

ec.counter.getFailure mustEqual 0L
ec.counter.getSuccess mustEqual 1L
ec.counter.getLineCount mustEqual 1L // zero indexed so this is 2 records
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ trait AvroUtils {
val rec2 = innerBuilder.set("k", "lon").set("v", 45.0).build
val rec3 = innerBuilder.set("k", "prop3").set("v", " foo ").build
val rec4 = innerBuilder.set("k", "prop4").set("v", 1.0).build
val rec5 = innerBuilder.set("k", "dtg").set("v", "2015-01-02").build

val outerBuilder = new GenericRecordBuilder(tObjSchema)
val tObj = outerBuilder.set("kvmap", List(rec1, rec2, rec3, rec4).asJava).build()
val tObj = outerBuilder.set("kvmap", List(rec1, rec2, rec3, rec4, rec5).asJava).build()

val compositeBuilder = new GenericRecordBuilder(schema)
val obj = compositeBuilder.set("content", tObj).build()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/***********************************************************************
* Copyright (c) 2013-2015 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0 which
* accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
*************************************************************************/
package org.locationtech.geomesa.convert.avro

import java.io.{FileOutputStream, File}

import org.apache.avro.file.DataFileWriter

// helper for integration tests and such
object GenerateAvro extends AvroUtils {

def main(args: Array[String]): Unit = {
val f = new File("/tmp/no-header.avro")
f.createNewFile()
val fos = new FileOutputStream(f)
fos.write(bytes)
fos.write(bytes2)
fos.close()

val dfw = new DataFileWriter(writer)
dfw.create(schema, new File("/tmp/with-header.avro"))
dfw.append(obj)
dfw.append(obj2)
dfw.close()
}

}
4 changes: 4 additions & 0 deletions geomesa-convert/geomesa-convert-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-feature-all</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.locationtech.geomesa.convert

import java.io.InputStream

import com.typesafe.config.Config
import org.locationtech.geomesa.convert.Transformers.{Counter, EvaluationContext, Predicate}
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}
Expand Down Expand Up @@ -88,7 +90,9 @@ class CompositeConverter[I](val targetSFT: SimpleFeatureType, converters: Seq[(P
}
}

override def processSingleInput(i: I, ec: EvaluationContext): Seq[SimpleFeature] = ???

override def process(is: InputStream, ec: EvaluationContext): Iterator[SimpleFeature] = ???
}

case class CompositeEvaluationContext(contexts: IndexedSeq[EvaluationContext]) extends EvaluationContext {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/***********************************************************************
* Copyright (c) 2013-2015 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0 which
* accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
*************************************************************************/
package org.locationtech.geomesa.convert

import com.typesafe.config.Config

object LineMode extends Enumeration {
type LineMode = Value

val Single = Value("single")
val Multi = Value("multi")

val Default = Single

def getLineMode(conf: Config): LineMode = {
if (conf.hasPath(StandardOptions.LineMode)) {
val m = conf.getString(StandardOptions.LineMode).toLowerCase
LineMode.withName(m)
} else {
LineMode.Default
}
}

}
Loading

0 comments on commit 1f34513

Please sign in to comment.