Skip to content

Commit

Permalink
fix!: storefront content store rename and separation for test (storac…
Browse files Browse the repository at this point in the history
…ha#1409)

With the aim to facilitate
storacha#1349 storefront `dataStore`
is renamed to `contentStore` (note alignment of name with the `content`
property in Filecoin Pipeline capabilities/message namings). My goal is
to make `contentStore.stream()` perform HTTP Request to read thing from
Roundabout (if `filecoin/offer` comes with `content` CARCID code, it
will try to get a CAR, if `content` comes as RAW, it will try to get
RAW).

The main point in this change is to not require `contentStore` to have a
`put` method, like `dataStore` was requiring just for testing. It
extracts that requirement only to run tests, so that we can implement a
`contentStore` as something with just a `stream` function that behind
the scenes can perform a HTTP Request to Roundabout instead of being an
abstraction on top of a S3 bucket like today.

Note that this is not strictly needed, we could just use this as is and
in `dataStore` implementation and make `put`
https://github.com/w3s-project/w3infra/blob/main/filecoin/store/data.js#L39
`throw new Error('not implemented')` and just use an extended
`DataStore` class for w3infra testing. But I feel this way is nicer, and
also like more new naming as it aligns with everything including content
name defined in w3index
https://github.com/w3s-project/specs/blob/main/w3-index.md#sharded-dag-index-example

BREAKING CHANGE: dataStore in storefront renamed to contentStore
  • Loading branch information
vasco-santos authored Apr 26, 2024
1 parent 1b71b89 commit 05e5db3
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 29 deletions.
13 changes: 10 additions & 3 deletions packages/filecoin-api/src/storefront/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
DID,
Proof,
ConnectionView,
Result,
} from '@ucanto/interface'
import { PieceLink } from '@web3-storage/data-segment'
import {
Expand All @@ -18,9 +19,9 @@ import {
import {
Store,
UpdatableAndQueryableStore,
StreammableStore,
Queue,
ServiceConfig,
StoreGetError
} from '../types.js'

export type PieceStore = UpdatableAndQueryableStore<
Expand All @@ -30,7 +31,6 @@ export type PieceStore = UpdatableAndQueryableStore<
>
export type FilecoinSubmitQueue = Queue<FilecoinSubmitMessage>
export type PieceOfferQueue = Queue<PieceOfferMessage>
export type DataStore = StreammableStore<UnknownLink, Uint8Array>
export type TaskStore = Store<UnknownLink, Invocation>
export type ReceiptStore = Store<UnknownLink, Receipt>

Expand Down Expand Up @@ -71,7 +71,7 @@ export interface ServiceContext {

export interface FilecoinSubmitMessageContext
extends Pick<ServiceContext, 'pieceStore'> {
dataStore: DataStore
contentStore: ContentStore<UnknownLink, Uint8Array>
}

export interface PieceOfferMessageContext {
Expand Down Expand Up @@ -189,3 +189,10 @@ export interface PieceOfferMessage {
export interface DataAggregationProofNotFound extends Failure {
name: 'DataAggregationProofNotFound'
}

export interface ContentStore<RecKey, Rec> {
/**
* Gets a record from the store.
*/
stream: (key: RecKey) => Promise<Result<ReadableStream<Rec>, StoreGetError>>
}
2 changes: 1 addition & 1 deletion packages/filecoin-api/src/storefront/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export const handleFilecoinSubmitMessage = async (context, message) => {

// read and compute piece for content
// TODO: needs to be hooked with location claims
const contentStreamRes = await context.dataStore.stream(message.content)
const contentStreamRes = await context.contentStore.stream(message.content)
if (contentStreamRes.error) {
return { error: new BlobNotFound(contentStreamRes.error.message) }
}
Expand Down
3 changes: 2 additions & 1 deletion packages/filecoin-api/src/storefront/piece.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import { ComputePieceFailed } from '../errors.js'
/**
* Compute PieceCid for provided async iterable.
*
* @param {AsyncIterable<Uint8Array>} stream
* @param {ReadableStream<Uint8Array>} stream
*/
export async function computePieceCid(stream) {
/** @type {import('../types.js').PieceLink} */
let piece
try {
const hasher = Hasher.create()
// @ts-ignore Readable stream is Aync Iterator
for await (const chunk of stream) {
hasher.write(chunk)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/filecoin-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ export interface UpdatableStore<RecKey, Rec> extends Store<RecKey, Rec> {
) => Promise<Result<Rec, StoreGetError>>
}

export interface StreammableStore<RecKey, Rec> {
export interface ReadableStreamStore<RecKey, Rec> {
/**
* Puts a record in the store.
*/
put: (record: Rec) => Promise<Result<Unit, StorePutError>>
/**
* Gets a record from the store.
*/
stream: (key: RecKey) => Promise<Result<AsyncIterable<Rec>, StoreGetError>>
stream: (key: RecKey) => Promise<Result<ReadableStream<Rec>, StoreGetError>>
}

export interface QueryableStore<RecKey, Rec, Query> extends Store<RecKey, Rec> {
Expand Down
23 changes: 11 additions & 12 deletions packages/filecoin-api/test/context/store-implementations.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { UpdatableStore, StreammableStore } from './store.js'
import { UpdatableStore, ReadableStreamStore } from './store.js'

/**
* @typedef {import('@ucanto/interface').Link} Link
Expand All @@ -19,7 +19,7 @@ import { UpdatableStore, StreammableStore } from './store.js'
*/
export const getStoreImplementations = (
StoreImplementation = UpdatableStore,
StreammableStoreImplementation = StreammableStore
ReadableStreamStoreImplementation = ReadableStreamStore
) => ({
storefront: {
pieceStore: new StoreImplementation({
Expand Down Expand Up @@ -77,7 +77,7 @@ export const getStoreImplementations = (
return Array.from(items).find((i) => i.ran.link().equals(record))
},
}),
dataStore: new StreammableStore({
contentStore: new ReadableStreamStore({
streamFn: (
/** @type {Set<Uint8Array>} */ items,
/** @type {import('@ucanto/interface').UnknownLink} */ record
Expand All @@ -86,15 +86,14 @@ export const getStoreImplementations = (
if (!item) {
return undefined
}
const asyncIterableRes = {
[Symbol.asyncIterator]: async function* () {
// Yield the Uint8Array asynchronously
if (item) {
yield item
}
},
}
return asyncIterableRes
return new ReadableStream({
start(controller) {
// Push the data into the stream
controller.enqueue(item)
// Close the stream
controller.close()
}
})
},
}),
},
Expand Down
8 changes: 4 additions & 4 deletions packages/filecoin-api/test/context/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ export class Store {
/**
* @template K
* @template V
* @implements {API.StreammableStore<K,V>}
* @implements {API.ReadableStreamStore<K,V>}
*/
export class StreammableStore {
export class ReadableStreamStore {
/**
* @param {import('./types.js').StreammableStoreOptions<K, V>} options
* @param {import('./types.js').ReadableStreamStoreOptions<K, V>} options
*/
constructor(options) {
/** @type {Set<V>} */
Expand All @@ -123,7 +123,7 @@ export class StreammableStore {

/**
* @param {K} item
* @returns {Promise<import('@ucanto/interface').Result<AsyncIterable<V>, StoreGetError>>}
* @returns {Promise<import('@ucanto/interface').Result<ReadableStream<V>, StoreGetError>>}
*/
async stream(item) {
if (!this.streamFn) {
Expand Down
4 changes: 2 additions & 2 deletions packages/filecoin-api/test/context/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export interface UpdatableStoreOptions<K, V> extends StoreOptions<K, V> {
updateFn?: (items: Set<V>, key: K, item: Partial<V>) => V
}

export interface StreammableStoreOptions<K, V> extends StoreOptions<K, V> {
streamFn?: (items: Set<V>, item: K) => AsyncIterable<V> | undefined
export interface ReadableStreamStoreOptions<K, V> extends StoreOptions<K, V> {
streamFn?: (items: Set<V>, item: K) => ReadableStream<V> | undefined
}
2 changes: 1 addition & 1 deletion packages/filecoin-api/test/events/storefront.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const test = {
}

// Store bytes on datastore
await context.dataStore.put(cargo.bytes)
await context.testContentStore.put(cargo.bytes)

// Handle message
const handledMessageRes =
Expand Down
5 changes: 3 additions & 2 deletions packages/filecoin-api/test/storefront.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe('storefront', () => {

// context
const {
storefront: { pieceStore, taskStore, receiptStore, dataStore },
storefront: { pieceStore, taskStore, receiptStore, contentStore },
} = getStoreImplementations()

await test(
Expand All @@ -118,7 +118,8 @@ describe('storefront', () => {
pieceStore,
receiptStore,
taskStore,
dataStore,
contentStore,
testContentStore: contentStore,
storefrontService: {
connection: storefrontConnection,
invocationConfig: {
Expand Down
16 changes: 15 additions & 1 deletion packages/filecoin-api/test/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import type { Signer } from '@ucanto/interface'
import type {
Signer,
Result,
Unit,
UnknownLink
} from '@ucanto/interface'
import * as AggregatorInterface from '../src/aggregator/api.js'
import * as DealerInterface from '../src/dealer/api.js'
import * as StorefrontInterface from '../src/storefront/api.js'
import { StorePutError } from '../src/types.js'

export interface AggregatorTestEventsContext
extends AggregatorInterface.PieceMessageContext,
Expand Down Expand Up @@ -43,6 +49,7 @@ export interface StorefrontTestEventsContext
StorefrontInterface.CronContext {
id: Signer
aggregatorId: Signer
testContentStore: TestContentStore<UnknownLink, Uint8Array>
service: Partial<{
filecoin: Partial<import('../src/types.js').StorefrontService['filecoin']>
piece: Partial<import('../src/types.js').AggregatorService['piece']>
Expand All @@ -53,3 +60,10 @@ export interface StorefrontTestEventsContext
>
}>
}

export interface TestContentStore<RecKey, Rec> extends StorefrontInterface.ContentStore<RecKey, Rec> {
/**
* Puts a record in the store.
*/
put: (record: Rec) => Promise<Result<Unit, StorePutError>>
}

0 comments on commit 05e5db3

Please sign in to comment.