Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement media server in MapeoManager #365

Merged
merged 27 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
39bc4cb
update blob api
achou11 Oct 31, 2023
077492c
major refactoring
achou11 Oct 31, 2023
7f5284c
update blob plugin tests
achou11 Oct 31, 2023
421b010
set up media server in MapeoManager
achou11 Nov 1, 2023
df3544b
rename getBaseUrl opt to getMediaBaseUrl in BlobApi
achou11 Nov 1, 2023
b2f0aec
small format fix
achou11 Nov 1, 2023
81d909a
create MediaServer class
achou11 Nov 6, 2023
b44c2fa
remove unused symbol
achou11 Nov 6, 2023
2800c72
simplify manager start + stop test
achou11 Nov 6, 2023
8bef7ef
shorten names of prefix constants
achou11 Nov 6, 2023
84c9f74
update test message
achou11 Nov 6, 2023
cc61a9b
Add failing test
gmaclennan Nov 7, 2023
58ed4f0
speed up timeout test with fake timers
gmaclennan Nov 7, 2023
c2503cc
bind this.getProject to this
gmaclennan Nov 7, 2023
95b5d62
use projectPublicId in blob-api
gmaclennan Nov 7, 2023
943dc63
temp fix to blobApi
gmaclennan Nov 7, 2023
b154eea
small plugin updates
achou11 Nov 7, 2023
4c38fff
Merge branch 'main' into media-server
achou11 Nov 7, 2023
e50272d
fix issues with start and stop
achou11 Nov 8, 2023
64861a3
remove accidental solo
achou11 Nov 8, 2023
9ae53a6
Merge branch 'main' into media-server
achou11 Nov 9, 2023
5b0d7e8
move fastify to direct deps
achou11 Nov 9, 2023
a90efe4
use call instead of bind for server.listen()
achou11 Nov 9, 2023
3d4bbd4
remove onRequest hook
achou11 Nov 9, 2023
3dfbf27
update BlobApi projectId opt to projectPublicId
achou11 Nov 9, 2023
2e13b42
comment out failing BlobApi test assertion
achou11 Nov 9, 2023
3282473
reinsert accidentally removed ts-expect-error
achou11 Nov 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 95 additions & 64 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"depcheck": "^1.4.3",
"drizzle-kit": "^0.19.12",
"eslint": "^8.39.0",
"fastify": "^4.20.0",
"fastify": "^4.24.3",
"husky": "^8.0.0",
"light-my-request": "^5.10.0",
"lint-staged": "^14.0.1",
Expand Down
33 changes: 20 additions & 13 deletions src/blob-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ import { createHash } from 'node:crypto'
import sodium from 'sodium-universal'
import b4a from 'b4a'

import { getPort } from './blob-server/index.js'

/** @typedef {import('./types.js').BlobId} BlobId */
/** @typedef {import('./types.js').BlobType} BlobType */
/** @typedef {import('./types.js').BlobVariant<BlobType>} BlobVariant */

