Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce IBasicPublishBatch #368

Merged
merged 12 commits into from
Dec 4, 2017
2 changes: 1 addition & 1 deletion projects/client/ApigenBootstrap/ApigenBootstrap.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
<Compile Include="..\RabbitMQ.Client\src\client\api\AmqpTimestamp.cs;..\RabbitMQ.Client\src\client\api\IBasicConsumer.cs;..\RabbitMQ.Client\src\client\api\IBasicProperties.cs;..\RabbitMQ.Client\src\client\api\IContentHeader.cs;..\RabbitMQ.Client\src\client\api\IModel.cs;..\RabbitMQ.Client\src\client\api\PublicationAddress.cs;..\RabbitMQ.Client\src\client\api\IBasicPublishBatch.cs;..\RabbitMQ.Client\src\client\api\BasicGetResult.cs;..\RabbitMQ.Client\src\client\api\QueueDeclareOk.cs;..\RabbitMQ.Client\src\client\api\ShutdownEventArgs.cs;..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs;..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicAckEventArgs.cs;..\RabbitMQ.Client\src\client\events\BasicNackEventArgs.cs;..\RabbitMQ.Client\src\client\events\CallbackExceptionEventArgs.cs;..\RabbitMQ.Client\src\client\events\ConsumerEventArgs.cs;..\RabbitMQ.Client\src\client\events\FlowControlEventArgs.cs;..\RabbitMQ.Client\src\client\impl\IFullModel.cs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 1.1.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2016 Pivotal Software, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v1.1:
//
//---------------------------------------------------------------------------
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
//---------------------------------------------------------------------------
namespace RabbitMQ.Client
{
public interface IBasicPublishBatch
{
void Add(string exchange, string routingKey, bool mandatory, IBasicProperties properties, byte[] body);
void Publish();
}
}
7 changes: 7 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

using RabbitMQ.Client.Apigen.Attributes;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;

Expand Down Expand Up @@ -278,6 +279,12 @@ void BasicPublish(string exchange, string routingKey, bool mandatory,
[AmqpMethodDoNotImplement(null)]
void ConfirmSelect();

/// <summary>
/// Creates a BasicPublishBatch instance
/// </summary>
[AmqpMethodDoNotImplement(null)]
IBasicPublishBatch CreateBasicPublishBatch();

/// <summary>
/// Construct a completely empty content header for use with the Basic content class.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,5 +1212,10 @@ protected void RunRecoveryEventHandlers()
}
}
}

public IBasicPublishBatch CreateBasicPublishBatch()
{
return ((IFullModel)m_delegate).CreateBasicPublishBatch();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 1.1.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2016 Pivotal Software, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v1.1:
//
//---------------------------------------------------------------------------
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
//---------------------------------------------------------------------------

namespace RabbitMQ.Client.Impl
{
using System.Collections.Generic;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;

public class BasicPublishBatch : IBasicPublishBatch
{
private List<Command> commands = new List<Command>();
private ModelBase model;
internal BasicPublishBatch (ModelBase model)
{
this.model = model;
}

public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
{
var bp = basicProperties == null ? model.CreateBasicProperties() : basicProperties;
var method = new BasicPublish
{
m_exchange = exchange,
m_routingKey = routingKey,
m_mandatory = mandatory
};

commands.Add(new Command(method, (ContentHeaderBase)bp, body));
}

public void Publish()
{
model.SendCommands(commands);
}
Copy link
Contributor

@YulerB YulerB Nov 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we clear commands to allow reuse of the batch , or do you see this as a way of issuing a batch of commands daily, like an actor could?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the intention is that you'll be creating a new instance via CreateBasicPublishBatch() and not reusing these because the constructor is internal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've deliberately not put any particular intent on the API but just kept it as a simple buffer abstraction that users can use as they see fit.

}
}
29 changes: 29 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public void Transmit(int channelNumber, Connection connection)
}
}



public void TransmitAsSingleFrame(int channelNumber, Connection connection)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
Expand All @@ -166,5 +168,32 @@ public void TransmitAsFrameSet(int channelNumber, Connection connection)

connection.WriteFrameSet(frames);
}


