
Merge pull request #9 from alex268/main
Updated tests and added code coverage
alex268 authored Feb 20, 2025
2 parents 10b17a2 + d2e13e8 commit 43e2522
Showing 6 changed files with 170 additions and 114 deletions.
55 changes: 36 additions & 19 deletions connector/pom.xml
@@ -24,6 +24,19 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-core</artifactId>
@@ -35,7 +48,6 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-scheme</artifactId>
@@ -50,35 +62,40 @@
<artifactId>yc-auth-provider</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>tech.ydb.test</groupId>
<artifactId>ydb-junit4-support</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.26.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>
<!-- Code Coverage report generation -->
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
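
Only the coordinates of the jacoco-maven-plugin appear in this diff, so its version and execution bindings are presumably managed in the parent POM, which is not shown here. For reference, a minimal JaCoCo setup usually looks like the sketch below (an assumption, not taken from this repository): the prepare-agent goal attaches the coverage agent to the JVM that runs the tests, and the report goal renders the HTML report under target/site/jacoco.

    <plugin>
        <groupId>org.jacoco</groupId>
        <artifactId>jacoco-maven-plugin</artifactId>
        <version>0.8.12</version>
        <executions>
            <!-- Attach the JaCoCo agent to the test JVM. -->
            <execution>
                <id>prepare-agent</id>
                <goals>
                    <goal>prepare-agent</goal>
                </goals>
            </execution>
            <!-- Generate target/site/jacoco/index.html after the tests finish. -->
            <execution>
                <id>report</id>
                <phase>verify</phase>
                <goals>
                    <goal>report</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

With such a binding in place, running mvn clean verify would produce the coverage report alongside the test results.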
15 changes: 7 additions & 8 deletions connector/src/main/java/tech/ydb/spark/connector/YdbCatalog.java
@@ -22,6 +22,8 @@
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.Issue;
import tech.ydb.core.Result;
@@ -45,14 +47,10 @@
*
* @author zinal
*/
public class YdbCatalog extends YdbOptions
implements CatalogPlugin, TableCatalog, SupportsNamespaces {
public class YdbCatalog extends YdbOptions implements CatalogPlugin, TableCatalog, SupportsNamespaces {

private static final org.slf4j.Logger LOG
= org.slf4j.LoggerFactory.getLogger(YdbCatalog.class);
private static final Logger logger = LoggerFactory.getLogger(YdbCatalog.class);

// X.Y[-SNAPSHOT]
public static final String VERSION = "1.4-SNAPSHOT";
public static final String INDEX_PREFIX = "ix/";
public static final String ENTRY_TYPE = "ydb_entry_type";
public static final String ENTRY_OWNER = "ydb_entry_owner";
@@ -63,6 +61,7 @@ public class YdbCatalog extends YdbOptions

@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
logger.info("Initialize YDB catalog {}", name);
this.catalogName = name;
this.connector = YdbRegistry.getOrCreate(name, options);
this.listIndexes = options.getBoolean(LIST_INDEXES, false);
@@ -147,7 +146,7 @@ private void listIndexes(String[] namespace, List<Identifier> retval,
}).join();
if (!res.isSuccess()) {
// Skipping problematic entries.
LOG.warn("Skipping index listing for table {} due to failed describe, status {}",
logger.warn("Skipping index listing for table {} due to failed describe, status {}",
tablePath, res.getStatus());
return;
}
@@ -239,7 +238,7 @@ public boolean dropTable(Identifier ident) {
throw new UnsupportedOperationException("Cannot drop index table " + ident);
}
final String tablePath = mergePath(ident);
LOG.debug("Dropping table {}", tablePath);
logger.debug("Dropping table {}", tablePath);
Result<TableDescription> res = getRetryCtx().supplyResult(session -> {
final DescribeTableSettings dts = new DescribeTableSettings();
dts.setIncludeShardKeyBounds(false);

This file was deleted.

@@ -16,67 +16,65 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Before;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.spark.connector.impl.YdbConnector;
import tech.ydb.spark.connector.impl.YdbRegistry;
import tech.ydb.table.query.Params;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.transaction.TxControl;

import static org.assertj.core.api.Assertions.assertThat;
import tech.ydb.test.junit4.YdbHelperRule;

public class IntegrationTest {
@ClassRule
public static final YdbHelperRule YDB = new YdbHelperRule();

public static final String CATALOG = "spark.sql.catalog.ydb1";
public static final String TEST_TABLE = "test_table";

public static final GenericContainer<?> YDB =
new GenericContainer<>("cr.yandex/yc/yandex-docker-local-ydb:latest")
.withCreateContainerCmdModifier(cmd -> cmd.withHostName("localhost"))
.withNetworkMode("host")
.withEnv("GRPC_TLS_PORT", "2135")
.withEnv("GRPC_PORT", "2136")
.withEnv("MON_PORT", "8765")
.withEnv("YDB_USE_IN_MEMORY_PDISKS", "true");

static {
YDB.start();
}
private static GrpcTransport transport;
private static TableClient tableClient;
private static SessionRetryContext retryCtx;

private static final Map<String, String> options = new HashMap<>();
private static SparkSession spark;

@BeforeClass
public static void prepare() {
options.put("url", (YDB.useTls() ? "grpcs" : "grpc") + "://" + YDB.endpoint() + YDB.database());
if (YDB.authToken() != null) {
options.put("auth.mode", "TOKEN");
options.put("auth.token", YDB.authToken());
} else {
options.put("auth.mode", "NONE");
}
options.put("dbtable", TEST_TABLE);

protected SparkSession spark;
protected Map<String, String> options;
protected YdbConnector connector;
SparkConf conf = new SparkConf()
.setMaster("local[4]")
.setAppName("ydb-spark-integration-test")
.set("spark.ui.enabled", "false")
.set("spark.sql.catalog.ydb", "tech.ydb.spark.connector.YdbCatalog");
options.forEach((k, v) -> conf.set("spark.sql.catalog.ydb." + k, v));

transport = YDB.createTransport();
tableClient = TableClient.newClient(transport).build();
retryCtx = SessionRetryContext.create(tableClient).build();

@Before
public void prepare() {
options = commonConfigs();
spark = SparkSession.builder()
.config(getSparkConf())
.config(conf)
.config(conf)
.getOrCreate();
connector = YdbRegistry.getOrCreate(options);
}

private SparkConf getSparkConf() {
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set(CATALOG, "tech.ydb.spark.connector.YdbCatalog");
for (Map.Entry<String, String> one : options.entrySet()) {
conf.set(CATALOG + "." + one.getKey(), one.getValue());
}
return conf;
}

private Map<String, String> commonConfigs() {
HashMap<String, String> result = new HashMap<>();
result.put("url", "grpc://127.0.0.1:2136?database=/local");
result.put("auth.mode", "NONE");
result.put("dbtable", TEST_TABLE);
return result;
@AfterClass
public static void closeAll() {
spark.close();
tableClient.close();
transport.close();
}

@After
@@ -102,7 +100,8 @@ public void testViaJavaApi() {
.schema(dataFrame.schema())
.load()
.collectAsList();
assertThat(rows1).hasSize(1);

Assert.assertEquals(1, rows1.size());
}

@Test
@@ -118,7 +117,7 @@ public void testOverwrite() {
.schema(dataFrame.schema())
.load()
.collectAsList();
assertThat(rows1).hasSize(3);
Assert.assertEquals(3, rows1.size());

dataFrame.write()
.format("ydb")
Expand All @@ -132,7 +131,7 @@ public void testOverwrite() {
.schema(dataFrame.schema())
.load()
.collectAsList();
assertThat(rows1).hasSize(1);
Assert.assertEquals(1, rows1.size());
}

@Test
@@ -141,11 +140,12 @@ public void testCreateBySparkSQL() {
schemaQuery("create table " + original + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
dataQuery("upsert into " + original + "(id, value) values (1, 'asdf'), (2, 'zxcv'), (3, 'fghj')");

spark.sql("create table ydb1." + TEST_TABLE + "(id bigint, value string) ").queryExecution();
spark.sql("insert into ydb1." + TEST_TABLE + " select * from ydb1." + original).queryExecution();
List<Row> rows = spark.sql("select * from ydb1." + TEST_TABLE)
spark.sql("create table ydb." + TEST_TABLE + "(id bigint, value string) ").queryExecution();
spark.sql("insert into ydb." + TEST_TABLE + " select * from ydb." + original).queryExecution();
List<Row> rows = spark.sql("select * from ydb." + TEST_TABLE)
.collectAsList();
assertThat(rows).hasSize(3);

Assert.assertEquals(3, rows.size());
}

@Test
@@ -154,11 +154,12 @@ public void testCreateWithSelect() {
schemaQuery("create table " + original + "(id Uint64, value Text, PRIMARY KEY(id))").expectSuccess();
dataQuery("upsert into " + original + "(id, value) values (1, 'asdf'), (2, 'zxcv'), (3, 'fghj')");

spark.sql("create table ydb1." + TEST_TABLE + " as select * from ydb1." + original)
spark.sql("create table ydb." + TEST_TABLE + " as select * from ydb." + original)
.queryExecution();
List<Row> rows = spark.sql("select * from ydb1." + TEST_TABLE)
List<Row> rows = spark.sql("select * from ydb." + TEST_TABLE)
.collectAsList();
assertThat(rows).hasSize(3);

Assert.assertEquals(3, rows.size());
}

@Test
@@ -223,9 +224,10 @@ public void testCatalogAccess() {

dataQuery(upsertToster);

List<Row> rows = spark.sql("SELECT * FROM ydb1.toster")
List<Row> rows = spark.sql("SELECT * FROM ydb.toster")
.collectAsList();
assertThat(rows).hasSize(2);

Assert.assertEquals(2, rows.size());
}

private Dataset<Row> sampleDataset() {
@@ -238,17 +240,11 @@ private Dataset<Row> sampleDataset() {
}

private void dataQuery(String query) {
connector.getRetryCtx().supplyStatus(session -> session.executeDataQuery(
query,
TxControl.serializableRw().setCommitTx(true),
Params.empty())
.thenApply(Result::getStatus))
.join().expectSuccess();
retryCtx.supplyResult(session -> session.executeDataQuery(query, TxControl.serializableRw()))
.join().getStatus().expectSuccess();
}

private Status schemaQuery(String query) {
return connector.getRetryCtx().supplyStatus(
session -> session.executeSchemeQuery(query)).join();
private static Status schemaQuery(String query) {
return retryCtx.supplyStatus(session -> session.executeSchemeQuery(query)).join();
}

}
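
To make the intent of these tests easier to follow, here is a minimal, self-contained sketch (not part of this commit) of how the connector is driven from Spark: a local SparkSession with the YDB catalog registered under the name ydb and the connection options forwarded to it, mirroring the prepare() method above. The endpoint, database, and table name are placeholders for illustration.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class YdbConnectorSketch {
        public static void main(String[] args) {
            // Placeholder connection settings; adjust to your environment.
            Map<String, String> options = new HashMap<>();
            options.put("url", "grpc://localhost:2136?database=/local");
            options.put("auth.mode", "NONE");
            options.put("dbtable", "test_table");

            SparkConf conf = new SparkConf()
                    .setMaster("local[4]")
                    .setAppName("ydb-spark-sketch")
                    .set("spark.ui.enabled", "false")
                    // Register the YDB catalog under the name "ydb", as the tests do.
                    .set("spark.sql.catalog.ydb", "tech.ydb.spark.connector.YdbCatalog");
            // Forward the connection options to the catalog, as prepare() does above.
            options.forEach((k, v) -> conf.set("spark.sql.catalog.ydb." + k, v));

            SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
            try {
                // Query a YDB table through the registered catalog, as in testCatalogAccess().
                List<Row> rows = spark.sql("SELECT * FROM ydb.test_table").collectAsList();
                System.out.println("row count: " + rows.size());
            } finally {
                spark.close();
            }
        }
    }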