Skip to content

Commit

Permalink
Merge branch 'main' into feat/http2_remaining_bits
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 authored Nov 25, 2024
2 parents 4e75f55 + 9b8abb8 commit 640860a
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 169 deletions.
13 changes: 12 additions & 1 deletion lib/cache/memory-cache-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ class MemoryCacheStore {
this.#maxCount = opts.maxCount
}

if (opts.maxSize !== undefined) {
if (
typeof opts.maxSize !== 'number' ||
!Number.isInteger(opts.maxSize) ||
opts.maxSize < 0
) {
throw new TypeError('MemoryCacheStore options.maxSize must be a non-negative integer')
}
this.#maxSize = opts.maxSize
}

if (opts.maxEntrySize !== undefined) {
if (
typeof opts.maxEntrySize !== 'number' ||
Expand Down Expand Up @@ -126,7 +137,7 @@ class MemoryCacheStore {
store.#size += entry.size
store.#count += 1

if (store.#size > store.#maxEntrySize || store.#count > store.#maxCount) {
if (store.#size > store.#maxSize || store.#count > store.#maxCount) {
for (const [key, entries] of store.#entries) {
for (const entry of entries.splice(0, entries.length / 2)) {
store.#size -= entry.size
Expand Down
165 changes: 86 additions & 79 deletions lib/cache/sqlite-cache-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const { DatabaseSync } = require('node:sqlite')
const { Writable } = require('stream')
const { assertCacheKey, assertCacheValue } = require('../util/cache.js')

const VERSION = 1

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
* @implements {CacheStore}
Expand Down Expand Up @@ -94,7 +96,7 @@ class SqliteCacheStore {
this.#db = new DatabaseSync(opts?.location ?? ':memory:')

this.#db.exec(`
CREATE TABLE IF NOT EXISTS cacheInterceptorV1 (
CREATE TABLE IF NOT EXISTS cacheInterceptorV${VERSION} (
-- Data specific to us
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
Expand All @@ -112,9 +114,9 @@ class SqliteCacheStore {
staleAt INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV1_url ON cacheInterceptorV1(url);
CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV1_method ON cacheInterceptorV1(method);
CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV1_deleteAt ON cacheInterceptorV1(deleteAt);
CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV${VERSION}_url ON cacheInterceptorV${VERSION}(url);
CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV${VERSION}_method ON cacheInterceptorV${VERSION}(method);
CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV${VERSION}_deleteAt ON cacheInterceptorV${VERSION}(deleteAt);
`)

this.#getValuesQuery = this.#db.prepare(`
Expand All @@ -129,7 +131,7 @@ class SqliteCacheStore {
vary,
cachedAt,
staleAt
FROM cacheInterceptorV1
FROM cacheInterceptorV${VERSION}
WHERE
url = ?
AND method = ?
Expand All @@ -138,7 +140,7 @@ class SqliteCacheStore {
`)

this.#updateValueQuery = this.#db.prepare(`
UPDATE cacheInterceptorV1 SET
UPDATE cacheInterceptorV${VERSION} SET
body = ?,
deleteAt = ?,
statusCode = ?,
Expand All @@ -153,7 +155,7 @@ class SqliteCacheStore {
`)

this.#insertValueQuery = this.#db.prepare(`
INSERT INTO cacheInterceptorV1 (
INSERT INTO cacheInterceptorV${VERSION} (
url,
method,
body,
Expand All @@ -169,32 +171,30 @@ class SqliteCacheStore {
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`)

this.#deleteExpiredValuesQuery = this.#db.prepare(
'DELETE FROM cacheInterceptorV1 WHERE deleteAt <= ?'
)

this.#deleteByUrlQuery = this.#db.prepare(
'DELETE FROM cacheInterceptorV1 WHERE url = ?'
`DELETE FROM cacheInterceptorV${VERSION} WHERE url = ?`
)

this.#countEntriesQuery = this.#db.prepare(
'SELECT COUNT(*) AS total FROM cacheInterceptorV1'
`SELECT COUNT(*) AS total FROM cacheInterceptorV${VERSION}`
)

const pruneLimit = this.#maxCount === Infinity
? 20
: Math.max(Math.floor(this.#maxCount * 0.1), 1)

this.#deleteOldValuesQuery = this.#db.prepare(`
DELETE FROM cacheInterceptorV1
WHERE id IN (
SELECT
id
FROM cacheInterceptorV1
ORDER BY cachedAt DESC
LIMIT ${pruneLimit}
)
`)
this.#deleteExpiredValuesQuery = this.#db.prepare(
`DELETE FROM cacheInterceptorV${VERSION} WHERE deleteAt <= ?`
)

this.#deleteOldValuesQuery = this.#maxCount === Infinity
? null
: this.#db.prepare(`
DELETE FROM cacheInterceptorV${VERSION}
WHERE id IN (
SELECT
id
FROM cacheInterceptorV${VERSION}
ORDER BY cachedAt DESC
LIMIT ?
)
`)
}

close () {
Expand Down Expand Up @@ -241,50 +241,34 @@ class SqliteCacheStore {
assertCacheValue(value)

const url = this.#makeValueUrl(key)
let currentSize = 0
let size = 0
/**
* @type {Buffer[] | null}
*/
let body = key.method !== 'HEAD' ? [] : null
const maxEntrySize = this.#maxEntrySize
const findValue = this.#findValue.bind(this)
const updateValueQuery = this.#updateValueQuery
const insertValueQuery = this.#insertValueQuery
const body = []
const store = this

this.prune()

const writable = new Writable({
return new Writable({
write (chunk, encoding, callback) {
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding)
}

currentSize += chunk.byteLength

if (body) {
if (currentSize >= maxEntrySize) {
body = null
this.end()
return callback()
}
size += chunk.byteLength

if (size < store.#maxEntrySize) {
body.push(chunk)
} else {
this.destroy()
}

callback()
},
final (callback) {
if (body === null) {
return callback()
}

/**
* @type {SqliteStoreValue | undefined}
*/
const existingValue = findValue(key, true)
const existingValue = store.#findValue(key, true)
if (existingValue) {
// Updating an existing response, let's delete it
updateValueQuery.run(
// Updating an existing response, let's overwrite it
store.#updateValueQuery.run(
JSON.stringify(stringifyBufferArray(body)),
value.deleteAt,
value.statusCode,
Expand All @@ -297,8 +281,9 @@ class SqliteCacheStore {
existingValue.id
)
} else {
store.#prune()
// New response, let's insert it
insertValueQuery.run(
store.#insertValueQuery.run(
url,
key.method,
JSON.stringify(stringifyBufferArray(body)),
Expand All @@ -317,8 +302,6 @@ class SqliteCacheStore {
callback()
}
})

return writable
}

/**
Expand All @@ -332,29 +315,33 @@ class SqliteCacheStore {
this.#deleteByUrlQuery.run(this.#makeValueUrl(key))
}

/**
* This method is called to prune the cache when it exceeds the maximum number
* of entries. It removes half the entries in the cache, ordering them the oldest.
*
* @returns {Number} The number of entries removed
*/
prune () {
const total = this.size
#prune () {
if (this.#size <= this.#maxCount) {
return 0
}

if (total <= this.#maxCount) {
return
{
const removed = this.#deleteExpiredValuesQuery.run(Date.now()).changes
if (removed > 0) {
return removed
}
}

const res = this.#deleteOldValuesQuery.run()
{
const removed = this.#deleteOldValuesQuery.run(Math.max(Math.floor(this.#maxCount * 0.1), 1)).changes
if (removed > 0) {
return removed
}
}

return res.changes
return 0
}

/**
* Counts the number of rows in the cache
* @returns {Number}
*/
get size () {
get #size () {
const { total } = this.#countEntriesQuery.get()
return total
}
Expand All @@ -374,37 +361,34 @@ class SqliteCacheStore {
*/
#findValue (key, canBeExpired = false) {
const url = this.#makeValueUrl(key)
const { headers, method } = key

/**
* @type {SqliteStoreValue[]}
*/
const values = this.#getValuesQuery.all(url, key.method)
const values = this.#getValuesQuery.all(url, method)

if (values.length === 0) {
// No responses, let's just return early
return undefined
}

const now = Date.now()
for (const value of values) {
if (now >= value.deleteAt && !canBeExpired) {
this.#deleteExpiredValuesQuery.run(now)
return undefined
}

let matches = true

if (value.vary) {
if (!key.headers) {
// Request doesn't have headers so it can't fulfill the vary
// requirements no matter what, let's return early
if (!headers) {
return undefined
}

value.vary = JSON.parse(value.vary)
const vary = JSON.parse(value.vary)

for (const header in value.vary) {
if (key.headers[header] !== value.vary[header]) {
for (const header in vary) {
if (headerValueEquals(headers[header], vary[header])) {
matches = false
break
}
Expand All @@ -420,6 +404,29 @@ class SqliteCacheStore {
}
}

/**
* @param {string|string[]|null|undefined} lhs
* @param {string|string[]|null|undefined} rhs
* @returns {boolean}
*/
function headerValueEquals (lhs, rhs) {
if (Array.isArray(lhs) && Array.isArray(rhs)) {
if (lhs.length !== rhs.length) {
return false
}

for (let i = 0; i < lhs.length; i++) {
if (rhs.includes(lhs[i])) {
return false
}
}

return true
}

return lhs === rhs
}

/**
* @param {Buffer[]} buffers
* @returns {string[]}
Expand Down
11 changes: 6 additions & 5 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,12 @@ function onHttp2SessionGoAway (errorCode) {
util.destroy(this[kSocket], err)

// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null
util.errorRequest(client, request, err)

client[kPendingIdx] = client[kRunningIdx]
if (client[kRunningIdx] < client[kQueue].length) {
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null
util.errorRequest(client, request, err)
client[kPendingIdx] = client[kRunningIdx]
}

assert(client[kRunning] === 0)

Expand Down
Loading

0 comments on commit 640860a

Please sign in to comment.