Skip to content

Commit

Permalink
fix: add mutex lock to providers not using quote in request
Browse files Browse the repository at this point in the history
  • Loading branch information
dolcalmi committed Nov 29, 2023
1 parent 0f27c92 commit 22b9082
Show file tree
Hide file tree
Showing 17 changed files with 449 additions and 759 deletions.
36 changes: 18 additions & 18 deletions history/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,29 @@
"db:seed": "knex --knexfile ./src/config/database.ts seed:run"
},
"dependencies": {
"@grpc/grpc-js": "^1.8.14",
"@grpc/grpc-js": "^1.9.12",
"@grpc/proto-loader": "^0.7.7",
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/core": "^1.13.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.43.0",
"@opentelemetry/exporter-prometheus": "^0.43.0",
"@opentelemetry/instrumentation": "^0.43.0",
"@opentelemetry/instrumentation-grpc": "^0.43.0",
"@opentelemetry/instrumentation-http": "^0.43.0",
"@opentelemetry/resources": "^1.13.0",
"@opentelemetry/sdk-metrics": "^1.13.0",
"@opentelemetry/sdk-trace-base": "^1.13.0",
"@opentelemetry/sdk-trace-node": "^1.13.0",
"@opentelemetry/semantic-conventions": "^1.13.0",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/core": "^1.18.1",
"@opentelemetry/exporter-prometheus": "^0.45.1",
"@opentelemetry/exporter-trace-otlp-http": "^0.45.1",
"@opentelemetry/instrumentation": "^0.45.1",
"@opentelemetry/instrumentation-grpc": "^0.45.1",
"@opentelemetry/instrumentation-http": "^0.45.1",
"@opentelemetry/resources": "^1.18.1",
"@opentelemetry/sdk-metrics": "^1.18.1",
"@opentelemetry/sdk-trace-base": "^1.18.1",
"@opentelemetry/sdk-trace-node": "^1.18.1",
"@opentelemetry/semantic-conventions": "^1.18.1",
"ajv": "^8.12.0",
"ccxt": "^4.0.102",
"ccxt": "^4.1.70",
"dotenv": "^16.0.3",
"grpc-health-check": "^1.8.0",
"grpc-health-check": "^2.0.0",
"js-yaml": "^4.1.0",
"knex": "^2.4.2",
"knex": "^3.0.1",
"lodash.merge": "^4.6.2",
"node-cron": "^3.0.2",
"node-cron": "^3.0.3",
"pg": "^8.11.0",
"pino": "^8.14.1"
"pino": "^8.16.2"
}
}
4 changes: 2 additions & 2 deletions history/src/domain/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const ServiceStatus = {
SERVING: 1,
NOT_SERVING: 2,
SERVING: "SERVING",
NOT_SERVING: "NOT_SERVING",
} as const
8 changes: 4 additions & 4 deletions history/src/domain/shared/error-parsers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ export const parseErrorFromUnknown = (error: unknown): Error => {
error instanceof Error
? error
: typeof error === "string"
? new Error(error)
: error instanceof Object
? new Error(JSON.stringify(error))
: new Error("Unknown error")
? new Error(error)
: error instanceof Object
? new Error(JSON.stringify(error))
: new Error("Unknown error")
return err
}
10 changes: 5 additions & 5 deletions history/src/servers/history/run.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dotenv from "dotenv"
import * as grpc from "@grpc/grpc-js"
import healthCheck from "grpc-health-check"
import { HealthImplementation, ServingStatusMap } from "grpc-health-check"

import { History } from "@app"

Expand All @@ -12,12 +12,12 @@ import { wrapAsyncToRunInSpan } from "@services/tracing"
import { protoDescriptorPrice } from "../grpc"

