From a5ccb92fc5e2767c55312716a3a1284877252ff5 Mon Sep 17 00:00:00 2001 From: North Lin <240383347@qq.com> Date: Thu, 13 Jun 2024 18:04:02 +0800 Subject: [PATCH 1/2] [fix] Fix garbled table or column comments that contain Chinese characters (#401) --- .../apache/doris/flink/cfg/DorisOptions.java | 41 +++++++++++-- .../sink/schema/SchemaChangeManager.java | 13 ++++- .../sink/schema/SchemaManagerITCase.java | 57 +++++++++++++++++++ .../flink/sink/schema/SchemaManagerTest.java | 6 ++ 4 files changed, 110 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index bf6c7a28c..4a8787ac0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -27,6 +27,7 @@ public class DorisOptions extends DorisConnectionOptions { private static final long serialVersionUID = 1L; private String tableIdentifier; + private String charsetEncoding = "UTF-8"; public DorisOptions(String fenodes, String username, String password, String tableIdentifier) { super(fenodes, username, password); @@ -50,9 +51,11 @@ public DorisOptions( String password, String tableIdentifier, String jdbcUrl, - boolean redirect) { + boolean redirect, + String charsetEncoding) { super(fenodes, beNodes, username, password, jdbcUrl, redirect); this.tableIdentifier = tableIdentifier; + this.charsetEncoding = charsetEncoding; } public String getTableIdentifier() { @@ -63,6 +66,14 @@ public void setTableIdentifier(String tableIdentifier) { this.tableIdentifier = tableIdentifier; } + public String getCharsetEncoding() { + return charsetEncoding; + } + + public void setCharsetEncoding(String charsetEncoding) { + this.charsetEncoding = charsetEncoding; + } + public static Builder builder() { return new Builder(); } @@ -82,13 +93,21 @@ public boolean 
equals(Object o) { && Objects.equals(username, that.username) && Objects.equals(password, that.password) && Objects.equals(jdbcUrl, that.jdbcUrl) - && Objects.equals(benodes, that.benodes); + && Objects.equals(benodes, that.benodes) + && Objects.equals(charsetEncoding, that.charsetEncoding); } @Override public int hashCode() { return Objects.hash( - fenodes, username, password, jdbcUrl, benodes, autoRedirect, tableIdentifier); + fenodes, + username, + password, + jdbcUrl, + benodes, + autoRedirect, + tableIdentifier, + charsetEncoding); } /** Builder of {@link DorisOptions}. */ @@ -100,6 +119,7 @@ public static class Builder { private String password; private boolean autoRedirect = true; private String tableIdentifier; + private String charsetEncoding = "UTF-8"; /** required, tableIdentifier. */ public Builder setTableIdentifier(String tableIdentifier) { @@ -142,12 +162,25 @@ public Builder setAutoRedirect(boolean autoRedirect) { return this; } + /** optional, Charset encoding for Http client, default UTF-8. 
*/ + public Builder setCharsetEncoding(String charsetEncoding) { + this.charsetEncoding = charsetEncoding; + return this; + } + public DorisOptions build() { checkNotNull(fenodes, "No fenodes supplied."); // multi table load, don't need check // checkNotNull(tableIdentifier, "No tableIdentifier supplied."); return new DorisOptions( - fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect); + fenodes, + benodes, + username, + password, + tableIdentifier, + jdbcUrl, + autoRedirect, + charsetEncoding); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 2aca3c7bd..7ab7ca772 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -133,7 +133,10 @@ public boolean checkSchemaChange(String database, String table, Map responseMap = handleResponse(httpGet, responseEntity); return handleSchemaChange(responseMap, responseEntity); @@ -173,8 +176,12 @@ public HttpPost buildHttpPost(String ddl, String database) database); HttpPost httpPost = new HttpPost(requestUrl); httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); - httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); + httpPost.setHeader( + HttpHeaders.CONTENT_TYPE, + String.format("application/json;charset=%s", dorisOptions.getCharsetEncoding())); + httpPost.setEntity( + new StringEntity( + objectMapper.writeValueAsString(param), dorisOptions.getCharsetEncoding())); return httpPost; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index 053cf65c4..d24d0d85e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -28,8 +28,11 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; public class SchemaManagerITCase extends DorisTestBase { @@ -82,6 +85,60 @@ public void testAddColumn() throws SQLException, IOException, IllegalArgumentExc Assert.assertTrue(exists); } + @Test + public void testAddColumnWithChineseComment() + throws SQLException, IOException, IllegalArgumentException { + String addColumnTbls = "add_column"; + initDorisSchemaChangeTable(addColumnTbls); + + // add a column by UTF-8 encoding + String addColumnName = "col_with_comment1"; + String chineseComment = "中文注释1"; + addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComment, true); + + // change charset encoding to US-ASCII would cause garbled of Chinese. 
+ options.setCharsetEncoding("US-ASCII"); + addColumnName = "col_with_comment2"; + chineseComment = "中文注释2"; + addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComment, false); + } + + private void addColumnWithChineseCommentAndAssert( + String tableName, String addColumnName, String chineseComment, boolean assertFlag) + throws SQLException, IOException, IllegalArgumentException { + FieldSchema field = new FieldSchema(addColumnName, "string", chineseComment); + schemaChangeManager.addColumn(DATABASE, tableName, field); + boolean exists = schemaChangeManager.addColumn(DATABASE, tableName, field); + Assert.assertTrue(exists); + + exists = schemaChangeManager.checkColumnExists(DATABASE, tableName, addColumnName); + Assert.assertTrue(exists); + + // check Chinese comment + Map columnComments = getColumnComments(tableName); + if (assertFlag) { + Assert.assertEquals(columnComments.get(addColumnName), chineseComment); + } else { + Assert.assertNotEquals(columnComments.get(addColumnName), chineseComment); + } + } + + private Map getColumnComments(String table) throws SQLException { + Map columnCommentsMap = new HashMap<>(); + try (Connection connection = + DriverManager.getConnection( + String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD)) { + ResultSet columns = connection.getMetaData().getColumns(null, DATABASE, table, null); + + while (columns.next()) { + String columnName = columns.getString("COLUMN_NAME"); + String comment = columns.getString("REMARKS"); + columnCommentsMap.put(columnName, comment); + } + } + return columnCommentsMap; + } + @Test public void testDropColumn() throws SQLException, IOException, IllegalArgumentException { String dropColumnTbls = "drop_column"; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java index 529cc860b..16c901e52 100644 --- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -132,6 +132,12 @@ public void testAddColumn() { Assert.assertEquals( "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT 'comment \"\\'sdf\\''", addColumnDDL); + + field = new FieldSchema("col", "int", "current_timestamp", "中文注释"); + addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field); + Assert.assertEquals( + "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int DEFAULT current_timestamp COMMENT '中文注释'", + addColumnDDL); } @Test From e9585a6dcb40e1a03ae26baf4b685b52eb320f72 Mon Sep 17 00:00:00 2001 From: North Lin <240383347@qq.com> Date: Mon, 17 Jun 2024 19:18:47 +0800 Subject: [PATCH 2/2] [fix] Fix garbled table or column comments that contain Chinese characters (#401) Move the charsetEncoding setting out of DorisOptions and into a constructor parameter of SchemaChangeManager. 
--- .../apache/doris/flink/cfg/DorisOptions.java | 41 ++----------------- .../sink/schema/SchemaChangeManager.java | 15 ++++--- .../sink/schema/SchemaManagerITCase.java | 2 +- 3 files changed, 14 insertions(+), 44 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index 4a8787ac0..bf6c7a28c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -27,7 +27,6 @@ public class DorisOptions extends DorisConnectionOptions { private static final long serialVersionUID = 1L; private String tableIdentifier; - private String charsetEncoding = "UTF-8"; public DorisOptions(String fenodes, String username, String password, String tableIdentifier) { super(fenodes, username, password); @@ -51,11 +50,9 @@ public DorisOptions( String password, String tableIdentifier, String jdbcUrl, - boolean redirect, - String charsetEncoding) { + boolean redirect) { super(fenodes, beNodes, username, password, jdbcUrl, redirect); this.tableIdentifier = tableIdentifier; - this.charsetEncoding = charsetEncoding; } public String getTableIdentifier() { @@ -66,14 +63,6 @@ public void setTableIdentifier(String tableIdentifier) { this.tableIdentifier = tableIdentifier; } - public String getCharsetEncoding() { - return charsetEncoding; - } - - public void setCharsetEncoding(String charsetEncoding) { - this.charsetEncoding = charsetEncoding; - } - public static Builder builder() { return new Builder(); } @@ -93,21 +82,13 @@ public boolean equals(Object o) { && Objects.equals(username, that.username) && Objects.equals(password, that.password) && Objects.equals(jdbcUrl, that.jdbcUrl) - && Objects.equals(benodes, that.benodes) - && Objects.equals(charsetEncoding, that.charsetEncoding); + && Objects.equals(benodes, that.benodes); } @Override public 
int hashCode() { return Objects.hash( - fenodes, - username, - password, - jdbcUrl, - benodes, - autoRedirect, - tableIdentifier, - charsetEncoding); + fenodes, username, password, jdbcUrl, benodes, autoRedirect, tableIdentifier); } /** Builder of {@link DorisOptions}. */ @@ -119,7 +100,6 @@ public static class Builder { private String password; private boolean autoRedirect = true; private String tableIdentifier; - private String charsetEncoding = "UTF-8"; /** required, tableIdentifier. */ public Builder setTableIdentifier(String tableIdentifier) { @@ -162,25 +142,12 @@ public Builder setAutoRedirect(boolean autoRedirect) { return this; } - /** optional, Charset encoding for Http client, default UTF-8. */ - public Builder setCharsetEncoding(String charsetEncoding) { - this.charsetEncoding = charsetEncoding; - return this; - } - public DorisOptions build() { checkNotNull(fenodes, "No fenodes supplied."); // multi table load, don't need check // checkNotNull(tableIdentifier, "No tableIdentifier supplied."); return new DorisOptions( - fenodes, - benodes, - username, - password, - tableIdentifier, - jdbcUrl, - autoRedirect, - charsetEncoding); + fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 7ab7ca772..d2bacf263 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -56,11 +56,17 @@ public class SchemaChangeManager implements Serializable { private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private ObjectMapper objectMapper = new ObjectMapper(); private DorisOptions dorisOptions; + private String charsetEncoding = "UTF-8"; 
public SchemaChangeManager(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; } + public SchemaChangeManager(DorisOptions dorisOptions, String charsetEncoding) { + this.dorisOptions = dorisOptions; + this.charsetEncoding = charsetEncoding; + } + public boolean createTable(TableSchema table) throws IOException, IllegalArgumentException { String createTableDDL = DorisSystem.buildCreateTableDDL(table); return execute(createTableDDL, table.getDatabase()); @@ -134,9 +140,7 @@ public boolean checkSchemaChange(String database, String table, Map responseMap = handleResponse(httpGet, responseEntity); return handleSchemaChange(responseMap, responseEntity); @@ -178,10 +182,9 @@ public HttpPost buildHttpPost(String ddl, String database) httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpPost.setHeader( HttpHeaders.CONTENT_TYPE, - String.format("application/json;charset=%s", dorisOptions.getCharsetEncoding())); + String.format("application/json;charset=%s", charsetEncoding)); httpPost.setEntity( - new StringEntity( - objectMapper.writeValueAsString(param), dorisOptions.getCharsetEncoding())); + new StringEntity(objectMapper.writeValueAsString(param), charsetEncoding)); return httpPost; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index d24d0d85e..8d2a9b0d3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -97,7 +97,7 @@ public void testAddColumnWithChineseComment() addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComment, true); // change charset encoding to US-ASCII would cause garbled of Chinese. 
- options.setCharsetEncoding("US-ASCII"); + schemaChangeManager = new SchemaChangeManager(options, "US-ASCII"); addColumnName = "col_with_comment2"; chineseComment = "中文注释2"; addColumnWithChineseCommentAndAssert(addColumnTbls, addColumnName, chineseComment, false);