Skip to content

Commit

Permalink
ENG-2560: Initial multi-row transaction support in CQL
Browse files Browse the repository at this point in the history
Summary:
Support multi-row insert/update/delete in a distributed transaction in CQL using syntax:

```START TRANSACTION;
  dml_statement;
  ...
  dml_statement;
COMMIT;```

```BEGIN [ TRANSACTION ];
  dml_statement;
  ...
  dml_statement;
END [ TRANSACTION ];```

For example:

```cqlsh> create keyspace k;
cqlsh> use k;
cqlsh:k> create table t1 (k int primary key, c int) with transactions = {'enabled' : true};
cqlsh:k> create table t2 (k int primary key, c int) with transactions = {'enabled' : true};
cqlsh:k>
cqlsh:k> begin transaction;
     ... insert into t1 (k, c) values (1, 3);
     ... insert into t2 (k, c) values (2, 4);
     ... end transaction;
cqlsh:k>
cqlsh:k> select k, c, writetime(c) from t1;
 k | c | writetime(c)
---+---+------------------
 1 | 3 | 1515095484715909
(1 rows)
cqlsh:k> select k, c, writetime(c) from t2;
 k | c | writetime(c)
---+---+------------------
 2 | 4 | 1515095484715909
(1 rows)
cqlsh:k> begin transaction;
     ... insert into t1 (k, c) values (1, 5);
     ... insert into t2 (k, c) values (2, 6);
     ... end transaction;
cqlsh:k>
cqlsh:k> select k, c, writetime(c) from t1;
 k | c | writetime(c)
---+---+------------------
 1 | 5 | 1515095486606113
(1 rows)
cqlsh:k> select k, c, writetime(c) from t2;
 k | c | writetime(c)
---+---+------------------
 2 | 6 | 1515095486606113
(1 rows)```

Conditional DMLs are not supported yet.

Test Plan: TODO

Reviewers: mihnea, mikhail, sergei

Reviewed By: sergei

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D3836
  • Loading branch information
robertpang committed Jan 17, 2018
1 parent 33dafb3 commit 901777a
Show file tree
Hide file tree
Showing 49 changed files with 1,385 additions and 489 deletions.
6 changes: 6 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/BaseCQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.junit.Assert.fail;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
Expand Down Expand Up @@ -109,13 +111,17 @@ public static void setUpBeforeClass() throws Exception {
}

public Cluster.Builder getDefaultClusterBuilder() {
// Set default consistency level to strong consistency
QueryOptions queryOptions = new QueryOptions();
queryOptions.setConsistencyLevel(ConsistencyLevel.YB_STRONG);
// Set a long timeout for CQL queries since build servers might be really slow (especially Mac
// Mini).
SocketOptions socketOptions = new SocketOptions();
socketOptions.setReadTimeoutMillis(60 * 1000);
socketOptions.setConnectTimeoutMillis(60 * 1000);
return Cluster.builder()
.addContactPointsWithPorts(miniCluster.getCQLContactPoints())
.withQueryOptions(queryOptions)
.withSocketOptions(socketOptions);
}

Expand Down
228 changes: 228 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/TestTransaction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.cql;

import java.util.*;

import org.junit.Test;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.PreparedStatement;

public class TestTransaction extends BaseCQLTest {

private void createTable(String name, String columns, boolean transactional) {
session.execute(String.format("create table %s (%s) with transactions = { 'enabled' : %b };",
name, columns, transactional));
}

private void createTables() throws Exception {
createTable("test_txn1", "k int primary key, c1 int, c2 text", true);
createTable("test_txn2", "k int primary key, c1 int, c2 text", true);
createTable("test_txn3", "k int primary key, c1 int, c2 text", true);
}

@Test
public void testInsertMultipleTables() throws Exception {

createTables();

// Insert into multiple tables and ensure all rows are written with same writetime.
session.execute("begin transaction;" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"insert into test_txn2 (k, c1, c2) values (?, ?, ?);" +
"insert into test_txn3 (k, c1, c2) values (?, ?, ?);" +
"end transaction;",
Integer.valueOf(1), Integer.valueOf(1), "v1",
Integer.valueOf(2), Integer.valueOf(2), "v2",
Integer.valueOf(3), Integer.valueOf(3), "v3");

Vector<Row> rows = new Vector<Row>();
for (int i = 1; i <= 3; i++) {
rows.add(session.execute(String.format("select c1, c2, writetime(c1), writetime(c2) " +
"from test_txn%d where k = ?;", i),
Integer.valueOf(i)).one());
assertNotNull(rows.get(i - 1));
assertEquals(i, rows.get(i - 1).getInt("c1"));
assertEquals("v" + i, rows.get(i - 1).getString("c2"));
}

// Verify writetimes are same.
assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(1).getLong("writetime(c1)"));
assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(2).getLong("writetime(c1)"));
assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(1).getLong("writetime(c2)"));
assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(2).getLong("writetime(c2)"));
}

