Skip to content

Commit

Permalink
fix(server): complex array formatter causing gen2 gc
Browse files Browse the repository at this point in the history
  • Loading branch information
kahojyun committed Jul 21, 2023
1 parent 62db7c8 commit 154c42b
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 132 deletions.
1 change: 1 addition & 0 deletions src/Qynit.PulseGen.Server/.prettierrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
41 changes: 35 additions & 6 deletions src/Qynit.PulseGen.Server/ComplexArrayFormatter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Runtime.InteropServices;
using System.Runtime.InteropServices;

using MessagePack;
using MessagePack.Formatters;
Expand All @@ -21,10 +21,39 @@ public void Serialize(ref MessagePackWriter writer, PooledComplexArray<double> v
}

writer.WriteArrayHeader(2);
var nBytes = sizeof(double) * value.Length;
writer.WriteBinHeader(nBytes);
writer.WriteRaw(MemoryMarshal.AsBytes(value.DataI));
writer.WriteBinHeader(nBytes);
writer.WriteRaw(MemoryMarshal.AsBytes(value.DataQ));
var bytesI = MemoryMarshal.AsBytes(value.DataI);
var bytesQ = MemoryMarshal.AsBytes(value.DataQ);
var length = bytesI.Length;
if (length <= ushort.MaxValue)
{
writer.Write(bytesI);
writer.Write(bytesQ);
}
else
{
WriteBinHeader(ref writer, length);
writer.WriteRaw(bytesI);
WriteBinHeader(ref writer, length);
writer.WriteRaw(bytesQ);
}
}

private static void WriteBinHeader(ref MessagePackWriter writer, int length)
{
var span = writer.GetSpan(5);
span[0] = MessagePackCode.Bin32;
WriteBigEndian(length, span[1..]);
writer.Advance(5);
}

private static void WriteBigEndian(int value, Span<byte> span)
{
unchecked
{
span[3] = (byte)value;
span[2] = (byte)(value >> 8);
span[1] = (byte)(value >> 16);
span[0] = (byte)(value >> 24);
}
}
}
22 changes: 17 additions & 5 deletions src/Qynit.PulseGen.Server/Models/PulseGenResponse.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
using MessagePack;
using MessagePack;

namespace Qynit.PulseGen.Server.Models;

[MessagePackObject]
public sealed record PulseGenResponse(
[property: Key(0)] IList<PooledComplexArray<double>> Waveforms) : IDisposable
public sealed class PulseGenResponse : IDisposable
{
[Key(0)]
public IList<PooledComplexArray<double>> Waveforms { get; set; } = null!;

private readonly List<ArcUnsafe<PooledComplexArray<double>>> _disposables = null!;

public PulseGenResponse() { }

public PulseGenResponse(List<ArcUnsafe<PooledComplexArray<double>>> waveforms)
{
_disposables = waveforms;
Waveforms = _disposables.Select(x => x.Target).ToList();
}

public void Dispose()
{
foreach (var waveform in Waveforms)
foreach (var disposable in _disposables)
{
waveform.Dispose();
disposable.Dispose();
}
}
}
121 changes: 78 additions & 43 deletions src/Qynit.PulseGen.Server/Pages/Index.razor
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
@using Qynit.PulseGen.Server.Services
@using System.IO.Pipelines;
@using System.Buffers;
@using System.Collections.Concurrent;
@using System.Threading.Channels;
@inject NavigationManager Navigation
@inject IJSRuntime JS
@inject IPlotService PlotService
Expand All @@ -13,103 +15,136 @@

<div class="box">
<h1 class="row header">Qynit PulseGen</h1>
<div @ref="chart" class="row content" />
<div @ref="_chart" class="row content" />
</div>

