Skip to content

Commit

Permalink
Add service message types for presence API (#2121)
Browse files Browse the repository at this point in the history
* Add service message type for presence API
  • Loading branch information
Y-Sindo authored Jan 15, 2025
1 parent 6955830 commit e0b874f
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 3 deletions.
31 changes: 31 additions & 0 deletions src/Microsoft.Azure.SignalR.Protocols/Models/GroupMember.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using MessagePack;
using static Microsoft.Azure.SignalR.Protocol.MessagePackUtils;

namespace Microsoft.Azure.SignalR.Protocol;

/// <summary>
/// Represents a connection in a group.
/// </summary>
public record GroupMember : IMessagePackSerializable
{
public string ConnectionId { get; set; } = string.Empty;

public string? UserId { get; set; }

void IMessagePackSerializable.Serialize(ref MessagePackWriter writer)
{
writer.WriteArrayHeader(2);
writer.Write(ConnectionId);
writer.Write(UserId);
}

void IMessagePackSerializable.Load(ref MessagePackReader reader, string fieldName)
{
_ = reader.ReadArrayHeader();
ConnectionId = ReadStringNotNull(ref reader, nameof(ConnectionId));
UserId = reader.ReadString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using MessagePack;

namespace Microsoft.Azure.SignalR.Protocol;

public sealed class GroupMemberQueryResponse : IMessagePackSerializable
{
/// <summary>
/// The group members.
/// </summary>
public IReadOnlyCollection<GroupMember> Members { get; set; } = [];

/// <summary>
/// A token that allows the client to retrieve the next page of results.
/// This parameter is provided by the service in the response of a previous request when there are additional results to be fetched.
/// Clients should include the continuationToken in the next request to receive the subsequent page of data. If this parameter is omitted, the server will return the first page of results.
/// </summary>
public string? ContinuationToken { get; set; }

void IMessagePackSerializable.Serialize(ref MessagePackWriter writer)
{
writer.WriteArrayHeader(2);

writer.WriteArrayHeader(Members.Count);
foreach (var member in Members)
{
(member as IMessagePackSerializable).Serialize(ref writer);
}
writer.Write(ContinuationToken);
}

void IMessagePackSerializable.Load(ref MessagePackReader reader, string fieldName)
{
_ = reader.ReadArrayHeader();
var memberCount = reader.ReadArrayHeader();
var members = new List<GroupMember>(memberCount);
for (var i = 0; i < memberCount; i++)
{
members.Add(reader.Deserialize<GroupMember>("groupMembers"));
}
Members = members;
ContinuationToken = reader.ReadString();
}
}
34 changes: 34 additions & 0 deletions src/Microsoft.Azure.SignalR.Protocols/ServiceMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -572,4 +572,38 @@ public ServiceMappingMessage(string invocationId, string connectionId, string in
InstanceId = instanceId;
}
}

/// <summary>
/// A message to list connections in a group.
/// </summary>
/// <remarks>The expected response of this message is an <see cref="AckMessage"/> whose <see cref="AckMessage.Payload"/> is a serialized <see cref="GroupMemberQueryResponse"/>.</remarks>
public class GroupMemberQueryMessage : ExtensibleServiceMessage, IAckableMessage, IMessageWithTracingId
{
/// <summary>
/// The id to ack.
/// </summary>
public int AckId { get; set; }

/// <summary>
/// The name of the group to list.
/// </summary>
public string GroupName { get; set; } = string.Empty;

/// <summary>
/// The max count of connections to return.
/// </summary>
public int Max { get; set; } = 200;

/// <summary>
/// A token to indiate the start point of results.
/// This parameter is provided by the service in the response of a previous request when there are additional results to be fetched.
/// Clients should include the continuationToken in the next request to receive the subsequent page of data. If this parameter is omitted, the server will return the first page of results.
/// </summary>
public string? ContinuationToken { get; set; }

/// <summary>
/// The tracing id.
/// </summary>
public ulong? TracingId { get; set; }
}
}
28 changes: 27 additions & 1 deletion src/Microsoft.Azure.SignalR.Protocols/ServiceProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ public bool TryParseMessage(ref ReadOnlySequence<byte> input, out ServiceMessage
return CreateServiceMappingMessage(ref reader, arrayLength);
case ServiceProtocolConstants.ConnectionFlowControlMessageType:
return CreateConnectionFlowControlMessage(ref reader, arrayLength);
case ServiceProtocolConstants.GroupMemberQueryMessageType:
return CreateGroupMemberQueryMessage(ref reader, arrayLength);
default:
// Future protocol changes can add message types, old clients can ignore them
return null;
Expand Down Expand Up @@ -336,6 +338,9 @@ private static void WriteMessageCore(ref MessagePackWriter writer, ServiceMessag
case ConnectionFlowControlMessage connectionFlowControlMessage:
WriteConnectionFlowControlMessage(ref writer, connectionFlowControlMessage);
break;
case GroupMemberQueryMessage groupMemberQueryMessage:
WriteGroupMemberQueryMessage(ref writer, groupMemberQueryMessage);
break;
default:
throw new InvalidDataException($"Unexpected message type: {message.GetType().Name}");
}
Expand Down Expand Up @@ -751,6 +756,17 @@ private static void WriteConnectionFlowControlMessage(ref MessagePackWriter writ
message.WriteExtensionMembers(ref writer);
}

private static void WriteGroupMemberQueryMessage(ref MessagePackWriter writer, GroupMemberQueryMessage message)
{
writer.WriteArrayHeader(6);
writer.Write(ServiceProtocolConstants.GroupMemberQueryMessageType);
message.WriteExtensionMembers(ref writer);
writer.Write(message.GroupName);
writer.Write(message.AckId);
writer.Write(message.Max);
writer.Write(message.ContinuationToken);
}

private static void WriteStringArray(ref MessagePackWriter writer, IReadOnlyList<string>? array)
{
if (array?.Count > 0)
Expand Down Expand Up @@ -1266,7 +1282,6 @@ private static CheckUserExistenceWithAckMessage CreateCheckUserExistenceWithAckM
{
var userId = ReadStringNotNull(ref reader, "userId");
var ackId = ReadInt32(ref reader, "ackId");

var result = new CheckUserExistenceWithAckMessage(userId, ackId);
result.ReadExtensionMembers(ref reader);
return result;
Expand Down Expand Up @@ -1372,4 +1387,15 @@ private static ConnectionFlowControlMessage CreateConnectionFlowControlMessage(r
(ConnectionType)connectionType);
return result;
}

private static GroupMemberQueryMessage CreateGroupMemberQueryMessage(ref MessagePackReader reader, int arrayLength)
{
var result = new GroupMemberQueryMessage();
result.ReadExtensionMembers(ref reader);
result.GroupName = ReadStringNotNull(ref reader, nameof(GroupMemberQueryMessage.GroupName));
result.AckId = ReadInt32(ref reader, nameof(GroupMemberQueryMessage.AckId));
result.Max = ReadInt32(ref reader, nameof(GroupMemberQueryMessage.Max));
result.ContinuationToken = ReadString(ref reader, nameof(GroupMemberQueryMessage.ContinuationToken));
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.SignalR.Protocol;
using Xunit;

namespace Microsoft.Azure.SignalR.Protocols.Tests.Models;
public class GroupMemberQueryResponsePayloadTests
{
[Fact]
public void TestMessagePackSerialization()
{
var groupMembers = new List<GroupMember>
{
new GroupMember { ConnectionId = "conn1", UserId = "user1" },
new GroupMember { ConnectionId = "conn2", UserId = "user2" }
};
var payload = new GroupMemberQueryResponse
{
Members = groupMembers,
ContinuationToken = "token"
};
var buffer = new ArrayBufferWriter<byte>();
var protocol = new ServiceProtocol();
protocol.WriteMessagePayload(payload, buffer);
var deserialized = protocol.ParseMessagePayload<GroupMemberQueryResponse>(new
ReadOnlySequence<byte>(buffer.WrittenMemory));
Assert.Equal(payload.ContinuationToken, deserialized.ContinuationToken);
Assert.True(payload.Members.SequenceEqual(deserialized.Members));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Buffers;
using Microsoft.Azure.SignalR.Protocol;
using Xunit;

namespace Microsoft.Azure.SignalR.Protocols.Tests.Models;
public class GroupMemberTests
{
[Fact]
public void TestMessagePackSerialization()
{
var groupMember = new GroupMember() { ConnectionId = "conn", UserId = "userId" };
var buffer = new ArrayBufferWriter<byte>();
var protocol = new ServiceProtocol();
protocol.WriteMessagePayload(groupMember, buffer);
var deserialized = protocol.ParseMessagePayload<GroupMember>(new ReadOnlySequence<byte>(buffer.WrittenMemory));
Assert.Equal(groupMember, deserialized);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public bool Equals(ServiceMessage x, ServiceMessage y)
return ServiceMappingMessageEqual(serviceMappingMessage, (ServiceMappingMessage)y);
case ConnectionFlowControlMessage connectionFlowControlMessage:
return ConnectionFlowControlMessageEqual(connectionFlowControlMessage, (ConnectionFlowControlMessage)y);
case GroupMemberQueryMessage groupMemberQueryMessage:
return GroupMemberQueryMessageEqual(groupMemberQueryMessage, (GroupMemberQueryMessage)y);
default:
throw new InvalidOperationException($"Unknown message type: {x.GetType().FullName}");
}
Expand Down Expand Up @@ -398,6 +400,15 @@ private bool ConnectionFlowControlMessageEqual(ConnectionFlowControlMessage x, C
Equals(x.Operation, y.Operation);
}

private bool GroupMemberQueryMessageEqual(GroupMemberQueryMessage x, GroupMemberQueryMessage y)
{
return x.AckId == y.AckId &&
StringEqual(x.GroupName, y.GroupName) &&
x.Max == y.Max &&
StringEqual(x.ContinuationToken, y.ContinuationToken) &&
x.TracingId == y.TracingId;
}

private static bool StringEqual(string x, string y)
{
return string.Equals(x, y, StringComparison.Ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
using System.Text;

using Microsoft.Extensions.Primitives;
using Moq;
using Newtonsoft.Json.Linq;
using Xunit;

namespace Microsoft.Azure.SignalR.Protocol.Tests
Expand Down Expand Up @@ -704,6 +702,10 @@ public static IEnumerable<object[]> TestParseOldData
name: nameof(ConnectionFlowControlMessage) + "-2",
message: new ConnectionFlowControlMessage("conn2", ConnectionFlowControlOperation.Offline, ConnectionType.Server),
binary: "lSelY29ubjLSAAAAAtIAAAAEgA=="),
new ProtocolTestData(
name: "GroupMemberQueryMessage",
message: new GroupMemberQueryMessage() { GroupName = "group", AckId = 1, Max = 10, ContinuationToken = "token", TracingId = 1234UL },
binary: "liiBAc0E0qVncm91cAEKpXRva2Vu"),
}.ToDictionary(t => t.Name);

#pragma warning restore CS0618 // Type or member is obsolete
Expand Down

0 comments on commit e0b874f

Please sign in to comment.