diff --git a/docs/en/connector-v2/sink/Amazondynamodb.md b/docs/en/connector-v2/sink/Amazondynamodb.md new file mode 100644 index 00000000000..47f003d6a49 --- /dev/null +++ b/docs/en/connector-v2/sink/Amazondynamodb.md @@ -0,0 +1,69 @@ + +# Amazondynamodb + +> Amazondynamodb sink connector + +## Description + +Write data to `Amazondynamodb` + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------- | ------ |----------| ------------- | +| url | string | yes | - | +| region | string | yes | - | +| access_key_id | string | yes | - | +| secret_access_key| string | yes | - | +| table | string | yes | - | +| batch_size | int | no | 25 | +| batch_interval_ms| int | no | 1000 | +| common-options | | no | - | + +### url [string] + +The URL used to write to Amazondynamodb. + +### region [string] + +The region of Amazondynamodb. + +### access_key_id [string] + +The access key id of Amazondynamodb. + +### secret_access_key [string] + +The secret access key of Amazondynamodb. + +### table [string] + +The table of Amazondynamodb. + +### common options + +Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. + +## Example + +```bash +Amazondynamodb { + url = "http://127.0.0.1:8000" + region = "us-east-1" + access_key_id = "dummy-key" + secret_access_key = "dummy-secret" + table = "TableName" + } +``` + +## Changelog + +### next version + +- Add Amazondynamodb Sink Connector + diff --git a/docs/en/connector-v2/source/Amazondynamodb.md b/docs/en/connector-v2/source/Amazondynamodb.md new file mode 100644 index 00000000000..050a95f951a --- /dev/null +++ b/docs/en/connector-v2/source/Amazondynamodb.md @@ -0,0 +1,108 @@ +# Amazondynamodb + +> Amazondynamodb source connector + +## Description + +Read data from Amazondynamodb. 
+ +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +| ---------------- | ------ | -------- | ------------- | +| url | string | yes | - | +| region | string | yes | - | +| access_key_id | string | yes | - | +| secret_access_key| string | yes | - | +| table | string | yes | - | +| schema | object | yes | - | +| common-options | | yes | - | + +### url [string] + +The URL to read from Amazondynamodb. + +### region [string] + +The region of Amazondynamodb. + +### access_key_id [string] + +The access key id of Amazondynamodb. + +### secret_access_key [string] + +The secret access key of Amazondynamodb. + +### table [string] + +The table of Amazondynamodb. + +### schema [object] + +#### fields [Config] + +Amazon DynamoDB is a NoSQL database service that supports key-value storage and document data structures, so there is no way to obtain the data types from the table itself. Therefore, the schema must be configured. 
+ +such as: + +``` +schema { + fields { + id = int + key_aa = string + key_bb = string + } +} +``` + +### common options + +Source Plugin common parameters, refer to [Source Plugin](common-options.md) for details. + +## Example + +```bash + Amazondynamodb { + url = "http://127.0.0.1:8000" + region = "us-east-1" + access_key_id = "dummy-key" + secret_access_key = "dummy-secret" + table = "TableName" + schema = { + fields { + artist = string + c_map = "map<string, smallint>" + c_array = "array<tinyint>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +``` + +## Changelog + +### next version + +- Add Amazondynamodb Source Connector diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 8080a44f444..1555c1c1427 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -136,4 +136,6 @@ seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg seatunnel.source.InfluxDB = connector-influxdb seatunnel.source.S3File = connector-file-s3 -seatunnel.sink.S3File = connector-file-s3 \ No newline at end of file +seatunnel.sink.S3File = connector-file-s3 +seatunnel.source.Amazondynamodb = connector-amazondynamodb +seatunnel.sink.Amazondynamodb = connector-amazondynamodb diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml b/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml new file mode 100644 index 00000000000..f540386b16a --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/pom.xml @@ -0,0 +1,64 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-amazondynamodb + + + 2.18.1 + + + + + + software.amazon.awssdk + bom + ${amazon.awssdk} + pom + import + + + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + 
software.amazon.awssdk + dynamodb-enhanced + + + software.amazon.awssdk + dynamodb + + + + diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java new file mode 100644 index 00000000000..46f5a7f4e80 --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazondynamodbConfig.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names of the configuration keys understood by the Amazondynamodb connector.
 *
 * <p>Every constant holds the literal key string looked up in the plugin config.
 */
