From 9a4464bf63ca1af5ca82d95e0ab6d2e136de49f6 Mon Sep 17 00:00:00 2001 From: gnehil Date: Tue, 18 Jul 2023 14:33:37 +0800 Subject: [PATCH] [improvement] Escaping special characters (#118) --- .../doris/spark/load/DorisStreamLoad.java | 6 ++- .../doris/spark/util/EscapeHandler.java | 40 +++++++++++++++++++ .../doris/spark/util/EscapeHandlerTest.java | 36 +++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java create mode 100644 spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 61379e36..07e6624b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package org.apache.doris.spark.load; import org.apache.doris.spark.cfg.ConfigurationOptions; @@ -22,6 +23,7 @@ import org.apache.doris.spark.rest.RestService; import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.RespContent; +import org.apache.doris.spark.util.EscapeHandler; import org.apache.doris.spark.util.ListUtils; import com.fasterxml.jackson.core.JsonProcessingException; @@ -102,9 +104,9 @@ public DorisStreamLoad(SparkSettings settings) { .build(new BackendCacheLoader(settings)); fileType = streamLoadProp.getOrDefault("format", "csv"); if ("csv".equals(fileType)){ - FIELD_DELIMITER = streamLoadProp.getOrDefault("column_separator", "\t"); - LINE_DELIMITER = streamLoadProp.getOrDefault("line_delimiter", "\n"); + FIELD_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", "\t")); } + LINE_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n")); } public String getLoadUrlStr() { diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java new file mode 100644 index 00000000..87a39896 --- /dev/null +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class EscapeHandler { + public static final String ESCAPE_DELIMITERS_FLAGS = "\\x"; + public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})"); + + public static String escapeString(String source) { + if (source.contains(ESCAPE_DELIMITERS_FLAGS)) { + Matcher m = ESCAPE_PATTERN.matcher(source); + StringBuffer buf = new StringBuffer(); + while (m.find()) { + m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16))); + } + m.appendTail(buf); + return buf.toString(); + } + return source; + } + +} \ No newline at end of file diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java new file mode 100644 index 00000000..d8fb2707 --- /dev/null +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.util; + +import junit.framework.TestCase; +import org.junit.Assert; + +import java.util.Properties; + +public class EscapeHandlerTest extends TestCase { + + public void testEscapeString() { + + + String s1 = "\\x09\\x09"; + String s2 = "\\x0A\\x0A"; + Assert.assertEquals("\t\t", EscapeHandler.escapeString(s1)); + Assert.assertEquals("\n\n", EscapeHandler.escapeString(s2)); + + } +} \ No newline at end of file