
#374 Incremental Ingestion #487

Merged · 55 commits into main · Oct 29, 2024

Conversation

@yruslan yruslan commented Sep 18, 2024

Closes #374
Closes #421

This PR adds 'incremental' as a schedule type, and mechanisms for managing offsets (experimental).

Pramen version 1.10 introduces the concept of incremental ingestion. It allows running a pipeline multiple times a day
without reprocessing data that was already processed. To enable it, use the incremental schedule when defining your
ingestion operation:

schedule = "incremental"

For incremental ingestion to work, you need to define a monotonically increasing field, called an offset, in your
source. Usually this is a counter or a record creation timestamp. The source must support incremental ingestion in
order to use this mode.

offset.column {
  name = "created_at"
  type = "datetime"
}

Offset types available at the moment:

Type      Description
integral  Any integral type (short, int, long).
datetime  A datetime or timestamp field.
string    Only string / varchar(n) types.
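
Putting this together, a sketch of a full setup might look like the following. The table and source names are
hypothetical, and keys other than schedule and offset.column follow the usual Pramen operation layout, so treat
the exact layout as illustrative:

pramen.sources = [
  {
    name = "my_jdbc_source"
    # ... connection settings for the source ...

    # The offset column is defined in the source.
    offset.column {
      name = "created_at"
      type = "datetime"
    }
  }
]

pramen.operations = [
  {
    name = "Incremental sourcing of my_table"
    type = "ingestion"
    schedule = "incremental"
    source = "my_jdbc_source"

    tables = [
      {
        input.db.table = "my_table"
        output.metastore.table = "my_table"
      }
    ]
  }
]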

Only ingestion jobs support the incremental schedule at the moment. Incremental transformations and sinks are planned
to be available soon.

@yruslan yruslan force-pushed the feature/374-incremental-ingestion branch 3 times, most recently from 9d9ee04 to 41c8f14 Compare September 18, 2024 15:01

github-actions bot commented Sep 19, 2024

Unit Test Coverage

Overall Project 84.16% -2.66% 🍏
Files changed 77.8%

Module Coverage
pramen:core Jacoco Report 84.93% -2.93%
Files
Module File Coverage
pramen:core Jacoco Report IdentityTransformer.scala 100% 🍏
OffsetInfoParser.scala 100% 🍏
TableReaderJdbcConfig.scala 100% -1.08% 🍏
JournalHadoopCsv.scala 100% 🍏
JdbcSource.scala 100% 🍏
TableReaderJdbcBase.scala 100% 🍏
JournalTasks.scala 100% 🍏
TaskCompletedCsv.scala 100% -18.46%
JournalTask.scala 100% -6.15%
MetaTableStats.scala 100% -57.14%
OffsetRecords.scala 100% 🍏
OffsetRecordConverter.scala 100% 🍏
DataOffsetRequest.scala 100%
DataOffsetAggregated.scala 100%
OffsetRecord.scala 100%
MetaTable.scala 98.91% 🍏
Schedule.scala 98.58% -8.87%
MetastorePersistenceRaw.scala 98.39% 🍏
TransferTable.scala 97.83% 🍏
TaskCompleted.scala 97.48% -7.56% 🍏
InfoDateConfig.scala 97% 🍏
TransferJob.scala 96.77% 🍏
SqlGeneratorDenodo.scala 96.43% -1.34% 🍏
SqlGeneratorDb2.scala 96.4% -1.35% 🍏
SqlGeneratorHive.scala 96.38% -1.36% 🍏
SqlGeneratorHsqlDb.scala 96.38% -1.36% 🍏
OperationSplitter.scala 96.29% 🍏
SqlGeneratorGeneric.scala 96.26% -1.4% 🍏
ScheduleStrategyUtils.scala 95.78% 🍏
TransformationJob.scala 95.56% -1.78% 🍏
SqlGeneratorSas.scala 95.5% -1.74% 🍏
SparkSource.scala 95.45% -4.03% 🍏
LocalSparkSource.scala 94.88% -1.02%
SparkUtils.scala 94.76% -0.57% 🍏
SinkJob.scala 93.67% 🍏
SqlGeneratorOracle.scala 93.38% -1.32% 🍏
PramenImpl.scala 93.23% 🍏
ScheduleStrategyIncremental.scala 93.16% -6.84% 🍏
SqlGeneratorPostgreSQL.scala 91.71% -5.99% 🍏
SqlGeneratorMySQL.scala 91.71% -3.23% 🍏
PythonTransformationJob.scala 91.71% 🍏
PipelineNotificationBuilderHtml.scala 91.3% -1.74%
JobBase.scala 91.26% -1.82% 🍏
AppContextImpl.scala 90.56% -2.58% 🍏
MetastoreImpl.scala 90.33% -1.73% 🍏
ConcurrentJobRunnerImpl.scala 88.98% -8.33%
AppRunner.scala 88.91% 🍏
PipelineStateImpl.scala 88.89% 🍏
OperationDef.scala 88.74% -1.2%
RawFileSource.scala 88.72% -0.51%
TaskRunnerBase.scala 88.47% -0.17% 🍏
SqlGeneratorMicrosoft.scala 87.5% -2.53% 🍏
OffsetManagerJdbc.scala 86.85% -13.15% 🍏
MetastorePersistenceParquet.scala 86.6% -2.49% 🍏
MetastorePersistence.scala 84.43% 🍏
MetastorePersistenceTransient.scala 84.38% 🍏
TableReaderJdbc.scala 84.03% -2.47%
IncrementalIngestionJob.scala 81.65% -18.35% 🍏
JournalJdbc.scala 81.53% 🍏
BookkeeperBase.scala 81.21% -3.76%
IngestionJob.scala 80.93% 🍏
TableReaderJdbcNative.scala 78.96% -16.69%
MetastorePersistenceDelta.scala 71.7% -11.52%
TableReaderSpark.scala 70.63% -25.92%
Bookkeeper.scala 69.28% 🍏
SlickUtils.scala 67.05% -13.64%
MetastorePersistenceTransientEager.scala 64.39% -7.2%
BookkeeperJdbc.scala 56.35% -0.73%
PramenDb.scala 54.19% -1.83%

@yruslan yruslan force-pushed the feature/374-incremental-ingestion branch 2 times, most recently from a5f21d3 to 91e7891 Compare September 25, 2024 12:28
@yruslan yruslan force-pushed the feature/374-incremental-ingestion branch 2 times, most recently from 35017a7 to 09420c2 Compare September 30, 2024 13:39
…and add more useful methods to metastore interfaces.
Apparently, Spark 2.4.8 infers '2021-02-18' as timestamp :O
@yruslan yruslan force-pushed the feature/374-incremental-ingestion branch from 6c108ed to f861bf9 Compare September 30, 2024 14:00
@yruslan yruslan marked this pull request as ready for review October 1, 2024 08:40
@yruslan yruslan marked this pull request as draft October 3, 2024 13:02
@yruslan commented Oct 3, 2024

Putting this back to draft since Kevin suggested using full intervals for offset tracking instead of half-intervals.
Potential benefits (illustrated in the sketch after this list):

  • Easier to understand the offsets table
  • We don't have to rely on absolute minimum values for each offset type
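
As a rough illustration of the difference for an integral offset type (the case class names are hypothetical, not the actual offsets table schema):

// Half-intervals track (min, max]: the very first batch has no previous
// maximum, so a type-specific sentinel minimum is needed (e.g. Long.MinValue).
case class HalfIntervalOffset(minOffsetExclusive: Long, maxOffset: Long)
val firstBatchHalf = HalfIntervalOffset(Long.MinValue, 100L)

// Full intervals track [min, max]: each record stores the actual first and
// last offsets ingested, so no sentinel values are required and the offsets
// table is easier to read.
case class FullIntervalOffset(minOffset: Long, maxOffset: Long)
val firstBatchFull = FullIntervalOffset(1L, 100L)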

@yruslan yruslan force-pushed the feature/374-incremental-ingestion branch from 96d43fc to 99b3689 Compare October 4, 2024 11:08
@yruslan yruslan marked this pull request as ready for review October 8, 2024 07:55
@yruslan yruslan requested a review from kevinwallimann October 8, 2024 07:56
This is because, for inclusive intervals, minimums are not needed.
This is for when the input table does not have an information date field and uncommitted offsets are old. Previously they weren't checked.
TaskPreDef(date, TaskRunReason.New)
})
} else {
Seq(TaskPreDef(infoDate, TaskRunReason.New))
Contributor:

Should there not be the same logic as in line 75? I.e. empty seq if maximumInfoDate is after infoDate?

Collaborator Author:

This is because when we don't have an information date at the source, our information date is the processing date, and we can't run for previous information dates, hence the empty Seq. But when the source has an information date column, new offsets could have arrived for that specific date, so we allow it to run.

When I looked at the code, and especially when I started writing unit tests for it as you suggested, I decided to refactor it a bit to make it less confusing.
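
A sketch of that decision, using the names from the diff excerpt above (hasInfoDateColumn and the surrounding method shape are assumptions):

if (!hasInfoDateColumn) {
  // Without an information date at the source, the information date is the
  // processing date, so runs for earlier information dates are not possible.
  Seq.empty[TaskPreDef]
} else {
  // With an information date column, new offsets may have arrived for that
  // specific date, so a run for it is allowed.
  Seq(TaskPreDef(infoDate, TaskRunReason.New))
}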

offsets.map(OffsetRecordConverter.toDataOffset)
}

override def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = {
Contributor:

There seems to be some duplicated logic with getOffsets. What different purpose do these two methods have?

Collaborator Author:

The number of offsets for a table can be quite large, but the count of uncommitted offsets should be very low. So this method makes sure that when we get uncommitted offsets for a table, the filtering is done in the database, not after the rows are transferred.

getOffsets() requires an information date to ensure millions of offsets won't be loaded, but it returns both committed and uncommitted offsets.
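
A rough sketch of the distinction; getUncommittedOffsets matches the diff excerpt above, while the getOffsets signature, the trait shape, and the stub result types are approximations:

import java.time.LocalDate

// Stub result types; the real ones in the PR carry offset values and commit metadata.
case class DataOffset(tableName: String)
case class UncommittedOffset(tableName: String)

trait OffsetManager {
  // Requires an information date, so at most one day's offsets are loaded.
  // Returns both committed and uncommitted offsets for that date.
  def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset]

  // Uncommitted offsets are expected to be few, so the filter (and the
  // optional info date restriction) runs in the database, not on rows
  // already transferred to the application.
  def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset]
}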

Contributor:

Why did you not test splitComplexIdentifier for MySQL, but for MicrosoftSQL?

Collaborator Author:

splitComplexIdentifier() is defined in SqlGeneratorBase, which is already tested in SqlGeneratorDenodoSuite. But for MicrosoftSQL the method is special in order to support two types of escaping, [] and "", and their mix.

But I'll move the testing of splitComplexIdentifier() defined in SqlGeneratorBase to SqlGeneratorGenericSuite, or create a SqlGeneratorBaseSuite, to avoid confusion.
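
For illustration, hypothetical identifiers showing why the Microsoft SQL dialect needs its own handling (the expected splits are an assumption based on the description above):

// Both quoting styles, and a mix of them, are valid in Microsoft SQL:
val bracketed = "[my schema].[my table]"     // -> [my schema], [my table]
val mixed     = "\"my schema\".[my.table]"   // -> "my schema", [my.table]
// A naive split on '.' would break the second case, since the quoted
// part itself contains a dot.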

"wrapped query without alias for SQL queries " in {
assert(gen.getDtable("SELECT A FROM B") == "(SELECT A FROM B)")
}
}
Contributor:

Why did you not add a test for quote and unquote?

Collaborator Author:

Added tests. And found a bug. 😄

@yruslan commented Oct 29, 2024

Hi Kevin, I've fixed all the obvious issues and added explanations for some. I'm happy with the results. Thanks again for the effort of reviewing this!

Going to merge this now to unblock some other issues in the stack. The comments remain open, and if you want to continue the conversation on any of them, I'll be happy to.

@yruslan yruslan merged commit 23171be into main Oct 29, 2024
8 checks passed
@yruslan yruslan deleted the feature/374-incremental-ingestion branch October 29, 2024 11:08
@yruslan yruslan mentioned this pull request Nov 7, 2024