diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java index a8a44dd4..b88f8a92 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.java @@ -14,12 +14,11 @@ package com.starrocks.connector.flink.table.source; +import com.google.common.base.Strings; +import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets; import com.starrocks.connector.flink.table.source.struct.QueryInfo; import com.starrocks.connector.flink.table.source.struct.SelectColumn; -import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; - -import com.google.common.base.Strings; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -28,13 +27,18 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction implements ResultTypeQueryable { + private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicSourceFunction.class); + private final StarRocksSourceOptions sourceOptions; private QueryInfo queryInfo; private Long dataCount; @@ -45,6 +49,7 @@ public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction sourceContext) { @Override public void cancel() { - this.dataReaderList.parallelStream().forEach(dataReader -> { - if (dataReader != null) { - dataReader.close(); - } - }); + internalClose(); + } + + @Override + public void close() { + internalClose(); + } + + private void internalClose() { + if (dataReaderClosed.compareAndSet(false, true)) { + LOG.info("Close readers"); + this.dataReaderList.parallelStream().forEach(dataReader -> { + if (dataReader != null) { + dataReader.close(); + } + }); + } } @Override diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java index 236fbcdd..34adb80b 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java @@ -15,10 +15,10 @@ package com.starrocks.connector.flink.table.source; import com.starrocks.connector.flink.row.source.StarRocksSourceFlinkRows; +import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; import com.starrocks.connector.flink.table.source.struct.Const; import com.starrocks.connector.flink.table.source.struct.SelectColumn; import com.starrocks.connector.flink.table.source.struct.StarRocksSchema; -import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo; import com.starrocks.shade.org.apache.thrift.TException; import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol; import com.starrocks.shade.org.apache.thrift.protocol.TProtocol; @@ -31,7 +31,6 @@ import com.starrocks.thrift.TScanOpenResult; import com.starrocks.thrift.TStarrocksExternalService; import com.starrocks.thrift.TStatusCode; - import org.apache.flink.table.data.GenericRowData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,9 +75,9 @@ public StarRocksSourceBeReader(String beNodeInfo, throw new RuntimeException("Not find be node info from the be port mappping list"); } beNodeInfo = mappingMap.get(beNodeInfo); - LOG.info("query data from be by using be-hostname"); + LOG.info("query data from be by using be-hostname {}", beNodeInfo); } else { - LOG.info("query data from be by using be-ip"); + LOG.info("query data from be by using be-ip {}", beNodeInfo); } String[] beNode = beNodeInfo.split(":"); String ip = beNode[0].trim(); @@ -113,7 +112,8 @@ public void openScanner(List tablets, String opaqued_query_plan, StarRocks params.setProperties(sourceOptions.getProperties()); } // params.setLimit(sourceOptions.getLimit()); - params.setKeep_alive_min((short) sourceOptions.getKeepAliveMin()); + short keepAliveMin = (short) Math.min(Short.MAX_VALUE, sourceOptions.getKeepAliveMin()); + params.setKeep_alive_min(keepAliveMin); params.setQuery_timeout(sourceOptions.getQueryTimeout()); params.setMem_limit(sourceOptions.getMemLimit()); LOG.info("open Scan params.mem_limit {} B", params.getMem_limit()); @@ -133,6 +133,8 @@ public void openScanner(List tablets, String opaqued_query_plan, StarRocks } this.srSchema = StarRocksSchema.genSchema(result.getSelected_columns()); this.contextId = result.getContext_id(); + LOG.info("Open scanner for {}:{} with context id {}, and there are {} tablets {}", + IP, PORT, contextId, tablets.size(), tablets); } public void startToRead() { @@ -189,11 +191,13 @@ private void handleResult(TScanBatchResult result) { @Override public void close() { + LOG.info("Close reader for {}:{} with context id {}", IP, PORT, contextId); TScanCloseParams tScanCloseParams = new TScanCloseParams(); tScanCloseParams.setContext_id(this.contextId); try { this.client.close_scanner(tScanCloseParams); } catch (TException e) { + LOG.error("Failed to close reader {}:{} with context id {}", IP, PORT, contextId, e); throw new RuntimeException(e.getMessage()); } }