Skip to content

Commit

Permalink
Merge pull request #200 from eclipse-thingweb/sse
Browse files Browse the repository at this point in the history
feat: add initial support for server-sent events
  • Loading branch information
JKRhb authored Nov 27, 2024
2 parents e8373bf + a31de5c commit a5ca0b4
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 43 deletions.
18 changes: 15 additions & 3 deletions lib/src/binding_http/http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import "../../core.dart";
import "http_config.dart";
import "http_request_method.dart";
import "http_security_exception.dart";
import "http_subscription.dart";

const _authorizationHeader = "Authorization";

Expand Down Expand Up @@ -306,13 +307,24 @@ final class HttpClient extends ProtocolClient

@override
Future<Subscription> subscribeResource(
Form form, {
AugmentedForm form, {
required void Function(Content content) next,
void Function(Exception error)? error,
required void Function() complete,
}) async {
// TODO(JKRhb): implement subscribeResource
throw UnimplementedError();
if (form.subprotocol != "sse") {
throw const DartWotException(
"Only server-sent events are supported at the moment by dart_wot",
);
}

return HttpSseSubscription(
form,
complete,
next: next,
onError: error,
complete: complete,
);
}

Future<DiscoveryContent> _sendDiscoveryRequest(
Expand Down
13 changes: 1 addition & 12 deletions lib/src/binding_http/http_client_factory.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,7 @@ final class HttpClientFactory implements ProtocolClientFactory {

@override
bool supportsOperation(OperationType operationType, String? subprotocol) {
const unsupportedOperations = [
OperationType.observeproperty,
OperationType.unobserveproperty,
OperationType.subscribeevent,
OperationType.unsubscribeevent,
];

if (unsupportedOperations.contains(operationType)) {
return false;
}

if (subprotocol != null) {
if (subprotocol != null && !["sse"].contains(subprotocol)) {
return false;
}

Expand Down
66 changes: 66 additions & 0 deletions lib/src/binding_http/http_subscription.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 Contributors to the Eclipse Foundation. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// SPDX-License-Identifier: BSD-3-Clause

import "dart:convert";

import "package:sse_channel/sse_channel.dart";

import "../../core.dart";

/// A [ProtocolSubscription] for supporting server-sent events.
final class HttpSseSubscription extends ProtocolSubscription {
/// Constructor
HttpSseSubscription(
AugmentedForm form,
super._complete, {
required void Function(Content content) next,
void Function(Exception error)? onError,
void Function()? complete,
}) : _active = true,
_sseChannel = SseChannel.connect(form.resolvedHref) {
_sseChannel.stream.listen(
(data) {
if (data is! String) {
return;
}
next(
Content(form.contentType, Stream.fromIterable([utf8.encode(data)])),
);
},
onError: (error) {
if (error is! Exception) {
return;
}

onError?.call(error);
},
onDone: complete,
);
}

final SseChannel _sseChannel;

bool _active;

@override
bool get active => _active;

@override
Future<void> stop({
int? formIndex,
Map<String, Object>? uriVariables,
Object? data,
}) async {
if (!_active) {
return;
}
_active = false;

await _sseChannel.sink.close();
await super
.stop(formIndex: formIndex, uriVariables: uriVariables, data: data);
}
}
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies:
meta: ^1.8.0
mqtt_client: ^10.0.0
multicast_dns: ^0.3.2+1
sse_channel: ^0.1.1
typed_data: ^1.3.2
uri: ^1.0.0
uuid: ^4.2.1
37 changes: 9 additions & 28 deletions test/binding_http/http_client_factory_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,48 +13,29 @@ void main() {
test("indicate correctly whether an operation is supported", () {
final httpClientFactory = HttpClientFactory();

const observeOperations = [
OperationType.observeproperty,
OperationType.unobserveproperty,
OperationType.subscribeevent,
OperationType.unsubscribeevent,
];
final otherOperations = OperationType.values
.where((operationType) => !observeOperations.contains(operationType));

final testVector = [
(
expectedResult: false,
operationTypes: observeOperations,
expectedResult: true,
subprotocol: null,
),
(
expectedResult: false,
operationTypes: observeOperations,
subprotocol: "foobar",
),
(
expectedResult: true,
operationTypes: otherOperations,
subprotocol: null,
subprotocol: "sse",
),
(
expectedResult: false,
operationTypes: otherOperations,
subprotocol: "foobar",
),
];

for (final testCase in testVector) {
for (final operationType in testCase.operationTypes) {
expect(
httpClientFactory.supportsOperation(
operationType,
testCase.subprotocol,
),
testCase.expectedResult,
);
}
expect(
httpClientFactory.supportsOperation(
OperationType.invokeaction,
testCase.subprotocol,
),
testCase.expectedResult,
);
}
});
});
Expand Down

0 comments on commit a5ca0b4

Please sign in to comment.