Skip to content

Commit

Permalink
Fix initialization of DataStorm samples after session recovery (#3294)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Dec 30, 2024
1 parent 075234d commit ec19397
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 20 deletions.
55 changes: 36 additions & 19 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1134,28 +1134,45 @@ SessionI::subscriberInitialized(
out << _id << ": initialized '" << element << "' from 'e" << elementId << '@' << topicId << "'";
}
elementSubscriber->initialized = true;
elementSubscriber->lastId = samples.empty() ? 0 : samples.back().id;

vector<shared_ptr<Sample>> samplesI;
samplesI.reserve(samples.size());
auto sampleFactory = element->getTopic()->getSampleFactory();
auto keyFactory = element->getTopic()->getKeyFactory();
for (const auto& sample : samples)
// If the samples collection is empty, the element subscriber's lastId remains unchanged:
// - If no samples have been received, lastId is 0.
// - If the element subscriber has been initialized before, lastId represents the ID of the latest received sample.
//
// If the samples collection is not empty:
// - It contains samples queued in the peer writer for the element that are valid according to the element's
// configuration.
// - These samples have not yet been processed by the element subscriber, according to the subscriber's lastId.
if (samples.empty())
{
assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first);

samplesI.push_back(sampleFactory->create(
_id,
elementSubscribers->name,
sample.id,
sample.event,
key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue),
subscriber.tags[sample.tag],
sample.value,
sample.timestamp));
assert(samplesI.back()->key);
return {};
}
else
{
assert(samples.front().id > elementSubscriber->lastId);
elementSubscriber->lastId = samples.back().id;

vector<shared_ptr<Sample>> samplesI;
samplesI.reserve(samples.size());
auto sampleFactory = element->getTopic()->getSampleFactory();
auto keyFactory = element->getTopic()->getKeyFactory();
for (const auto& sample : samples)
{
assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first);

samplesI.push_back(sampleFactory->create(
_id,
elementSubscribers->name,
sample.id,
sample.event,
key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue),
subscriber.tags[sample.tag],
sample.value,
sample.timestamp));
assert(samplesI.back()->key);
}
return samplesI;
}
return samplesI;
}

void
Expand Down
58 changes: 57 additions & 1 deletion cpp/test/DataStorm/reliability/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void ::Reader::run(int argc, char* argv[])
auto connection = node.getSessionConnection(sample.getSession());
while (!connection)
{
this_thread::sleep_for(chrono::milliseconds(200));
this_thread::sleep_for(chrono::milliseconds(10));
connection = node.getSessionConnection(sample.getSession());
}
connection->close().get();
Expand All @@ -68,6 +68,62 @@ void ::Reader::run(int argc, char* argv[])
writer.update(0);
writer.waitForNoReaders();
}

{
Topic<string, int> topic(node, "int2");
auto reader = makeSingleKeyReader(topic, "element", "", config);
string session;

// Read 100 samples from the "element" key and close the connection.
for (int i = 0; i < 100; ++i)
{
auto sample = reader.getNextUnread();
if (sample.getValue() != i)
{
cerr << "unexpected sample: " << sample.getValue() << " expected:" << i << endl;
test(false);
}
session = sample.getSession();
}

auto connection = node.getSessionConnection(session);
test(connection);
connection->close().get();

// Send a sample to the writer on "reader_barrier" to let it know that the connection was closed.
// The writer will read it after the session is reestablished.
auto writerB = makeSingleKeyWriter(topic, "reader_barrier");
writerB.waitForReaders();
writerB.update(0);

// Wait for the writer to acknowledge the sample send on "reader_barrier" and close the connection again.
auto readerB = makeSingleKeyReader(topic, "writer_barrier");
[[maybe_unused]] auto _ = readerB.getNextUnread();

// Session was reestablished; close it again.
connection = node.getSessionConnection(session);
test(connection);
connection->close().get();

// Let the writer know the connection was closed again, and that it can proceed with the second batch of
// samples.
writerB.update(0);

for (int i = 0; i < 100; ++i)
{
auto sample = reader.getNextUnread();
if (sample.getValue() != i + 100)
{
cerr << "unexpected sample: " << sample.getValue() << " expected:" << (i + 100) << endl;
test(false);
}
}

// Let the writer know we have processed all samples.
writerB.waitForReaders();
writerB.update(0);
writerB.waitForNoReaders();
}
}

DEFINE_TEST(::Reader)
37 changes: 37 additions & 0 deletions cpp/test/DataStorm/reliability/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,43 @@ void ::Writer::run(int argc, char* argv[])
[[maybe_unused]] auto _ = makeSingleKeyReader(topic, "barrier").getNextUnread();
}
cout << "ok" << endl;

// Publish a batch of samples to a topic's key, follow by two consecutive session recovery events without writer
// activity on the given key.
// Then send a second batch of samples to the same topic's key and ensure the reader continue reading from when it
// left off.
cout << "testing reader multiple connection closure without writer activity... " << flush;
{
Topic<string, int> topic(node, "int2");
auto writer = makeSingleKeyWriter(topic, "element", "", config);
writer.waitForReaders();
for (int i = 0; i < 100; ++i)
{
writer.update(i);
}

auto readerB = makeSingleKeyReader(topic, "reader_barrier");

// A control sample sent by the reader to let the writer know the connection was closed. The writer processes
// this sample after the first session reestablishment.
auto sample = readerB.getNextUnread();

// Send a control sample to let the reader know session was reestablished.
auto writerB = makeSingleKeyWriter(topic, "writer_barrier");
writerB.update(0);

// Wait for a second control sample from the reader indicating the second session closure. The writer process
// this sample after the second session reestablishment.
sample = readerB.getNextUnread();

// Session has been reestablish twice without activity in "element" key. Send the second batch of samples.
for (int i = 0; i < 100; ++i)
{
writer.update(i + 100);
}
sample = readerB.getNextUnread();
}
cout << "ok" << endl;
}

DEFINE_TEST(::Writer)

0 comments on commit ec19397

Please sign in to comment.