renames with preserve timestamps don't work together #91

Open
wtibbitts opened this issue Sep 29, 2022 · 4 comments
Labels
bug Something isn't working

Comments

@wtibbitts

Renames were working for me when I pulled and built about 10 days ago, but they have stopped working since then. The schema after renames still shows correctly, but the error is the same as the naming conflict I had before using renames (the original schema's tokn_ttl of type string conflicted with the generated tokn_ttl of type integer).
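
For illustration, here is a rough sketch (in Scala) of how the collision appears to arise when preserveTimestamps is on. The "<col>_ttl"/"<col>_writetime" naming is assumed from the generated schema in the run logs below, not taken from the migrator source:

// Hypothetical sketch of the column name generation, inferred from the log output below.
// For every regular column, a "<col>_ttl" and a "<col>_writetime" companion appear to be added.
val regularColumns = Seq("tokn", "tokn_ttl")
val generated = regularColumns.flatMap(c => Seq(c, s"${c}_ttl", s"${c}_writetime"))
// => Seq(tokn, tokn_ttl, tokn_writetime, tokn_ttl, tokn_ttl_ttl, tokn_ttl_writetime)
// The generated ttl(tokn) column "tokn_ttl" (integer) collides with the original
// tokn_ttl column (string); the duplicate is visible in the schema dump below.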

Config:

source:
  type: cassandra
  host: carbine01.######.com
  port: 9042
  keyspace: carbine
  table: login
  consistencyLevel: LOCAL_QUORUM
  preserveTimestamps: true
  splitCount: 256
  connections: 8
  fetchSize: 1000
target:
  type: scylla
  host: scylla-us-west-2-2a-0
  port: 9042
  keyspace: carbine
  table: login
  consistencyLevel: LOCAL_QUORUM
  connections: 16
  stripTrailingZerosForDecimals: false
savepoints:
  path: /opt/bitnami/spark/tmp/savepoints
  intervalSeconds: 300
renames:
  - from: tokn_ttl
    to: tokn_expir
skipTokenRanges: []
validation:
  compareTimestamps: true
  ttlToleranceMillis: 60000
  writetimeToleranceMillis: 1000
  failuresToFetch: 100
  floatingPointTolerance: 0.001
  timestampMsTolerance: 0

Run logs:

22/09/29 19:44:15 INFO Cassandra: TableDef(carbine,login,ArrayBuffer(ColumnDef(email,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(account_id,RegularColumn,VarCharType), ColumnDef(details,RegularColumn,VarCharType), ColumnDef(last_updated_ts,RegularColumn,TimestampType), ColumnDef(password_hash,RegularColumn,VarCharType), ColumnDef(password_salt,RegularColumn,VarCharType), ColumnDef(scheme,RegularColumn,VarCharType), ColumnDef(tokn,RegularColumn,VarCharType), ColumnDef(tokn_ttl,RegularColumn,VarCharType)),Stream(),false,false,Map())
22/09/29 19:44:15 INFO Cassandra: Original schema loaded:
root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: string (nullable = true)

22/09/29 19:44:15 INFO Cassandra: ColumnRefs generated for selection:
22/09/29 19:44:15 INFO Cassandra: email
account_id
ttl(account_id)
writetime(account_id)
details
ttl(details)
writetime(details)
last_updated_ts
ttl(last_updated_ts)
writetime(last_updated_ts)
password_hash
ttl(password_hash)
writetime(password_hash)
password_salt
ttl(password_salt)
writetime(password_salt)
scheme
ttl(scheme)
writetime(scheme)
tokn
ttl(tokn)
writetime(tokn)
tokn_ttl
ttl(tokn_ttl)
writetime(tokn_ttl)
22/09/29 19:44:15 INFO Cassandra: Schema generated with TTLs and Writetimes:
root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- account_id_ttl: integer (nullable = true)
 |-- account_id_writetime: long (nullable = true)
 |-- details: string (nullable = true)
 |-- details_ttl: integer (nullable = true)
 |-- details_writetime: long (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- last_updated_ts_ttl: integer (nullable = true)
 |-- last_updated_ts_writetime: long (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_hash_ttl: integer (nullable = true)
 |-- password_hash_writetime: long (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- password_salt_ttl: integer (nullable = true)
 |-- password_salt_writetime: long (nullable = true)
 |-- scheme: string (nullable = true)
 |-- scheme_ttl: integer (nullable = true)
 |-- scheme_writetime: long (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: integer (nullable = true)
 |-- tokn_writetime: long (nullable = true)
 |-- tokn_ttl: string (nullable = true)
 |-- tokn_ttl_ttl: integer (nullable = true)
 |-- tokn_ttl_writetime: long (nullable = true)

22/09/29 19:44:16 INFO Cassandra: Schema that'll be used for writing to Scylla:
22/09/29 19:44:16 INFO Cassandra: root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)

22/09/29 19:44:16 INFO migrator: Created source dataframe; resulting schema:
root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_ttl: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)

22/09/29 19:44:16 INFO migrator: Installing SIGINT/TERM/USR2 handler. Send this to dump the current progress to a savepoint.
22/09/29 19:44:16 INFO migrator: Starting savepoint schedule; will write a savepoint every 300 seconds
22/09/29 19:44:16 INFO migrator: Created a savepoint config at /opt/bitnami/spark/tmp/savepoints/savepoint_1664480656.yaml due to schedule. Ranges added: Set()
22/09/29 19:44:17 INFO migrator: We need to transfer: 280 partitions in total
22/09/29 19:44:17 INFO migrator: All token ranges extracted from partitions size:3073
22/09/29 19:44:17 INFO migrator: Savepoints array defined, size of the array: 0
22/09/29 19:44:17 INFO migrator: Diff ... total diff of full ranges to savepoints is: 3073
22/09/29 19:44:17 INFO migrator: Starting write...
22/09/29 19:44:17 INFO Scylla: Using consistencyLevel [LOCAL_QUORUM] for TARGET based on target config [LOCAL_QUORUM]
22/09/29 19:44:17 INFO Scylla: Schema after renames:
22/09/29 19:44:17 INFO Scylla: root
 |-- email: string (nullable = false)
 |-- account_id: string (nullable = true)
 |-- details: string (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- password_salt: string (nullable = true)
 |-- scheme: string (nullable = true)
 |-- tokn: string (nullable = true)
 |-- tokn_expiry: string (nullable = true)
 |-- ttl: integer (nullable = true)
 |-- writetime: long (nullable = true)
22/09/29 19:44:22 WARN TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, 172.21.5.60, executor 5): java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
	at org.apache.spark.sql.Row$class.getInt(Row.scala:223)
	at org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:166)
	at com.scylladb.migrator.readers.Cassandra$$anonfun$11.apply(Cassandra.scala:108)
	at com.scylladb.migrator.readers.Cassandra$$anonfun$11.apply(Cassandra.scala:101)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.scylladb.migrator.readers.Cassandra$.explodeRow(Cassandra.scala:101)
	at com.scylladb.migrator.readers.Cassandra$$anonfun$adjustDataframeForTimestampPreservation$1.apply(Cassandra.scala:193)
	at com.scylladb.migrator.readers.Cassandra$$anonfun$adjustDataframeForTimestampPreservation$1.apply(Cassandra.scala:193)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
	at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:100)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
	at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:241)
	at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$2.apply(TableWriter.scala:210)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
	at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
	at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
	at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:188)
	at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:175)
	at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
	at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:38)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

22/09/29 19:44:22 WARN TaskSetManager: Lost task 3.2 in stage 0.0 (TID 12, 172.21.4.113, executor 0): java.lang.ClassCastException
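
For what it's worth, the cast failure itself is easy to reproduce with a plain Spark Row once a string value ends up in the slot that the reader expects to hold the integer ttl column. This is only a minimal illustration of the exception, not the migrator's actual explodeRow code:

import org.apache.spark.sql.Row

// Minimal illustration: a row carrying both the generated integer ttl(tokn) value
// and the original string tokn_ttl value side by side (values are made up).
val row = Row("abc@example.com", "sometoken", 3600, "7200")
row.getInt(2) // ok: 3600, the generated ttl(tokn)
row.getInt(3) // java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
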
@tarzanek
Contributor

tarzanek commented Sep 30, 2022

Can you try disabling preserve timestamps and see if that fixes your issue?
@jwsomis ?

I just want to see whether the ordering of preserve timestamps and column renames conflicts.

@wtibbitts
Author

wtibbitts commented Sep 30, 2022

That did seem to resolve it, but it also appears to avoid this particular collision altogether, since it no longer generates the duplicate tokn_ttl like it did before. Which leads me to another question: we were using renames to handle this collision, but would disabling preserveTimestamps be just as effective?

@tarzanek
Contributor

tarzanek commented Oct 1, 2022

So this is not really a regression, but we do have a bug when both renames and preserve timestamps are set.

Both affect schema generation in the code, and the migrator needs to apply the rename to the generated ttl/writetime columns too; see the sketch below.
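
For example, something along these lines could extend the user-supplied renames to the generated ttl/writetime columns as well. This is a rough sketch assuming the "<col>_ttl"/"<col>_writetime" naming seen in the logs; Rename and withTimestampColumns are hypothetical names, not the migrator's actual API:

case class Rename(from: String, to: String)

// Expand each user-supplied rename so the generated companion columns are renamed too.
def withTimestampColumns(renames: List[Rename]): List[Rename] =
  renames.flatMap { r =>
    List(
      r,
      Rename(s"${r.from}_ttl", s"${r.to}_ttl"),
      Rename(s"${r.from}_writetime", s"${r.to}_writetime")
    )
  }

// withTimestampColumns(List(Rename("tokn_ttl", "tokn_expir")))
// => List(Rename(tokn_ttl,tokn_expir), Rename(tokn_ttl_ttl,tokn_expir_ttl),
//         Rename(tokn_ttl_writetime,tokn_expir_writetime))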

tarzanek changed the title from "Regression on renames" to "renames with preserve timestamps don't work together" on Oct 1, 2022
@wtibbitts
Author

The only reason I consider it a regression is that having both preserveTimestamps and a rename did work for me a couple of weeks ago. However, I have not checked out an older version to reconfirm that the config that is broken now actually worked before; it is just from memory that nothing was changed.

julienrf added the bug (Something isn't working) label on Jul 10, 2024