Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce IBasicPublishBatch #368

Merged
merged 12 commits into from
Dec 4, 2017
2 changes: 1 addition & 1 deletion projects/client/ApigenBootstrap/ApigenBootstrap.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IMessageBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
</ItemGroup>

</Project>
47 changes: 47 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/api/IMessageBatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 1.1.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2016 Pivotal Software, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v1.1:
//
//---------------------------------------------------------------------------
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
//---------------------------------------------------------------------------
namespace RabbitMQ.Client
{
public interface IMessageBatch
{
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
Copy link
Contributor

@YulerB YulerB Nov 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love an overload called AddRange that took an IEnumerable of Message that contained these elements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's something that could be very easily added as an application specific extension method as required. From a core api point of view I don't see the need to introduce a Message type to hold the fields in question (If I understood your request correctly).

that said maybe the name IMessageBatch isn't that great - IBasicPublishBatch feels more appropriate perhaps.

void Publish();
}
}
7 changes: 7 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

using RabbitMQ.Client.Apigen.Attributes;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;

Expand Down Expand Up @@ -278,6 +279,12 @@ void BasicPublish(string exchange, string routingKey, bool mandatory,
[AmqpMethodDoNotImplement(null)]
void ConfirmSelect();

/// <summary>
/// Creates a MessagBatch instance
/// </summary>
[AmqpMethodDoNotImplement(null)]
IMessageBatch CreateMessageBatch();

/// <summary>
/// Construct a completely empty content header for use with the Basic content class.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,5 +1212,10 @@ protected void RunRecoveryEventHandlers()
}
}
}

public IMessageBatch CreateMessageBatch()
{
return ((IFullModel)m_delegate).CreateMessageBatch();
}
}
}
28 changes: 28 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection)
}
}



public void TransmitAsSingleFrame(int channelNumber, Connection connection)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
Expand All @@ -166,5 +168,31 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)

connection.WriteFrameSet(frames);
}


public static IList<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IEnumerable<Command> commands)
{
List<OutboundFrame> frames = new List<Impl.OutboundFrame>();

foreach (var cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.

frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
{
var remaining = body.Length - offset;
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
}
}
}
return frames;
}
}
}
18 changes: 11 additions & 7 deletions projects/client/RabbitMQ.Client/src/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

using System.Text;
using System.Threading;
using System.Reflection;

namespace RabbitMQ.Client.Framing.Impl
{
Expand Down Expand Up @@ -105,7 +106,16 @@ public class Connection : IConnection
private Timer _heartbeatWriteTimer;
private Timer _heartbeatReadTimer;
private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
private AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);

#if CORECLR
private static string version = typeof(Connection).GetTypeInfo().Assembly
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
.InformationalVersion;
#else
private static string version = typeof(Connection).Assembly
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
.InformationalVersion;
#endif


// true if we haven't finished connection negotiation.
Expand Down Expand Up @@ -354,9 +364,6 @@ IProtocol IConnection.Protocol

public static IDictionary<string, object> DefaultClientProperties()
{

string version = "0.0.0.0";// assembly.GetName().Version.ToString();
//TODO: Get the rest of this data from the Assembly Attributes
IDictionary<string, object> table = new Dictionary<string, object>();
table["product"] = Encoding.UTF8.GetBytes("RabbitMQ");
table["version"] = Encoding.UTF8.GetBytes(version);
Expand Down Expand Up @@ -537,7 +544,6 @@ public void FinishClose()
{
// Notify hearbeat loops that they can leave
m_heartbeatRead.Set();
m_heartbeatWrite.Set();
m_closed = true;
MaybeStopHeartbeatTimers();

Expand Down Expand Up @@ -1170,13 +1176,11 @@ public override string ToString()
public void WriteFrame(OutboundFrame f)
{
m_frameHandler.WriteFrame(f);
m_heartbeatWrite.Set();
}

public void WriteFrameSet(IList<OutboundFrame> f)
{
m_frameHandler.WriteFrameSet(f);
m_heartbeatWrite.Set();
}

///<summary>API-side invocation of connection abort.</summary>
Expand Down
4 changes: 3 additions & 1 deletion projects/client/RabbitMQ.Client/src/client/impl/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
//---------------------------------------------------------------------------

using System;

using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
public interface ISession
Expand Down Expand Up @@ -79,5 +80,6 @@ public interface ISession
void HandleFrame(InboundFrame frame);
void Notify();
void Transmit(Command cmd);
void Transmit(IEnumerable<Command> cmd);
}
}
75 changes: 75 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/MessageBatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 1.1.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2016 Pivotal Software, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v1.1:
//
//---------------------------------------------------------------------------
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
//---------------------------------------------------------------------------

namespace RabbitMQ.Client.Impl
{
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

public class MessageBatch : IMessageBatch
{
private List<Command> commands = new List<Command>();
private ModelBase model;
internal MessageBatch(ModelBase model)
{
this.model = model;
}

public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
{
var bp = basicProperties == null ? model.CreateBasicProperties() : basicProperties;
var method = new BasicPublish
{
m_exchange = exchange,
m_routingKey = routingKey,
m_mandatory = mandatory
};

commands.Add(new Command(method, (ContentHeaderBase)bp, body));
}

public void Publish()
{
model.SendCommands(commands);
}
}
}
32 changes: 32 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,26 @@ public abstract void BasicNack(ulong deliveryTag,
bool multiple,
bool requeue);

internal void AllocatatePublishSeqNos(int count)
{
var c = 0;
lock (m_unconfirmedSet.SyncRoot)
{
while(c < count)
{
if (NextPublishSeqNo > 0)
{
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
{
m_unconfirmedSet.Add(NextPublishSeqNo);
}
NextPublishSeqNo++;
}
c++;
}
}
}

public void BasicPublish(string exchange,
string routingKey,
bool mandatory,
Expand Down Expand Up @@ -1273,6 +1293,10 @@ public void ConfirmSelect()
///////////////////////////////////////////////////////////////////////////

public abstract IBasicProperties CreateBasicProperties();
public IMessageBatch CreateMessageBatch()
{
return new MessageBatch(this);
}


public void ExchangeBind(string destination,
Expand Down Expand Up @@ -1496,6 +1520,13 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
}
}

internal void SendCommands(IList<Command> commands)
{
m_flowControlBlock.WaitOne();
AllocatatePublishSeqNos(commands.Count);
Session.Transmit(commands);
}

protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
{
lock (m_unconfirmedSet.SyncRoot)
Expand Down Expand Up @@ -1534,6 +1565,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
return k.m_result;
}


public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation
{
public IBasicConsumer m_consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
using System;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd)
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
}
public virtual void Transmit(IEnumerable<Command> commands)
{
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
}
}
}
Loading