From 9d1559863acf3dce2c970bf8d96bce8823c41739 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 4 Mar 2025 17:15:26 +0800 Subject: [PATCH 1/2] [Fix][Connector-V2] Fix maxcompute read with partition spec --- .../maxcompute/catalog/MaxComputeCatalog.java | 27 ------------------- .../maxcompute/source/MaxcomputeSource.java | 13 +++++---- 2 files changed, 8 insertions(+), 32 deletions(-) diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java index 67eec6d5160..d0e05031889 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java @@ -158,14 +158,12 @@ public CatalogTable getTable(TablePath tablePath, List fieldNames) } Table odpsTable; com.aliyun.odps.TableSchema odpsSchema; - boolean isPartitioned; try { Odps odps = getOdps(tablePath.getDatabaseName()); odpsTable = MaxcomputeUtil.parseTable( odps, tablePath.getDatabaseName(), tablePath.getTableName()); odpsSchema = odpsTable.getSchema(); - isPartitioned = odpsTable.isPartitioned(); } catch (Exception ex) { throw new CatalogException(catalogName, ex); } @@ -193,31 +191,6 @@ public CatalogTable getTable(TablePath tablePath, List fieldNames) .build(); return MaxComputeTypeConverter.INSTANCE.convert(typeDefine); }); - if (isPartitioned) { - buildColumnsWithErrorCheck( - tablePath, - builder, - odpsSchema.getPartitionColumns().stream() - .filter( - column -> - fieldNames == null - || fieldNames.isEmpty() - || fieldNames.contains(column.getName())) - .iterator(), - (column) -> { - BasicTypeDefine typeDefine = - BasicTypeDefine.builder() - .name(column.getName()) - .nativeType(column.getTypeInfo()) - .columnType(column.getTypeInfo().getTypeName()) - .dataType(column.getTypeInfo().getTypeName()) - .nullable(column.isNullable()) - .comment(column.getComment()) - .build(); - partitionKeys.add(column.getName()); - return MaxComputeTypeConverter.INSTANCE.convert(typeDefine); - }); - } TableSchema tableSchema = builder.build(); TableIdentifier tableIdentifier = getTableIdentifier(tablePath); return CatalogTable.of( diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java index 824585105ad..cb0349dfb8a 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java @@ -95,9 +95,12 @@ private Map getSourceTableInfos(ReadonlyConfig reado .orElse(readonlyConfig.get(PROJECT)); TablePath tablePath = TablePath.of(project, subReadonlyConfig.get(TABLE_NAME)); - if (subReadonlyConfig - .getOptional(ConnectorCommonOptions.SCHEMA) - .isPresent()) { + String partitionSpec = + subReadonlyConfig + .getOptional(PARTITION_SPEC) + .orElse(readonlyConfig.get(PARTITION_SPEC)); + + if (subReadonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) { CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(subReadonlyConfig); catalogTable = @@ -108,7 +111,7 @@ private Map getSourceTableInfos(ReadonlyConfig reado catalogTable.getTablePath(), new SourceTableInfo( catalogTable, - subReadonlyConfig.get(PARTITION_SPEC), + partitionSpec, subReadonlyConfig.get(SPLIT_ROW))); } else { Integer splitRow = @@ -120,7 +123,7 @@ private Map getSourceTableInfos(ReadonlyConfig reado new SourceTableInfo( catalog.getTable( tablePath, subReadonlyConfig.get(READ_COLUMNS)), - subReadonlyConfig.get(PARTITION_SPEC), + partitionSpec, splitRow)); } } From c1b2511613ae654701de1130658450e68365025d Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 4 Mar 2025 17:22:12 +0800 Subject: [PATCH 2/2] update --- .../seatunnel/maxcompute/source/MaxcomputeSource.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java index cb0349dfb8a..8b6a8d9676a 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java @@ -100,7 +100,9 @@ private Map getSourceTableInfos(ReadonlyConfig reado .getOptional(PARTITION_SPEC) .orElse(readonlyConfig.get(PARTITION_SPEC)); - if (subReadonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) { + if (subReadonlyConfig + .getOptional(ConnectorCommonOptions.SCHEMA) + .isPresent()) { CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(subReadonlyConfig); catalogTable =