Skip to content

Commit

Permalink
[improve](testcase) Increase case coverage (apache#392)
Browse files Browse the repository at this point in the history
(cherry picked from commit f5340ef)
  • Loading branch information
JNSimba authored and PeatBoy committed Jan 21, 2025
1 parent cf6665e commit 3626557
Show file tree
Hide file tree
Showing 57 changed files with 2,078 additions and 563 deletions.
7 changes: 7 additions & 0 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,14 @@ public void alterPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}

@VisibleForTesting
public DorisConnectionOptions getConnectionOptions() {
return connectionOptions;
}

@VisibleForTesting
public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;

import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
import static org.apache.doris.flink.catalog.DorisCatalogOptions.TABLE_PROPERTIES_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
Expand Down Expand Up @@ -112,7 +113,7 @@ public Set<ConfigOption<?>> optionalOptions() {
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
helper.validateExcept(STREAM_LOAD_PROP_PREFIX, TABLE_PROPERTIES_PREFIX);

DorisConnectionOptions connectionOptions =
new DorisConnectionOptions.DorisConnectionOptionsBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,6 @@ public boolean tableExists(String database, String table) {
return databaseExists(database) && listTables(database).contains(table);
}

public boolean columnExists(String database, String table, String columnName) {
if (tableExists(database, table)) {
List<String> columns =
extractColumnValuesBySQL(
"SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
1,
null,
database,
table,
columnName);
if (columns != null && !columns.isEmpty()) {
return true;
}
}
return false;
}

public List<String> listTables(String databaseName) {
if (!databaseExists(databaseName)) {
throw new DorisRuntimeException("database" + databaseName + " is not exists");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.Objects;

/** Doris connection options. */
public class DorisConnectionOptions implements Serializable {
Expand Down Expand Up @@ -86,6 +87,28 @@ public boolean isAutoRedirect() {
return autoRedirect;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisConnectionOptions that = (DorisConnectionOptions) o;
return autoRedirect == that.autoRedirect
&& Objects.equals(fenodes, that.fenodes)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(jdbcUrl, that.jdbcUrl)
&& Objects.equals(benodes, that.benodes);
}

@Override
public int hashCode() {
return Objects.hash(fenodes, username, password, jdbcUrl, benodes, autoRedirect);
}

/** Builder for {@link DorisConnectionOptions}. */
public static class DorisConnectionOptionsBuilder {
private String fenodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.flink.sink.writer.WriteMode;

import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;

import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
Expand Down Expand Up @@ -213,6 +214,58 @@ public boolean ignoreCommitError() {
return ignoreCommitError;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisExecutionOptions that = (DorisExecutionOptions) o;
return checkInterval == that.checkInterval
&& maxRetries == that.maxRetries
&& bufferSize == that.bufferSize
&& bufferCount == that.bufferCount
&& useCache == that.useCache
&& force2PC == that.force2PC
&& flushQueueSize == that.flushQueueSize
&& bufferFlushMaxRows == that.bufferFlushMaxRows
&& bufferFlushMaxBytes == that.bufferFlushMaxBytes
&& bufferFlushIntervalMs == that.bufferFlushIntervalMs
&& enableBatchMode == that.enableBatchMode
&& ignoreUpdateBefore == that.ignoreUpdateBefore
&& ignoreCommitError == that.ignoreCommitError
&& Objects.equals(labelPrefix, that.labelPrefix)
&& Objects.equals(streamLoadProp, that.streamLoadProp)
&& Objects.equals(enableDelete, that.enableDelete)
&& Objects.equals(enable2PC, that.enable2PC)
&& writeMode == that.writeMode;
}

@Override
public int hashCode() {
return Objects.hash(
checkInterval,
maxRetries,
bufferSize,
bufferCount,
labelPrefix,
useCache,
streamLoadProp,
enableDelete,
enable2PC,
force2PC,
flushQueueSize,
bufferFlushMaxRows,
bufferFlushMaxBytes,
bufferFlushIntervalMs,
enableBatchMode,
ignoreUpdateBefore,
writeMode,
ignoreCommitError);
}

/** Builder of {@link DorisExecutionOptions}. */
public static class Builder {
private int checkInterval = DEFAULT_CHECK_INTERVAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.cfg;

import java.io.Serializable;
import java.util.Objects;

public class DorisLookupOptions implements Serializable {

Expand Down Expand Up @@ -75,6 +76,36 @@ public boolean isAsync() {
return async;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisLookupOptions that = (DorisLookupOptions) o;
return cacheMaxSize == that.cacheMaxSize
&& cacheExpireMs == that.cacheExpireMs
&& maxRetryTimes == that.maxRetryTimes
&& jdbcReadBatchSize == that.jdbcReadBatchSize
&& jdbcReadBatchQueueSize == that.jdbcReadBatchQueueSize
&& jdbcReadThreadSize == that.jdbcReadThreadSize
&& async == that.async;
}

@Override
public int hashCode() {
return Objects.hash(
cacheMaxSize,
cacheExpireMs,
maxRetryTimes,
jdbcReadBatchSize,
jdbcReadBatchQueueSize,
jdbcReadThreadSize,
async);
}

public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.doris.flink.cfg;

import org.apache.doris.flink.util.IOUtils;

import java.util.Properties;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -65,15 +63,34 @@ public void setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
}

public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
}

public static Builder builder() {
return new Builder();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisOptions that = (DorisOptions) o;
return Objects.equals(tableIdentifier, that.tableIdentifier)
&& autoRedirect == that.autoRedirect
&& Objects.equals(fenodes, that.fenodes)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(jdbcUrl, that.jdbcUrl)
&& Objects.equals(benodes, that.benodes);
}

@Override
public int hashCode() {
return Objects.hash(
fenodes, username, password, jdbcUrl, benodes, autoRedirect, tableIdentifier);
}

/** Builder of {@link DorisOptions}. */
public static class Builder {
private String fenodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.cfg;

import java.io.Serializable;
import java.util.Objects;

/** Doris read Options. */
public class DorisReadOptions implements Serializable {
Expand Down Expand Up @@ -128,6 +129,46 @@ public static DorisReadOptions defaults() {
return DorisReadOptions.builder().build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisReadOptions that = (DorisReadOptions) o;
return useOldApi == that.useOldApi
&& Objects.equals(readFields, that.readFields)
&& Objects.equals(filterQuery, that.filterQuery)
&& Objects.equals(requestTabletSize, that.requestTabletSize)
&& Objects.equals(requestConnectTimeoutMs, that.requestConnectTimeoutMs)
&& Objects.equals(requestReadTimeoutMs, that.requestReadTimeoutMs)
&& Objects.equals(requestQueryTimeoutS, that.requestQueryTimeoutS)
&& Objects.equals(requestRetries, that.requestRetries)
&& Objects.equals(requestBatchSize, that.requestBatchSize)
&& Objects.equals(execMemLimit, that.execMemLimit)
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync);
}

@Override
public int hashCode() {
return Objects.hash(
readFields,
filterQuery,
requestTabletSize,
requestConnectTimeoutMs,
requestReadTimeoutMs,
requestQueryTimeoutS,
requestRetries,
requestBatchSize,
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
}

/** Builder of {@link DorisReadOptions}. */
public static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,4 @@ public CreateTableException() {
public CreateTableException(String message) {
super(message);
}

public CreateTableException(String message, Throwable cause) {
super(message, cause);
}

public CreateTableException(Throwable cause) {
super(cause);
}

protected CreateTableException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,4 @@ public DorisBatchLoadException(String message, Throwable cause) {
public DorisBatchLoadException(Throwable cause) {
super(cause);
}

protected DorisBatchLoadException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,4 @@ public DorisSystemException(String message) {
public DorisSystemException(String message, Throwable cause) {
super(message, cause);
}

public DorisSystemException(Throwable cause) {
super(cause);
}

protected DorisSystemException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,4 @@ public StreamLoadException() {
public StreamLoadException(String message) {
super(message);
}

public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}

public StreamLoadException(Throwable cause) {
super(cause);
}

protected StreamLoadException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Loading

0 comments on commit 3626557

Please sign in to comment.