Some data is getting missed while using CassandraIO Read #375

Closed
nagamanikanta-sml opened this issue Apr 12, 2022 · 3 comments
Labels
bug Something isn't working needs triage p2


@nagamanikanta-sml

Related Template(s)

CassandraToBigtable.java

What happened?

We are running a Dataflow batch job that reads data from a Cassandra database and writes it to Bigtable.

We are using the CassandraToBigtable.java template and found that some of the data is missing after reading from the Cassandra database.

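```java
import com.datastax.driver.core.Session;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

// Excerpt from CassandraToBigtable.java; options, hosts and CassandraRowMapperFactory come from the template.
Pipeline p = Pipeline.create(options);

// Create a factory method to inject the CassandraRowMapperFn to allow custom type mapping.
SerializableFunction<Session, Mapper> cassandraObjectMapperFactory =
    new CassandraRowMapperFactory(options.getCassandraTable(), options.getCassandraKeyspace());

CassandraIO.Read<Row> source =
    CassandraIO.<Row>read()
        .withHosts(hosts)
        .withPort(options.getCassandraPort())
        .withKeyspace(options.getCassandraKeyspace())
        .withTable(options.getCassandraTable())
        .withMapperFactoryFn(cassandraObjectMapperFactory)
        .withEntity(Row.class)
        .withCoder(SerializableCoder.of(Row.class));
        // .withEntity(User.class)
        // .withCoder(SerializableCoder.of(User.class));
```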

After this step we printed the data being read and found that some rows are missing: 15 of the 512 rows (the first 5 and the last 10).

Beam Version

Newer than 2.35.0

Relevant log output

No response

@nagamanikanta-sml nagamanikanta-sml added bug Something isn't working needs triage p2 labels Apr 12, 2022
@KriKroff

KriKroff commented Jun 3, 2022

@nagamanikanta-sml
I have just faced the same issue today.

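Here is what I found: the bug was introduced in apache/beam@e12fc33. When a token range wraps around the ring (minValue > maxValue), CassandraIO used to create 2 requests, TOKEN >= minValue and TOKEN < maxValue. Since that commit, a wrapping range is queried as a single TOKEN >= minValue AND TOKEN < maxValue request, which can never match when minValue > maxValue, so every row in that range is skipped.
For example: minValue=9191351645448603782, maxValue=-9168465915656840686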
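To make the difference concrete, here is a minimal plain-Java sketch (no Beam dependency) that builds the token predicates for a half-open [start, end) range. The `tokenPredicates` helper, the `id` partition key and the `ks.tbl` table are hypothetical; this is not the actual CassandraIO code or the upstream fix, only an illustration of why a wrapping range needs two requests.

```java
import java.math.BigInteger;
import java.util.List;

public class WrappingTokenRange {
  // Hypothetical helper (not CassandraIO code): returns the CQL token predicates
  // needed to read the half-open range [start, end) on the token ring.
  public static List<String> tokenPredicates(String partitionKey, BigInteger start, BigInteger end) {
    String token = "token(" + partitionKey + ")";
    if (start.compareTo(end) < 0) {
      // Non-wrapping range: a single request with both bounds is correct.
      return List.of(token + " >= " + start + " AND " + token + " < " + end);
    }
    // Wrapping range (start >= end; equal bounds are treated as the full ring):
    // it covers [start, max token] plus [min token, end), so it needs two requests.
    // Combining both bounds with AND in one request would match no token at all.
    return List.of(token + " >= " + start, token + " < " + end);
  }

  public static void main(String[] args) {
    // The wrapping range reported in this thread.
    BigInteger minValue = new BigInteger("9191351645448603782");
    BigInteger maxValue = new BigInteger("-9168465915656840686");
    for (String predicate : tokenPredicates("id", minValue, maxValue)) {
      System.out.println("SELECT * FROM ks.tbl WHERE " + predicate);
    }
  }
}
```

For the range above this prints two statements, one with `token(id) >= 9191351645448603782` and one with `token(id) < -9168465915656840686`; together they cover both ends of the ring.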

I plan to file a bug and a test case today, and a fix next week.

Here are 2 possible workarounds:

  1. Downgrade Apache Beam to 2.33 (safe option).
  2. Use the new readAll() introduced by the same commit (it can be much faster than read() for small tables, or if you can do your splits manually).
    You will need to define your token ranges manually,
    e.g.:
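```java
import com.datastax.driver.core.Session;
import java.math.BigInteger;
import java.util.Set;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.io.cassandra.RingRange;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

// Runs inside a transform's expand(PBegin input); options, hosts and CassandraRowMapperFactory come from the template.
// YOUR CODE
SerializableFunction<Session, Mapper> cassandraObjectMapperFactory =
    new CassandraRowMapperFactory(options.getCassandraTable(), options.getCassandraKeyspace());

CassandraIO.Read<Row> source =
    CassandraIO.<Row>read()
        .withHosts(hosts)
        .withPort(options.getCassandraPort())
        .withKeyspace(options.getCassandraKeyspace())
        .withTable(options.getCassandraTable())
        .withMapperFactoryFn(cassandraObjectMapperFactory)
        .withEntity(Row.class)
        .withCoder(SerializableCoder.of(Row.class))
        // Changes start HERE
        // You will need to create your own ranges; here there is no split.
        .withRingRanges(Set.of(RingRange.of(
            BigInteger.valueOf(Long.MIN_VALUE),
            BigInteger.valueOf(Long.MAX_VALUE))));

return input
    .apply("SourceCassandraRead",
        Create.of(source).withCoder(SerializableCoder.of(new TypeDescriptor<CassandraIO.Read<Row>>() {})))
    .apply("ReadAll", CassandraIO.<Row>readAll().withCoder(SerializableCoder.of(Row.class)));
```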

You can improve this workaround by copying the CassandraIO code and fixing the token-range handling.
Unfortunately, multiple classes are private.
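If a single range is too coarse, one way to build manual splits is to cut the same [Long.MIN_VALUE, Long.MAX_VALUE) span into n contiguous ranges, none of which wraps, so the wrapping-range bug cannot affect them. A minimal plain-Java sketch; the `RingSplits` class, its `Range` type and `split` method are hypothetical, and this is not how CassandraIO computes its own splits.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class RingSplits {
  // Hypothetical stand-in for a token range [start, end); in a pipeline each pair
  // could become RingRange.of(start, end).
  public static final class Range {
    public final BigInteger start;
    public final BigInteger end;

    Range(BigInteger start, BigInteger end) {
      this.start = start;
      this.end = end;
    }
  }

  // Splits [Long.MIN_VALUE, Long.MAX_VALUE) into n contiguous, non-wrapping ranges.
  public static List<Range> split(int n) {
    if (n < 1) {
      throw new IllegalArgumentException("n must be at least 1");
    }
    BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
    BigInteger width = BigInteger.valueOf(Long.MAX_VALUE).subtract(min);
    List<Range> ranges = new ArrayList<>();
    BigInteger start = min;
    for (int i = 1; i <= n; i++) {
      // Boundary i is min + width * i / n; for i == n it is exactly Long.MAX_VALUE.
      BigInteger end = min.add(width.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n)));
      ranges.add(new Range(start, end));
      start = end; // the next range starts where this one ends, so there are no gaps
    }
    return ranges;
  }

  public static void main(String[] args) {
    for (Range r : split(4)) {
      System.out.println(r.start + " .. " + r.end);
    }
  }
}
```

Each pair could then be wrapped in `RingRange.of(start, end)`. Since `readAll()` processes each `Read` element it receives, building one `Read` per range (instead of one `Read` holding every range) and passing them all to `Create.of(...)` is one possible way to read the ranges in parallel; treat that as an assumption rather than something tested in this thread.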

Hope this helps.

Edit:
Ticket created: https://issues.apache.org/jira/browse/BEAM-14558

@vermas2012

I think the issue has been fixed:

apache/beam#21786

@bvolpato
Contributor

Thanks @vermas2012.

This bug appears to be fixed in Beam 2.41.0.

--

This issue has been stale for some time now. Please reopen it if there are follow-up or related questions.
