diff --git a/java/yb-cql/src/test/java/org/yb/cql/BaseCQLTest.java b/java/yb-cql/src/test/java/org/yb/cql/BaseCQLTest.java index 5f89a98d2079..be98b97a306a 100644 --- a/java/yb-cql/src/test/java/org/yb/cql/BaseCQLTest.java +++ b/java/yb-cql/src/test/java/org/yb/cql/BaseCQLTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.fail; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.SimpleStatement; @@ -109,6 +111,9 @@ public static void setUpBeforeClass() throws Exception { } public Cluster.Builder getDefaultClusterBuilder() { + // Set default consistency level to strong consistency + QueryOptions queryOptions = new QueryOptions(); + queryOptions.setConsistencyLevel(ConsistencyLevel.YB_STRONG); // Set a long timeout for CQL queries since build servers might be really slow (especially Mac // Mini). SocketOptions socketOptions = new SocketOptions(); @@ -116,6 +121,7 @@ public Cluster.Builder getDefaultClusterBuilder() { socketOptions.setConnectTimeoutMillis(60 * 1000); return Cluster.builder() .addContactPointsWithPorts(miniCluster.getCQLContactPoints()) + .withQueryOptions(queryOptions) .withSocketOptions(socketOptions); } diff --git a/java/yb-cql/src/test/java/org/yb/cql/TestTransaction.java b/java/yb-cql/src/test/java/org/yb/cql/TestTransaction.java new file mode 100644 index 000000000000..0a5c3f88c75d --- /dev/null +++ b/java/yb-cql/src/test/java/org/yb/cql/TestTransaction.java @@ -0,0 +1,228 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +package org.yb.cql; + +import java.util.*; + +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.PreparedStatement; + +public class TestTransaction extends BaseCQLTest { + + private void createTable(String name, String columns, boolean transactional) { + session.execute(String.format("create table %s (%s) with transactions = { 'enabled' : %b };", + name, columns, transactional)); + } + + private void createTables() throws Exception { + createTable("test_txn1", "k int primary key, c1 int, c2 text", true); + createTable("test_txn2", "k int primary key, c1 int, c2 text", true); + createTable("test_txn3", "k int primary key, c1 int, c2 text", true); + } + + @Test + public void testInsertMultipleTables() throws Exception { + + createTables(); + + // Insert into multiple tables and ensure all rows are written with same writetime. 
+ session.execute("begin transaction;" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "insert into test_txn2 (k, c1, c2) values (?, ?, ?);" + + "insert into test_txn3 (k, c1, c2) values (?, ?, ?);" + + "end transaction;", + Integer.valueOf(1), Integer.valueOf(1), "v1", + Integer.valueOf(2), Integer.valueOf(2), "v2", + Integer.valueOf(3), Integer.valueOf(3), "v3"); + + Vector rows = new Vector(); + for (int i = 1; i <= 3; i++) { + rows.add(session.execute(String.format("select c1, c2, writetime(c1), writetime(c2) " + + "from test_txn%d where k = ?;", i), + Integer.valueOf(i)).one()); + assertNotNull(rows.get(i - 1)); + assertEquals(i, rows.get(i - 1).getInt("c1")); + assertEquals("v" + i, rows.get(i - 1).getString("c2")); + } + + // Verify writetimes are same. + assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(1).getLong("writetime(c1)")); + assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(2).getLong("writetime(c1)")); + assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(1).getLong("writetime(c2)")); + assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(2).getLong("writetime(c2)")); + } + + @Test + public void testInsertUpdateSameTable() throws Exception { + + createTables(); + + // Insert multiple keys into same table and ensure all rows are written with same writetime. + session.execute("start transaction;" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "update test_txn1 set c1 = ?, c2 = ? 
where k = ?;" + + "commit;", + Integer.valueOf(1), Integer.valueOf(1), "v1", + Integer.valueOf(2), Integer.valueOf(2), "v2", + Integer.valueOf(3), "v3", Integer.valueOf(3)); + + Vector rows = new Vector(); + HashSet values = new HashSet(); + for (Row row : session.execute("select c1, c2, writetime(c1), writetime(c2) " + + "from test_txn1 where k in ?;", + Arrays.asList(Integer.valueOf(1), + Integer.valueOf(2), + Integer.valueOf(3)))) { + rows.add(row); + values.add(row.getInt("c1") + "," + row.getString("c2")); + } + assertEquals(3, rows.size()); + assertEquals(new HashSet(Arrays.asList("1,v1", "2,v2", "3,v3")), values); + + // Verify writetimes are same. + assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(1).getLong("writetime(c1)")); + assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(2).getLong("writetime(c1)")); + assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(1).getLong("writetime(c2)")); + assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(2).getLong("writetime(c2)")); + } + + @Test + public void testMixDML() throws Exception { + + createTables(); + + // Test non-transactional writes to transaction-enabled table. + for (int i = 1; i <= 2; i++) { + session.execute("insert into test_txn1 (k, c1, c2) values (?, ?, ?);", + Integer.valueOf(i), Integer.valueOf(i), "v" + i); + } + assertQuery("select * from test_txn1;", + new HashSet(Arrays.asList("Row[1, 1, v1]", "Row[2, 2, v2]"))); + + // Test a mix of insert/update/delete in the same transaction. + session.execute("begin transaction;" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "update test_txn1 set c1 = 0, c2 = 'v0' where k = ?;" + + "delete from test_txn1 where k = ?;" + + "end transaction;", + Integer.valueOf(3), Integer.valueOf(3), "v3", + Integer.valueOf(2), + Integer.valueOf(1)); + + // Verify the rows. 
+ Vector rows = new Vector(); + HashSet values = new HashSet(); + for (Row row : session.execute("select k, c1, c2, writetime(c1), writetime(c2) " + + "from test_txn1 where k in ?;", + Arrays.asList(Integer.valueOf(1), + Integer.valueOf(2), + Integer.valueOf(3)))) { + rows.add(row); + values.add(row.getInt("k") + "," + row.getInt("c1") + "," + row.getString("c2")); + } + assertEquals(2, rows.size()); + assertEquals(new HashSet(Arrays.asList("2,0,v0", "3,3,v3")), values); + + // Verify writetimes are same. + assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(1).getLong("writetime(c1)")); + assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(1).getLong("writetime(c2)")); + } + + @Test + public void testPrepareStatement() throws Exception { + + createTable("test_hash", "h1 int, h2 int, r int, c text, primary key ((h1, h2), r)", true); + + // Prepare a transaction statement. Verify the hash key of the whole statement is the first + // insert statement that has the full hash key specified (third insert). + PreparedStatement stmt = + session.prepare("begin transaction;" + + "insert into test_hash (h1, h2, r, c) values (1, 1, ?, ?);" + + "insert into test_hash (h1, h2, r, c) values (?, 2, ?, ?);" + + "insert into test_hash (h1, h2, r, c) values (?, ?, ?, ?);" + + "end transaction;"); + int hashIndexes[] = stmt.getRoutingKeyIndexes(); + assertEquals(2, hashIndexes.length); + assertEquals(5, hashIndexes[0]); + assertEquals(6, hashIndexes[1]); + + session.execute(stmt.bind(Integer.valueOf(1), "v1", + Integer.valueOf(2), Integer.valueOf(2), "v2", + Integer.valueOf(3), Integer.valueOf(3), Integer.valueOf(3), "v3")); + + // Verify the rows. 
+ Vector rows = new Vector(); + HashSet values = new HashSet(); + for (Row row : session.execute("select h1, h2, r, c, writetime(c) from test_hash;")) { + rows.add(row); + values.add(row.getInt("h1")+","+row.getInt("h2")+","+row.getInt("r")+","+row.getString("c")); + } + assertEquals(3, rows.size()); + assertEquals(new HashSet(Arrays.asList("1,1,1,v1", + "2,2,2,v2", + "3,3,3,v3")), values); + // Verify writetimes are same. + assertEquals(rows.get(0).getLong("writetime(c)"), rows.get(1).getLong("writetime(c)")); + assertEquals(rows.get(0).getLong("writetime(c)"), rows.get(2).getLong("writetime(c)")); + } + + @Test + public void testInvalidStatements() throws Exception { + createTables(); + + // Missing "begin transaction" + runInvalidStmt("insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "insert into test_txn2 (k, c1, c2) values (?, ?, ?);" + + "commit;"); + + // Missing "end transaction" + runInvalidStmt("begin transaction;" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "insert into test_txn2 (k, c1, c2) values (?, ?, ?);"); + + // Missing "begin / end transaction" + runInvalidStmt("insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "insert into test_txn2 (k, c1, c2) values (?, ?, ?);"); + + // Writing to non-transactional table + createTable("test_non_txn", "k int primary key, c1 int, c2 text", false); + runInvalidStmt("begin transaction;" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?);" + + "insert into test_non_txn (k, c1, c2) values (?, ?, ?);" + + "end transaction;"); + + // Conditional DML not supported yet + runInvalidStmt("begin transaction;" + + "insert into test_txn1 (k, c1, c2) values (?, ?, ?) 
if not exists;" + +                   "end transaction;"); + +     // Multiple writes to the same row are not allowed +     runInvalidStmt("begin transaction;" + +                    "insert into test_txn1 (k, c1, c2) values (1, 1, 'v1');" + +                    "delete from test_txn1 where k = 1;" + +                    "end transaction;"); + +     // Multiple writes to the same static row are not allowed +     createTable("test_static", "h int, r int, s int static, c int, primary key ((h), r)", true); +     runInvalidStmt("begin transaction;" + +                    "insert into test_static (h, s) values (1, 1);" + +                    "insert into test_static (h, r, s, c) values (1, 2, 3, 4);" + +                    "end transaction;"); +   } +} diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java index 47075e5feb55..eb2b7050bb9e 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java @@ -16,6 +16,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; @@ -383,24 +384,16 @@ public void appendMessage(StringBuilder sb) { } /** - *Method to print the description for the app. - * @param linePrefix : a prefix to be added to each line that is being printed. - * @param lineSuffix : a suffix to be added at the end of each line. - * @return the formatted description string. + * Returns the description of the app. + * @return the description split into lines. */ - public String getWorkloadDescription(String linePrefix, String lineSuffix) { - return ""; - } + public List getWorkloadDescription() { return Collections.EMPTY_LIST; } /** - * Method to pretty print the example usage for the app. - * @param linePrefix - * @param lineSuffix - * @return + * Returns the example usage of the app. + * @return the example usage split into lines. 
*/ - public String getExampleUsageOptions(String linePrefix, String lineSuffix) { - return ""; - } + public List getExampleUsageOptions() { return Collections.EMPTY_LIST; } ////////////// The following methods framework/helper methods for subclasses. //////////////////// diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraBatchKeyValue.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraBatchKeyValue.java index 813c57acb2ca..062742eb8136 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraBatchKeyValue.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraBatchKeyValue.java @@ -14,7 +14,9 @@ package com.yugabyte.sample.apps; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import org.apache.log4j.Logger; @@ -78,50 +80,24 @@ public long doWrite() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample batch key-value app built on Cassandra. The app writes out 1M unique string"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("key in batches, each key with a string value. There are multiple readers and"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("writers that update these keys and read them indefinitely. Note that the batch"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("size and the number of reads and writes to perform can be specified as parameters."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Sample batch key-value app built on Cassandra. The app writes out 1M unique string", + "key in batches, each key with a string value. There are multiple readers and", + "writers that update these keys and read them indefinitely. 
Note that the batch", + "size and the number of reads and writes to perform can be specified as parameters."); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_unique_keys " + appConfig.numUniqueKeysToWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_reads " + appConfig.numKeysToRead); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_writes " + appConfig.numKeysToWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--value_size " + appConfig.valueSize); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_read " + appConfig.numReaderThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_write " + appConfig.numWriterThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--batch_size " + appConfig.cassandraBatchSize); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--table_ttl_seconds " + appConfig.tableTTLSeconds); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_unique_keys " + appConfig.numUniqueKeysToWrite, + "--num_reads " + appConfig.numKeysToRead, + "--num_writes " + appConfig.numKeysToWrite, + "--value_size " + appConfig.valueSize, + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads, + "--batch_size " + appConfig.cassandraBatchSize, + "--table_ttl_seconds " + appConfig.tableTTLSeconds); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java index 00e02f3476d9..2954b87cedca 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java +++ 
b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java @@ -13,6 +13,7 @@ package com.yugabyte.sample.apps; +import java.util.Arrays; import java.util.List; import org.apache.log4j.Logger; @@ -83,14 +84,9 @@ public void run() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("A very simple hello world app built on Cassandra. The app writes one employee row"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("into the 'Employee' table"); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "A very simple hello world app built on Cassandra. The app writes one employee row", + "into the 'Employee' table"); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java index ad72dd2ec412..2d3035547b1f 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java @@ -216,47 +216,23 @@ public void appendParentMessage(StringBuilder sb) { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample key-value app built on Cassandra. The app writes out 1M unique string keys"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("each with a string value. There are multiple readers and writers that update these"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("keys and read them indefinitely. 
Note that the number of reads and writes to"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("perform can be specified as a parameter."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Sample key-value app built on Cassandra. The app writes out 1M unique string keys", + "each with a string value. There are multiple readers and writers that update these", + "keys and read them indefinitely. Note that the number of reads and writes to", + "perform can be specified as a parameter."); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_unique_keys " + appConfig.numUniqueKeysToWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_reads " + appConfig.numKeysToRead); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_writes " + appConfig.numKeysToWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--value_size " + appConfig.valueSize); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_read " + appConfig.numReaderThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_write " + appConfig.numWriterThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--table_ttl_seconds " + appConfig.tableTTLSeconds); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_unique_keys " + appConfig.numUniqueKeysToWrite, + "--num_reads " + appConfig.numKeysToRead, + "--num_writes " + appConfig.numKeysToWrite, + "--value_size " + appConfig.valueSize, + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads, + "--table_ttl_seconds " + appConfig.tableTTLSeconds); } } diff --git 
a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkKeyValueCopy.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkKeyValueCopy.java index a0b4257d20c8..8c2e651f23a9 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkKeyValueCopy.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkKeyValueCopy.java @@ -14,6 +14,8 @@ package com.yugabyte.sample.apps; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; @@ -136,35 +138,19 @@ public void run() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Simple Spark app that reads from an input table and writes to an output table."); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("It does no additional processing of the data, only tests reading/writing data."); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("Use num_threads_write option to set the number of Spark worker threads."); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("Input source an input_table which defaults to '" + DEFAULT_INPUT_TABLE_NAME + "'"); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Simple Spark app that reads from an input table and writes to an output table.", + "It does no additional processing of the data, only tests reading/writing data.", + "Use num_threads_write option to set the number of Spark worker threads.", + "Input source an input_table which defaults to '" + DEFAULT_INPUT_TABLE_NAME + "'"); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_threads_write " 
+ appConfig.numWriterThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--" + INPUT_TABLE_ARG_NAME + " " + DEFAULT_INPUT_TABLE_NAME); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--" + OUTPUT_TABLE_ARG_NAME + " " + DEFAULT_OUTPUT_TABLE_NAME); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_threads_write " + appConfig.numWriterThreads, + "--" + INPUT_TABLE_ARG_NAME + " " + DEFAULT_INPUT_TABLE_NAME, + "--" + OUTPUT_TABLE_ARG_NAME + " " + DEFAULT_OUTPUT_TABLE_NAME); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java index d51ab23a98f4..5f68b3acd47e 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java @@ -14,6 +14,7 @@ package com.yugabyte.sample.apps; import java.util.Arrays; +import java.util.List; import com.datastax.driver.core.Session; import com.yugabyte.sample.common.CmdLineOpts; @@ -175,38 +176,20 @@ public void run() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Simple Spark word count app that reads from an input table or file to compute "); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("word count and saves results in an output table"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("Input source is either input_file or input_table. 
If none is given a sample "); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("Cassandra table " + defaultInputTableName + " is created and used as input"); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Simple Spark word count app that reads from an input table or file to compute ", + "word count and saves results in an output table", + "Input source is either input_file or input_table. If none is given a sample ", + "Cassandra table " + defaultInputTableName + " is created and used as input"); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_threads_write " + appConfig.numWriterThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--wordcount_output_table " + defaultOutputTableName); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--wordcount_input_file "); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--wordcount_input_table "); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_threads_write " + appConfig.numWriterThreads, + "--wordcount_output_table " + defaultOutputTableName, + "--wordcount_input_file ", + "--wordcount_input_table
"); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraStockTicker.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraStockTicker.java index d05f7ba542bc..94e1c0445b60 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraStockTicker.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraStockTicker.java @@ -305,47 +305,23 @@ public String toString() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample stock ticker app built on CQL. The app models 10,000 stock tickers"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("each of which emits quote data every second. The raw data is written into the"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("'stock_ticker_raw' table, which retains data for one day. The 'stock_ticker_1min'"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("table models downsampled ticker data, is written to once a minute and retains data"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("for 60 days. Every read query gets the latest value of the stock symbol from the"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("'stock_ticker_raw' table."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Sample stock ticker app built on CQL. The app models 10,000 stock tickers", + "each of which emits quote data every second. The raw data is written into the", + "'stock_ticker_raw' table, which retains data for one day. The 'stock_ticker_1min'", + "table models downsampled ticker data, is written to once a minute and retains data", + "for 60 days. 
Every read query gets the latest value of the stock symbol from the", + "'stock_ticker_raw' table."); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_threads_read " + appConfig.numReaderThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_write " + appConfig.numWriterThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_ticker_symbols " + num_ticker_symbols); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--data_emit_rate_millis " + data_emit_rate_millis); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--table_ttl_seconds " + appConfig.tableTTLSeconds); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads, + "--num_ticker_symbols " + num_ticker_symbols, + "--data_emit_rate_millis " + data_emit_rate_millis, + "--table_ttl_seconds " + appConfig.tableTTLSeconds); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTimeseries.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTimeseries.java index 24de0ccdb322..188258cf6dfb 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTimeseries.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTimeseries.java @@ -367,59 +367,27 @@ public String toString() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample timeseries/IoT app built on CQL. The app models 100 users, each of"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("whom own 5-10 devices. Each device emits 5-10 metrics per second. 
The data is"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("written into the 'ts_metrics_raw' table, which retains data for one day. Note that"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("the number of metrics written is a lot more than the number of metrics read as is"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("typical in such workloads, and the payload size for each write is 100 bytes. Every"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("read query fetches the last 1-3 hours of metrics for a user's device."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Sample timeseries/IoT app built on CQL. The app models 100 users, each of", + "whom own 5-10 devices. Each device emits 5-10 metrics per second. The data is", + "written into the 'ts_metrics_raw' table, which retains data for one day. Note that", + "the number of metrics written is a lot more than the number of metrics read as is", + "typical in such workloads, and the payload size for each write is 100 bytes. 
Every", + "read query fetches the last 1-3 hours of metrics for a user's device."); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_threads_read " + appConfig.numReaderThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_write " + appConfig.numWriterThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_users " + num_users); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--min_nodes_per_user " + min_nodes_per_user); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--max_nodes_per_user " + max_nodes_per_user); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--min_metrics_count " + min_metrics_count); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--max_metrics_count " + max_metrics_count); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--data_emit_rate_millis " + data_emit_rate_millis); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--table_ttl_seconds " + appConfig.tableTTLSeconds); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads, + "--num_users " + num_users, + "--min_nodes_per_user " + min_nodes_per_user, + "--max_nodes_per_user " + max_nodes_per_user, + "--min_metrics_count " + min_metrics_count, + "--max_metrics_count " + max_metrics_count, + "--data_emit_rate_millis " + data_emit_rate_millis, + "--table_ttl_seconds " + appConfig.tableTTLSeconds); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTransactionalKeyValue.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTransactionalKeyValue.java new file mode 100644 index 000000000000..318863b36b13 --- 
/dev/null +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraTransactionalKeyValue.java @@ -0,0 +1,266 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +import com.yugabyte.sample.common.CmdLineOpts; +import org.apache.log4j.Logger; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.yugabyte.sample.common.SimpleLoadGenerator.Key; + +/** + * This workload writes and reads some random string keys from a CQL server. By default, this app + * inserts a million keys, and reads/updates them indefinitely. + */ +public class CassandraTransactionalKeyValue extends AppBase { + private static final Logger LOG = Logger.getLogger(CassandraKeyValue.class); + + // Static initialization of this workload's config. These are good defaults for getting a decent + // read dominated workload on a reasonably powered machine. Exact IOPS will of course vary + // depending on the machine and what resources it has to spare. + static { + // Disable the read-write percentage. + appConfig.readIOPSPercentage = -1; + // Set the default number of reader and writer threads. 
+ appConfig.numReaderThreads = 24; + appConfig.numWriterThreads = 2; + // The number of keys to read. + appConfig.numKeysToRead = -1; + // The number of keys to write. This is the combined total number of inserts and updates. + appConfig.numKeysToWrite = -1; + // The number of unique keys to write. This determines the number of inserts (as opposed to + // updates). + appConfig.numUniqueKeysToWrite = NUM_UNIQUE_KEYS; + } + + // The default table name to create and use for CRUD ops. + private static final String DEFAULT_TABLE_NAME = CassandraKeyValue.class.getSimpleName(); + + // The shared prepared select statement for fetching the data. + private static volatile PreparedStatement preparedSelect; + + // The shared prepared statement for inserting into the table. + private static volatile PreparedStatement preparedInsert; + + // Lock for initializing prepared statement objects. + private static final Object prepareInitLock = new Object(); + + public CassandraTransactionalKeyValue() { + buffer = new byte[appConfig.valueSize]; + } + + /** + * Drop the table created by this app. + */ + @Override + public void dropTable() { + dropCassandraTable(getTableName()); + } + + @Override + public List getCreateTableStatements() { + String create_stmt = String.format( + "CREATE TABLE IF NOT EXISTS %s (k varchar, v blob, primary key (k)) " + + "WITH transactions = { 'enabled' : true };", getTableName()); + return Arrays.asList(create_stmt); + } + + protected PreparedStatement getPreparedSelect(String selectStmt, boolean localReads) { + if (preparedSelect == null) { + synchronized (prepareInitLock) { + if (preparedSelect == null) { + // Create the prepared statement object. 
+ preparedSelect = getCassandraClient().prepare(selectStmt); + if (localReads) { + LOG.debug("Doing local reads"); + preparedSelect.setConsistencyLevel(ConsistencyLevel.ONE); + } else { + preparedSelect.setConsistencyLevel(ConsistencyLevel.QUORUM); + } + } + } + } + return preparedSelect; + } + + public String getTableName() { + return appConfig.tableName != null ? appConfig.tableName : DEFAULT_TABLE_NAME; + } + + private PreparedStatement getPreparedSelect() { + return getPreparedSelect(String.format("SELECT k, v, writetime(v) FROM %s WHERE k in :k;", + getTableName()), + appConfig.localReads); + } + + @Override + public synchronized void resetClients() { + synchronized (prepareInitLock) { + preparedInsert = null; + preparedSelect = null; + } + super.resetClients(); + } + + @Override + public synchronized void destroyClients() { + synchronized (prepareInitLock) { + preparedInsert = null; + preparedSelect = null; + } + super.destroyClients(); + } + + @Override + public long doRead() { + Key key = getSimpleLoadGenerator().getKeyToRead(); + if (key == null) { + // There are no keys to read yet. + return 0; + } + // Do the read from Cassandra. + // Bind the select statement. 
+ BoundStatement select = getPreparedSelect().bind(Arrays.asList(key.asString() + "_1", + key.asString() + "_2")); + ResultSet rs = getCassandraClient().execute(select); + List rows = rs.all(); + if (rows.size() != 2) { + LOG.fatal("Read key: " + key.asString() + " expected 2 row in result, got " + rows.size()); + return 1; + } + if (appConfig.valueSize == 0) { + ByteBuffer buf = rows.get(0).getBytes(1); + String value1 = new String(buf.array()); + key.verify(value1); + + buf = rows.get(1).getBytes(1); + String value2 = new String(buf.array()); + if (!value1.equals(value2)) { + LOG.fatal("Value mismatch for key: " + key.toString() + ", " + + value1 + " vs " + value2); + } + } else { + ByteBuffer value = rows.get(0).getBytes(1); + byte[] bytes1 = new byte[value.capacity()]; + value.get(bytes1); + verifyRandomValue(key, bytes1); + + value = rows.get(1).getBytes(1); + byte[] bytes2 = new byte[value.capacity()]; + if (!bytes1.equals(bytes2)) { + LOG.fatal("Value mismatch for key: " + key.toString()); + } + } + if (rows.get(0).getLong(2) != rows.get(1).getLong(2)) { + LOG.fatal("Writetime mismatch for key: " + key.toString() + ", " + + rows.get(0).getLong(2) + " vs " + rows.get(1).getLong(2)); + } + + LOG.debug("Read key: " + key.toString()); + return 1; + } + + protected PreparedStatement getPreparedInsert(String insertStmt) { + if (preparedInsert == null) { + synchronized (prepareInitLock) { + if (preparedInsert == null) { + // Create the prepared statement object. + preparedInsert = getCassandraClient().prepare(insertStmt); + } + } + } + return preparedInsert; + } + + protected PreparedStatement getPreparedInsert() { + return getPreparedInsert(String.format( + "BEGIN TRANSACTION;" + + "INSERT INTO %s (k, v) VALUES (:k1, :v);" + + "INSERT INTO %s (k, v) VALUES (:k2, :v);" + + "END TRANSACTION;", + getTableName(), + getTableName())); + } + + @Override + public long doWrite() { + Key key = getSimpleLoadGenerator().getKeyToWrite(); + try { + // Do the write to Cassandra. 
+ BoundStatement insert = null; + if (appConfig.valueSize == 0) { + String value = key.getValueStr(); + insert = getPreparedInsert() + .bind() + .setString("k1", key.asString() + "_1") + .setString("k2", key.asString() + "_2") + .setBytes("v", ByteBuffer.wrap(value.getBytes())); + } else { + byte[] value = getRandomValue(key); + insert = getPreparedInsert() + .bind() + .setString("k1", key.asString() + "_1") + .setString("k2", key.asString() + "_2") + .setBytes("v", ByteBuffer.wrap(value)); + } + ResultSet resultSet = getCassandraClient().execute(insert); + LOG.debug("Wrote key: " + key.toString() + ", return code: " + resultSet.toString()); + getSimpleLoadGenerator().recordWriteSuccess(key); + return 1; + } catch (Exception e) { + getSimpleLoadGenerator().recordWriteFailure(key); + throw e; + } + } + + @Override + public void appendMessage(StringBuilder sb) { + super.appendMessage(sb); + sb.append("maxWrittenKey: " + getSimpleLoadGenerator().getMaxWrittenKey() + " | "); + sb.append("maxGeneratedKey: " + getSimpleLoadGenerator().getMaxGeneratedKey() + " | "); + } + + public void appendParentMessage(StringBuilder sb) { + super.appendMessage(sb); + } + + @Override + public List getWorkloadDescription() { + return Arrays.asList( + "Sample key-value app with multi-row transaction built on Cassandra. The app writes", + "out 1M unique string keys in pairs, with each pair of keys with the same string", + "value written in a transaction. There are multiple readers and writers that update", + "these keys and read them in pair indefinitely. 
Note that the number of reads and ", + "writes to perform can be specified as a parameter."); + } + + @Override + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_unique_keys " + appConfig.numUniqueKeysToWrite, + "--num_reads " + appConfig.numKeysToRead, + "--num_writes " + appConfig.numKeysToWrite, + "--value_size " + appConfig.valueSize, + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads); + } +} diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraUserId.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraUserId.java index 70482d723201..a83025df27f0 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraUserId.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraUserId.java @@ -130,20 +130,11 @@ public void appendMessage(StringBuilder sb) { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample user id app built on Cassandra. The app writes out 1M unique user ids"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("each with a string password. There are multiple readers and writers that update"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("these user ids and passwords them indefinitely. Note that the number of reads and"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("writes to perform can be specified as a parameter."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Sample user id app built on Cassandra. The app writes out 1M unique user ids", + "each with a string password. There are multiple readers and writers that update", + "these user ids and passwords them indefinitely. 
Note that the number of reads and", + "writes to perform can be specified as a parameter."); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisHashPipelined.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisHashPipelined.java index 64b11a1e40e5..e786fb7f48fe 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisHashPipelined.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisHashPipelined.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Vector; import java.util.concurrent.Callable; import org.apache.commons.math3.distribution.AbstractIntegerDistribution; @@ -277,36 +278,19 @@ public Integer call() throws Exception { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample redis hash-map based app built on RedisPipelined."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList("Sample redis hash-map based app built on RedisPipelined."); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(super.getExampleUsageOptions(optsPrefix, optsSuffix)); - sb.append(optsPrefix); - sb.append("--num_subkeys_per_key " + appConfig.numSubkeysPerKey); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_subkeys_per_write " + appConfig.numSubkeysPerWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_subkeys_per_read " + appConfig.numSubkeysPerRead); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--key_freq_zipf_exponent " + appConfig.keyUpdateFreqZipfExponent); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--subkey_freq_zipf_exponent " + appConfig.subkeyUpdateFreqZipfExponent); - 
sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--subkey_value_size_zipf_exponent " + appConfig.valueSizeZipfExponent); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + Vector usage = new Vector(super.getExampleUsageOptions()); + usage.add("--num_subkeys_per_key " + appConfig.numSubkeysPerKey); + usage.add("--num_subkeys_per_write " + appConfig.numSubkeysPerWrite); + usage.add("--num_subkeys_per_read " + appConfig.numSubkeysPerRead); + usage.add("--key_freq_zipf_exponent " + appConfig.keyUpdateFreqZipfExponent); + usage.add("--subkey_freq_zipf_exponent " + appConfig.subkeyUpdateFreqZipfExponent); + usage.add("--subkey_value_size_zipf_exponent " + appConfig.valueSizeZipfExponent); + return usage; } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisKeyValue.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisKeyValue.java index da68e473f64e..82021e3b4ba4 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisKeyValue.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisKeyValue.java @@ -17,6 +17,8 @@ import com.yugabyte.sample.common.SimpleLoadGenerator.Key; +import java.util.Arrays; +import java.util.List; import java.util.Random; /** @@ -81,41 +83,21 @@ public long doWrite() { } @Override - public String getWorkloadDescription(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("Sample key-value app built on Redis. The app writes out 1M unique string keys each"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("with a string value. There are multiple readers and writers that update these keys"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("and read them indefinitely. 
Note that the number of reads and writes to perform"); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("can be specified as a parameter."); - sb.append(optsSuffix); - return sb.toString(); + public List getWorkloadDescription() { + return Arrays.asList( + "Sample key-value app built on Redis. The app writes out 1M unique string keys each", + "with a string value. There are multiple readers and writers that update these keys", + "and read them indefinitely. Note that the number of reads and writes to perform", + "can be specified as a parameter."); } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(optsPrefix); - sb.append("--num_unique_keys " + appConfig.numUniqueKeysToWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_reads " + appConfig.numKeysToRead); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_writes " + appConfig.numKeysToWrite); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_read " + appConfig.numReaderThreads); - sb.append(optsSuffix); - sb.append(optsPrefix); - sb.append("--num_threads_write " + appConfig.numWriterThreads); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + return Arrays.asList( + "--num_unique_keys " + appConfig.numUniqueKeysToWrite, + "--num_reads " + appConfig.numKeysToRead, + "--num_writes " + appConfig.numKeysToWrite, + "--num_threads_read " + appConfig.numReaderThreads, + "--num_threads_write " + appConfig.numWriterThreads); } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisPipelinedKeyValue.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisPipelinedKeyValue.java index 4ee312a7e374..5a2877648a65 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisPipelinedKeyValue.java +++ 
b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/RedisPipelinedKeyValue.java @@ -20,7 +20,9 @@ import redis.clients.jedis.Response; import java.util.ArrayList; +import java.util.List; import java.util.Random; +import java.util.Vector; import java.util.concurrent.Callable; import java.util.zip.Adler32; import java.util.zip.Checksum; @@ -147,12 +149,9 @@ public Integer call() throws Exception { } @Override - public String getExampleUsageOptions(String optsPrefix, String optsSuffix) { - StringBuilder sb = new StringBuilder(); - sb.append(super.getExampleUsageOptions(optsPrefix, optsSuffix)); - sb.append(optsPrefix); - sb.append("--pipeline_length " + appConfig.redisPipelineLength); - sb.append(optsSuffix); - return sb.toString(); + public List getExampleUsageOptions() { + Vector usage = new Vector(super.getExampleUsageOptions()); + usage.add("--pipeline_length " + appConfig.redisPipelineLength); + return usage; } } diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java index 207ab2b4b9c2..57822548e1fb 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java @@ -47,6 +47,7 @@ public static enum AppName { CassandraHelloWorld, CassandraKeyValue, CassandraBatchKeyValue, + CassandraTransactionalKeyValue, CassandraStockTicker, CassandraTimeseries, CassandraUserId, @@ -514,9 +515,9 @@ public static CmdLineOpts createFromArgs(String[] args) throws Exception { "chosen such that the expected mean is the value specified by --value_size. 
" + "If <= 0, all subkeys will have the value specified by --value_size"); options.addOption( - "subkey_value_max_size", true, - "[RedisHashPipelined] If using zipf distribution to choose value sizes, " + - "specifies an upper bound on the value sizes."); + "subkey_value_max_size", true, + "[RedisHashPipelined] If using zipf distribution to choose value sizes, " + + "specifies an upper bound on the value sizes."); CommandLineParser parser = new BasicParser(); CommandLine commandLine = null; @@ -575,9 +576,14 @@ private static void printUsage(Options options, String header) throws Exception } footer.append("\n"); - String description = workload.getWorkloadDescription("\t\t", "\n"); + List description = workload.getWorkloadDescription(); if (!description.isEmpty()) { - footer.append(description + "\n"); + for (String line : description) { + footer.append("\t\t"); + footer.append(line); + footer.append("\n"); + } + footer.append("\n"); } footer.append("\t\tUsage:\n"); footer.append(optsPrefix); @@ -587,7 +593,11 @@ private static void printUsage(Options options, String header) throws Exception footer.append(optsPrefix + "--nodes 127.0.0.1:" + port); footer.append("\n\n\t\tOther options (with default values):\n"); - footer.append(workload.getExampleUsageOptions(optsPrefix + "[ ", " ]\n")); + for (String line : workload.getExampleUsageOptions()) { + footer.append(optsPrefix + "[ "); + footer.append(line); + footer.append(" ]\n"); + } } footer.append("\n"); System.out.println(footer.toString()); diff --git a/src/yb/client/yb_op.cc b/src/yb/client/yb_op.cc index 9eefd426bd12..2816fb40a2fc 100644 --- a/src/yb/client/yb_op.cc +++ b/src/yb/client/yb_op.cc @@ -181,6 +181,63 @@ void YBqlWriteOp::SetHashCode(const uint16_t hash_code) { ql_write_request_->set_hash_code(hash_code); } +uint16_t YBqlWriteOp::GetHashCode() const { + return ql_write_request_->hash_code(); +} + +size_t YBqlWriteOp::Hash::operator() (const shared_ptr& op) const { + return op->GetHashCode(); +} + 
+namespace { + +bool WriteStaticColumns(const YBTable* table, const QLWriteRequestPB& request) { + const auto& schema = table->InternalSchema(); + for (const auto& col : request.column_values()) { + auto column = schema.column_by_id(ColumnId(col.column_id())); + CHECK_OK(column); + if (column->is_static()) { + return true; + } + } + return false; +} + +} // namespace + +bool YBqlWriteOp::Overlap::operator() (const shared_ptr& op1, + const shared_ptr& op2) const { + // Check if two write ops overlap that they apply to the same hash/primary key in the same table. + // TODO: do more fine-grained checks to see if the columns they write overlap or not. + if (op1->table() != op2->table() && op1->table()->id() != op2->table()->id()) { + return false; + } + const QLWriteRequestPB& req1 = op1->request(); + const QLWriteRequestPB& req2 = op2->request(); + if (req1.hashed_column_values_size() != req2.hashed_column_values_size()) { + return false; + } + for (int i = 0; i < req1.hashed_column_values().size(); i++) { + DCHECK(req1.hashed_column_values()[i].has_value()); + DCHECK(req2.hashed_column_values()[i].has_value()); + if (req1.hashed_column_values()[i].value() != req2.hashed_column_values()[i].value()) + return false; + } + if (WriteStaticColumns(op1->table(), req1) && WriteStaticColumns(op2->table(), req2)) { + return true; + } + if (req1.range_column_values_size() != req2.range_column_values_size()) { + return false; + } + for (int i = 0; i < req1.range_column_values().size(); i++) { + DCHECK(req1.range_column_values()[i].has_value()); + DCHECK(req2.range_column_values()[i].has_value()); + if (req1.range_column_values()[i].value() != req2.range_column_values()[i].value()) + return false; + } + return true; +} + // YBqlReadOp ----------------------------------------------------------------- YBqlReadOp::YBqlReadOp(const shared_ptr& table) diff --git a/src/yb/client/yb_op.h b/src/yb/client/yb_op.h index cebceaabd1a8..b00a8c91c17a 100644 --- a/src/yb/client/yb_op.h +++ 
b/src/yb/client/yb_op.h @@ -241,8 +241,19 @@ class YBqlWriteOp : public YBqlOp { virtual void SetHashCode(uint16_t hash_code) override; + uint16_t GetHashCode() const; + virtual CHECKED_STATUS GetPartitionKey(std::string* partition_key) const override; + // Hash and equal functions to define a set of non-overlapped write operations. + struct Hash { + size_t operator() (const std::shared_ptr& op) const; + }; + struct Overlap { + bool operator() (const std::shared_ptr& op1, + const std::shared_ptr& op2) const; + }; + protected: virtual Type type() const override { return QL_WRITE; diff --git a/src/yb/client/yb_table_name.h b/src/yb/client/yb_table_name.h index 93cc61416a82..29aab34bb118 100644 --- a/src/yb/client/yb_table_name.h +++ b/src/yb/client/yb_table_name.h @@ -65,6 +65,10 @@ class YBTableName { YBTableName(YBTableName&& name) : namespace_name_(std::move(name.namespace_name_)), table_name_(std::move(name.table_name_)) {} + bool empty() const { + return namespace_name_.empty() && table_name_.empty(); + } + bool has_namespace() const { return !namespace_name_.empty(); } diff --git a/src/yb/common/ql_scanspec.cc b/src/yb/common/ql_scanspec.cc index 7ff217a491ed..68c1b96ce0e8 100644 --- a/src/yb/common/ql_scanspec.cc +++ b/src/yb/common/ql_scanspec.cc @@ -203,7 +203,7 @@ QLScanRange& QLScanRange::operator&=(const QLScanRange& other) { auto& range = elem.second; const auto& other_range = other.ranges_.at(elem.first); - // Interact operation: + // Intersect operation: // - min_value = max(min_value, other_min_value) // - max_value = min(max_value, other_max_value) if (BothNotNull(range.min_value, other_range.min_value)) { diff --git a/src/yb/yql/cql/cqlserver/cql_message.cc b/src/yb/yql/cql/cqlserver/cql_message.cc index f2f67043f699..f3cae866d211 100644 --- a/src/yb/yql/cql/cqlserver/cql_message.cc +++ b/src/yb/yql/cql/cqlserver/cql_message.cc @@ -1495,16 +1495,22 @@ PreparedResultResponse::PreparedMetadata::PreparedMetadata() { 
PreparedResultResponse::PreparedMetadata::PreparedMetadata( const client::YBTableName& table_name, const std::vector& hash_col_indices, + const vector& bind_table_names, const vector& bind_variable_schemas) - : flags(kHasGlobalTableSpec), + : flags(table_name.empty() ? 0 : kHasGlobalTableSpec), global_table_spec(table_name.namespace_name(), table_name.table_name()) { this->pk_indices.reserve(hash_col_indices.size()); for (const size_t index : hash_col_indices) { this->pk_indices.emplace_back(static_cast(index)); } col_specs.reserve(bind_variable_schemas.size()); - for (const auto var : bind_variable_schemas) { - col_specs.emplace_back(var.name(), RowsMetadata::Type(var.type())); + for (int i = 0; i < bind_variable_schemas.size(); i++) { + const ColumnSchema& var = bind_variable_schemas[i]; + if (flags & kHasGlobalTableSpec) { + col_specs.emplace_back(var.name(), RowsMetadata::Type(var.type())); + } else { + col_specs.emplace_back(bind_table_names[i], var.name(), RowsMetadata::Type(var.type())); + } } } @@ -1516,7 +1522,7 @@ PreparedResultResponse::PreparedResultResponse( const CQLRequest& request, const QueryId& query_id, const ql::PreparedResult& result) : ResultResponse(request, Kind::PREPARED), query_id_(query_id), prepared_metadata_(result.table_name(), result.hash_col_indices(), - result.bind_variable_schemas()), + result.bind_table_names(), result.bind_variable_schemas()), rows_metadata_(!result.column_schemas().empty() ? 
RowsMetadata( result.table_name(), result.column_schemas(), diff --git a/src/yb/yql/cql/cqlserver/cql_message.h b/src/yb/yql/cql/cqlserver/cql_message.h index 7bf2450f6611..4f9077c01d25 100644 --- a/src/yb/yql/cql/cqlserver/cql_message.h +++ b/src/yb/yql/cql/cqlserver/cql_message.h @@ -677,8 +677,10 @@ class ResultResponse : public CQLResponse { std::string column; Type type; - ColSpec(std::string column, const Type& type) - : keyspace(""), table(""), column(column), type(type) {} + ColSpec(const client::YBTableName& table_name, const std::string& column, const Type& type) + : keyspace(table_name.namespace_name()), table(table_name.table_name()), + column(column), type(type) {} + ColSpec(const std::string& column, const Type& type) : column(column), type(type) {} }; int32_t col_count; std::vector col_specs; @@ -767,6 +769,7 @@ class PreparedResultResponse : public ResultResponse { PreparedMetadata(); PreparedMetadata( const client::YBTableName& table_name, const std::vector& hash_col_indices, + const std::vector& bind_table_names, const std::vector& bind_variable_schemas); }; diff --git a/src/yb/yql/cql/ql/exec/exec_context.cc b/src/yb/yql/cql/ql/exec/exec_context.cc index 68b1c4e79319..7a433ab5125e 100644 --- a/src/yb/yql/cql/ql/exec/exec_context.cc +++ b/src/yb/yql/cql/ql/exec/exec_context.cc @@ -27,11 +27,19 @@ ExecContext::ExecContext(const char *ql_stmt, QLEnv *ql_env) : ProcessContextBase(ql_stmt, stmt_len), parse_tree_(parse_tree), + tnode_(parse_tree->root().get()), params_(params), start_time_(MonoTime::Now()), - ql_env_(ql_env), - partitions_count_(0), - current_partition_index_(0) { + ql_env_(ql_env) { +} + +ExecContext::ExecContext(const ExecContext& exec_context, const TreeNode *tnode) + : ProcessContextBase(exec_context.stmt(), exec_context.stmt_len()), + parse_tree_(exec_context.parse_tree_), + tnode_(tnode), + params_(exec_context.params_), + start_time_(MonoTime::Now()), + ql_env_(exec_context.ql_env_) { } ExecContext::~ExecContext() { diff --git 
a/src/yb/yql/cql/ql/exec/exec_context.h b/src/yb/yql/cql/ql/exec/exec_context.h index 361b66af2d9e..9bd59d7001f2 100644 --- a/src/yb/yql/cql/ql/exec/exec_context.h +++ b/src/yb/yql/cql/ql/exec/exec_context.h @@ -41,6 +41,7 @@ class ExecContext : public ProcessContextBase { const ParseTree *parse_tree, const StatementParameters *params, QLEnv *ql_env); + ExecContext(const ExecContext& exec_context, const TreeNode *tnode); virtual ~ExecContext(); // Get a table creator from YB client. @@ -114,7 +115,7 @@ class ExecContext : public ProcessContextBase { // Returns the tree node of the statement being executed. const TreeNode* tnode() const { - return parse_tree_->root().get(); + return tnode_; } // Access function for params. @@ -188,6 +189,9 @@ class ExecContext : public ProcessContextBase { // Statement parse tree to execute. const ParseTree *parse_tree_; + // Tree node of the statement being executed. + const TreeNode* tnode_; + // Statement parameters to execute with. const StatementParameters *params_; @@ -207,8 +211,8 @@ class ExecContext : public ProcessContextBase { // partitions_count_ = 4 (i.e. [2,4,6], [2,5,6], [3,4,6], [4,5,6]). // current_partition_index_ starts from 0 unless set in the paging state. 
std::unique_ptr>> hash_values_options_; - uint64_t partitions_count_; - uint64_t current_partition_index_; + uint64_t partitions_count_ = 0; + uint64_t current_partition_index_ = 0; }; } // namespace ql diff --git a/src/yb/yql/cql/ql/exec/executor.cc b/src/yb/yql/cql/ql/exec/executor.cc index af9a5aef616f..82d3ed49011f 100644 --- a/src/yb/yql/cql/ql/exec/executor.cc +++ b/src/yb/yql/cql/ql/exec/executor.cc @@ -26,6 +26,7 @@ namespace ql { using std::string; using std::shared_ptr; +using namespace std::placeholders; using client::YBColumnSpec; using client::YBSchema; @@ -35,8 +36,9 @@ using client::YBTableCreator; using client::YBTableAlterer; using client::YBTableType; using client::YBTableName; -using client::YBqlWriteOp; using client::YBqlReadOp; +using client::YBqlWriteOp; +using client::YBqlWriteOpPtr; using strings::Substitute; //-------------------------------------------------------------------------------------------------- @@ -63,7 +65,7 @@ void Executor::ExecuteAsync(const string &ql_stmt, const ParseTree &parse_tree, if (PREDICT_FALSE(!s.ok())) { return StatementExecuted(s); } - if (!ql_env_->FlushAsync(&flush_async_cb_)) { + if (!FlushAsync()) { return StatementExecuted(Status::OK()); } } @@ -112,7 +114,7 @@ void Executor::ExecuteBatch(const std::string &ql_stmt, const ParseTree &parse_t void Executor::ApplyBatch() { // Invoke statement-executed callback when no async operation is pending. 
- if (!ql_env_->FlushAsync(&flush_async_cb_)) { + if (!FlushAsync()) { return StatementExecuted(Status::OK()); } } @@ -139,6 +141,9 @@ Status Executor::ExecTreeNode(const TreeNode *tnode) { return Status::OK(); } switch (tnode->opcode()) { + case TreeNodeOpcode::kPTListNode: + return ExecPTNode(static_cast(tnode)); + case TreeNodeOpcode::kPTCreateTable: FALLTHROUGH_INTENDED; case TreeNodeOpcode::kPTCreateIndex: return ExecPTNode(static_cast(tnode)); @@ -164,6 +169,12 @@ Status Executor::ExecTreeNode(const TreeNode *tnode) { case TreeNodeOpcode::kPTUpdateStmt: return ExecPTNode(static_cast(tnode)); + case TreeNodeOpcode::kPTStartTransaction: + return ExecPTNode(static_cast(tnode)); + + case TreeNodeOpcode::kPTCommit: + return ExecPTNode(static_cast(tnode)); + case TreeNodeOpcode::kPTTruncateStmt: return ExecPTNode(static_cast(tnode)); @@ -180,6 +191,17 @@ Status Executor::ExecTreeNode(const TreeNode *tnode) { //-------------------------------------------------------------------------------------------------- +Status Executor::ExecPTNode(const PTListNode *tnode) { + for (TreeNode::SharedPtr dml : tnode->node_list()) { + exec_contexts_.emplace_back(exec_contexts_.front(), dml.get()); + exec_context_ = &exec_contexts_.back(); + RETURN_NOT_OK(ProcessStatementStatus(*exec_context_->parse_tree(), ExecTreeNode(dml.get()))); + } + return Status::OK(); +} + +//-------------------------------------------------------------------------------------------------- + Status Executor::ExecPTNode(const PTCreateType *tnode) { YBTableName yb_name = tnode->yb_type_name(); @@ -698,7 +720,7 @@ Status Executor::ExecPTNode(const PTInsertStmt *tnode) { } // Apply the operator. - return exec_context_->Apply(insert_op); + return ApplyWriteOp(tnode, insert_op); } //-------------------------------------------------------------------------------------------------- @@ -742,7 +764,7 @@ Status Executor::ExecPTNode(const PTDeleteStmt *tnode) { } // Apply the operator. 
- return exec_context_->Apply(delete_op); + return ApplyWriteOp(tnode, delete_op); } //-------------------------------------------------------------------------------------------------- @@ -794,7 +816,21 @@ Status Executor::ExecPTNode(const PTUpdateStmt *tnode) { } // Apply the operator. - return exec_context_->Apply(update_op); + return ApplyWriteOp(tnode, update_op); +} + +//-------------------------------------------------------------------------------------------------- + +Status Executor::ExecPTNode(const PTStartTransaction *tnode) { + ql_env_->StartTransaction(tnode->isolation_level()); + return Status::OK(); +} + +//-------------------------------------------------------------------------------------------------- + +Status Executor::ExecPTNode(const PTCommit *tnode) { + // Commit happens after the write operations have been flushed and responded. + return Status::OK(); } //-------------------------------------------------------------------------------------------------- @@ -840,27 +876,87 @@ Status Executor::ExecPTNode(const PTUseKeyspace *tnode) { return Status::OK(); } +//-------------------------------------------------------------------------------------------------- + +bool Executor::FlushAsync() { + batched_write_ops_.clear(); + return ql_env_->FlushAsync(&flush_async_cb_); +} + void Executor::FlushAsyncDone(const Status &s) { Status ss = s; if (ss.ok()) { ss = ProcessAsyncResults(); - if (ss.ok() && exec_context_->tnode()->opcode() == TreeNodeOpcode::kPTSelectStmt) { - - ql_env_->Reset(); - ss = FetchMoreRowsIfNeeded(); - if (ss.ok()) { - if (ql_env_->FlushAsync(&flush_async_cb_)) { - return; - } else { - // Evaluate aggregate functions if they are selected. 
- ss = AggregateResultSets(); + if (ss.ok()) { + const TreeNode *last_stmt = exec_context_->tnode(); + + if (last_stmt->opcode() == TreeNodeOpcode::kPTSelectStmt) { + + ql_env_->Reset(); + ss = FetchMoreRowsIfNeeded(); + if (ss.ok()) { + if (FlushAsync()) { + return; + } else { + // Evaluate aggregate functions if they are selected. + ss = AggregateResultSets(); + } + } + } + + // Update the metrics for SELECT/INSERT/UPDATE/DELETE here after the ops have been completed + // but exclude the time to commit the transaction if any. + if (ql_metrics_ != nullptr) { + const MonoTime now = MonoTime::Now(); + for (const auto& exec_context : exec_contexts_) { + const auto delta_usec = (now - exec_context.start_time()).ToMicroseconds(); + switch (exec_context.tnode()->opcode()) { + case TreeNodeOpcode::kPTSelectStmt: + ql_metrics_->ql_select_->Increment(delta_usec); + break; + case TreeNodeOpcode::kPTInsertStmt: + ql_metrics_->ql_insert_->Increment(delta_usec); + break; + case TreeNodeOpcode::kPTUpdateStmt: + ql_metrics_->ql_update_->Increment(delta_usec); + break; + case TreeNodeOpcode::kPTDeleteStmt: + ql_metrics_->ql_delete_->Increment(delta_usec); + break; + default: + break; + } } } + + if (last_stmt->opcode() == TreeNodeOpcode::kPTCommit) { + ql_env_->CommitTransaction(std::bind(&Executor::CommitDone, this, _1)); + return; + } } } StatementExecuted(ss); } +//-------------------------------------------------------------------------------------------------- + +Status Executor::ApplyWriteOp(const TreeNode *tnode, const YBqlWriteOpPtr& op) { + if (!batched_write_ops_.insert(op).second) { + // TODO: defer multiple writes of the same row by separating them into batches and executing + // them one after another. 
+ return exec_context_->Error(tnode, + "Multiple inserts, updates or deletes of the same row " + "are not supported yet", ErrorCode::EXEC_ERROR); + } + return exec_context_->Apply(op); +} + +//-------------------------------------------------------------------------------------------------- + +void Executor::CommitDone(const Status &s) { + StatementExecuted(s); +} + Status Executor::ProcessStatementStatus(const ParseTree &parse_tree, const Status& s) { if (PREDICT_FALSE(!s.ok() && s.IsQLError() && !parse_tree.reparsed())) { // If execution fails because the statement was analyzed with stale metadata cache, the @@ -901,10 +997,10 @@ Status Executor::ProcessOpResponse(client::YBqlOp* op, ExecContext* exec_context Status Executor::ProcessAsyncResults() { Status s, ss; for (auto& exec_context : exec_contexts_) { - if (exec_context.tnode() == nullptr) { - continue; // Skip empty statement. - } client::YBqlOp* op = exec_context.op().get(); + if (op == nullptr) { + continue; // Skip empty op. + } ss = ql_env_->GetOpError(op); if (PREDICT_FALSE(!ss.ok())) { // YBOperation returns not-found error when the tablet is not found. @@ -923,7 +1019,7 @@ Status Executor::ProcessAsyncResults() { return s; } -Status Executor::AppendResult(const ExecutedResult::SharedPtr& result) { +Status Executor::AppendResult(const RowsResult::SharedPtr& result) { if (result == nullptr) { return Status::OK(); } @@ -932,39 +1028,42 @@ Status Executor::AppendResult(const ExecutedResult::SharedPtr& result) { return Status::OK(); } CHECK(result_->type() == ExecutedResult::Type::ROWS); - CHECK(result->type() == ExecutedResult::Type::ROWS); - return std::static_pointer_cast(result_)->Append( - static_cast(*result)); + return std::static_pointer_cast(result_)->Append(*result); } void Executor::StatementExecuted(const Status& s) { // Update metrics for all statements executed. 
if (s.ok() && ql_metrics_ != nullptr) { + const MonoTime now = MonoTime::Now(); for (const auto& exec_context : exec_contexts_) { const TreeNode* tnode = exec_context.tnode(); if (tnode != nullptr) { - MonoDelta delta = MonoTime::Now().GetDeltaSince(exec_context.start_time()); + const auto delta_usec = (now - exec_context.start_time()).ToMicroseconds(); - ql_metrics_->time_to_execute_ql_query_->Increment(delta.ToMicroseconds()); + ql_metrics_->time_to_execute_ql_query_->Increment(delta_usec); switch (tnode->opcode()) { - case TreeNodeOpcode::kPTSelectStmt: - ql_metrics_->ql_select_->Increment(delta.ToMicroseconds()); - break; - case TreeNodeOpcode::kPTInsertStmt: - ql_metrics_->ql_insert_->Increment(delta.ToMicroseconds()); - break; - case TreeNodeOpcode::kPTUpdateStmt: - ql_metrics_->ql_update_->Increment(delta.ToMicroseconds()); - break; - case TreeNodeOpcode::kPTDeleteStmt: - ql_metrics_->ql_delete_->Increment(delta.ToMicroseconds()); + case TreeNodeOpcode::kPTSelectStmt: FALLTHROUGH_INTENDED; + case TreeNodeOpcode::kPTInsertStmt: FALLTHROUGH_INTENDED; + case TreeNodeOpcode::kPTUpdateStmt: FALLTHROUGH_INTENDED; + case TreeNodeOpcode::kPTDeleteStmt: FALLTHROUGH_INTENDED; + case TreeNodeOpcode::kPTListNode: + // The metrics for SELECT/INSERT/UPDATE/DELETE have been updated when the ops have + // been completed in FlushAsyncDone(). Exclude PTListNode also as we are interested + // in the metrics of its constituent DMLs only. break; default: - ql_metrics_->ql_others_->Increment(delta.ToMicroseconds()); + ql_metrics_->ql_others_->Increment(delta_usec); + break; } } } + + const TreeNode* last_tnode = exec_context_->tnode(); + if (last_tnode != nullptr && last_tnode->opcode() == TreeNodeOpcode::kPTCommit) { + const MonoDelta delta = now - exec_contexts_.front().start_time(); + ql_metrics_->ql_transaction_->Increment(delta.ToMicroseconds()); + } } // Clean up and invoke statement-executed callback.
@@ -977,6 +1076,7 @@ void Executor::StatementExecuted(const Status& s) { void Executor::Reset() { exec_contexts_.clear(); exec_context_ = nullptr; + batched_write_ops_.clear(); result_ = nullptr; cb_.Reset(); } diff --git a/src/yb/yql/cql/ql/exec/executor.h b/src/yb/yql/cql/ql/exec/executor.h index 70553ebadf4f..5a38baf4c8a1 100644 --- a/src/yb/yql/cql/ql/exec/executor.h +++ b/src/yb/yql/cql/ql/exec/executor.h @@ -18,6 +18,7 @@ #ifndef YB_YQL_CQL_QL_EXEC_EXECUTOR_H_ #define YB_YQL_CQL_QL_EXEC_EXECUTOR_H_ +#include "yb/client/yb_op.h" #include "yb/common/ql_expr.h" #include "yb/common/ql_rowblock.h" #include "yb/common/partial_row.h" @@ -33,6 +34,7 @@ #include "yb/yql/cql/ql/ptree/pt_insert.h" #include "yb/yql/cql/ql/ptree/pt_delete.h" #include "yb/yql/cql/ql/ptree/pt_update.h" +#include "yb/yql/cql/ql/ptree/pt_transaction.h" #include "yb/yql/cql/ql/ptree/pt_truncate.h" #include "yb/yql/cql/ql/util/statement_params.h" #include "yb/yql/cql/ql/util/statement_result.h" @@ -81,16 +83,19 @@ class Executor : public QLExprExecutor { // Execute any TreeNode. This function determines how to execute a node. CHECKED_STATUS ExecTreeNode(const TreeNode *tnode); - // Creates a table (including index table for CREATE INDEX). + // Execute a list of statements. + CHECKED_STATUS ExecPTNode(const PTListNode *tnode); + + // Create a table (including index table for CREATE INDEX). CHECKED_STATUS ExecPTNode(const PTCreateTable *tnode); - // Alters a table. + // Alter a table. CHECKED_STATUS ExecPTNode(const PTAlterTable *tnode); - // Drops a table. + // Drop a table. CHECKED_STATUS ExecPTNode(const PTDropStmt *tnode); - // Creates a user-defined type; + // Create a user-defined type; CHECKED_STATUS ExecPTNode(const PTCreateType *tnode); // Select statement. @@ -108,18 +113,30 @@ class Executor : public QLExprExecutor { // Truncate statement. CHECKED_STATUS ExecPTNode(const PTTruncateStmt *tnode); - // Creates a keyspace. + // Start a transaction. 
+ CHECKED_STATUS ExecPTNode(const PTStartTransaction *tnode); + + // Commit a transaction. + CHECKED_STATUS ExecPTNode(const PTCommit *tnode); + + // Create a keyspace. CHECKED_STATUS ExecPTNode(const PTCreateKeyspace *tnode); - // Uses a keyspace. + // Use a keyspace. CHECKED_STATUS ExecPTNode(const PTUseKeyspace *tnode); //------------------------------------------------------------------------------------------------ // Result processing. + // Flush operations that have been applied. + bool FlushAsync(); + // Callback for FlushAsync. void FlushAsyncDone(const Status& s); + // Callback for Commit. + void CommitDone(const Status& s); + // Process the status of executing a statement. CHECKED_STATUS ProcessStatementStatus(const ParseTree& parse_tree, const Status& s); @@ -130,7 +147,7 @@ class Executor : public QLExprExecutor { CHECKED_STATUS ProcessAsyncResults(); // Append execution result. - CHECKED_STATUS AppendResult(const ExecutedResult::SharedPtr& result); + CHECKED_STATUS AppendResult(const RowsResult::SharedPtr& result); // Continue a multi-partition select (e.g. table scan or query with 'IN' condition on hash cols). CHECKED_STATUS FetchMoreRowsIfNeeded(); @@ -250,6 +267,9 @@ class Executor : public QLExprExecutor { CHECKED_STATUS WhereSubColOpToPB(QLConditionPB *condition, const SubscriptedColumnOp& subcol_op); CHECKED_STATUS FuncOpToPB(QLConditionPB *condition, const FuncOp& func_op); + //------------------------------------------------------------------------------------------------ + CHECKED_STATUS ApplyWriteOp(const TreeNode *tnode, const client::YBqlWriteOpPtr& op); + //------------------------------------------------------------------------------------------------ // Environment (YBClient) for executing statements. QLEnv *ql_env_; @@ -261,6 +281,11 @@ class Executor : public QLExprExecutor { // Execution context of the last statement being executed. ExecContext* exec_context_; + // Set of write operations that have been applied. 
+ std::unordered_set batched_write_ops_; + // Execution result. ExecutedResult::SharedPtr result_; diff --git a/src/yb/yql/cql/ql/parser/parse_context.cc b/src/yb/yql/cql/ql/parser/parse_context.cc index 1f6e474fa6a0..bc822e7555ac 100644 --- a/src/yb/yql/cql/ql/parser/parse_context.cc +++ b/src/yb/yql/cql/ql/parser/parse_context.cc @@ -68,16 +68,18 @@ size_t ParseContext::Read(char* buf, size_t max_size) { void ParseContext::GetBindVariables(MCVector *vars) { vars->clear(); - int64_t pos = 0; for (auto it = bind_variables_.cbegin(); it != bind_variables_.cend(); it++) { PTBindVar *var = *it; // Set the ordinal position of the bind variable in the statement also. if (var->is_unset_pos()) { - var->set_pos(pos); + var->set_pos(bind_pos_); } - pos++; vars->push_back(var); + bind_pos_++; } + // Once the current statement has copied the bind variables found in it, clear the bind vars + // before we process the next statement. + bind_variables_.clear(); } } // namespace ql diff --git a/src/yb/yql/cql/ql/parser/parse_context.h b/src/yb/yql/cql/ql/parser/parse_context.h index 18f57ae288f8..bd2ac1904f8d 100644 --- a/src/yb/yql/cql/ql/parser/parse_context.h +++ b/src/yb/yql/cql/ql/parser/parse_context.h @@ -92,6 +92,9 @@ class ParseContext : public ProcessContext { // statement. MCSet bind_variables_; + // Ordinal position for the next bind variable for the statement to be parsed. + int64_t bind_pos_ = 0; + //------------------------------------------------------------------------------------------------ // We don't use istream (i.e. file) as input when parsing. 
In the future, if we also support file // as an SQL input, we need to define a constructor that takes a file as input and initializes diff --git a/src/yb/yql/cql/ql/parser/parser_gram.y b/src/yb/yql/cql/ql/parser/parser_gram.y index 4b47461c5393..05c22a75df25 100644 --- a/src/yb/yql/cql/ql/parser/parser_gram.y +++ b/src/yb/yql/cql/ql/parser/parser_gram.y @@ -84,6 +84,7 @@ #include "yb/yql/cql/ql/ptree/pt_insert.h" #include "yb/yql/cql/ql/ptree/pt_delete.h" #include "yb/yql/cql/ql/ptree/pt_update.h" +#include "yb/yql/cql/ql/ptree/pt_transaction.h" #include "yb/gutil/macros.h" namespace yb { @@ -222,6 +223,9 @@ using namespace yb::ql; UpdateStmt set_target_list + // Begin / end transaction. + TransactionStmt + // Alter table. AlterTableStmt addColumnDef dropColumn alterProperty @@ -419,7 +423,7 @@ using namespace yb::ql; CreateFunctionStmt AlterFunctionStmt ReindexStmt RemoveAggrStmt RemoveFuncStmt RemoveOperStmt RenameStmt RevokeStmt RevokeRoleStmt RuleActionStmt RuleActionStmtOrEmpty RuleStmt - SecLabelStmt TransactionStmt + SecLabelStmt UnlistenStmt VacuumStmt VariableResetStmt VariableSetStmt VariableShowStmt ViewStmt CheckPointStmt CreateConversionStmt @@ -750,11 +754,21 @@ stmtblock: // The thrashing around here is to discard "empty" statements... stmtmulti: stmt { - $$ = MAKE_NODE(@1, PTListNode, $1); + if ($1 == nullptr) { + $$ = nullptr; + } else { + $$ = MAKE_NODE(@1, PTListNode, $1); + } } | stmtmulti ';' stmt { - $1->Append($3); - $$ = $1; + if ($3 == nullptr) { + $$ = $1; + } else if ($1 == nullptr) { + $$ = MAKE_NODE(@1, PTListNode, $3); + } else { + $1->Append($3); + $$ = $1; + } } ; @@ -810,6 +824,9 @@ stmt: } $$ = $1; } + | TransactionStmt { + $$ = $1; + } | inactive_stmt { // Report error that the syntax is not yet supported. 
PARSER_UNSUPPORTED(@1); @@ -5331,7 +5348,6 @@ inactive_stmt: | RevokeRoleStmt | RuleStmt | SecLabelStmt - | TransactionStmt | UnlistenStmt | VacuumStmt | VariableResetStmt @@ -8660,37 +8676,51 @@ UnlistenStmt: TransactionStmt: ABORT_P opt_transaction { + PARSER_UNSUPPORTED(@1); } | BEGIN_P opt_transaction transaction_mode_list_or_empty { + $$ = MAKE_NODE(@1, PTStartTransaction); } | START TRANSACTION transaction_mode_list_or_empty { + $$ = MAKE_NODE(@1, PTStartTransaction); } | COMMIT opt_transaction { + $$ = MAKE_NODE(@1, PTCommit); } | END_P opt_transaction { + $$ = MAKE_NODE(@1, PTCommit); } | ROLLBACK opt_transaction { + PARSER_UNSUPPORTED(@1); } | SAVEPOINT ColId { + PARSER_UNSUPPORTED(@1); } | RELEASE SAVEPOINT ColId { + PARSER_UNSUPPORTED(@1); } | RELEASE ColId { + PARSER_UNSUPPORTED(@1); } | ROLLBACK opt_transaction TO SAVEPOINT ColId { + PARSER_UNSUPPORTED(@1); } | ROLLBACK opt_transaction TO ColId { + PARSER_UNSUPPORTED(@1); } | PREPARE TRANSACTION Sconst { + PARSER_UNSUPPORTED(@1); } | COMMIT PREPARED Sconst { + PARSER_UNSUPPORTED(@1); } | ROLLBACK PREPARED Sconst { + PARSER_UNSUPPORTED(@1); } ; opt_transaction: - WORK {} + WORK { PARSER_UNSUPPORTED(@1); } | TRANSACTION {} | /*EMPTY*/ {} ; @@ -8719,7 +8749,7 @@ transaction_mode_list: ; transaction_mode_list_or_empty: - transaction_mode_list { $$ = $1; } + transaction_mode_list { PARSER_UNSUPPORTED(@1); } | /* EMPTY */ { } ; diff --git a/src/yb/yql/cql/ql/ptree/CMakeLists.txt b/src/yb/yql/cql/ql/ptree/CMakeLists.txt index ab881178d53b..2f9cbf62cede 100644 --- a/src/yb/yql/cql/ql/ptree/CMakeLists.txt +++ b/src/yb/yql/cql/ql/ptree/CMakeLists.txt @@ -14,6 +14,7 @@ add_library(ql_ptree parse_tree.cc tree_node.cc + list_node.cc pt_type.cc pt_name.cc pt_property.cc @@ -34,6 +35,7 @@ add_library(ql_ptree pt_insert.cc pt_delete.cc pt_update.cc + pt_transaction.cc pt_expr.cc pt_bcall.cc pt_option.cc diff --git a/src/yb/yql/cql/ql/ptree/list_node.cc b/src/yb/yql/cql/ql/ptree/list_node.cc new file mode 100644 index 
000000000000..f07833b774b4 --- /dev/null +++ b/src/yb/yql/cql/ql/ptree/list_node.cc @@ -0,0 +1,90 @@ +//-------------------------------------------------------------------------------------------------- +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +// +// Implementation of a list of tree nodes. +//-------------------------------------------------------------------------------------------------- + +#include "yb/client/client.h" +#include "yb/yql/cql/ql/ptree/sem_context.h" +#include "yb/yql/cql/ql/ptree/list_node.h" +#include "yb/yql/cql/ql/ptree/pt_dml.h" +#include "yb/yql/cql/ql/ptree/pt_transaction.h" + +namespace yb { +namespace ql { + +//-------------------------------------------------------------------------------------------------- + +Status PTListNode::AnalyzeStatementBlock(SemContext *sem_context) { + // The only statement block we support currently is + // START TRANSACTION | BEGIN TRANSACTION; + // dml; + // ... 
+ // COMMIT | END TRANSACTION; + + if (node_list().empty()) { + return Status::OK(); + } + + const TreeNode::SharedPtr front = node_list().front(); + if (front->opcode() != TreeNodeOpcode::kPTStartTransaction) { + return sem_context->Error(front, + "A transaction must be started at the beginning of a statement batch", + ErrorCode::CQL_STATEMENT_INVALID); + } + const TreeNode::SharedPtr back = node_list().back(); + if (back->opcode() != TreeNodeOpcode::kPTCommit) { + return sem_context->Error(back, + "A transaction must be committed at the end of a statement batch", + ErrorCode::CQL_STATEMENT_INVALID); + } + for (TreeNode::SharedPtr tnode : node_list()) { + const PTDmlStmt *dml = nullptr; + if (tnode != front && tnode != back) { + const auto opcode = tnode->opcode(); + if (opcode != TreeNodeOpcode::kPTInsertStmt && + opcode != TreeNodeOpcode::kPTUpdateStmt && + opcode != TreeNodeOpcode::kPTDeleteStmt) { + return sem_context->Error(tnode, + "Only insert, update, and delete statements are allowed in a " + "statement batch", + ErrorCode::CQL_STATEMENT_INVALID); + } + dml = static_cast(tnode.get()); + } + sem_context->Reset(); + RETURN_NOT_OK(tnode->Analyze(sem_context)); + if (dml != nullptr) { + if (!dml->table()->schema().table_properties().is_transactional()) { + return sem_context->Error(dml->table_loc(), + "Transactions are not enabled in the table", + ErrorCode::CQL_STATEMENT_INVALID); + } + if (dml->if_clause() != nullptr) { + return sem_context->Error(dml, + "Conditional DML not supported in a statement batch yet", + ErrorCode::CQL_STATEMENT_INVALID); + } + // Isolation level needs to be SNAPSHOT_ISOLATION if any of the DML requires a read. 
+ if (!dml->column_refs().empty() || !dml->static_column_refs().empty()) { + const auto txn = std::static_pointer_cast(front); + txn->set_isolation_level(SNAPSHOT_ISOLATION); + } + } + } + return Status::OK(); +} + +} // namespace ql +} // namespace yb diff --git a/src/yb/yql/cql/ql/ptree/list_node.h b/src/yb/yql/cql/ql/ptree/list_node.h index 34828f4f142f..c26cf42e7f63 100644 --- a/src/yb/yql/cql/ql/ptree/list_node.h +++ b/src/yb/yql/cql/ql/ptree/list_node.h @@ -14,7 +14,7 @@ // // List Node Declaration. // -// This modules includes specifications for nodes that contain a list of tree node. +// This module includes specifications for nodes that contain a list of tree nodes. //-------------------------------------------------------------------------------------------------- #ifndef YB_YQL_CQL_QL_PTREE_LIST_NODE_H_ @@ -172,7 +172,11 @@ class TreeListNode : public TreeNode { MCList> node_list_; }; -using PTListNode = TreeListNode<>; +class PTListNode : public TreeListNode<> { + public: + // Run semantics analysis on a statement block. + CHECKED_STATUS AnalyzeStatementBlock(SemContext *sem_context); +}; } // namespace ql } // namespace yb diff --git a/src/yb/yql/cql/ql/ptree/parse_tree.cc b/src/yb/yql/cql/ql/ptree/parse_tree.cc index 1a9b313d7d92..d8cbaade8fb4 100644 --- a/src/yb/yql/cql/ql/ptree/parse_tree.cc +++ b/src/yb/yql/cql/ql/ptree/parse_tree.cc @@ -57,23 +57,19 @@ CHECKED_STATUS ParseTree::Analyze(SemContext *sem_context) { return Status::OK(); } - // Restrict statement list to single statement only and hoist the statement to the root node.
- if (root_->opcode() == TreeNodeOpcode::kPTListNode) { - const auto lnode = std::static_pointer_cast(root_); - switch (lnode->size()) { - case 0: - root_ = nullptr; - return Status::OK(); - case 1: - root_ = lnode->node_list().front(); - break; - default: - return sem_context->Error(root_, "Multi-statement list not supported yet", - ErrorCode::CQL_STATEMENT_INVALID); - } + DCHECK_EQ(root_->opcode(), TreeNodeOpcode::kPTListNode) << "statement list expected"; + const auto lnode = std::static_pointer_cast(root_); + switch (lnode->size()) { + case 0: + return sem_context->Error(lnode, "Unexpected empty statement list", + ErrorCode::CQL_STATEMENT_INVALID); + case 1: + // Hoist the statement to the root node. + root_ = lnode->node_list().front(); + return root_->Analyze(sem_context); + default: + return lnode->AnalyzeStatementBlock(sem_context); } - - return root_->Analyze(sem_context); } void ParseTree::AddAnalyzedTable(const client::YBTableName& table_name) { diff --git a/src/yb/yql/cql/ql/ptree/pt_alter_table.cc b/src/yb/yql/cql/ql/ptree/pt_alter_table.cc index a3e74ac70f1c..acfd788120eb 100644 --- a/src/yb/yql/cql/ql/ptree/pt_alter_table.cc +++ b/src/yb/yql/cql/ql/ptree/pt_alter_table.cc @@ -21,11 +21,6 @@ namespace yb { namespace ql { -using client::YBSchema; -using client::YBColumnSchema; -using client::YBTableName; -using client::YBTable; - //-------------------------------------------------------------------------------------------------- PTAlterTable::PTAlterTable(MemoryContext *memctx, diff --git a/src/yb/yql/cql/ql/ptree/pt_dml.cc b/src/yb/yql/cql/ql/ptree/pt_dml.cc index 2181ee40c277..45e20bc4ac6f 100644 --- a/src/yb/yql/cql/ql/ptree/pt_dml.cc +++ b/src/yb/yql/cql/ql/ptree/pt_dml.cc @@ -25,12 +25,6 @@ namespace yb { namespace ql { -using client::YBSchema; -using client::YBTable; -using client::YBTableType; -using client::YBTableName; -using client::YBColumnSchema; - const PTExpr::SharedPtr PTDmlStmt::kNullPointerRef = nullptr;
PTDmlStmt::PTDmlStmt(MemoryContext *memctx, diff --git a/src/yb/yql/cql/ql/ptree/pt_insert.cc b/src/yb/yql/cql/ql/ptree/pt_insert.cc index 9a3569999105..7f1df8f7b3e0 100644 --- a/src/yb/yql/cql/ql/ptree/pt_insert.cc +++ b/src/yb/yql/cql/ql/ptree/pt_insert.cc @@ -24,11 +24,6 @@ namespace yb { namespace ql { -using client::YBSchema; -using client::YBTable; -using client::YBTableType; -using client::YBColumnSchema; - //-------------------------------------------------------------------------------------------------- PTInsertStmt::PTInsertStmt(MemoryContext *memctx, diff --git a/src/yb/yql/cql/ql/ptree/pt_select.cc b/src/yb/yql/cql/ql/ptree/pt_select.cc index 0178827f58c1..51a06c7f2d61 100644 --- a/src/yb/yql/cql/ql/ptree/pt_select.cc +++ b/src/yb/yql/cql/ql/ptree/pt_select.cc @@ -28,11 +28,6 @@ namespace ql { using std::make_shared; -using client::YBSchema; -using client::YBTable; -using client::YBTableType; -using client::YBColumnSchema; - //-------------------------------------------------------------------------------------------------- PTValues::PTValues(MemoryContext *memctx, diff --git a/src/yb/yql/cql/ql/ptree/pt_transaction.cc b/src/yb/yql/cql/ql/ptree/pt_transaction.cc new file mode 100644 index 000000000000..bfabdcffbb44 --- /dev/null +++ b/src/yb/yql/cql/ql/ptree/pt_transaction.cc @@ -0,0 +1,54 @@ +//-------------------------------------------------------------------------------------------------- +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations +// under the License. +// +// +// Treenode implementation for TRANSACTION statements. +//-------------------------------------------------------------------------------------------------- + +#include "yb/yql/cql/ql/ptree/sem_context.h" +#include "yb/yql/cql/ql/ptree/pt_transaction.h" +#include "yb/yql/cql/ql/ptree/pt_insert.h" +#include "yb/yql/cql/ql/ptree/pt_update.h" +#include "yb/yql/cql/ql/ptree/pt_delete.h" + +namespace yb { +namespace ql { + +//-------------------------------------------------------------------------------------------------- + +PTStartTransaction::PTStartTransaction(MemoryContext *memctx, YBLocation::SharedPtr loc) + : TreeNode(memctx, loc) { +} + +PTStartTransaction::~PTStartTransaction() { +} + +Status PTStartTransaction::Analyze(SemContext *sem_context) { + return Status::OK(); +} + +//-------------------------------------------------------------------------------------------------- + +PTCommit::PTCommit(MemoryContext *memctx, YBLocation::SharedPtr loc) + : TreeNode(memctx, loc) { +} + +PTCommit::~PTCommit() { +} + +Status PTCommit::Analyze(SemContext *sem_context) { + return Status::OK(); +} + +} // namespace ql +} // namespace yb diff --git a/src/yb/yql/cql/ql/ptree/pt_transaction.h b/src/yb/yql/cql/ql/ptree/pt_transaction.h new file mode 100644 index 000000000000..47e02b239a80 --- /dev/null +++ b/src/yb/yql/cql/ql/ptree/pt_transaction.h @@ -0,0 +1,99 @@ +//-------------------------------------------------------------------------------------------------- +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +// +// Tree node definitions for TRANSACTION statements. +//-------------------------------------------------------------------------------------------------- + +#ifndef YB_YQL_CQL_QL_PTREE_PT_TRANSACTION_H_ +#define YB_YQL_CQL_QL_PTREE_PT_TRANSACTION_H_ + +#include "yb/yql/cql/ql/ptree/list_node.h" +#include "yb/yql/cql/ql/ptree/tree_node.h" +#include "yb/yql/cql/ql/ptree/pt_select.h" +#include "yb/yql/cql/ql/ptree/column_desc.h" +#include "yb/yql/cql/ql/ptree/pt_dml.h" + +namespace yb { +namespace ql { + +//-------------------------------------------------------------------------------------------------- + +class PTStartTransaction : public TreeNode { + public: + //------------------------------------------------------------------------------------------------ + // Public types. + typedef MCSharedPtr SharedPtr; + typedef MCSharedPtr SharedPtrConst; + + //------------------------------------------------------------------------------------------------ + // Constructor and destructor. + PTStartTransaction(MemoryContext *memctx, YBLocation::SharedPtr loc); + virtual ~PTStartTransaction(); + + template + inline static PTStartTransaction::SharedPtr MakeShared(MemoryContext *memctx, + TypeArgs&&... args) { + return MCMakeShared(memctx, std::forward(args)...); + } + + IsolationLevel isolation_level() const { + return isolation_level_; + } + void set_isolation_level(const IsolationLevel isolation_level) { + isolation_level_ = isolation_level; + } + + // Node semantics analysis. 
+ virtual CHECKED_STATUS Analyze(SemContext *sem_context) override; + + // Node type. + virtual TreeNodeOpcode opcode() const override { + return TreeNodeOpcode::kPTStartTransaction; + } + + private: + IsolationLevel isolation_level_ = SERIALIZABLE_ISOLATION; +}; + +class PTCommit : public TreeNode { + public: + //------------------------------------------------------------------------------------------------ + // Public types. + typedef MCSharedPtr SharedPtr; + typedef MCSharedPtr SharedPtrConst; + + //------------------------------------------------------------------------------------------------ + // Constructor and destructor. + PTCommit(MemoryContext *memctx, YBLocation::SharedPtr loc); + virtual ~PTCommit(); + + template + inline static PTCommit::SharedPtr MakeShared(MemoryContext *memctx, + TypeArgs&&... args) { + return MCMakeShared(memctx, std::forward(args)...); + } + + // Node semantics analysis. + virtual CHECKED_STATUS Analyze(SemContext *sem_context) override; + + // Node type. 
+ virtual TreeNodeOpcode opcode() const override { + return TreeNodeOpcode::kPTCommit; + } +}; + +} // namespace ql +} // namespace yb + +#endif // YB_YQL_CQL_QL_PTREE_PT_TRANSACTION_H_ diff --git a/src/yb/yql/cql/ql/ptree/sem_context.cc b/src/yb/yql/cql/ql/ptree/sem_context.cc index a26abc53de14..16e741ad32b8 100644 --- a/src/yb/yql/cql/ql/ptree/sem_context.cc +++ b/src/yb/yql/cql/ql/ptree/sem_context.cc @@ -251,6 +251,14 @@ const ColumnDesc *SemContext::GetColumnDesc(const MCString& col_name) { return entry->column_desc_; } +void SemContext::Reset() { + symtab_.clear(); + current_processing_id_ = SymbolEntry(); + current_dml_stmt_ = nullptr; + current_table_ = nullptr; + sem_state_ = nullptr; +} + //-------------------------------------------------------------------------------------------------- bool SemContext::IsConvertible(const std::shared_ptr& lhs_type, diff --git a/src/yb/yql/cql/ql/ptree/sem_context.h b/src/yb/yql/cql/ql/ptree/sem_context.h index 4b0325eeb873..7d3305afd304 100644 --- a/src/yb/yql/cql/ql/ptree/sem_context.h +++ b/src/yb/yql/cql/ql/ptree/sem_context.h @@ -263,6 +263,8 @@ class SemContext : public ProcessContext { current_table_ = table; } + void Reset(); + private: // Find symbol. 
SymbolEntry *SeekSymbol(const MCString& name); diff --git a/src/yb/yql/cql/ql/ptree/tree_node.h b/src/yb/yql/cql/ql/ptree/tree_node.h index e2c061d5261b..b7b047a42173 100644 --- a/src/yb/yql/cql/ql/ptree/tree_node.h +++ b/src/yb/yql/cql/ql/ptree/tree_node.h @@ -24,6 +24,7 @@ #include "yb/yql/cql/ql/ptree/yb_location.h" #include "yb/yql/cql/ql/ptree/pt_option.h" #include "yb/yql/cql/ql/util/errcodes.h" +#include "yb/util/enums.h" #include "yb/util/status.h" #include "yb/util/memory/mc_types.h" @@ -31,31 +32,33 @@ namespace yb { namespace ql { class SemContext; -enum class TreeNodeOpcode { - kNoOp = 0, - kTreeNode, - kPTListNode, - kPTCreateKeyspace, - kPTUseKeyspace, - kPTCreateTable, - kPTAlterTable, - kPTCreateType, - kPTCreateIndex, - kPTTruncateStmt, - kPTDropStmt, - kPTSelectStmt, - kPTInsertStmt, - kPTDeleteStmt, - kPTUpdateStmt, +YB_DEFINE_ENUM(TreeNodeOpcode, + ((kNoOp, 0)) + (kTreeNode) + (kPTListNode) + (kPTCreateKeyspace) + (kPTUseKeyspace) + (kPTCreateTable) + (kPTAlterTable) + (kPTCreateType) + (kPTCreateIndex) + (kPTTruncateStmt) + (kPTDropStmt) + (kPTSelectStmt) + (kPTInsertStmt) + (kPTDeleteStmt) + (kPTUpdateStmt) - // Expressions. - kPTExpr, - kPTRef, - kPTSubscript, - kPTAllColumns, - kPTAssign, - kPTBindVar, -}; + (kPTStartTransaction) + (kPTCommit) + + // Expressions. + (kPTExpr) + (kPTRef) + (kPTSubscript) + (kPTAllColumns) + (kPTAssign) + (kPTBindVar)); // TreeNode base class. 
class TreeNode : public MCBase { diff --git a/src/yb/yql/cql/ql/ql_processor.cc b/src/yb/yql/cql/ql/ql_processor.cc index 3bf5ce317476..f996c53a4829 100644 --- a/src/yb/yql/cql/ql/ql_processor.cc +++ b/src/yb/yql/cql/ql/ql_processor.cc @@ -37,25 +37,29 @@ METRIC_DEFINE_histogram( "Number of rounds to successfully parse a SQL query", 60000000LU, 2); METRIC_DEFINE_histogram( server, handler_latency_yb_cqlserver_SQLProcessor_SelectStmt, - "Time spent processing a Select stmt", yb::MetricUnit::kMicroseconds, - "Time spent processing a Select stmt", 60000000LU, 2); + "Time spent processing a SELECT statement", yb::MetricUnit::kMicroseconds, + "Time spent processing a SELECT statement", 60000000LU, 2); METRIC_DEFINE_histogram( server, handler_latency_yb_cqlserver_SQLProcessor_InsertStmt, - "Time spent processing a Insert stmt", yb::MetricUnit::kMicroseconds, - "Time spent processing a Insert stmt", 60000000LU, 2); + "Time spent processing an INSERT statement", yb::MetricUnit::kMicroseconds, + "Time spent processing an INSERT statement", 60000000LU, 2); METRIC_DEFINE_histogram( server, handler_latency_yb_cqlserver_SQLProcessor_UpdateStmt, - "Time spent processing a Update stmt", yb::MetricUnit::kMicroseconds, - "Time spent processing a Update stmt", 60000000LU, 2); + "Time spent processing an UPDATE statement", yb::MetricUnit::kMicroseconds, + "Time spent processing an UPDATE statement", 60000000LU, 2); METRIC_DEFINE_histogram( server, handler_latency_yb_cqlserver_SQLProcessor_DeleteStmt, - "Time spent processing a Delete stmt", yb::MetricUnit::kMicroseconds, - "Time spent processing a Delete stmt", 60000000LU, 2); + "Time spent processing a DELETE statement", yb::MetricUnit::kMicroseconds, + "Time spent processing a DELETE statement", 60000000LU, 2); METRIC_DEFINE_histogram( server, handler_latency_yb_cqlserver_SQLProcessor_OtherStmts, - "Time spent processing any stmt other than Select/Insert/Update/Delete", + "Time spent processing any statement other than 
SELECT/INSERT/UPDATE/DELETE", yb::MetricUnit::kMicroseconds, - "Time spent processing any stmt other than Select/Insert/Update/Delete", 60000000LU, 2); + "Time spent processing any statement other than SELECT/INSERT/UPDATE/DELETE", 60000000LU, 2); +METRIC_DEFINE_histogram( + server, handler_latency_yb_cqlserver_SQLProcessor_Transaction, + "Time spent processing a transaction", yb::MetricUnit::kMicroseconds, + "Time spent processing a transaction", 60000000LU, 2); METRIC_DEFINE_histogram( server, handler_latency_yb_cqlserver_SQLProcessor_ResponseSize, "Size of the returned response blob (in bytes)", yb::MetricUnit::kBytes, @@ -101,6 +105,8 @@ QLMetrics::QLMetrics(const scoped_refptr &metric_entity) { METRIC_handler_latency_yb_cqlserver_SQLProcessor_DeleteStmt.Instantiate(metric_entity); ql_others_ = METRIC_handler_latency_yb_cqlserver_SQLProcessor_OtherStmts.Instantiate(metric_entity); + ql_transaction_ = + METRIC_handler_latency_yb_cqlserver_SQLProcessor_Transaction.Instantiate(metric_entity); ql_response_size_bytes_ = METRIC_handler_latency_yb_cqlserver_SQLProcessor_ResponseSize.Instantiate(metric_entity); diff --git a/src/yb/yql/cql/ql/ql_processor.h b/src/yb/yql/cql/ql/ql_processor.h index 8ac3f533748a..52010c461f76 100644 --- a/src/yb/yql/cql/ql/ql_processor.h +++ b/src/yb/yql/cql/ql/ql_processor.h @@ -47,6 +47,7 @@ class QLMetrics { scoped_refptr ql_update_; scoped_refptr ql_delete_; scoped_refptr ql_others_; + scoped_refptr ql_transaction_; scoped_refptr ql_response_size_bytes_; }; diff --git a/src/yb/yql/cql/ql/statement.cc b/src/yb/yql/cql/ql/statement.cc index e4656a65caa2..91f3a6dd1be0 100644 --- a/src/yb/yql/cql/ql/statement.cc +++ b/src/yb/yql/cql/ql/statement.cc @@ -58,6 +58,9 @@ Status Statement::Prepare( case TreeNodeOpcode::kPTDeleteStmt: result->reset(new PreparedResult(static_cast(stmt))); break; + case TreeNodeOpcode::kPTListNode: + result->reset(new PreparedResult(static_cast(stmt))); + break; default: break; } diff --git 
a/src/yb/yql/cql/ql/util/ql_env.cc b/src/yb/yql/cql/ql/util/ql_env.cc index 025458defe25..2061b55aeb95 100644 --- a/src/yb/yql/cql/ql/util/ql_env.cc +++ b/src/yb/yql/cql/ql/util/ql_env.cc @@ -18,10 +18,12 @@ #include "yb/yql/cql/ql/util/ql_env.h" #include "yb/client/callbacks.h" #include "yb/client/client.h" +#include "yb/client/transaction.h" #include "yb/client/yb_op.h" #include "yb/master/catalog_manager.h" #include "yb/rpc/messenger.h" +#include "yb/server/hybrid_clock.h" #include "yb/util/trace.h" using namespace std::literals; @@ -33,12 +35,15 @@ using std::string; using std::shared_ptr; using std::weak_ptr; +using client::CommitCallback; +using client::TransactionManager; using client::YBClient; using client::YBError; using client::YBOperation; using client::YBSession; using client::YBStatusMemberCallback; using client::YBTable; +using client::YBTransaction; using client::YBMetaDataCache; using client::YBTableCreator; using client::YBTableAlterer; @@ -102,6 +107,26 @@ void QLEnv::SetCurrentCall(rpc::InboundCallPtr cql_call) { current_call_ = std::move(cql_call); } +void QLEnv::StartTransaction(const IsolationLevel isolation_level) { + if (transaction_manager_ == nullptr) { + server::ClockPtr clock(new server::HybridClock()); + CHECK_OK(clock->Init()); + transaction_manager_ = std::make_unique(client_, clock); + } + transaction_ = std::make_shared(transaction_manager_.get(), isolation_level); + session_->SetTransaction(transaction_); +} + +void QLEnv::CommitTransaction(CommitCallback callback) { + if (!transaction_) { + LOG(DFATAL) << "No transaction to commit"; + return; + } + shared_ptr transaction = std::move(transaction_); + transaction->Commit(std::move(callback)); + session_->SetTransaction(nullptr); +} + CHECKED_STATUS QLEnv::Apply(std::shared_ptr op) { has_session_operations_ = true; @@ -208,10 +233,16 @@ void QLEnv::RemoveCachedUDType(const std::string& keyspace_name, const std::stri } void QLEnv::Reset() { + session_->Abort(); 
has_session_operations_ = false; requested_callback_ = nullptr; flush_status_ = Status::OK(); op_errors_.clear(); + if (transaction_ != nullptr) { + shared_ptr transaction = std::move(transaction_); + transaction->Abort(); + session_->SetTransaction(nullptr); + } } Status QLEnv::CreateKeyspace(const std::string& keyspace_name) { diff --git a/src/yb/yql/cql/ql/util/ql_env.h b/src/yb/yql/cql/ql/util/ql_env.h index 7f5de4cc15ff..4a780168d706 100644 --- a/src/yb/yql/cql/ql/util/ql_env.h +++ b/src/yb/yql/cql/ql/util/ql_env.h @@ -22,6 +22,8 @@ #define YB_YQL_CQL_QL_UTIL_QL_ENV_H_ #include "yb/client/callbacks.h" +#include "yb/client/transaction.h" +#include "yb/client/transaction_manager.h" #include "yb/gutil/callback.h" @@ -77,6 +79,12 @@ class QLEnv { // Abort the batched ops. virtual void AbortOps(); + // Start a distributed transaction. + void StartTransaction(IsolationLevel isolation_level); + + // Commit the current distributed transaction. + void CommitTransaction(client::CommitCallback callback); + virtual std::shared_ptr GetTableDesc( const client::YBTableName& table_name, bool *cache_used); @@ -145,6 +153,12 @@ class QLEnv { // YBSession to apply operations. std::shared_ptr session_; + // Transaction manager to create distributed transactions. + std::unique_ptr transaction_manager_; + + // Current distributed transaction if present. + std::shared_ptr transaction_; + bool has_session_operations_ = false; // Messenger used to requeue the CQL call upon callback. 
diff --git a/src/yb/yql/cql/ql/util/statement_result.cc b/src/yb/yql/cql/ql/util/statement_result.cc index d784a79fb739..8cace31320ba 100644 --- a/src/yb/yql/cql/ql/util/statement_result.cc +++ b/src/yb/yql/cql/ql/util/statement_result.cc @@ -40,18 +40,25 @@ using client::YBOperation; using client::YBqlOp; using client::YBqlReadOp; using client::YBqlWriteOp; +using client::YBTableName; //------------------------------------------------------------------------------------------------ namespace { // Get bind column schemas for DML. -vector GetBindVariableSchemasFromDmlStmt(const PTDmlStmt& stmt) { - vector bind_variable_schemas; - bind_variable_schemas.reserve(stmt.bind_variables().size()); +void GetBindVariableSchemasFromDmlStmt(const PTDmlStmt& stmt, + vector* schemas, + vector* table_names = nullptr) { + schemas->reserve(schemas->size() + stmt.bind_variables().size()); + if (table_names != nullptr) { + table_names->reserve(table_names->size() + stmt.bind_variables().size()); + } for (const PTBindVar *var : stmt.bind_variables()) { - bind_variable_schemas.emplace_back(string(var->name()->c_str()), var->ql_type()); + schemas->emplace_back(string(var->name()->c_str()), var->ql_type()); + if (table_names != nullptr) { + table_names->emplace_back(stmt.table()->name()); + } } - return bind_variable_schemas; } shared_ptr> GetColumnSchemasFromOp(const YBqlOp& op, const PTDmlStmt *tnode) { @@ -109,13 +116,33 @@ QLClient GetClientFromOp(const YBqlOp& op) { PreparedResult::PreparedResult(const PTDmlStmt& stmt) : table_name_(stmt.table()->name()), hash_col_indices_(stmt.hash_col_indices()), - bind_variable_schemas_(GetBindVariableSchemasFromDmlStmt(stmt)), column_schemas_(stmt.selected_schemas()) { + GetBindVariableSchemasFromDmlStmt(stmt, &bind_variable_schemas_); if (column_schemas_ == nullptr) { column_schemas_ = make_shared>(); } } +PreparedResult::PreparedResult(const PTListNode& stmt) + : column_schemas_(make_shared>()) { + for (TreeNode::SharedPtr tnode : 
stmt.node_list()) { + switch (tnode->opcode()) { + case TreeNodeOpcode::kPTInsertStmt: FALLTHROUGH_INTENDED; + case TreeNodeOpcode::kPTUpdateStmt: FALLTHROUGH_INTENDED; + case TreeNodeOpcode::kPTDeleteStmt: { + const auto& dml = static_cast(*tnode); + GetBindVariableSchemasFromDmlStmt(dml, &bind_variable_schemas_, &bind_table_names_); + if (hash_col_indices_.empty()) { + hash_col_indices_ = dml.hash_col_indices(); + } + break; + } + default: + break; + } + } +} + PreparedResult::~PreparedResult() { } diff --git a/src/yb/yql/cql/ql/util/statement_result.h b/src/yb/yql/cql/ql/util/statement_result.h index 139767796bb7..986f964778b5 100644 --- a/src/yb/yql/cql/ql/util/statement_result.h +++ b/src/yb/yql/cql/ql/util/statement_result.h @@ -31,6 +31,7 @@ namespace ql { // This module is included by a few outside classes, so we cannot include ptree header files here. // Use forward declaration. class PTDmlStmt; +class PTListNode; //------------------------------------------------------------------------------------------------ // Result of preparing a statement. Only DML statement will return a prepared result that describes @@ -44,18 +45,21 @@ class PreparedResult { // Constructors. explicit PreparedResult(const PTDmlStmt& stmt); + explicit PreparedResult(const PTListNode& stmt); virtual ~PreparedResult(); // Accessors. 
const client::YBTableName& table_name() const { return table_name_; } - const std::vector& hash_col_indices() const { return hash_col_indices_; } + const std::vector& bind_table_names() const { return bind_table_names_; } const std::vector& bind_variable_schemas() const { return bind_variable_schemas_; } + const std::vector& hash_col_indices() const { return hash_col_indices_; } const std::vector& column_schemas() const { return *column_schemas_; } private: const client::YBTableName table_name_; - const std::vector hash_col_indices_; - const std::vector bind_variable_schemas_; + std::vector bind_table_names_; + std::vector bind_variable_schemas_; + std::vector hash_col_indices_; std::shared_ptr> column_schemas_; };