Cosmos spark3 write code path DataSourceV2 skeleton #17532
Conversation
...s/azure-cosmos-spark_3-0_2-12/src/main/scala/com/azure/cosmos/spark/CosmosLoggingTrait.scala (resolved)
...s/azure-cosmos-spark_3-0_2-12/src/main/scala/com/azure/cosmos/spark/CosmosLoggingTrait.scala (outdated, resolved)
```scala
      caseInsensitiveStringMap.asCaseSensitiveMap()).schema()
  }

  override def shortName(): String = "cosmos.write"
```
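(Context for readers of this thread: the `DataSourceRegister` short name is the identifier users pass to Spark's `format(...)`. A hypothetical usage sketch, assuming `df` is an existing DataFrame and `cosmosConfig` is a `Map[String, String]` with the connection settings; note that "cosmos.write" is renamed later in this thread.)

```scala
// Hypothetical usage of the short name registered above.
df.write
  .format("cosmos.write")
  .options(cosmosConfig) // assumed endpoint/key/database/container settings
  .save()
```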
In my mental model I thought about something like:
cosmos.items (which would implement the interfaces for read and write, batch and point)
cosmos.changefeed for changefeed - just read interfaces
I had picked "cosmos.write" to get started. "cosmos.items" for batch read/write looks better to me.
Regarding "cosmos.changefeed": that will be used mainly for the streaming scenario, right? Should we add a streaming suffix?
done
Changefeed would be used for both streaming and batch (mostly streaming I guess, but we also have plenty of customers using batch at regular intervals).
Streaming and batch are just different notions of the read/write capabilities, so I would not add any suffixes there.
But between Items and Changefeed there are significant differences - write on the change feed isn't possible, and even read is very different because, for example, predicate push-down isn't possible for the change feed. (A hypothetical sketch of this split is shown below.)
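A minimal, hypothetical sketch of how that split could look in the Spark 3 DataSourceV2 API - two `DataSourceRegister`/`TableProvider` registrations whose tables advertise different `TableCapability` sets. The class names, placeholder schema, and capability choices here are illustrative assumptions, not the PR's actual code:

```scala
import java.util
import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Plain Table carrying only capability metadata; a real implementation
// would also mix in SupportsRead/SupportsWrite and supply Scan/Write builders.
class CosmosTableSketch(tableName: String,
                        tableSchema: StructType,
                        caps: Set[TableCapability]) extends Table {
  override def name(): String = tableName
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] = caps.asJava
}

// "cosmos.items": batch read and write of items.
class CosmosItemsDataSourceSketch extends DataSourceRegister with TableProvider {
  override def shortName(): String = "cosmos.items"
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType().add("id", StringType) // placeholder schema
  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table =
    new CosmosTableSketch("items", schema,
      Set(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE))
}

// "cosmos.changefeed": read-only, usable from both batch and streaming,
// no write capability advertised.
class CosmosChangeFeedDataSourceSketch extends DataSourceRegister with TableProvider {
  override def shortName(): String = "cosmos.changefeed"
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType().add("id", StringType) // placeholder schema
  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table =
    new CosmosTableSketch("changefeed", schema,
      Set(TableCapability.BATCH_READ, TableCapability.MICRO_BATCH_READ))
}
```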
...mos/azure-cosmos-spark_3-0_2-12/src/main/scala/com/azure/cosmos/spark/CosmosDataSource.scala (outdated, resolved)
```scala
  // TODO moderakh account config and databaseName, containerName need to be passed down from the user
  val client = new CosmosClientBuilder()
    .key(TestConfigurations.MASTER_KEY)
```
I assume long term we would want to have a cache similar to what I added in the 3.* release for today's OLTP connector? If so, I can take a stab at that early next week.
Similar to CosmosDBConnectionCache.scala
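(For readers: a minimal sketch of what such a cache might look like. The object and method names are hypothetical; the real CosmosDBConnectionCache in the existing connector keys on the full client configuration and handles eviction.)

```scala
import java.util.concurrent.ConcurrentHashMap

import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder}

// Hypothetical sketch: share one client per endpoint across tasks instead
// of building a new client for every write. Keyed on endpoint only for
// brevity; a production cache would key on the whole client config.
object CosmosClientCacheSketch {
  private val cache = new ConcurrentHashMap[String, CosmosAsyncClient]()

  def getOrCreate(endpoint: String, key: String): CosmosAsyncClient =
    cache.computeIfAbsent(endpoint, _ =>
      new CosmosClientBuilder()
        .endpoint(endpoint)
        .key(key)
        .buildAsyncClient())
}
```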
```scala
val userProvidedSchema = StructType(Seq(StructField("number", IntegerType), StructField("word", StringType)))

val objectNode = CosmosRowConverter.internalRowToObjectNode(internalRow, userProvidedSchema)
// TODO: moderakh how should we handle absence of id?
```
Looks like the best approach - to generate it in the Spark layer before calling the SDK.
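A minimal sketch of that idea, assuming the converter produces a Jackson ObjectNode (`ensureId` is a hypothetical helper, not code from this PR):

```scala
import java.util.UUID

import com.fasterxml.jackson.databind.node.ObjectNode

// Hypothetical helper: generate the "id" in the Spark layer when the
// incoming row did not provide one, before handing the document to the SDK.
def ensureId(objectNode: ObjectNode): ObjectNode = {
  if (!objectNode.hasNonNull("id")) {
    objectNode.put("id", UUID.randomUUID().toString)
  }
  objectNode
}
```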
```scala
      case _ => objectNode.putNull(fieldName)
    }
  }
```
Where is this implementation coming from - OLAP, built-in Spark connectors like CSVDataSource? I thought Spark also added a capability to transform a DataFrame from and to JSON - my gut feeling is that it would be good to stick with that one.
Oh, please don't review RowConverter yet. This class is evolving ...
Row -> ObjectNode: I didn't find any suitable out-of-the-box Row -> ObjectNode conversion. Based on my reading, a few interesting things which I found:
- There is a "private internal" converter class in the Spark code: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala. As this is a private class, it is not suitable for us.
- The other option is serializing the Row to a JSON string and passing the JSON string to CosmosClient. Not a good option, as it requires parsing the string to extract the partition key (perf overhead).
- The other option is dealing with https://spark.apache.org/docs/3.0.1/api/java/org/apache/spark/sql/Row.html#jsonValue--. It should be possible to convert from org.json4s to Jackson, but this option also doesn't seem to be a good one: Row.jsonValue in the Scala code seems to be a private method, not a public one. See here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L548
- One other thing I would like to read about is Row Encoders and Decoders: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoder.html

The RowConverter code in this PR is a rewritten version of what exists in the OLTP Spark connector today. I rewrote it to work with Jackson, with some fixes, and added some unit tests (a rough sketch of the approach follows below).
Are you referring to some other workaround?
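(A heavily simplified, hypothetical sketch of such a hand-rolled Jackson mapping, for illustration only: it works on `Row` for readability and covers a handful of types, while the PR's `CosmosRowConverter` targets `InternalRow` and supports far more.)

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical sketch, not the PR's converter: walk the schema and copy
// each field of the Row into a Jackson ObjectNode.
object RowToObjectNodeSketch {
  private val mapper = new ObjectMapper()

  def toObjectNode(row: Row, schema: StructType): ObjectNode = {
    val node = mapper.createObjectNode()
    schema.fields.zipWithIndex.foreach { case (field, i) =>
      if (row.isNullAt(i)) {
        node.putNull(field.name)
      } else {
        field.dataType match {
          case StringType  => node.put(field.name, row.getString(i))
          case IntegerType => node.put(field.name, row.getInt(i))
          case DoubleType  => node.put(field.name, row.getDouble(i))
          case BooleanType => node.put(field.name, row.getBoolean(i))
          // Fallback for types this sketch does not handle explicitly.
          case _           => node.put(field.name, row.get(i).toString)
        }
      }
    }
    node
  }
}
```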
I added this to the TODO section. This requires more discussion/investigation. Will be done after this PR.
LGTM - a couple of comments - let's chat offline if they are unclear or we disagree.
This PR:
- CosmosRowConverterSpec
- TestE2EMain (this writes to Cosmos DB)
- TODO (come later)