From 5d5cf321d8dae7bda0b81285256336e3de0adae3 Mon Sep 17 00:00:00 2001 From: Pradithya Aria <pradithya.pura@go-jek.com> Date: Fri, 1 Feb 2019 19:23:49 +0800 Subject: [PATCH] Fix missing redis write --- .../feast/storage/redis/RedisCustomIO.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java index 388319e84df..d9b8261093b 100644 --- a/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java @@ -168,7 +168,10 @@ public void processElement(ProcessContext context) { } batchCount++; if (batchCount >= batchSize) { - exec(); + pipeline.exec(); + pipeline.sync(); + pipeline.multi(); + batchCount = 0; } } @@ -192,19 +195,13 @@ private Response<?> writeRecord(RedisMutation mutation) { } } - private void exec() { - // LOGGER.info("Flushing pipeline"); - if (!pipeline.isInMulti()) { - pipeline.multi(); - } - pipeline.exec(); - pipeline.multi(); - batchCount = 0; - } - @FinishBundle public void finishBundle() { - exec(); + if (pipeline.isInMulti()) { + pipeline.exec(); + pipeline.sync(); + } + batchCount = 0; } @Teardown