Skip to content

Commit

Permalink
Merge branch 'main' into blobstore-createreadstream-options-#80
Browse files Browse the repository at this point in the history
  • Loading branch information
sethvincent authored Aug 31, 2023
2 parents 1d9b95e + fb33178 commit 3b0db39
Show file tree
Hide file tree
Showing 14 changed files with 302 additions and 7 deletions.
118 changes: 118 additions & 0 deletions src/blob-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import fs from 'node:fs'
import { pipeline } from 'node:stream/promises'
import sodium from 'sodium-universal'
import b4a from 'b4a'

import { getPort } from './blob-server/index.js'

/** @typedef {import('./types.js').BlobId} BlobId */
/** @typedef {import('./types.js').BlobType} BlobType */
/** @typedef {import('./types.js').BlobVariant<BlobType>} BlobVariant */

export class BlobApi {
/**
* @param {object} options
* @param {string} options.projectId
* @param {import('./blob-store/index.js').BlobStore} options.blobStore
* @param {import('fastify').FastifyInstance} options.blobServer
*/
constructor({ projectId, blobStore, blobServer }) {
this.projectId = projectId
this.blobStore = blobStore
this.blobServer = blobServer
}

/**
* Get a url for a blob based on its BlobId
* @param {import('./types.js').BlobId} blobId
* @returns {Promise<string>}
*/
async getUrl(blobId) {
const { driveId, type, variant, name } = blobId
const port = await getPort(this.blobServer.server)
return `http://127.0.0.1:${port}/${this.projectId}/${driveId}/${type}/${variant}/${name}`
}

/**
* Write blobs for provided variants of a file
* @param {{ original: string, preview?: string, thumbnail?: string }} filepaths
* @param {{ mimeType: string }} metadata
* @returns {Promise<{ driveId: string, name: string, type: 'photo' | 'video' | 'audio' }>}
*/
async create(filepaths, metadata) {
const { original, preview, thumbnail } = filepaths
const { mimeType } = metadata
const blobType = getType(mimeType)
const hash = b4a.alloc(8)
sodium.randombytes_buf(hash)
const name = hash.toString('hex')

await this.writeFile(
original,
{
name: `${name}`,
variant: 'original',
type: blobType,
},
metadata
)

if (preview) {
await this.writeFile(
preview,
{
name: `${name}`,
variant: 'preview',
type: blobType,
},
metadata
)
}

if (thumbnail) {
await this.writeFile(
thumbnail,
{
name: `${name}`,
variant: 'thumbnail',
type: blobType,
},
metadata
)
}

return {
driveId: this.blobStore.writerDriveId,
name,
type: blobType,
}
}

/**
* @param {string} filepath
* @param {Omit<BlobId, 'driveId'>} options
* @param {object} metadata
* @param {string} metadata.mimeType
*/
async writeFile(filepath, { name, variant, type }, metadata) {
// @ts-ignore TODO: return value types don't match pipeline's expectations, though they should
await pipeline(
fs.createReadStream(filepath),
this.blobStore.createWriteStream({ type, variant, name }, { metadata })
)

return { name, variant, type }
}
}

/**
* @param {string} mimeType
* @returns {BlobType}
*/
function getType(mimeType) {
if (mimeType.startsWith('image')) return 'photo'
if (mimeType.startsWith('video')) return 'video'
if (mimeType.startsWith('audio')) return 'audio'

throw new Error(`Unsupported mimeType: ${mimeType}`)
}
17 changes: 16 additions & 1 deletion src/blob-server/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { once } from 'events'
import fastify from 'fastify'

import BlobServerPlugin from './fastify-plugin.js'
Expand All @@ -6,7 +7,6 @@ import BlobServerPlugin from './fastify-plugin.js'
* @param {object} opts
* @param {import('fastify').FastifyServerOptions['logger']} opts.logger
* @param {import('../blob-store/index.js').BlobStore} opts.blobStore
* @param {import('./fastify-plugin.js').BlobServerPluginOpts['getBlobStore']} opts.getBlobStore
* @param {import('fastify').RegisterOptions['prefix']} opts.prefix
* @param {string} opts.projectId Temporary option to enable `getBlobStore` option. Will be removed when multiproject support in Mapeo class is implemented.
*
Expand All @@ -23,3 +23,18 @@ export function createBlobServer({ logger, blobStore, prefix, projectId }) {
})
return server
}

