Skip to content

Commit

Permalink
feat: integrate LocalDiscovery & LocalPeers
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Nov 9, 2023
1 parent 0444a0b commit 04c8cc3
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 61 deletions.
21 changes: 15 additions & 6 deletions src/local-peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)])
* @property {string | undefined} name
*/
/** @typedef {PeerInfoBase & { status: 'connecting' }} PeerInfoConnecting */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'connected', connectedAt: number, protomux: Protomux<import('@hyperswarm/secret-stream')> }} PeerInfoConnected */
/** @typedef {PeerInfoBase & { status: 'disconnected', disconnectedAt: number }} PeerInfoDisconnected */

/** @typedef {PeerInfoConnecting | PeerInfoConnected | PeerInfoDisconnected} PeerInfoInternal */
Expand All @@ -57,7 +57,7 @@ class Peer {
#name
#connectedAt = 0
#disconnectedAt = 0
/** @type {Protomux} */
/** @type {Protomux<import('@hyperswarm/secret-stream')>} */
#protomux

/**
Expand Down Expand Up @@ -103,7 +103,7 @@ class Peer {
}
}
}
/** @param {Protomux} protomux */
/** @param {Protomux<import('@hyperswarm/secret-stream')>} protomux */
connect(protomux) {
this.#protomux = protomux
/* c8 ignore next 3 */
Expand Down Expand Up @@ -166,7 +166,9 @@ class Peer {
/**
* @typedef {object} LocalPeersEvents
* @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status
* @property {(peer: PeerInfoConnected) => void} peer-add Emitted when a new peer is connected
* @property {(peerId: string, invite: InviteWithKeys) => void} invite Emitted when an invite is received
* @property {(discoveryKey: Buffer, stream: import('./types.js').ReplicationStream) => void} discovery-key Emitted when a new hypercore is replicated (by a peer) to a peer replication stream (passed as the second parameter)
*/

/** @extends {TypedEmitter<LocalPeersEvents>} */
Expand Down Expand Up @@ -272,6 +274,13 @@ export class LocalPeers extends TypedEmitter {
stream.userData = protomux
this.#opening.add(stream.opened)

protomux.pair(
{ protocol: 'hypercore/alpha' },
/** @param {Buffer} discoveryKey */ async (discoveryKey) => {
this.emit('discovery-key', discoveryKey, stream.rawStream)
}
)

// No need to connect error handler to stream because Protomux does this,
// and errors are eventually handled by #closePeer

Expand Down Expand Up @@ -319,16 +328,16 @@ export class LocalPeers extends TypedEmitter {

/**
* @param {Buffer} publicKey
* @param {Protomux} protomux
* @param {Protomux<import('@hyperswarm/secret-stream')>} protomux
*/
#openPeer(publicKey, protomux) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
const wasConnected = peer.info.status === 'connected'
peer.connect(protomux)
if (!wasConnected) this.#emitPeers()
this.#emitPeers()
this.emit('peer-add', /** @type {PeerInfoConnected} */ (peer.info))
}

/** @param {Buffer} publicKey */
Expand Down
67 changes: 49 additions & 18 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import { ProjectKeys } from './generated/keys.js'
import {
deNullify,
getDeviceId,
keyToId,
openedNoiseSecretStream,
projectIdToNonce,
projectKeyToId,
projectKeyToPublicId,
} from './utils.js'
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import { LocalDiscovery } from './discovery/local-discovery.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */

Expand All @@ -48,8 +51,9 @@ export class MapeoManager {
#coreStorage
#dbFolder
#deviceId
#rpc
#localPeers
#invite
#localDiscovery

/**
* @param {Object} opts
Expand All @@ -69,7 +73,7 @@ export class MapeoManager {
migrationsFolder: new URL('../drizzle/client', import.meta.url).pathname,
})

this.#rpc = new LocalPeers()
this.#localPeers = new LocalPeers()
this.#keyManager = new KeyManager(rootKey)
this.#deviceId = getDeviceId(this.#keyManager)
this.#projectSettingsIndexWriter = new IndexWriter({
Expand All @@ -79,7 +83,7 @@ export class MapeoManager {
this.#activeProjects = new Map()

this.#invite = new InviteApi({
rpc: this.#rpc,
rpc: this.#localPeers,
queries: {
isMember: async (projectId) => {
const projectExists = this.#db
Expand All @@ -99,17 +103,43 @@ export class MapeoManager {
if (typeof coreStorage === 'string') {
const pool = new RandomAccessFilePool(MAX_FILE_DESCRIPTORS)
// @ts-ignore
this.#coreStorage = Hypercore.createStorage(coreStorage, { pool })
this.#coreStorage = Hypercore.defaultStorage(coreStorage, { pool })
} else {
this.#coreStorage = coreStorage
}

this.#localDiscovery = new LocalDiscovery({
identityKeypair: this.#keyManager.getIdentityKeypair(),
})
this.#localDiscovery.on('connection', this.replicate.bind(this))
}

/**
* MapeoRPC instance, used for tests
*/
get [kRPC]() {
return this.#rpc
return this.#localPeers
}

/**
* Replicate Mapeo to a `@hyperswarm/secret-stream`. Should only be used for
* local (trusted) connections, because the RPC channel key is public
*
* @param {import('@hyperswarm/secret-stream')<any>} noiseStream
*/
replicate(noiseStream) {
const replicationStream = this.#localPeers.connect(noiseStream)
Promise.all([this.getDeviceInfo(), openedNoiseSecretStream(noiseStream)])
.then(([{ name }, openedNoiseStream]) => {
if (openedNoiseStream.destroyed || !name) return
const peerId = keyToId(openedNoiseStream.remotePublicKey)
return this.#localPeers.sendDeviceInfo(peerId, { name })
})
.catch((e) => {
// Ignore error but log
console.error('Failed to send device info to peer', e)
})
return replicationStream
}

/**
Expand Down Expand Up @@ -205,15 +235,10 @@ export class MapeoManager {
})

// 4. Create MapeoProject instance
const project = new MapeoProject({
...this.#projectStorage(projectId),
const project = this.#createProjectInstance({
encryptionKeys,
keyManager: this.#keyManager,
projectKey: projectKeypair.publicKey,
projectSecretKey: projectKeypair.secretKey,
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
})

// 5. Write project name and any other relevant metadata to project instance
Expand Down Expand Up @@ -263,19 +288,25 @@ export class MapeoManager {
projectId
)

const project = new MapeoProject({
const project = this.#createProjectInstance(projectKeys)

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

return project
}

/** @param {ProjectKeys} projectKeys */
#createProjectInstance(projectKeys) {
const projectId = keyToId(projectKeys.projectKey)
return new MapeoProject({
...this.#projectStorage(projectId),
...projectKeys,
keyManager: this.#keyManager,
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
localPeers: this.#localPeers,
})

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

return project
}

/**
Expand Down
34 changes: 29 additions & 5 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { Capabilities } from './capabilities.js'
import { getDeviceId, projectKeyToId, valueOf } from './utils.js'
import { MemberApi } from './member-api.js'
import { SyncController } from './sync/sync-controller.js'
import Hypercore from 'hypercore'

/** @typedef {Omit<import('@mapeo/schema').ProjectSettingsValue, 'schemaName'>} EditableProjectSettings */

Expand Down Expand Up @@ -67,7 +68,7 @@ export class MapeoProject {
* @param {import('drizzle-orm/better-sqlite3').BetterSQLite3Database} opts.sharedDb
* @param {IndexWriter} opts.sharedIndexWriter
* @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data
* @param {import('./local-peers.js').LocalPeers} opts.rpc
* @param {import('./local-peers.js').LocalPeers} opts.localPeers
*
*/
constructor({
Expand All @@ -79,7 +80,7 @@ export class MapeoProject {
projectKey,
projectSecretKey,
encryptionKeys,
rpc,
localPeers,
}) {
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)
Expand Down Expand Up @@ -237,7 +238,7 @@ export class MapeoProject {
// @ts-expect-error
encryptionKeys,
projectKey,
rpc,
rpc: localPeers,
dataTypes: {
deviceInfo: this.#dataTypes.deviceInfo,
project: this.#dataTypes.projectSettings,
Expand All @@ -249,6 +250,26 @@ export class MapeoProject {
capabilities: this.#capabilities,
})

// Replicate already connected local peers
for (const peer of localPeers.peers) {
if (peer.status !== 'connected') continue
this.#syncController.replicate(peer.protomux)
}

localPeers.on('discovery-key', (discoveryKey, stream) => {
// The core identified by this discovery key might not be part of this
// project, but we can't know that so we will request it from the peer if
// we don't have it. The peer will not return the core key unless it _is_
// part of this project
this.#coreManager.handleDiscoveryKey(discoveryKey, stream)
})

// When a new peer is found, try to replicate (if it is not a member of the
// project it will fail the capability check and be ignored)
localPeers.on('peer-add', (peer) => {
this.#syncController.replicate(peer.protomux)
})

///////// 4. Write core ownership record

const deferred = pDefer()
Expand Down Expand Up @@ -396,11 +417,14 @@ export class MapeoProject {

/**
*
* @param {import('./types.js').ReplicationStream} stream
* @param {Exclude<Parameters<Hypercore.createProtocolStream>[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
* @returns
*/
[kReplicate](stream) {
return this.#syncController.replicate(stream)
const replicationStream = Hypercore.createProtocolStream(stream, {})
const protomux = replicationStream.noiseStream.userData
// @ts-ignore - got fed up jumping through hoops to keep TS heppy
return this.#syncController.replicate(protomux)
}

/**
Expand Down
35 changes: 4 additions & 31 deletions src/sync/sync-controller.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import Hypercore from 'hypercore'
import { TypedEmitter } from 'tiny-typed-emitter'
import Protomux from 'protomux'
import { SyncState } from './sync-state.js'
import { PeerSyncController } from './peer-sync-controller.js'

export class SyncController extends TypedEmitter {
#syncState
#coreManager
#capabilities
/** @type {Map<Protomux, PeerSyncController>} */
/** @type {Map<import('protomux'), PeerSyncController>} */
#peerSyncControllers = new Map()

/**
Expand All @@ -30,35 +28,10 @@ export class SyncController extends TypedEmitter {
}

/**
* @param {Exclude<Parameters<Hypercore.createProtocolStream>[0], boolean>} stream A duplex stream, a @hyperswarm/secret-stream, or a Protomux instance
* @param {import('protomux')<import('@hyperswarm/secret-stream')>} protomux A protomux instance
*/
replicate(stream) {
if (
Protomux.isProtomux(stream) ||
('userData' in stream && Protomux.isProtomux(stream.userData)) ||
('noiseStream' in stream &&
Protomux.isProtomux(stream.noiseStream.userData))
) {
console.warn(
'Passed an existing protocol stream to syncController.replicate(). Currently any pairing for the `hypercore/alpha` protocol is overwritten'
)
}
const protocolStream = Hypercore.createProtocolStream(stream, {
ondiscoverykey: /** @param {Buffer} discoveryKey */ (discoveryKey) => {
return this.#coreManager.handleDiscoveryKey(discoveryKey, stream)
},
})
const protomux =
// Need to coerce this until we update Hypercore.createProtocolStream types
/** @type {import('protomux')<import('@hyperswarm/secret-stream')>} */ (
protocolStream.noiseStream.userData
)
if (!protomux) throw new Error('Invalid stream')

if (this.#peerSyncControllers.has(protomux)) {
console.warn('Already replicating to this stream')
return
}
replicate(protomux) {
if (this.#peerSyncControllers.has(protomux)) return

const peerSyncController = new PeerSyncController({
protomux,
Expand Down
2 changes: 1 addition & 1 deletion test-types/data-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const mapeoProject = new MapeoProject({
tables: [projectSettingsTable],
sqlite,
}),
rpc: new LocalPeers(),
localPeers: new LocalPeers(),
})

///// Observations
Expand Down

0 comments on commit 04c8cc3

Please sign in to comment.