Skip to content

Commit

Permalink
MessageBatch -> BasicPublishBatch
Browse files Browse the repository at this point in the history
BasicPublishBatch can be used for more efficient batch publishing that
will better utilise TCP and provides more throughput.

[#152041636]
  • Loading branch information
kjnilsson committed Nov 22, 2017
1 parent c020263 commit 74c9c24
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 37 deletions.
2 changes: 1 addition & 1 deletion projects/client/ApigenBootstrap/ApigenBootstrap.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IMessageBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IBasicPublishBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
//---------------------------------------------------------------------------
namespace RabbitMQ.Client
{
public interface IMessageBatch
public interface IBasicPublishBatch
{
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
void Publish();
Expand Down
4 changes: 2 additions & 2 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ void BasicPublish(string exchange, string routingKey, bool mandatory,
void ConfirmSelect();

/// <summary>
/// Creates a MessagBatch instance
/// Creates a BasicPublishBatch instance
/// </summary>
[AmqpMethodDoNotImplement(null)]
IMessageBatch CreateMessageBatch();
IBasicPublishBatch CreateBasicPublishBatch();

/// <summary>
/// Construct a completely empty content header for use with the Basic content class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,9 +1213,9 @@ protected void RunRecoveryEventHandlers()
}
}

public IMessageBatch CreateMessageBatch()
public IBasicPublishBatch CreateBasicPublishBatch()
{
return ((IFullModel)m_delegate).CreateMessageBatch();
return ((IFullModel)m_delegate).CreateBasicPublishBatch();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ namespace RabbitMQ.Client.Impl
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

public class MessageBatch : IMessageBatch
public class BasicPublishBatch : IBasicPublishBatch
{
private List<Command> commands = new List<Command>();
private ModelBase model;
internal MessageBatch(ModelBase model)
internal BasicPublishBatch (ModelBase model)
{
this.model = model;
}
Expand Down
41 changes: 21 additions & 20 deletions projects/client/RabbitMQ.Client/src/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,28 +170,29 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)
}


public static IList<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IEnumerable<Command> commands)
public static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
{
List<OutboundFrame> frames = new List<Impl.OutboundFrame>();

foreach (var cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.

frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
{
var remaining = body.Length - offset;
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
}
}
var frames = new List<OutboundFrame>();

foreach (var cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.

frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
{
var remaining = body.Length - offset;
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
}
}
}

return frames;
}
}
Expand Down
6 changes: 3 additions & 3 deletions projects/client/RabbitMQ.Client/src/client/impl/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;

using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
public interface ISession
Expand Down Expand Up @@ -80,6 +80,6 @@ public interface ISession
void HandleFrame(InboundFrame frame);
void Notify();
void Transmit(Command cmd);
void Transmit(IEnumerable<Command> cmd);
void Transmit(IList<Command> cmd);
}
}
4 changes: 2 additions & 2 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,9 +1293,9 @@ public void ConfirmSelect()
///////////////////////////////////////////////////////////////////////////

public abstract IBasicProperties CreateBasicProperties();
public IMessageBatch CreateMessageBatch()
public IBasicPublishBatch CreateBasicPublishBatch()
{
return new MessageBatch(this);
return new BasicPublishBatch(this);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public virtual void Transmit(Command cmd)
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
}
public virtual void Transmit(IEnumerable<Command> commands)
public virtual void Transmit(IList<Command> commands)
{
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@

namespace RabbitMQ.Client.Unit
{
internal class TestMessageBatch : IntegrationFixture
internal class TestBasicPublishBatch : IntegrationFixture
{
[Test]
public void TestMessageBatchSend()
public void TestBasicPublishBatchSend()
{
Model.QueueDeclare(queue: "test-message-batch-a", durable: false);
Model.QueueDeclare(queue: "test-message-batch-b", durable: false);
var batch = Model.CreateMessageBatch();
var batch = Model.CreateBasicPublishBatch();
batch.Add("", "test-message-batch-a", false, null, new byte [] {});
batch.Add("", "test-message-batch-b", false, null, new byte [] {});
batch.Publish();
Expand Down

0 comments on commit 74c9c24

Please sign in to comment.