Skip to content

Commit

Permalink
(core) Add flexibility to daily API usage limit
Browse files Browse the repository at this point in the history
Summary: Allow exceeding the daily API usage limit for a doc based on additional allocations for the current hour and minute. See the doc comment on getDocApiUsageKeysToIncr for details. This means that up to 5 redis keys may be relevant at a time for a single document.

Test Plan: Updated and expanded 'Daily API Limit' tests.

Reviewers: dsagal

Reviewed By: dsagal

Differential Revision: https://phab.getgrist.com/D3368
  • Loading branch information
alexmojaki committed Apr 28, 2022
1 parent 4de5928 commit 0beb289
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 111 deletions.
13 changes: 10 additions & 3 deletions app/gen-server/entity/Product.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ export const teamFreeFeatures: Features = {
gracePeriodDays: 14,
};

export const testDailyApiLimitFeatures = {
...teamFreeFeatures,
baseMaxApiUnitsPerDocumentPerDay: 3,
};

/**
* A summary of features used in unrestricted grandfathered accounts, and also
* in some test settings.
Expand Down Expand Up @@ -87,7 +92,7 @@ export interface IProduct {
* TODO: change capitalization of name of grandfather product.
*
*/
const PRODUCTS: IProduct[] = [
export const PRODUCTS: IProduct[] = [
// This is a product for grandfathered accounts/orgs.
{
name: 'Free',
Expand Down Expand Up @@ -166,7 +171,9 @@ export class Product extends BaseEntity {
* If `apply` is set, the products are changed in the db, otherwise
* the are left unchanged. A summary of affected products is returned.
*/
export async function synchronizeProducts(connection: Connection, apply: boolean): Promise<string[]> {
export async function synchronizeProducts(
connection: Connection, apply: boolean, products = PRODUCTS
): Promise<string[]> {
try {
await connection.query('select name, features, stripe_product_id from products limit 1');
} catch (e) {
Expand All @@ -175,7 +182,7 @@ export async function synchronizeProducts(connection: Connection, apply: boolean
}
const changingProducts: string[] = [];
await connection.transaction(async transaction => {
const desiredProducts = new Map(PRODUCTS.map(p => [p.name, p]));
const desiredProducts = new Map(products.map(p => [p.name, p]));
const existingProducts = new Map((await transaction.find(Product))
.map(p => [p.name, p]));
for (const product of desiredProducts.values()) {
Expand Down
16 changes: 4 additions & 12 deletions app/gen-server/lib/DocWorkerMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class DummyDocWorkerMap implements IDocWorkerMap {
return null;
}

public incrementDocApiUsage(key: string): Promise<number> {
return Promise.resolve(0);
public getRedisClient(): RedisClient {
throw new Error("No redis client here");
}
}

Expand Down Expand Up @@ -517,16 +517,8 @@ export class DocWorkerMap implements IDocWorkerMap {
return this._client.getAsync(`doc-${docId}-group`);
}

/**
* Increment the value at the given redis key representing API usage of one document in one day.
* Expire the key after a day just so that it cleans itself up.
* Returns the value after incrementing.
* This is not related to other responsibilities of this class,
* but this class conveniently manages the redis client.
*/
public async incrementDocApiUsage(key: string): Promise<number | null> {
const result = await this._client.multi().incr(key).expire(key, 24 * 60 * 60).execAsync();
return Number(result?.[0]);
public getRedisClient(): RedisClient {
return this._client;
}

/**
Expand Down
178 changes: 132 additions & 46 deletions app/server/lib/DocApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import * as contentDisposition from 'content-disposition';
import {Application, NextFunction, Request, RequestHandler, Response} from "express";
import * as _ from "lodash";
import * as LRUCache from 'lru-cache';
import * as moment from 'moment';
import fetch from 'node-fetch';
import * as path from 'path';
import * as t from "ts-interface-checker";
Expand All @@ -72,6 +73,12 @@ import * as uuidv4 from "uuid/v4";
// reply with status 429.
const MAX_PARALLEL_REQUESTS_PER_DOC = 10;

// This is NOT the number of docs that can be handled at a time.
// It's a very generous upper bound of what that number might be.
// If there are more docs than this for which API requests are being regularly made at any moment,
// then the _dailyUsage cache may become unreliable and users may be able to exceed their allocated requests.
const MAX_ACTIVE_DOCS_USAGE_CACHE = 1000;

type WithDocHandler = (activeDoc: ActiveDoc, req: RequestWithLogin, resp: Response) => Promise<void>;

// Schema validators for api endpoints that creates or updates records.
Expand Down Expand Up @@ -99,6 +106,14 @@ function validate(checker: Checker): RequestHandler {
}

export class DocWorkerApi {
// Map from docId to number of requests currently being handled for that doc
private _currentUsage = new Map<string, number>();

// Map from (docId, time period) combination produced by docPeriodicApiUsageKey
// to number of requests previously served for that combination.
// We multiply by 5 because there are 5 relevant keys per doc at any time (current/next day/hour and current minute).
private _dailyUsage = new LRUCache<string, number>({max: 5 * MAX_ACTIVE_DOCS_USAGE_CACHE});

constructor(private _app: Application, private _docWorker: DocWorker,
private _docWorkerMap: IDocWorkerMap, private _docManager: DocManager,
private _dbManager: HomeDBManager, private _grist: GristServer) {}
Expand Down Expand Up @@ -771,32 +786,30 @@ export class DocWorkerApi {
private _apiThrottle(callback: (req: RequestWithLogin,
resp: Response,
next: NextFunction) => void | Promise<void>): RequestHandler {
const usage = new Map<string, number>();
const dailyUsage = new LRUCache<string, number>({max: 1024});
return async (req, res, next) => {
const docId = getDocId(req);
try {
const count = usage.get(docId) || 0;
usage.set(docId, count + 1);
const count = this._currentUsage.get(docId) || 0;
this._currentUsage.set(docId, count + 1);
if (count + 1 > MAX_PARALLEL_REQUESTS_PER_DOC) {
throw new ApiError(`Too many backlogged requests for document ${docId} - ` +
`try again later?`, 429);
}

if (await this._checkDailyDocApiUsage(req, docId, dailyUsage)) {
if (await this._checkDailyDocApiUsage(req, docId)) {
throw new ApiError(`Exceeded daily limit for document ${docId}`, 429);
}

await callback(req as RequestWithLogin, res, next);
} catch (err) {
next(err);
} finally {
const count = usage.get(docId);
const count = this._currentUsage.get(docId);
if (count) {
if (count === 1) {
usage.delete(docId);
this._currentUsage.delete(docId);
} else {
usage.set(docId, count - 1);
this._currentUsage.set(docId, count - 1);
}
}
}
Expand All @@ -805,57 +818,66 @@ export class DocWorkerApi {

/**
* Usually returns true if too many requests (based on the user's product plan)
* have been made today for this document.
* have been made today for this document and the request should be rejected.
* Access to a document must already have been authorized.
* This is called frequently so it uses caches to check quickly in the common case,
* which allows a few ways for users to exceed the limit slightly if the timing works out,
* but these should be acceptable.
*/
private async _checkDailyDocApiUsage(req: Request, docId: string, dailyUsage: LRUCache<string, number>) {
// Start with the possibly stale cached doc to avoid a database call.
// This leaves a small window for the user to bypass this limit after downgrading.
let doc = (req as RequestWithLogin).docAuth!.cachedDoc!;

function getMax() {
return doc.workspace.org.billingAccount?.product.features.baseMaxApiUnitsPerDocumentPerDay;
}
private async _checkDailyDocApiUsage(req: Request, docId: string): Promise<boolean> {
// Use the cached doc to avoid a database call.
// This leaves a small window (currently 5 seconds) for the user to bypass this limit after downgrading,
// or to be wrongly rejected after upgrading.
const doc = (req as RequestWithLogin).docAuth!.cachedDoc!;

let max = getMax();
const max = doc.workspace.org.billingAccount?.product.features.baseMaxApiUnitsPerDocumentPerDay;
if (!max) {
// This doc has no associated product (happens to new unsaved docs)
// or the product has no API limit.
return;
// or the product has no API limit. Allow the request through.
return false;
}

// Get the current count from the dailyUsage cache rather than waiting for redis.
// The cache will not have a count if this is the first request for this document served by this worker process
// or if so many other documents have been served since then that this key was evicted from the LRU cache.
// Check the counts in the dailyUsage cache rather than waiting for redis.
// The cache will not have counts if this is the first request for this document served by this worker process
// or if so many other documents have been served since then that the keys were evicted from the LRU cache.
// Both scenarios are temporary and unlikely when usage has been exceeded.
const key = docDailyApiUsageKey(docId);
const count = dailyUsage.get(key);

if (count && count >= max) {
// The limit has apparently been exceeded.
// In case the user just upgraded, get a fresh Document entity from the DB and check again.
doc = await this._dbManager.getDoc(getDocScope(req));
max = getMax();
if (max && count >= max) {
return true;
}
// Note that if the limits are exceeded then `keys` below will be undefined,
// otherwise it will be an array of three keys corresponding to a day, hour, and minute.
const m = moment.utc();
const keys = getDocApiUsageKeysToIncr(docId, this._dailyUsage, max, m);
if (!keys) {
// The limit has been exceeded, reject the request.
return true;
}

// Note the increased API usage on redis and in our local cache.
// Do this in the background so that the rest of the request can continue without waiting for redis.
// If the user makes many concurrent requests quickly,
// a few extra might slip through before we see the count exceeding the limit, but this is basically unavoidable.
this._docWorkerMap.incrementDocApiUsage(key).then(newCount => {
if (newCount) {
// Update redis in the background so that the rest of the request can continue without waiting for redis.
const multi = this._docWorkerMap.getRedisClient().multi();
for (let i = 0; i < keys.length; i++) {
const key = keys[i];
// Incrementing the local count immediately prevents many requests from being squeezed through every minute
// before counts are received from redis.
// But this cache is not 100% reliable and the count from redis may be higher.
this._dailyUsage.set(key, (this._dailyUsage.get(key) ?? 0) + 1);
const period = docApiUsagePeriods[i];
// Expire the key just so that it cleans itself up and saves memory on redis.
// Expire after two periods to handle 'next' buckets.
const expiry = 2 * 24 * 60 * 60 / period.periodsPerDay;
multi.incr(key).expire(key, expiry);
}
multi.execAsync().then(result => {
for (let i = 0; i < keys.length; i++) {
const key = keys[i];
const newCount = Number(result![i * 2]); // incrs are at even positions, expires at odd positions
// Theoretically this could be overwritten by a lower count that was requested earlier
// but somehow arrived after.
// This doesn't really matter, and the count on redis will still increase reliably.
dailyUsage.set(key, newCount);
this._dailyUsage.set(key, newCount);
}
}).catch(e => console.error(`Error tracking API usage for doc ${docId}`, e));

// Allow the request through.
return false;
}

private async _assertAccess(role: 'viewers'|'editors'|'owners'|null, allowRemoved: boolean,
Expand Down Expand Up @@ -1140,14 +1162,78 @@ async function handleSandboxError<T>(tableId: string, colNames: string[], p: Pro
return handleSandboxErrorOnPlatform(tableId, colNames, p, getErrorPlatform(tableId));
}

export interface DocApiUsagePeriod {
unit: 'day' | 'hour' | 'minute',
format: string;
periodsPerDay: number;
}

export const docApiUsagePeriods: DocApiUsagePeriod[] = [
{
unit: 'day',
format: 'YYYY-MM-DD',
periodsPerDay: 1,
},
{
unit: 'hour',
format: 'YYYY-MM-DDTHH',
periodsPerDay: 24,
},
{
unit: 'minute',
format: 'YYYY-MM-DDTHH:mm',
periodsPerDay: 24 * 60,
},
];

/**
* Returns a key used for redis and a local cache
* which store the number of API requests made for the given document today.
* Defined here so that it can easily be accessed in tests.
* The key contains the current UTC date so that counts from previous days are simply ignored and eventually evicted.
* which store the number of API requests made for the given document in the given period.
* The key contains the current UTC date (and maybe hour and minute)
* so that counts from previous periods are simply ignored and eventually evicted.
* This means that the daily measured usage conceptually 'resets' at UTC midnight.
* If `current` is false, returns a key for the next day/hour.
*/
export function docDailyApiUsageKey(docId: string) {
const d = new Date();
return `doc-${docId}-dailyApiUsage-${d.getUTCFullYear()}-${d.getUTCMonth() + 1}-${d.getUTCDate()}`;
export function docPeriodicApiUsageKey(docId: string, current: boolean, period: DocApiUsagePeriod, m: moment.Moment) {
if (!current) {
m = m.clone().add(1, period.unit);
}
return `doc-${docId}-periodicApiUsage-${m.format(period.format)}`;
}

/**
* Checks whether the doc API usage fits within the daily maximum.
* If so, returns an array of keys for each unit of time whose usage should be incremented.
* If not, returns undefined.
*
* Description of the algorithm this is implementing:
*
* Maintain up to 5 buckets: current day, next day, current hour, next hour, current minute.
* For each API request, check in order:
* - if current_day < DAILY_LIMIT, allow; increment all 3 current buckets
* - else if current_hour < DAILY_LIMIT/24, allow; increment next_day, current_hour, and current_minute buckets.
* - else if current_minute < DAILY_LIMIT/24/60, allow; increment next_day, next_hour, and current_minute buckets.
* - else reject.
* I think it has pretty good properties:
* - steady low usage may be maintained even if a burst exhausted the daily limit
* - user could get close to twice the daily limit on the first day with steady usage after a burst,
* but would then be limited to steady usage the next day.
*/
export function getDocApiUsageKeysToIncr(
docId: string, usage: LRUCache<string, number>, dailyMax: number, m: moment.Moment
): string[] | undefined {
// Start with keys for the current day, minute, and hour
const keys = docApiUsagePeriods.map(p => docPeriodicApiUsageKey(docId, true, p, m));
for (let i = 0; i < docApiUsagePeriods.length; i++) {
const period = docApiUsagePeriods[i];
const key = keys[i];
const periodMax = Math.ceil(dailyMax / period.periodsPerDay);
const count = usage.get(key) || 0;
if (count < periodMax) {
return keys;
}
// Allocation for the current day/hour/minute has been exceeded, increment the next day/hour/minute instead.
keys[i] = docPeriodicApiUsageKey(docId, false, period, m);
}
// Usage exceeded all the time buckets, so return undefined to reject the request.
}
3 changes: 2 additions & 1 deletion app/server/lib/DocWorkerMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import { IChecksumStore } from 'app/server/lib/IChecksumStore';
import { IElectionStore } from 'app/server/lib/IElectionStore';
import { IPermitStores } from 'app/server/lib/Permit';
import {RedisClient} from 'redis';

export interface DocWorkerInfo {
id: string;
Expand Down Expand Up @@ -67,5 +68,5 @@ export interface IDocWorkerMap extends IPermitStores, IElectionStore, IChecksumS
getWorkerGroup(workerId: string): Promise<string|null>;
getDocGroup(docId: string): Promise<string|null>;

incrementDocApiUsage(key: string): Promise<number|null>;
getRedisClient(): RedisClient;
}
1 change: 1 addition & 0 deletions stubs/app/server/declarations.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ declare module "redis" {
public sadd(key: string, val: string): Multi;
public set(key: string, val: string): Multi;
public setex(key: string, ttl: number, val: string): Multi;
public ttl(key: string): Multi;
public smembers(key: string): Multi;
public srandmember(key: string): Multi;
public srem(key: string, val: string): Multi;
Expand Down
Loading

0 comments on commit 0beb289

Please sign in to comment.