-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.rs
47 lines (42 loc) · 1.29 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use kcl::checkpointer::Checkpointer;
use kcl::reader::StdinReader;
use kcl::writer::StdoutWriter;
use kcl::{run, Processor, Record};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct MyPayload {
event_field: String,
}
struct MyConsumer;
impl Processor<StdoutWriter, StdinReader> for MyConsumer {
fn initialize(&mut self, _shard_id: &str) {}
fn process_records(
&mut self,
data: &[Record],
checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>,
) {
for record in data {
match record.json::<MyPayload>() {
Ok(data) => println!("{:?}", data.event_field),
Err(e) => println!("{:?}", e),
}
}
checkpointer
.checkpoint(None, None)
.expect("Checkpoint to succeed.");
}
fn lease_lost(&mut self) {}
fn shard_ended(&mut self, checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>) {
checkpointer
.checkpoint(None, None)
.expect("Checkpoint to succeed.");
}
fn shutdown_requested(&mut self, checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>) {
checkpointer
.checkpoint(None, None)
.expect("Checkpoint to succeed.");
}
}
fn main() {
run(&mut MyConsumer {});
}