Skip to content

Commit

Permalink
Fixes #445 ENG-3799 Invalid start_doc_key error with IN condition and…
Browse files Browse the repository at this point in the history
… paging state

Summary:
In some rare cases for multi-partition selects (IN conditions on the partition key)
with paging state, the partition index we needed to resume from (for the next fetch)
was not being set correctly, leading to a mismatch between the scan start point (set
by the paging state) and the scan range (set by the partition key option).

Test Plan: TestPagingSelect.java

Reviewers: robert

Reviewed By: robert

Subscribers: kannan, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D5349
  • Loading branch information
m-iancu committed Aug 17, 2018
1 parent 0186764 commit d355997
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
8 changes: 5 additions & 3 deletions java/yb-cql/src/test/java/org/yb/cql/BaseCQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.datastax.driver.core.exceptions.InvalidQueryException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,7 +46,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -491,8 +489,12 @@ protected void assertQuery(String stmt, String expectedColumns, String expectedR
}

/**
 * Convenience overload that accepts the query as raw CQL text.
 *
 * <p>The text is wrapped in a {@code SimpleStatement} and the check is delegated to the
 * statement-based {@code assertQuery} overload.
 *
 * @param stmt the CQL query text to execute
 * @param expectedRows the expected rows, passed through unchanged to the delegate
 */
protected void assertQuery(String stmt, Set<String> expectedRows) {
  // Keep the static type as SimpleStatement so the same overload is selected as before.
  SimpleStatement wrapped = new SimpleStatement(stmt);
  assertQuery(wrapped, expectedRows);
}

protected void assertQuery(Statement stmt, Set<String> expectedRows) {
ResultSet rs = session.execute(stmt);
HashSet<String> actualRows = new HashSet<String>();
HashSet<String> actualRows = new HashSet<>();
for (Row row : rs) {
actualRows.add(row.toString());
}
Expand Down
34 changes: 34 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/TestPagingSelect.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
//
package org.yb.cql;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.junit.Test;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
Expand Down Expand Up @@ -125,4 +128,35 @@ public void testIncompleteRangeColumns() throws Exception {
assertIncompleteRangeColumns("SELECT r1 FROM test_paging WHERE h = 0 AND r1 <= 10;",
"r1", TOTAL_ROWS);
}

/**
 * Checks that a paged SELECT with an IN condition on a partition (hash) column returns
 * exactly the expected set of rows.
 *
 * <p>The table is filled with 30 x 30 partitions of 5 rows each (r = 0..4, v = 0). The query
 * selects two partitions (h1 in {3, 4}, h2 = 1) and restricts r to [2, 8]; since the largest
 * inserted r is 4, each partition contributes the rows r = 2, 3, 4, i.e. 6 rows in total.
 * With 3 matching rows per partition, a fetch size of 4 should force the result to be
 * delivered in more than one page, with a page ending part-way through a partition.
 *
 * @throws Exception if any statement fails to execute
 */
@Test
public void testMultiPartitionSelect() throws Exception {
  session.execute("CREATE TABLE test_in_paging (h1 int, h2 int, r int, v int, " +
                  "PRIMARY KEY ((h1, h2), r));");
  PreparedStatement pstmt = session.prepare("INSERT INTO test_in_paging (h1, h2, r, v) " +
                                            "VALUES (?, ?, ?, 0);");

  // Populate the table in a single batch. Primitive counters are used for the loops; the
  // values are boxed once when passed to the varargs bind() call.
  BatchStatement batch = new BatchStatement();
  for (int h1 = 0; h1 < 30; h1++) {
    for (int h2 = 0; h2 < 30; h2++) {
      for (int r = 0; r < 5; r++) {
        batch.add(pstmt.bind(h1, h2, r));
      }
    }
  }
  session.execute(batch);

  // Multi-partition select (IN on h1) with a small fetch size so that paging kicks in.
  SimpleStatement stmt = new SimpleStatement("SELECT * FROM test_in_paging " +
                                             "WHERE h1 IN (4,3) AND h2 = 1 " +
                                             "AND r >= 2 and r <= 8");
  stmt.setFetchSize(4);

  // Expected: rows r = 2..4 from each of the two selected partitions. Rows are compared as a
  // set of their string form, so the order in which partitions are returned does not matter.
  Set<String> expectedRows = new HashSet<>();
  String rowTemplate = "Row[%d, 1, %d, 0]";
  for (int h1 = 3; h1 <= 4; h1++) {
    for (int r = 2; r < 5; r++) {
      expectedRows.add(String.format(rowTemplate, h1, r));
    }
  }
  assertQuery(stmt, expectedRows);
}
}
24 changes: 18 additions & 6 deletions src/yb/yql/cql/ql/exec/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ Status Executor::ExecPTNode(const PTSelectStmt *tnode) {
}

// If we have several hash partitions (i.e. IN condition on hash columns) we initialize the
// start partition here, and then iteratively scan the rest in FetchMoreRowsIfNeeded.
// start partition here, and then iteratively scan the rest in FetchMoreRows.
// Otherwise, the request will already have the right hashed column values set.
TnodeContext& tnode_context = exec_context_->tnode_context();
if (tnode_context.UnreadPartitionsRemaining() > 0) {
Expand Down Expand Up @@ -785,7 +785,7 @@ Result<bool> Executor::FetchMoreRows(const PTSelectStmt* tnode,
TnodeContext* tnode_context,
ExecContext* exec_context) {
if (result_ == nullptr) {
return false;
return STATUS(InternalError, "Missing result for SELECT operation");
}

// Rows read so far: in this fetch, previous fetches (for paging selects), and in total.
Expand Down Expand Up @@ -842,22 +842,34 @@ Result<bool> Executor::FetchMoreRows(const PTSelectStmt* tnode,
return false;
}

// Sanity check that if we finished a partition the next partition/row key are empty.
// Otherwise we could start scanning the next partition from the wrong place.
DCHECK(current_params.next_partition_key().empty());
DCHECK(current_params.next_row_key().empty());

// Otherwise, we continue to the next partition.
tnode_context->AdvanceToNextPartition(op->mutable_request());
}

// If we reached the fetch limit (min of paging state and limit clause) we are done.
if (current_fetch_row_count >= fetch_limit) {

// If we reached the paging limit at the end of the previous partition for a multi-partition
// select the next fetch should continue directly from the current partition.
// We create a paging state here so that we can resume from the right partition.
if (finished_current_read_partition && op->request().return_paging_state()) {
// If we need to return a paging state to the user, we create it here so that we can resume from
// the exact place where we left off: partition index and primary key within that partition.
if (op->request().return_paging_state()) {
QLPagingStatePB paging_state;
paging_state.set_total_num_rows_read(total_row_count);
paging_state.set_total_rows_skipped(total_rows_skipped);
paging_state.set_table_id(tnode->table()->id());

// Set the partition to resume from. Relevant for multi-partition selects, i.e. with IN
// condition on the partition columns.
paging_state.set_next_partition_index(tnode_context->current_partition_index());

// Within a partition, set the exact primary key to resume from (if any).
paging_state.set_next_partition_key(current_params.next_partition_key());
paging_state.set_next_row_key(current_params.next_row_key());

current_result->set_paging_state(paging_state);
}

Expand Down

0 comments on commit d355997

Please sign in to comment.