
#520 Add ability to run incremental transformers and sinks #526

Merged (26 commits) on Jan 2, 2025

Conversation

yruslan (Collaborator) commented on Dec 12, 2024

Closes #520

This PR adds the ability to specify incremental as the schedule type, which enables incremental transformers and sinks.
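As a sketch, enabling the new schedule type in a pipeline config might look like this (the operation name, class, and table names are hypothetical; the surrounding keys follow Pramen's existing HOCON operation definitions):

```hocon
pramen.operations = [
  {
    name = "My incremental transformation"
    type = "transformation"
    class = "com.example.MyIncrementalTransformer"

    # The new schedule type introduced by this PR
    schedule.type = "incremental"

    output.table = "my_output_table"
  }
]
```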

In order for a transformer or a sink to use a table from the metastore in an incremental way, the code should invoke the metastore.getCurrentBatch() method instead of metastore.getTable(). metastore.getCurrentBatch() also works for normal batch pipelines.
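A minimal transformer sketch illustrating the change (the class and table names are hypothetical; the method signatures follow the public Transformer API, and getCurrentBatch() is the method added by this PR):

```scala
import java.time.LocalDate

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.{MetastoreReader, Reason, Transformer}

class MyIncrementalTransformer extends Transformer {
  override def validate(metastore: MetastoreReader,
                        infoDate: LocalDate,
                        options: Map[String, String]): Reason = Reason.Ready

  override def run(metastore: MetastoreReader,
                   infoDate: LocalDate,
                   options: Map[String, String]): DataFrame = {
    // With a daily/weekly/monthly schedule this is equivalent to:
    //   metastore.getTable("my_table", Some(infoDate), Some(infoDate))
    // With an incremental schedule it returns only the latest
    // unprocessed data, tracked via offsets.
    val df = metastore.getCurrentBatch("my_table")

    df // apply the actual transformation here
  }
}
```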

  • When getCurrentBatch() is used with a daily, weekly, or monthly schedule, it returns data for the information date corresponding to the running job, the same as invoking metastore.getTable("my_table", Some(infoDate), Some(infoDate)).
  • When getCurrentBatch() is used with an incremental schedule, it returns only the latest unprocessed data. Offset management is used to keep track of processed data.
  • The column pramen_batchid is added automatically to output tables of ingested and transformed data in order to track offsets. The exception is the metastore raw format, which keeps original files as they are, so the pramen_batchid column cannot be added to such tables.
  • The offset manager updates offsets only after the output of a transformer or sink has succeeded, and it does the update in a transactional manner. However, if the update fails partway through, duplicates are possible on subsequent runs, so Pramen provides 'at least once' semantics for incremental transformation pipelines.
  • Reruns are possible for full days to remove duplicates. But for incremental sinks, such as the Kafka sink, duplicates might still happen.

Example offsets with one ingestion of raw files, one transient transformer, one normal transformer, and a sink:

Notification of the second run for a day:
[screenshot: pipeline notification, 2024-12-12]

The corresponding offsets are:
[screenshot: offsets table, 2024-12-12]

Offsets committed together are highlighted. They are committed atomically: either all or none. Note that offsets of transient jobs are committed only if the job that calls them has saved its output successfully. This ensures that no records are lost.

Records are as follows:

  • TEST_POC_raw minimum and maximum offsets for the raw file ingestion
  • TEST_POC_raw->TEST_POC_parquet for tracking the input of the conversion transient transformer
  • TEST_POC_parquet->TEST_POC_publish for tracking the input of the standardization transformer
  • TEST_POC_publish for tracking the output of the standardization transformer
  • TEST_POC_publish->TEST_POC_publish->kafka_avro for tracking the input of the sink
  • TEST_POC_publish->kafka_avro for tracking the output of the sink.

The table name column in the 'offsets' table needs to be wider than in the bookkeeping table because virtual table names embed other table names as components.
Comment on lines +201 to +203
if (isDataQuery) {
df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
}
yruslan (Collaborator, Author) commented on Dec 12, 2024


This is the fix for #398

This is not part of the incremental transformer PR, but it fixes a bug that was already fixed in 1.9.12; the fix is ported here.


github-actions bot commented Dec 12, 2024

Unit Test Coverage

Overall Project 84.22% -1.15% 🍏
Files changed 72.92%

Module Coverage
pramen:core Jacoco Report 84.97% -1.27%
pramen-extras Jacoco Report 76.75% 🍏
Files
Module File Coverage
pramen:core Jacoco Report ConversionTransformer.scala 100% 🍏
IdentityTransformer.scala 100% 🍏
TableReaderJdbcConfig.scala 100% -1.03% 🍏
TrackingTable.scala 100%
OrchestratorImpl.scala 100% 🍏
OffsetManagerUtils.scala 100% 🍏
MetastoreReaderBatchImpl.scala 100% 🍏
OffsetRecords.scala 100% 🍏
OffsetCommitRequest.scala 100%
MetaTable.scala 98.96% 🍏
TransformationJob.scala 96.54% 🍏
MetastorePersistenceRaw.scala 96.51% -2.33% 🍏
OffsetManagerCached.scala 96.21% -3.79% 🍏
OperationSplitter.scala 93.32% 🍏
LocalCsvSink.scala 93.22% 🍏
TransientTableManager.scala 92.74% 🍏
RawFileSource.scala 92.13% 🍏
JobBase.scala 91.42% 🍏
RuntimeConfig.scala 91.28% 🍏
PipelineNotificationBuilderHtml.scala 91.04% -0.9%
AppContextImpl.scala 90.79% 🍏
PythonTransformationJob.scala 90.52% -1.38%
ConcurrentJobRunnerImpl.scala 88.86% 🍏
SinkJob.scala 88.35% -7.91% 🍏
OffsetManagerJdbc.scala 87.96% 🍏
MetastoreImpl.scala 87.49% -4.24% 🍏
TaskRunnerBase.scala 85.54% 🍏
ReaderMode.scala 84.85%
TableReaderJdbc.scala 84.62% 🍏
MetastoreReaderIncrementalImpl.scala 83.23% -16.77% 🍏
IngestionJob.scala 81.1% 🍏
IncrementalIngestionJob.scala 80.98% -0.45% 🍏
MetastoreReaderBase.scala 77.93% -22.07%
BookkeeperJdbc.scala 56.51% 🍏
pramen-extras Jacoco Report EcsPipelineNotificationTarget.scala 91.45% 🍏
KafkaAvroSink.scala 0% 🍏

pramen/core/src/main/resources/reference.conf (review comment outdated and resolved)
import za.co.absa.pramen.api.MetastoreReader

trait MetastoreReaderCore extends MetastoreReader {
def commitOutputTable(tableName: String, trackingName: String): Unit
Contributor commented:

I would rename the class to something like MetastoreReaderIncremental and the method to commitIncrementalOutputTable to make it clear that this trait is only for incremental processing.

yruslan (Collaborator, Author) replied:

The intent of the trait is that it should contain methods available only to the framework, not to user code. Yes, it only has incremental-related methods now, but it can be extended in the future. I agree, though, that the name might be misleading. What about MetastoreReaderInternal?

yruslan (Collaborator, Author) added:

But now that I have looked at the other suggestion (the one about encapsulation), it does make sense to specialize this trait only for the incremental processing logic.

Contributor replied:

Ok, for the future, I think MetastoreReaderInternal is more self-explanatory than MetastoreReaderCore. I thought it was some core functionality that is essential for the Metastore functionality, therefore I was confused 😆

val metastore = this

-    new MetastoreReader {
+    new MetastoreReaderCore {
Contributor commented:

I think it would be a clearer separation of concerns and encapsulation if you had two separate implementations, one for batch and one for the incremental processing. E.g. leave the original MetastoreReader impl as is, and then extend it for the incremental functionality, and override getCurrentBatch for the additional logic for the incremental case.

yruslan (Collaborator, Author) replied:

Good suggestion! I agree. The inline anonymous class created here is becoming way too big.

@yruslan yruslan merged commit cc0301e into main Jan 2, 2025
8 checks passed
@yruslan yruslan deleted the feature/520-incremental-processing-trafsformers branch January 2, 2025 16:29
@yruslan yruslan mentioned this pull request Jan 3, 2025
Successfully merging this pull request may close these issues.

Incremental transformations and sinks