Skip to content

Commit

Permalink
feat: Add RPC sendDeviceInfo() + device-info event (#270)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Chou <andrewchou@fastmail.com>
  • Loading branch information
2 people authored and Tomás Ciccola committed Oct 3, 2023
1 parent 5d6db79 commit 75cfe5f
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 8 deletions.
4 changes: 4 additions & 0 deletions proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ message InviteResponse {
bytes projectKey = 1;
Decision decision = 2;
}

message DeviceInfo {
string name = 1;
}
17 changes: 17 additions & 0 deletions src/generated/rpc.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export declare enum InviteResponse_Decision {
}
export declare function inviteResponse_DecisionFromJSON(object: any): InviteResponse_Decision;
export declare function inviteResponse_DecisionToNumber(object: InviteResponse_Decision): number;
export interface DeviceInfo {
name: string;
}
export declare const Invite: {
encode(message: Invite, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): Invite;
Expand Down Expand Up @@ -124,3 +127,17 @@ export declare const InviteResponse: {
decision?: InviteResponse_Decision;
} & { [K_1 in Exclude<keyof I_1, keyof InviteResponse>]: never; }>(object: I_1): InviteResponse;
};
export declare const DeviceInfo: {
encode(message: DeviceInfo, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): DeviceInfo;
create<I extends {
name?: string;
} & {
name?: string;
} & { [K in Exclude<keyof I, "name">]: never; }>(base?: I): DeviceInfo;
fromPartial<I_1 extends {
name?: string;
} & {
name?: string;
} & { [K_1 in Exclude<keyof I_1, "name">]: never; }>(object: I_1): DeviceInfo;
};
42 changes: 42 additions & 0 deletions src/generated/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,45 @@ export var InviteResponse = {
return message;
},
};
function createBaseDeviceInfo() {
return { name: "" };
}
export var DeviceInfo = {
encode: function (message, writer) {
if (writer === void 0) { writer = _m0.Writer.create(); }
if (message.name !== "") {
writer.uint32(10).string(message.name);
}
return writer;
},
decode: function (input, length) {
var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
var end = length === undefined ? reader.len : reader.pos + length;
var message = createBaseDeviceInfo();
while (reader.pos < end) {
var tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}
message.name = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
create: function (base) {
return DeviceInfo.fromPartial(base !== null && base !== void 0 ? base : {});
},
fromPartial: function (object) {
var _a;
var message = createBaseDeviceInfo();
message.name = (_a = object.name) !== null && _a !== void 0 ? _a : "";
return message;
},
};
49 changes: 49 additions & 0 deletions src/generated/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export function inviteResponse_DecisionToNumber(object: InviteResponse_Decision)
}
}

export interface DeviceInfo {
name: string;
}

function createBaseInvite(): Invite {
return { projectKey: Buffer.alloc(0), encryptionKeys: undefined };
}
Expand Down Expand Up @@ -229,6 +233,51 @@ export const InviteResponse = {
},
};

function createBaseDeviceInfo(): DeviceInfo {
return { name: "" };
}

export const DeviceInfo = {
encode(message: DeviceInfo, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.name !== "") {
writer.uint32(10).string(message.name);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): DeviceInfo {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseDeviceInfo();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}

message.name = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<DeviceInfo>, I>>(base?: I): DeviceInfo {
return DeviceInfo.fromPartial(base ?? ({} as any));
},
fromPartial<I extends Exact<DeepPartial<DeviceInfo>, I>>(object: I): DeviceInfo {
const message = createBaseDeviceInfo();
message.name = object.name ?? "";
return message;
},
};

type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;