@code
{
private HubConnection? hubConnection;
private ElementReference chart;
private IJSObjectReference? module;
private DotNetObjectReference<Index>? objRef;
private Task? updateWaveforms;
private CancellationTokenSource? cancellationTokenSource;
private HubConnection? _hubConnection;
private ElementReference _chart;
private IJSObjectReference? _module;
private DotNetObjectReference<Index>? _objRef;
private Task? _renderTask;
private CancellationTokenSource? _renderCts;
private ConcurrentDictionary<string, bool> _channelNewValue = new();
private ConcurrentDictionary<string, bool> _channelVisible = new();
private Channel<string> _renderQueue = Channel.CreateUnbounded<string>();

protected override async Task OnInitializedAsync()
{
hubConnection = new HubConnectionBuilder()
_hubConnection = new HubConnectionBuilder()
.WithUrl(Navigation.ToAbsoluteUri(PlotHub.Uri))
.Build();

hubConnection.On<IEnumerable<string>>(nameof(ReceiveNames), ReceiveNames);
_hubConnection.On<IEnumerable<string>>(nameof(ReceiveNames), ReceiveNames);

await hubConnection.StartAsync();
await _hubConnection.StartAsync();
}

protected override async Task OnAfterRenderAsync(bool firstRender)
{
if (firstRender)
{
module = await JS.InvokeAsync<IJSObjectReference>("import", "./Pages/Index.razor.js");
objRef = DotNetObjectReference.Create(this);
await module.InvokeVoidAsync("init", chart, objRef);
_module = await JS.InvokeAsync<IJSObjectReference>("import", "./Pages/Index.razor.js");
_objRef = DotNetObjectReference.Create(this);
await _module.InvokeVoidAsync("init", _chart, _objRef);
_renderCts = new();
_renderTask = RenderInBackground(_renderCts.Token);
}
}

public async Task ReceiveNames(IEnumerable<string> names)
[JSInvokable]
public async ValueTask VisibilityChanged(string name, bool visible)
{
if (updateWaveforms is not null)
var flag = false;
_channelVisible.AddOrUpdate(name, visible, (k, v) =>
{
if (!updateWaveforms.IsCompleted)
{
cancellationTokenSource!.Cancel();
}
try
{
await updateWaveforms;
}
catch (OperationCanceledException) { }
cancellationTokenSource!.Dispose();
flag = visible && !v;
return visible;
});
if (flag)
{
await EnqueueRender(name);
}
cancellationTokenSource = new();
updateWaveforms = UpdateWaveforms(names, cancellationTokenSource.Token);
}

public async Task UpdateWaveforms(IEnumerable<string> names, CancellationToken token)
public async Task ReceiveNames(IEnumerable<string> names)
{
foreach (var name in names)
{
if (token.IsCancellationRequested)
_channelNewValue[name] = true;
await EnqueueRender(name);
}
}

private async ValueTask EnqueueRender(string name)
{
await _renderQueue.Writer.WriteAsync(name);
}

private async Task RenderInBackground(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var name = await _renderQueue.Reader.ReadAsync(token);
if (ChannelNeedUpdate(name) && ChannelShouldRender(name))
{
break;
await RenderWaveform(name, token);
_channelNewValue[name] = false;
}
await UpdateWaveform(name);
}
}

[JSInvokable]
public async Task UpdateWaveform(string name)
private bool ChannelNeedUpdate(string name)
{
return _channelNewValue.TryGetValue(name, out var needUpdate) && needUpdate;
}

private bool ChannelShouldRender(string name)
{
var hasValue = _channelVisible.TryGetValue(name, out var visible);
return !hasValue || visible;
}

private async ValueTask RenderWaveform(string name, CancellationToken token)
{
if (PlotService.TryGetPlot(name, out var arc))
{
using (arc)
{
var pipe = new Pipe();
var writer = pipe.Writer;
var reader = pipe.Reader;
using var streamRef = new DotNetStreamReference(reader.AsStream());
var task = module!.InvokeVoidAsync("addWaveform", name, streamRef);
writer.Write(MemoryMarshal.AsBytes(arc.Target.DataI));
writer.Write(MemoryMarshal.AsBytes(arc.Target.DataQ));
await writer.CompleteAsync();
await task;
using var streamRef = new DotNetStreamReference(pipe.Reader.AsStream());
await _module!.InvokeVoidAsync("renderWaveform", token, name, streamRef);
}
}
}

