Skip to content

Commit

Permalink
perf: KcpConnection OnCheckEnabled callback changed to a simple 'paus…
Browse files Browse the repository at this point in the history
…ed' boolean. This is faster than invoking a Func<bool> every time and allows us to fix #8 more easily later by calling .Pause/.Unpause from OnEnable/OnDisable in MirrorTransport.
  • Loading branch information
vis2k committed Jan 12, 2021
1 parent cc60773 commit cd81abb
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 50 deletions.
50 changes: 32 additions & 18 deletions kcp2k/Assets/Tests/Editor/ClientServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -770,11 +770,11 @@ public void TimeoutIsResetByPing()
}

// Mirror scene changes might take > 10s timeout time.
// kcp connection should not time out while disabled.
// kcp connection should not time out while paused.
//
// see also: https://github.com/vis2k/kcp2k/issues/8
[Test, Ignore("WIP")]
public void TimeoutIsPausedWhileDisabled()
public void TimeoutIsPausedWhilePaused()
{
Assert.Fail();
}
Expand Down Expand Up @@ -826,12 +826,18 @@ public void ChokeConnectionAutoDisconnects()
Assert.That(server.connections.Count, Is.EqualTo(0));
}

// OnCheckEnabled tests to make sure it works in Mirror
// client paused test to make sure we can savely support scene changes
// in Mirror by calling Pause during the receive while loop to stop
// receiving immediately!
[Test]
public void ClientRespectsOnCheckEnabled()
public void ClientImmediatelyStopsReceivingWhenPaused()
{
// client OnCheckEnabled should stop after first message
client.OnCheckEnabled = () => clientReceived.Count == 0;
// pause client in the middle of a receive while loop by sending two
// messages and pausing immediately after the first one.
client.OnData = (message) => {
ClientOnData(message);
client.Pause();
};

// start
server.Start(Port);
Expand All @@ -847,19 +853,26 @@ public void ClientRespectsOnCheckEnabled()
Assert.That(clientReceived.Count, Is.EqualTo(1));
Assert.That(clientReceived[0].SequenceEqual(new byte[]{0x03, 0x04}), Is.True);

// enable again by clearing received and make sure it still works.
clientReceived.Clear();
// unpause again make sure the second message is received (not dropped!)
client.Unpause();
UpdateSeveralTimes();
Assert.That(clientReceived.Count, Is.EqualTo(1));
Assert.That(clientReceived[0].SequenceEqual(new byte[]{0x05, 0x06}), Is.True);
Assert.That(clientReceived.Count, Is.EqualTo(2));
Assert.That(clientReceived[0].SequenceEqual(new byte[]{0x03, 0x04}), Is.True);
Assert.That(clientReceived[1].SequenceEqual(new byte[]{0x05, 0x06}), Is.True);
}

// OnCheckEnabled tests to make sure it works in Mirror
// server paused test to make sure we can savely support scene changes
// in Mirror by calling Pause during the receive while loop to stop
// receiving immediately!
[Test]
public void ServerRespectsOnCheckEnabled()
public void ServerImmediatelyStopsReceivingWhenPaused()
{
// server OnCheckEnabled should stop after first message
server.OnCheckEnabled = () => serverReceived.Count == 0;
// pause server in the middle of a receive while loop by sending two
// messages and pausing immediately after the first one.
server.OnData = (connectionId, message) => {
ServerOnData(connectionId, message);
server.Pause();
};

// start
server.Start(Port);
Expand All @@ -874,11 +887,12 @@ public void ServerRespectsOnCheckEnabled()
Assert.That(serverReceived.Count, Is.EqualTo(1));
Assert.That(serverReceived[0].SequenceEqual(new byte[]{0x03, 0x04}), Is.True);

// enable again by clearing received and make sure it still works.
serverReceived.Clear();
// unpause again make sure the second message is received (not dropped!)
server.Unpause();
UpdateSeveralTimes();
Assert.That(serverReceived.Count, Is.EqualTo(1));
Assert.That(serverReceived[0].SequenceEqual(new byte[]{0x05, 0x06}), Is.True);
Assert.That(serverReceived.Count, Is.EqualTo(2));
Assert.That(serverReceived[0].SequenceEqual(new byte[]{0x03, 0x04}), Is.True);
Assert.That(serverReceived[1].SequenceEqual(new byte[]{0x05, 0x06}), Is.True);
}
}
}
25 changes: 19 additions & 6 deletions kcp2k/Assets/kcp2k/MirrorTransport/KcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ void Awake()
ReceiveWindowSize
);

// scene change message will disable transports.
// kcp processes messages in an internal loop which should be
// stopped immediately after scene change (= after disabled)
client.OnCheckEnabled = () => enabled;
server.OnCheckEnabled = () => enabled;

Debug.Log("KcpTransport initialized!");
}

Expand Down Expand Up @@ -126,6 +120,25 @@ public void LateUpdate()
client.Tick();
}