@Test
public void testInsertUpdateSameTable() throws Exception {

createTables();

// Insert multiple keys into same table and ensure all rows are written with same writetime.
session.execute("start transaction;" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"update test_txn1 set c1 = ?, c2 = ? where k = ?;" +
"commit;",
Integer.valueOf(1), Integer.valueOf(1), "v1",
Integer.valueOf(2), Integer.valueOf(2), "v2",
Integer.valueOf(3), "v3", Integer.valueOf(3));

Vector<Row> rows = new Vector<Row>();
HashSet<String> values = new HashSet<String>();
for (Row row : session.execute("select c1, c2, writetime(c1), writetime(c2) " +
"from test_txn1 where k in ?;",
Arrays.asList(Integer.valueOf(1),
Integer.valueOf(2),
Integer.valueOf(3)))) {
rows.add(row);
values.add(row.getInt("c1") + "," + row.getString("c2"));
}
assertEquals(3, rows.size());
assertEquals(new HashSet<String>(Arrays.asList("1,v1", "2,v2", "3,v3")), values);

// Verify writetimes are same.
assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(1).getLong("writetime(c1)"));
assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(2).getLong("writetime(c1)"));
assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(1).getLong("writetime(c2)"));
assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(2).getLong("writetime(c2)"));
}

@Test
public void testMixDML() throws Exception {

createTables();

// Test non-transactional writes to transaction-enabled table.
for (int i = 1; i <= 2; i++) {
session.execute("insert into test_txn1 (k, c1, c2) values (?, ?, ?);",
Integer.valueOf(i), Integer.valueOf(i), "v" + i);
}
assertQuery("select * from test_txn1;",
new HashSet<String>(Arrays.asList("Row[1, 1, v1]", "Row[2, 2, v2]")));

// Test a mix of insert/update/delete in the same transaction.
session.execute("begin transaction;" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"update test_txn1 set c1 = 0, c2 = 'v0' where k = ?;" +
"delete from test_txn1 where k = ?;" +
"end transaction;",
Integer.valueOf(3), Integer.valueOf(3), "v3",
Integer.valueOf(2),
Integer.valueOf(1));

// Verify the rows.
Vector<Row> rows = new Vector<Row>();
HashSet<String> values = new HashSet<String>();
for (Row row : session.execute("select k, c1, c2, writetime(c1), writetime(c2) " +
"from test_txn1 where k in ?;",
Arrays.asList(Integer.valueOf(1),
Integer.valueOf(2),
Integer.valueOf(3)))) {
rows.add(row);
values.add(row.getInt("k") + "," + row.getInt("c1") + "," + row.getString("c2"));
}
assertEquals(2, rows.size());
assertEquals(new HashSet<String>(Arrays.asList("2,0,v0", "3,3,v3")), values);

// Verify writetimes are same.
assertEquals(rows.get(0).getLong("writetime(c1)"), rows.get(1).getLong("writetime(c1)"));
assertEquals(rows.get(0).getLong("writetime(c2)"), rows.get(1).getLong("writetime(c2)"));
}

@Test
public void testPrepareStatement() throws Exception {

createTable("test_hash", "h1 int, h2 int, r int, c text, primary key ((h1, h2), r)", true);

// Prepare a transaction statement. Verify the hash key of the whole statement is the first
// insert statement that has the full hash key specified (third insert).
PreparedStatement stmt =
session.prepare("begin transaction;" +
"insert into test_hash (h1, h2, r, c) values (1, 1, ?, ?);" +
"insert into test_hash (h1, h2, r, c) values (?, 2, ?, ?);" +
"insert into test_hash (h1, h2, r, c) values (?, ?, ?, ?);" +
"end transaction;");
int hashIndexes[] = stmt.getRoutingKeyIndexes();
assertEquals(2, hashIndexes.length);
assertEquals(5, hashIndexes[0]);
assertEquals(6, hashIndexes[1]);

session.execute(stmt.bind(Integer.valueOf(1), "v1",
Integer.valueOf(2), Integer.valueOf(2), "v2",
Integer.valueOf(3), Integer.valueOf(3), Integer.valueOf(3), "v3"));

// Verify the rows.
Vector<Row> rows = new Vector<Row>();
HashSet<String> values = new HashSet<String>();
for (Row row : session.execute("select h1, h2, r, c, writetime(c) from test_hash;")) {
rows.add(row);
values.add(row.getInt("h1")+","+row.getInt("h2")+","+row.getInt("r")+","+row.getString("c"));
}
assertEquals(3, rows.size());
assertEquals(new HashSet<String>(Arrays.asList("1,1,1,v1",
"2,2,2,v2",
"3,3,3,v3")), values);
// Verify writetimes are same.
assertEquals(rows.get(0).getLong("writetime(c)"), rows.get(1).getLong("writetime(c)"));
assertEquals(rows.get(0).getLong("writetime(c)"), rows.get(2).getLong("writetime(c)"));
}