/**
* @param {import('node:http').Server} server
* @returns {Promise<number>}
*/
export async function getPort(server) {
const address = server.address()

if (!address || !(typeof address === 'object') || !address.port) {
await once(server, 'listening')
return getPort(server)
}

return address.port
}
26 changes: 25 additions & 1 deletion src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import pDefer from 'p-defer'
import { CoreManager, NAMESPACES } from './core-manager/index.js'
import { DataStore } from './datastore/index.js'
import { DataType, kCreateWithDocId } from './datatype/index.js'
import { BlobStore } from './blob-store/index.js'
import { createBlobServer } from './blob-server/index.js'
import { BlobApi } from './blob-api.js'
import { IndexWriter } from './index-writer/index.js'
import { projectTable } from './schema/client.js'
import {
Expand All @@ -34,10 +37,12 @@ export const kCoreOwnership = Symbol('coreOwnership')
export const kCapabilities = Symbol('capabilities')

export class MapeoProject {
#projectId
#coreManager
#dataStores
#dataTypes
#projectId
#blobStore
#blobServer
#coreOwnership
#capabilities
#ownershipWriteDone
Expand Down Expand Up @@ -166,6 +171,25 @@ export class MapeoProject {
db,
}),
}

this.#blobStore = new BlobStore({
coreManager: this.#coreManager,
})

this.#blobServer = createBlobServer({
logger: true,
blobStore: this.#blobStore,
prefix: '/blobs/',
projectId: this.#projectId,
})

// @ts-ignore TODO: pass in blobServer
this.$blobs = new BlobApi({
projectId: this.#projectId,
blobStore: this.#blobStore,
blobServer: this.#blobServer,
})

this.#coreOwnership = new CoreOwnership({
dataType: this.#dataTypes.coreOwnership,
})
Expand Down
4 changes: 2 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import Hypercore from 'hypercore'
import RandomAccessStorage from 'random-access-storage'

type SupportedBlobVariants = typeof SUPPORTED_BLOB_VARIANTS
type BlobType = keyof SupportedBlobVariants
type BlobVariant<TBlobType extends BlobType> = TupleToUnion<
export type BlobType = keyof SupportedBlobVariants
export type BlobVariant<TBlobType extends BlobType> = TupleToUnion<
SupportedBlobVariants[TBlobType]
>

Expand Down
2 changes: 2 additions & 0 deletions test-types/data-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type ObservationWithForks = Observation & Forks
type PresetWithForks = Preset & Forks
type FieldWithForks = Field & Forks

const projectKey = randomBytes(32)
const keyManager = new KeyManager(randomBytes(32))
const sqlite = new Database(':memory:')

