Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #303] Support upper layer write entry and get response data when entry is applied #304

Merged
merged 2 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
import io.openmessaging.storage.dledger.common.Status;
import io.openmessaging.storage.dledger.common.TimeoutFuture;
import io.openmessaging.storage.dledger.common.WriteClosure;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
Expand All @@ -32,6 +33,7 @@
import io.openmessaging.storage.dledger.snapshot.SnapshotManager;
import io.openmessaging.storage.dledger.snapshot.SnapshotMeta;
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
import io.openmessaging.storage.dledger.statemachine.ApplyEntry;
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
Expand Down Expand Up @@ -187,12 +189,17 @@ public void wakeUpDispatchers() {
*
* @return true if complete success
*/
public boolean completeResponseFuture(final long index) {
public boolean completeResponseFuture(final ApplyEntry task) {
final long index = task.getEntry().getIndex();
final long term = this.memberState.currTerm();
ConcurrentMap<Long, Closure> closureMap = this.pendingClosure.get(term);
if (closureMap != null) {
Closure closure = closureMap.remove(index);
if (closure != null) {
if (closure instanceof WriteClosure) {
WriteClosure writeClosure = (WriteClosure) closure;
writeClosure.setResp(task.getResp());
}
closure.done(Status.ok());
LOGGER.info("Complete closure, term = {}, index = {}", term, index);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.openmessaging.storage.dledger.common.ReadClosure;
import io.openmessaging.storage.dledger.common.ReadMode;
import io.openmessaging.storage.dledger.common.Status;
import io.openmessaging.storage.dledger.common.WriteClosure;
import io.openmessaging.storage.dledger.common.WriteTask;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.entry.DLedgerEntryType;
import io.openmessaging.storage.dledger.exception.DLedgerException;
Expand Down Expand Up @@ -388,6 +390,21 @@ public void handleRead(ReadMode mode, ReadClosure closure) {
}
}

@Override
public void handleWrite(WriteTask task, WriteClosure closure) {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) {
closure.done(Status.error(DLedgerResponseCode.LEADER_PENDING_FULL));
return;
}
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(task.getBody());
DLedgerEntry entry = dLedgerStore.appendAsLeader(dLedgerEntry);
dLedgerEntryPusher.appendClosure(closure, entry.getTerm(), entry.getIndex());
}

private void dealUnsafeRead(ReadClosure closure) throws DLedgerException {
closure.done(Status.ok());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.common;

public abstract class WriteClosure<T> extends Closure {
public abstract void setResp(T t);

public abstract T getResp(T t);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.common;


public class WriteTask {

private byte[] body;

public void setBody(byte[] body) {
this.body = body;
}

public byte[] getBody() {
return body;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import io.openmessaging.storage.dledger.common.ReadClosure;
import io.openmessaging.storage.dledger.common.ReadMode;
import io.openmessaging.storage.dledger.common.WriteClosure;
import io.openmessaging.storage.dledger.common.WriteTask;

public interface DLedgerInnerProtocolHandler {

void handleRead(ReadMode mode, ReadClosure closure);

void handleWrite(WriteTask task, WriteClosure closure);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017-2022 The DLedger Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.statemachine;

import io.openmessaging.storage.dledger.entry.DLedgerEntry;

public class ApplyEntry<T> {

private DLedgerEntry entry;

private T resp;

public ApplyEntry(DLedgerEntry entry) {
this.entry = entry;
}

public void setResp(T resp) {
this.resp = resp;
}

public T getResp() {
return resp;
}

public void setEntry(DLedgerEntry entry) {
this.entry = entry;
}

public DLedgerEntry getEntry() {
return entry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
/**
* The iterator implementation of committed entries.
*/
public class CommittedEntryIterator implements Iterator<DLedgerEntry> {
private final CommittedEntryIteratorInner inner;
private DLedgerEntry nextEntry;
public class ApplyEntryIterator implements Iterator<ApplyEntry> {
private final ApplyEntryInnerIterator inner;
private ApplyEntry nextTask;

private final Predicate<DLedgerEntry> filter = new Predicate<DLedgerEntry>() {
@Override
Expand All @@ -38,9 +38,9 @@ public boolean test(DLedgerEntry entry) {
}
};

public CommittedEntryIterator(final DLedgerStore dLedgerStore, final long committedIndex, final long lastAppliedIndex,
final Function<Long, Boolean> completeEntryCallback) {
this.inner = new CommittedEntryIteratorInner(dLedgerStore, committedIndex, lastAppliedIndex, completeEntryCallback);
public ApplyEntryIterator(final DLedgerStore dLedgerStore, final long committedIndex, final long lastAppliedIndex,
final Function<ApplyEntry, Boolean> completeEntryCallback) {
this.inner = new ApplyEntryInnerIterator(dLedgerStore, committedIndex, lastAppliedIndex, completeEntryCallback);
}

public long getIndex() {
Expand All @@ -54,33 +54,35 @@ public int getCompleteAckNums() {
@Override
public boolean hasNext() {
while (inner.hasNext()) {
DLedgerEntry dLedgerEntry = inner.next();
if (filter.test(dLedgerEntry)) {
nextEntry = dLedgerEntry;
ApplyEntry applyEntry = inner.next();
if (filter.test(applyEntry.getEntry())) {
nextTask = applyEntry;
return true;
}
}
return false;
}

@Override
public DLedgerEntry next() {
DLedgerEntry entry = nextEntry;
nextEntry = null;
return entry;
public ApplyEntry next() {
ApplyEntry task = nextTask;
nextTask = null;
return task;
}

private static class CommittedEntryIteratorInner implements Iterator<DLedgerEntry> {
private static class ApplyEntryInnerIterator implements Iterator<ApplyEntry> {

private final Function<Long, Boolean> completeEntryCallback;
private final Function<ApplyEntry, Boolean> completeEntryCallback;
private final DLedgerStore dLedgerStore;
private final long committedIndex;
private final long firstApplyingIndex;
private long currentIndex;

private ApplyEntry currentTask;
private int completeAckNums = 0;

private CommittedEntryIteratorInner(final DLedgerStore dLedgerStore, final long committedIndex, final long lastAppliedIndex,
final Function<Long, Boolean> completeEntryCallback) {
private ApplyEntryInnerIterator(final DLedgerStore dLedgerStore, final long committedIndex, final long lastAppliedIndex,
final Function<ApplyEntry, Boolean> completeEntryCallback) {
this.dLedgerStore = dLedgerStore;
this.committedIndex = committedIndex;
this.firstApplyingIndex = lastAppliedIndex + 1;
Expand All @@ -100,17 +102,18 @@ public boolean hasNext() {
}

@Override
public DLedgerEntry next() {
public ApplyEntry next() {
++this.currentIndex;
if (this.currentIndex <= this.committedIndex) {
final DLedgerEntry dLedgerEntry = this.dLedgerStore.get(this.currentIndex);
return dLedgerEntry;
this.currentTask = new ApplyEntry(dLedgerEntry);
return this.currentTask;
}
return null;
}

private void completeApplyingEntry() {
if (this.completeEntryCallback.apply(this.currentIndex)) {
if (this.completeEntryCallback.apply(this.currentTask)) {
this.completeAckNums++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

public class NoOpStatemachine implements StateMachine {
@Override
public void onApply(CommittedEntryIterator iter) {
public void onApply(ApplyEntryIterator iter) {
while (iter.hasNext()) {
iter.next();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface StateMachine {
*
* @param iter iterator of committed entry
*/
void onApply(final CommittedEntryIterator iter);
void onApply(final ApplyEntryIterator iter);

/**
* User defined snapshot generate function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Thread newThread(Runnable r) {
return new Thread(r, "RetryOnCommittedScheduledThread");
}
});
private final Function<Long, Boolean> completeEntryCallback;
private final Function<ApplyEntry, Boolean> completeEntryCallback;
private volatile DLedgerException error;
private Optional<SnapshotManager> snapshotManager;

Expand All @@ -101,7 +101,7 @@ public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine st
this.completeEntryCallback = entryPusher::completeResponseFuture;
entryPusher.registerStateMachine(this);
} else {
this.completeEntryCallback = index -> true;
this.completeEntryCallback = entry -> true;
}
this.snapshotManager = Optional.empty();
}
Expand Down Expand Up @@ -186,7 +186,7 @@ private void doCommitted(final long committedIndex) {
}, RETRY_ON_COMMITTED_DELAY, TimeUnit.MILLISECONDS);
return;
}
final CommittedEntryIterator iter = new CommittedEntryIterator(this.dLedgerStore, committedIndex, lastAppliedIndex, this.completeEntryCallback);
final ApplyEntryIterator iter = new ApplyEntryIterator(this.dLedgerStore, committedIndex, lastAppliedIndex, this.completeEntryCallback);
this.statemachine.onApply(iter);
final long lastIndex = iter.getIndex();
DLedgerEntry entry = this.dLedgerStore.get(lastIndex);
Expand Down
Loading