diff --git a/.gitignore b/.gitignore index 73c9e05b4eec1..4ab6afa302610 100644 --- a/.gitignore +++ b/.gitignore @@ -136,4 +136,3 @@ playground/frontend/playground_components/pubspec.lock **/*.tfvars # Ignore Katas auto-generated files -**/*-remote-info.yaml \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index e856ef18dab38..fbf0f6c6a31f8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -98,6 +98,7 @@ * S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)). * Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)). * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)). +* Support for read from Cosmos DB Core SQL API [#23610](https://github.com/apache/beam/pull/23610) ## New Features / Improvements diff --git a/sdks/java/io/azure-cosmosdb/README.md b/sdks/java/io/azure-cosmosdb/README.md new file mode 100644 index 0000000000000..f7c61c73ab74a --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/README.md @@ -0,0 +1,40 @@ +# Cosmos DB Core SQL API + +Compile all module azure-cosmosdb + +```shell +gradle sdks:java:io:azure-cosmosdb:build +``` + +## Test + +Run TEST for this module (Cosmos DB Core SQL API): + +```shell +gradle sdks:java:io:azure-cosmosdb:test +``` + + +## Publish in Maven Local + +Publish this module + +```shell +# apache beam core +gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/ publishToMavenLocal + +# apache beam azure-cosmosdb +gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/ publishToMavenLocal +``` + +Publish all modules of apache beam + +```shell +gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/ publishToMavenLocal + +gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/ publishToMavenLocal + +gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal +``` + + diff --git a/sdks/java/io/azure-cosmosdb/build.gradle b/sdks/java/io/azure-cosmosdb/build.gradle new file mode 100644 index 0000000000000..38755326521a6 --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/build.gradle @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("org.apache.beam.module") + id("scala") +} + +ext { + junitVersion = "5.9.1" + cosmosVersion = "4.37.1" + cosmosContainerVersion = "1.17.5" + bsonMongoVersion = "4.7.2" + log4jVersion = "2.19.0" +} + +applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb") + +description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB" +ext.summary = "IO library to read and write Azure Cosmos DB services from Beam." + +dependencies { + implementation("org.scala-lang:scala-library:2.12.17") + implementation("com.azure:azure-cosmos:${cosmosVersion}") + implementation library.java.commons_io + permitUnusedDeclared library.java.commons_io // BEAM-11761 + implementation library.java.slf4j_api + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation("org.mongodb:bson:${bsonMongoVersion}") +} + +// TEST +dependencies { + testImplementation("org.testcontainers:azure:${cosmosContainerVersion}") + testImplementation("com.outr:scribe_2.12:3.10.4") + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.mockito_core + testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}") + testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion") + testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion") + testRuntimeOnly library.java.slf4j_jdk14 + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala new file mode 100644 index 0000000000000..f25b90306894d --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala @@ -0,0 +1,50 @@ +package org.apache.beam.sdk.io.azure.cosmos + +import com.azure.cosmos.models.CosmosQueryRequestOptions +import com.azure.cosmos.{CosmosClient, CosmosClientBuilder} +import org.apache.beam.sdk.io.BoundedSource +import org.bson.Document +import org.slf4j.LoggerFactory + + +private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] { + private val log = LoggerFactory.getLogger(getClass) + private var maybeClient: Option[CosmosClient] = None + private var maybeIterator: Option[java.util.Iterator[Document]] = None + + override def start(): Boolean = { + maybeClient = Some( + new CosmosClientBuilder() + .gatewayMode + .endpointDiscoveryEnabled(false) + .endpoint(cosmosSource.readCosmos.endpoint) + .key(cosmosSource.readCosmos.key) + .buildClient + ) + + maybeIterator = maybeClient.map { client => + log.info("Get the container name") + + log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}") + client + .getDatabase(cosmosSource.readCosmos.database) + .getContainer(cosmosSource.readCosmos.container) + .queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document]) + .iterator() + } + + true + } + + override def advance(): Boolean = maybeIterator.exists(_.hasNext) + + override def getCurrent: Document = maybeIterator + .filter(_.hasNext) + //.map(iterator => new Document(iterator.next())) + .map(_.next()) + .orNull + + override def getCurrentSource: CosmosBoundedSource = cosmosSource + + override def close(): Unit = maybeClient.foreach(_.close()) +} diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala new file mode 100644 index 0000000000000..39fafc90394c4 --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala @@ -0,0 +1,25 @@ +package org.apache.beam.sdk.io.azure.cosmos + +import org.apache.beam.sdk.coders.{Coder, SerializableCoder} +import org.apache.beam.sdk.io.BoundedSource +import org.apache.beam.sdk.options.PipelineOptions +import org.bson.Document + +import java.util +import java.util.Collections + +class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] { + + /** @inheritDoc + * TODO: You have to find a better way, maybe by partition key */ + override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this) + + /** @inheritDoc + * The Cosmos DB Coro (SQL) API not support this metrics by the querys */ + override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L + + override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document]) + + override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] = + new CosmosBoundedReader(this) +} diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala new file mode 100644 index 0000000000000..fcd3a7abfc75d --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala @@ -0,0 +1,8 @@ +package org.apache.beam.sdk.io.azure.cosmos;; + +object CosmosIO { + def read(): CosmosRead = { + CosmosRead() + } +} + diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala new file mode 100644 index 0000000000000..48a0ece0d40dc --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala @@ -0,0 +1,45 @@ +package org.apache.beam.sdk.io.azure.cosmos + +import org.apache.beam.sdk.io.Read +import org.apache.beam.sdk.transforms.PTransform +import org.apache.beam.sdk.values.{PBegin, PCollection} +import org.bson.Document +import org.slf4j.LoggerFactory + +case class CosmosRead(private[cosmos] val endpoint: String = null, + private[cosmos] val key: String = null, + private[cosmos] val database: String = null, + private[cosmos] val container: String = null, + private[cosmos] val query: String = null) + extends PTransform[PBegin, PCollection[Document]] { + + + private val log = LoggerFactory.getLogger(classOf[CosmosRead]) + + /** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */ + def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint) + + def withCosmosKey(key: String): CosmosRead = this.copy(key = key) + + def withDatabase(database: String): CosmosRead = this.copy(database = database) + + def withQuery(query: String): CosmosRead = this.copy(query = query) + + def withContainer(container: String): CosmosRead = this.copy(container = container) + + override def expand(input: PBegin): PCollection[Document] = { + log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query") + validate() + + // input.getPipeline.apply(Read.from(new CosmosSource(this))) + input.apply(Read.from(new CosmosBoundedSource(this))) + } + + private def validate(): Unit = { + require(endpoint != null, "CosmosDB endpoint is required") + require(key != null, "CosmosDB key is required") + require(database != null, "CosmosDB database is required") + require(container != null, "CosmosDB container is required") + require(query != null, "CosmosDB query is required") + } +} diff --git a/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..28f1bb63f7be3 --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=DEBUG, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +#log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala new file mode 100644 index 0000000000000..7de8dda145a65 --- /dev/null +++ b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala @@ -0,0 +1,103 @@ +package org.apache.beam.sdk.io.azure.cosmos + +import com.azure.cosmos.CosmosClientBuilder +import org.apache.beam.sdk.Pipeline +import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer} +import org.apache.beam.sdk.testing.PAssert +import org.apache.beam.sdk.transforms.Count +import org.apache.beam.sdk.values.PCollection +import org.bson.Document +import org.junit._ +import org.junit.rules.TemporaryFolder +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.LoggerFactory +import org.testcontainers.containers.CosmosDBEmulatorContainer +import org.testcontainers.utility.DockerImageName + +import java.nio.file.Files +import scala.util.Using + +@RunWith(classOf[JUnit4]) +class CosmosIOTest { + private val log = LoggerFactory.getLogger("CosmosIOTest") + // @(Rule @getter) + // val pipelineWrite: TestPipeline = TestPipeline.create + // @(Rule @getter) + // val pipelineRead: TestPipeline = TestPipeline.create + + @Test + def readFromCosmosCoreSqlApi(): Unit = { + val read = CosmosIO + .read() + .withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint) + .withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey) + .withQuery(s"SELECT * FROM c") + .withContainer(CONTAINER) + .withDatabase(DATABASE) + + val pipeline = Pipeline.create() + val count: PCollection[java.lang.Long] = pipeline + .apply(read) + .apply(Count.globally()) + + PAssert.thatSingleton(count).isEqualTo(10) + + pipeline.run().waitUntilFinish() + } +} + +/** Initialization of static fields and methods */ +@RunWith(classOf[JUnit4]) +object CosmosIOTest { + private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]") + private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest" + private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME)) + private val DATABASE = "test" + private val CONTAINER = "test" + + @BeforeClass + def setup(): Unit = { + log.info("Starting CosmosDB emulator") + cosmosDBEmulatorContainer.start() + + val tempFolder = new TemporaryFolder + tempFolder.create() + val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath + val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore + keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray) + System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString) + System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey) + System.setProperty("javax.net.ssl.trustStoreType", "PKCS12") + + + log.info("Creando la data -------------------------------------------------------->") + val triedCreateData = Using(new CosmosClientBuilder() + .gatewayMode + .endpointDiscoveryEnabled(false) + .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint) + .key(cosmosDBEmulatorContainer.getEmulatorKey) + .buildClient) { client => + + client.createDatabase(DATABASE) + val db = client.getDatabase(DATABASE) + db.createContainer(CONTAINER, "/id") + val container = db.getContainer(CONTAINER) + for (i <- 1 to 10) { + container.createItem(new Document("id", i.toString)) + } + } + if (triedCreateData.isFailure) { + val throwable = triedCreateData.failed.get + log.error("Error creando la data", throwable) + throw throwable + } + log.info("Data creada ------------------------------------------------------------<") + } + + @AfterClass + def close(): Unit = { + log.info("Stop CosmosDB emulator") + cosmosDBEmulatorContainer.stop() + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 22bdad7e8c2c0..cf865c439ff5f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -163,6 +163,7 @@ include(":sdks:java:io:amazon-web-services") include(":sdks:java:io:amazon-web-services2") include(":sdks:java:io:amqp") include(":sdks:java:io:azure") +include(":sdks:java:io:azure-cosmosdb") include(":sdks:java:io:cassandra") include(":sdks:java:io:clickhouse") include(":sdks:java:io:common")