const mapeoProject = new MapeoProject({
Expand Down
112 changes: 112 additions & 0 deletions tests/blob-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { join } from 'node:path'
import { fileURLToPath } from 'url'
import test from 'brittle'
import { BlobApi } from '../src/blob-api.js'
import { createBlobServer, getPort } from '../src/blob-server/index.js'
import { createBlobStore } from './helpers/blob-store.js'
import { timeoutException } from './helpers/index.js'

test('get port after listening event with explicit port', async (t) => {
const blobStore = createBlobStore()
const server = await createBlobServer({ blobStore })

t.ok(await timeoutException(getPort(server.server)))

await new Promise((resolve) => {
server.listen({ port: 3456 }, (err, address) => {
resolve(address)
})
})

const port = await getPort(server.server)

t.is(typeof port, 'number')
t.is(port, 3456)

t.teardown(async () => {
await server.close()
})
})

test('get port after listening event with unset port', async (t) => {
const blobStore = createBlobStore()
const server = await createBlobServer({ blobStore })

t.ok(await timeoutException(getPort(server.server)))

await new Promise((resolve) => {
server.listen({ port: 0 }, (err, address) => {
resolve(address)
})
})

const port = await getPort(server.server)

t.is(typeof port, 'number', 'port is a number')
t.teardown(async () => {
await server.close()
})
})

test('get url from blobId', async (t) => {
const projectId = '1234'
const type = 'image'
const variant = 'original'
const name = '1234'

const blobStore = createBlobStore()
const blobServer = await createBlobServer({ blobStore })
const blobApi = new BlobApi({ projectId: '1234', blobStore, blobServer })

await new Promise((resolve) => {
blobServer.listen({ port: 0 }, (err, address) => {
resolve(address)
})
})

const url = await blobApi.getUrl({ type, variant, name })

t.is(
url,
`http://127.0.0.1:${
blobServer.server.address().port
}/${projectId}/${blobStore.writerDriveId}/${type}/${variant}/${name}`
)
t.teardown(async () => {
await blobServer.close()
})
})

test('create blobs', async (t) => {
const { blobStore } = createBlobStore()
const blobServer = createBlobServer({ blobStore })
const blobApi = new BlobApi({ projectId: '1234', blobStore, blobServer })

await new Promise((resolve) => {
blobServer.listen({ port: 0 }, (err, address) => {
resolve(address)
})
})

const directory = fileURLToPath(
new URL('./fixtures/blob-api/', import.meta.url)
)

const attachment = await blobApi.create(
{
original: join(directory, 'original.png'),
preview: join(directory, 'preview.png'),
thumbnail: join(directory, 'thumbnail.png'),
},
{
mimeType: 'image/png',
}
)

t.is(attachment.driveId, blobStore.writerDriveId)
t.is(attachment.type, 'photo')

t.teardown(async () => {
await blobServer.close()
})
})
4 changes: 1 addition & 3 deletions tests/blob-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ import test from 'brittle'
import { readdirSync } from 'fs'
import { readFile } from 'fs/promises'
import path from 'path'
import { createCoreManager } from './helpers/core-manager.js'
import { BlobStore } from '../src/blob-store/index.js'
import { createBlobServer } from '../src/blob-server/index.js'
import BlobServerPlugin from '../src/blob-server/fastify-plugin.js'
import fastify from 'fastify'

import { replicateBlobs } from './helpers/blob-store.js'
import { replicateBlobs, createBlobStore } from './helpers/blob-store.js'

test('Plugin throws error if missing getBlobStore option', async (t) => {
const server = fastify()
Expand Down
Binary file added tests/fixtures/blob-api/original.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/fixtures/blob-api/preview.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/fixtures/blob-api/thumbnail.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file added tests/helpers/blob-server.js
Empty file.
15 changes: 15 additions & 0 deletions tests/helpers/blob-store.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
// @ts-nocheck
import { replicate } from './core-manager.js'
import { pipelinePromise as pipeline, Writable } from 'streamx'

import { BlobStore } from '../../src/blob-store/index.js'
import { createCoreManager } from './core-manager.js'

/**
*
* @param {Object} options
* @returns
*/
export function createBlobStore(options = {}) {
const coreManager = createCoreManager(options)
const blobStore = new BlobStore({ coreManager })
return { blobStore, coreManager }
}

/**
*
* @param {import('../../src/core-manager/index.js').CoreManager} cm1
Expand Down
1 change: 1 addition & 0 deletions tests/helpers/core-manager.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// @ts-nocheck
import { CoreManager } from '../../src/core-manager/index.js'
import Sqlite from 'better-sqlite3'
import { randomBytes } from 'crypto'
Expand Down
10 changes: 10 additions & 0 deletions tests/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,13 @@ export async function waitForIndexing(stores) {
})
)
}

export async function timeoutException(promise, timeout = 100) {
const timer = new Promise((resolve) => {
setTimeout(() => {
resolve('timeout')
}, timeout)
})

return (await Promise.race([promise, timer])) === 'timeout'
}

0 comments on commit 3b0db39

Please sign in to comment.