type DeepPartial<T> = T extends Builtin ? T
Expand Down
44 changes: 36 additions & 8 deletions src/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Protomux from 'protomux'
import { openedNoiseSecretStream, keyToId } from '../utils.js'
import cenc from 'compact-encoding'
import {
DeviceInfo,
Invite,
InviteResponse,
InviteResponse_Decision,
Expand All @@ -14,13 +15,12 @@ const PROTOCOL_NAME = 'mapeo/rpc'
// Protomux message types depend on the order that messages are added to a
// channel (this needs to remain consistent). To avoid breaking changes, the
// types here should not change.
//
// TODO: Add @satisfies to check this matches the imports from './messages.js'
// when we switch to Typescript v5
const MESSAGE_TYPES = /** @type {const} */ ({
/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */
const MESSAGE_TYPES = {
Invite: 0,
InviteResponse: 1,
})
DeviceInfo: 2,
}
const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])

/** @typedef {Peer['info']} PeerInfoInternal */
Expand Down Expand Up @@ -100,6 +100,13 @@ class Peer {
const messageType = MESSAGE_TYPES.InviteResponse
this.#channel.messages[messageType].send(buf)
}
/** @param {DeviceInfo} deviceInfo */
sendDeviceInfo(deviceInfo) {
this.#assertConnected()
const buf = Buffer.from(DeviceInfo.encode(deviceInfo).finish())
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
}
#assertConnected() {
if (this.#state === 'connected' && !this.#channel.closed) return
/* c8 ignore next */
Expand All @@ -111,6 +118,7 @@ class Peer {
* @typedef {object} MapeoRPCEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device
*/

/** @extends {TypedEmitter<MapeoRPCEvents>} */
Expand Down Expand Up @@ -189,6 +197,17 @@ export class MapeoRPC extends TypedEmitter {
peer.sendInviteResponse(options)
}

/**
*
* @param {string} peerId id of the peer you want to send to (publicKey of peer as hex string)
* @param {DeviceInfo} deviceInfo device info to send
*/
sendDeviceInfo(peerId, deviceInfo) {
const peer = this.#peers.get(peerId)
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
peer.sendDeviceInfo(deviceInfo)
}

/**
* Connect to a peer over an existing NoiseSecretStream
*
Expand Down Expand Up @@ -315,9 +334,18 @@ export class MapeoRPC extends TypedEmitter {
peer.pendingInvites.set(projectId, [])
break
}
/* c8 ignore next 2 */
default:
// TODO: report unhandled message error
case 'DeviceInfo': {
const deviceInfo = DeviceInfo.decode(value)
this.emit('device-info', { ...deviceInfo, deviceId: peerId })
break
}
/* c8 ignore next 5 */
default: {
/** @type {never} */
const _exhaustiveCheck = type
return _exhaustiveCheck
// TODO: report unhandled message error
}
}
}
}
Expand Down
52 changes: 52 additions & 0 deletions tests/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,55 @@ test('invalid stream', (t) => {
// @ts-expect-error
t.exception(() => r1.connect(regularStream), 'Invalid stream')
})

test('Send device info', async (t) => {
t.plan(3)

const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

/** @type {import('../src/generated/rpc.js').DeviceInfo} */
const expectedDeviceInfo = { name: 'mapeo' }

r1.on('peers', async (peers) => {
t.is(peers.length, 1)
r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo)
})

r2.on('device-info', ({ deviceId, ...deviceInfo }) => {
t.ok(deviceId)
t.alike(deviceInfo, expectedDeviceInfo)
})

replicate(r1, r2)
})

test('Reconnect peer and send device info', async (t) => {
t.plan(6)

const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

/** @type {import('../src/generated/rpc.js').DeviceInfo} */
const expectedDeviceInfo = { name: 'mapeo' }

const destroy = replicate(r1, r2)
await once(r1, 'peers')
await destroy()

t.is(r1.peers.length, 1)
t.is(r1.peers[0].status, 'disconnected')

r2.on('device-info', ({ deviceId, ...deviceInfo }) => {
t.ok(deviceId)
t.alike(deviceInfo, expectedDeviceInfo)
})

replicate(r1, r2)

const [peers] = await once(r1, 'peers')
t.is(r1.peers.length, 1)
t.is(peers[0].status, 'connected')

r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo)
})

0 comments on commit 75cfe5f

Please sign in to comment.