Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve](testcase) Increase case coverage #392

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,13 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,14 @@ public void alterPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}

@VisibleForTesting
public DorisConnectionOptions getConnectionOptions() {
return connectionOptions;
}

@VisibleForTesting
public Map<String, String> getProperties() {
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Set;

import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
import static org.apache.doris.flink.catalog.DorisCatalogOptions.TABLE_PROPERTIES_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
Expand Down Expand Up @@ -112,7 +113,7 @@ public Set<ConfigOption<?>> optionalOptions() {
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
helper.validateExcept(STREAM_LOAD_PROP_PREFIX, TABLE_PROPERTIES_PREFIX);

DorisConnectionOptions connectionOptions =
new DorisConnectionOptions.DorisConnectionOptionsBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,6 @@ public boolean tableExists(String database, String table) {
return databaseExists(database) && listTables(database).contains(table);
}

public boolean columnExists(String database, String table, String columnName) {
if (tableExists(database, table)) {
List<String> columns =
extractColumnValuesBySQL(
"SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
1,
null,
database,
table,
columnName);
if (columns != null && !columns.isEmpty()) {
return true;
}
}
return false;
}

public List<String> listTables(String databaseName) {
if (!databaseExists(databaseName)) {
throw new DorisRuntimeException("database" + databaseName + " is not exists");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.Objects;

/** Doris connection options. */
public class DorisConnectionOptions implements Serializable {
Expand Down Expand Up @@ -86,6 +87,28 @@ public boolean isAutoRedirect() {
return autoRedirect;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisConnectionOptions that = (DorisConnectionOptions) o;
return autoRedirect == that.autoRedirect
&& Objects.equals(fenodes, that.fenodes)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(jdbcUrl, that.jdbcUrl)
&& Objects.equals(benodes, that.benodes);
}

@Override
public int hashCode() {
return Objects.hash(fenodes, username, password, jdbcUrl, benodes, autoRedirect);
}

/** Builder for {@link DorisConnectionOptions}. */
public static class DorisConnectionOptionsBuilder {
private String fenodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.flink.sink.writer.WriteMode;

import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;

import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
Expand Down Expand Up @@ -213,6 +214,58 @@ public boolean ignoreCommitError() {
return ignoreCommitError;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisExecutionOptions that = (DorisExecutionOptions) o;
return checkInterval == that.checkInterval
&& maxRetries == that.maxRetries
&& bufferSize == that.bufferSize
&& bufferCount == that.bufferCount
&& useCache == that.useCache
&& force2PC == that.force2PC
&& flushQueueSize == that.flushQueueSize
&& bufferFlushMaxRows == that.bufferFlushMaxRows
&& bufferFlushMaxBytes == that.bufferFlushMaxBytes
&& bufferFlushIntervalMs == that.bufferFlushIntervalMs
&& enableBatchMode == that.enableBatchMode
&& ignoreUpdateBefore == that.ignoreUpdateBefore
&& ignoreCommitError == that.ignoreCommitError
&& Objects.equals(labelPrefix, that.labelPrefix)
&& Objects.equals(streamLoadProp, that.streamLoadProp)
&& Objects.equals(enableDelete, that.enableDelete)
&& Objects.equals(enable2PC, that.enable2PC)
&& writeMode == that.writeMode;
}

@Override
public int hashCode() {
return Objects.hash(
checkInterval,
maxRetries,
bufferSize,
bufferCount,
labelPrefix,
useCache,
streamLoadProp,
enableDelete,
enable2PC,
force2PC,
flushQueueSize,
bufferFlushMaxRows,
bufferFlushMaxBytes,
bufferFlushIntervalMs,
enableBatchMode,
ignoreUpdateBefore,
writeMode,
ignoreCommitError);
}

/** Builder of {@link DorisExecutionOptions}. */
public static class Builder {
private int checkInterval = DEFAULT_CHECK_INTERVAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.cfg;

import java.io.Serializable;
import java.util.Objects;

public class DorisLookupOptions implements Serializable {

Expand Down Expand Up @@ -75,6 +76,36 @@ public boolean isAsync() {
return async;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisLookupOptions that = (DorisLookupOptions) o;
return cacheMaxSize == that.cacheMaxSize
&& cacheExpireMs == that.cacheExpireMs
&& maxRetryTimes == that.maxRetryTimes
&& jdbcReadBatchSize == that.jdbcReadBatchSize
&& jdbcReadBatchQueueSize == that.jdbcReadBatchQueueSize
&& jdbcReadThreadSize == that.jdbcReadThreadSize
&& async == that.async;
}

@Override
public int hashCode() {
return Objects.hash(
cacheMaxSize,
cacheExpireMs,
maxRetryTimes,
jdbcReadBatchSize,
jdbcReadBatchQueueSize,
jdbcReadThreadSize,
async);
}

public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.doris.flink.cfg;

import org.apache.doris.flink.util.IOUtils;

import java.util.Properties;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -65,15 +63,34 @@ public void setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
}

public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
}

public static Builder builder() {
return new Builder();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisOptions that = (DorisOptions) o;
return Objects.equals(tableIdentifier, that.tableIdentifier)
&& autoRedirect == that.autoRedirect
&& Objects.equals(fenodes, that.fenodes)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
&& Objects.equals(jdbcUrl, that.jdbcUrl)
&& Objects.equals(benodes, that.benodes);
}

@Override
public int hashCode() {
return Objects.hash(
fenodes, username, password, jdbcUrl, benodes, autoRedirect, tableIdentifier);
}

/** Builder of {@link DorisOptions}. */
public static class Builder {
private String fenodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.cfg;

import java.io.Serializable;
import java.util.Objects;

/** Doris read Options. */
public class DorisReadOptions implements Serializable {
Expand Down Expand Up @@ -128,6 +129,46 @@ public static DorisReadOptions defaults() {
return DorisReadOptions.builder().build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DorisReadOptions that = (DorisReadOptions) o;
return useOldApi == that.useOldApi
&& Objects.equals(readFields, that.readFields)
&& Objects.equals(filterQuery, that.filterQuery)
&& Objects.equals(requestTabletSize, that.requestTabletSize)
&& Objects.equals(requestConnectTimeoutMs, that.requestConnectTimeoutMs)
&& Objects.equals(requestReadTimeoutMs, that.requestReadTimeoutMs)
&& Objects.equals(requestQueryTimeoutS, that.requestQueryTimeoutS)
&& Objects.equals(requestRetries, that.requestRetries)
&& Objects.equals(requestBatchSize, that.requestBatchSize)
&& Objects.equals(execMemLimit, that.execMemLimit)
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync);
}

@Override
public int hashCode() {
return Objects.hash(
readFields,
filterQuery,
requestTabletSize,
requestConnectTimeoutMs,
requestReadTimeoutMs,
requestQueryTimeoutS,
requestRetries,
requestBatchSize,
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
}

/** Builder of {@link DorisReadOptions}. */
public static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,4 @@ public CreateTableException() {
public CreateTableException(String message) {
super(message);
}

public CreateTableException(String message, Throwable cause) {
super(message, cause);
}

public CreateTableException(Throwable cause) {
super(cause);
}

protected CreateTableException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,4 @@ public DorisBatchLoadException(String message, Throwable cause) {
public DorisBatchLoadException(Throwable cause) {
super(cause);
}

protected DorisBatchLoadException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,4 @@ public DorisSystemException(String message) {
public DorisSystemException(String message, Throwable cause) {
super(message, cause);
}

public DorisSystemException(Throwable cause) {
super(cause);
}

protected DorisSystemException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,4 @@ public StreamLoadException() {
public StreamLoadException(String message) {
super(message);
}

public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}

public StreamLoadException(Throwable cause) {
super(cause);
}

protected StreamLoadException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Loading
Loading