From 5d5cf321d8dae7bda0b81285256336e3de0adae3 Mon Sep 17 00:00:00 2001
From: Pradithya Aria <pradithya.pura@go-jek.com>
Date: Fri, 1 Feb 2019 19:23:49 +0800
Subject: [PATCH] Fix missing redis write

---
 .../feast/storage/redis/RedisCustomIO.java    | 21 ++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java
index 388319e84df..d9b8261093b 100644
--- a/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java
+++ b/ingestion/src/main/java/feast/storage/redis/RedisCustomIO.java
@@ -168,7 +168,10 @@ public void processElement(ProcessContext context) {
         }
         batchCount++;
         if (batchCount >= batchSize) {
-          exec();
+          pipeline.exec();
+          pipeline.sync();
+          pipeline.multi();
+          batchCount = 0;
         }
       }
 
@@ -192,19 +195,13 @@ private Response<?> writeRecord(RedisMutation mutation) {
         }
       }
 
-      private void exec() {
-        // LOGGER.info("Flushing pipeline");
-        if (!pipeline.isInMulti()) {
-          pipeline.multi();
-        }
-        pipeline.exec();
-        pipeline.multi();
-        batchCount = 0;
-      }
-
       @FinishBundle
       public void finishBundle() {
-        exec();
+        if (pipeline.isInMulti()) {
+          pipeline.exec();
+          pipeline.sync();
+        }
+        batchCount = 0;
       }
 
       @Teardown