#374 Incremental Ingestion #487
Conversation
…and add more useful methods to metastore interfaces.
…d info dates + offsets.
Apparently, Spark 2.4.8 infers '2021-02-18' as timestamp :O
Putting this back to draft since Kevin suggested using full intervals for offset tracking instead of half-intervals.
This is because for inclusive intervals minimums are not needed.
This is when the input table does not have an information date field and uncommitted offsets are old. In that case they weren't checked.
...core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala
TaskPreDef(date, TaskRunReason.New)
})
} else {
  Seq(TaskPreDef(infoDate, TaskRunReason.New))
Should there not be the same logic as in line 75? I.e. empty seq if maximumInfoDate is after infoDate?
This is because when we don't have an information date at the source, our information date is the processing date, and we can't run for previous information dates, hence the empty Seq. But when the source has an information date column, new offsets could have arrived for that specific date, so we allow it to run.
When I looked at the code, and especially when I started writing unit tests for it as you suggested, I decided to refactor it a bit to make it less confusing.
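A minimal sketch of the branching described here, assuming hypothetical names like `hasInfoDateColumn` and a simplified `tasksToRun` signature (the real logic lives in `ScheduleStrategyIncremental.scala` and differs in detail):

```scala
import java.time.LocalDate

// Minimal stand-ins so the sketch compiles on its own; the real classes live in Pramen core.
object ScheduleSketch {
  sealed trait TaskRunReason
  object TaskRunReason { case object New extends TaskRunReason }
  case class TaskPreDef(infoDate: LocalDate, reason: TaskRunReason)

  // Sketch of the decision described above, not the actual ScheduleStrategyIncremental code.
  def tasksToRun(infoDate: LocalDate,
                 maximumInfoDate: Option[LocalDate],
                 hasInfoDateColumn: Boolean): Seq[TaskPreDef] = {
    if (hasInfoDateColumn) {
      // The source exposes an information date column: new offsets may still arrive
      // for an already processed date, so that date is allowed to run again.
      Seq(TaskPreDef(infoDate, TaskRunReason.New))
    } else {
      // No information date at the source: the information date is the processing date,
      // so nothing is scheduled for dates older than the latest processed one.
      if (maximumInfoDate.exists(_.isAfter(infoDate))) Seq.empty
      else Seq(TaskPreDef(infoDate, TaskRunReason.New))
    }
  }
}
```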
...core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala
offsets.map(OffsetRecordConverter.toDataOffset)
}

override def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = {
There seems to be some duplicated logic with getOffsets. What different purpose do these two methods have?
The number of offsets for a table could be quite large, but the count of uncommitted offsets should be very low. So this method makes sure that when we get uncommitted offsets for a table, the filtering is done on the database side, not after the records are transferred. getOffsets() requires an information date to ensure millions of offsets won't be loaded, but it returns both committed and uncommitted offsets.
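A rough sketch of the distinction (the `getUncommittedOffsets` signature is copied from the snippet above; the `getOffsets` signature and the record types are simplified assumptions):

```scala
import java.time.LocalDate

// Simplified stand-ins for the bookkeeping records; the real types carry more fields.
case class DataOffset(table: String, infoDate: LocalDate, committed: Boolean)
case class UncommittedOffset(table: String, infoDate: LocalDate)

trait OffsetManagerSketch {
  // Requires an information date so that millions of offsets are never loaded at once;
  // returns both committed and uncommitted offsets for that date.
  def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset]

  // Uncommitted offsets should be very few, so the filtering by "uncommitted" (and
  // optionally by info date) is pushed down to the database rather than done in memory.
  def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset]
}
```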
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
Why did you not test splitComplexIdentifier for MySQL, but for MicrosoftSQL?
splitComplexIdentifier() is defined in SqlGeneratorBase, which is already tested in SqlGeneratorDenodoSuite. But for MicrosoftSQL the method is special in order to support two types of escaping, [] and "", and their mix.
But I'll move the testing of splitComplexIdentifier() defined in SqlGeneratorBase to SqlGeneratorGenericSuite, or create SqlGeneratorBaseSuite, to avoid confusion.
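For illustration, a hedged sketch of what such tests might assert, assuming splitComplexIdentifier returns the identifier parts as a sequence (the exact wording and return type in the real suites may differ):

```scala
// Hypothetical assertions; 'gen' stands for the SQL generator under test.
"split complex identifiers with both MS SQL escaping styles" in {
  // bracket escaping
  assert(gen.splitComplexIdentifier("[my schema].[my.table]") == Seq("[my schema]", "[my.table]"))
  // double-quote escaping
  assert(gen.splitComplexIdentifier("\"my schema\".\"my.table\"") == Seq("\"my schema\"", "\"my.table\""))
  // a mix of the two styles
  assert(gen.splitComplexIdentifier("[my schema].\"my.table\"") == Seq("[my schema]", "\"my.table\""))
}
```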
"wrapped query without alias for SQL queries " in { | ||
assert(gen.getDtable("SELECT A FROM B") == "(SELECT A FROM B)") | ||
} | ||
} |
Why did you not add a test for quote and unquote?
Added tests. And found a bug. 😄
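For context, the kind of behaviour such tests would pin down might look like this (illustrative only, assuming double-quote escaping and that quote/unquote operate on each part of a dotted identifier):

```scala
// Hypothetical assertions, not the actual test code added in this PR.
assert(gen.quote("my_schema.my_table") == "\"my_schema\".\"my_table\"")
assert(gen.unquote("\"my_schema\".\"my_table\"") == "my_schema.my_table")
```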
Hi Kevin, I've fixed all obvious issues and added explanations for some. I'm happy with the results. Thanks again for the effort reviewing this! Going to merge this now to unblock some other issues on the stack. Comments remain open, and if you want to continue the conversation on some of them, I will be happy to.
Closes #374
Closes #421
This PR adds 'incremental' as a schedule type, and mechanisms for managing offsets (experimental).
Pramen version 1.10 introduces the concept of incremental ingestion. It allows running a pipeline multiple times a day without reprocessing data that was already processed. In order to enable it, use the incremental schedule when defining your ingestion operation:
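For illustration, a minimal sketch of such an operation definition; apart from `schedule.type = "incremental"`, the key names follow typical Pramen operation configs and may differ in your pipeline:

```hocon
pramen.operations = [
  {
    name = "My incremental sourcing"
    type = "ingestion"

    # The new schedule type introduced by this PR
    schedule.type = "incremental"

    source = "my_jdbc_source"

    tables = [
      {
        input.db.table = "my_table"
        output.metastore.table = "my_table"
      }
    ]
  }
]
```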
In order for the incremental ingestion to work you need to define a monotonically increasing field, called an offset. Usually, this incremental field can be a counter or a record creation timestamp. You need to define the offset field in your source. The source should support incremental ingestion in order to use this mode.
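As a sketch, defining the offset field on the source side might look roughly like this; the offset.column key names here are illustrative assumptions, not confirmed by this PR:

```hocon
{
  input.db.table = "my_table"
  output.metastore.table = "my_table"

  # The monotonically increasing field used for offset tracking
  # (key names are illustrative; consult the Pramen documentation for the exact ones)
  offset.column {
    name = "created_at"
    type = "datetime"
  }
}
```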
Offset types available at the moment:
- integral types (short, int, long)
- datetime or timestamp fields
- string / varchar(n) types
types.Only ingestion jobs support incremental schedule at the moment. Incremental transformations and sinks are planned to be
available soon.