Skip to content

Commit

Permalink
Destination Iceberg: Fix tests to run in airbyte-ci (#45206)
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa authored Sep 17, 2024
1 parent 912bc77 commit daee3b6
Show file tree
Hide file tree
Showing 24 changed files with 419 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.44.23'
features = ['db-destinations']
useLocalCdk = false
}
Expand Down

This file was deleted.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: df65a8f3-9908-451b-aa9b-445462803560
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
dockerRepository: airbyte/destination-iceberg
githubIssueLabel: destination-iceberg
icon: iceberg.svg
license: MIT
name: Apache Iceberg
registryOverrides:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ public class IcebergConstants {
public static final String CATALOG_NAME = "iceberg";
public static final String DEFAULT_DATABASE = "default";

/**
* constant for QA checks to ignore http endpoint
*/
public static final String HTTP_PREFIX = "http://"; // # ignore-https-check

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession.Builder;
import org.jetbrains.annotations.NotNull;

@Slf4j
public class IcebergDestination extends BaseConnector implements Destination {
Expand All @@ -43,7 +44,7 @@ public static void main(String[] args) throws Exception {
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
public AirbyteConnectionStatus check(@NotNull JsonNode config) {
try {
IcebergCatalogConfig icebergCatalogConfig = icebergCatalogConfigFactory.fromJsonNodeConfig(config);
icebergCatalogConfig.check();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.iceberg.config.storage;

import static io.airbyte.integrations.destination.iceberg.IcebergConstants.HTTP_PREFIX;
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ACCESS_KEY_ID_CONFIG_KEY;
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_BUCKET_REGION_CONFIG_KEY;
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ENDPOINT_CONFIG_KEY;
Expand Down Expand Up @@ -84,14 +85,14 @@ public static S3Config fromDestinationConfig(@Nonnull final JsonNode config) {
// use Amazon S3
builder.sslEnabled(true);
} else {
boolean sslEnabled = !endpointStr.startsWith("http://");
boolean sslEnabled = !endpointStr.startsWith(HTTP_PREFIX);
String pureEndpoint = removeSchemaSuffix(endpointStr);
builder.sslEnabled(sslEnabled);
builder.endpoint(pureEndpoint);
if (sslEnabled) {
builder.endpointWithSchema("https://" + pureEndpoint);
} else {
builder.endpointWithSchema("http://" + pureEndpoint);
builder.endpointWithSchema(HTTP_PREFIX + pureEndpoint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,25 @@

package io.airbyte.integrations.destination.iceberg;

import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_ACCESS_KEY;
import static io.airbyte.integrations.destination.iceberg.container.MinioContainer.DEFAULT_SECRET_KEY;
import static org.sparkproject.jetty.util.StringUtil.isNotBlank;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.Bucket;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.dockerjava.api.model.ContainerNetwork;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig;
import io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfigFactory;
import io.airbyte.integrations.destination.iceberg.config.storage.S3Config;
import io.airbyte.integrations.destination.iceberg.container.MinioContainer;
import io.airbyte.integrations.destination.iceberg.container.MinioContainer.CredentialsProvider;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -39,7 +33,6 @@
import org.glassfish.jersey.internal.guava.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startable;

/**
Expand All @@ -48,22 +41,11 @@
public class IcebergIntegrationTestUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergIntegrationTestUtil.class);

public static final String ICEBERG_IMAGE_NAME = "airbyte/destination-iceberg:dev";

public static final String WAREHOUSE_BUCKET_NAME = "warehouse";
private static final NamingConventionTransformer namingResolver = new StandardNameTransformer();
private static final IcebergCatalogConfigFactory icebergCatalogConfigFactory = new IcebergCatalogConfigFactory();

public static MinioContainer createAndStartMinioContainer(Integer bindPort) {
CredentialsProvider credentialsProvider = new CredentialsProvider(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY);
String minioImage = "minio/minio:RELEASE.2022-10-29T06-21-33Z.fips";
MinioContainer container = new MinioContainer(minioImage, credentialsProvider, bindPort);
container.start();
LOGGER.info("==> Started Minio docker container...");
return container;
}

public static void stopAndCloseContainer(Startable container, String name) {
container.stop();
container.close();
Expand Down Expand Up @@ -102,14 +84,4 @@ private static long offsetDataTimeToTimestamp(OffsetDateTime offsetDateTime) {
return Timestamp.valueOf(offsetDateTime.atZoneSameInstant(ZoneOffset.UTC).toLocalDateTime()).getTime();
}

public static String getContainerIpAddr(GenericContainer<?> container) {
for (Entry<String, ContainerNetwork> entry : container.getContainerInfo()
.getNetworkSettings()
.getNetworks()
.entrySet()) {
return entry.getValue().getIpAddress();
}
return container.getContainerIpAddress();
}

}
Loading

0 comments on commit daee3b6

Please sign in to comment.