Read Projections #6

Merged: 6 commits, Dec 19, 2023
5 changes: 5 additions & 0 deletions .gitignore
@@ -5,3 +5,8 @@
target/
*/target/
private/

.bloop/
.metals/
.vscode/
metals.sbt
1 change: 1 addition & 0 deletions .sbtopts
@@ -0,0 +1 @@
-J-Xss4M
18 changes: 11 additions & 7 deletions README.md
@@ -26,7 +26,7 @@ To be able to write/read data to/from parquet files you need to define the follo
You can get the Java SDK's `Type` by using a `SchemaEncoder` generated by the `SchemaEncoderDeriver.default` ZIO Schema deriver:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -57,7 +57,7 @@ Alternatively, you can override the schemas of some fields in your record by def
and using the `SchemaEncoderDeriver.summoned` deriver.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema._
@@ -96,7 +96,7 @@ For converting Scala values into `Value` and back we need to define instances of
type classes. This can be done by using the `ValueDecoderDeriver.default` ZIO Schema deriver.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -129,7 +129,7 @@ Same as for `SchemaEncoder`, you can override the schemas of some fields in your
`ValueEncoder`/`ValueDecoder` and using `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema._
@@ -177,10 +177,10 @@ println(record)
## Reading/Writing files

Finally, to perform some IO operations we need to initialize `ParquetWriter` and `ParquetReader` and use either
`writeChunk`/`readChunk` or `writeStream`/`readStream` methods
`writeChunk`/`readChunk` or `writeStream`/`readStream` methods.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.2
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
@@ -227,4 +227,8 @@ Unsafe.unsafe { implicit unsafe =>
}
// Outputs:
// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
```
```

In the previous code snippet we used `ParquetReader.configured[A]()` to initialize a reader that takes its Parquet schema from the file being read. Such a reader will always read all the columns present in the file.

If you need to read only a subset of the columns, use `ParquetReader.projected[A]()`, which always uses the schema of the provided type.
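
For example, a projected read might look like the following rough sketch (not part of this PR's diff); `MyProjection` and `readProjected` are illustrative names, and the file is assumed to have been written earlier with the full record schema:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4

import zio._
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, Path }

// Hypothetical projection: a subset of the columns of the full record written above.
case class MyProjection(a: Int, b: String)

object MyProjection {
  implicit val schema: Schema[MyProjection] =
    DeriveSchema.gen[MyProjection]
  implicit val schemaEncoder: SchemaEncoder[MyProjection] =
    Derive.derive[SchemaEncoder, MyProjection](SchemaEncoderDeriver.default)
  implicit val valueDecoder: ValueDecoder[MyProjection] =
    Derive.derive[ValueDecoder, MyProjection](ValueDecoderDeriver.default)
}

// Reads only MyProjection's columns, even if the file contains more.
def readProjected(path: Path) =
  for {
    reader <- ZIO.service[ParquetReader[MyProjection]]
    result <- reader.readChunk(path)
  } yield result

// Wire in the projected reader layer when running, e.g.:
//   readProjected(path).provide(ParquetReader.projected[MyProjection]())
```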
@@ -103,6 +103,14 @@ object Schemas {
def repetition(optional: Boolean): Repetition =
if (optional) Repetition.OPTIONAL else Repetition.REQUIRED

// Re-wraps a group schema as a top-level Parquet MessageType with the same name and fields.
def asMessageType(schema: Type): MessageType = {
val groupSchema = schema.asGroupType()
val name = groupSchema.getName
val fields = groupSchema.getFields

new MessageType(name, fields)
}

import PrimitiveTypeName._
import LogicalTypeAnnotation._

@@ -4,11 +4,17 @@ import org.apache.parquet.schema.Type
import zio._
import zio.schema._

trait SchemaEncoder[A] {
trait SchemaEncoder[A] { self =>

def encode(schema: Schema[A], name: String, optional: Boolean): Type

def encodeZIO(schema: Schema[A], name: String, optional: Boolean): Task[Type] =
ZIO.attempt(encode(schema, name, optional))

// Derives a SchemaEncoder[B] from this encoder, given a conversion from Schema[B] to Schema[A].
def contramap[B](f: Schema[B] => Schema[A]): SchemaEncoder[B] =
new SchemaEncoder[B] {
override def encode(schema: Schema[B], name: String, optional: Boolean): Type =
self.encode(f(schema), name, optional)
}

}
@@ -1,16 +1,16 @@
package me.mnedokushev.zio.apache.parquet.core.hadoop

import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
import me.mnedokushev.zio.apache.parquet.core.codec.ValueDecoder
import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueDecoder }
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{ ReadSupport => HadoopReadSupport }
import org.apache.parquet.hadoop.{ ParquetReader => HadoopParquetReader }
import org.apache.parquet.io.InputFile
import zio._
import zio.schema.Schema
import zio.stream._

import java.io.IOException
import scala.annotation.nowarn

trait ParquetReader[+A <: Product] {

@@ -20,7 +20,7 @@ trait ParquetReader[+A <: Product] {

}

final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit decoder: ValueDecoder[A])
final class ParquetReaderLive[A <: Product: Tag](
hadoopConf: Configuration,
schema: Option[Schema[A]] = None,
schemaEncoder: Option[SchemaEncoder[A]] = None
)(implicit decoder: ValueDecoder[A])
extends ParquetReader[A] {

override def readStream(path: Path): ZStream[Scope, Throwable, A] =
@@ -64,7 +68,7 @@ final class ParquetReaderLive[A <: Product](hadoopConf: Configuration)(implicit
inputFile <- path.toInputFileZIO(hadoopConf)
reader <- ZIO.fromAutoCloseable(
ZIO.attemptBlockingIO(
new ParquetReader.Builder(inputFile).withConf(hadoopConf).build()
new ParquetReader.Builder(inputFile, schema, schemaEncoder).withConf(hadoopConf).build()
)
)
} yield reader
@@ -73,16 +77,25 @@

object ParquetReader {

final class Builder(file: InputFile) extends HadoopParquetReader.Builder[RecordValue](file) {
final class Builder[A: Tag](
file: InputFile,
schema: Option[Schema[A]] = None,
schemaEncoder: Option[SchemaEncoder[A]] = None
) extends HadoopParquetReader.Builder[RecordValue](file) {

override protected def getReadSupport: HadoopReadSupport[RecordValue] =
new ReadSupport
new ReadSupport(schema, schemaEncoder)

}

def configured[A <: Product: ValueDecoder](
hadoopConf: Configuration = new Configuration()
)(implicit @nowarn tag: Tag[A]): ULayer[ParquetReader[A]] =
)(implicit tag: Tag[A]): ULayer[ParquetReader[A]] =
ZLayer.succeed(new ParquetReaderLive[A](hadoopConf))

// Unlike `configured`, the reader uses the schema derived from A, so only A's columns are read from the file.
def projected[A <: Product: ValueDecoder](
hadoopConf: Configuration = new Configuration()
)(implicit schema: Schema[A], schemaEncoder: SchemaEncoder[A], tag: Tag[A]): ULayer[ParquetReader[A]] =
ZLayer.succeed(new ParquetReaderLive[A](hadoopConf, Some(schema), Some(schemaEncoder)))

}
@@ -1,13 +1,14 @@
package me.mnedokushev.zio.apache.parquet.core.hadoop

import me.mnedokushev.zio.apache.parquet.core.Schemas
import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueEncoder }
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{ WriteSupport => HadoopWriteSupport }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter => HadoopParquetWriter }
import org.apache.parquet.io.OutputFile
import org.apache.parquet.schema.{ MessageType, Type }
import org.apache.parquet.schema.MessageType
import zio._
import zio.schema.Schema
import zio.stream._
@@ -55,20 +56,10 @@ final class ParquetWriterLive[A <: Product](
_ <- ZIO.attemptBlockingIO(writer.write(record.asInstanceOf[RecordValue]))
} yield ()

private def build(path: Path): RIO[Scope, HadoopParquetWriter[RecordValue]] = {

def castToMessageSchema(schema: Type) =
ZIO.attempt {
val groupSchema = schema.asGroupType()
val name = groupSchema.getName
val fields = groupSchema.getFields

new MessageType(name, fields)
}

private def build(path: Path): RIO[Scope, HadoopParquetWriter[RecordValue]] =
for {
schema <- schemaEncoder.encodeZIO(schema, tag.tag.shortName, optional = false)
messageSchema <- castToMessageSchema(schema)
messageSchema <- ZIO.attempt(Schemas.asMessageType(schema))
outputFile <- path.toOutputFileZIO(hadoopConf)
builder = new ParquetWriter.Builder(outputFile, messageSchema)
.withWriteMode(writeMode)
@@ -82,7 +73,6 @@
.withConf(hadoopConf)
writer <- ZIO.fromAutoCloseable(ZIO.attemptBlockingIO(builder.build()))
} yield writer
}

}

@@ -1,24 +1,31 @@
package me.mnedokushev.zio.apache.parquet.core.hadoop

import me.mnedokushev.zio.apache.parquet.core.Schemas
import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
import me.mnedokushev.zio.apache.parquet.core.codec.SchemaEncoder
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.{ InitContext, ReadSupport => HadoopReadSupport }
import org.apache.parquet.io.api.{ GroupConverter, RecordMaterializer }
import org.apache.parquet.schema.MessageType
import zio.Tag
import zio.prelude._
import zio.schema.Schema

import java.util

class ReadSupport extends HadoopReadSupport[RecordValue] {
class ReadSupport[A](
schema: Option[Schema[A]] = None,
schemaEncoder: Option[SchemaEncoder[A]] = None
)(implicit tag: Tag[A])
extends HadoopReadSupport[RecordValue] {

override def prepareForRead(
configuration: Configuration,
keyValueMetaData: util.Map[String, String],
keyValueMetaData: java.util.Map[String, String],
fileSchema: MessageType,
readContext: HadoopReadSupport.ReadContext
): RecordMaterializer[RecordValue] = new RecordMaterializer[RecordValue] {

private val converter =
GroupValueConverter.root(fileSchema)
GroupValueConverter.root(resolveSchema(fileSchema))

override def getCurrentRecord: RecordValue =
converter.get
@@ -29,6 +36,11 @@ class ReadSupport extends HadoopReadSupport[RecordValue] {
}

override def init(context: InitContext): HadoopReadSupport.ReadContext =
new HadoopReadSupport.ReadContext(context.getFileSchema)
new HadoopReadSupport.ReadContext(resolveSchema(context.getFileSchema))

// Use the caller-provided (projected) schema when both a Schema and a SchemaEncoder were supplied;
// otherwise fall back to the schema coming from the file/context.
private def resolveSchema(contextSchema: MessageType): MessageType =
(schema <*> schemaEncoder).fold(contextSchema) { case (schema0, schemaEncoder0) =>
Schemas.asMessageType(schemaEncoder0.encode(schema0, tag.tag.shortName, optional = false))
}

}
@@ -28,6 +28,18 @@ object ParquetIOSpec extends ZIOSpecDefault {
Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.summoned)
}

case class ProjectedRecord(a: Int, c: Option[Long], d: List[Int], e: Map[String, Int])
object ProjectedRecord {
implicit val schema: Schema[ProjectedRecord] =
DeriveSchema.gen[ProjectedRecord]
implicit val schemaEncoder: SchemaEncoder[ProjectedRecord] =
Derive.derive[SchemaEncoder, ProjectedRecord](SchemaEncoderDeriver.summoned)
implicit val valueEncoder: ValueEncoder[ProjectedRecord] =
Derive.derive[ValueEncoder, ProjectedRecord](ValueEncoderDeriver.summoned)
implicit val valueDecoder: ValueDecoder[ProjectedRecord] =
Derive.derive[ValueDecoder, ProjectedRecord](ValueDecoderDeriver.summoned)
}

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("ParquetIOSpec")(
test("write and read - chunk") {
@@ -55,10 +67,27 @@
_ <- writer.writeStream(tmpPath, ZStream.fromChunk(payload))
resultStream <- ZIO.scoped[Any](reader.readStream(tmpPath).runCollect)
} yield assertTrue(resultStream == payload)
} @@ after(cleanTmpFile(tmpDir)),
test("write full and read projected") {
val payload = Chunk(
Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)),
Record(2, "bar", Some(3L), List.empty, Map("third" -> 3))
)
val projectedPayload = payload.map { r =>
ProjectedRecord(r.a, r.c, r.d, r.e)
}

for {
writer <- ZIO.service[ParquetWriter[Record]]
reader <- ZIO.service[ParquetReader[ProjectedRecord]]
_ <- writer.writeChunk(tmpPath, payload)
result <- reader.readChunk(tmpPath)
} yield assertTrue(result == projectedPayload)
} @@ after(cleanTmpFile(tmpDir))
).provide(
ParquetWriter.configured[Record](),
ParquetReader.configured[Record]()
ParquetReader.configured[Record](),
ParquetReader.projected[ProjectedRecord]()
) @@ sequential

private def cleanTmpFile(path: Path) =