Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add RPC sendDeviceInfo() + device-info event #270

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ message InviteResponse {
bytes projectKey = 1;
Decision decision = 2;
}

message DeviceInfo {
string name = 1;
}
17 changes: 17 additions & 0 deletions src/generated/rpc.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export declare enum InviteResponse_Decision {
}
export declare function inviteResponse_DecisionFromJSON(object: any): InviteResponse_Decision;
export declare function inviteResponse_DecisionToNumber(object: InviteResponse_Decision): number;
export interface DeviceInfo {
name: string;
}
export declare const Invite: {
encode(message: Invite, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): Invite;
Expand Down Expand Up @@ -124,3 +127,17 @@ export declare const InviteResponse: {
decision?: InviteResponse_Decision;
} & { [K_1 in Exclude<keyof I_1, keyof InviteResponse>]: never; }>(object: I_1): InviteResponse;
};
export declare const DeviceInfo: {
encode(message: DeviceInfo, writer?: _m0.Writer): _m0.Writer;
decode(input: _m0.Reader | Uint8Array, length?: number): DeviceInfo;
create<I extends {
name?: string;
} & {
name?: string;
} & { [K in Exclude<keyof I, "name">]: never; }>(base?: I): DeviceInfo;
fromPartial<I_1 extends {
name?: string;
} & {
name?: string;
} & { [K_1 in Exclude<keyof I_1, "name">]: never; }>(object: I_1): DeviceInfo;
};
42 changes: 42 additions & 0 deletions src/generated/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,45 @@ export var InviteResponse = {
return message;
},
};
function createBaseDeviceInfo() {
return { name: "" };
}
export var DeviceInfo = {
encode: function (message, writer) {
if (writer === void 0) { writer = _m0.Writer.create(); }
if (message.name !== "") {
writer.uint32(10).string(message.name);
}
return writer;
},
decode: function (input, length) {
var reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
var end = length === undefined ? reader.len : reader.pos + length;
var message = createBaseDeviceInfo();
while (reader.pos < end) {
var tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}
message.name = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},
create: function (base) {
return DeviceInfo.fromPartial(base !== null && base !== void 0 ? base : {});
},
fromPartial: function (object) {
var _a;
var message = createBaseDeviceInfo();
message.name = (_a = object.name) !== null && _a !== void 0 ? _a : "";
return message;
},
};
49 changes: 49 additions & 0 deletions src/generated/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export function inviteResponse_DecisionToNumber(object: InviteResponse_Decision)
}
}

export interface DeviceInfo {
name: string;
}

function createBaseInvite(): Invite {
return { projectKey: Buffer.alloc(0), encryptionKeys: undefined };
}
Expand Down Expand Up @@ -229,6 +233,51 @@ export const InviteResponse = {
},
};

function createBaseDeviceInfo(): DeviceInfo {
return { name: "" };
}

export const DeviceInfo = {
encode(message: DeviceInfo, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.name !== "") {
writer.uint32(10).string(message.name);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): DeviceInfo {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseDeviceInfo();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}

message.name = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<DeviceInfo>, I>>(base?: I): DeviceInfo {
return DeviceInfo.fromPartial(base ?? ({} as any));
},
fromPartial<I extends Exact<DeepPartial<DeviceInfo>, I>>(object: I): DeviceInfo {
const message = createBaseDeviceInfo();
message.name = object.name ?? "";
return message;
},
};

type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;

type DeepPartial<T> = T extends Builtin ? T
Expand Down
44 changes: 36 additions & 8 deletions src/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Protomux from 'protomux'
import { openedNoiseSecretStream, keyToId } from '../utils.js'
import cenc from 'compact-encoding'
import {
DeviceInfo,
Invite,
InviteResponse,
InviteResponse_Decision,
Expand All @@ -14,13 +15,12 @@ const PROTOCOL_NAME = 'mapeo/rpc'
// Protomux message types depend on the order that messages are added to a
// channel (this needs to remain consistent). To avoid breaking changes, the
// types here should not change.
//
// TODO: Add @satisfies to check this matches the imports from './messages.js'
// when we switch to Typescript v5
const MESSAGE_TYPES = /** @type {const} */ ({
/** @satisfies {{ [k in keyof typeof import('../generated/rpc.js')]?: number }} */
const MESSAGE_TYPES = {
Invite: 0,
InviteResponse: 1,
})
DeviceInfo: 2,
}
const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])

