Skip to content

Commit

Permalink
Add MigrationLevel to HandshakeRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
terencefan committed Jan 8, 2020
1 parent f542e51 commit 6f7a5da
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 53 deletions.
22 changes: 19 additions & 3 deletions specs/ServiceProtocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The Azure SignalR Service Protocol is a protocol between Azure SignalR Service a
## Terms

- Service - Azure SignalR Service. It accepts connections from both clients and servers, acting as the abstract transport between them. It will internally maintain a one-to-one mapping between clients and servers, to make sure that messages are correctly routed to the recipients as if it is a physical transport.
- Server - Application server node, which is connected to the Azure SignalR Service, using this protocol to receive data from and send data to clients through Azure SignalR Service.
- Server - Application server node, which is connected to the Azure SignalR Service, using this protocol to receive data from and send data to clients through Azure SignalR Service.
- Client - The SignalR client connected to the Azure SignalR Service. The Azure SignalR Service will look exactly the same as a self-hosted SignalR server from the client's perspective.

## Overview
Expand Down Expand Up @@ -53,7 +53,7 @@ Ack | Service | Sent from Service to Server to return the operation result of Jo

## Communication Model

This protocol will be used between Service and Server. There will be one or a few physical connections between Service and Server. Data from/to multiple client connections will be multiplexed within these physical connections. Each client connection will be identified by a unique connection Id.
This protocol will be used between Service and Server. There will be one or a few physical connections between Service and Server. Data from/to multiple client connections will be multiplexed within these physical connections. Each client connection will be identified by a unique connection Id.

The number of client connections will be far more (over 100 times) than the number of physical connections between Service and Server.

Expand All @@ -67,15 +67,23 @@ Server will initiate a physical connection to Service, using WebSocket transport

When a new client is connected to Service, a `OpenConnection` message will be sent by Service to Server.

#### Client migrate-in from another server

When a new client is migrated from another server, a `OpenConnection` message will be sent by Service to Server, with an `Asrs-Migrated-In` header given.

### Client Disconnect

- When a client is disconnected from Service, a `CloseConnection` message will be sent by Service to Server.
- When Server wants to disconnect a client, a `CloseConnection` message will be sent by Server to Service. Then Service will disconnect the phyical connection with the target client.

#### Client migrate-out to another server

When a client is migrated to another server, a `CloseConnection` message will be sent by Service to Server, with an `Asrs-Migrated-Out` header given.

### Client Data Pass Through

- When a client sends data to Service, a `ConnectionData` message will be sent by Service to Server.
- When Server wants to send data to a client, a `ConnectionData` message will be sent by Server to Service.
- When Server wants to send data to a client, a `ConnectionData` message will be sent by Server to Service.

### SignalR scenarios

