From 3333b3a69d6b6412559c36e91758ce5e97aa2b3e Mon Sep 17 00:00:00 2001 From: Konstantinos Feretos Date: Fri, 21 Oct 2022 14:26:49 +0300 Subject: [PATCH] fix(grpc-sdk): loki availability checks (#402) --- libraries/grpc-sdk/src/index.ts | 20 ++------ libraries/grpc-sdk/src/utilities/Logger.ts | 51 ++++++++++++++++++- libraries/grpc-sdk/src/utilities/index.ts | 2 + .../src/utilities/linearBackoffTimeout.ts | 28 +++++++++- .../src/adapters/sequelize-adapter/index.ts | 2 +- .../config-manager/service-discovery/index.ts | 2 +- 6 files changed, 84 insertions(+), 21 deletions(-) rename packages/core/src/config-manager/service-discovery/utils.ts => libraries/grpc-sdk/src/utilities/linearBackoffTimeout.ts (52%) diff --git a/libraries/grpc-sdk/src/index.ts b/libraries/grpc-sdk/src/index.ts index a5f21abc2..eacb10af7 100644 --- a/libraries/grpc-sdk/src/index.ts +++ b/libraries/grpc-sdk/src/index.ts @@ -33,10 +33,9 @@ import { import { GrpcError, HealthCheckStatus } from './types'; import { createSigner } from 'fast-jwt'; import { checkModuleHealth } from './classes/HealthCheck'; -import { ConduitLogger } from './utilities/Logger'; +import { ConduitLogger, setupLoki } from './utilities/Logger'; import winston from 'winston'; import path from 'path'; -import LokiTransport from 'winston-loki'; import { ConduitMetrics } from './metrics'; export default class ConduitGrpcSdk { @@ -102,21 +101,7 @@ export default class ConduitGrpcSdk { module_instance: this.instance, }); } - if (process.env.LOKI_URL && process.env.LOKI_URL !== '') { - ConduitGrpcSdk.Logger.addTransport( - new LokiTransport({ - level: 'debug', - host: process.env.LOKI_URL, - batching: false, - replaceTimestamp: true, - labels: { - module: this.name, - instance: this.instance, - }, - }), - ); - } - + setupLoki(this.name, this.instance).then(); this.serverUrl = serverUrl; this._watchModules = watchModules; this._serviceHealthStatusGetter = serviceHealthStatusGetter; @@ -494,3 +479,4 @@ export * from './helpers'; export * from './constants'; export * from './routing'; export * from './types'; +export * from './utilities'; diff --git a/libraries/grpc-sdk/src/utilities/Logger.ts b/libraries/grpc-sdk/src/utilities/Logger.ts index 20ccaf66b..ccfff2886 100644 --- a/libraries/grpc-sdk/src/utilities/Logger.ts +++ b/libraries/grpc-sdk/src/utilities/Logger.ts @@ -1,6 +1,10 @@ -import winston, { format, LogCallback, Logger } from 'winston'; +import ConduitGrpcSdk from '../index'; import { Indexable } from '../interfaces'; +import { linearBackoffTimeoutAsync } from '../utilities'; +import winston, { format, LogCallback, Logger } from 'winston'; import { isEmpty } from 'lodash'; +import { get } from 'http'; +import LokiTransport from 'winston-loki'; const processMeta = (meta: Indexable) => { if (Array.isArray(meta)) { @@ -109,3 +113,48 @@ export class ConduitLogger { return this._winston; } } + +async function lokiReadyCheck(lokiUrl: string): Promise { + return new Promise((resolve, reject) => { + const data: any[] = []; + get(`${lokiUrl}/ready`, r => { + r.on('data', chunk => data.push(chunk)); + r.on('end', () => { + if (Buffer.concat(data).toString() === 'ready\n') resolve(); + else reject(false); + }); + }).on('error', err => { + reject(err.message); + }); + }); +} + +export async function setupLoki(module: string, instance: string) { + let lokiUrl = process.env.LOKI_URL; + if (lokiUrl && lokiUrl !== '') { + if (lokiUrl.endsWith('/')) lokiUrl = lokiUrl.slice(0, -1); + const onTry = async () => { + return await lokiReadyCheck(lokiUrl!) + .then(() => { + ConduitGrpcSdk.Logger.addTransport( + new LokiTransport({ + level: 'debug', + host: lokiUrl!, + batching: false, + replaceTimestamp: true, + labels: { + module, + instance, + }, + }), + ); + return false; + }) + .catch(() => true); // retry + }; + const onFailure = () => { + ConduitGrpcSdk.Logger.error(`Failed to connect to Loki on '${lokiUrl}'`); + }; + await linearBackoffTimeoutAsync(onTry, 250, 15, onFailure, true); + } +} diff --git a/libraries/grpc-sdk/src/utilities/index.ts b/libraries/grpc-sdk/src/utilities/index.ts index fbf26b868..548424b0c 100644 --- a/libraries/grpc-sdk/src/utilities/index.ts +++ b/libraries/grpc-sdk/src/utilities/index.ts @@ -1,3 +1,5 @@ +export * from './linearBackoffTimeout'; + export function sleep(ms: number) { return new Promise(resolve => { setTimeout(resolve, ms); diff --git a/packages/core/src/config-manager/service-discovery/utils.ts b/libraries/grpc-sdk/src/utilities/linearBackoffTimeout.ts similarity index 52% rename from packages/core/src/config-manager/service-discovery/utils.ts rename to libraries/grpc-sdk/src/utilities/linearBackoffTimeout.ts index 1e7dd80f5..bb96c096d 100644 --- a/packages/core/src/config-manager/service-discovery/utils.ts +++ b/libraries/grpc-sdk/src/utilities/linearBackoffTimeout.ts @@ -1,8 +1,10 @@ -/* +/** * Registers a timeout with linear backoff. * onFailure() only runs on rep exhaustion. * Timeout can be cleared through returned clear() or inside onTry(). */ +import { clearTimeout } from 'timers'; + export function linearBackoffTimeout( onTry: (timeout: NodeJS.Timeout) => void, delay: number, @@ -31,3 +33,27 @@ export function linearBackoffTimeout( }, }; } + +/** + * Registers a timeout with linear backoff. + * onFailure() only runs on rep exhaustion. + * @param {() => Promise} onTry - Async handler, returns 'continue' boolean flag + */ +export async function linearBackoffTimeoutAsync( + onTry: () => Promise, + delay: number, + reps?: number, + onFailure?: () => void | Promise, + startNow = false, +) { + const nextRep = () => reps === undefined || --reps > 0; + const invoker = async () => { + delay = Math.floor(delay * 2); + if (delay > 0 && nextRep()) { + if (await onTry()) setTimeout(invoker, delay); + } else { + onFailure && (await onFailure()); + } + }; + setTimeout(invoker, startNow ? 0 : delay); +} diff --git a/modules/database/src/adapters/sequelize-adapter/index.ts b/modules/database/src/adapters/sequelize-adapter/index.ts index b2aa70fb4..2ce70a1cc 100644 --- a/modules/database/src/adapters/sequelize-adapter/index.ts +++ b/modules/database/src/adapters/sequelize-adapter/index.ts @@ -6,8 +6,8 @@ import ConduitGrpcSdk, { ConduitSchema, GrpcError, Indexable, + sleep, } from '@conduitplatform/grpc-sdk'; -import { sleep } from '@conduitplatform/grpc-sdk/dist/utilities'; import { DatabaseAdapter } from '../DatabaseAdapter'; import { validateSchema } from '../utils/validateSchema'; import { sqlSchemaConverter } from '../../introspection/sequelize/utils'; diff --git a/packages/core/src/config-manager/service-discovery/index.ts b/packages/core/src/config-manager/service-discovery/index.ts index dbb3e16ae..4b138c572 100644 --- a/packages/core/src/config-manager/service-discovery/index.ts +++ b/packages/core/src/config-manager/service-discovery/index.ts @@ -4,9 +4,9 @@ import ConduitGrpcSdk, { GrpcRequest, GrpcResponse, HealthCheckStatus, + linearBackoffTimeout, } from '@conduitplatform/grpc-sdk'; import { IModuleConfig } from '../../interfaces/IModuleConfig'; -import { linearBackoffTimeout } from './utils'; import { ServerWritableStream, status } from '@grpc/grpc-js'; import { EventEmitter } from 'events'; import { clearTimeout } from 'timers';