dotenv.config()
const statusMap = {
"": 2,
const statusMap: ServingStatusMap = {
"": "NOT_SERVING",
// 1 is serving
// 2 is not serving
}
const healthImpl = new healthCheck.Implementation(statusMap)
const healthImpl = new HealthImplementation(statusMap)

const listPrices = wrapAsyncToRunInSpan({
root: true,
Expand Down Expand Up @@ -49,7 +49,7 @@ export const startServer = async () => {
const server = new grpc.Server()

server.addService(protoDescriptorPrice.PriceHistory.service, { listPrices })
server.addService(healthCheck.service, healthImpl)
healthImpl.addToServer(server)

server.bindAsync(
`0.0.0.0:${port}`,
Expand Down
17 changes: 12 additions & 5 deletions history/src/services/exchanges/ccxt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const CcxtExchangeService = async ({
const ohlc = await client.fetchOHLCV(symbol, timeframe, since, limit)
if (!ohlc || !ohlc.length) return []

return ohlc.map(exchangePriceFromRaw)
return ohlc.map(exchangePriceFromRaw).filter(isExchangePrice)
} catch (error) {
baseLogger.error({ error }, "Ccxt unknown error")
return new UnknownExchangeServiceError(error)
Expand All @@ -47,7 +47,14 @@ export const CcxtExchangeService = async ({
return { listPrices }
}

const exchangePriceFromRaw = ([timestamp, , , close]: OHLCV): ExchangePrice => ({
timestamp: toTimestamp(timestamp),
price: toPrice(close),
})
const exchangePriceFromRaw = ([timestamp, , , , close]: OHLCV):
| ExchangePrice
| undefined => {
if (!timestamp || !close) return undefined
return {
timestamp: toTimestamp(timestamp),
price: toPrice(close),
}
}

const isExchangePrice = (item: ExchangePrice | undefined): item is ExchangePrice => !!item
24 changes: 12 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
"node": "20"
},
"devDependencies": {
"@types/eslint": "^8.44.2",
"@types/jest": "^29.5.5",
"@types/node": "^20.6.3",
"@types/node-cron": "^3.0.8",
"@typescript-eslint/eslint-plugin": "^6.7.2",
"@typescript-eslint/parser": "^6.7.2",
"eslint": "^8.49.0",
"@types/eslint": "^8.44.7",
"@types/jest": "^29.5.10",
"@types/node": "^20.10.0",
"@types/node-cron": "^3.0.11",
"@typescript-eslint/eslint-plugin": "^6.13.1",
"@typescript-eslint/parser": "^6.13.1",
"eslint": "^8.54.0",
"eslint-config-prettier": "^9.0.0",
"eslint-plugin-import": "^2.28.1",
"eslint-plugin-prettier": "^5.0.0",
"eslint-plugin-import": "^2.29.0",
"eslint-plugin-prettier": "^5.0.1",
"jest": "^29.7.0",
"pino-pretty": "^10.2.0",
"prettier": "^3.0.3",
"pino-pretty": "^10.2.3",
"prettier": "^3.1.0",
"ts-jest": "^29.1.1",
"ts-node": "^10.9.1",
"ts-node-dev": "^2.0.0",
"tscpaths": "^0.0.9",
"typescript": "^5.2.2"
"typescript": "^5.3.2"
}
}
2 changes: 1 addition & 1 deletion realtime/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ exchanges:
quote: "USD"
provider: "ccxt"
cron: "*/20 * * * * *"
- name: "bitstamp1"
- name: "bitstamp"
enabled: true
quoteAlias: "USD"
base: "BTC"
Expand Down
37 changes: 19 additions & 18 deletions realtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,30 @@
"watch-compile": "tsc --watch --noEmit --skipLibCheck"
},
"dependencies": {
"@grpc/grpc-js": "^1.8.14",
"@grpc/grpc-js": "^1.9.12",
"@grpc/proto-loader": "^0.7.7",
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/core": "^1.13.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.43.0",
"@opentelemetry/exporter-prometheus": "^0.43.0",
"@opentelemetry/instrumentation": "^0.43.0",
"@opentelemetry/instrumentation-grpc": "^0.43.0",
"@opentelemetry/instrumentation-http": "^0.43.0",
"@opentelemetry/resources": "^1.13.0",
"@opentelemetry/sdk-metrics": "^1.13.0",
"@opentelemetry/sdk-trace-base": "^1.13.0",
"@opentelemetry/sdk-trace-node": "^1.13.0",
"@opentelemetry/semantic-conventions": "^1.13.0",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/core": "^1.18.1",
"@opentelemetry/exporter-prometheus": "^0.45.1",
"@opentelemetry/exporter-trace-otlp-http": "^0.45.1",
"@opentelemetry/instrumentation": "^0.45.1",
"@opentelemetry/instrumentation-grpc": "^0.45.1",
"@opentelemetry/instrumentation-http": "^0.45.1",
"@opentelemetry/resources": "^1.18.1",
"@opentelemetry/sdk-metrics": "^1.18.1",
"@opentelemetry/sdk-trace-base": "^1.18.1",
"@opentelemetry/sdk-trace-node": "^1.18.1",
"@opentelemetry/semantic-conventions": "^1.18.1",
"ajv": "^8.12.0",
"axios": "^1.5.0",
"ccxt": "^4.0.102",
"async-mutex": "^0.4.0",
"axios": "^1.6.2",
"ccxt": "^4.1.70",
"dotenv": "^16.0.3",
"grpc-health-check": "^1.8.0",
"grpc-health-check": "^2.0.0",
"js-yaml": "^4.1.0",
"lodash.merge": "^4.6.2",
"node-cache": "^5.1.2",
"node-cron": "^3.0.2",
"pino": "^8.14.1"
"node-cron": "^3.0.3",
"pino": "^8.16.2"
}
}
4 changes: 2 additions & 2 deletions realtime/src/domain/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const ServiceStatus = {
SERVING: 1,
NOT_SERVING: 2,
SERVING: "SERVING",
NOT_SERVING: "NOT_SERVING",
} as const
8 changes: 4 additions & 4 deletions realtime/src/domain/shared/error-parsers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ export const parseErrorFromUnknown = (error: unknown): Error => {
error instanceof Error
? error
: typeof error === "string"
? new Error(error)
: error instanceof Object
? new Error(JSON.stringify(error))
: new Error("Unknown error")
? new Error(error)
: error instanceof Object
? new Error(JSON.stringify(error))
: new Error("Unknown error")
return err
}
10 changes: 5 additions & 5 deletions realtime/src/servers/realtime/run.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import healthCheck from "grpc-health-check"
import * as grpc from "@grpc/grpc-js"
import { HealthImplementation, ServingStatusMap } from "grpc-health-check"

import { Realtime } from "@app"
import { getStatus, startWatchers } from "@app/realtime"
Expand All @@ -13,14 +13,14 @@ import { protoDescriptorPrice } from "../grpc"

// Define service status map. Key is the service name, value is the corresponding status.
// By convention, the empty string "" key represents that status of the entire server.
const statusMap = {
"": 2,
const statusMap: ServingStatusMap = {
"": "NOT_SERVING",
// 1 is serving
// 2 is not serving
}

// Construct the health service implementation
const healthImpl = new healthCheck.Implementation(statusMap)
const healthImpl = new HealthImplementation(statusMap)

const getPrice = wrapAsyncToRunInSpan({
root: true,
Expand Down Expand Up @@ -67,7 +67,7 @@ const server = new grpc.Server()

export const startServer = () => {
server.addService(protoDescriptorPrice.PriceFeed.service, { getPrice, listCurrencies })
server.addService(healthCheck.service, healthImpl)
healthImpl.addToServer(server)

server.bindAsync(
`0.0.0.0:${port}`,
Expand Down
2 changes: 2 additions & 0 deletions realtime/src/services/exchanges/ccxt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const tickerFromRaw = ({
ask,
timestamp,
}: CcxtTicker): Ticker | InvalidTickerError => {
if (!bid || !ask || !timestamp) return new InvalidTickerError()

if (bid > 0 && ask > 0 && timestamp > 0) {
return {
bid: toPrice(bid),
Expand Down
6 changes: 5 additions & 1 deletion realtime/src/services/exchanges/currencybeacon/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios from "axios"
import { Mutex } from "async-mutex"

import {
InvalidTickerError,
Expand All @@ -11,6 +12,7 @@ import { LocalCacheService } from "@services/cache"
import { CacheKeys } from "@domain/cache"
import { baseLogger } from "@services/logger"

const mutex = new Mutex()
export const CurrencyBeaconExchangeService = async ({
base,
quote,
Expand Down Expand Up @@ -89,7 +91,9 @@ export const CurrencyBeaconExchangeService = async ({
}
}

return { fetchTicker }
return {
fetchTicker: () => mutex.runExclusive(fetchTicker),
}
}

const isRatesObjectValid = (rates: unknown): rates is CurrencyBeaconRates => {
Expand Down
6 changes: 5 additions & 1 deletion realtime/src/services/exchanges/exchangeratehost/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios from "axios"
import { Mutex } from "async-mutex"

import {
InvalidTickerError,
Expand All @@ -11,6 +12,7 @@ import { LocalCacheService } from "@services/cache"
import { CacheKeys } from "@domain/cache"
import { baseLogger } from "@services/logger"

const mutex = new Mutex()
export const ExchangeRateHostService = async ({
base,
quote,
Expand Down Expand Up @@ -89,7 +91,9 @@ export const ExchangeRateHostService = async ({
}
}

return { fetchTicker }
return {
fetchTicker: () => mutex.runExclusive(fetchTicker),
}
}

const isRatesObjectValid = (rates: unknown): rates is ExchangeRateHostRates => {
Expand Down
6 changes: 5 additions & 1 deletion realtime/src/services/exchanges/free-currency-rates/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios from "axios"
import { Mutex } from "async-mutex"

import {
InvalidTickerError,
Expand All @@ -11,6 +12,7 @@ import { LocalCacheService } from "@services/cache"
import { CacheKeys } from "@domain/cache"
import { baseLogger } from "@services/logger"

const mutex = new Mutex()
export const FreeCurrencyRatesExchangeService = async ({
base,
quote,
Expand Down Expand Up @@ -72,7 +74,9 @@ export const FreeCurrencyRatesExchangeService = async ({
}
}

return { fetchTicker }
return {
fetchTicker: () => mutex.runExclusive(fetchTicker),
}
}

const tickerFromRaw = ({
Expand Down
6 changes: 5 additions & 1 deletion realtime/src/services/exchanges/yadio/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import axios from "axios"
import { Mutex } from "async-mutex"

import {
InvalidTickerError,
Expand All @@ -12,6 +13,7 @@ import { LocalCacheService } from "@services/cache"
import { CacheKeys } from "@domain/cache"
import { baseLogger } from "@services/logger"

const mutex = new Mutex()
export const YadioExchangeService = async ({
base,
quote,
Expand Down Expand Up @@ -66,7 +68,9 @@ export const YadioExchangeService = async ({
}
}

return { fetchTicker }
return {
fetchTicker: () => mutex.runExclusive(fetchTicker),
}
}

const isRatesObjectValid = (rates: unknown): rates is YadioRates => {
Expand Down
Loading

0 comments on commit 22b9082

Please sign in to comment.