diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index 3ce44a1e65a73..1bc2335ee5947 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -69,7 +69,7 @@ public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, Class c } @Override - public synchronized CompletableFuture pushAsync(String key, T loadData) { + public CompletableFuture pushAsync(String key, T loadData) { String msg = validateProducer(); if (StringUtils.isNotBlank(msg)) { return CompletableFuture.failedFuture(new IllegalStateException(msg)); @@ -79,7 +79,7 @@ public synchronized CompletableFuture pushAsync(String key, T loadData) { } @Override - public synchronized CompletableFuture removeAsync(String key) { + public CompletableFuture removeAsync(String key) { String msg = validateProducer(); if (StringUtils.isNotBlank(msg)) { return CompletableFuture.failedFuture(new IllegalStateException(msg)); @@ -89,7 +89,7 @@ public synchronized CompletableFuture removeAsync(String key) { } @Override - public synchronized Optional get(String key) { + public Optional get(String key) { String msg = validateTableView(); if (StringUtils.isNotBlank(msg)) { if (msg.equals(SHUTDOWN_ERR_MSG)) { @@ -102,7 +102,7 @@ public synchronized Optional get(String key) { } @Override - public synchronized void forEach(BiConsumer action) { + public void forEach(BiConsumer action) { String msg = validateTableView(); if (StringUtils.isNotBlank(msg)) { throw new IllegalStateException(msg); @@ -110,7 +110,7 @@ public synchronized void forEach(BiConsumer action) { tableView.forEach(action); } - public synchronized Set> entrySet() { + public Set> entrySet() { String msg = validateTableView(); if (StringUtils.isNotBlank(msg)) { throw new IllegalStateException(msg); @@ -119,7 +119,7 @@ public synchronized Set> entrySet() { } @Override - public synchronized int size() { + public int size() { String msg = validateTableView(); if (StringUtils.isNotBlank(msg)) { throw new IllegalStateException(msg); @@ -135,14 +135,14 @@ private void validateState() { @Override - public synchronized void init() throws IOException { + public void init() throws IOException { validateState(); close(); start(); } @Override - public synchronized void closeTableView() throws IOException { + public void closeTableView() throws IOException { validateState(); if (tableView != null) { tableView.close(); @@ -151,13 +151,13 @@ public synchronized void closeTableView() throws IOException { } @Override - public synchronized void start() throws LoadDataStoreException { + public void start() throws LoadDataStoreException { validateState(); startProducer(); startTableView(); } - private synchronized void closeProducer() throws IOException { + private void closeProducer() throws IOException { validateState(); if (producer != null) { producer.close(); @@ -165,7 +165,7 @@ private synchronized void closeProducer() throws IOException { } } @Override - public synchronized void startTableView() throws LoadDataStoreException { + public void startTableView() throws LoadDataStoreException { validateState(); if (tableView == null) { try { @@ -181,7 +181,7 @@ public synchronized void startTableView() throws LoadDataStoreException { } } @Override - public synchronized void startProducer() throws LoadDataStoreException { + public void startProducer() throws LoadDataStoreException { validateState(); if (producer == null) { try { @@ -196,7 +196,7 @@ public synchronized void startProducer() throws LoadDataStoreException { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { if (isShutdown) { return; } @@ -205,7 +205,7 @@ public synchronized void close() throws IOException { } @Override - public synchronized void shutdown() throws IOException { + public void shutdown() throws IOException { close(); isShutdown = true; }