From 8693cf493ca5008ffb5ec4e0f3ffa15de46e4ca9 Mon Sep 17 00:00:00 2001 From: Zhiyuan He Date: Sun, 27 Sep 2020 11:33:58 +0800 Subject: [PATCH] fix fix fix fix fix fix fix fix fix Add index on task uid instead of framework name and attempt index (#4938) fix fix fix fix fix fix fix --- .../config/database-controller.yaml | 12 +- .../deploy/database-controller.yaml.template | 57 +++++++ src/database-controller/sdk/index.js | 12 +- src/database-controller/src/common/init.js | 12 ++ src/database-controller/src/common/k8s.js | 22 ++- src/database-controller/src/package.json | 1 + .../src/watcher/cluster-event/config.js | 41 ++++++ .../src/watcher/cluster-event/index.js | 139 ++++++++++++++++++ .../src/watcher/framework/index.js | 1 + src/database-controller/src/yarn.lock | 17 ++- 10 files changed, 308 insertions(+), 6 deletions(-) create mode 100644 src/database-controller/src/common/init.js create mode 100644 src/database-controller/src/watcher/cluster-event/config.js create mode 100644 src/database-controller/src/watcher/cluster-event/index.js diff --git a/src/database-controller/config/database-controller.yaml b/src/database-controller/config/database-controller.yaml index f513f50005..7353c086af 100644 --- a/src/database-controller/config/database-controller.yaml +++ b/src/database-controller/config/database-controller.yaml @@ -27,7 +27,7 @@ write-merger-max-heap-mb: 2048 # db poller # Polling interval of database poller. Default value is 120. db-poller-interval-second: 120 -# Max connection number to database in write merger. +# Max connection number to database in poller. db-poller-max-db-connection: 10 # Max rpc concurrency for db poller db-poller-max-rpc-concurrency: 50 @@ -39,3 +39,13 @@ db-poller-max-heap-mb: 4096 framework-watcher-max-rpc-concurrency: 150 # Framework watcher max heap size in MB framework-watcher-max-heap-mb: 8192 + +# cluster event watcher +# Max rpc concurrency for cluster event watcher +cluster-event-watcher-max-rpc-concurrency: 40 +# cluster event watcher max heap size in MB +cluster-event-watcher-max-heap-mb: 2048 +# Max connection number to database in cluster event watcher. +cluster-event-max-db-connection: 40 +# Max disk usage in internal storage for cluster event watcher +cluster-event-watcher-max-disk-usage-percent: 80 diff --git a/src/database-controller/deploy/database-controller.yaml.template b/src/database-controller/deploy/database-controller.yaml.template index 91d321acc1..302e800f0f 100644 --- a/src/database-controller/deploy/database-controller.yaml.template +++ b/src/database-controller/deploy/database-controller.yaml.template @@ -158,6 +158,59 @@ spec: - name: MAX_RPC_CONCURRENCY value: "{{ cluster_cfg['database-controller']['framework-watcher-max-rpc-concurrency'] }}" command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['framework-watcher-max-heap-mb'] }}", "watcher/framework/index.js"] + - name: cluster-event-watcher + image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}database-controller:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }} + imagePullPolicy: Always + volumeMounts: + - name: internal-data-dir + mountPath: /paiInternal/storage + mountPropagation: "None" + env: + # Log level of all logs. You can choose from error, warn, info, http, verbose, debug, and silly. Default value is info. + - name: LOG_LEVEL + value: "{{ cluster_cfg['database-controller']['log-level'] }}" + # The global timeout for all calls to Kubernetes API server. Default value is 120. + - name: K8S_CONNECTION_TIMEOUT_SECOND + value: "{{ cluster_cfg['database-controller']['k8s-connection-timeout-second'] }}" + # The timeout for calls to write merger. Default value is 120. + - name: WRITE_MERGER_CONNECTION_TIMEOUT_SECOND + value: "{{ cluster_cfg['database-controller']['write-merger-connection-timeout-second'] }}" + # Whether to enable retain mode. + # If someone submits a framework directly without accessing database, we can find the framework in write merger. + # For these frameworks, if retain mode is on, we will ignore them. + # If retain mode is off (it is the default setting), we will delete the frameworks to maintain ground-truth in database. + - name: RETAIN_MODE_ENABLED +{% if cluster_cfg['database-controller']['retain-mode'] %} + value: "true" +{% else %} + value: "false" +{% endif %} + # If RBAC is set up in current environment. + # If RBAC_IN_CLUSTER=true, the API Server client can read all settings automatically in container. + # If RBAC_IN_CLUSTER=false, we should set CUSTOM_K8S_API_SERVER_URL. +{% if cluster_cfg['cluster']['common']['k8s-rbac'] != 'false' %} + - name: RBAC_IN_CLUSTER + value: "true" +{% else %} + - name: RBAC_IN_CLUSTER + value: "false" + - name: CUSTOM_K8S_API_SERVER_URL + value: {{ cluster_cfg['layout']['kubernetes']['api-servers-url'] }} +{% endif %} + # The database client string. It follows the format "://:@:/" + - name: DB_CONNECTION_STR + value: "{{ cluster_cfg['postgresql']['connection-str'] }}" + # Max connection number to database in write merger. Default value is 10. + - name: MAX_DB_CONNECTION + value: "{{ cluster_cfg['database-controller']['db-poller-max-db-connection'] }}" + # Max rpc concurrency for cluster event watcher + - name: MAX_RPC_CONCURRENCY + value: "{{ cluster_cfg['database-controller']['cluster-event-watcher-max-rpc-concurrency'] }}" + - name: DISK_PATH + value: "/paiInternal/storage" + - name: MAX_DISK_USAGE_PERCENT + value: "{{ cluster_cfg['database-controller']['cluster-event-watcher-max-disk-usage-percent'] }}" + command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['cluster-event-watcher-max-heap-mb'] }}", "watcher/cluster-event/index.js"] - name: poller image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}database-controller:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }} imagePullPolicy: Always @@ -209,5 +262,9 @@ spec: - name: MAX_RPC_CONCURRENCY value: "{{ cluster_cfg['database-controller']['db-poller-max-rpc-concurrency'] }}" command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['db-poller-max-heap-mb'] }}", "poller/index.js"] + volumes: + - name: internal-data-dir + hostPath: + path: '{{ cluster_cfg["internal-storage"]["root-path"] }}/storage' imagePullSecrets: - name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }} diff --git a/src/database-controller/sdk/index.js b/src/database-controller/sdk/index.js index 7eb75bd3c6..e9e88f1d3b 100644 --- a/src/database-controller/sdk/index.js +++ b/src/database-controller/sdk/index.js @@ -161,7 +161,7 @@ class DatabaseModel { indexes: [ { unique: false, - fields: ['frameworkName', 'attemptIndex'], + fields: ['taskUid'], }, ], freezeTableName: true, @@ -211,11 +211,21 @@ class DatabaseModel { type: Sequelize.STRING(64), allowNull: false, }, + podUid: Sequelize.STRING(36), + taskroleName: Sequelize.STRING(256), + taskName: Sequelize.STRING(256), + taskIndex: Sequelize.INTEGER, type: { type: Sequelize.STRING(32), allowNull: false, }, + reason: Sequelize.STRING(64), message: Sequelize.TEXT, + firstTimestamp: Sequelize.DATE, + lastTimestamp: Sequelize.DATE, + count: Sequelize.INTEGER, + sourceComponent: Sequelize.STRING(255), + sourceHost: Sequelize.STRING(255), event: Sequelize.TEXT, }, { diff --git a/src/database-controller/src/common/init.js b/src/database-controller/src/common/init.js new file mode 100644 index 0000000000..2ee7a8f454 --- /dev/null +++ b/src/database-controller/src/common/init.js @@ -0,0 +1,12 @@ +const logger = require('@dbc/common/logger'); + +// We doesn't allow unhandled promise rejection. +// It will make the watcher stop all actions (in some rare cases). +process.on('unhandledRejection', function(reason, p) { + logger.error( + `Encounter unhandled rejection of promise, reason: ${reason}`, + function() { + process.exit(1); + }, + ); +}); diff --git a/src/database-controller/src/common/k8s.js b/src/database-controller/src/common/k8s.js index 05f9f30d4c..ce838a639a 100644 --- a/src/database-controller/src/common/k8s.js +++ b/src/database-controller/src/common/k8s.js @@ -150,6 +150,25 @@ function getFrameworkInformer( return informer; } +const coreV1Client = kc.makeApiClient(k8s.CoreV1Api); + +function getEventInformer(timeoutSeconds = 365 * 86400, namespace = 'default') { + /* + The usage is very like `getFrameworkInformer`. Please see the comments of `getFrameworkInformer` for reference. + + */ + const listFn = () => { + logger.info('Cluster events are listed.'); + return coreV1Client.listNamespacedEvent(namespace); + }; + const informer = k8s.makeInformer( + kc, + `/api/v1/namespaces/${namespace}/events?timeoutSeconds=${timeoutSeconds}`, + listFn, + ); + return informer; +} + const priorityClassClient = kc.makeApiClient(k8s.SchedulingV1Api); async function createPriorityClass(priorityClassDef) { @@ -162,8 +181,6 @@ async function deletePriorityClass(name) { return res.response; } -const coreV1Client = kc.makeApiClient(k8s.CoreV1Api); - async function createSecret(secretDef) { const res = await coreV1Client.createNamespacedSecret( secretDef.metadata.namespace, @@ -255,4 +272,5 @@ module.exports = { timeoutMs, ), getFrameworkInformer: getFrameworkInformer, + getEventInformer: getEventInformer, }; diff --git a/src/database-controller/src/package.json b/src/database-controller/src/package.json index 2fceecec57..b944024d7c 100644 --- a/src/database-controller/src/package.json +++ b/src/database-controller/src/package.json @@ -15,6 +15,7 @@ "body-parser": "^1.19.0", "compression": "^1.7.4", "cors": "^2.8.5", + "diskusage": "^1.1.3", "dotenv": "^8.2.0", "express": "^4.17.1", "http-errors": "^1.8.0", diff --git a/src/database-controller/src/watcher/cluster-event/config.js b/src/database-controller/src/watcher/cluster-event/config.js new file mode 100644 index 0000000000..2eec298145 --- /dev/null +++ b/src/database-controller/src/watcher/cluster-event/config.js @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +const basicConfig = require('@dbc/common/config'); +const _ = require('lodash'); +const Joi = require('joi'); + +const configSchema = Joi.object() + .keys({ + dbConnectionStr: Joi.string().required(), + maxDatabaseConnection: Joi.number() + .integer() + .required(), + maxRpcConcurrency: Joi.number() + .integer() + .required(), + diskPath: Joi.string().required(), + diskCheckIntervalSecond: Joi.number() + .integer() + .required(), + maxDiskUsagePercent: Joi.number() + .integer() + .required(), + }) + .required(); + +const config = { + dbConnectionStr: process.env.DB_CONNECTION_STR, + maxDatabaseConnection: parseInt(process.env.MAX_DB_CONNECTION), + maxRpcConcurrency: parseInt(process.env.MAX_RPC_CONCURRENCY), + diskPath: process.env.DISK_PATH, + diskCheckIntervalSecond: 60, + maxDiskUsagePercent: parseInt(process.env.MAX_DISK_USAGE_PERCENT), +}; + +const { error, value } = Joi.validate(config, configSchema); +if (error) { + throw new Error(`Config error\n${error}`); +} + +module.exports = _.assign(basicConfig, value); diff --git a/src/database-controller/src/watcher/cluster-event/index.js b/src/database-controller/src/watcher/cluster-event/index.js new file mode 100644 index 0000000000..2ae709a723 --- /dev/null +++ b/src/database-controller/src/watcher/cluster-event/index.js @@ -0,0 +1,139 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +require('module-alias/register'); +require('dotenv').config(); +const AsyncLock = require('async-lock'); +const _ = require('lodash'); +const DatabaseModel = require('openpaidbsdk'); +const { default: PQueue } = require('p-queue'); +const interval = require('interval-promise'); +require('@dbc/common/init'); +const logger = require('@dbc/common/logger'); +const { getEventInformer } = require('@dbc/common/k8s'); +const { alwaysRetryDecorator } = require('@dbc/common/util'); +const disk = require('diskusage'); +const config = require('@dbc/watcher/cluster-event/config'); + +// Here, we use AsyncLock to control the concurrency of events with the same uid; +// e.g. If one event has ADDED, MODIFED, and MODIFED incidents, we use AsyncLock +// to ensure they will be delivered to write-merger in order. +// In the same time, we use PQueue to control the concurrency of events with different uid; +// e.g. If there are event 1 ~ event 30000, only some of them can be processed concurrently. +const lock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER }); +const queue = new PQueue({ concurrency: config.maxRpcConcurrency }); +const databaseModel = new DatabaseModel( + config.dbConnectionStr, + config.maxDatabaseConnection, +); + +async function synchronizeEvent(eventType, apiObject) { + // query db instead + const uid = apiObject.metadata.uid; + const names = apiObject.involvedObject.name.split('-'); + + const obj = { + uid: uid, + frameworkName: names[0], + podUid: apiObject.involvedObject.uid, + taskroleName: names[1], + taskName: apiObject.involvedObject.name, + taskIndex: parseInt(names[2]), + type: apiObject.type, + reason: apiObject.reason, + message: apiObject.message, + firstTimestamp: apiObject.firstTimestamp, + lastTimestamp: apiObject.lastTimestamp, + count: apiObject.count, + sourceComponent: _.get(apiObject, 'source.component', null), + sourceHost: _.get(apiObject, 'source.host', null), + event: JSON.stringify(apiObject), + }; + + databaseModel.FrameworkEvent.upsert(obj, { where: { uid: uid } }); +} + +const eventHandler = (eventType, apiObject) => { + /* + framework name-based lock + always retry + */ + const receivedTs = new Date().getTime(); + const involvedObjKind = apiObject.involvedObject.kind; + const involvedObjName = apiObject.involvedObject.name; + const uid = apiObject.metadata.uid; + if ( + involvedObjKind === 'Pod' && + /^[a-z0-9]{32}-[A-Za-z0-9._~]+-[0-9]+$/.test(involvedObjName) + ) { + logger.info( + `Cluster event type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName} received.`, + ); + lock.acquire(uid, () => { + return queue.add( + alwaysRetryDecorator( + () => synchronizeEvent(eventType, apiObject), + `Sync to database type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName}`, + ), + ); + }); + } else { + logger.info( + `Cluster Event type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName} received but ignored.`, + ); + } +}; + +async function assertDiskUsageHealthy() { + try { + const { available, total } = await disk.check(config.diskPath); + const currentUsage = ((total - available) / total) * 100; + logger.info(`Current internal storage usage is ${currentUsage}% .`); + if (currentUsage > config.maxDiskUsagePercent) { + logger.error( + `Internal storage usage exceeds ${config.maxDiskUsagePercent}%, exit.`, + function() { + process.exit(1); + }, + ); + } + } catch (err) { + logger.error(`Check disk usage fails, details: ${err}`, function() { + process.exit(1); + }); + } +} + +function startInformer() { + const informer = getEventInformer(); + + informer.on('add', apiObject => { + eventHandler('ADDED', apiObject); + }); + informer.on('update', apiObject => { + eventHandler('MODIFED', apiObject); + }); + informer.on('delete', apiObject => { + eventHandler('DELETED', apiObject); + }); + informer.on('error', err => { + // If any error happens, the process should exit, and let Kubernetes restart it. + logger.error(err, function() { + process.exit(1); + }); + }); + informer.start(); +} + +function startDiskCheck() { + interval(assertDiskUsageHealthy, config.diskCheckIntervalSecond * 1000, { + stopOnError: false, + }); +} + +async function main() { + await assertDiskUsageHealthy(); + startInformer(); + startDiskCheck(); +} + +main(); diff --git a/src/database-controller/src/watcher/framework/index.js b/src/database-controller/src/watcher/framework/index.js index b88f3c85aa..a437e0f699 100644 --- a/src/database-controller/src/watcher/framework/index.js +++ b/src/database-controller/src/watcher/framework/index.js @@ -6,6 +6,7 @@ require('dotenv').config(); const fetch = require('node-fetch'); const AsyncLock = require('async-lock'); const { default: PQueue } = require('p-queue'); +require('@dbc/common/init'); const logger = require('@dbc/common/logger'); const { getFrameworkInformer } = require('@dbc/common/k8s'); const { alwaysRetryDecorator } = require('@dbc/common/util'); diff --git a/src/database-controller/src/yarn.lock b/src/database-controller/src/yarn.lock index 4e8ad2861a..3e6ba96d36 100644 --- a/src/database-controller/src/yarn.lock +++ b/src/database-controller/src/yarn.lock @@ -593,6 +593,14 @@ destroy@~1.0.4: resolved "https://registry.yarnpkg.com/destroy/-/destroy-1.0.4.tgz#978857442c44749e4206613e37946205826abd80" integrity sha1-l4hXRCxEdJ5CBmE+N5RiBYJqvYA= +diskusage@^1.1.3: + version "1.1.3" + resolved "https://registry.yarnpkg.com/diskusage/-/diskusage-1.1.3.tgz#680d7dbf1b679168a195c9240eb3552cbd2c067b" + integrity sha512-EAyaxl8hy4Ph07kzlzGTfpbZMNAAAHXSZtNEMwdlnSd1noHzvA6HsgKt4fEMSvaEXQYLSphe5rPMxN4WOj0hcQ== + dependencies: + es6-promise "^4.2.5" + nan "^2.14.0" + doctrine@1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/doctrine/-/doctrine-1.5.0.tgz#379dce730f6166f76cefa4e6707a159b02c5a6fa" @@ -699,7 +707,7 @@ es-to-primitive@^1.2.1: is-date-object "^1.0.1" is-symbol "^1.0.2" -es6-promise@^4.2.8: +es6-promise@^4.2.5, es6-promise@^4.2.8: version "4.2.8" resolved "https://registry.yarnpkg.com/es6-promise/-/es6-promise-4.2.8.tgz#4eb21594c972bc40553d276e510539143db53e0a" integrity sha512-HJDGx5daxeIvxdBxvG2cb9g4tEvwIk3i8+nhX0yGrYmZUzbkdg8QbDevheDB8gd0//uPj4c1EQua8Q+MViT0/w== @@ -1825,6 +1833,11 @@ mute-stream@0.0.7: resolved "https://registry.yarnpkg.com/mute-stream/-/mute-stream-0.0.7.tgz#3075ce93bc21b8fab43e1bc4da7e8115ed1e7bab" integrity sha1-MHXOk7whuPq0PhvE2n6BFe0ee6s= +nan@^2.14.0: + version "2.14.1" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.14.1.tgz#d7be34dfa3105b91494c3147089315eff8874b01" + integrity sha512-isWHgVjnFjh2x2yuJ/tj3JbwoHu3UC2dX5G/88Cm24yB6YopVgxvBObDY7n5xW6ExmFhJpSEQqFPvq9zaXc8Jw== + natural-compare@^1.4.0: version "1.4.0" resolved "https://registry.yarnpkg.com/natural-compare/-/natural-compare-1.4.0.tgz#4abebfeed7541f2c27acfb29bdbbd15c8d5ba4f7" @@ -1991,7 +2004,7 @@ openid-client@2.5.0: p-any "^1.1.0" "openpaidbsdk@file:../sdk": - version "1.0.0" + version "1.0.1" dependencies: pg "^8.2.1" sequelize "5.21.3"