Skip to content

Commit

Permalink
tech feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
norareidy committed Dec 26, 2024
1 parent 5764b75 commit 949557d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 56 deletions.
9 changes: 8 additions & 1 deletion source/fundamentals/crud/read-operations/change-streams.txt
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ reassemble any event fragments:
:end-before: end-split-change-event-sync
:language: csharp

The preceding example uses the ``GetNextChangeStreamEvent()`` and ``MergeFragment()``
.. note::

We recommend reassembling change event fragments, as shown in the
preceding example, but this step is optional. You can use the same
logic to watch split events and complete change events.

The preceding example uses the ``GetNextChangeStreamEvent()``,
``GetNextChangeStreamEventAsync()``, and ``MergeFragment()``
methods to reassemble change event fragments into a single change stream document.
The following code defines these methods:

Expand Down
110 changes: 55 additions & 55 deletions source/includes/code-examples/change-streams/change-streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,25 @@ await cursor.ForEachAsync(change =>

// start-split-event-helpers-sync
// Fetches the next complete change stream event
private static ChangeStreamDocument<TDocument> GetNextChangeStreamEvent<TDocument>(
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
var changeStreamEvent = changeStreamEnumerator.Current;

// Reassembles change event fragments if the event is split
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
return changeStreamEvent;
}
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
Expand All @@ -103,26 +104,37 @@ private static void MergeFragment<TDocument>(

// start-split-event-helpers-async
// Fetches the next complete change stream event
private static async Task<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEvent = changeStreamCursor.Current.First();

// Reassembles change event fragments if the event is split
if (changeStreamEvent.SplitEvent != null)
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
if (!await changeStreamCursor.MoveNextAsync())
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
throw new InvalidOperationException("Incomplete split event fragments.");
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
fragment = changeStreamCursor.Current.First();
MergeFragment(changeStreamEvent, fragment);
}
yield return changeStreamEvent;
}
}

private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
return changeStreamEvent;
}

// Merges a fragment into the base event
Expand All @@ -140,39 +152,27 @@ private static void MergeFragment<TDocument>(
}
// end-split-event-helpers-async

// start-split-change-event-async
// start-split-change-event-sync
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();

using (var cursor = await collection.WatchAsync(pipeline))
{
while (await cursor.MoveNextAsync())
{
foreach (var changeStreamEvent in cursor.Current)
{
var completeEvent = await GetNextChangeStreamEvent(cursor);
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
}
}
// end-split-change-event-async
using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
// end-split-change-event-sync

// start-split-change-event-sync
// start-split-change-event-async
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();

using (var cursor = collection.Watch(pipeline))
{
using (var enumerator = cursor.ToEnumerable().GetEnumerator())
{
while (enumerator.MoveNext())
{
var completeEvent = GetNextChangeStreamEvent(enumerator);
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
}
}
// end-split-change-event-sync
using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
// end-split-change-event-async

// start-change-stream-post-image
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
Expand Down

0 comments on commit 949557d

Please sign in to comment.