// scene change message will disable transports.
// kcp processes messages in an internal loop which should be
// stopped immediately after scene change (= after disabled)
// => kcp has tests to guaranteed that calling .Pause() during the
// receive loop stops the receive loop immediately, not after.
void OnEnable()
{
// unpause when enabled again
client?.Unpause();
server?.Unpause();
}

void OnDisable()
{
// pause immediately when not enabled anymore
client?.Pause();
server?.Pause();
}

// server
public override Uri ServerUri()
{
Expand Down
15 changes: 5 additions & 10 deletions kcp2k/Assets/kcp2k/highlevel/KcpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ public class KcpClient
public Action<ArraySegment<byte>> OnData;
public Action OnDisconnected;

// Mirror needs a way to stop kcp message processing while loop
// immediately after a scene change message. Mirror can't process any
// other messages during a scene change.
// (could be useful for others too)
public Func<bool> OnCheckEnabled = () => true;

// state
public KcpClientConnection connection;
public bool connected;
Expand Down Expand Up @@ -58,10 +52,6 @@ public void Connect(string address, ushort port, bool noDelay, uint interval, in
OnDisconnected.Invoke();
};

// setup OnCheckEnabled to safely support Mirror
// scene changes (see comments in Awake() above)
connection.OnCheckEnabled = OnCheckEnabled;

// connect
connection.Connect(address, port, noDelay, interval, fastResend, congestionWindow, sendWindowSize, receiveWindowSize);
}
Expand Down Expand Up @@ -100,5 +90,10 @@ public void Tick()
connection.Tick();
}
}

// pause/unpause to safely support mirror scene handling and to
// immediately pause the receive while loop if needed.
public void Pause() => connection?.Pause();
public void Unpause() => connection?.Unpause();
}
}
17 changes: 11 additions & 6 deletions kcp2k/Assets/kcp2k/highlevel/KcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class KcpConnection
// immediately after a scene change message. Mirror can't process any
// other messages during a scene change.
// (could be useful for others too)
public Func<bool> OnCheckEnabled = () => true;
bool paused;

// If we don't receive anything these many milliseconds
// then consider us disconnected
Expand Down Expand Up @@ -307,7 +307,7 @@ void TickAuthenticated(uint time)
//
// note that we check it BEFORE ever calling ReceiveNext. otherwise
// we would silently eat the received message and never process it.
while (OnCheckEnabled() &&
while (!paused &&
ReceiveNextReliable(out KcpHeader header, out ArraySegment<byte> message))
{
// message type FSM. no default so we never miss a case.
Expand Down Expand Up @@ -428,11 +428,11 @@ public void RawInput(byte[] buffer, int msgLength)
// the current state allows it.
if (state == KcpState.Authenticated)
{
// only process messages while enabled for Mirror
// only process messages while not paused for Mirror
// scene switching etc.
// -> if an unreliable message comes in while not
// enabled, simply drop it. it's unreliable!
if (OnCheckEnabled())
// -> if an unreliable message comes in while
// paused, simply drop it. it's unreliable!
if (!paused)
{
ArraySegment<byte> message = new ArraySegment<byte>(buffer, 1, msgLength - 1);
OnData?.Invoke(message);
Expand Down Expand Up @@ -588,5 +588,10 @@ public void Disconnect()

// get remote endpoint
public EndPoint GetRemoteEndPoint() => remoteEndpoint;

// pause/unpause to safely support mirror scene handling and to
// immediately pause the receive while loop if needed.
public void Pause() => paused = true;
public void Unpause() => paused = false;
}
}
24 changes: 14 additions & 10 deletions kcp2k/Assets/kcp2k/highlevel/KcpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ public class KcpServer
public Action<int, ArraySegment<byte>> OnData;
public Action<int> OnDisconnected;

// Mirror needs a way to stop kcp message processing while loop
// immediately after a scene change message. Mirror can't process any
// other messages during a scene change.
// (could be useful for others too)
public Func<bool> OnCheckEnabled = () => true;

// configuration
// NoDelay is recommended to reduce latency. This also scales better
// without buffers getting full.
Expand Down Expand Up @@ -205,10 +199,6 @@ public void Tick()
OnConnected.Invoke(connectionId);
};

// setup OnCheckEnabled to safely support Mirror
// scene changes (see comments in Awake() above)
connection.OnCheckEnabled = OnCheckEnabled;

// now input the message & tick
// connected event was set up.
// tick will process the first message and adds the
Expand Down Expand Up @@ -257,5 +247,19 @@ public void Stop()
socket?.Close();
socket = null;
}

// pause/unpause to safely support mirror scene handling and to
// immediately pause the receive while loop if needed.
public void Pause()
{
foreach (KcpServerConnection connection in connections.Values)
connection.Pause();
}

public void Unpause()
{
foreach (KcpServerConnection connection in connections.Values)
connection.Unpause();
}
}
}

0 comments on commit cd81abb

Please sign in to comment.