Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify the mina stack by cumulating IoBuffers until there are enough #3633

Merged
merged 21 commits into from
Dec 13, 2024

Conversation

thoniTUB
Copy link
Collaborator

@thoniTUB thoniTUB commented Dec 4, 2024

Tests fehlen noch und eine Strategie, sollten wir doch mal das IoBuffer limit erreichen.

@thoniTUB thoniTUB requested a review from awildturtok December 4, 2024 11:39
@thoniTUB
Copy link
Collaborator Author

thoniTUB commented Dec 4, 2024

Tests hängen noch

@thoniTUB thoniTUB marked this pull request as ready for review December 4, 2024 17:21
@thoniTUB thoniTUB self-assigned this Dec 4, 2024
log.error("Failed to decode message: {}", debuggedMessage , e);
}
finally {
// Set back the old limit, as the in buffer might already have data for a new object
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verstehe den kommentar nicht ganz aber es klingt so als würden failende buffer konstant weiter akkumuliert werden, was das return false aber ausschließt

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Das ist für den Fall dass wir einen Buffer bekommen, der ein Object hat plus noch etwas mehr Daten von dem Nachfolgenden Objekt.

  • Wir merken uns das eingehende Limit damit am Ende an der stelle weiter akkumuliert werden kann
  • Wir setzten das Limit genau auf das Ende des eingehenden Objekts damit mit wird Jackson nicht einen zu großen InputStream geben. Jackson kann jetzt failen, am Wahrscheinlichsten ist hier, dass Serializierung und Deserialisierung nicht zusammen passen.
  • Danach steht der Buffer genau am Anfang des nächsten Objekts, genau da wo auch das Limit hinzeigt, dann setzten wir das Limit wieder auf den Anfangswert, damit an dessen stelle für das nächste Objekt weiter akkumuliert wird.

Ich werde zu Sicherheit auch die Position hier nochmal auf das zwischenzeitliche Limit setzten, fall das debug JSON nicht den gesamten Buffer gelesen hat.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moment, Mina gibt uns buffer in die es potentiell weiter schreiben wird? Wowie. Ich hatte erwartet, dass wenn die sowas machen, darüber abstrahiert wird für uns

@@ -30,22 +28,13 @@ public NetworkSession(IoSession session, int maxQueueLength) {

@Override
public WriteFuture send(final NetworkMessage<?> message) {
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warum kann der Teil dadurch weg?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ist wieder da :)

@@ -67,7 +68,8 @@ public void react(Worker context) throws Exception {
final Map<TableId, List<Bucket>> table2Buckets = context.getStorage().getAllBuckets()
.collect(Collectors.groupingBy(Bucket::getTable));

final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS));
BasicThreadFactory threadFactory = (new BasicThreadFactory.Builder()).namingPattern(this.getClass().getSimpleName() + "-Worker-%d").build();
final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS, threadFactory));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool

@@ -177,7 +177,7 @@ public void waitUntilWorkDone() {

if (Duration.ofNanos(System.nanoTime() - started).toSeconds() > 10) {
started = System.nanoTime();
log.warn("waiting for done work for a long time", new Exception());
log.warn("waiting for done work for a long time");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warum löscht du hier die Exception? Das ist dafür da um zu sehen wo lange gewartet wird

@thoniTUB thoniTUB enabled auto-merge December 13, 2024 10:08
@thoniTUB thoniTUB merged commit ef752fc into develop Dec 13, 2024
6 checks passed
log.trace("Sending {}. chunk: {} byte", chunkCount, ioBuffer.remaining());
DefaultWriteFuture future = new DefaultWriteFuture(session);

IoFutureListener<IoFuture> listener = handleWrittenChunk(writeRequest, writtenChunks, totalChunks);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Löst das deferren in den listener das deadlock von dem du gesprochen hattest?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also finde es etwas mysteriös, dass der filter selber die nachricht verarbeitet sie aber noch nicht verschickt ist. Hängt wohl am Model von mina.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So ist die FilterChain implementiert, du gibst die einfach immer weiter die Chain herunter. Ist aber bei Jetty so ähnlich.

Den Deadlock hatte ich als ich im CollectColumnValuesJob auf die WriteFuture gewartet hatte. Aber erklären konnte ich es mir auch nicht.

log.error("Failed to decode message: {}", debuggedMessage , e);
}
finally {
// Set back the old limit, as the in buffer might already have data for a new object
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moment, Mina gibt uns buffer in die es potentiell weiter schreiben wird? Wowie. Ich hatte erwartet, dass wenn die sowas machen, darüber abstrahiert wird für uns

buf.setAutoExpand(true);


int oldPos = buf.position();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

das ist doch nach definiont immer 0 oder?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ja so lange wir keine CachedBuffer nehmen (wo von Mina auch abrät ab einer bestimmten Java Version)

log.trace("Sent Bucket {}", bucketId);
return;
}
log.warn("Failed to send Bucket {}", bucketId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gibt es da einen cause?

@@ -176,12 +176,12 @@ public void waitUntilWorkDone() {

if (Duration.ofNanos(System.nanoTime() - started).toSeconds() > 10) {
started = System.nanoTime();
log.warn("waiting for done work for a long time", new Exception());
log.warn("Waiting for done work for a long time", new Exception("This Exception marks the stacktrace, to show where we are waiting."));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

danke

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants