Merge pull request #15 from practo/events-test-fixes

Events test fixes
alok87 committed Aug 12, 2020
2 parents 7b88ebe + b4fca6b commit e7df69b
Showing 4 changed files with 59 additions and 10 deletions.
4 changes: 2 additions & 2 deletions build/produce.sh
@@ -24,5 +24,5 @@ for i in $(seq 1 $messages); do # start 10 jobs in parallel
plumber write message kafka --address=a6f50e841fe284aea870ea716ecf0623-1714444736.ap-south-1.elb.amazonaws.com:9094 --key tipocakey --topic="${topic}" --input-data="$d"
done

- # Wait for all parallel jobs to finish
- while [ 1 ]; do fg 2> /dev/null; [ $? == 1 ] && break; done
+ # Wait for all parallel jobs to finish (add & and make it parallel)
+ # while [ 1 ]; do fg 2> /dev/null; [ $? == 1 ] && break; done
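For reference, a minimal sketch of the parallel variant the new comment hints at, assuming job control is enabled (set -m, as load.sh below does): each plumber call is backgrounded with & and the commented-out fg loop is restored to wait for them. Variable names and the broker address are the ones already used in this script.

set -m # enable job control so fg can reap the background jobs
for i in $(seq 1 $messages); do
    plumber write message kafka --address=a6f50e841fe284aea870ea716ecf0623-1714444736.ap-south-1.elb.amazonaws.com:9094 --key tipocakey --topic="${topic}" --input-data="$d" &
done
# wait for all parallel jobs to finish
while [ 1 ]; do fg 2> /dev/null; [ $? == 1 ] && break; done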
4 changes: 2 additions & 2 deletions cmd/redshiftbatcher/config/config-sample.yaml
@@ -3,9 +3,9 @@ batcher:
maxWaitSeconds: 20
kafka:
brokers: "localhost:9092, localhost:9094"
group: "cosumer-group-"
group: "dbname"
version: "2.5.0"
- topicPrefixes: "tipoca-producer1, tipoca-producer2"
+ topicPrefixes: "tipocaprefix.dbname.table1, tipocaprefix.dbname.table2"
kafkaClient: "sarama"
sarama:
assignor: "range"
34 changes: 28 additions & 6 deletions pkg/consumer/batcher.go
@@ -85,7 +85,16 @@ type batchProcessor struct {
bodyBuf *bytes.Buffer

// batchId is a forever increasing number which resets after maxBatchId
+ // this is useful only for logging and debugging purpose
batchId int
+
+ // batchStartOffset is the starting offset of the batch
+ // this is useful only for logging and debugging purpose
+ batchStartOffset int64
+
+ // batchEndOffset is the ending offset of the batch
+ // this is useful only for logging and debugging purpose
+ batchEndOffset int64
}

func newBatchProcessor(
@@ -126,7 +135,7 @@ func (b *batchProcessor) commitOffset(datas []interface{}) {
// need to worry about this. Since the write to s3 again will
// just overwrite the same data.
b.session.Commit()
- klog.Infof(
+ klog.V(5).Infof(
"topic=%s, message=%s: Processed\n",
message.Topic, string(message.Value),
)
@@ -145,7 +154,10 @@ func (b *batchProcessor) setS3key(topic string, partition int32, offset int64) {
b.s3Key = filepath.Join(
b.s3BucketDir,
topic,
fmt.Sprintf("partition-%d_offset-%d.csv", partition, offset),
fmt.Sprintf(
"%d_offset_%d_partition.csv",
offset,
partition),
)
}

@@ -170,13 +182,15 @@ func (b *batchProcessor) newtransformedBuffer(datas []interface{}) {
b.setS3key(message.Topic, message.Partition, message.Offset)
klog.V(5).Infof("topic=%s, batchId=%d id=%d: s3Key=%s\n",
b.topic, b.batchId, id, b.s3Key)
+ b.batchStartOffset = message.Offset
}

b.bodyBuf.Write(message.Value)
b.bodyBuf.Write([]byte{'\n'})
klog.V(5).Infof(
"topic=%s, batchId=%d id=%d: transformed\n",
b.topic, b.batchId, id)
+ b.batchEndOffset = message.Offset
}
}

@@ -202,14 +216,22 @@ func (b *batchProcessor) process(workerID int, datas []interface{}) {
if err != nil {
klog.Fatalf("Error writing to s3, err=%v\n", err)
}

+ klog.V(4).Infof(
+ "topic=%s, batchId=%d, startOffset: %d, endOffset: %d: Uploaded",
+ b.topic, b.batchId, b.batchStartOffset, b.batchEndOffset,
+ )
klog.V(5).Infof(
- "topic=%s, batchId=%d: Uploaded batch to S3 at: %s",
+ "topic=%s, batchId=%d: Uploaded at: %s",
b.topic, b.batchId, b.s3Key,
)
- b.bodyBuf.Truncate(0)
-
- // // TODO: add a job to load the batch to redshift
- // time.Sleep(time.Second * 3)
+ b.bodyBuf.Truncate(0)
+
b.commitOffset(datas)

+ klog.Infof(
+ "topic=%s, batchId=%d, startOffset: %d, endOffset: %d: Processed",
+ b.topic, b.batchId, b.batchStartOffset, b.batchEndOffset,
+ )
}
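Side note on the reworked logs: the per-message Processed line and the s3-key Uploaded line now sit behind klog verbosity 5, and the new batch-level Uploaded line behind verbosity 4, so they only appear when the batcher runs with a high enough -v. A hypothetical invocation, assuming the cmd/redshiftbatcher binary registers klog's standard flags:

# -v=4 shows the batch-level Uploaded log, -v=5 adds the per-message and s3-key logs
./redshiftbatcher -v=5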
27 changes: 27 additions & 0 deletions producer/examples/load.sh
@@ -0,0 +1,27 @@
#!/bin/sh

# ./load.sh 100 topic-prefix
# run from local in k8s
# k get pods -n kafka | grep mysql | awk '{print $1}' | xargs -I {} kubectl exec {} /bin/sh -n kafka /var/lib/mysql/load.sh

set -m # Enable Job Control

if [ -z "$1" ]; then
inserts=10
else
inserts=$1
fi

echo "Inserting $inserts rows is customers table"

last_id=$(mysql -N -s -uroot -pdebezium -Dinventory -e "select id from customers order by id desc limit 1")
new_id=$((last_id+1))

for i in $(seq 1 $inserts); do # start 10 jobs in parallel
echo "inserting: ${new_id}"
mysql -N -s -uroot -pdebezium -Dinventory -e "insert into customers (id, first_name, last_name, email) values ('${new_id}', 'first_name_${new_id}', 'last_name_${new_id}', 'email_${new_id}');"
new_id=$((new_id+1))
done

# Wait for all parallel jobs to finish (add & and make it parallel)
# while [ 1 ]; do fg 2> /dev/null; [ $? == 1 ] && break; done
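The kubectl exec one-liner in the header assumes load.sh already sits at /var/lib/mysql/load.sh inside the MySQL pod; a sketch of one way to get it there first, reusing the same pod lookup (pod and namespace names as above, path assumed writable):

# copy the script into the MySQL pod before exec-ing it
pod=$(kubectl get pods -n kafka | grep mysql | awk '{print $1}')
kubectl cp load.sh "kafka/${pod}:/var/lib/mysql/load.sh"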
