Skip to content

Commit 917a363

Browse files
committed
created websocketcontroller
1 parent 5ffb8f4 commit 917a363

File tree

3 files changed

+116
-79
lines changed

3 files changed

+116
-79
lines changed

reactions/platform/debug-reaction/debug-reaction.Server/Controllers/QueriesController.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class QueryController : ControllerBase
2727
public QueryController(IQueryDebugService debugService, IConfiguration configuration)
2828
{
2929
_debugService = debugService;
30-
_configDirectory = configuration.GetValue<string>("QueryConfigPath", "/etc/queries");
30+
_configDirectory = configuration.GetValue<string>("QueryConfigPath", "/etc/queries")!;
3131
}
3232

3333
// GET queries
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2025 The Drasi Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
16+
using Microsoft.AspNetCore.Mvc;
17+
using Microsoft.Extensions.Logging;
18+
using System.Net.WebSockets;
19+
using System.Text;
20+
using System.Threading;
21+
using System.Threading.Tasks;
22+
using Drasi.Reactions.Debug.Server.Models;
23+
using Drasi.Reactions.Debug.Server.Services;
24+
25+
26+
[Route("ws")]
27+
public class WebSocketController : ControllerBase
28+
{
29+
private readonly IChangeBroadcaster _webSocketService;
30+
private readonly ILogger<WebSocketController> _logger;
31+
32+
public WebSocketController(IChangeBroadcaster webSocketService, ILogger<WebSocketController> logger)
33+
{
34+
_webSocketService = webSocketService;
35+
_logger = logger;
36+
}
37+
38+
[Route("query/{queryId?}")]
39+
public async Task Query(string? queryId)
40+
{
41+
if (!HttpContext.WebSockets.IsWebSocketRequest)
42+
{
43+
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
44+
return;
45+
}
46+
47+
if (string.IsNullOrEmpty(queryId))
48+
{
49+
using var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
50+
await webSocket.CloseAsync(WebSocketCloseStatus.InvalidPayloadData, "Invalid queryId", CancellationToken.None);
51+
return;
52+
}
53+
54+
using var webSocketQuery = await HttpContext.WebSockets.AcceptWebSocketAsync();
55+
_webSocketService.AddConnection(queryId, webSocketQuery);
56+
_logger.LogInformation($"WebSocket connected for queryId: {queryId}");
57+
58+
var buffer = new byte[1024 * 4];
59+
var lastPingTime = DateTime.Now;
60+
61+
try
62+
{
63+
while (webSocketQuery.State == WebSocketState.Open)
64+
{
65+
var result = await webSocketQuery.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
66+
if (result.MessageType == WebSocketMessageType.Close)
67+
{
68+
_logger.LogInformation($"WebSocket closing for queryId: {queryId}");
69+
await webSocketQuery.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", CancellationToken.None);
70+
return;
71+
}
72+
}
73+
}
74+
catch (Exception ex)
75+
{
76+
_logger.LogError(ex, $"WebSocket error for queryId: {queryId}");
77+
await webSocketQuery.CloseAsync(WebSocketCloseStatus.InternalServerError, "Server error", CancellationToken.None);
78+
}
79+
}
80+
81+
[Route("stream")]
82+
public async Task Stream()
83+
{
84+
if (!HttpContext.WebSockets.IsWebSocketRequest)
85+
{
86+
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
87+
return;
88+
}
89+
90+
using var webSocketStream = await HttpContext.WebSockets.AcceptWebSocketAsync();
91+
_webSocketService.AddConnection("stream", webSocketStream);
92+
_logger.LogInformation($"WebSocket connected for Event Stream");
93+
94+
var buffer = new byte[1024 * 4];
95+
96+
try
97+
{
98+
while (webSocketStream.State == WebSocketState.Open)
99+
{
100+
var result = await webSocketStream.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
101+
if (result.MessageType == WebSocketMessageType.Close)
102+
{
103+
_logger.LogInformation($"WebSocket closing for Event Stream");
104+
await webSocketStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", CancellationToken.None);
105+
return;
106+
}
107+
}
108+
}
109+
catch (Exception ex)
110+
{
111+
_logger.LogError(ex, "WebSocket error for Event Stream");
112+
await webSocketStream.CloseAsync(WebSocketCloseStatus.InternalServerError, "Server error", CancellationToken.None);
113+
}
114+
}
115+
}

reactions/platform/debug-reaction/debug-reaction.Server/Program.cs

-78
Original file line numberDiff line numberDiff line change
@@ -77,84 +77,6 @@ public static async Task Main(string[] args)
7777
return Results.Json(await debugService.GetRawEvents());
7878
});
7979

80-
server.Use(async (context, next) =>
81-
{
82-
if (context.Request.Path.StartsWithSegments("/ws/query"))
83-
{
84-
if (context.WebSockets.IsWebSocketRequest)
85-
{
86-
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
87-
var pathSegments = context.Request.Path.Value.Split('/');
88-
var queryId = string.Empty;
89-
if (pathSegments.Length > 3)
90-
{
91-
queryId = pathSegments[3];
92-
}
93-
94-
if (string.IsNullOrEmpty(queryId))
95-
{
96-
await webSocket.CloseAsync(WebSocketCloseStatus.InvalidPayloadData, "Invalid queryId", CancellationToken.None);
97-
return;
98-
}
99-
100-
// Add the WebSocket connection to the WebSocket service
101-
var webSocketService = context.RequestServices.GetRequiredService<IChangeBroadcaster>();
102-
webSocketService.AddConnection(queryId, webSocket);
103-
104-
// Handle WebSocket connection
105-
server.Logger.LogInformation($"WebSocket connected for queryId: {queryId}hange");
106-
107-
var buffer = new byte[1024 * 4];
108-
var lastPingTime = DateTime.Now;
109-
while (webSocket.State == WebSocketState.Open)
110-
{
111-
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
112-
113-
if (result.MessageType == WebSocketMessageType.Close)
114-
{
115-
server.Logger.LogInformation($"WebSocket closing for queryId: {queryId}");
116-
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", CancellationToken.None);
117-
return;
118-
}
119-
}
120-
}
121-
else
122-
{
123-
context.Response.StatusCode = 400; // Bad Request if not a WebSocket request
124-
}
125-
}
126-
else if (context.Request.Path.StartsWithSegments("/ws/stream"))
127-
{
128-
if (context.WebSockets.IsWebSocketRequest)
129-
{
130-
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
131-
server.Logger.LogInformation($"WebSocket connected for Event Stream");
132-
133-
var streamService = context.RequestServices.GetRequiredService<IChangeBroadcaster>();
134-
streamService.AddConnection("stream", webSocket);
135-
136-
var buffer = new byte[1024 * 4];
137-
while (webSocket.State == WebSocketState.Open)
138-
{
139-
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
140-
if (result.MessageType == WebSocketMessageType.Close)
141-
{
142-
server.Logger.LogInformation($"WebSocket closing for Event Stream");
143-
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", CancellationToken.None);
144-
return;
145-
}
146-
}
147-
}
148-
else
149-
{
150-
context.Response.StatusCode = 400; // Bad Request if not a WebSocket request
151-
}
152-
}
153-
else
154-
{
155-
await next();
156-
}
157-
});
15880

15981
var port = Environment.GetEnvironmentVariable("PORT") ?? "5195";
16082
server.Urls.Add($"http://0.0.0.0:{port}");

0 commit comments

Comments
 (0)