Skip to content

Commit

Permalink
Merge pull request #103 from autonomys/feat/cold-file-cache
Browse files Browse the repository at this point in the history
Cold pg-based download cache
  • Loading branch information
clostao authored Nov 19, 2024
2 parents fc8f033 + 98d8a46 commit 4d3d03c
Show file tree
Hide file tree
Showing 13 changed files with 353 additions and 54 deletions.
Binary file modified backend/.yarn/install-state.gz
Binary file not shown.
53 changes: 53 additions & 0 deletions backend/migrations/20241118155150-cold-file-cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';

var dbm;
var type;
var seed;
var fs = require('fs');
var path = require('path');
var Promise;

/**
* We receive the dbmigrate dependency from dbmigrate initially.
* This enables us to not have to rely on NODE_PATH.
*/
exports.setup = function(options, seedLink) {
  // Stash the injected db-migrate handles in module state so up()/down()
  // can use them without relying on NODE_PATH.
  dbm = options.dbmigrate;
  type = dbm.dataType; // dataType helpers — unused in this migration
  seed = seedLink;
  // db-migrate supplies its own Promise implementation.
  Promise = options.Promise;
};

exports.up = function(db) {
var filePath = path.join(__dirname, 'sqls', '20241118155150-cold-file-cache-up.sql');
return new Promise( function( resolve, reject ) {
fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
if (err) return reject(err);
console.log('received data: ' + data);

resolve(data);
});
})
.then(function(data) {
return db.runSql(data);
});
};

exports.down = function(db) {
var filePath = path.join(__dirname, 'sqls', '20241118155150-cold-file-cache-down.sql');
return new Promise( function( resolve, reject ) {
fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
if (err) return reject(err);
console.log('received data: ' + data);

resolve(data);
});
})
.then(function(data) {
return db.runSql(data);
});
};