@Test
public void testInvalidStatements() throws Exception {
createTables();

// Missing "begin transaction"
runInvalidStmt("insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"insert into test_txn2 (k, c1, c2) values (?, ?, ?);" +
"commit;");

// Missing "end transaction"
runInvalidStmt("begin transaction;" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"insert into test_txn2 (k, c1, c2) values (?, ?, ?);");

// Missing "begin / end transaction"
runInvalidStmt("insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"insert into test_txn2 (k, c1, c2) values (?, ?, ?);");

// Writing to non-transactional table
createTable("test_non_txn", "k int primary key, c1 int, c2 text", false);
runInvalidStmt("begin transaction;" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?);" +
"insert into test_non_txn (k, c1, c2) values (?, ?, ?);" +
"end transaction;");

// Conditional DML not supported yet
runInvalidStmt("begin transaction;" +
"insert into test_txn1 (k, c1, c2) values (?, ?, ?) if not exists;" +
"end transaction;");

// Multiple writes to the same row are not allowed
runInvalidStmt("begin transaction;" +
"insert into test_txn1 (k, c1, c2) values (1, 1, 'v1');" +
"delete from test_txn1 where k = 1;" +
"end transaction;");

// Multiple writes to the same static row are not allowed
createTable("test_static", "h int, r int, s int static, c int, primary key ((h), r)", true);
runInvalidStmt("begin transaction;" +
"insert into test_static (h, s) values (1, 1);" +
"insert into test_static (h, r, s, c) values (1, 2, 3, 4);" +
"end transaction;");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -383,24 +384,16 @@ public void appendMessage(StringBuilder sb) {
}

/**
*Method to print the description for the app.
* @param linePrefix : a prefix to be added to each line that is being printed.
* @param lineSuffix : a suffix to be added at the end of each line.
* @return the formatted description string.
* Returns the description of the app.
* @return the description splitted in lines.
*/
public String getWorkloadDescription(String linePrefix, String lineSuffix) {
return "";
}
public List<String> getWorkloadDescription() { return Collections.EMPTY_LIST; }

/**
* Method to pretty print the example usage for the app.
* @param linePrefix
* @param lineSuffix
* @return
* Returns the example usage of the app.
* @return the example usage splitted in lines.
*/
public String getExampleUsageOptions(String linePrefix, String lineSuffix) {
return "";
}
public List<String> getExampleUsageOptions() { return Collections.EMPTY_LIST; }


////////////// The following methods framework/helper methods for subclasses. ////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package com.yugabyte.sample.apps;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -78,50 +80,24 @@ public long doWrite() {
}

@Override
public String getWorkloadDescription(String optsPrefix, String optsSuffix) {
StringBuilder sb = new StringBuilder();
sb.append(optsPrefix);
sb.append("Sample batch key-value app built on Cassandra. The app writes out 1M unique string");
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("key in batches, each key with a string value. There are multiple readers and");
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("writers that update these keys and read them indefinitely. Note that the batch");
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("size and the number of reads and writes to perform can be specified as parameters.");
sb.append(optsSuffix);
return sb.toString();
public List<String> getWorkloadDescription() {
return Arrays.asList(
"Sample batch key-value app built on Cassandra. The app writes out 1M unique string",
"key in batches, each key with a string value. There are multiple readers and",
"writers that update these keys and read them indefinitely. Note that the batch",
"size and the number of reads and writes to perform can be specified as parameters.");
}

@Override
public String getExampleUsageOptions(String optsPrefix, String optsSuffix) {
StringBuilder sb = new StringBuilder();
sb.append(optsPrefix);
sb.append("--num_unique_keys " + appConfig.numUniqueKeysToWrite);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--num_reads " + appConfig.numKeysToRead);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--num_writes " + appConfig.numKeysToWrite);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--value_size " + appConfig.valueSize);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--num_threads_read " + appConfig.numReaderThreads);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--num_threads_write " + appConfig.numWriterThreads);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--batch_size " + appConfig.cassandraBatchSize);
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("--table_ttl_seconds " + appConfig.tableTTLSeconds);
sb.append(optsSuffix);
return sb.toString();
public List<String> getExampleUsageOptions() {
return Arrays.asList(
"--num_unique_keys " + appConfig.numUniqueKeysToWrite,
"--num_reads " + appConfig.numKeysToRead,
"--num_writes " + appConfig.numKeysToWrite,
"--value_size " + appConfig.valueSize,
"--num_threads_read " + appConfig.numReaderThreads,
"--num_threads_write " + appConfig.numWriterThreads,
"--batch_size " + appConfig.cassandraBatchSize,
"--table_ttl_seconds " + appConfig.tableTTLSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package com.yugabyte.sample.apps;

import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Logger;
Expand Down Expand Up @@ -83,14 +84,9 @@ public void run() {
}

@Override
public String getWorkloadDescription(String optsPrefix, String optsSuffix) {
StringBuilder sb = new StringBuilder();
sb.append(optsPrefix);
sb.append("A very simple hello world app built on Cassandra. The app writes one employee row");
sb.append(optsSuffix);
sb.append(optsPrefix);
sb.append("into the 'Employee' table");
sb.append(optsSuffix);
return sb.toString();
public List<String> getWorkloadDescription() {
return Arrays.asList(
"A very simple hello world app built on Cassandra. The app writes one employee row",
"into the 'Employee' table");
}
}
Loading

0 comments on commit 901777a

Please sign in to comment.