From 04c8cc3e3494224016f560c22bec40e39da9db29 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Thu, 26 Oct 2023 15:34:10 +0900 Subject: [PATCH] feat: integrate LocalDiscovery & LocalPeers --- src/local-peers.js | 21 ++++++++---- src/mapeo-manager.js | 67 +++++++++++++++++++++++++++---------- src/mapeo-project.js | 34 ++++++++++++++++--- src/sync/sync-controller.js | 35 +++---------------- test-types/data-types.ts | 2 +- 5 files changed, 98 insertions(+), 61 deletions(-) diff --git a/src/local-peers.js b/src/local-peers.js index 5989a5bd8..32c0d2305 100644 --- a/src/local-peers.js +++ b/src/local-peers.js @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) * @property {string | undefined} name */ /** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */ -/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ +/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */ /** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */ /** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */ @@ -57,7 +57,7 @@ class Peer { #name #connectedAt = 0 #disconnectedAt = 0 - /** @type {Protomux} */ + /** @type {Protomux} */ #protomux /** @@ -103,7 +103,7 @@ class Peer { } } } - /** @param {Protomux} protomux */ + /** @param {Protomux} protomux */ connect(protomux) { this.#protomux = protomux /* c8 ignore next 3 */ @@ -166,7 +166,9 @@ class Peer { /** * @typedef {object} LocalPeersEvents * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status + * @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected * @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received + * @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter) */ /** @extends {TypedEmitter} */ @@ -272,6 +274,13 @@ export class LocalPeers extends TypedEmitter { stream.userData = protomux this.#opening.add(stream.opened) + protomux.pair( + { protocol: 'hypercore/alpha' }, + /** @param {Buffer} discoveryKey */ async (discoveryKey) => { + this.emit('discovery-key', discoveryKey, stream.rawStream) + } + ) + // No need to connect error handler to stream because Protomux does this, // and errors are eventually handled by #closePeer @@ -319,16 +328,16 @@ export class LocalPeers extends TypedEmitter { /** * @param {Buffer} publicKey - * @param {Protomux} protomux + * @param {Protomux} protomux */ #openPeer(publicKey, protomux) { const peerId = keyToId(publicKey) const peer = this.#peers.get(peerId) /* c8 ignore next */ if (!peer) return // TODO: report error - this should not happen - const wasConnected = peer.info.status === 'connected' peer.connect(protomux) - if (!wasConnected) this.#emitPeers() + this.#emitPeers() + this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info)) } /** @param {Buffer} publicKey */ diff --git a/src/mapeo-manager.js b/src/mapeo-manager.js index 32965a237..e37110e2e 100644 --- a/src/mapeo-manager.js +++ b/src/mapeo-manager.js @@ -17,6 +17,8 @@ import { ProjectKeys } from './generated/keys.js' import { deNullify, getDeviceId, + keyToId, + openedNoiseSecretStream, projectIdToNonce, projectKeyToId, projectKeyToPublicId, @@ -24,6 +26,7 @@ import { import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js' import { LocalPeers } from './local-peers.js' import { InviteApi } from './invite-api.js' +import { LocalDiscovery } from './discovery/local-discovery.js' /** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */ @@ -48,8 +51,9 @@ export class MapeoManager { #coreStorage #dbFolder #deviceId - #rpc + #localPeers #invite + #localDiscovery /** * @param {Object} opts @@ -69,7 +73,7 @@ export class MapeoManager { migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname, }) - this.#rpc = new LocalPeers() + this.#localPeers = new LocalPeers() this.#keyManager = new KeyManager(rootKey) this.#deviceId = getDeviceId(this.#keyManager) this.#projectSettingsIndexWriter = new IndexWriter({ @@ -79,7 +83,7 @@ export class MapeoManager { this.#activeProjects = new Map() this.#invite = new InviteApi({ - rpc: this.#rpc, + rpc: this.#localPeers, queries: { isMember: async (projectId) => { const projectExists = this.#db @@ -99,17 +103,43 @@ export class MapeoManager { if (typeof coreStorage === 'string') { const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS) // @ts-ignore - this.#coreStorage = Hypercore.createStorage(coreStorage, { pool }) + this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool }) } else { this.#coreStorage = coreStorage } + + this.#localDiscovery = new LocalDiscovery({ + identityKeypair: this.#keyManager.getIdentityKeypair(), + }) + this.#localDiscovery.on('connection', this.replicate.bind(this)) } /** * MapeoRPC instance, used for tests */ get [kRPC]() { - return this.#rpc + return this.#localPeers + } + + /** + * Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for + * local (trusted) connections, because the RPC channel key is public + * + * @param {import('@hyperswarm/secret-stream')} noiseStream + */ + replicate(noiseStream) { + const replicationStream = this.#localPeers.connect(noiseStream) + Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)]) + .then(([{ name }, openedNoiseStream]) => { + if (openedNoiseStream.destroyed || !name) return + const peerId = keyToId(openedNoiseStream.remotePublicKey) + return this.#localPeers.sendDeviceInfo(peerId, { name }) + }) + .catch((e) => { + // Ignore error but log + console.error('Failed to send device info to peer', e) + }) + return replicationStream } /** @@ -205,15 +235,10 @@ export class MapeoManager { }) // 4. Create MapeoProject instance - const project = new MapeoProject({ - ...this.#projectStorage(projectId), + const project = this.#createProjectInstance({ encryptionKeys, - keyManager: this.#keyManager, projectKey: projectKeypair.publicKey, projectSecretKey: projectKeypair.secretKey, - sharedDb: this.#db, - sharedIndexWriter: this.#projectSettingsIndexWriter, - rpc: this.#rpc, }) // 5. Write project name and any other relevant metadata to project instance @@ -263,19 +288,25 @@ export class MapeoManager { projectId ) - const project = new MapeoProject({ + const project = this.#createProjectInstance(projectKeys) + + // 3. Keep track of project instance as we know it's a properly existing project + this.#activeProjects.set(projectPublicId, project) + + return project + } + + /** @param {ProjectKeys} projectKeys */ + #createProjectInstance(projectKeys) { + const projectId = keyToId(projectKeys.projectKey) + return new MapeoProject({ ...this.#projectStorage(projectId), ...projectKeys, keyManager: this.#keyManager, sharedDb: this.#db, sharedIndexWriter: this.#projectSettingsIndexWriter, - rpc: this.#rpc, + localPeers: this.#localPeers, }) - - // 3. Keep track of project instance as we know it's a properly existing project - this.#activeProjects.set(projectPublicId, project) - - return project } /** diff --git a/src/mapeo-project.js b/src/mapeo-project.js index ed24cbc50..cf9805174 100644 --- a/src/mapeo-project.js +++ b/src/mapeo-project.js @@ -33,6 +33,7 @@ import { Capabilities } from './capabilities.js' import { getDeviceId, projectKeyToId, valueOf } from './utils.js' import { MemberApi } from './member-api.js' import { SyncController } from './sync/sync-controller.js' +import Hypercore from 'hypercore' /** @typedef {Omit} EditableProjectSettings */ @@ -67,7 +68,7 @@ export class MapeoProject { * @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb * @param {IndexWriter} opts.sharedIndexWriter * @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data - * @param {import('./local-peers.js').LocalPeers} opts.rpc + * @param {import('./local-peers.js').LocalPeers} opts.localPeers * */ constructor({ @@ -79,7 +80,7 @@ export class MapeoProject { projectKey, projectSecretKey, encryptionKeys, - rpc, + localPeers, }) { this.#deviceId = getDeviceId(keyManager) this.#projectId = projectKeyToId(projectKey) @@ -237,7 +238,7 @@ export class MapeoProject { // @ts-expect-error encryptionKeys, projectKey, - rpc, + rpc: localPeers, dataTypes: { deviceInfo: this.#dataTypes.deviceInfo, project: this.#dataTypes.projectSettings, @@ -249,6 +250,26 @@ export class MapeoProject { capabilities: this.#capabilities, }) + // Replicate already connected local peers + for (const peer of localPeers.peers) { + if (peer.status !== 'connected') continue + this.#syncController.replicate(peer.protomux) + } + + localPeers.on('discovery-key', (discoveryKey, stream) => { + // The core identified by this discovery key might not be part of this + // project, but we can't know that so we will request it from the peer if + // we don't have it. The peer will not return the core key unless it _is_ + // part of this project + this.#coreManager.handleDiscoveryKey(discoveryKey, stream) + }) + + // When a new peer is found, try to replicate (if it is not a member of the + // project it will fail the capability check and be ignored) + localPeers.on('peer-add', (peer) => { + this.#syncController.replicate(peer.protomux) + }) + ///////// 4. Write core ownership record const deferred = pDefer() @@ -396,11 +417,14 @@ export class MapeoProject { /** * - * @param {import('./types.js').ReplicationStream} stream + * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance * @returns */ [kReplicate](stream) { - return this.#syncController.replicate(stream) + const replicationStream = Hypercore.createProtocolStream(stream, {}) + const protomux = replicationStream.noiseStream.userData + // @ts-ignore - got fed up jumping through hoops to keep TS heppy + return this.#syncController.replicate(protomux) } /** diff --git a/src/sync/sync-controller.js b/src/sync/sync-controller.js index f68d2f6e1..a32b6cb95 100644 --- a/src/sync/sync-controller.js +++ b/src/sync/sync-controller.js @@ -1,6 +1,4 @@ -import Hypercore from 'hypercore' import { TypedEmitter } from 'tiny-typed-emitter' -import Protomux from 'protomux' import { SyncState } from './sync-state.js' import { PeerSyncController } from './peer-sync-controller.js' @@ -8,7 +6,7 @@ export class SyncController extends TypedEmitter { #syncState #coreManager #capabilities - /** @type {Map} */ + /** @type {Map} */ #peerSyncControllers = new Map() /** @@ -30,35 +28,10 @@ export class SyncController extends TypedEmitter { } /** - * @param {Exclude[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance + * @param {import('protomux')} protomux A protomux instance */ - replicate(stream) { - if ( - Protomux.isProtomux(stream) || - ('userData' in stream && Protomux.isProtomux(stream.userData)) || - ('noiseStream' in stream && - Protomux.isProtomux(stream.noiseStream.userData)) - ) { - console.warn( - 'Passed an existing protocol stream to syncController.replicate(). Currently any pairing for the `hypercore/alpha` protocol is overwritten' - ) - } - const protocolStream = Hypercore.createProtocolStream(stream, { - ondiscoverykey: /** @param {Buffer} discoveryKey */ (discoveryKey) => { - return this.#coreManager.handleDiscoveryKey(discoveryKey, stream) - }, - }) - const protomux = - // Need to coerce this until we update Hypercore.createProtocolStream types - /** @type {import('protomux')} */ ( - protocolStream.noiseStream.userData - ) - if (!protomux) throw new Error('Invalid stream') - - if (this.#peerSyncControllers.has(protomux)) { - console.warn('Already replicating to this stream') - return - } + replicate(protomux) { + if (this.#peerSyncControllers.has(protomux)) return const peerSyncController = new PeerSyncController({ protomux, diff --git a/test-types/data-types.ts b/test-types/data-types.ts index 10b2fcb15..aa24bed80 100644 --- a/test-types/data-types.ts +++ b/test-types/data-types.ts @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({ tables: [projectSettingsTable], sqlite, }), - rpc: new LocalPeers(), + localPeers: new LocalPeers(), }) ///// Observations