-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
simplify the mina stack by cumulating IoBuffers until there are enough #3633
Conversation
Tests hängen noch |
log.error("Failed to decode message: {}", debuggedMessage , e); | ||
} | ||
finally { | ||
// Set back the old limit, as the in buffer might already have data for a new object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verstehe den kommentar nicht ganz aber es klingt so als würden failende buffer konstant weiter akkumuliert werden, was das return false aber ausschließt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Das ist für den Fall dass wir einen Buffer bekommen, der ein Object hat plus noch etwas mehr Daten von dem Nachfolgenden Objekt.
- Wir merken uns das eingehende Limit damit am Ende an der stelle weiter akkumuliert werden kann
- Wir setzten das Limit genau auf das Ende des eingehenden Objekts damit mit wird Jackson nicht einen zu großen InputStream geben. Jackson kann jetzt failen, am Wahrscheinlichsten ist hier, dass Serializierung und Deserialisierung nicht zusammen passen.
- Danach steht der Buffer genau am Anfang des nächsten Objekts, genau da wo auch das Limit hinzeigt, dann setzten wir das Limit wieder auf den Anfangswert, damit an dessen stelle für das nächste Objekt weiter akkumuliert wird.
Ich werde zu Sicherheit auch die Position hier nochmal auf das zwischenzeitliche Limit setzten, fall das debug JSON nicht den gesamten Buffer gelesen hat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moment, Mina gibt uns buffer in die es potentiell weiter schreiben wird? Wowie. Ich hatte erwartet, dass wenn die sowas machen, darüber abstrahiert wird für uns
backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java
Show resolved
Hide resolved
@@ -30,22 +28,13 @@ public NetworkSession(IoSession session, int maxQueueLength) { | |||
|
|||
@Override | |||
public WriteFuture send(final NetworkMessage<?> message) { | |||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warum kann der Teil dadurch weg?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ist wieder da :)
@@ -67,7 +68,8 @@ public void react(Worker context) throws Exception { | |||
final Map<TableId, List<Bucket>> table2Buckets = context.getStorage().getAllBuckets() | |||
.collect(Collectors.groupingBy(Bucket::getTable)); | |||
|
|||
final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS)); | |||
BasicThreadFactory threadFactory = (new BasicThreadFactory.Builder()).namingPattern(this.getClass().getSimpleName() + "-Worker-%d").build(); | |||
final ListeningExecutorService jobsExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_THREADS, threadFactory)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool
@@ -177,7 +177,7 @@ public void waitUntilWorkDone() { | |||
|
|||
if (Duration.ofNanos(System.nanoTime() - started).toSeconds() > 10) { | |||
started = System.nanoTime(); | |||
log.warn("waiting for done work for a long time", new Exception()); | |||
log.warn("waiting for done work for a long time"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Warum löscht du hier die Exception? Das ist dafür da um zu sehen wo lange gewartet wird
…o feature/simplify-mina-stack
log.trace("Sending {}. chunk: {} byte", chunkCount, ioBuffer.remaining()); | ||
DefaultWriteFuture future = new DefaultWriteFuture(session); | ||
|
||
IoFutureListener<IoFuture> listener = handleWrittenChunk(writeRequest, writtenChunks, totalChunks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Löst das deferren in den listener das deadlock von dem du gesprochen hattest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also finde es etwas mysteriös, dass der filter selber die nachricht verarbeitet sie aber noch nicht verschickt ist. Hängt wohl am Model von mina.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So ist die FilterChain implementiert, du gibst die einfach immer weiter die Chain herunter. Ist aber bei Jetty so ähnlich.
Den Deadlock hatte ich als ich im CollectColumnValuesJob auf die WriteFuture gewartet hatte. Aber erklären konnte ich es mir auch nicht.
log.error("Failed to decode message: {}", debuggedMessage , e); | ||
} | ||
finally { | ||
// Set back the old limit, as the in buffer might already have data for a new object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moment, Mina gibt uns buffer in die es potentiell weiter schreiben wird? Wowie. Ich hatte erwartet, dass wenn die sowas machen, darüber abstrahiert wird für uns
buf.setAutoExpand(true); | ||
|
||
|
||
int oldPos = buf.position(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
das ist doch nach definiont immer 0 oder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ja so lange wir keine CachedBuffer nehmen (wo von Mina auch abrät ab einer bestimmten Java Version)
backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java
Show resolved
Hide resolved
log.trace("Sent Bucket {}", bucketId); | ||
return; | ||
} | ||
log.warn("Failed to send Bucket {}", bucketId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gibt es da einen cause?
backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java
Show resolved
Hide resolved
@@ -176,12 +176,12 @@ public void waitUntilWorkDone() { | |||
|
|||
if (Duration.ofNanos(System.nanoTime() - started).toSeconds() > 10) { | |||
started = System.nanoTime(); | |||
log.warn("waiting for done work for a long time", new Exception()); | |||
log.warn("Waiting for done work for a long time", new Exception("This Exception marks the stacktrace, to show where we are waiting.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
danke
Tests fehlen noch und eine Strategie, sollten wir doch mal das IoBuffer limit erreichen.