Skip to content

Commit

Permalink
Merge branch 'main' into fix/local-peers
Browse files Browse the repository at this point in the history
* main:
  fix: fix writing to blob store when also calculating content hash (#370)
  integrate icon plugin into MediaServer (#369)
  • Loading branch information
gmaclennan committed Nov 14, 2023
2 parents 2224f2b + 4a18a9c commit 97df1c0
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 147 deletions.
112 changes: 38 additions & 74 deletions src/blob-api.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
import fs from 'node:fs'
import { pipeline } from 'node:stream/promises'
import { createHash } from 'node:crypto'
import sodium from 'sodium-universal'
import b4a from 'b4a'
// @ts-expect-error - pipelinePromise missing from streamx types
import { Transform, pipelinePromise as pipeline } from 'streamx'
import { createHash, randomBytes } from 'node:crypto'

/** @typedef {import('./types.js').BlobId} BlobId */
/** @typedef {import('./types.js').BlobType} BlobType */

export class BlobApi {
#blobStore
#getMediaBaseUrl
#projectPublicId

/**
* @param {object} options
* @param {string} options.projectPublicId
* @param {import('./blob-store/index.js').BlobStore} options.blobStore
* @param {() => Promise<string>} options.getMediaBaseUrl
*/
constructor({ projectPublicId, blobStore, getMediaBaseUrl }) {
constructor({ blobStore, getMediaBaseUrl }) {
this.#blobStore = blobStore
this.#getMediaBaseUrl = getMediaBaseUrl
this.#projectPublicId = projectPublicId
}

/**
Expand All @@ -38,9 +34,7 @@ export class BlobApi {
base += '/'
}

return (
base + `${this.#projectPublicId}/${driveId}/${type}/${variant}/${name}`
)
return base + `${driveId}/${type}/${variant}/${name}`
}

/**
Expand All @@ -52,85 +46,55 @@ export class BlobApi {
async create(filepaths, metadata) {
const { original, preview, thumbnail } = filepaths
const { mimeType } = metadata
const blobType = getType(mimeType)
const hash = b4a.alloc(8)
sodium.randombytes_buf(hash)
const name = hash.toString('hex')

const contentHash = createHash('sha256')

await this.writeFile(
original,
{
name: `${name}`,
variant: 'original',
type: blobType,
},
metadata
// contentHash
const type = getType(mimeType)
const name = randomBytes(8).toString('hex')
const hash = createHash('sha256')

const ws = this.#blobStore.createWriteStream(
{ type, variant: 'original', name },
{ metadata }
)
const writePromises = [
pipeline(fs.createReadStream(original), hashTransform(hash), ws),
]

if (preview) {
await this.writeFile(
preview,
{
name: `${name}`,
variant: 'preview',
type: blobType,
},
metadata
const ws = this.#blobStore.createWriteStream(
{ type, variant: 'preview', name },
{ metadata }
)
writePromises.push(pipeline(fs.createReadStream(preview), ws))
}

if (thumbnail) {
await this.writeFile(
thumbnail,
{
name: `${name}`,
variant: 'thumbnail',
type: blobType,
},
metadata
const ws = this.#blobStore.createWriteStream(
{ type, variant: 'thumbnail', name },
{ metadata }
)
writePromises.push(pipeline(fs.createReadStream(thumbnail), ws))
}

await Promise.all(writePromises)

return {
driveId: this.#blobStore.writerDriveId,
name,
type: blobType,
hash: contentHash.digest('hex'),
type,
hash: hash.digest('hex'),
}
}
}

/**
* @param {string} filepath
* @param {Omit<BlobId, 'driveId'>} options
* @param {object} metadata
* @param {string} metadata.mimeType
* @param {import('node:crypto').Hash} [hash]
*/
async writeFile(filepath, { name, variant, type }, metadata, hash) {
if (hash) {
// @ts-ignore TODO: return value types don't match pipeline's expectations, though they should
await pipeline(
fs.createReadStream(filepath),
hash,

// @ts-ignore TODO: remove driveId property from createWriteStream
this.#blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type, hash }
}

// @ts-ignore TODO: return value types don't match pipeline's expectations, though they should
await pipeline(
fs.createReadStream(filepath),
this.#blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type }
}
/**
* @param {import('node:crypto').Hash} hash
*/
function hashTransform(hash) {
return new Transform({
transform: (data, cb) => {
hash.update(data)
cb(null, data)
},
})
}

/**
Expand Down
11 changes: 3 additions & 8 deletions src/icon-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ const MIME_TO_EXTENSION = {
}

export class IconApi {
#projectId
#dataType
#dataStore
#getMediaBaseUrl
Expand All @@ -36,13 +35,11 @@ export class IconApi {
* import('@mapeo/schema').IconValue
* >} opts.iconDataType
* @param {import('./datastore/index.js').DataStore<'config'>} opts.iconDataStore
* @param {string} opts.projectId
* @param {() => Promise<string>} opts.getMediaBaseUrl
*/
constructor({ iconDataType, iconDataStore, projectId, getMediaBaseUrl }) {
constructor({ iconDataType, iconDataStore, getMediaBaseUrl }) {
this.#dataType = iconDataType
this.#dataStore = iconDataStore
this.#projectId = projectId
this.#getMediaBaseUrl = getMediaBaseUrl
}

Expand Down Expand Up @@ -110,15 +107,13 @@ export class IconApi {
base += '/'
}

base += `${this.#projectId}/${iconId}/`

const mimeExtension = MIME_TO_EXTENSION[opts.mimeType]

if (opts.mimeType === 'image/svg+xml') {
return base + `${opts.size}${mimeExtension}`
return base + `${iconId}/${opts.size}${mimeExtension}`
}

return base + `${opts.size}@${opts.pixelDensity}x${mimeExtension}`
return base + `${iconId}/${opts.size}@${opts.pixelDensity}x${mimeExtension}`
}
}

Expand Down
38 changes: 23 additions & 15 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ export class MapeoProject {
#capabilities
#ownershipWriteDone
#memberApi
#projectPublicId
#iconApi
#syncApi
#l
Expand Down Expand Up @@ -98,7 +97,6 @@ export class MapeoProject {
this.#l = Logger.create('project', logger)
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)
this.#projectPublicId = projectKeyToPublicId(projectKey)

///////// 1. Setup database
const sqlite = new Database(dbPath)
Expand Down Expand Up @@ -137,6 +135,7 @@ export class MapeoProject {
coreOwnershipTable,
roleTable,
deviceInfoTable,
iconTable,
],
sqlite,
getWinner,
Expand Down Expand Up @@ -219,16 +218,6 @@ export class MapeoProject {
}),
}

this.#blobStore = new BlobStore({
coreManager: this.#coreManager,
})

this.$blobs = new BlobApi({
projectPublicId: this.#projectPublicId,
blobStore: this.#blobStore,
getMediaBaseUrl: async () => getMediaBaseUrl('blobs'),
})

this.#coreOwnership = new CoreOwnership({
dataType: this.#dataTypes.coreOwnership,
})
Expand All @@ -254,13 +243,32 @@ export class MapeoProject {
},
})

