Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blobStore.createReadStream should not wait #243

Merged
merged 4 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/blob-server/fastify-plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ async function routes(fastify, options) {
throw e
}

const entry = await blobStore.entry(blobId, { wait: false })
let entry
try {
entry = await blobStore.entry(blobId, { wait: false })
} catch (e) {
reply.code(404)
throw e
}

if (!entry) {
reply.code(404)
Expand All @@ -87,7 +93,13 @@ async function routes(fastify, options) {

const { metadata } = entry.value

const blobStream = await blobStore.createEntryReadStream(driveId, entry)
let blobStream
try {
blobStream = await blobStore.createEntryReadStream(driveId, entry)
} catch (e) {
reply.code(404)
throw e
}

// Extract the 'mimeType' property of the metadata and use it for the response header if found
if (
Expand Down
31 changes: 27 additions & 4 deletions src/blob-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,31 @@ export class BlobStore {

/**
* @param {BlobId} blobId
* @param {object} [options]
* @param {boolean} [options.wait=false] Set to `true` to wait for a blob to download, otherwise will throw if blob is not available locally
* @param {number} [options.timeout] Optional timeout to wait for a blob to download
*/
createReadStream({ type, variant, name, driveId }) {
createReadStream(
{ type, variant, name, driveId },
options = { wait: false }
) {
// TODO: Error thrown from this be an emit error on the returned stream?
const drive = this.#getDrive(driveId)
const path = makePath({ type, variant, name })
return drive.createReadStream(path)

// @ts-ignore - TODO: update @digidem/types to include wait/timeout options
return drive.createReadStream(path, options)
}

/**
* Optimization for creating the blobs read stream when you have
* previously read the entry from Hyperdrive using `drive.entry`
* @param {BlobId['driveId']} driveId Hyperdrive drive id
* @param {import('hyperdrive').HyperdriveEntry} entry Hyperdrive entry
* @param {object} [options]
* @param {boolean} [options.wait=false] Set to `true` to wait for a blob to download, otherwise will throw if blob is not available locally
*/
async createEntryReadStream(driveId, entry) {
async createEntryReadStream(driveId, entry, options = { wait: false }) {
const drive = this.#getDrive(driveId)
const blobs = await drive.getBlobs()

Expand All @@ -138,7 +148,7 @@ export class BlobStore {
'Hyperblobs instance not found for drive ' + driveId.slice(0, 7)
)

return blobs.createReadStream(entry.value.blob)
return blobs.createReadStream(entry.value.blob, options)
}

/**
Expand Down Expand Up @@ -204,6 +214,19 @@ export class BlobStore {
const entry = await drive.entry(path, options)
return entry
}

/**
* @param {BlobId} blobId
* @param {object} [options]
* @param {boolean} [options.diff=false] Enable to return an object with a `block` property with number of bytes removed
* @return {Promise<{ blocks: number } | null>}
*/
async clear({ type, variant, name, driveId }, options = {}) {
const path = makePath({ type, variant, name })
const drive = this.#getDrive(driveId)

return drive.clear(path, options)
}
}

/**
Expand Down
6 changes: 3 additions & 3 deletions tests/blob-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ test('get url from blobId', async (t) => {

t.is(
url,
`http://127.0.0.1:${
blobServer.server.address().port
}/${projectId}/${blobStore.writerDriveId}/${type}/${variant}/${name}`
`http://127.0.0.1:${blobServer.server.address().port}/${projectId}/${
blobStore.writerDriveId
}/${type}/${variant}/${name}`
)
t.teardown(async () => {
await blobServer.close()
Expand Down
54 changes: 52 additions & 2 deletions tests/blob-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import test from 'brittle'
import { readdirSync } from 'fs'
import { readFile } from 'fs/promises'
import path from 'path'
import { BlobStore } from '../src/blob-store/index.js'
import { createCoreManager } from './helpers/core-manager.js'
import { createBlobServer } from '../src/blob-server/index.js'
import BlobServerPlugin from '../src/blob-server/fastify-plugin.js'
import fastify from 'fastify'

import { replicateBlobs, createBlobStore } from './helpers/blob-store.js'
import { replicateBlobs } from './helpers/blob-store.js'

test('Plugin throws error if missing getBlobStore option', async (t) => {
const server = fastify()
Expand Down Expand Up @@ -191,7 +193,7 @@ test('GET photo uses mime type from metadata if found', async (t) => {
test('GET photo returns 404 when trying to get non-replicated blob', async (t) => {
const { data, projectId, coreManager: cm1 } = await testenv()
const projectKey = Buffer.from(projectId, 'hex')
const { blobStore: bs2, coreManager: cm2 } = await createBlobStore({
const { blobStore: bs2, coreManager: cm2 } = createBlobStore({
projectKey,
})

Expand All @@ -215,6 +217,54 @@ test('GET photo returns 404 when trying to get non-replicated blob', async (t) =
t.is(res.statusCode, 404)
})

test('GET photo returns 404 when trying to get non-existent blob', async (t) => {
sethvincent marked this conversation as resolved.
Show resolved Hide resolved
const projectKey = randomBytes(32)
const projectId = projectKey.toString('hex')
const { blobStore } = createBlobStore({ projectKey })
const expected = await readFile(new URL(import.meta.url))

const blobId = /** @type {const} */ ({
type: 'photo',
variant: 'original',
name: 'test-file',
})

const server = createBlobServer({ blobStore, projectId })

// Test that the blob does not exist
{
const res = await server.inject({
method: 'GET',
url: buildRouteUrl({
...blobId,
projectId,
driveId: blobStore.writerDriveId,
}),
})

t.is(res.statusCode, 404)
}

const driveId = await blobStore.put(blobId, expected)
await blobStore.clear({ ...blobId, driveId: blobStore.writerDriveId })

// Test that the entry exists but blob does not
{
const res = await server.inject({
method: 'GET',
url: buildRouteUrl({ ...blobId, projectId, driveId }),
})

t.is(res.statusCode, 404)
}
})

function createBlobStore(opts) {
const coreManager = createCoreManager(opts)
const blobStore = new BlobStore({ coreManager })
return { blobStore, coreManager }
}

async function testenv({ prefix, logger } = {}) {
const projectKey = randomBytes(32)
const projectId = projectKey.toString('hex')
Expand Down
92 changes: 92 additions & 0 deletions tests/blob-store/blob-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,75 @@ test('blobStore.createWriteStream(blobId) and blobStore.createReadStream(blobId)
t.alike(bndlbuf, diskbuf, 'should be equal')
})

test('blobStore.createReadStream should not wait', async (t) => {
const { blobStore } = await testenv()
const expected = await readFile(new URL(import.meta.url))

const blobId = /** @type {const} */ ({
type: 'photo',
variant: 'original',
name: 'test-file',
})

try {
const result = blobStore.createReadStream({
...blobId,
driveId: blobStore.writerDriveId,
})
await concat(result)
} catch (error) {
t.is(error.message, 'Blob does not exist')
}

const { blobStore: blobStore2 } = await testenv()

const ws = blobStore.createWriteStream(blobId)
await pipeline(fs.createReadStream(new URL(import.meta.url)), ws)

{
const stream = blobStore.createReadStream({
...blobId,
driveId: blobStore.writerDriveId,
})
const blob = await concat(stream)
t.alike(blob, expected, 'should be equal')
}

try {
const stream = blobStore2.createReadStream({
...blobId,
driveId: blobStore2.writerDriveId,
})
await concat(stream)
} catch (error) {
t.is(error.message, 'Blob does not exist')
}

const ws2 = blobStore2.createWriteStream(blobId)
await pipeline(fs.createReadStream(new URL(import.meta.url)), ws2)

{
const stream = blobStore2.createReadStream({
...blobId,
driveId: blobStore2.writerDriveId,
})
const blob = await concat(stream)
t.alike(blob, expected, 'should be equal')

await blobStore2.clear({ ...blobId, driveId: blobStore2.writerDriveId })

try {
const stream = blobStore2.createReadStream({
...blobId,
driveId: blobStore2.writerDriveId,
})
await concat(stream)
} catch (error) {
t.is(error.message, 'Block not available')
}
}
})

test('blobStore.writerDriveId', async (t) => {
{
const { blobStore } = await testenv()
Expand Down Expand Up @@ -361,6 +430,29 @@ test('blobStore.getEntryReadStream(driveId, entry)', async (t) => {
t.alike(buf, diskbuf, 'should be equal')
})

test('blobStore.getEntryReadStream(driveId, entry) should not wait', async (t) => {
const { blobStore } = await testenv()

const expected = await readFile(new URL(import.meta.url))

const blobId = /** @type {const} */ ({
type: 'photo',
variant: 'original',
name: 'test-file',
})

const driveId = await blobStore.put(blobId, expected)
const entry = await blobStore.entry({ ...blobId, driveId })
await blobStore.clear({ ...blobId, driveId: blobStore.writerDriveId })

try {
const stream = await blobStore.createEntryReadStream(driveId, entry)
await concat(stream)
} catch (error) {
t.is(error.message, 'Block not available', 'Block not available')
}
})

async function testenv(opts) {
const coreManager = createCoreManager(opts)
const blobStore = new BlobStore({ coreManager })
Expand Down
4 changes: 4 additions & 0 deletions tests/helpers/blob-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export function replicateBlobs(cm1, cm2) {
}
}

/**
* @param {*} rs
* @returns {Promise<Buffer>}
*/
export async function concat(rs) {
let buf = null
await pipeline(
Expand Down
4 changes: 4 additions & 0 deletions types/hyperdrive.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ declare module 'hyperdrive' {
readdir(folder: string): Readable
mirror(): any
batch(): any
clear(
path: string,
opts?: { diff?: boolean }
): Promise<{ blocks: number } | null>
}

export = Hyperdrive
Expand Down