Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Producer Dead Code Removal #18424

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,64 +491,6 @@ internal virtual async Task<PartitionPublishingProperties> GetPartitionPublishin
}
}

/// <summary>
/// Sends the <see cref="EventData" /> to the associated Event Hub. To avoid the
/// overhead associated with measuring and validating the size in the client, validation will
/// be delegated to the Event Hubs service and is deferred until the operation is invoked.
/// The call will fail if the size of the specified <paramref name="eventData"/> exceeds the
/// maximum allowable size of a single event.
/// </summary>
///
/// <param name="eventData">The event data to send.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
///
/// <returns>
/// A task to be resolved on when the operation has completed; if no exception is thrown when awaited, the
/// Event Hubs service has acknowledge receipt and assumed responsibility for delivery of the event.
/// </returns>
///
/// <seealso cref="SendAsync(EventData, SendEventOptions, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
///
internal virtual async Task SendAsync(EventData eventData,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(eventData, nameof(eventData));
await SendAsync(new[] { eventData }, null, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends the <see cref="EventData" /> to the associated Event Hub. To avoid the
/// overhead associated with measuring and validating the size in the client, validation will
/// be delegated to the Event Hubs service and is deferred until the operation is invoked.
/// The call will fail if the size of the specified <paramref name="eventData"/> exceeds the
/// maximum allowable size of a single event.
/// </summary>
///
/// <param name="eventData">The event data to send.</param>
/// <param name="options">The set of options to consider when sending this batch.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.</param>
///
/// <returns>
/// A task to be resolved on when the operation has completed; if no exception is thrown when awaited, the
/// Event Hubs service has acknowledge receipt and assumed responsibility for delivery of the event.
/// </returns>
///
/// <seealso cref="SendAsync(EventData, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, CancellationToken)" />
/// <seealso cref="SendAsync(IEnumerable{EventData}, SendEventOptions, CancellationToken)" />
/// <seealso cref="SendAsync(EventDataBatch, CancellationToken)" />
///
internal virtual async Task SendAsync(EventData eventData,
SendEventOptions options,
CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(eventData, nameof(eventData));
await SendAsync(new[] { eventData }, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Sends a set of events to the associated Event Hub as a single operation. To avoid the
/// overhead associated with measuring and validating the size in the client, validation will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task EventHubProducerCreatesDiagnosticScopeOnSend()
var producer = new EventHubProducerClient(fakeConnection, transportMock.Object);

var eventData = new EventData(ReadOnlyMemory<byte>.Empty);
await producer.SendAsync(eventData);
await producer.SendAsync(new[] { eventData });

activity.Stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,48 +229,6 @@ public async Task ProducerCanSendAnEventBatchUsingAPartitionHashKey()
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
/// </summary>
///
[Test]
public async Task ProducerCanSendSingleZeroLengthEvent()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

await using (var producer = new EventHubProducerClient(connectionString))
{
var singleEvent = new EventData(Array.Empty<byte>());
Assert.That(async () => await producer.SendAsync(singleEvent), Throws.Nothing);
}
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
/// </summary>
///
[Test]
public async Task ProducerCanSendSingleLargeEvent()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

await using (var producer = new EventHubProducerClient(connectionString, new EventHubProducerClientOptions { RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(5) } }))
{
// Actual limit is 1046520 for a single event.

var singleEvent = new EventData(new byte[100000]);
Assert.That(async () => await producer.SendAsync(singleEvent), Throws.Nothing);
}
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
Expand All @@ -293,31 +251,6 @@ public async Task ProducerCanSendSingleLargeEventInASet()
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
/// </summary>
///
[Test]
public async Task ProducerCannotSendSingleEventLargerThanMaximumSize()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);

await using (var producer = new EventHubProducerClient(connectionString))
{
// Actual limit is 1046520 for a single event.

var singleEvent = new EventData(new byte[1500000]);
EventData[] eventBatch = new[] { new EventData(new byte[1500000]) };

Assert.That(async () => await producer.SendAsync(singleEvent), Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.MessageSizeExceeded));
Assert.That(async () => await producer.SendAsync(eventBatch), Throws.InstanceOf<EventHubsException>().And.Property(nameof(EventHubsException.Reason)).EqualTo(EventHubsException.FailureReason.MessageSizeExceeded));
}
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// connect to the Event Hubs service and perform operations.
Expand Down Expand Up @@ -1011,7 +944,7 @@ public async Task ProducerSendsEventsWithTheSamePartitionHashKeyToTheSamePartiti

for (var index = 0; index < batches; index++)
{
await producer.SendAsync(new EventData(Encoding.UTF8.GetBytes($"Just a few messages ({ index })")), batchOptions);
await producer.SendAsync(new[] { new EventData(Encoding.UTF8.GetBytes($"Just a few messages ({ index })")) }, batchOptions);
}

// Read the events.
Expand Down Expand Up @@ -1076,7 +1009,7 @@ public async Task ProducerCannotSendWhenProxyIsInvalid()

await using (var invalidProxyProducer = new EventHubProducerClient(connectionString, producerOptions))
{
Assert.That(async () => await invalidProxyProducer.SendAsync(new EventData(new byte[1])), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
Assert.That(async () => await invalidProxyProducer.SendAsync(new[] { new EventData(new byte[1]) }), Throws.InstanceOf<WebSocketException>().Or.InstanceOf<TimeoutException>());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,68 +514,6 @@ public async Task ReadPartitionPublishingPropertiesAsyncReturnsEmptyPartitionSta
"Partition state should not have been initialized.");
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public void SendSingleWithoutOptionsRequiresAnEvent()
{
var producer = new EventHubProducerClient(new MockConnection());
Assert.That(async () => await producer.SendAsync(default(EventData)), Throws.ArgumentNullException);
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public void SendSingleRequiresAnEvent()
{
var producer = new EventHubProducerClient(new MockConnection());
Assert.That(async () => await producer.SendAsync(default(EventData), new SendEventOptions()), Throws.ArgumentNullException);
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public async Task SendSingleWithoutOptionsDelegatesToBatchSend()
{
var transportProducer = new ObservableTransportProducerMock();
var producer = new Mock<EventHubProducerClient> { CallBase = true };

producer
.Setup(instance => instance.SendAsync(It.Is<IEnumerable<EventData>>(value => value.Count() == 1), It.IsAny<SendEventOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask)
.Verifiable("The single send should delegate to the batch send.");

await producer.Object.SendAsync(new EventData(new byte[] { 0x22 }));
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
/// </summary>
///
[Test]
public async Task SendSingleWitOptionsDelegatesToBatchSend()
{
var transportProducer = new ObservableTransportProducerMock();
var producer = new Mock<EventHubProducerClient> { CallBase = true };

producer
.Setup(instance => instance.SendAsync(It.Is<IEnumerable<EventData>>(value => value.Count() == 1), It.IsAny<SendEventOptions>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask)
.Verifiable("The single send should delegate to the batch send.");

await producer.Object.SendAsync(new EventData(new byte[] { 0x22 }), new SendEventOptions());
}

/// <summary>
/// Verifies functionality of the <see cref="EventHubProducerClient.SendAsync" />
/// method.
Expand Down