Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes [21715] - fix ring range read for CassandraIO read() #21786

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Token;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
Expand Down Expand Up @@ -59,27 +60,68 @@ public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
for (RingRange rr : ringRanges) {
Token startToken = session.getCluster().getMetadata().newToken(rr.getStart().toString());
Token endToken = session.getCluster().getMetadata().newToken(rr.getEnd().toString());
ResultSet rs =
session.execute(preparedStatement.bind().setToken(0, startToken).setToken(1, endToken));
Iterator<T> iter = mapper.map(rs);
while (iter.hasNext()) {
T n = iter.next();
receiver.output(n);
if (rr.isWrapping()) {
// A wrapping range is one that overlaps from the end of the partitioner range and its
// start (ie : when the start token of the split is greater than the end token)
// We need to generate two queries here : one that goes from the start token to the end
// of
// the partitioner range, and the other from the start of the partitioner range to the
// end token of the split.
outputResults(
session.execute(getLowestSplitQuery(read, partitionKey, rr.getEnd())),
receiver,
mapper);
outputResults(
session.execute(getHighestSplitQuery(read, partitionKey, rr.getStart())),
receiver,
mapper);
} else {
ResultSet rs =
session.execute(
preparedStatement.bind().setToken(0, startToken).setToken(1, endToken));
outputResults(rs, receiver, mapper);
}
}

if (read.ringRanges() == null) {
ResultSet rs = session.execute(preparedStatement.bind());
Iterator<T> iter = mapper.map(rs);
while (iter.hasNext()) {
receiver.output(iter.next());
}
outputResults(rs, receiver, mapper);
}
} catch (Exception ex) {
LOG.error("error", ex);
}
}

private static <T> void outputResults(
ResultSet rs, OutputReceiver<T> outputReceiver, Mapper<T> mapper) {
Iterator<T> iter = mapper.map(rs);
while (iter.hasNext()) {
T n = iter.next();
outputReceiver.output(n);
}
}

private static String getHighestSplitQuery(
Read<?> spec, String partitionKey, BigInteger highest) {
String highestClause = String.format("(token(%s) >= %d)", partitionKey, highest);
String finalHighQuery =
(spec.query() == null)
? buildInitialQuery(spec, true) + highestClause
: spec.query() + " AND " + highestClause;
LOG.debug("CassandraIO generated a wrapAround query : {}", finalHighQuery);
return finalHighQuery;
}

private static String getLowestSplitQuery(Read<?> spec, String partitionKey, BigInteger lowest) {
String lowestClause = String.format("(token(%s) < %d)", partitionKey, lowest);
String finalLowQuery =
(spec.query() == null)
? buildInitialQuery(spec, true) + lowestClause
: spec.query() + " AND " + lowestClause;
LOG.debug("CassandraIO generated a wrapAround query : {}", finalLowQuery);
return finalLowQuery;
}

private static String generateRangeQuery(
Read<?> spec, String partitionKey, Boolean hasRingRange) {
final String rangeFilter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class CassandraIOTest implements Serializable {
private static final String CASSANDRA_KEYSPACE = "beam_ks";
private static final String CASSANDRA_HOST = "127.0.0.1";
private static final String CASSANDRA_TABLE = "scientist";
private static final String CASSANDRA_TABLE_SIMPLEDATA = "simpledata";
private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
private static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService";
private static final int FLUSH_TIMEOUT = 30000;
Expand Down Expand Up @@ -190,6 +191,10 @@ private static void insertData() throws Exception {
"CREATE TABLE IF NOT EXISTS %s.%s(person_department text, person_id int, person_name text, PRIMARY KEY"
+ "((person_department), person_id));",
CASSANDRA_KEYSPACE, CASSANDRA_TABLE_WRITE));
session.execute(
String.format(
"CREATE TABLE IF NOT EXISTS %s.%s(id int, data text, PRIMARY KEY (id))",
CASSANDRA_KEYSPACE, CASSANDRA_TABLE_SIMPLEDATA));

LOG.info("Insert records");
String[][] scientists = {
Expand Down Expand Up @@ -221,6 +226,15 @@ private static void insertData() throws Exception {
CASSANDRA_TABLE);
session.execute(insertStr);
}
for (int i = 0; i < 100; i++) {
String insertStr =
String.format(
"INSERT INTO %s.%s(id, data) VALUES(" + i + ",' data_" + i + "');",
CASSANDRA_KEYSPACE,
CASSANDRA_TABLE_SIMPLEDATA);
session.execute(insertStr);
}

flushMemTablesAndRefreshSizeEstimates();
}

Expand Down Expand Up @@ -273,6 +287,35 @@ private static void disableAutoCompaction() throws Exception {
Thread.sleep(JMX_CONF_TIMEOUT);
}

/*
Since we have enough data we will be able to detect if any get put in the ring range that wraps around
*/
@Test
public void testWrapAroundRingRanges() throws Exception {
PCollection<SimpleData> simpledataPCollection =
pipeline.apply(
CassandraIO.<SimpleData>read()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE_SIMPLEDATA)
.withMinNumberOfSplits(50)
.withCoder(SerializableCoder.of(SimpleData.class))
.withEntity(SimpleData.class));
PCollection<Long> countPCollection = simpledataPCollection.apply("counting", Count.globally());
PAssert.that(countPCollection)
.satisfies(
i -> {
long total = 0;
for (Long aLong : i) {
total = total + aLong;
}
assertEquals(100, total);
return null;
});
pipeline.run();
}

@Test
public void testRead() throws Exception {
PCollection<Scientist> output =
Expand Down Expand Up @@ -654,6 +697,18 @@ public int hashCode() {
}
}

@Table(name = CASSANDRA_TABLE_SIMPLEDATA, keyspace = CASSANDRA_KEYSPACE)
static class SimpleData implements Serializable {
@PartitionKey int id;

@Column String data;

@Override
public String toString() {
return id + ", " + data;
}
}

private static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) {
BigInteger bi = BigInteger.valueOf((long) metadata.newToken(bb).getValue());
return RingRange.of(bi, bi.add(BigInteger.valueOf(1L)));
Expand Down