AdvancedKafkaPlanRun.scala
package io.github.datacatering.plan

import io.github.datacatering.datacaterer.api.PlanRun
import io.github.datacatering.datacaterer.api.model.{ArrayType, DateType, DoubleType, HeaderType, IntegerType, StructType, TimestampType}

import java.sql.Date

/**
 * Generates 10 JSON messages onto the Kafka topic "account-topic", each with a key,
 * headers and a value serialised from the nested "content" struct.
 */
class AdvancedKafkaPlanRun extends PlanRun {

  val kafkaTask = kafka("my_kafka", "kafkaserver:29092")
    .topic("account-topic")
    .schema(
      // Kafka record key and value; the value is the "content" struct serialised as JSON
      field.name("key").sql("content.account_id"),
      field.name("value").sql("TO_JSON(content)"),
      // Optionally define the partition explicitly:
      // field.name("partition").`type`(IntegerType),
      // Kafka headers built as an array of key/value byte pairs
      field.name("headers")
        .`type`(HeaderType.getType)
        .sql(
          """ARRAY(
            |  NAMED_STRUCT('key', 'account-id', 'value', TO_BINARY(content.account_id, 'utf-8')),
            |  NAMED_STRUCT('key', 'updated', 'value', TO_BINARY(content.details.updated_by.time, 'utf-8'))
            |)""".stripMargin
        ),
      // Nested payload that backs the key, value and headers above
      field.name("content").`type`(StructType)
        .schema(
          field.name("account_id").regex("ACC[0-9]{8}"),
          field.name("year").`type`(IntegerType).min(2021).max(2023),
          field.name("account_status").oneOf("open", "closed", "suspended", "pending"),
          field.name("amount").`type`(DoubleType),
          field.name("details").`type`(StructType)
            .schema(
              field.name("name").expression("#{Name.name}"),
              // Earliest transaction date, derived from the generated transactions array
              field.name("first_txn_date").`type`(DateType).sql("ELEMENT_AT(SORT_ARRAY(content.transactions.txn_date), 1)"),
              field.name("updated_by").`type`(StructType)
                .schema(
                  field.name("user"),
                  field.name("time").`type`(TimestampType),
                ),
            ),
          field.name("transactions").`type`(ArrayType)
            .schema(
              field.name("txn_date").`type`(DateType).min(Date.valueOf("2021-01-01")).max(Date.valueOf("2021-12-31")),
              field.name("amount").`type`(DoubleType),
            )
        ),
    )
    .count(count.records(10))

  val config = configuration
    .generatedReportsFolderPath("/opt/app/data/report")

  execute(config, kafkaTask)
}
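
// ---------------------------------------------------------------------------
// Not part of the original example: a minimal sketch of the same builder
// pattern without headers or nested detail structs, reusing only the API
// calls shown above. The class name and the topic "simple-account-topic"
// are illustrative placeholders.
// ---------------------------------------------------------------------------
class SimpleKafkaPlanRunSketch extends PlanRun {

  val simpleKafkaTask = kafka("my_kafka", "kafkaserver:29092")
    .topic("simple-account-topic")
    .schema(
      field.name("key").sql("content.account_id"),
      field.name("value").sql("TO_JSON(content)"),
      field.name("content").`type`(StructType)
        .schema(
          field.name("account_id").regex("ACC[0-9]{8}"),
          field.name("account_status").oneOf("open", "closed", "suspended", "pending"),
          field.name("amount").`type`(DoubleType),
        ),
    )
    .count(count.records(5))

  val simpleConfig = configuration
    .generatedReportsFolderPath("/opt/app/data/report")

  execute(simpleConfig, simpleKafkaTask)
}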