Skip to content

Commit

Permalink
fix(example/counter): fix event subscription
Browse files Browse the repository at this point in the history
Also:
- Better UI in the example
- Fixed a glitch in binding-http server
  • Loading branch information
relu91 committed May 31, 2023
1 parent e943953 commit 18af484
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 33 deletions.
21 changes: 21 additions & 0 deletions examples/browser/counter.html
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@
// JSONEditor.defaults.theme = 'foundation5';
JSONEditor.defaults.iconlib = "fontawesome4";
</script>
<style>
@keyframes blink {
50% {
opacity: 0;
}
100% {
opacity: 1;
}
}
.blink {
animation: blink 0.5s step-start 0s 1;
}

.hidden {
display: none;
}
</style>
</head>
<body>
<div id="topbar" class="row">
Expand Down Expand Up @@ -48,6 +65,10 @@ <h1>Counter Client Example in the Browser</h1>
<figure>
<figcaption><h4>Events</h4></figcaption>
<ul id="events" class="side-nav"></ul>
<div id="event-display" class="hidden">
<p>Event count: <span id="event-count"></span></p>
<p>Last payload: <span id="event-payload"></span></p>
</div>
</figure>
</div>
</div>
Expand Down
14 changes: 12 additions & 2 deletions examples/browser/counter.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ function showInteractions(thing) {

eventSubscriptions[evnt] = false;
let subscription;
let eventCounter = 0;
const eventDisplay = document.getElementById("event-display");
checkbox.onclick = (click) => {
if (
document.getElementById(evnt).checked &&
Expand All @@ -124,12 +126,16 @@ function showInteractions(thing) {
eventSubscriptions[evnt] = true;
thing
.subscribeEvent(evnt, async function (data) {
console.log('Event "' + evnt + '"');
updateProperties();
const value = await data.value();
console.log('Event "' + evnt + '"', value);
eventCounter++;
document.getElementById("event-count").innerHTML = eventCounter;
document.getElementById("event-payload").innerHTML = value;
})
.then((sub) => {
subscription = sub;
console.log("Subscribed for event: " + evnt);
eventDisplay.style.display = "block";
})
.catch((error) => {
window.alert("Event " + evnt + " error\nMessage: " + error);
Expand All @@ -142,6 +148,10 @@ function showInteractions(thing) {
.stop()
.then(() => {
console.log("Unsubscribed for event: " + evnt);
eventDisplay.style.display = "none";
eventCounter = 0;
document.getElementById("event-count").innerHTML = eventCounter;
document.getElementById("event-payload").innerHTML = "";
})
.catch((error) => {
window.alert("Event " + evnt + " error\nMessage: " + error);
Expand Down
10 changes: 8 additions & 2 deletions packages/binding-http/src/http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,8 +985,14 @@ export default class HttpServer implements ProtocolServer {
const listener = async (value: Content) => {
try {
// send event data
res.setHeader("Content-Type", value.type);
res.writeHead(200);
if (!res.headersSent) {
// We are polite and use the same request as long as the client
// does not close the connection (or we hit the timeout; see below).
// Therefore we are sending the headers
// only if we didn't have sent them before.
res.setHeader("Content-Type", value.type);
res.writeHead(200);
}
value.body.pipe(res);
} catch (err) {
if (err?.code === "ERR_HTTP_HEADERS_SENT") {
Expand Down
5 changes: 5 additions & 0 deletions packages/binding-http/src/subscription-protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ export class LongPollingSubscription implements InternalSubscription {
private client: HttpClient;

private closed: boolean;
private abortController: AbortController;
/**
*
*/
constructor(form: HttpForm, client: HttpClient) {
this.form = form;
this.client = client;
this.closed = false;
this.abortController = new AbortController();
}

open(next: (value: Content) => void, error?: (error: Error) => void, complete?: () => void): Promise<void> {
Expand All @@ -45,6 +47,7 @@ export class LongPollingSubscription implements InternalSubscription {
if (handshake) {
const headRequest = await this.client["generateFetchRequest"](this.form, "HEAD", {
timeout: 1000,
signal: this.abortController.signal as any,
});
const result = await this.client["fetch"](headRequest);
if (result.ok) resolve();
Expand All @@ -53,6 +56,7 @@ export class LongPollingSubscription implements InternalSubscription {
// long timeout for long polling
const request = await this.client["generateFetchRequest"](this.form, "GET", {
timeout: 60 * 60 * 1000,
signal: this.abortController.signal as any,
});
debug(`HttpClient (subscribeResource) sending ${request.method} to ${request.url}`);

Expand Down Expand Up @@ -85,6 +89,7 @@ export class LongPollingSubscription implements InternalSubscription {
}

close(): void {
this.abortController.abort();
this.closed = true;
}
}
Expand Down
62 changes: 33 additions & 29 deletions packages/core/src/consumed-thing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ abstract class InternalSubscription implements Subscription {
abstract stop(options?: WoT.InteractionOptions): Promise<void>;
}

function handleUriVariables(
thing: ConsumedThing,
ti: ThingInteraction,
form: TD.Form,
options?: WoT.InteractionOptions
): TD.Form {
const ut = UriTemplate.parse(form.href);
const uriVariables = Helpers.parseInteractionOptions(thing, ti, options).uriVariables;
const updatedHref = ut.expand(uriVariables ?? {});
if (updatedHref !== form.href) {
// create shallow copy and update href
const updForm = { ...form };
updForm.href = updatedHref;
form = updForm;
debug(`ConsumedThing '${thing.title}' update form URI to ${form.href}`);
}

return form;
}

class InternalPropertySubscription extends InternalSubscription {
active = false;
private formIndex: number;
Expand Down Expand Up @@ -157,8 +177,9 @@ class InternalPropertySubscription extends InternalSubscription {
throw new Error(`ConsumedThing '${this.thing.title}' did not get suitable form`);
}

const formWithoutURIvariables = handleUriVariables(this.thing, tp, form, options);
debug(`ConsumedThing '${this.thing.title}' unobserving to ${form.href}`);
await this.client.unlinkResource(form);
await this.client.unlinkResource(formWithoutURIvariables);
this.active = false;
}

Expand Down Expand Up @@ -282,20 +303,14 @@ class InternalEventSubscription extends InternalSubscription {
options.formIndex = this.matchingUnsubscribeForm();
}

const { client, form } = this.thing.getClientFor(
te.forms,
"unsubscribeevent",
Affordance.EventAffordance,
options
);
const { form } = this.thing.getClientFor(te.forms, "unsubscribeevent", Affordance.EventAffordance, options);
if (!form) {
throw new Error(`ConsumedThing '${this.thing.title}' did not get suitable form`);
}
if (!client) {
throw new Error(`ConsumedThing '${this.thing.title}' did not get suitable client for ${form.href}`);
}

const formWithoutURIvariables = handleUriVariables(this.thing, te, form, options);
debug(`ConsumedThing '${this.thing.title}' unsubscribing to ${form.href}`);
client.unlinkResource(form);
this.client.unlinkResource(formWithoutURIvariables);
this.active = false;
}

Expand Down Expand Up @@ -702,7 +717,7 @@ export default class ConsumedThing extends TD.Thing implements IConsumedThing {
if (!tp) {
throw new Error(`ConsumedThing '${this.title}' does not have property ${name}`);
}
let { client, form } = this.getClientFor(tp.forms, "observeproperty", Affordance.PropertyAffordance, options);
const { client, form } = this.getClientFor(tp.forms, "observeproperty", Affordance.PropertyAffordance, options);
if (!form) {
throw new Error(`ConsumedThing '${this.title}' did not get suitable form`);
}
Expand All @@ -717,10 +732,10 @@ export default class ConsumedThing extends TD.Thing implements IConsumedThing {
debug(`ConsumedThing '${this.title}' observing to ${form.href}`);

// uriVariables ?
form = this.handleUriVariables(tp, form, options);
const formWithoutURITemplates = this.handleUriVariables(tp, form, options);

await client.subscribeResource(
form,
formWithoutURITemplates,
// next
(content) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- tsc get confused when nullables are to listeners lambdas
Expand Down Expand Up @@ -760,7 +775,7 @@ export default class ConsumedThing extends TD.Thing implements IConsumedThing {
if (!te) {
throw new Error(`ConsumedThing '${this.title}' does not have event ${name}`);
}
let { client, form } = this.getClientFor(te.forms, "subscribeevent", Affordance.EventAffordance, options);
const { client, form } = this.getClientFor(te.forms, "subscribeevent", Affordance.EventAffordance, options);
if (!form) {
throw new Error(`ConsumedThing '${this.title}' did not get suitable form`);
}
Expand All @@ -775,10 +790,10 @@ export default class ConsumedThing extends TD.Thing implements IConsumedThing {
debug(`ConsumedThing '${this.title}' subscribing to ${form.href}`);

// uriVariables ?
form = this.handleUriVariables(te, form, options);
const formWithoutURITemplates = this.handleUriVariables(te, form, options);

await client.subscribeResource(
form,
formWithoutURITemplates,
(content) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- tsc get confused when nullables are to listeners lambdas
if (!content.type) content.type = form!.contentType ?? "application/json";
Expand Down Expand Up @@ -808,17 +823,6 @@ export default class ConsumedThing extends TD.Thing implements IConsumedThing {
// http://192.168.178.24:8080/counter/actions/increment{?step} with options {uriVariables: {'step' : 3}} --> http://192.168.178.24:8080/counter/actions/increment?step=3
// see RFC6570 (https://tools.ietf.org/html/rfc6570) for URI Template syntax
handleUriVariables(ti: ThingInteraction, form: TD.Form, options?: WoT.InteractionOptions): TD.Form {
const ut = UriTemplate.parse(form.href);
const uriVariables = Helpers.parseInteractionOptions(this, ti, options).uriVariables;
const updatedHref = ut.expand(uriVariables ?? {});
if (updatedHref !== form.href) {
// create shallow copy and update href
const updForm = { ...form };
updForm.href = updatedHref;
form = updForm;
debug(`ConsumedThing '${this.title}' update form URI to ${form.href}`);
}

return form;
return handleUriVariables(this, ti, form, options);
}
}

0 comments on commit 18af484

Please sign in to comment.