Skip to content

Commit

Permalink
[Enhancement] Add delimiter parser in sdk for common use (#285)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
  • Loading branch information
banmoy authored Sep 13, 2023
1 parent 21a970d commit 39ad45a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ public void testEnableExactlyOnceLabelGen() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("sink.semantic", "exactly-once");
options.put("sink.label-prefix", "test_label");
options.put("sink.enable.exactly-once.label-gen", "true");
options.put("sink.exactly-once.enable-label-gen", "true");
String checkpointDir = temporaryFolder.newFolder().toURI().toString();
testConfigurationBase(options,
env -> {
Expand Down Expand Up @@ -464,7 +464,7 @@ public void testAbortLingeringTransactionsWithCheckNum() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("sink.semantic", "exactly-once");
options.put("sink.label-prefix", "test_label");
options.put("sink.abort.check-num-txns", "10");
options.put("sink.exactly-once.check-num-lingering-txn", "10");
String checkpointDir = temporaryFolder.newFolder().toURI().toString();
testConfigurationBase(options,
env -> {
Expand All @@ -478,6 +478,14 @@ public void testAbortLingeringTransactionsWithCheckNum() throws Exception {
);
}

@Test
public void testCsvFormatWithColumnSeparatorAndRowDelimiter() throws Exception {
Map<String, String> map = new HashMap<>();
map.put("sink.properties.column_separator", "\\x01");
map.put("sink.properties.row_delimiter", "\\0x2");
testConfigurationBase(map, env -> null);
}

@Test
public void testJsonFormat() throws Exception {
if (isSinkV2) {
Expand All @@ -492,7 +500,7 @@ public void testJsonFormat() throws Exception {
}

private void testConfigurationBase(Map<String, String> options, Function<StreamExecutionEnvironment, Void> setFlinkEnv) throws Exception {
String tableName = createPkTable("testAtLeastOnceBase");
String tableName = createPkTable("testConfigurationBase");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
setFlinkEnv.apply(env);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream;

import java.io.StringWriter;

/**
* Parse the delimiter which contains escape characters. The logic is same as
* that in StarRocks, See class Delimiter of StarRocks FE.
*/
public class DelimiterParser {

private static final String HEX_STRING = "0123456789ABCDEF";

private static byte[] hexStrToBytes(String hexStr) {
String upperHexStr = hexStr.toUpperCase();
int length = upperHexStr.length() / 2;
char[] hexChars = upperHexStr.toCharArray();
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
int pos = i * 2;
bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | (0xff & charToByte(hexChars[pos + 1])));
}
return bytes;
}

private static byte charToByte(char c) {
return (byte) HEX_STRING.indexOf(c);
}

public static String convertDelimiter(String originStr) {
if (originStr == null || originStr.isEmpty()) {
throw new RuntimeException("The delimiter can't be null or empty");
}

if (originStr.toUpperCase().startsWith("\\X") || originStr.toUpperCase().startsWith("0X")) {
String hexStr = originStr.substring(2);
// check hex str
if (hexStr.isEmpty()) {
throw new RuntimeException("Invalid delimiter '" + originStr + ": empty hex string");
}
if (hexStr.length() % 2 != 0) {
throw new RuntimeException("Invalid delimiter '" + originStr + ": hex length must be a even number");
}
for (char hexChar : hexStr.toUpperCase().toCharArray()) {
if (HEX_STRING.indexOf(hexChar) == -1) {
throw new RuntimeException("Invalid delimiter '" + originStr + "': invalid hex format");
}
}

// transform to delimiter
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
} else {
return originStr;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream;

import org.junit.Assert;
import org.junit.Test;

public class DelimiterParserTest {

@Test
public void testNormal() {
Assert.assertEquals("\n", DelimiterParser.convertDelimiter("\n"));
Assert.assertEquals("\1", DelimiterParser.convertDelimiter("\\x01"));
Assert.assertEquals("\0\1", DelimiterParser.convertDelimiter("\\x0001"));
Assert.assertEquals("|", DelimiterParser.convertDelimiter("|"));
Assert.assertEquals("\\|", DelimiterParser.convertDelimiter("\\|"));
}

@Test(expected = RuntimeException.class)
public void testHexFormatError() {
DelimiterParser.convertDelimiter("\\x0g");
}

@Test(expected = RuntimeException.class)
public void testHexLengthError() {
DelimiterParser.convertDelimiter("\\x011");
}
}

0 comments on commit 39ad45a

Please sign in to comment.