[SC-5606] Inline spark-avro sources into databricks/spark

This patch ports `spark-avro` as of databricks/spark-avro@b01a034 and updates it to run with Spark 2.1.0 by including the fixes from databricks/spark-avro#206. Because Avro is already available through the transitive dependencies of `spark-core` and `spark-sql`, no new Avro dependencies need to be added to the POMs. The license headers have been updated to the header we use for Spark-edge features.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#170 from JoshRosen/add-spark-avro.
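For context, a minimal usage sketch of the ported data source (not part of this diff; the output path and column names are illustrative):

import org.apache.spark.sql.SparkSession

object AvroRoundTripDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-demo").getOrCreate()
    val df = spark.range(10).selectExpr("id", "cast(id as string) as name")

    // Write with the inlined data source, then read the files back.
    df.write.format("com.databricks.spark.avro").save("/tmp/avro-demo")
    spark.read.format("com.databricks.spark.avro").load("/tmp/avro-demo").show()
  }
}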
Showing 33 changed files with 2,236 additions and 7 deletions.
74 changes: 74 additions & 0 deletions
external/avro/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.1.0</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.11</artifactId>
  <properties>
    <sbt.project.name>avro</sbt.project.name>
  </properties>
  <packaging>jar</packaging>
  <name>Spark Avro</name>
  <url>http://spark.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>
</project>
140 changes: 140 additions & 0 deletions
external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala
/*
 * Copyright (C) 2016 Databricks, Inc.
 *
 * Portions of this software incorporate or are derived from software contained within Apache Spark,
 * and this modified software differs from the Apache Spark software provided under the Apache
 * License, Version 2.0, a copy of which you may obtain at
 * http://www.apache.org/licenses/LICENSE-2.0
 */

package com.databricks.spark.avro

import java.io.{IOException, OutputStream}
import java.nio.ByteBuffer
import java.sql.Timestamp
import java.util.HashMap

import scala.collection.immutable.Map

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[avro] class AvroOutputWriter(
    path: String,
    context: TaskAttemptContext,
    schema: StructType,
    recordName: String,
    recordNamespace: String) extends OutputWriter {

  private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace)

  /**
   * Overrides the couple of methods responsible for generating the output streams / files so
   * that the data can be correctly partitioned.
   */
  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
    new AvroKeyOutputFormat[GenericRecord]() {

      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        new Path(path)
      }

      @throws(classOf[IOException])
      override def getAvroFileOutputStream(c: TaskAttemptContext): OutputStream = {
        val path = getDefaultWorkFile(context, ".avro")
        path.getFileSystem(context.getConfiguration).create(path)
      }

    }.getRecordWriter(context)

  override def write(row: Row): Unit = {
    val key = new AvroKey(converter(row).asInstanceOf[GenericRecord])
    recordWriter.write(key, NullWritable.get())
  }

  override def close(): Unit = recordWriter.close(context)

  /**
   * Constructs a converter function for a given Spark SQL data type. This is used when
   * writing Avro records out to disk.
   */
  private def createConverterToAvro(
      dataType: DataType,
      structName: String,
      recordNamespace: String): (Any) => Any = {
    dataType match {
      case BinaryType => (item: Any) => item match {
        case null => null
        case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
      }
      case ByteType | ShortType | IntegerType | LongType |
           FloatType | DoubleType | StringType | BooleanType => identity
      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
      case TimestampType => (item: Any) =>
        if (item == null) null else item.asInstanceOf[Timestamp].getTime
      case ArrayType(elementType, _) =>
        val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val sourceArray = item.asInstanceOf[Seq[Any]]
            val sourceArraySize = sourceArray.size
            val targetArray = new Array[Any](sourceArraySize)
            var idx = 0
            while (idx < sourceArraySize) {
              targetArray(idx) = elementConverter(sourceArray(idx))
              idx += 1
            }
            targetArray
          }
        }
      case MapType(StringType, valueType, _) =>
        val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val javaMap = new HashMap[String, Any]()
            item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
              javaMap.put(key, valueConverter(value))
            }
            javaMap
          }
        }
      case structType: StructType =>
        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
        val schema: Schema = SchemaConverters.convertStructToAvro(
          structType, builder, recordNamespace)
        val fieldConverters = structType.fields.map(field =>
          createConverterToAvro(field.dataType, field.name, recordNamespace))
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val record = new Record(schema)
            val convertersIterator = fieldConverters.iterator
            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
            val rowIterator = item.asInstanceOf[Row].toSeq.iterator

            while (convertersIterator.hasNext) {
              val converter = convertersIterator.next()
              record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
            }
            record
          }
        }
    }
  }
}
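To make the converter pattern above concrete, here is a self-contained sketch (a simplified analogue written for illustration, not code from this commit) of building one closure per DataType up front and then applying it per value, mirroring how createConverterToAvro maps TimestampType to epoch milliseconds and DecimalType to a string:

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

object ConverterPatternDemo {
  // Simplified analogue of createConverterToAvro for a few leaf types:
  // resolve the match on DataType once, then reuse the returned closure per value.
  def converterFor(dt: DataType): Any => Any = dt match {
    case TimestampType =>
      (v: Any) => if (v == null) null else v.asInstanceOf[Timestamp].getTime
    case _: DecimalType =>
      (v: Any) => if (v == null) null else v.toString
    case _ => identity
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("ts", TimestampType),
      StructField("price", DecimalType(10, 2))))
    val converters = schema.fields.map(f => converterFor(f.dataType))

    val row = Row(new Timestamp(0L), BigDecimal("1.50"))
    val converted = row.toSeq.zip(converters).map { case (v, c) => c(v) }
    println(converted) // List(0, 1.50): timestamp -> long millis, decimal -> string
  }
}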
30 changes: 30 additions & 0 deletions
external/avro/src/main/scala/com/databricks/spark/avro/AvroOutputWriterFactory.scala
/*
 * Copyright (C) 2016 Databricks, Inc.
 *
 * Portions of this software incorporate or are derived from software contained within Apache Spark,
 * and this modified software differs from the Apache Spark software provided under the Apache
 * License, Version 2.0, a copy of which you may obtain at
 * http://www.apache.org/licenses/LICENSE-2.0
 */

package com.databricks.spark.avro

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

private[avro] class AvroOutputWriterFactory(
    schema: StructType,
    recordName: String,
    recordNamespace: String) extends OutputWriterFactory {

  override def getFileExtension(context: TaskAttemptContext): String = ".avro"

  def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter = {
    new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
  }
}
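For orientation, a hedged sketch of where this factory plugs in: a FileFormat implementation returns it from prepareWrite, and Spark then serializes the factory to executors and calls newInstance once per write task. The class below is hypothetical (the commit's real DefaultSource is among the 30 files not shown here) and assumes the Spark 2.1 FileFormat signatures and spark-avro's usual record-name defaults:

package com.databricks.spark.avro

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical write-only format; the real spark-avro DefaultSource also implements reads.
class AvroWriteOnlyFormat extends FileFormat {
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = None  // write-only: no schema inference

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    // "topLevelRecord" / "" mirror spark-avro's default record name and namespace (an assumption).
    new AvroOutputWriterFactory(dataSchema, "topLevelRecord", "")
  }
}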