/** @typedef {Peer['info']} PeerInfoInternal */
Expand Down Expand Up @@ -100,6 +100,13 @@ class Peer {
const messageType = MESSAGE_TYPES.InviteResponse
this.#channel.messages[messageType].send(buf)
}
/** @param {DeviceInfo} deviceInfo */
sendDeviceInfo(deviceInfo) {
this.#assertConnected()
const buf = Buffer.from(DeviceInfo.encode(deviceInfo).finish())
const messageType = MESSAGE_TYPES.DeviceInfo
this.#channel.messages[messageType].send(buf)
}
#assertConnected() {
if (this.#state === 'connected' && !this.#channel.closed) return
/* c8 ignore next */
Expand All @@ -111,6 +118,7 @@ class Peer {
* @typedef {object} MapeoRPCEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(deviceInfo: DeviceInfo & { deviceId: string }) => void} device-info Emitted when we receive device info for a device
*/

/** @extends {TypedEmitter<MapeoRPCEvents>} */
Expand Down Expand Up @@ -189,6 +197,17 @@ export class MapeoRPC extends TypedEmitter {
peer.sendInviteResponse(options)
}

/**
*
* @param {string} peerId id of the peer you want to send to (publicKey of peer as hex string)
* @param {DeviceInfo} deviceInfo device info to send
*/
sendDeviceInfo(peerId, deviceInfo) {
const peer = this.#peers.get(peerId)
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
peer.sendDeviceInfo(deviceInfo)
}

/**
* Connect to a peer over an existing NoiseSecretStream
*
Expand Down Expand Up @@ -315,9 +334,18 @@ export class MapeoRPC extends TypedEmitter {
peer.pendingInvites.set(projectId, [])
break
}
/* c8 ignore next 2 */
default:
// TODO: report unhandled message error
case 'DeviceInfo': {
const deviceInfo = DeviceInfo.decode(value)
this.emit('device-info', { ...deviceInfo, deviceId: peerId })
break
}
/* c8 ignore next 5 */
default: {
/** @type {never} */
const _exhaustiveCheck = type
return _exhaustiveCheck
// TODO: report unhandled message error
}
}
}
}
Expand Down
52 changes: 52 additions & 0 deletions tests/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,55 @@ test('invalid stream', (t) => {
// @ts-expect-error
t.exception(() => r1.connect(regularStream), 'Invalid stream')
})

test('Send device info', async (t) => {
t.plan(3)

const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

/** @type {import('../src/generated/rpc.js').DeviceInfo} */
const expectedDeviceInfo = { name: 'mapeo' }

r1.on('peers', async (peers) => {
t.is(peers.length, 1)
r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo)
})

r2.on('device-info', ({ deviceId, ...deviceInfo }) => {
t.ok(deviceId)
t.alike(deviceInfo, expectedDeviceInfo)
})

replicate(r1, r2)
})

test('Reconnect peer and send device info', async (t) => {
t.plan(6)

const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

/** @type {import('../src/generated/rpc.js').DeviceInfo} */
const expectedDeviceInfo = { name: 'mapeo' }

const destroy = replicate(r1, r2)
await once(r1, 'peers')
await destroy()

t.is(r1.peers.length, 1)
t.is(r1.peers[0].status, 'disconnected')

r2.on('device-info', ({ deviceId, ...deviceInfo }) => {
t.ok(deviceId)
t.alike(deviceInfo, expectedDeviceInfo)
})

replicate(r1, r2)

const [peers] = await once(r1, 'peers')
t.is(r1.peers.length, 1)
t.is(peers[0].status, 'connected')

r1.sendDeviceInfo(peers[0].id, expectedDeviceInfo)
})