#520 Add ability to run incremental transformers and sinks #526
Conversation
The table name column in the 'offsets' table should be longer than in the bookkeeping table, because there are virtual tables whose names embed other table names (for example, `TEST_POC_publish->kafka_avro` combines a table name and a sink name).
```scala
if (isDataQuery) {
  df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters)
}
```
This is the fix for #398. It is not part of the incremental transformer PR, but it fixes a bug that was already fixed in 1.9.12 and is ported here.
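For context, a rough idea of what such sanitization might look like: a hypothetical re-implementation that assumes the helper replaces configured special characters in column names (this is not pramen's actual `SparkUtils` code):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: rename DataFrame columns so that configured special
// characters are replaced with underscores, keeping names safe for writers.
def sanitizeDfColumns(df: DataFrame, specialCharacters: String): DataFrame =
  df.columns.foldLeft(df) { (acc, name) =>
    val clean = name.map(ch => if (specialCharacters.indexOf(ch) >= 0) '_' else ch)
    if (clean == name) acc else acc.withColumnRenamed(name, clean)
  }
```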
```scala
import za.co.absa.pramen.api.MetastoreReader

trait MetastoreReaderCore extends MetastoreReader {
  def commitOutputTable(tableName: String, trackingName: String): Unit
}
```
I would rename the class to something like `MetastoreReaderIncremental` and the method to `commitIncrementalOutputTable` to make it clear that this trait is only for incremental processing.
The intent of the trait is that it should contain methods that are available to the framework only, not to user code. Yes, it has only incremental-related methods now, but it can be extended in the future. I agree the name might be misleading, though. What about `MetastoreReaderInternal`?
But now that I have looked at the other suggestion (the one about encapsulation), it does make sense to specialize this trait only for the incremental processing logic.
Ok, for the future: I think `MetastoreReaderInternal` is more self-explanatory than `MetastoreReaderCore`. I thought it was some core functionality essential for the Metastore, which is why I was confused 😆
```diff
   val metastore = this

-  new MetastoreReader {
+  new MetastoreReaderCore {
```
I think it would be a clearer separation of concerns and encapsulation if you had two separate implementations, one for batch and one for incremental processing. E.g. leave the original `MetastoreReader` impl as is, and then extend it for the incremental functionality, overriding `getCurrentBatch` with the additional logic for the incremental case. Roughly like the sketch below.
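A minimal sketch of that split, using illustrative stand-in interfaces rather than pramen's actual traits (`MetastoreReaderBatch` and `MetastoreReaderIncremental` are hypothetical names):

```scala
import java.time.LocalDate
import org.apache.spark.sql.DataFrame

// Illustrative stand-in; the real MetastoreReader trait has more members.
trait MetastoreReader {
  def getTable(tableName: String,
               infoDateFrom: Option[LocalDate],
               infoDateTo: Option[LocalDate]): DataFrame
  def getCurrentBatch(tableName: String): DataFrame
}

// Batch implementation keeps the existing behavior: the current batch is
// simply the partition for the job's information date.
class MetastoreReaderBatch(infoDate: LocalDate) extends MetastoreReader {
  override def getTable(tableName: String,
                        infoDateFrom: Option[LocalDate],
                        infoDateTo: Option[LocalDate]): DataFrame =
    ??? // actual read from the metastore goes here

  override def getCurrentBatch(tableName: String): DataFrame =
    getTable(tableName, Some(infoDate), Some(infoDate))
}

// Incremental implementation extends the batch one and overrides only
// getCurrentBatch() with offset-based selection of unprocessed data.
class MetastoreReaderIncremental(infoDate: LocalDate) extends MetastoreReaderBatch(infoDate) {
  override def getCurrentBatch(tableName: String): DataFrame =
    ??? // e.g. select records with pramen_batchid greater than the last committed offset
}
```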
Good suggestion! I agree. The inline anonymous class created here is becoming way too big.
Co-authored-by: Kevin Wallimann <kevin.wallimann@absa.africa>
Closes #520
This PR adds the ability to specify `incremental` as the schedule type, which enables incremental transformers and sinks.

In order for a transformer or a sink to use a table from the metastore in an incremental way, the code should invoke the `metastore.getCurrentBatch()` method instead of `metastore.getTable()`. `metastore.getCurrentBatch()` also works for normal batch pipelines.

When `getCurrentBatch()` is used with a daily, weekly or monthly schedule, it returns data for the information date corresponding to the running job, the same as invoking `metastore.getTable("my_table", Some(infoDate), Some(infoDate))`.

When `getCurrentBatch()` is used with the incremental schedule, it returns only the latest non-processed data. Offset management is used to keep track of processed data.

A `pramen_batchid` column is added automatically to output tables of ingested and transformed data in order to track offsets. The exception is the metastore `raw` format, which keeps original files as they are, so we can't add the `pramen_batchid` column to such tables. (A usage sketch of `getCurrentBatch()` is shown at the end of this description.)

Example offsets with one ingestion of raw files, one transient transformer, one normal transformer, and a sink:
Notification of the second run for a day:
The corresponding offsets are:
Offsets committed together are highlighted. They are committed atomically: either all or none. Note that transient jobs are committed only if the job that calls them was successfully saved. This is so that no records are lost.
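For illustration, a hypothetical sketch of such an all-or-nothing commit over JDBC; the table and column names are invented, not pramen's actual bookkeeping schema:

```scala
import java.sql.Connection

// Hypothetical: commit a group of offset records atomically, so that
// either every record becomes visible or none of them do.
final case class OffsetRecord(tableName: String, batchId: Long, minOffset: Long, maxOffset: Long)

def commitOffsetsAtomically(connection: Connection, records: Seq[OffsetRecord]): Unit = {
  connection.setAutoCommit(false)
  val stmt = connection.prepareStatement(
    "INSERT INTO offsets (table_name, batch_id, min_offset, max_offset) VALUES (?, ?, ?, ?)")
  try {
    records.foreach { r =>
      stmt.setString(1, r.tableName)
      stmt.setLong(2, r.batchId)
      stmt.setLong(3, r.minOffset)
      stmt.setLong(4, r.maxOffset)
      stmt.addBatch()
    }
    stmt.executeBatch()
    connection.commit()     // all records become visible together...
  } catch {
    case e: Throwable =>
      connection.rollback() // ...or none at all
      throw e
  } finally {
    stmt.close()
  }
}
```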
Records are as follows:

- `TEST_POC_raw`: minimum and maximum offsets for the raw file ingestion
- `TEST_POC_raw->TEST_POC_parquet`: for tracking the input of the conversion transient transformer
- `TEST_POC_parquet->TEST_POC_publish`: for tracking the input of the standardization transformer
- `TEST_POC_publish`: for tracking the output of the standardization transformer
- `TEST_POC_publish->TEST_POC_publish->kafka_avro`: for tracking the input of the sink
- `TEST_POC_publish->kafka_avro`: for tracking the output of the sink.
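As referenced in the description above, here is a minimal usage sketch of `getCurrentBatch()` inside transformer logic. The table name `my_table` and the filter are illustrative, and the method is assumed to take the metastore table name by analogy with `getTable`:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.pramen.api.MetastoreReader

// Sketch of a transformer body that processes only the current batch.
// Under an incremental schedule this is the latest unprocessed data;
// under a daily/weekly/monthly schedule it is the infoDate partition.
def transformCurrentBatch(metastore: MetastoreReader): DataFrame = {
  val df = metastore.getCurrentBatch("my_table") // assumed signature
  df.filter(col("amount") > 0)                   // illustrative business logic
}
```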