export class BlobApi {
#blobStore
#getMediaBaseUrl
#projectId

/**
* @param {object} options
* @param {string} options.projectId
* @param {import('./blob-store/index.js').BlobStore} options.blobStore
* @param {import('fastify').FastifyInstance} options.blobServer
* @param {() => Promise<string>} options.getMediaBaseUrl
*/
constructor({ projectId, blobStore, blobServer }) {
this.projectId = projectId
this.blobStore = blobStore
this.blobServer = blobServer
constructor({ projectId, blobStore, getMediaBaseUrl }) {
this.#blobStore = blobStore
this.#getMediaBaseUrl = getMediaBaseUrl
this.#projectId = projectId
}

/**
Expand All @@ -30,8 +31,14 @@ export class BlobApi {
*/
async getUrl(blobId) {
const { driveId, type, variant, name } = blobId
const port = await getPort(this.blobServer.server)
return `http://127.0.0.1:${port}/${this.projectId}/${driveId}/${type}/${variant}/${name}`

let base = await this.#getMediaBaseUrl()

if (!base.endsWith('/')) {
base += '/'
}

return base + `${this.#projectId}/${driveId}/${type}/${variant}/${name}`
}

/**
Expand Down Expand Up @@ -86,7 +93,7 @@ export class BlobApi {
}

return {
driveId: this.blobStore.writerDriveId,
driveId: this.#blobStore.writerDriveId,
name,
type: blobType,
hash: contentHash.digest('hex'),
Expand All @@ -108,7 +115,7 @@ export class BlobApi {
hash,

// @ts-ignore TODO: remove driveId property from createWriteStream
this.blobStore.createWriteStream({ type, variant, name }, { metadata })
this.#blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type, hash }
Expand All @@ -117,7 +124,7 @@ export class BlobApi {
// @ts-ignore TODO: return value types don't match pipeline's expectations, though they should
await pipeline(
fs.createReadStream(filepath),
this.blobStore.createWriteStream({ type, variant, name }, { metadata })
this.#blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type }
Expand Down
40 changes: 0 additions & 40 deletions src/blob-server/index.js

This file was deleted.

16 changes: 11 additions & 5 deletions src/blob-server/fastify-plugin.js → src/fastify-plugins/blobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export default fp(blobServerPlugin, {
/**
* @typedef {Object} BlobServerPluginOpts
*
* @property {(projectId: string) => import('../blob-store/index.js').BlobStore} getBlobStore
* @property {(projectPublicId: string) => Promise<import('../blob-store/index.js').BlobStore>} getBlobStore
*/

const BLOB_TYPES = /** @type {BlobId['type'][]} */ (
Expand All @@ -27,8 +27,14 @@ const BLOB_VARIANTS = [
const HEX_REGEX_32_BYTES = '^[0-9a-fA-F]{64}$'
const HEX_STRING_32_BYTES = T.String({ pattern: HEX_REGEX_32_BYTES })

const Z_BASE_32_REGEX_32_BYTES = '^[0-9a-zA-Z]{52}$'
const Z_BASE_32_STRING_32_BYTES = T.String({
pattern: Z_BASE_32_REGEX_32_BYTES,
})

const PARAMS_JSON_SCHEMA = T.Object({
projectId: HEX_STRING_32_BYTES,
// the projectPublicId is encoded to a z-base-32 52-character string (32 bytes)
projectPublicId: Z_BASE_32_STRING_32_BYTES,
driveId: HEX_STRING_32_BYTES,
type: T.Union(
BLOB_TYPES.map((type) => {
Expand Down Expand Up @@ -57,10 +63,10 @@ async function routes(fastify, options) {
const { getBlobStore } = options

fastify.get(
'/:projectId/:driveId/:type/:variant/:name',
'/:projectPublicId/:driveId/:type/:variant/:name',
{ schema: { params: PARAMS_JSON_SCHEMA } },
async (request, reply) => {
const { projectId, ...blobId } = request.params
const { projectPublicId, ...blobId } = request.params

if (!isValidBlobId(blobId)) {
reply.code(400)
Expand All @@ -72,7 +78,7 @@ async function routes(fastify, options) {

let blobStore
try {
blobStore = getBlobStore(projectId)
blobStore = await getBlobStore(projectPublicId)
} catch (e) {
reply.code(404)
throw e
Expand Down
69 changes: 67 additions & 2 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import { eq } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/better-sqlite3'
import { migrate } from 'drizzle-orm/better-sqlite3/migrator'
import Hypercore from 'hypercore'
import fastify from 'fastify'
import pDefer from 'p-defer'

import { IndexWriter } from './index-writer/index.js'
import { MapeoProject, kSetOwnDeviceInfo } from './mapeo-project.js'
import { MapeoProject, kBlobStore, kSetOwnDeviceInfo } from './mapeo-project.js'
import {
localDeviceInfoTable,
projectKeysTable,
Expand All @@ -24,6 +27,7 @@ import {
import { RandomAccessFilePool } from './core-manager/random-access-file-pool.js'
import { LocalPeers } from './local-peers.js'
import { InviteApi } from './invite-api.js'
import BlobServerPlugin from './fastify-plugins/blobs.js'

/** @typedef {import("@mapeo/schema").ProjectSettingsValue} ProjectValue */

Expand All @@ -35,7 +39,11 @@ const CLIENT_SQLITE_FILE_NAME = 'client.db'
// other things e.g. SQLite and other parts of the app.
const MAX_FILE_DESCRIPTORS = 768

const MEDIA_SERVER_BLOBS_PREFIX = 'blobs'
const MEDIA_SERVER_ICONS_PREFIX = 'icons'

export const kRPC = Symbol('rpc')
export const kClose = Symbol('close')

export class MapeoManager {
#keyManager
Expand All @@ -50,14 +58,18 @@ export class MapeoManager {
#deviceId
#rpc
#invite
/** @type {import('p-defer').DeferredPromise<string>} */
#deferredMediaServerListen
#mediaServer

/**
* @param {Object} opts
* @param {Buffer} opts.rootKey 16-bytes of random data that uniquely identify the device, used to derive a 32-byte master key, which is used to derive all the keypairs used for Mapeo
* @param {string} opts.dbFolder Folder for sqlite Dbs. Folder must exist. Use ':memory:' to store everything in-memory
* @param {string | import('./types.js').CoreStorage} opts.coreStorage Folder for hypercore storage or a function that returns a RandomAccessStorage instance
* @param {{port?: number, logger: import('fastify').FastifyServerOptions['logger'] }} [opts.mediaServerOpts]
*/
constructor({ rootKey, dbFolder, coreStorage }) {
constructor({ rootKey, dbFolder, coreStorage, mediaServerOpts }) {
this.#dbFolder = dbFolder
const sqlite = new Database(
dbFolder === ':memory:'
Expand Down Expand Up @@ -103,6 +115,25 @@ export class MapeoManager {
} else {
this.#coreStorage = coreStorage
}

this.#mediaServer = fastify({ logger: mediaServerOpts?.logger })

this.#mediaServer.register(BlobServerPlugin, {
prefix: MEDIA_SERVER_BLOBS_PREFIX,
getBlobStore: async (projectPublicId) => {
const project = await this.getProject(projectPublicId)
return project[kBlobStore]
},
})

this.#deferredMediaServerListen = pDefer()
achou11 marked this conversation as resolved.
Show resolved Hide resolved
this.#mediaServer
.listen({ port: mediaServerOpts?.port, host: '127.0.0.1' })
achou11 marked this conversation as resolved.
Show resolved Hide resolved
.then(this.#deferredMediaServerListen.resolve)
.catch((err) => {
console.error('Could not start media server', err)
this.#deferredMediaServerListen.reject(err)
})
achou11 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -112,6 +143,32 @@ export class MapeoManager {
return this.#rpc
}

/**
* @param {'blobs' | 'icons'} mediaType
* @returns
*/
async #getMediaBaseUrl(mediaType) {
await this.#deferredMediaServerListen.promise

let address = this.#mediaServer.server.address()
achou11 marked this conversation as resolved.
Show resolved Hide resolved

// Should happen but just in case
if (!address) throw new Error('Could not get address')

if (typeof address !== 'string') {
address = address.address
}

switch (mediaType) {
case 'blobs': {
return `${address}/${MEDIA_SERVER_BLOBS_PREFIX}`
}
case 'icons': {
return `${address}/${MEDIA_SERVER_ICONS_PREFIX}`
}
}
}

/**
* @param {Buffer} keysCipher
* @param {string} projectId
Expand Down Expand Up @@ -214,6 +271,7 @@ export class MapeoManager {
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
getMediaBaseUrl: this.#getMediaBaseUrl,
})

// 5. Write project name and any other relevant metadata to project instance
Expand Down Expand Up @@ -270,6 +328,7 @@ export class MapeoManager {
sharedDb: this.#db,
sharedIndexWriter: this.#projectSettingsIndexWriter,
rpc: this.#rpc,
getMediaBaseUrl: this.#getMediaBaseUrl,
})

// 3. Keep track of project instance as we know it's a properly existing project
Expand Down Expand Up @@ -425,4 +484,10 @@ export class MapeoManager {
get invite() {
return this.#invite
}

async [kClose]() {
achou11 marked this conversation as resolved.
Show resolved Hide resolved
// Needs to be called to ensure that the server.listen() finished fully
await this.#deferredMediaServerListen.promise
await this.#mediaServer.close()
}
}
19 changes: 8 additions & 11 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { CoreManager, NAMESPACES } from './core-manager/index.js'
import { DataStore } from './datastore/index.js'
import { DataType, kCreateWithDocId } from './datatype/index.js'
import { BlobStore } from './blob-store/index.js'
import { createBlobServer } from './blob-server/index.js'
import { BlobApi } from './blob-api.js'
import { IndexWriter } from './index-writer/index.js'
import { projectSettingsTable } from './schema/client.js'
Expand Down Expand Up @@ -42,6 +41,7 @@ export const kCoreOwnership = Symbol('coreOwnership')
export const kCapabilities = Symbol('capabilities')
export const kSetOwnDeviceInfo = Symbol('kSetOwnDeviceInfo')
export const kReplicate = Symbol('replicate')
export const kBlobStore = Symbol('blobStore')

export class MapeoProject {
#projectId
Expand All @@ -50,7 +50,6 @@ export class MapeoProject {
#dataStores
#dataTypes
#blobStore
#blobServer
#coreOwnership
#capabilities
#ownershipWriteDone
Expand All @@ -68,6 +67,7 @@ export class MapeoProject {
* @param {IndexWriter} opts.sharedIndexWriter
* @param {import('./types.js').CoreStorage} opts.coreStorage Folder to store all hypercore data
* @param {import('./local-peers.js').LocalPeers} opts.rpc
* @param {(mediaType: 'blobs' | 'icons') => Promise<string>} opts.getMediaBaseUrl
*
*/
constructor({
Expand All @@ -80,6 +80,7 @@ export class MapeoProject {
projectSecretKey,
encryptionKeys,
rpc,
getMediaBaseUrl,
}) {
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)
Expand Down Expand Up @@ -205,18 +206,10 @@ export class MapeoProject {
coreManager: this.#coreManager,
})

this.#blobServer = createBlobServer({
logger: true,
blobStore: this.#blobStore,
prefix: '/blobs/',
projectId: this.#projectId,
})

// @ts-ignore TODO: pass in blobServer
this.$blobs = new BlobApi({
projectId: this.#projectId,
blobStore: this.#blobStore,
blobServer: this.#blobServer,
getMediaBaseUrl: async () => getMediaBaseUrl('blobs'),
})

this.#coreOwnership = new CoreOwnership({
Expand Down Expand Up @@ -286,6 +279,10 @@ export class MapeoProject {
return this.#capabilities
}

get [kBlobStore]() {
return this.#blobStore
}

get deviceId() {
return this.#deviceId
}
Expand Down
Loading
Loading