Expand All @@ -100,6 +108,14 @@ MessagePack uses different formats to encode values. Refer to the [MessagePack F
```
- 1 - Message Type, indicating this is a `HandshakeRequest` message.
- Version - A `Int32` encoding number of the protocol version.
- ConnectionType - A `Int32` encoding number of the connection type.
- 0, Default, it can carry clients, service runtime should always accept this kind of connection.
- 1, OnDemand, creating when service requested more connections, it can carry clients, but it may be rejected by service runtime.
- 2, Weak, it can not carry clients, but it can send message.
- MigrationLevel - A `Int32` encoding number indicates if further client connections associated with this server connection could be migrated.
- 0, Off, a client connection can not be migrated to another server.
- 1, ShutdownOnly, a client connection can be migrated only if the matched server was shutdown gracefully.
- 2, Any, a client connection can be migrated even if the matched server connection was dropped accidentally. (may cause data loss)

#### Example: TODO

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Microsoft.Azure.SignalR
{
public enum ServerConnectionMigrationLevel
{
Off = 0,
ShutdownOnly = 1,
Any = 2,
}
}
12 changes: 12 additions & 0 deletions src/Microsoft.Azure.SignalR.Protocols/ServiceMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ public class HandshakeRequestMessage : ServiceMessage
/// </value>
public int ConnectionType { get; set; }

/// <summary>
/// Gets or sets the migratable flag.
/// <value>
/// <list type="bullet">
/// <item>0, Off, a client connection can not be migrated to another server.</item>
/// <item>1, ShutdownOnly, a client connection can be migrated only if the pairing server was shutdown gracefully.</item>
/// <item>2, Any, a client connection can be migrated even if the pairing server connection was dropped accidentally. (may cause data loss)</item>
/// </list>
/// </value>
/// </summary>
public int MigrationLevel { get; set; }

/// <summary>
/// Gets or sets the target of service connection, only work for OnDemand connections.
/// </summary>
Expand Down
26 changes: 9 additions & 17 deletions src/Microsoft.Azure.SignalR.Protocols/ServiceProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,12 @@ private static void WriteMessageCore(ServiceMessage message, Stream packer)

private static void WriteHandshakeRequestMessage(HandshakeRequestMessage message, Stream packer)
{
if (message.ConnectionType == 0)
{
MessagePackBinary.WriteArrayHeader(packer, 2);
MessagePackBinary.WriteInt32(packer, ServiceProtocolConstants.HandshakeRequestType);
MessagePackBinary.WriteInt32(packer, message.Version);
}
else
{
MessagePackBinary.WriteArrayHeader(packer, 4);
MessagePackBinary.WriteInt32(packer, ServiceProtocolConstants.HandshakeRequestType);
MessagePackBinary.WriteInt32(packer, message.Version);
MessagePackBinary.WriteInt32(packer, message.ConnectionType);
MessagePackBinary.WriteString(packer, message.Target ?? string.Empty);
}
MessagePackBinary.WriteArrayHeader(packer, 5);
MessagePackBinary.WriteInt32(packer, ServiceProtocolConstants.HandshakeRequestType);
MessagePackBinary.WriteInt32(packer, message.Version);
MessagePackBinary.WriteInt32(packer, message.ConnectionType);
MessagePackBinary.WriteString(packer, message.ConnectionType == 0 ? "" : message.Target ?? string.Empty);
MessagePackBinary.WriteInt32(packer, (int) message.MigrationLevel);
}

private static void WriteHandshakeResponseMessage(HandshakeResponseMessage message, Stream packer)
Expand Down Expand Up @@ -516,7 +508,8 @@ private static HandshakeRequestMessage CreateHandshakeRequestMessage(byte[] inpu
{
result.ConnectionType = ReadInt32(input, ref offset, "connectionType");
result.Target = ReadString(input, ref offset, "target");
}
}
result.MigrationLevel = arrayLength >= 5 ? ReadInt32(input, ref offset, "migratableStatus") : 0;
return result;
}

Expand Down Expand Up @@ -553,7 +546,6 @@ private static OpenConnectionMessage CreateOpenConnectionMessage(int arrayLength
{
var headers = ReadHeaders(input, ref offset);
var queryString = ReadString(input, ref offset, "queryString");

return new OpenConnectionMessage(connectionId, claims, headers, queryString);
}
else
Expand All @@ -575,7 +567,7 @@ private static ConnectionDataMessage CreateConnectionDataMessage(byte[] input, r
var connectionId = ReadString(input, ref offset, "connectionId");
var payload = ReadBytes(input, ref offset, "payload");

return new ConnectionDataMessage(connectionId, payload);
return new ConnectionDataMessage(connectionId, payload);
}

private static MultiConnectionDataMessage CreateMultiConnectionDataMessage(byte[] input, ref int offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ public static class ServiceProtocolConstants
public const int JoinGroupWithAckMessageType = 18;
public const int LeaveGroupWithAckMessageType = 19;
public const int AckMessageType = 20;
}
}
}
10 changes: 9 additions & 1 deletion src/Microsoft.Azure.SignalR/ServiceOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ public class ServiceOptions : IServiceEndpointOptions
internal TimeSpan ServerShutdownTimeout { get; set; } = TimeSpan.FromSeconds(Constants.DefaultShutdownTimeoutInSeconds);

/// <summary>
/// Gets or sets the proxy used when ServiceEndpoint will attempt to connect to Azure SignalR Service.
/// Specifies if the client-connection assigned to this server can be migrated to another server.
/// Default value is 0.
/// 1: Only migrate client-connection if server was shutdown gracefully.
/// 2: Migrate client-connection even if server-connection was accidentally dropped. (Potential data losses)
/// </summary>
internal ServerConnectionMigrationLevel MigrationLevel { get; set; } = ServerConnectionMigrationLevel.Off;

/// <summary>
/// Gets or sets the proxy used when ServiceEndpoint will attempt to connect to Azure SignalR.
/// </summary>
public IWebProxy Proxy { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static IEnumerable<object[]> TestWriteData
{
get
{
foreach (var k in TestData.Values)
foreach (var k in TestData.Keys)
{
yield return new object[] { k };
}
Expand All @@ -30,39 +30,32 @@ public static IEnumerable<object[]> TestParseData
{
get
{
foreach (var k in TestCompacityData.Values)
{
yield return new object[] { k };
}
foreach (var k in TestData.Values)
foreach (var k in TestData.Keys)
{
yield return new object[] { k };
}
}
}

public static IDictionary<string, ProtocolTestData> TestCompacityData => new[]
{
new ProtocolTestData(
name: "CloseConnection",
message: new CloseConnectionMessage("conn3"),
binary: "kwWlY29ubjPA"),
new ProtocolTestData(
name: "CloseConnectionWithError",
message: new CloseConnectionMessage("conn4", "Error message."),
binary: "kwWlY29ubjSuRXJyb3IgbWVzc2FnZS4="),
}.ToDictionary(t => t.Name);

public static IDictionary<string, ProtocolTestData> TestData => new[]
public static IEnumerable<object[]> TestParseOldData
{
new ProtocolTestData(
name: "CloseConnection",
message: new CloseConnectionMessage("conn3"),
binary: "lAWlY29ubjOggA=="),
new ProtocolTestData(
name: "CloseConnectionWithError",
message: new CloseConnectionMessage("conn4", "Error message."),
binary: "lAWlY29ubjSuRXJyb3IgbWVzc2FnZS6A"),
get
{
foreach (var k in TestCompatibilityData.Keys)
{
yield return new object[] { k };
}
}
}
public static IDictionary<string, ProtocolTestData> TestCompatibilityData => new[] {
new ProtocolTestData(
name: "CloseConnection",
message: new CloseConnectionMessage("conn3"),
binary: "kwWlY29ubjPA"),
new ProtocolTestData(
name: "CloseConnectionWithError",
message: new CloseConnectionMessage("conn4", "Error message."),
binary: "kwWlY29ubjSuRXJyb3IgbWVzc2FnZS4="),
new ProtocolTestData(
name: "CloseConnectionWithHeaders",
message: new CloseConnectionMessage("conn4", "Error message.", new Dictionary<string, StringValues>() {
Expand All @@ -77,6 +70,22 @@ public static IEnumerable<object[]> TestParseData
name: "HandshakeRequestWithProperty",
message: new HandshakeRequestMessage(1) { ConnectionType = 1, Target = "abc" },
binary: "lAEBAaNhYmM="),
}.ToDictionary(t => t.Name);

public static IDictionary<string, ProtocolTestData> TestData => new[]
{
new ProtocolTestData(
name: "HandshakeRequest",
message: new HandshakeRequestMessage(1),
binary: "lQEBAKAA"),
new ProtocolTestData(
name: "HandshakeRequestWithProperty",
message: new HandshakeRequestMessage(1) { ConnectionType = 1, Target = "abc" },
binary: "lQEBAaNhYmMA"),
new ProtocolTestData(
name: "HandshakeRequestWithMigratableStatus",
message: new HandshakeRequestMessage(1) { MigrationLevel = 1},
binary: "lQEBAKAB"),
new ProtocolTestData(
name: "HandshakeResponse",
message: new HandshakeResponseMessage(),
Expand Down Expand Up @@ -118,6 +127,14 @@ public static IEnumerable<object[]> TestParseData
name: "OpenConnectionWithQueryString2",
message: new OpenConnectionMessage("conn4", null, new Dictionary<string, StringValues>(), "query1=value1&query2=query2&query3=value3"),
binary: "lQSlY29ubjSAgNkpcXVlcnkxPXZhbHVlMSZxdWVyeTI9cXVlcnkyJnF1ZXJ5Mz12YWx1ZTM="),
new ProtocolTestData(
name: "CloseConnection",
message: new CloseConnectionMessage("conn3"),
binary: "lAWlY29ubjOggA=="),
new ProtocolTestData(
name: "CloseConnectionWithError",
message: new CloseConnectionMessage("conn4", "Error message."),
binary: "lAWlY29ubjSuRXJyb3IgbWVzc2FnZS6A"),
new ProtocolTestData(
name: "ConnectionData",
message: new ConnectionDataMessage("conn5", new byte[] {1, 2, 3, 4, 5, 6, 7}),
Expand Down Expand Up @@ -231,9 +248,26 @@ public static IEnumerable<object[]> TestParseData
}.ToDictionary(t => t.Name);

[Theory]
[MemberData(nameof(TestParseData))]
public void ParseMessages(ProtocolTestData testData)
[MemberData(nameof(TestParseOldData))]
public void ParseOldMessages(string testDataName)
{
var testData = TestCompatibilityData[testDataName];

// Verify that the input binary string decodes to the expected MsgPack primitives
var bytes = Convert.FromBase64String(testData.Binary);

// Parse the input fully now.
bytes = Frame(bytes);
var message = ParseServiceMessage(bytes);
Assert.Equal(testData.Message, message, ServiceMessageEqualityComparer.Instance);
}

[Theory]
[MemberData(nameof(TestParseData))]
public void ParseMessages(string testDataName)
{
var testData = TestData[testDataName];

// Verify that the input binary string decodes to the expected MsgPack primitives
var bytes = Convert.FromBase64String(testData.Binary);

Expand All @@ -245,8 +279,10 @@ public void ParseMessages(ProtocolTestData testData)

[Theory]
[MemberData(nameof(TestWriteData))]
public void WriteMessages(ProtocolTestData testData)
public void WriteMessages(string testDataName)
{
var testData = TestData[testDataName];

var bytes = Protocol.GetMessageBytes(testData.Message);

// Unframe the message to check the binary encoding
Expand Down

0 comments on commit 6f7a5da

Please sign in to comment.