Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: notify clients with acceptance/rejection of events instead of snapshots #2772

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ public Flux<ObjectNode> subscribe() {
LOGGER.debug("New Flux subscription...");
lock.lock();
try {
var currentValue = createStatusUpdateEvent(this.id.toString(),
StateEvent.EventType.SNAPSHOT);
sink.tryEmitNext(currentValue);
var snapshot = createSnapshotEvent();
sink.tryEmitNext(snapshot);
subscribers.add(sink);
} finally {
lock.unlock();
Expand All @@ -102,14 +101,11 @@ public Flux<ObjectNode> subscribe() {
public void submit(ObjectNode event) {
lock.lock();
try {
boolean success = processEvent(event);
boolean accepted = processEvent(event);
// Notify subscribers
subscribers.removeIf(sink -> {
var updatedValue = createStatusUpdateEvent(
event.get("id").asText(),
success ? StateEvent.EventType.SNAPSHOT
: StateEvent.EventType.REJECT);
boolean failure = sink.tryEmitNext(updatedValue).isFailure();
var eventWithStatus = StateEvent.setAccepted(event, accepted);
boolean failure = sink.tryEmitNext(eventWithStatus).isFailure();
if (failure) {
LOGGER.debug("Failed push");
}
Expand Down Expand Up @@ -139,10 +135,10 @@ public T getValue() {
return this.value;
}

private ObjectNode createStatusUpdateEvent(String eventId,
StateEvent.EventType eventType) {
var snapshot = new StateEvent<>(eventId, eventType, this.value);
return snapshot.toJson();
private ObjectNode createSnapshotEvent() {
var snapshot = new StateEvent<>(getId().toString(),
StateEvent.EventType.SNAPSHOT, this.value).toJson();
return StateEvent.setAccepted(snapshot, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ public static final class Field {
public static final String TYPE = "type";
public static final String VALUE = "value";
public static final String EXPECTED = "expected";
public static final String ACCEPTED = "accepted";
}

/**
* Possible types of state events.
*/
public enum EventType {
SNAPSHOT, SET, REPLACE, REJECT, INCREMENT
SNAPSHOT, SET, REPLACE, INCREMENT
}

/**
Expand Down Expand Up @@ -165,6 +166,15 @@ public ObjectNode toJson() {
return json;
}

public static ObjectNode setAccepted(ObjectNode event, boolean accepted) {
return event.put(Field.ACCEPTED, accepted);
}

public static boolean isAccepted(ObjectNode event) {
return event.has(Field.ACCEPTED)
&& event.get(Field.ACCEPTED).asBoolean();
}

private JsonNode valueAsJsonNode(T value) {
return MAPPER.valueToTree(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class ValueSignalTest {

Expand Down Expand Up @@ -96,9 +98,13 @@ public void submit_notifies_subscribers() {
// notification for the initial value
assertNull(stateEvent.getValue());
} else if (counter.get() == 1) {
assertTrue(StateEvent.isAccepted(eventJson));
assertEquals(name, stateEvent.getValue().getName());
assertEquals(name, signal.getValue().getName());
assertEquals(age, stateEvent.getValue().getAge());
assertEquals(age, signal.getValue().getAge());
assertEquals(adult, stateEvent.getValue().isAdult());
assertEquals(adult, signal.getValue().isAdult());
}
counter.incrementAndGet();
});
Expand All @@ -123,12 +129,14 @@ public void submit_conditionIsMet_notifies_subscribers_with_snapshot_event() {
if (counter.get() == 0) {
// notification for the initial value
assertEquals(2.0, stateEvent.getValue(), 0.0);
assertTrue(StateEvent.isAccepted(eventJson));
} else if (counter.get() == 1) {
assertEquals(conditionalReplaceEvent.get(StateEvent.Field.ID)
.asText(), stateEvent.getId());
assertEquals(StateEvent.EventType.SNAPSHOT,
assertEquals(StateEvent.EventType.REPLACE,
stateEvent.getEventType());
assertEquals(3.0, stateEvent.getValue(), 0.0);
assertTrue(StateEvent.isAccepted(eventJson));
assertEquals(3.0, signal.getValue(), 0.0);
}
counter.incrementAndGet();
});
Expand All @@ -151,13 +159,14 @@ public void submit_conditionIsNotMet_notifies_subscribers_with_reject_event() {
var stateEvent = new StateEvent<>(eventJson, Double.class);
if (counter.get() == 0) {
// notification for the initial value
assertEquals(1.0, stateEvent.getValue(), 0.0);
assertTrue(StateEvent.isAccepted(eventJson));
} else if (counter.get() == 1) {
assertEquals(conditionalReplaceEvent.get(StateEvent.Field.ID)
.asText(), stateEvent.getId());
assertEquals(StateEvent.EventType.REJECT,
assertEquals(StateEvent.EventType.REPLACE,
stateEvent.getEventType());
assertEquals(1.0, stateEvent.getValue(), 0.0);
assertFalse(StateEvent.isAccepted(eventJson));
assertEquals(1.0, signal.getValue(), 0.0);
}
counter.incrementAndGet();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void constructor_withJsonInvalidEventType_shouldThrowInvalidEventTypeExce
StateEvent.InvalidEventTypeException.class,
() -> new StateEvent<>(json, String.class));

String expectedMessage = "Invalid event type invalidType. Type should be one of: [SNAPSHOT, SET, REPLACE, REJECT, INCREMENT]";
String expectedMessage = "Invalid event type invalidType. Type should be one of: [SNAPSHOT, SET, REPLACE, INCREMENT]";
String actualMessage = exception.getMessage();

assertTrue(actualMessage.contains(expectedMessage));
Expand All @@ -152,7 +152,7 @@ public void constructor_withJsonMissingEventType_shouldThrowInvalidEventTypeExce
StateEvent.InvalidEventTypeException.class,
() -> new StateEvent<>(json, String.class));

String expectedMessage = "Missing event type. Type is required, and should be one of: [SNAPSHOT, SET, REPLACE, REJECT, INCREMENT]";
String expectedMessage = "Missing event type. Type is required, and should be one of: [SNAPSHOT, SET, REPLACE, INCREMENT]";
String actualMessage = exception.getMessage();

assertTrue(actualMessage.contains(expectedMessage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public void when_signalIsRegistered_update_notifiesTheSubscribers()

var expectedUpdatedSignalEventJson = new ObjectNode(
mapper.getNodeFactory()).put("value", 42.0)
.put("id", signalId.toString()).put("type", "snapshot");
.put("id", signalId.toString()).put("type", "snapshot")
.put("accepted", true);
StepVerifier.create(firstFlux)
.expectNext(expectedUpdatedSignalEventJson).thenCancel()
.verify();
Expand Down
18 changes: 16 additions & 2 deletions packages/ts/react-signals/src/NumberSignal.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createIncrementStateEvent } from './events.js';
import { $update } from './FullStackSignal.js';
import { createIncrementStateEvent, type StateEvent } from './events.js';
import { $processServerResponse, $update } from './FullStackSignal.js';
import { ValueSignal } from './ValueSignal.js';

/**
Expand All @@ -26,6 +26,7 @@ import { ValueSignal } from './ValueSignal.js';
* ```
*/
export class NumberSignal extends ValueSignal<number> {
readonly #sentIncrementEvents = new Map<string, StateEvent<number>>();
/**
* Increments the value by the specified delta. The delta can be negative to
* decrease the value.
Expand All @@ -44,6 +45,19 @@ export class NumberSignal extends ValueSignal<number> {
}
this.setValueLocal(this.value + delta);
const event = createIncrementStateEvent(delta);
this.#sentIncrementEvents.set(event.id, event);
this[$update](event);
}

protected override [$processServerResponse](event: StateEvent<number>): void {
if (event.accepted && event.type === 'increment') {
if (this.#sentIncrementEvents.has(event.id)) {
this.#sentIncrementEvents.delete(event.id);
return;
}
this.setValueLocal(this.value + event.value);
} else {
super[$processServerResponse](event);
}
}
}
22 changes: 16 additions & 6 deletions packages/ts/react-signals/src/ValueSignal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,27 @@ export class ValueSignal<T> extends FullStackSignal<T> {
this.#pendingRequests.delete(event.id);
}

if (event.type === 'snapshot') {
if (!event.accepted && record) {
if (!record.canceled) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.update(record.callback);
}
}

if (event.accepted || event.type === 'snapshot') {
if (record) {
record.waiter.resolve();
}
this.value = event.value;
this.#applyAcceptedEvent(event);
}
}

if (event.type === 'reject' && record) {
if (!record.canceled) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.update(record.callback);
#applyAcceptedEvent(event: StateEvent<T>): void {
if (event.type === 'set' || event.type === 'snapshot') {
this.value = event.value;
} else if (event.type === 'replace') {
if (JSON.stringify(this.value) === JSON.stringify(event.expected)) {
this.value = event.value;
}
}
}
Expand Down
14 changes: 5 additions & 9 deletions packages/ts/react-signals/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type CreateStateEventType<V, T extends string, C extends Record<string, unknown>
id: string;
type: T;
value: V;
accepted: boolean;
}> &
Readonly<C>
>;
Expand All @@ -19,11 +20,8 @@ type CreateStateEventType<V, T extends string, C extends Record<string, unknown>
*/
export type SnapshotStateEvent<T> = CreateStateEventType<T, 'snapshot'>;

export type RejectStateEvent = CreateStateEventType<never, 'reject'>;

/**
* A state event defines a new value of the signal shared with the server. The
*
*/
export type SetStateEvent<T> = CreateStateEventType<T, 'set'>;

Expand All @@ -32,6 +30,7 @@ export function createSetStateEvent<T>(value: T): SetStateEvent<T> {
id: nanoid(),
type: 'set',
value,
accepted: false,
};
}

Expand All @@ -43,6 +42,7 @@ export function createReplaceStateEvent<T>(expected: T, value: T): ReplaceStateE
type: 'replace',
value,
expected,
accepted: false,
};
}

Expand All @@ -53,15 +53,11 @@ export function createIncrementStateEvent(delta: number): IncrementStateEvent {
id: nanoid(),
type: 'increment',
value: delta,
accepted: false,
};
}

/**
* An object that describes the change of the signal state.
*/
export type StateEvent<T> =
| IncrementStateEvent
| RejectStateEvent
| ReplaceStateEvent<T>
| SetStateEvent<T>
| SnapshotStateEvent<T>;
export type StateEvent<T> = IncrementStateEvent | ReplaceStateEvent<T> | SetStateEvent<T> | SnapshotStateEvent<T>;
15 changes: 9 additions & 6 deletions packages/ts/react-signals/test/FullStackSignal.spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { ActionOnLostSubscription, ConnectClient, type Subscription } from '@vaa
import { nanoid } from 'nanoid';
import sinon from 'sinon';
import sinonChai from 'sinon-chai';
import type { SnapshotStateEvent, StateEvent } from '../src/events.js';
import type { StateEvent } from '../src/events.js';
import { DependencyTrackingSignal } from '../src/FullStackSignal.js';
import { computed, NumberSignal } from '../src/index.js';
import { nextFrame } from './utils.js';
Expand Down Expand Up @@ -51,8 +51,11 @@ describe('@vaadin/hilla-react-signals', () => {
});

describe('FullStackSignal', () => {
function createSnapshotEvent(value: number): SnapshotStateEvent<number> {
return { id: nanoid(), type: 'snapshot', value };
function createAcceptedEvent(
value: number,
type: 'increment' | 'replace' | 'set' | 'snapshot',
): StateEvent<number> {
return { id: nanoid(), type, value, expected: 0, accepted: true };
}

function simulateReceivedChange(
Expand Down Expand Up @@ -195,7 +198,7 @@ describe('@vaadin/hilla-react-signals', () => {
await nextFrame();

// Simulate the event received from the server:
const snapshotEvent = createSnapshotEvent(42);
const snapshotEvent = createAcceptedEvent(42, 'snapshot');
simulateReceivedChange(subscription, snapshotEvent);

// Check if the signal value is updated:
Expand All @@ -207,13 +210,13 @@ describe('@vaadin/hilla-react-signals', () => {

let result = render(<span>Value is {numberSignal}</span>);
await nextFrame();
simulateReceivedChange(subscription, createSnapshotEvent(42));
simulateReceivedChange(subscription, createAcceptedEvent(42, 'snapshot'));

result = render(<span>Value is {numberSignal}</span>);
await nextFrame();
expect(result.container.textContent).to.equal('Value is 42');

simulateReceivedChange(subscription, createSnapshotEvent(99));
simulateReceivedChange(subscription, createAcceptedEvent(99, 'set'));
await nextFrame();
expect(result.container.textContent).to.equal('Value is 99');
});
Expand Down
Loading