[Test](auto-partition) Add auto partition p1 test (#28028)
zclllyybb authored Dec 6, 2023
1 parent 1be513b commit 202cc84
Showing 3 changed files with 241 additions and 0 deletions.
ddl/concurrent.sql
@@ -0,0 +1,19 @@
CREATE TABLE `concurrent`(
`col1` datetimev2 not null,
`col2` boolean,
`col3` tinyint,
`col4` date,
`col5` float,
`col6` double,
`col7` string,
`col8` varchar(128),
`col9` decimal(9, 3),
`col10` char(128)
) DUPLICATE KEY(`col1`)
AUTO PARTITION BY RANGE date_trunc(`col1`, 'day')
()
DISTRIBUTED BY HASH(`col1`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
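
The empty partition list after the AUTO PARTITION clause is deliberate: no partitions are pre-created, and Doris adds a day-level partition on demand for each distinct day that shows up in col1. A minimal SQL sketch of the behavior under test (values are illustrative):

INSERT INTO `concurrent` (`col1`, `col2`) VALUES ('2023-12-06 10:01:02', true);
-- a partition covering [2023-12-06 00:00:00, 2023-12-07 00:00:00) now exists
SHOW PARTITIONS FROM `concurrent`;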
doris_dbgen_conf/two_stream_load_conflict.yaml
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

tables:
  small_data_high_concurrent_load_range:
    col1:
      range: {min: "2020-01-01 00:00:00", max: "2023-12-31 23:59:59"}
      force_not_null: true
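
This is the generator profile the suite below passes to doris-dbgen via --config: presumably, range pins generated col1 values to the 2020-2023 window and force_not_null suppresses NULLs for the NOT NULL partition column, so day-level truncation leaves the 20 concurrent loads roughly 1,460 distinct partitions to create and race on.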
multi_thread_load.groovy
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import groovy.io.FileType
import java.nio.file.Files
import java.nio.file.Paths
import java.net.URL
import java.io.File

suite("multi_thread_load") {
// download the doris-dbgen binary if it is not already present
def dirPath = context.file.parent
def fatherPath = context.file.parentFile.parentFile.getPath()
def fileName = "doris-dbgen"
def fileUrl = "http://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen"
def filePath = Paths.get(dirPath, fileName)
if (!Files.exists(filePath)) {
new URL(fileUrl).withInputStream { inputStream ->
Files.copy(inputStream, filePath)
}
def file = new File(dirPath + "/" + fileName)
file.setExecutable(true)
}

def data_count = 20 // number of concurrent load tasks (one thread each)
def rows = 100 // rows generated per load task

// generate datafiles via doris-dbgen
def doris_dbgen_create_data = { db_name, tb_name, part_type ->
def bulkSize = rows
def tableName = tb_name

def jdbcUrl = context.config.jdbcUrl
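// parse the FE host and MySQL-protocol port out of the JDBC url,
// e.g. jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true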
def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
def sql_port
if (urlWithoutSchema.indexOf("/") >= 0) {
// e.g. jdbc:mysql://localhost:8080/?a=b
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
} else {
// e.g. jdbc:mysql://localhost:8080
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
}
String feHttpAddress = context.config.feHttpAddress
def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)

String realDb = db_name
String user = context.config.jdbcUser
String password = context.config.jdbcPassword

for (int i = 0; i < data_count; i++) {
def cm
if (password) {
cm = """${dirPath}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --pass ${password} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${fatherPath}/doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${dirPath}/${part_type}/${part_type}_${i}/"""
} else {
cm = """${dirPath}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${fatherPath}/doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${dirPath}/${part_type}/${part_type}_${i}/"""
}
logger.info("datagen: " + cm)
def proc = cm.execute()
def sout = new StringBuilder(), serr = new StringBuilder()
proc.consumeProcessOutput(sout, serr)
proc.waitForOrKill(7200000)
// logger.info("std out: " + sout + "std err: " + serr)
}
}

def write_to_file = { cur_path, content ->
File file = new File(cur_path)
file.write(content)
}

def cm_list = []
def doris_dbgen_load_data = { db_name, tb_name, part_type ->
def tableName = tb_name

def jdbcUrl = context.config.jdbcUrl
def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
def sql_port
if (urlWithoutSchema.indexOf("/") >= 0) {
// e.g. jdbc:mysql://localhost:8080/?a=b
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
} else {
// e.g. jdbc:mysql://localhost:8080
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
}
String feHttpAddress = context.config.feHttpAddress
def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)

String realDb = db_name
String user = context.config.jdbcUser
String password = context.config.jdbcPassword

for (int i = 0; i < data_count; i++) {
def cm = ""
def list = []
def dir = new File("""${dirPath}""" + "/" + part_type + "/" + part_type + "_" + i)
dir.eachFileRecurse (FileType.FILES) { file ->
list << file
}

if (password) {
cm = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load"""
} else {
cm = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load"""
}
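// each generated script performs one synchronous stream load against the FE
// HTTP port via Doris's /api/{db}/{table}/_stream_load endpoint; one daemon
// thread will later run one script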
logger.info("load data: " + cm)

def load_path = """${dirPath}/${part_type}/thread_load_${i}.sh"""
write_to_file(load_path, cm)
cm_list.add("""bash ${load_path}""")
}
}

def data_delete = { part_type ->
def cmd = """rm -rf ${dirPath}/${part_type}"""
cmd.execute().waitFor() // make sure the old data directory is gone before regenerating
}

def database_name = "regression_test_auto_partition_concurrent"
def table_name = "concurrent"

sql """create database if not exists ${database_name};"""
sql """use ${database_name};"""
sql """drop table if exists ${table_name};"""
sql new File("""${fatherPath}/ddl/concurrent.sql""").text

data_delete("range")
doris_dbgen_create_data(database_name, table_name, "range")
doris_dbgen_load_data(database_name, table_name, "range")

def load_threads = []
def concurrent_load = { str ->
logger.info("load start:" + str)
def proc = str.execute()
def sout = new StringBuilder(), serr = new StringBuilder()
proc.consumeProcessOutput(sout, serr)
proc.waitForOrKill(600000) // 10 minutes
}

// start one daemon thread per load command; eachWithIndex gives every
// closure its own binding of cmd, avoiding the shared-loop-variable
// capture pitfall of a plain for loop
cm_list.eachWithIndex { cmd, i ->
logger.info("try to run " + i + " : " + cmd)
load_threads.add(Thread.startDaemon { concurrent_load(cmd) })
}

// wait for all load threads to finish
for (Thread th in load_threads) {
th.join()
}

// check the row count: data_count load tasks x rows rows each
def row_count_range = sql """select count() from ${table_name};"""
assertTrue(data_count * rows == row_count_range[0][0], "expected ${data_count * rows}, got ${row_count_range[0][0]}")
// check that no two partitions were created over the same range
// (column 6 of SHOW PARTITIONS output is the partition's Range)
def partition_res_range = sql """show partitions from ${table_name} order by PartitionName;"""
for (int i = 0; i < partition_res_range.size(); i++) {
for (int j = i + 1; j < partition_res_range.size(); j++) {
if (partition_res_range[i][6] == partition_res_range[j][6]) {
assertTrue(false, "partitions $i and $j cover the same range: ${partition_res_range[i][6]}")
}
}
}
}
}
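
To reproduce locally, the suite can be run through Doris's regression-test framework, e.g. sh run-regression-test.sh --run multi_thread_load, assuming the doris-dbgen download URL above is reachable from the test machine.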