public class AmazondynamodbConfig implements Serializable {
    public static final String URL = "url";
    public static final String REGION = "region";
    public static final String ACCESS_KEY_ID = "access_key_id";
    public static final String SECRET_ACCESS_KEY = "secret_access_key";
    public static final String TABLE = "table";
    public static final String BATCH_SIZE = "batch_size";
    /** Key of the batch flush interval option (milliseconds, per its name). */
    public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
    // Misnamed: this is the option *key*, not a default value. Kept as an alias so that
    // existing references keep compiling; prefer BATCH_INTERVAL_MS in new code.
    public static final String DEFAULT_BATCH_INTERVAL_MS = BATCH_INTERVAL_MS;
}
+ */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config; + +import org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class AmazondynamodbSourceOptions implements Serializable { + + private static final int DEFAULT_BATCH_SIZE = 25; + private static final int DEFAULT_BATCH_INTERVAL_MS = 1000; + + private String url; + + private String region; + + private String accessKeyId; + + private String secretAccessKey; + + private String table; + + private Config schema; + + public int batchSize = DEFAULT_BATCH_SIZE; + public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; + + public AmazondynamodbSourceOptions(Config config) { + this.url = config.getString(AmazondynamodbConfig.URL); + this.region = config.getString(AmazondynamodbConfig.REGION); + this.accessKeyId = config.getString(AmazondynamodbConfig.ACCESS_KEY_ID); + this.secretAccessKey = config.getString(AmazondynamodbConfig.SECRET_ACCESS_KEY); + this.table = config.getString(AmazondynamodbConfig.TABLE); + + if (config.hasPath(CommonConfig.SCHEMA)) { + this.schema = config.getConfig(CommonConfig.SCHEMA); + } + if (config.hasPath(AmazondynamodbConfig.BATCH_SIZE)) { + this.batchSize = config.getInt(AmazondynamodbConfig.BATCH_SIZE); + } + if (config.hasPath(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS)) { + this.batchIntervalMs = config.getInt(AmazondynamodbConfig.DEFAULT_BATCH_INTERVAL_MS); + } + } +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java new file mode 100644 index 
00000000000..e70c1c1285a --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowDeserializer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import lombok.AllArgsConstructor; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@AllArgsConstructor +public class DefaultSeaTunnelRowDeserializer implements SeaTunnelRowDeserializer { + + private final SeaTunnelRowType typeInfo; + + @Override + public SeaTunnelRow deserialize(Map item) { + SeaTunnelDataType[] seaTunnelDataTypes = typeInfo.getFieldTypes(); + return new SeaTunnelRow(convertRow(seaTunnelDataTypes, item).toArray()); + } + + private List convertRow(SeaTunnelDataType[] seaTunnelDataTypes, Map item) { + List fields = new ArrayList<>(); + String[] fieldNames = typeInfo.getFieldNames(); + for (int i = 0; i < seaTunnelDataTypes.length; i++) { + SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[i]; + AttributeValue attributeValue = item.get(fieldNames[i]); + fields.add(convert(seaTunnelDataType, attributeValue)); + } + return fields; + } + + private Object convert(SeaTunnelDataType seaTunnelDataType, AttributeValue attributeValue) { + if 
(attributeValue.type().equals(AttributeValue.Type.NUL)) { + return null; + } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { + return attributeValue.bool(); + } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) { + if (attributeValue.n() != null) { + return Byte.parseByte(attributeValue.n()); + } + return attributeValue.s().getBytes(StandardCharsets.UTF_8)[0]; + } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) { + return Short.parseShort(attributeValue.n()); + } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) { + return Integer.parseInt(attributeValue.n()); + } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) { + return Long.parseLong(attributeValue.n()); + } else if (seaTunnelDataType instanceof DecimalType) { + return new BigDecimal(attributeValue.n()); + } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) { + return Float.parseFloat(attributeValue.n()); + } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) { + return Double.parseDouble(attributeValue.n()); + } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) { + return attributeValue.s(); + } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) { + return LocalTime.parse(attributeValue.s()); + } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) { + return LocalDate.parse(attributeValue.s()); + } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) { + return LocalDateTime.parse(attributeValue.s()); + } else if (PrimitiveByteArrayType.INSTANCE.equals(seaTunnelDataType)) { + return attributeValue.b().asByteArray(); + } else if (seaTunnelDataType instanceof MapType) { + Map seatunnelMap = new HashMap<>(); + attributeValue.m().forEach((s, attributeValueInfo) -> { + seatunnelMap.put(s, convert(((MapType) seaTunnelDataType).getValueType(), attributeValueInfo)); + }); + return seatunnelMap; + } else if (seaTunnelDataType instanceof ArrayType) { + Object array = Array.newInstance(String.class, 
attributeValue.l().size()); + if (attributeValue.hasL()) { + List datas = attributeValue.l(); + array = Array.newInstance(((ArrayType) seaTunnelDataType).getElementType().getTypeClass(), attributeValue.l().size()); + for (int index = 0; index < datas.size(); index++) { + Array.set(array, index, convert(((ArrayType) seaTunnelDataType).getElementType(), datas.get(index))); + } + } else if (attributeValue.hasSs()) { + List datas = attributeValue.ss(); + for (int index = 0; index < datas.size(); index++) { + Array.set(array, index, AttributeValue.fromS(datas.get(index))); + } + } else if (attributeValue.hasNs()) { + List datas = attributeValue.ns(); + for (int index = 0; index < datas.size(); index++) { + Array.set(array, index, AttributeValue.fromS(datas.get(index))); + } + } else if (attributeValue.hasBs()) { + List datas = attributeValue.bs(); + for (int index = 0; index < datas.size(); index++) { + Array.set(array, index, AttributeValue.fromB(datas.get(index))); + } + } + return array; + } else { + throw new IllegalStateException("Unexpected value: " + seaTunnelDataType); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java new file mode 100644 index 00000000000..017436c20b3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer { + + private final SeaTunnelRowType seaTunnelRowType; + private final AmazondynamodbSourceOptions amazondynamodbSourceOptions; + private final List measurementsType; + + public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, AmazondynamodbSourceOptions amazondynamodbSourceOptions) { + this.seaTunnelRowType = seaTunnelRowType; + this.amazondynamodbSourceOptions = 
amazondynamodbSourceOptions; + this.measurementsType = convertTypes(seaTunnelRowType); + } + + @Override + public PutItemRequest serialize(SeaTunnelRow seaTunnelRow) { + HashMap itemValues = new HashMap<>(); + for (int index = 0; index < seaTunnelRowType.getFieldNames().length; index++) { + itemValues.put(seaTunnelRowType.getFieldName(index), convertItem(seaTunnelRow.getField(index), + seaTunnelRowType.getFieldType(index), + measurementsType.get(index))); + } + return PutItemRequest.builder() + .tableName(amazondynamodbSourceOptions.getTable()) + .item(itemValues) + .build(); + } + + private List convertTypes(SeaTunnelRowType seaTunnelRowType) { + return Arrays.stream(seaTunnelRowType.getFieldTypes()).map(this::convertType).collect(Collectors.toList()); + } + + private AttributeValue.Type convertType(SeaTunnelDataType seaTunnelDataType) { + switch (seaTunnelDataType.getSqlType()) { + case INT: + case TINYINT: + case SMALLINT: + case BIGINT: + case FLOAT: + case DOUBLE: + case DECIMAL: + return AttributeValue.Type.N; + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + return AttributeValue.Type.S; + case BOOLEAN: + return AttributeValue.Type.BOOL; + case NULL: + return AttributeValue.Type.NUL; + case BYTES: + return AttributeValue.Type.B; + case MAP: + return AttributeValue.Type.M; + case ARRAY: + return AttributeValue.Type.L; + default: + throw new UnsupportedOperationException("Unsupported dataType: " + seaTunnelDataType); + } + } + + private AttributeValue convertItem(Object value, SeaTunnelDataType seaTunnelDataType, AttributeValue.Type measurementsType) { + if (value == null) { + return AttributeValue.builder().nul(true).build(); + } + switch (measurementsType) { + case N: + return AttributeValue.builder().n(Integer.toString(((Number) value).intValue())).build(); + case S: + return AttributeValue.builder().s(String.valueOf(value)).build(); + case BOOL: + return AttributeValue.builder().bool((Boolean) value).build(); + case B: + return 
AttributeValue.builder().b(SdkBytes.fromByteArrayUnsafe((byte[]) value)).build(); + case SS: + return AttributeValue.builder().ss((Collection) value).build(); + case NS: + return AttributeValue.builder().ns(((Collection) value) + .stream().map(Object::toString).collect(Collectors.toList())).build(); + case BS: + return AttributeValue.builder().bs( + ((Collection) value) + .stream().map(number -> + SdkBytes.fromByteArray((byte[]) value)).collect(Collectors.toList()) + ).build(); + case M: + MapType mapType = (MapType) seaTunnelDataType; + Map map = (Map) value; + Map resultMap = new HashMap<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + String mapKeyName = entry.getKey(); + resultMap.put(mapKeyName, convertItem(entry.getValue(), mapType.getValueType(), convertType(mapType.getValueType()))); + } + return AttributeValue.builder().m(resultMap).build(); + case L: + ArrayType arrayType = (ArrayType) seaTunnelDataType; + BasicType elementType = arrayType.getElementType(); + Object[] l = (Object[]) value; + return AttributeValue.builder() + .l(Stream.of(l).map(o -> convertItem(o, elementType, convertType(elementType))) + .collect(Collectors.toList())).build(); + case NUL: + return AttributeValue.builder().nul(true).build(); + default: + throw new UnsupportedOperationException("Unsupported dataType: " + measurementsType); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java new file mode 100644 index 00000000000..5596701a1a3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowDeserializer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.util.Map; + +public interface SeaTunnelRowDeserializer { + + SeaTunnelRow deserialize(Map item); +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java new file mode 100644 index 00000000000..c4b30e27f0c --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/SeaTunnelRowSerializer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Turns a {@link SeaTunnelRow} into a DynamoDB {@link PutItemRequest}.
 */
public interface SeaTunnelRowSerializer {

    /**
     * Converts one row.
     *
     * @param seaTunnelRow row to convert
     * @return the put request built from the row
     */
    PutItemRequest serialize(SeaTunnelRow seaTunnelRow);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink; + +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.ACCESS_KEY_ID; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.REGION; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.SECRET_ACCESS_KEY; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.URL; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + 
+@AutoService(SeaTunnelSink.class) +public class AmazondynamodbSink extends AbstractSimpleSink { + + private SeaTunnelRowType rowType; + + private AmazondynamodbSourceOptions amazondynamodbSourceOptions; + + @Override + public String getPluginName() { + return "Amazondynamodb"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, TABLE, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.rowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return rowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new AmazondynamodbWriter(amazondynamodbSourceOptions, rowType); + } +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java new file mode 100644 index 00000000000..9f6405790b6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazondynamodbWriter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import java.io.IOException; + +public class AmazondynamodbWriter extends AbstractSinkWriter { + + private final DdynamoDbSinkClient ddynamoDbSinkClient; + private final SeaTunnelRowSerializer serializer; + + public AmazondynamodbWriter(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType seaTunnelRowType) { + ddynamoDbSinkClient = new DdynamoDbSinkClient(amazondynamodbSourceOptions, seaTunnelRowType); + serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, amazondynamodbSourceOptions); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + ddynamoDbSinkClient.write(serializer.serialize(element)); + } + + @Override + public void close() throws IOException { + ddynamoDbSinkClient.close(); + } +} diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java new file mode 100644 index 00000000000..8066261aeb5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DdynamoDbSinkClient.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutRequest; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class DdynamoDbSinkClient { + private final AmazondynamodbSourceOptions amazondynamodbSourceOptions; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + private volatile boolean initialize; + private volatile Exception flushException; + private DynamoDbClient dynamoDbClient; + private final List batchList; + protected SeaTunnelRowDeserializer seaTunnelRowDeserializer; + + public DdynamoDbSinkClient(AmazondynamodbSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType typeInfo) { + this.amazondynamodbSourceOptions = amazondynamodbSourceOptions; 
+ this.batchList = new ArrayList<>(); + this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo); + } + + private void tryInit() throws IOException { + if (initialize) { + return; + } + dynamoDbClient = DynamoDbClient.builder() + .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl())) + // The region is meaningless for local DynamoDb but required for client builder validation + .region(Region.of(amazondynamodbSourceOptions.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey()))) + .build(); + + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("DdynamoDb-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + amazondynamodbSourceOptions.getBatchIntervalMs(), + amazondynamodbSourceOptions.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + + initialize = true; + } + + public synchronized void write(PutItemRequest putItemRequest) throws IOException { + tryInit(); + checkFlushException(); + batchList.add(WriteRequest.builder().putRequest( + PutRequest.builder().item(putItemRequest.item()).build()).build()); + if (amazondynamodbSourceOptions.getBatchSize() > 0 + && batchList.size() >= amazondynamodbSourceOptions.getBatchSize()) { + flush(); + } + } + + public synchronized void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + if (dynamoDbClient != null) { + flush(); + dynamoDbClient.close(); + } + } + + synchronized void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + Map> requestItems = new HashMap<>(1); + requestItems.put(amazondynamodbSourceOptions.getTable(), batchList); + 
dynamoDbClient.batchWriteItem(BatchWriteItemRequest + .builder() + .requestItems(requestItems) + .build()); + + batchList.clear(); + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing items to DdynamoDb failed.", flushException); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java new file mode 100644 index 00000000000..40ac9eeef34 --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSource.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source; + +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.ACCESS_KEY_ID; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.REGION; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.SECRET_ACCESS_KEY; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.common.config.CommonConfig.SCHEMA; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AutoService(SeaTunnelSource.class) +public class AmazondynamodbSource extends AbstractSingleSplitSource { + + private 
AmazondynamodbSourceOptions amazondynamodbSourceOptions; + + private SeaTunnelRowType typeInfo; + + @Override + public String getPluginName() { + return "Amazondynamodb"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, TABLE, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, SCHEMA); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + amazondynamodbSourceOptions = new AmazondynamodbSourceOptions(pluginConfig); + typeInfo = SeaTunnelSchema.buildWithConfig(amazondynamodbSourceOptions.getSchema()).getSeaTunnelRowType(); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.typeInfo; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + return new AmazondynamodbSourceReader(readerContext, amazondynamodbSourceOptions, typeInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java new file mode 100644 index 00000000000..fe8fd47bb7d --- /dev/null +++ b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazondynamodbSourceReader.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazondynamodbSourceOptions; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer; +import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; + +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; + +import java.io.IOException; +import java.net.URI; + +@Slf4j +public class AmazondynamodbSourceReader extends AbstractSingleSplitReader { + + protected DynamoDbClient dynamoDbClient; + protected SingleSplitReaderContext context; + 
protected AmazondynamodbSourceOptions amazondynamodbSourceOptions; + protected SeaTunnelRowDeserializer seaTunnelRowDeserializer; + + public AmazondynamodbSourceReader(SingleSplitReaderContext context, + AmazondynamodbSourceOptions amazondynamodbSourceOptions, + SeaTunnelRowType typeInfo) { + this.context = context; + this.amazondynamodbSourceOptions = amazondynamodbSourceOptions; + this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo); + } + + @Override + public void open() throws Exception { + dynamoDbClient = DynamoDbClient.builder() + .endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl())) + // The region is meaningless for local DynamoDb but required for client builder validation + .region(Region.of(amazondynamodbSourceOptions.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create(amazondynamodbSourceOptions.getAccessKeyId(), amazondynamodbSourceOptions.getSecretAccessKey()))) + .build(); + } + + @Override + public void close() throws IOException { + dynamoDbClient.close(); + } + + @Override + @SuppressWarnings("magicnumber") + public void pollNext(Collector output) throws Exception { + ScanResponse scan = dynamoDbClient.scan(ScanRequest.builder() + .tableName(amazondynamodbSourceOptions.getTable()) + .build()); + if (scan.hasItems()) { + scan.items().forEach(item -> { + output.collect(seaTunnelRowDeserializer.deserialize(item)); + }); + } + context.signalNoMoreElement(); + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java new file mode 100644 index 00000000000..bb695ea88da --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/config/CommonConfig.java @@ -0,0 +1,22 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.config; + +/** Holder for configuration key names shared by connectors. */ +public class CommonConfig { + /** Key of the option that carries the schema definition. */ + public static final String SCHEMA = "schema"; +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index ec05e06e584..8ca99e023cf 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -56,6 +56,7 @@ connector-mongodb connector-iceberg connector-influxdb + connector-amazondynamodb diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index cac91e97d0b..b810086dfa4 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -297,6 +297,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-amazondynamodb + ${project.version} + provided + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/pom.xml new file mode 100644 index 00000000000..e25e5d4e07b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/pom.xml @@ -0,0 +1,54 @@ + + + + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + 4.0.0 + connector-amazondynamodb-e2e + + 2.18.1 + + + + 
+ software.amazon.awssdk + bom + ${amazon.awssdk} + pom + import + + + + + + org.apache.seatunnel + connector-amazondynamodb + ${project.version} + test + + + software.amazon.awssdk + dynamodb + test + + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java new file mode 100644 index 00000000000..de78241c9c5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.amazondynamodb; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; 
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; +import software.amazon.awssdk.services.dynamodb.model.KeyType; +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; +import software.amazon.awssdk.services.dynamodb.model.ScanRequest; +import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter; + +import java.math.BigDecimal; +import java.net.ConnectException; +import java.net.URI; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class AmazondynamodbIT extends TestSuiteBase implements TestResource { + private static final String AMAZONDYNAMODB_DOCKER_IMAGE = "amazon/dynamodb-local"; + private static final String AMAZONDYNAMODB_CONTAINER_HOST = "dynamodb-host"; + private static final int AMAZONDYNAMODB_CONTAINER_PORT = 8000; + private static final String AMAZONDYNAMODB_JOB_CONFIG = "/amazondynamodbIT_source_to_sink.conf"; + private static final String SINK_TABLE = "sink_table"; + private static final String SOURCE_TABLE = "source_table"; + private static final String PARTITION_KEY = "id"; + + private GenericContainer dynamoDB; + protected DynamoDbClient dynamoDbClient; + + @TestTemplate + public void testAmazondynamodb(TestContainer container) throws Exception { + Container.ExecResult execResult = 
container.executeJob(AMAZONDYNAMODB_JOB_CONFIG); + Assertions.assertEquals(0, execResult.getExitCode()); + assertHasData(); + compareResult(); + clearSinkTable(); + } + + @BeforeAll + @Override + public void startUp() throws Exception { + dynamoDB = new GenericContainer<>(AMAZONDYNAMODB_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(AMAZONDYNAMODB_CONTAINER_HOST) + .withExposedPorts(AMAZONDYNAMODB_CONTAINER_PORT) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(AMAZONDYNAMODB_DOCKER_IMAGE))); + dynamoDB.setPortBindings(Lists.newArrayList(String.format("%s:%s", AMAZONDYNAMODB_CONTAINER_PORT, AMAZONDYNAMODB_CONTAINER_PORT))); + Startables.deepStart(Stream.of(dynamoDB)).join(); + log.info("dynamodb container started"); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(120, TimeUnit.SECONDS) + .untilAsserted(this::initializeDynamodbClient); + batchInsertData(); + + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (dynamoDB != null) { + dynamoDB.close(); + } + } + + private void initializeDynamodbClient() throws ConnectException { + dynamoDbClient = DynamoDbClient.builder() + .endpointOverride(URI.create("http://" + dynamoDB.getHost() + ":" + AMAZONDYNAMODB_CONTAINER_PORT)) + // The region is meaningless for local DynamoDb but required for client builder validation + .region(Region.US_EAST_1) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create("dummy-key", "dummy-secret"))) + .build(); + + createTable(dynamoDbClient, SOURCE_TABLE); + createTable(dynamoDbClient, SINK_TABLE); + } + + private void batchInsertData() { + dynamoDbClient.putItem(PutItemRequest.builder() + .tableName(SOURCE_TABLE) + .item(randomRow()) + .build()); + } + + private void clearSinkTable() { + dynamoDbClient.deleteTable(DeleteTableRequest.builder().tableName(SINK_TABLE).build()); + createTable(dynamoDbClient, SINK_TABLE); + 
} + + private void assertHasData() { + ScanResponse scan = dynamoDbClient.scan(ScanRequest.builder().tableName(SINK_TABLE).build()); + Assertions.assertTrue(scan.hasItems(), "sink table is empty."); + } + + private void compareResult() { + Map sourceAttributeValueMap = dynamoDbClient.scan(ScanRequest.builder().tableName(SOURCE_TABLE).build()).items().get(0); + Map sinkAttributeValueMap = dynamoDbClient.scan(ScanRequest.builder().tableName(SINK_TABLE).build()).items().get(0); + sourceAttributeValueMap.keySet().forEach(key -> { + AttributeValue sourceAttributeValue = sourceAttributeValueMap.get(key); + AttributeValue sinkAttributeValue = sinkAttributeValueMap.get(key); + Assertions.assertEquals(sourceAttributeValue, sinkAttributeValue); + }); + + } + + private Map randomRow() { + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id", + "c_map", + "c_array", + "c_string", + "c_boolean", + "c_tinyint", + "c_smallint", + "c_int", + "c_bigint", + "c_float", + "c_double", + "c_decimal", + "c_bytes", + "c_date", + "c_timestamp" + }, + new SeaTunnelDataType[]{ + BasicType.STRING_TYPE, + new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE), + ArrayType.BYTE_ARRAY_TYPE, + BasicType.STRING_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.BYTE_TYPE, + BasicType.SHORT_TYPE, + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(2, 1), + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + } + ); + + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + "1", + Collections.singletonMap("key", Short.parseShort("1")), + new Byte[]{Byte.parseByte("1")}, + "string", + Boolean.FALSE, + Byte.parseByte("1"), + Short.parseShort("1"), + Integer.parseInt("1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(11, 1), + "test".getBytes(), + LocalDate.now(), + LocalDateTime.now() + }); + + Map data = new 
HashMap<>(seatunnelRowType.getTotalFields()); + for (int index = 0; index < seatunnelRowType.getTotalFields(); index++) { + data.put(seatunnelRowType.getFieldName(index), + convertItem(row.getField(index), seatunnelRowType.getFieldType(index), convertType(seatunnelRowType.getFieldType(index)))); + } + return data; + } + + private static void createTable(DynamoDbClient ddb, String tableName) { + DynamoDbWaiter dbWaiter = ddb.waiter(); + CreateTableRequest request = CreateTableRequest.builder() + .attributeDefinitions(AttributeDefinition.builder() + .attributeName(PARTITION_KEY) + .attributeType(ScalarAttributeType.S) + .build()) + .keySchema(KeySchemaElement.builder() + .attributeName(PARTITION_KEY) + .keyType(KeyType.HASH) + .build()) + .provisionedThroughput(ProvisionedThroughput.builder() + .readCapacityUnits(10L) + .writeCapacityUnits(10L) + .build()) + .tableName(tableName) + .build(); + + try { + ddb.createTable(request); + DescribeTableRequest tableRequest = DescribeTableRequest.builder() + .tableName(tableName) + .build(); + + // Wait until the Amazon DynamoDB table is created. 
+ WaiterResponse waiterResponse = dbWaiter.waitUntilTableExists(tableRequest); + waiterResponse.matched().response().ifPresent(describeTableResponse -> { + log.info(describeTableResponse.toString()); + }); + + } catch (DynamoDbException e) { + log.error(e.getMessage()); + } + } + + private AttributeValue convertItem(Object value, SeaTunnelDataType seaTunnelDataType, AttributeValue.Type measurementsType) { + if (value == null) { + return AttributeValue.builder().nul(true).build(); + } + switch (measurementsType) { + case N: + return AttributeValue.builder().n(Integer.toString(((Number) value).intValue())).build(); + case S: + return AttributeValue.builder().s(String.valueOf(value)).build(); + case BOOL: + return AttributeValue.builder().bool((Boolean) value).build(); + case B: + return AttributeValue.builder().b(SdkBytes.fromByteArrayUnsafe((byte[]) value)).build(); + case SS: + return AttributeValue.builder().ss((Collection) value).build(); + case NS: + return AttributeValue.builder().ns(((Collection) value) + .stream().map(Object::toString).collect(Collectors.toList())).build(); + case BS: + return AttributeValue.builder().bs( + ((Collection) value) + .stream().map(number -> + SdkBytes.fromByteArray((byte[]) value)).collect(Collectors.toList()) + ).build(); + case M: + MapType mapType = (MapType) seaTunnelDataType; + Map map = (Map) value; + Map resultMap = new HashMap<>(map.size()); + for (Map.Entry entry : map.entrySet()) { + String mapKeyName = entry.getKey(); + resultMap.put(mapKeyName, convertItem(entry.getValue(), mapType.getValueType(), convertType(mapType.getValueType()))); + } + return AttributeValue.builder().m(resultMap).build(); + case L: + ArrayType arrayType = (ArrayType) seaTunnelDataType; + BasicType elementType = arrayType.getElementType(); + Object[] l = (Object[]) value; + return AttributeValue.builder() + .l(Stream.of(l).map(o -> convertItem(o, elementType, convertType(elementType))) + .collect(Collectors.toList())).build(); + case NUL: + 
return AttributeValue.builder().nul(true).build(); + default: + throw new UnsupportedOperationException("Unsupported dataType: " + measurementsType); + } + } + + private AttributeValue.Type convertType(SeaTunnelDataType seaTunnelDataType) { + switch (seaTunnelDataType.getSqlType()) { + case INT: + case TINYINT: + case SMALLINT: + case BIGINT: + case FLOAT: + case DOUBLE: + case DECIMAL: + return AttributeValue.Type.N; + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + return AttributeValue.Type.S; + case BOOLEAN: + return AttributeValue.Type.BOOL; + case NULL: + return AttributeValue.Type.NUL; + case BYTES: + return AttributeValue.Type.B; + case MAP: + return AttributeValue.Type.M; + case ARRAY: + return AttributeValue.Type.L; + default: + throw new UnsupportedOperationException("Unsupported dataType: " + seaTunnelDataType); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf new file mode 100644 index 00000000000..e3e2e58704a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/resources/amazondynamodbIT_source_to_sink.conf @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Amazondynamodb { + url = "http://dynamodb-host:8000" + region = "us-east-1" + access_key_id = "dummy-key" + secret_access_key = "dummy-secret" + table = "source_table" + schema = { + fields { + id = string + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(2, 1)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Amazondynamodb { + url = "http://dynamodb-host:8000" + region = "us-east-1" + access_key_id = "dummy-key" + secret_access_key = "dummy-secret" + table = "sink_table" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 46bb9df8fff..ced7386dd58 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -29,6 +29,7 @@ connector-redis-e2e connector-clickhouse-e2e connector-influxdb-e2e + connector-amazondynamodb-e2e connector-file-local-e2e @@ -56,4 +57,4 @@ - \ No newline at end of file + diff --git a/tools/update_modules_check/update_modules_check.py b/tools/update_modules_check/update_modules_check.py index 59aaccfee66..0600b82f3db 100644 --- a/tools/update_modules_check/update_modules_check.py +++ b/tools/update_modules_check/update_modules_check.py @@ -40,13 +40,13 @@ def get_modules(files, index, start_pre, root_module): update_files = json.loads(files) modules_name_set = set([]) for file in update_files: - module_name = file.split('/')[index] + names = file.split('/') + module_name = names[index] if module_name.startswith(start_pre): modules_name_set.add(module_name) - sub_module_name = file.split('/')[index + 1] - if sub_module_name.startswith(start_pre): - modules_name_set.add(sub_module_name) + if len(names) > index + 1 and names[index + 1].startswith(start_pre): + modules_name_set.add(names[index + 1]) output_module = "" if len(modules_name_set) > 0: @@ -135,4 +135,4 @@ def main(argv): get_sub_modules(argv[2]) if __name__ == "__main__": - main(sys.argv) \ No newline at end of file + main(sys.argv)