Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Live location sharing - Stop publishing location to beacons with cons…
Browse files Browse the repository at this point in the history
…ecutive errors (#8194)

* add error state after consecutive errors

Signed-off-by: Kerry Archibald <kerrya@element.io>

* polish

Signed-off-by: Kerry Archibald <kerrya@element.io>

* comment

Signed-off-by: Kerry Archibald <kerrya@element.io>

* remove debug

Signed-off-by: Kerry Archibald <kerrya@element.io>
  • Loading branch information
Kerry authored Mar 30, 2022
1 parent 31cd7ed commit d092051
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 7 deletions.
67 changes: 62 additions & 5 deletions src/stores/OwnBeaconStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ const isOwnBeacon = (beacon: Beacon, userId: string): boolean => beacon.beaconIn
export enum OwnBeaconStoreEvent {
LivenessChange = 'OwnBeaconStore.LivenessChange',
MonitoringLivePosition = 'OwnBeaconStore.MonitoringLivePosition',
WireError = 'WireError',
}

const MOVING_UPDATE_INTERVAL = 2000;
const STATIC_UPDATE_INTERVAL = 30000;

const BAIL_AFTER_CONSECUTIVE_ERROR_COUNT = 2;

type OwnBeaconStoreState = {
beacons: Map<string, Beacon>;
beaconWireErrors: Map<string, Beacon>;
Expand All @@ -65,9 +68,11 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
public readonly beacons = new Map<string, Beacon>();
public readonly beaconsByRoomId = new Map<Room['roomId'], Set<string>>();
/**
* Track over the wire errors for beacons
* Track over the wire errors for published positions
* Counts consecutive wire errors per beacon
* Reset on successful publish of location
*/
public readonly beaconWireErrors = new Map<string, Error>();
public readonly beaconWireErrorCounts = new Map<string, number>();
private liveBeaconIds = [];
private locationInterval: number;
private geolocationError: GeolocationError | undefined;
Expand Down Expand Up @@ -106,7 +111,7 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
this.beacons.clear();
this.beaconsByRoomId.clear();
this.liveBeaconIds = [];
this.beaconWireErrors.clear();
this.beaconWireErrorCounts.clear();
}

protected async onReady(): Promise<void> {
Expand All @@ -125,6 +130,25 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
return !!this.getLiveBeaconIds(roomId).length;
}

/**
* If a beacon has failed to publish position
* past the allowed consecutive failure count (BAIL_AFTER_CONSECUTIVE_ERROR_COUNT)
* Then consider it to have an error
*/
public hasWireError(beaconId: string): boolean {
return this.beaconWireErrorCounts.get(beaconId) >= BAIL_AFTER_CONSECUTIVE_ERROR_COUNT;
}

public resetWireError(beaconId: string): void {
this.incrementBeaconWireErrorCount(beaconId, false);

// always publish to all live beacons together
// instead of just one that was changed
// to keep lastPublishedTimestamp simple
// and extra published locations don't hurt
this.publishCurrentLocationToBeacons();
}

public getLiveBeaconIds(roomId?: string): string[] {
if (!roomId) {
return this.liveBeaconIds;
Expand Down Expand Up @@ -202,6 +226,13 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
* State management
*/

/**
* Live beacon ids that do not have wire errors
*/
private get healthyLiveBeaconIds() {
return this.liveBeaconIds.filter(beaconId => !this.hasWireError(beaconId));
}

private initialiseBeaconState = () => {
const userId = this.matrixClient.getUserId();
const visibleRooms = this.matrixClient.getVisibleRooms();
Expand Down Expand Up @@ -399,7 +430,7 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
*/
private publishLocationToBeacons = async (position: TimedGeoUri) => {
this.lastPublishedPositionTimestamp = Date.now();
await Promise.all(this.liveBeaconIds.map(beaconId =>
await Promise.all(this.healthyLiveBeaconIds.map(beaconId =>
this.sendLocationToBeacon(this.beacons.get(beaconId), position)),
);
};
Expand All @@ -413,9 +444,35 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
const content = makeBeaconContent(geoUri, timestamp, beacon.beaconInfoId);
try {
await this.matrixClient.sendEvent(beacon.roomId, M_BEACON.name, content);
this.incrementBeaconWireErrorCount(beacon.identifier, false);
} catch (error) {
logger.error(error);
this.beaconWireErrors.set(beacon.identifier, error);
this.incrementBeaconWireErrorCount(beacon.identifier, true);
}
};

/**
* Manage beacon wire error count
* - clear count for beacon when not error
* - increment count for beacon when is error
* - emit if beacon error count crossed threshold
*/
private incrementBeaconWireErrorCount = (beaconId: string, isError: boolean): void => {
const hadError = this.hasWireError(beaconId);

if (isError) {
// increment error count
this.beaconWireErrorCounts.set(
beaconId,
(this.beaconWireErrorCounts.get(beaconId) ?? 0) + 1,
);
} else {
// clear any error count
this.beaconWireErrorCounts.delete(beaconId);
}

if (this.hasWireError(beaconId) !== hadError) {
this.emit(OwnBeaconStoreEvent.WireError, beaconId);
}
};
}
134 changes: 132 additions & 2 deletions test/stores/OwnBeaconStore-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ describe('OwnBeaconStore', () => {
geolocation = mockGeolocation();
mockClient.getVisibleRooms.mockReturnValue([]);
mockClient.unstable_setLiveBeacon.mockClear().mockResolvedValue({ event_id: '1' });
mockClient.sendEvent.mockClear().mockResolvedValue({ event_id: '1' });
mockClient.sendEvent.mockReset().mockResolvedValue({ event_id: '1' });
jest.spyOn(global.Date, 'now').mockReturnValue(now);
jest.spyOn(OwnBeaconStore.instance, 'emit').mockRestore();
jest.spyOn(logger, 'error').mockRestore();
Expand Down Expand Up @@ -696,7 +696,7 @@ describe('OwnBeaconStore', () => {
});
});

describe('sending positions', () => {
describe('publishing positions', () => {
it('stops watching position when user has no more live beacons', async () => {
// geolocation is only going to emit 1 position
geolocation.watchPosition.mockImplementation(
Expand Down Expand Up @@ -825,6 +825,136 @@ describe('OwnBeaconStore', () => {
});
});

describe('when publishing position fails', () => {
beforeEach(() => {
geolocation.watchPosition.mockImplementation(
watchPositionMockImplementation([0, 1000, 3000, 3000, 3000]),
);

// eat expected console error logs
jest.spyOn(logger, 'error').mockImplementation(() => { });
});

// we need to advance time and then flush promises
// individually for each call to sendEvent
// otherwise the sendEvent doesn't reject/resolve and update state
// before the next call
// advance and flush every 1000ms
// until given ms is 'elapsed'
const advanceAndFlushPromises = async (timeMs: number) => {
while (timeMs > 0) {
jest.advanceTimersByTime(1000);
await flushPromisesWithFakeTimers();
timeMs -= 1000;
}
};

it('continues publishing positions after one publish error', async () => {
// fail to send first event, then succeed
mockClient.sendEvent.mockRejectedValueOnce(new Error('oups')).mockResolvedValue({ event_id: '1' });
makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
// wait for store to settle
await flushPromisesWithFakeTimers();

await advanceAndFlushPromises(50000);

// called for each position from watchPosition
expect(mockClient.sendEvent).toHaveBeenCalledTimes(5);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false);
});

it('continues publishing positions when a beacon fails intermittently', async () => {
// every second event rejects
// meaning this beacon has more errors than the threshold
// but they are not consecutive
mockClient.sendEvent
.mockRejectedValueOnce(new Error('oups'))
.mockResolvedValueOnce({ event_id: '1' })
.mockRejectedValueOnce(new Error('oups'))
.mockResolvedValueOnce({ event_id: '1' })
.mockRejectedValueOnce(new Error('oups'));

makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
const emitSpy = jest.spyOn(store, 'emit');
// wait for store to settle
await flushPromisesWithFakeTimers();

await advanceAndFlushPromises(50000);

// called for each position from watchPosition
expect(mockClient.sendEvent).toHaveBeenCalledTimes(5);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false);
expect(emitSpy).not.toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);
});

it('stops publishing positions when a beacon fails consistently', async () => {
// always fails to send events
mockClient.sendEvent.mockRejectedValue(new Error('oups'));
makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
const emitSpy = jest.spyOn(store, 'emit');
// wait for store to settle
await flushPromisesWithFakeTimers();

// 5 positions from watchPosition in this period
await advanceAndFlushPromises(50000);

// only two allowed failures
expect(mockClient.sendEvent).toHaveBeenCalledTimes(2);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(true);
expect(emitSpy).toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);
});

it('restarts publishing a beacon after resetting wire error', async () => {
// always fails to send events
mockClient.sendEvent.mockRejectedValue(new Error('oups'));
makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
const emitSpy = jest.spyOn(store, 'emit');
// wait for store to settle
await flushPromisesWithFakeTimers();

// 3 positions from watchPosition in this period
await advanceAndFlushPromises(4000);

// only two allowed failures
expect(mockClient.sendEvent).toHaveBeenCalledTimes(2);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(true);
expect(emitSpy).toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);

// reset emitSpy mock counts to asser on wireError again
emitSpy.mockClear();
store.resetWireError(alicesRoom1BeaconInfo.getType());

expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false);

// 2 more positions from watchPosition in this period
await advanceAndFlushPromises(10000);

// 2 from before, 2 new ones
expect(mockClient.sendEvent).toHaveBeenCalledTimes(4);
expect(emitSpy).toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);
});
});

it('publishes subsequent positions', async () => {
// modern fake timers + debounce + promises are not friends
// just testing that positions are published
Expand Down

0 comments on commit d092051

Please sign in to comment.