// db-migrate scaffold metadata: version of the migration file format.
exports._meta = {
  "version": 1
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Revert 20241118155150-cold-file-cache: drop the cold download cache
-- objects in dependency order (tables first, then the schema).
-- IF EXISTS makes the rollback idempotent, mirroring the
-- CREATE ... IF NOT EXISTS guards in the "up" migration.
DROP TABLE IF EXISTS download_cache.file_parts;
DROP TABLE IF EXISTS download_cache.registry;
DROP SCHEMA IF EXISTS download_cache;
14 changes: 14 additions & 0 deletions backend/migrations/sqls/20241118155150-cold-file-cache-up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Cold download cache: stores downloaded file content in Postgres.
CREATE SCHEMA IF NOT EXISTS download_cache;

-- Chunked file content: one row per part of a cached file, keyed by
-- (cid, index) so parts can be read back in order.
CREATE TABLE IF NOT EXISTS download_cache.file_parts (
"cid" TEXT,
"index" INTEGER NOT NULL,
"data" BYTEA NOT NULL,
PRIMARY KEY (cid, index)
);

-- Per-file bookkeeping: total size and last access time, used by the
-- eviction logic to enforce the cache size budget.
CREATE TABLE IF NOT EXISTS download_cache.registry (
"cid" TEXT PRIMARY KEY,
"last_accessed_at" TIMESTAMP WITH TIME ZONE NOT NULL,
"size" BIGINT NOT NULL
);
50 changes: 50 additions & 0 deletions backend/src/repositories/cache/fileParts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { getDatabase } from '../../drivers/pg.js'

/**
 * One stored chunk of a cached file: `data` holds the bytes of the
 * `index`-th part of the file identified by `cid`.
 */
export interface CacheFilePart {
  cid: string
  // Position of this chunk within the file (written starting at 0).
  index: number
  data: Buffer
}

// Persists a single chunk of a cached file under (cid, index).
const addFilePart = async ({ cid, index, data }: CacheFilePart) => {
  const db = await getDatabase()
  const insert = {
    text: 'INSERT INTO download_cache.file_parts (cid, index, data) VALUES ($1, $2, $3)',
    values: [cid, index, data],
  }
  await db.query(insert)
}

/**
 * Counts how many chunks of `cid` are stored in the cache.
 *
 * node-postgres returns COUNT(*) (a Postgres bigint) as a string, so the
 * raw row value must be parsed; the previous `{ count: number }` typing
 * was wrong at runtime (the loop over it only worked via string/number
 * coercion in `<`). The return value is now a genuine number.
 */
const getFilePartCount = async (cid: string) => {
  const db = await getDatabase()
  const result = await db.query<{ count: string }>({
    text: 'SELECT COUNT(*) as count FROM download_cache.file_parts WHERE cid = $1',
    values: [cid],
  })

  return Number(result.rows[0].count)
}

// Fetches one chunk of `cid` by its position; `rows[0]` is undefined
// when the part does not exist.
const getFilePart = async (cid: string, index: number) => {
  const db = await getDatabase()
  const { rows } = await db.query<CacheFilePart>({
    text: 'SELECT * FROM download_cache.file_parts WHERE cid = $1 AND index = $2',
    values: [cid, index],
  })

  return rows[0]
}

// Deletes every stored chunk belonging to any of the given cids.
const removeFileParts = async (cids: string[]) => {
  const db = await getDatabase()
  const deletion = {
    text: 'DELETE FROM download_cache.file_parts WHERE cid = ANY($1)',
    values: [cids],
  }
  await db.query(deletion)
}

// Data-access facade for the download_cache.file_parts table.
export const downloadCacheFilePartsRepository = {
  addFilePart,
  getFilePartCount,
  getFilePart,
  removeFileParts,
}
66 changes: 66 additions & 0 deletions backend/src/repositories/cache/registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { getDatabase } from '../../drivers/pg.js'

/**
 * Raw row shape of download_cache.registry. `size` is a Postgres BIGINT,
 * which node-postgres delivers as a string to avoid precision loss.
 */
interface RegistryEntry {
  cid: string
  last_accessed_at: Date
  size: string
}

/**
 * Converts a raw registry row into a caller-friendly shape whose `size`
 * is a native bigint. `BigInt(...)` already yields a primitive bigint,
 * so the previous trailing `.valueOf()` was redundant and is dropped.
 */
const toBigInt = (value: RegistryEntry) => ({
  ...value,
  size: BigInt(value.size),
})

/**
 * Inserts a new registry row for a cached file.
 *
 * The previous implementation chained `.then((e) => e.rows.map(toBigInt))`
 * onto the INSERT, but an INSERT without RETURNING yields no rows and the
 * mapped result was discarded anyway; that dead code has been removed.
 */
const addEntry = async (entry: RegistryEntry) => {
  const db = await getDatabase()
  await db.query({
    text: 'INSERT INTO download_cache.registry (cid, last_accessed_at, size) VALUES ($1, $2, $3)',
    values: [entry.cid, entry.last_accessed_at, entry.size],
  })
}

// Deletes the registry rows for any of the given cids.
const removeEntries = async (cids: string[]) => {
  const db = await getDatabase()
  const deletion = {
    text: 'DELETE FROM download_cache.registry WHERE cid = ANY($1)',
    values: [cids],
  }
  await db.query(deletion)
}

// Returns every registry entry, least-recently-accessed first, with
// `size` converted to a native bigint.
const getEntriesSortedByLastAccessedAt = async () => {
  const db = await getDatabase()
  const { rows } = await db.query<RegistryEntry>({
    text: 'SELECT * FROM download_cache.registry ORDER BY last_accessed_at ASC',
  })

  return rows.map(toBigInt)
}

/**
 * Returns the total number of cached bytes across all registry entries.
 *
 * SUM(size) comes back from node-postgres as a string, or null when the
 * table is empty (hence the `?? '0'` default). `BigInt(...)` already
 * yields a primitive bigint, so the previous trailing `.valueOf()` was
 * redundant and has been dropped.
 */
const getTotalSize = async () => {
  const db = await getDatabase()
  const result = await db.query<{ size: string | null }>({
    text: 'SELECT SUM(size) as size FROM download_cache.registry',
  })

  return BigInt(result.rows[0].size ?? '0')
}

// Looks up a single registry entry by cid; resolves to undefined when
// no row exists.
const getEntry = async (cid: string) => {
  const db = await getDatabase()
  const { rows } = await db.query<RegistryEntry>({
    text: 'SELECT * FROM download_cache.registry WHERE cid = $1',
    values: [cid],
  })

  const row = rows[0]
  return row === undefined ? undefined : toBigInt(row)
}

// Data-access facade for the download_cache.registry table.
export const registryRepository = {
  addEntry,
  removeEntries,
  getEntriesSortedByLastAccessedAt,
  getTotalSize,
  getEntry,
}
15 changes: 0 additions & 15 deletions backend/src/repositories/objects/transactionResults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,6 @@ const getPendingUploads = async (limit: number = 100) => {
return result
}

// Fetches the transaction results of every node whose head cid is
// `headCid` and that already has a blockNumber recorded in its
// transaction_result JSON, ordered by ascending block number.
const getPendingUploadsByHeadCid = async (headCid: string) => {
  const db = await getDatabase()
  const result = await db
    .query<Node>({
      text: `
select tr.* from transaction_results tr join nodes n on tr.cid = n.cid where n.head_cid = $1 and tr.transaction_result->>'blockNumber' is not null order by tr.transaction_result->>'blockNumber' asc
`,
      values: [headCid],
    })
    .then(({ rows }) => rows)

  return result
}

const getUploadedNodesByRootCid = async (rootCid: string) => {
const db = await getDatabase()
const result = await db
Expand Down Expand Up @@ -107,7 +93,6 @@ export const transactionResultsRepository = {
getTransactionResult,
getPendingUploads,
getHeadTransactionResults,
getPendingUploadsByHeadCid,
getUploadedNodesByRootCid,
getFirstNotArchivedNode,
}
92 changes: 92 additions & 0 deletions backend/src/services/download/databaseDownloadCache/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { AwaitIterable } from 'interface-store'
import { downloadCacheFilePartsRepository } from '../../../repositories/cache/fileParts.js'
import { registryRepository } from '../../../repositories/cache/registry.js'
import { asyncByChunk } from '../../../utils/async.js'

const DEFAULT_CHUNK_SIZE = 10 * 1024 ** 2 // 10MB
const DEFAULT_MAX_CACHE_SIZE = BigInt(10 * 1024 ** 3) // 10GB

// Tunables for the Postgres-backed cold cache, overridable via env vars.
const config = {
  // Bytes per row written to download_cache.file_parts.
  chunkSize: process.env.DATABASE_DOWNLOAD_CACHE_CHUNK_SIZE
    ? parseInt(process.env.DATABASE_DOWNLOAD_CACHE_CHUNK_SIZE)
    : DEFAULT_CHUNK_SIZE,
  // Total cache budget in bytes across all cached files.
  maxCacheSize: process.env.DATABASE_DOWNLOAD_CACHE_MAX_SIZE
    ? BigInt(process.env.DATABASE_DOWNLOAD_CACHE_MAX_SIZE)
    : DEFAULT_MAX_CACHE_SIZE,
}

// Writes `data` into the cold cache while streaming it through to the
// consumer: registers the file in the registry up front, then persists
// each chunk as a numbered part as it is pulled through the generator.
//
// NOTE(review): the registry row is written before any parts, and nothing
// cleans up if the consumer abandons the iterator midway — a partially
// cached file would then still look present to has(); confirm intended.
const internalSet = async function* (
  cid: string,
  data: AwaitIterable<Buffer>,
  size: bigint,
): AsyncIterable<Buffer> {
  await registryRepository.addEntry({
    cid,
    size: size.toString(),
    last_accessed_at: new Date(),
  })

  let i = 0
  for await (const chunk of asyncByChunk(data, config.chunkSize)) {
    await downloadCacheFilePartsRepository.addFilePart({
      cid,
      index: i,
      data: chunk,
    })
    yield chunk
    i++
  }
}

const updateCacheSize = async (size: bigint) => {
let currentSize = await registryRepository.getTotalSize()
const newSize = currentSize + size
if (newSize > config.maxCacheSize) {
const entries = await registryRepository.getEntriesSortedByLastAccessedAt()
for (const entry of entries) {
if (currentSize <= config.maxCacheSize) {
break
}
await registryRepository.removeEntries([entry.cid])
currentSize -= BigInt(entry.size)
}
}
}

/**
 * Caches `data` under `cid`, passing the bytes through to the caller.
 *
 * When the cid is already cached the input iterable is returned untouched;
 * otherwise space is reserved (evicting old entries if needed) and the
 * returned iterable writes each chunk to the database as it is consumed.
 *
 * NOTE(review): has() + insert is not atomic — two concurrent downloads of
 * the same cid could both attempt to cache it; confirm the resulting
 * primary-key conflict is acceptable upstream.
 */
const set = async (
  cid: string,
  data: AwaitIterable<Buffer>,
  size: bigint,
): Promise<AwaitIterable<Buffer>> => {
  const alreadyCached = await has(cid)
  if (alreadyCached) {
    return data
  }

  await updateCacheSize(size)

  return internalSet(cid, data, size)
}

// Streams a cached file back from Postgres, part by part, in index order.
// Ends immediately (yields nothing) when the cid is not in the registry.
//
// NOTE(review): last_accessed_at is never refreshed here, so the LRU
// eviction in updateCacheSize effectively orders entries by insertion
// time rather than true recency — confirm whether reads should touch the
// registry row (would need a new registryRepository method).
const get = async function* (cid: string): AsyncIterable<Buffer> {
  const entry = await registryRepository.getEntry(cid)
  if (!entry) {
    // A bare return ends the async generator; the value is ignored by
    // for-await consumers.
    return null
  }

  const fileParts = await downloadCacheFilePartsRepository.getFilePartCount(cid)
  for (let i = 0; i < fileParts; i++) {
    // One query per part — presumably to bound memory on large files.
    const filePart = await downloadCacheFilePartsRepository.getFilePart(cid, i)
    yield filePart.data
  }
}

// True when a registry row exists for `cid`.
const has = async (cid: string): Promise<boolean> => {
  return Boolean(await registryRepository.getEntry(cid))
}

// Postgres-backed (cold) download cache facade.
export const databaseDownloadCache = {
  set,
  get,
  has,
}
33 changes: 33 additions & 0 deletions backend/src/services/download/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { FilesUseCases, ObjectUseCases } from '../../useCases/index.js'
import { databaseDownloadCache } from './databaseDownloadCache/index.js'
import { memoryDownloadCache } from './memoryDownloadCache/index.js'
import { AwaitIterable } from 'interface-store'

export const downloadService = {
  /**
   * Resolves a file's content by cid, preferring the fastest tier:
   * in-memory LRU first, then the Postgres cold cache, then a fresh
   * retrieval — populating both caches on the way back.
   */
  download: async (cid: string): Promise<AwaitIterable<Buffer>> => {
    if (memoryDownloadCache.has(cid)) {
      console.log('Downloading file from memory', cid)
      return memoryDownloadCache.get(cid)!
    }

    if (await databaseDownloadCache.has(cid)) {
      console.log('Downloading file from database', cid)
      const fromDatabase = databaseDownloadCache.get(cid)!
      return memoryDownloadCache.set(cid, fromDatabase)
    }

    const metadata = await ObjectUseCases.getMetadata(cid)
    if (!metadata) {
      throw new Error('Not found')
    }

    const retrieved = await FilesUseCases.retrieveObject(metadata)
    const throughDatabase = await databaseDownloadCache.set(
      cid,
      retrieved,
      metadata.totalSize,
    )
    return memoryDownloadCache.set(cid, throughDatabase)
  },
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { LRUCache } from 'lru-cache'
import { bufferToAsyncIterable } from '../../utils/async.js'
import { bufferToAsyncIterable } from '../../../utils/async.js'
import { AwaitIterable } from 'interface-store'

const ONE_GB = 1024 ** 3

Expand All @@ -23,7 +24,7 @@ const get = (cid: string) => {

const set = async function* (
cid: string,
value: AsyncIterable<Buffer>,
value: AwaitIterable<Buffer>,
): AsyncIterable<Buffer> {
let buffer = Buffer.alloc(0)
for await (const chunk of value) {
Expand All @@ -35,7 +36,7 @@ const set = async function* (
})
}

export const downloadCache = {
export const memoryDownloadCache = {
has,
get,
set,
Expand Down
Loading

0 comments on commit 4d3d03c

Please sign in to comment.