-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.js
36 lines (31 loc) · 942 Bytes
/
producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
const { kafka } = require("./client");
const readline = require("readline");
const { KAFKA_RIDER_TOPIC } = require("./constant");
// Console line reader: input lines are read from stdin and the prompt is
// written to stdout.
const consoleStreams = { input: process.stdin, output: process.stdout };
const rl = readline.createInterface(consoleStreams);
/**
 * Parses one line of console input of the form "<riderName> <location>".
 *
 * Tokens are separated by any run of whitespace; tokens after the second are
 * ignored.
 *
 * @param {string} line - Raw line read from the console.
 * @returns {{ riderName: string, location: string } | null} The parsed update,
 *   or null when either token is missing.
 */
function parseRiderUpdate(line) {
  const [riderName, location] = line.trim().split(/\s+/);
  if (!riderName || !location) {
    return null;
  }
  return { riderName, location };
}

/**
 * Connects a Kafka producer and publishes one message to KAFKA_RIDER_TOPIC for
 * every valid "<riderName> <location>" line read from the readline interface.
 *
 * Invalid lines and failed sends are reported on stderr and do not stop the
 * input loop. The producer is disconnected when the interface closes.
 *
 * @returns {Promise<void>} Resolves once the producer is connected and the
 *   readline listeners are registered; rejects if connecting fails.
 */
async function init() {
  const producer = kafka.producer();
  console.log("connecting producer...");
  await producer.connect();
  console.log("Producer connected successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async (line) => {
    const update = parseRiderUpdate(line);
    if (update === null) {
      // Without this guard a missing location made `location.toLowerCase()`
      // throw inside the listener, producing an unhandled rejection.
      console.error("Invalid input. Expected: <riderName> <location>");
      return;
    }

    const { riderName, location } = update;
    try {
      await producer.send({
        topic: KAFKA_RIDER_TOPIC,
        messages: [
          {
            // "north" (case-insensitive) goes to partition 0, everything else
            // to partition 1, so consumers can be split by location.
            partition: location.toLowerCase() === "north" ? 0 : 1,
            key: "location-update",
            value: JSON.stringify({ name: riderName, location }),
          },
        ],
      });
    } catch (err) {
      // Event listeners do not await their callbacks, so a rejection here
      // would otherwise go unhandled.
      console.error("Failed to send rider update:", err);
    }
  }).on("close", async () => {
    try {
      await producer.disconnect();
    } catch (err) {
      console.error("Failed to disconnect producer:", err);
    }
  });
}
// Start the producer. A startup failure (e.g. the broker is unreachable) is
// logged instead of surfacing as an unhandled promise rejection.
init().catch((err) => {
  console.error("Producer failed to start:", err);
  process.exitCode = 1;
  rl.close(); // release stdin so the process can exit
});