public static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
{
var frames = new List<OutboundFrame>();

foreach (var cmd in commands)
{
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
if (cmd.Method.HasContent)
{
var body = cmd.Body;// var body = ConsolidateBody(); // Cache, since the property is compiled.

frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, body.Length));
var frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
var bodyPayloadMax = (frameMax == 0) ? body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < body.Length; offset += bodyPayloadMax)
{
var remaining = body.Length - offset;
var count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
frames.Add(new BodySegmentOutboundFrame(channelNumber, body, offset, count));
}
}
}

return frames;
}
}
}
18 changes: 11 additions & 7 deletions projects/client/RabbitMQ.Client/src/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

using System.Text;
using System.Threading;
using System.Reflection;

namespace RabbitMQ.Client.Framing.Impl
{
Expand Down Expand Up @@ -105,7 +106,16 @@ public class Connection : IConnection
private Timer _heartbeatWriteTimer;
private Timer _heartbeatReadTimer;
private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
private AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);

#if CORECLR
private static string version = typeof(Connection).GetTypeInfo().Assembly
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
.InformationalVersion;
#else
private static string version = typeof(Connection).Assembly
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
.InformationalVersion;
#endif


// true if we haven't finished connection negotiation.
Expand Down Expand Up @@ -354,9 +364,6 @@ IProtocol IConnection.Protocol

public static IDictionary<string, object> DefaultClientProperties()
{

string version = "0.0.0.0";// assembly.GetName().Version.ToString();
//TODO: Get the rest of this data from the Assembly Attributes
IDictionary<string, object> table = new Dictionary<string, object>();
table["product"] = Encoding.UTF8.GetBytes("RabbitMQ");
table["version"] = Encoding.UTF8.GetBytes(version);
Expand Down Expand Up @@ -537,7 +544,6 @@ public void FinishClose()
{
// Notify hearbeat loops that they can leave
m_heartbeatRead.Set();
m_heartbeatWrite.Set();
m_closed = true;
MaybeStopHeartbeatTimers();

Expand Down Expand Up @@ -1170,13 +1176,11 @@ public override string ToString()
public void WriteFrame(OutboundFrame f)
{
m_frameHandler.WriteFrame(f);
m_heartbeatWrite.Set();
}

public void WriteFrameSet(IList<OutboundFrame> f)
{
m_frameHandler.WriteFrameSet(f);
m_heartbeatWrite.Set();
}

///<summary>API-side invocation of connection abort.</summary>
Expand Down
2 changes: 2 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -79,5 +80,6 @@ public interface ISession
void HandleFrame(InboundFrame frame);
void Notify();
void Transmit(Command cmd);
void Transmit(IList<Command> cmd);
}
}
32 changes: 32 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,26 @@ public abstract void BasicNack(ulong deliveryTag,
bool multiple,
bool requeue);

internal void AllocatatePublishSeqNos(int count)
{
var c = 0;
lock (m_unconfirmedSet.SyncRoot)
{
while(c < count)
{
if (NextPublishSeqNo > 0)
{
if (!m_unconfirmedSet.Contains(NextPublishSeqNo))
{
m_unconfirmedSet.Add(NextPublishSeqNo);
}
NextPublishSeqNo++;
}
c++;
}
}
}

public void BasicPublish(string exchange,
string routingKey,
bool mandatory,
Expand Down Expand Up @@ -1273,6 +1293,10 @@ public void ConfirmSelect()
///////////////////////////////////////////////////////////////////////////

public abstract IBasicProperties CreateBasicProperties();
public IBasicPublishBatch CreateBasicPublishBatch()
{
return new BasicPublishBatch(this);
}


public void ExchangeBind(string destination,
Expand Down Expand Up @@ -1496,6 +1520,13 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
}
}

internal void SendCommands(IList<Command> commands)
{
m_flowControlBlock.WaitOne();
AllocatatePublishSeqNos(commands.Count);
Session.Transmit(commands);
}

protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
{
lock (m_unconfirmedSet.SyncRoot)
Expand Down Expand Up @@ -1534,6 +1565,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
return k.m_result;
}


public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation
{
public IBasicConsumer m_consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
using System;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -199,5 +200,9 @@ public virtual void Transmit(Command cmd)
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
cmd.Transmit(ChannelNumber, Connection);
}
public virtual void Transmit(IList<Command> commands)
{
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
}
}
}
Loading