Cosmos spark3 write code path DataSourceV2 skeleton #17532

Conversation

@moderakh (Contributor) commented Nov 12, 2020

This PR

  • adds the skeleton for the write code path DataSourceV2 (a rough sketch of the interfaces involved follows the TODO list below)
  • adds the skeleton for unit tests, with basic unit tests for DataFrame to ObjectNode conversion; see CosmosRowConverterSpec
  • adds an end-to-end write code path implementation; see TestE2EMain, which writes to Cosmos DB
  • registers the data source as "cosmos.items". Name suggestions?
  • removes module-info.java (Scala doesn't support the Java module system: error: illegal start of type declaration, scala/bug#11423)
val df = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

df.printSchema()

df.write.format("cosmos.write").mode("append").options {
  destCfg
}.save()

TODO (come later)

  • schema inference (schema is hard coded to make TestE2EMain work)
  • passing down user config; for now account endpoints etc. are hard coded to make TestE2EMain work
  • more discussion is required on Row <-> ObjectNode conversion; will come later
  • add more tests for DataFrame to ObjectNode conversion
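
For orientation, here is a rough sketch of how the DataSourceV2 write wiring referenced above typically hangs together in Spark 3. Class names, the "cosmos.write" short name, and the no-op commit/abort bodies are illustrative placeholders, not the actual code in this PR:

import java.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Entry point: registered via DataSourceRegister so df.write.format("cosmos.write") resolves to it.
class CosmosDataSource extends DataSourceRegister with TableProvider {
  override def shortName(): String = "cosmos.write"

  // Schema inference is still a TODO in this PR; an empty schema stands in here.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = new StructType()

  override def getTable(schema: StructType, partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table = new CosmosTable(schema)
}

// Table advertising batch-write capability and handing out a WriteBuilder.
class CosmosTable(userSchema: StructType) extends Table with SupportsWrite {
  override def name(): String = "cosmos.write"
  override def schema(): StructType = userSchema
  override def capabilities(): util.Set[TableCapability] = util.EnumSet.of(TableCapability.BATCH_WRITE)
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = new WriteBuilder {
    override def buildForBatch(): BatchWrite = new CosmosBatchWrite(info.schema())
  }
}

// Driver-side coordinator: creates per-partition writer factories; commit/abort are no-ops here.
class CosmosBatchWrite(schema: StructType) extends BatchWrite {
  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
    new DataWriterFactory {
      override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
        new CosmosDataWriter(schema)
    }
  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
}

// Executor-side writer: this is where each InternalRow would be converted to an ObjectNode
// and upserted into the target container.
class CosmosDataWriter(schema: StructType) extends DataWriter[InternalRow] {
  override def write(row: InternalRow): Unit = { /* convert row and write to Cosmos DB */ }
  override def commit(): WriterCommitMessage = new WriterCommitMessage {}
  override def abort(): Unit = {}
  override def close(): Unit = {}
}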

@ghost ghost added the Cosmos label Nov 12, 2020
@moderakh moderakh changed the title from "Cosmos spark3.9 write code path DataSourceV2 skeleton" to "Cosmos spark3 write code path DataSourceV2 skeleton" Nov 12, 2020
caseInsensitiveStringMap.asCaseSensitiveMap()).schema()
}

override def shortName(): String = "cosmos.write"

Member commented:

In my mental model I thought about something like:

cosmos.items (which would implement the interfaces for read and write (batch and point))

cosmos.changefeed for changefeed - just the read interfaces

@moderakh (Contributor, Author) commented Nov 12, 2020:

I had picked "cosmos.write" to get started. "cosmos.items" for batch read/write looks better to me.

Regarding "cosmos.changefeed": that will be used mainly for the streaming scenario, right? Should we add a "streaming" suffix?

moderakh (Contributor, Author) commented:

done

Member commented:

Changefeed would be used for both streaming and batch (mostly streaming, I guess, but we also have plenty of customers using batch at regular intervals).
Streaming and batch are just different notions of the read/write capabilities, so I would not add any suffixes there.
But between Items and Changefeed there are significant differences: write on the change feed isn't possible, and even read is very different because, for example, predicate push-down isn't possible for the change feed.
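
A hypothetical sketch of how that split could surface as two registered short names whose Table implementations expose different capability sets (class names and capability choices below are illustrative, not part of this PR):

import java.util
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.sources.DataSourceRegister

// "cosmos.items": batch/point read and write.
class CosmosItemsDataSource extends DataSourceRegister {
  override def shortName(): String = "cosmos.items"
  // The capability set its Table implementation would advertise.
  val tableCapabilities: util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE)
}

// "cosmos.changefeed": read only (batch and streaming), no writes, no predicate push-down.
class CosmosChangeFeedDataSource extends DataSourceRegister {
  override def shortName(): String = "cosmos.changefeed"
  val tableCapabilities: util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ, TableCapability.MICRO_BATCH_READ)
}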


// TODO moderakh: account config, databaseName and containerName need to be passed down from the user
val client = new CosmosClientBuilder()
.key(TestConfigurations.MASTER_KEY)

Member commented:

I assume long term we would want a cache similar to what I added in the 3.* release of today's OLTP connector? If so, I can take a stab at that early next week.

Member commented:

Similar to CosmosDBConnectionCache.scala
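
For reference, a minimal sketch of what such a cache might look like, assuming one client per (endpoint, key) pair built with CosmosClientBuilder (the object name and config shape are illustrative, not the actual design):

import java.util.concurrent.ConcurrentHashMap
import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder}

object CosmosClientCache {
  // One cached client per (endpoint, key) pair, created lazily and reused across write tasks.
  private val clients = new ConcurrentHashMap[(String, String), CosmosAsyncClient]()

  def getOrCreate(endpoint: String, key: String): CosmosAsyncClient =
    clients.computeIfAbsent((endpoint, key), _ =>
      new CosmosClientBuilder()
        .endpoint(endpoint)
        .key(key)
        .buildAsyncClient())
}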

val userProvidedSchema = StructType(Seq(StructField("number", IntegerType), StructField("word", StringType)))

val objectNode = CosmosRowConverter.internalRowToObjectNode(internalRow, userProvidedSchema)
// TODO: moderakh how should we handle absence of id?

Member commented:

Looks like the best approach: generate it in the Spark layer before calling the SDK.
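
A minimal sketch of that approach, assuming a hypothetical ensureId helper applied to the Jackson ObjectNode before it is handed to the SDK:

import java.util.UUID
import com.fasterxml.jackson.databind.node.ObjectNode

// If the converted document carries no "id", generate one in the Spark layer (hypothetical helper).
def ensureId(objectNode: ObjectNode): ObjectNode = {
  if (!objectNode.has("id") || objectNode.get("id").isNull) {
    objectNode.put("id", UUID.randomUUID().toString)
  }
  objectNode
}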

case _ => objectNode.putNull(fieldName)
}
}

Member commented:

Where is this implementation coming from: the OLAP connector, or built-in Spark connectors like CSVDataSource? I thought Spark also added the capability to transform a DataFrame from and to JSON; my gut feeling is that it would be good to stick with that one.

moderakh (Contributor, Author) commented:

Oh, please don't review RowConverter yet. This class is evolving ...

@moderakh (Contributor, Author) commented Nov 13, 2020:

For Row -> ObjectNode, I didn't find any suitable out-of-the-box conversion.

Based on my reading, a few interesting things I found:

  1. There is a "private internal" converter class in the Spark code:
    https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
    As this is a private class, it is not suitable for us.

  2. Another option is serializing the Row to a JSON string and passing the JSON string to CosmosClient. Not a good option, as it requires parsing the string to extract the partition key (perf overhead).

  3. Another option is dealing with
    https://spark.apache.org/docs/3.0.1/api/java/org/apache/spark/sql/Row.html#jsonValue--
    It should be possible to convert from org.json4s to Jackson, but this option also doesn't seem to be a good one: Row.jsonValue in the Scala code seems to be a private method, not a public one.
    See here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L548

  4. One other thing I would like to read about is Row Encoders and Decoders:
    https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoder.html

########

The RowConverter code in this PR is a rewritten version of what exists in the OLTP Spark connector today.
I rewrote it to work with Jackson, with some fixes, and added some unit tests.

Are you referring to some other workaround?

@moderakh (Contributor, Author) commented Nov 14, 2020:

I added this to the TODO section. This requires more discussion/investigation and will be done after this PR.
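
For context, a very rough illustration of the kind of schema-driven, Jackson-based Row -> ObjectNode mapping discussed above; it only covers a few primitive types, whereas the real CosmosRowConverter handles many more:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

object RowToObjectNodeSketch {
  private val mapper = new ObjectMapper()

  // Walk the schema field by field and copy each value into a Jackson ObjectNode.
  def rowToObjectNode(row: Row, schema: StructType): ObjectNode = {
    val objectNode = mapper.createObjectNode()
    schema.fields.zipWithIndex.foreach { case (field, i) =>
      if (row.isNullAt(i)) {
        objectNode.putNull(field.name)
      } else {
        field.dataType match {
          case IntegerType => objectNode.put(field.name, row.getInt(i))
          case LongType    => objectNode.put(field.name, row.getLong(i))
          case DoubleType  => objectNode.put(field.name, row.getDouble(i))
          case BooleanType => objectNode.put(field.name, row.getBoolean(i))
          case StringType  => objectNode.put(field.name, row.getString(i))
          case _           => objectNode.putNull(field.name) // unsupported types elided in this sketch
        }
      }
    }
    objectNode
  }
}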

@FabianMeiswinkel (Member) left a review comment:

LGTM. A couple of comments; let's chat offline if they are unclear or we disagree.

@moderakh moderakh merged commit 744aa1c into Azure:feature/cosmos/spark30 Nov 14, 2020
@moderakh moderakh added the cosmos:spark3 Cosmos DB Spark3 OLTP Connector label Dec 8, 2020
@moderakh moderakh linked an issue Dec 18, 2020 that may be closed by this pull request
@moderakh moderakh deleted the users/moderakh/spark-write-code-path-datasourcev2 branch February 8, 2021 23:36
Labels: cosmos:spark3 (Cosmos DB Spark3 OLTP Connector), Cosmos
Linked issues that may be closed by this pull request: Write code path DataSourceV2 skeleton
2 participants