Skip to content

Commit

Permalink
check for thread interrupt in process
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesChangOkx committed Feb 19, 2024
1 parent 64b5aac commit cc7bb72
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void process() {
} else {
throw new JedisException("Unknown message type: " + reply);
}
} while (isSubscribed());
} while (!Thread.currentThread().isInterrupted() && isSubscribed());

// /* Invalidate instance since this thread is no longer listening */
// this.client = null;
Expand Down
56 changes: 56 additions & 0 deletions src/test/java/redis/clients/jedis/JedisShardedPubSubBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package redis.clients.jedis;

import junit.framework.TestCase;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static redis.clients.jedis.Protocol.ResponseKeyword.SMESSAGE;
import static redis.clients.jedis.Protocol.ResponseKeyword.SSUBSCRIBE;

public class JedisShardedPubSubBaseTest extends TestCase {

public void testProceed() throws InterruptedException {
// setup
final JedisShardedPubSubBase<String> pubSub = new JedisShardedPubSubBase<String>() {

@Override
public void onSMessage(String channel, String message) {
fail("this should not happen when thread is interrupted");
}

@Override
protected String encode(byte[] raw) {
return new String(raw);
}

};

final Connection mockConnection = mock(Connection.class);
final List<Object> mockSubscribe = Arrays.asList(
SSUBSCRIBE.getRaw(), "channel".getBytes(), 1L
);
final List<Object> mockResponse = Arrays.asList(
SMESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
);
when(mockConnection.getUnflushedObject()).thenReturn(mockSubscribe, mockResponse);


final CountDownLatch countDownLatch = new CountDownLatch(1);
// action
final Thread thread = new Thread(() -> {
Thread.currentThread().interrupt();
pubSub.proceed(mockConnection, "channel");

countDownLatch.countDown();
});
thread.start();

assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));

}
}

0 comments on commit cc7bb72

Please sign in to comment.