const projectPublicId = projectKeyToPublicId(projectKey)

this.#blobStore = new BlobStore({
coreManager: this.#coreManager,
})

this.$blobs = new BlobApi({
blobStore: this.#blobStore,
getMediaBaseUrl: async () => {
let base = await getMediaBaseUrl('blobs')
if (!base.endsWith('/')) {
base += '/'
}
return base + projectPublicId
},
})

this.#iconApi = new IconApi({
iconDataStore: this.#dataStores.config,
iconDataType: this.#dataTypes.icon,
projectId: this.#projectId,
// TODO: Update after merging https://github.com/digidem/mapeo-core-next/pull/365
getMediaBaseUrl: async () => {
throw new Error('Not yet implemented')
let base = await getMediaBaseUrl('icons')
if (!base.endsWith('/')) {
base += '/'
}
return base + projectPublicId
},
})

Expand Down
7 changes: 7 additions & 0 deletions src/media-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import pTimeout from 'p-timeout'
import StateMachine from 'start-stop-state-machine'

import BlobServerPlugin from './fastify-plugins/blobs.js'
import IconServerPlugin from './fastify-plugins/icons.js'

import { kBlobStore } from './mapeo-project.js'

export const BLOBS_PREFIX = 'blobs'
Expand Down Expand Up @@ -44,6 +46,11 @@ export class MediaServer {
},
})

this.#fastify.register(IconServerPlugin, {
prefix: ICONS_PREFIX,
getProject,
})

this.#serverState = new StateMachine({
start: this.#startServer.bind(this),
stop: this.#stopServer.bind(this),
Expand Down
Loading

0 comments on commit 97df1c0

Please sign in to comment.