public async ValueTask DisposeAsync()
{
cancellationTokenSource?.Dispose();
objRef?.Dispose();
if (hubConnection is not null)
_renderCts?.Cancel();
if (_renderTask is not null)
{
try
{
await _renderTask;
}
catch (OperationCanceledException) { }
}
_renderCts?.Dispose();
_objRef?.Dispose();
if (_hubConnection is not null)
{
await hubConnection.DisposeAsync();
await _hubConnection.DisposeAsync();
}
if (module is not null)
if (_module is not null)
{
try
{
await module.DisposeAsync();
await _module.DisposeAsync();
}
catch (JSDisconnectedException) { }
}
Expand Down
84 changes: 40 additions & 44 deletions src/Qynit.PulseGen.Server/Pages/Index.razor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { ChartXY, LegendBox, LineSeries, lightningChart } from "@arction/lcjs";
import {
ChartXY,
LegendBox,
LineSeries,
LineSeriesOptions,
lightningChart,
} from "@arction/lcjs";
import { DotNet } from "@microsoft/dotnet-js-interop";

let chart: ChartXY;
Expand All @@ -11,7 +17,10 @@ export function init(targetElem: HTMLDivElement, objRef: DotNet.DotNetObject) {
container: targetElem,
})
.setAnimationsEnabled(false);
legend = chart.addLegendBox();
legend = chart.addLegendBox().setAutoDispose({
type: "max-width",
maxWidth: 0.2,
});
dotnetRef = objRef;
}

Expand All @@ -22,53 +31,40 @@ type WaveformSeries = {

const series = new Map<string, WaveformSeries>();

export async function addWaveform(name: string, iqBytesStream) {
export async function renderWaveform(name: string, iqBytesStream) {
const iqBytesArray: ArrayBuffer = await iqBytesStream.arrayBuffer();
const float64Array = new Float64Array(iqBytesArray);
const length = float64Array.length / 2;
const iArray = float64Array.subarray(0, length);
const qArray = float64Array.subarray(length, length * 2);
if (series.has(name)) {
const { i, q } = series.get(name);
if (i.getVisible()) {
i.clear();
i.addArrayY(iArray);
}
if (q.getVisible()) {
q.clear();
q.addArrayY(qArray);
}
if (!series.has(name)) {
const { i, q } = initSeries(name);
i.addArrayY(iArray);
q.addArrayY(qArray);
} else {
const i = chart
.addLineSeries({
dataPattern: {
pattern: "ProgressiveX",
regularProgressiveStep: true,
},
})
.setName(`${name}_I`)
.addArrayY(iArray);
legend.add(i);
i.onVisibleStateChanged(async (_, state) => {
if (state) {
await dotnetRef.invokeMethodAsync("UpdateWaveform", name);
}
});
const q = chart
.addLineSeries({
dataPattern: {
pattern: "ProgressiveX",
regularProgressiveStep: true,
},
})
.setName(`${name}_Q`)
.addArrayY(qArray);
series.set(name, { i, q });
legend.add(q);
q.onVisibleStateChanged(async (_, state) => {
if (state) {
await dotnetRef.invokeMethodAsync("UpdateWaveform", name);
}
});
const { i, q } = series.get(name);
i.clear();
i.addArrayY(iArray);
q.clear();
q.addArrayY(qArray);
}
}

function initSeries(name: string) {
const opts: LineSeriesOptions = {
dataPattern: {
pattern: "ProgressiveX",
regularProgressiveStep: true,
},
};
const i = chart.addLineSeries(opts).setName(`${name}_I`);
const q = chart.addLineSeries(opts).setName(`${name}_Q`);
series.set(name, { i, q });
legend.add(i).add(q);
const handler = async (_, state: boolean) => {
await dotnetRef.invokeMethodAsync("VisibilityChanged", name, state);
};
i.onVisibleStateChanged(handler);
q.onVisibleStateChanged(handler);
return { i, q };
}
Loading

0 comments on commit 154c42b

Please sign in to comment.