Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] unsynchronize TableViewLoadDataStoreImpl #23487

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, Class<T> c
}

@Override
public synchronized CompletableFuture<Void> pushAsync(String key, T loadData) {
public CompletableFuture<Void> pushAsync(String key, T loadData) {
String msg = validateProducer();
if (StringUtils.isNotBlank(msg)) {
return CompletableFuture.failedFuture(new IllegalStateException(msg));
Expand All @@ -79,7 +79,7 @@ public synchronized CompletableFuture<Void> pushAsync(String key, T loadData) {
}

@Override
public synchronized CompletableFuture<Void> removeAsync(String key) {
public CompletableFuture<Void> removeAsync(String key) {
String msg = validateProducer();
if (StringUtils.isNotBlank(msg)) {
return CompletableFuture.failedFuture(new IllegalStateException(msg));
Expand All @@ -89,7 +89,7 @@ public synchronized CompletableFuture<Void> removeAsync(String key) {
}

@Override
public synchronized Optional<T> get(String key) {
public Optional<T> get(String key) {
String msg = validateTableView();
if (StringUtils.isNotBlank(msg)) {
if (msg.equals(SHUTDOWN_ERR_MSG)) {
Expand All @@ -102,15 +102,15 @@ public synchronized Optional<T> get(String key) {
}

@Override
public synchronized void forEach(BiConsumer<String, T> action) {
public void forEach(BiConsumer<String, T> action) {
String msg = validateTableView();
if (StringUtils.isNotBlank(msg)) {
throw new IllegalStateException(msg);
}
tableView.forEach(action);
}

public synchronized Set<Map.Entry<String, T>> entrySet() {
public Set<Map.Entry<String, T>> entrySet() {
String msg = validateTableView();
if (StringUtils.isNotBlank(msg)) {
throw new IllegalStateException(msg);
Expand All @@ -119,7 +119,7 @@ public synchronized Set<Map.Entry<String, T>> entrySet() {
}

@Override
public synchronized int size() {
public int size() {
String msg = validateTableView();
if (StringUtils.isNotBlank(msg)) {
throw new IllegalStateException(msg);
Expand All @@ -135,14 +135,14 @@ private void validateState() {


@Override
public synchronized void init() throws IOException {
public void init() throws IOException {
validateState();
close();
start();
}

@Override
public synchronized void closeTableView() throws IOException {
public void closeTableView() throws IOException {
validateState();
if (tableView != null) {
tableView.close();
Expand All @@ -151,21 +151,21 @@ public synchronized void closeTableView() throws IOException {
}

@Override
public synchronized void start() throws LoadDataStoreException {
public void start() throws LoadDataStoreException {
validateState();
startProducer();
startTableView();
}

private synchronized void closeProducer() throws IOException {
private void closeProducer() throws IOException {
validateState();
if (producer != null) {
producer.close();
producer = null;
}
}
@Override
public synchronized void startTableView() throws LoadDataStoreException {
public void startTableView() throws LoadDataStoreException {
validateState();
if (tableView == null) {
try {
Expand All @@ -181,7 +181,7 @@ public synchronized void startTableView() throws LoadDataStoreException {
}
}
@Override
public synchronized void startProducer() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {
validateState();
if (producer == null) {
try {
Expand All @@ -196,7 +196,7 @@ public synchronized void startProducer() throws LoadDataStoreException {
}

@Override
public synchronized void close() throws IOException {
public void close() throws IOException {
if (isShutdown) {
return;
}
Expand All @@ -205,7 +205,7 @@ public synchronized void close() throws IOException {
}

@Override
public synchronized void shutdown() throws IOException {
public void shutdown() throws IOException {
close();
isShutdown = true;
}
Expand Down
Loading