Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Oct 15, 2024
1 parent 21109b2 commit 731a9d8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.StringType;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -189,7 +190,10 @@ private void runTest(String branch, boolean useSchema, Map<String, String> extra
.config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE))
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
.config("iceberg.kafka.auto.offset.reset", "earliest");
.config("iceberg.kafka.auto.offset.reset", "earliest")
.config(
"iceberg.kafka." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaConnectUtils.getBootstrapServers());

context().connectorCatalogProperties().forEach(connectorConfig::config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public Config config(String key, Object value) {

public static void startConnectCluster() {
if (USE_EMBEDDED_CONNECT) {
if (connectCluster != null) {
// cluster is already running
return;
}
Map<String, String> workerProps = new HashMap<>();
// permit all Kafka client overrides; required for testing different consumer partition
// assignment strategies
Expand Down
66 changes: 21 additions & 45 deletions connectors/kafka/src/test/java/io/delta/kafka/TestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,60 +78,36 @@ public void stopConnector(String name) {
public Catalog initLocalCatalog() {
DeltaCatalog deltaCatalog = new DeltaCatalog();

deltaCatalog.initialize("delta-catalog", deltaCatalogProperties());
deltaCatalog.initialize(
"delta-catalog",
ImmutableMap.<String, String>builder()
.put(CatalogProperties.CATALOG_IMPL, DeltaCatalog.class.getCanonicalName())
.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://bucket/warehouse/")
.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO")
.put("s3.endpoint", "http://localhost:" + MINIO_PORT)
.put("s3.access-key-id", AWS_ACCESS_KEY)
.put("s3.secret-access-key", AWS_SECRET_KEY)
.put("s3.path-style-access", "true")
.put("client.region", AWS_REGION)
.build());

return deltaCatalog;

// String localCatalogUri = "http://localhost:" + CATALOG_PORT;
// RESTCatalog result = new RESTCatalog();
// result.initialize(
// "local",
// ImmutableMap.<String, String>builder()
// .put(CatalogProperties.URI, localCatalogUri)
// .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO")
// .put("s3.endpoint", "http://localhost:" + MINIO_PORT)
// .put("s3.access-key-id", AWS_ACCESS_KEY)
// .put("s3.secret-access-key", AWS_SECRET_KEY)
// .put("s3.path-style-access", "true")
// .put("client.region", AWS_REGION)
// .build());
// return result;
}

public Map<String, String> connectorCatalogProperties() {
return deltaCatalogProperties();
// return ImmutableMap.<String, Object>builder()
// .put(
// "iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE,
// CatalogUtil.ICEBERG_CATALOG_TYPE_REST)
// .put("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:" + CATALOG_PORT)
// temporary change to use UnityCatalog
// .put(
// "iceberg.catalog." + CatalogProperties.CATALOG_IMPL,
// DeltaCatalog.class.getCanonicalName())
// .put(
// "iceberg.catalog." + CatalogProperties.FILE_IO_IMPL,
// "org.apache.iceberg.aws.s3.S3FileIO")
// .put("iceberg.catalog.s3.endpoint", "http://minio:" + MINIO_PORT)
// .put("iceberg.catalog.s3.access-key-id", AWS_ACCESS_KEY)
// .put("iceberg.catalog.s3.secret-access-key", AWS_SECRET_KEY)
// .put("iceberg.catalog.s3.path-style-access", true)
// .put("iceberg.catalog.client.region", AWS_REGION)
// .build();
}

private Map<String, String> deltaCatalogProperties() {
return ImmutableMap.<String, String>builder()
.put(
"iceberg.catalog." + CatalogProperties.CATALOG_IMPL,
DeltaCatalog.class.getCanonicalName())
.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://bucket/warehouse/")
.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO")
.put("s3.endpoint", "http://localhost:" + MINIO_PORT)
.put("s3.access-key-id", AWS_ACCESS_KEY)
.put("s3.secret-access-key", AWS_SECRET_KEY)
.put("s3.path-style-access", "true")
.put("client.region", AWS_REGION)
.put("iceberg.catalog." + CatalogProperties.WAREHOUSE_LOCATION, "s3a://bucket/warehouse/")
.put(
"iceberg.catalog." + CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.aws.s3.S3FileIO")
.put("iceberg.catalog.s3.endpoint", "http://localhost:" + MINIO_PORT)
.put("iceberg.catalog.s3.access-key-id", AWS_ACCESS_KEY)
.put("iceberg.catalog.s3.secret-access-key", AWS_SECRET_KEY)
.put("iceberg.catalog.s3.path-style-access", "true")
.put("iceberg.catalog.client.region", AWS_REGION)
.build();
}

Expand Down

0 comments on commit 731a9d8

Please sign in to comment.