This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

[display event] add event watcher in database controller (#4939)
* fix

* Add index on task uid instead of framework name and attempt index (#4938)
hzy46 authored Oct 13, 2020
1 parent 1163cb8 commit 16f55e5
Showing 10 changed files with 308 additions and 5 deletions.
12 changes: 11 additions & 1 deletion src/database-controller/config/database-controller.yaml
@@ -27,7 +27,7 @@ write-merger-max-heap-mb: 2048
# db poller
# Polling interval of database poller. Default value is 120.
db-poller-interval-second: 120
# Max connection number to database in write merger.
# Max connection number to database in poller.
db-poller-max-db-connection: 10
# Max rpc concurrency for db poller
db-poller-max-rpc-concurrency: 50
@@ -39,3 +39,13 @@ db-poller-max-heap-mb: 4096
framework-watcher-max-rpc-concurrency: 150
# Framework watcher max heap size in MB
framework-watcher-max-heap-mb: 8192

# cluster event watcher
# Max rpc concurrency for cluster event watcher
cluster-event-watcher-max-rpc-concurrency: 40
# cluster event watcher max heap size in MB
cluster-event-watcher-max-heap-mb: 2048
# Max connection number to database in cluster event watcher.
cluster-event-max-db-connection: 40
# Max disk usage in internal storage for cluster event watcher
cluster-event-watcher-max-disk-usage-percent: 80
58 changes: 58 additions & 0 deletions src/database-controller/deploy/database-controller.yaml.template
@@ -158,6 +158,60 @@ spec:
- name: MAX_RPC_CONCURRENCY
value: "{{ cluster_cfg['database-controller']['framework-watcher-max-rpc-concurrency'] }}"
command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['framework-watcher-max-heap-mb'] }}", "watcher/framework/index.js"]
- name: cluster-event-watcher
image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}database-controller:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }}
imagePullPolicy: Always
volumeMounts:
- name: internal-data-dir
mountPath: /paiInternal/storage
readOnly: true
mountPropagation: "None"
env:
# Log level of all logs. You can choose from error, warn, info, http, verbose, debug, and silly. Default value is info.
- name: LOG_LEVEL
value: "{{ cluster_cfg['database-controller']['log-level'] }}"
# The global timeout for all calls to Kubernetes API server. Default value is 120.
- name: K8S_CONNECTION_TIMEOUT_SECOND
value: "{{ cluster_cfg['database-controller']['k8s-connection-timeout-second'] }}"
# The timeout for calls to write merger. Default value is 120.
- name: WRITE_MERGER_CONNECTION_TIMEOUT_SECOND
value: "{{ cluster_cfg['database-controller']['write-merger-connection-timeout-second'] }}"
# Whether to enable retain mode.
# If someone submits a framework directly without going through the database, we can still find the framework in the write merger.
# For such frameworks, if retain mode is on, we will ignore them.
# If retain mode is off (the default setting), we will delete them to keep the database as the ground truth.
- name: RETAIN_MODE_ENABLED
{% if cluster_cfg['database-controller']['retain-mode'] %}
value: "true"
{% else %}
value: "false"
{% endif %}
# Whether RBAC is set up in the current environment.
# If RBAC_IN_CLUSTER=true, the API server client can read all settings automatically inside the container.
# If RBAC_IN_CLUSTER=false, we should set CUSTOM_K8S_API_SERVER_URL.
{% if cluster_cfg['cluster']['common']['k8s-rbac'] != 'false' %}
- name: RBAC_IN_CLUSTER
value: "true"
{% else %}
- name: RBAC_IN_CLUSTER
value: "false"
- name: CUSTOM_K8S_API_SERVER_URL
value: {{ cluster_cfg['layout']['kubernetes']['api-servers-url'] }}
{% endif %}
# The database client string. It follows the format "<database type>://<user>:<password>@<host>:<port>/<database name>"
- name: DB_CONNECTION_STR
value: "{{ cluster_cfg['postgresql']['connection-str'] }}"
# Max connection number to database in cluster event watcher. Default value is 40.
- name: MAX_DB_CONNECTION
value: "{{ cluster_cfg['database-controller']['cluster-event-max-db-connection'] }}"
# Max rpc concurrency for cluster event watcher
- name: MAX_RPC_CONCURRENCY
value: "{{ cluster_cfg['database-controller']['cluster-event-watcher-max-rpc-concurrency'] }}"
- name: DISK_PATH
value: "/paiInternal/storage"
- name: MAX_DISK_USAGE_PERCENT
value: "{{ cluster_cfg['database-controller']['cluster-event-watcher-max-disk-usage-percent'] }}"
command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['cluster-event-watcher-max-heap-mb'] }}", "watcher/cluster-event/index.js"]
- name: poller
image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}database-controller:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }}
imagePullPolicy: Always
@@ -209,5 +263,9 @@ spec:
- name: MAX_RPC_CONCURRENCY
value: "{{ cluster_cfg['database-controller']['db-poller-max-rpc-concurrency'] }}"
command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['db-poller-max-heap-mb'] }}", "poller/index.js"]
volumes:
- name: internal-data-dir
hostPath:
path: '{{ cluster_cfg["internal-storage"]["root-path"] }}/storage'
imagePullSecrets:
- name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }}
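
For orientation (not part of this commit), here is a hypothetical sketch of the environment contract the new cluster-event-watcher container sets up. Every value below is a placeholder; the connection string only follows the "<database type>://<user>:<password>@<host>:<port>/<database name>" format documented in the template comment above.

// Illustration only: the env variables injected by the cluster-event-watcher container.
// All values are placeholders, not real cluster settings.
const exampleEnv = {
  DB_CONNECTION_STR: 'postgresql://user:password@postgresql-host:5432/openpai',
  MAX_DB_CONNECTION: '40',
  MAX_RPC_CONCURRENCY: '40',
  DISK_PATH: '/paiInternal/storage',
  MAX_DISK_USAGE_PERCENT: '80',
};
// watcher/cluster-event/config.js (added later in this commit) reads and validates
// these variables from process.env at startup.
console.log(exampleEnv.DB_CONNECTION_STR);
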
10 changes: 10 additions & 0 deletions src/database-controller/sdk/index.js
@@ -211,11 +211,21 @@ class DatabaseModel {
type: Sequelize.STRING(64),
allowNull: false,
},
podUid: Sequelize.STRING(36),
taskroleName: Sequelize.STRING(256),
taskName: Sequelize.STRING(256),
taskIndex: Sequelize.INTEGER,
type: {
type: Sequelize.STRING(32),
allowNull: false,
},
reason: Sequelize.STRING(64),
message: Sequelize.TEXT,
firstTimestamp: Sequelize.DATE,
lastTimestamp: Sequelize.DATE,
count: Sequelize.INTEGER,
sourceComponent: Sequelize.STRING(255),
sourceHost: Sequelize.STRING(255),
event: Sequelize.TEXT,
},
{
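
As a usage note (not part of this commit), a minimal query sketch against the new event columns, assuming the SDK exposes the model as `FrameworkEvent` on a `DatabaseModel` instance, as the cluster-event watcher later in this commit does:

// Minimal sketch: fetch the 20 most recent events recorded for a framework.
// Assumes db.FrameworkEvent is the Sequelize model defined above.
const DatabaseModel = require('openpaidbsdk');
const db = new DatabaseModel(process.env.DB_CONNECTION_STR, 10);

async function recentEventsForFramework(frameworkName) {
  return db.FrameworkEvent.findAll({
    where: { frameworkName: frameworkName },
    order: [['lastTimestamp', 'DESC']],
    limit: 20,
  });
}
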
12 changes: 12 additions & 0 deletions src/database-controller/src/common/init.js
@@ -0,0 +1,12 @@
const logger = require('@dbc/common/logger');

// We don't allow unhandled promise rejections,
// because they can make the watcher stop all actions (in some rare cases).
process.on('unhandledRejection', function(reason, p) {
  logger.error(
    `Encountered unhandled promise rejection, reason: ${reason}`,
    function() {
      process.exit(1);
    },
  );
});
22 changes: 20 additions & 2 deletions src/database-controller/src/common/k8s.js
@@ -150,6 +150,25 @@ function getFrameworkInformer(
return informer;
}

const coreV1Client = kc.makeApiClient(k8s.CoreV1Api);

function getEventInformer(timeoutSeconds = 365 * 86400, namespace = 'default') {
/*
The usage is very similar to `getFrameworkInformer`. Please see the comments of `getFrameworkInformer` for reference.
*/
const listFn = () => {
logger.info('Cluster events are listed.');
return coreV1Client.listNamespacedEvent(namespace);
};
const informer = k8s.makeInformer(
kc,
`/api/v1/namespaces/${namespace}/events?timeoutSeconds=${timeoutSeconds}`,
listFn,
);
return informer;
}

const priorityClassClient = kc.makeApiClient(k8s.SchedulingV1Api);

async function createPriorityClass(priorityClassDef) {
@@ -162,8 +181,6 @@ async function deletePriorityClass(name) {
return res.response;
}

const coreV1Client = kc.makeApiClient(k8s.CoreV1Api);

async function createSecret(secretDef) {
const res = await coreV1Client.createNamespacedSecret(
secretDef.metadata.namespace,
@@ -255,4 +272,5 @@ module.exports = {
timeoutMs,
),
getFrameworkInformer: getFrameworkInformer,
getEventInformer: getEventInformer,
};
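
A minimal usage sketch for `getEventInformer` (not part of this commit); the handler names follow the same informer API that the cluster-event watcher below relies on:

// Usage sketch: wire up the event informer and print pod events (illustrative only).
const { getEventInformer } = require('@dbc/common/k8s');

const informer = getEventInformer(3600, 'default');
informer.on('add', event => {
  console.log(`${event.involvedObject.kind}/${event.involvedObject.name}: ${event.reason}`);
});
informer.on('error', err => {
  // Exit on informer errors and let the orchestrator restart the process.
  console.error(err);
  process.exit(1);
});
informer.start();
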
1 change: 1 addition & 0 deletions src/database-controller/src/package.json
@@ -15,6 +15,7 @@
"body-parser": "^1.19.0",
"compression": "^1.7.4",
"cors": "^2.8.5",
"diskusage": "^1.1.3",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"http-errors": "^1.8.0",
41 changes: 41 additions & 0 deletions src/database-controller/src/watcher/cluster-event/config.js
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

const basicConfig = require('@dbc/common/config');
const _ = require('lodash');
const Joi = require('joi');

const configSchema = Joi.object()
.keys({
dbConnectionStr: Joi.string().required(),
maxDatabaseConnection: Joi.number()
.integer()
.required(),
maxRpcConcurrency: Joi.number()
.integer()
.required(),
diskPath: Joi.string().required(),
diskCheckIntervalSecond: Joi.number()
.integer()
.required(),
maxDiskUsagePercent: Joi.number()
.integer()
.required(),
})
.required();

const config = {
dbConnectionStr: process.env.DB_CONNECTION_STR,
maxDatabaseConnection: parseInt(process.env.MAX_DB_CONNECTION),
maxRpcConcurrency: parseInt(process.env.MAX_RPC_CONCURRENCY),
diskPath: process.env.DISK_PATH,
diskCheckIntervalSecond: 60,
maxDiskUsagePercent: parseInt(process.env.MAX_DISK_USAGE_PERCENT),
};

const { error, value } = Joi.validate(config, configSchema);
if (error) {
throw new Error(`Config error\n${error}`);
}

module.exports = _.assign(basicConfig, value);
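
One property of this config module worth noting (a sketch, assuming the pinned joi version that exposes `Joi.validate`, as used above): a missing or non-numeric environment variable becomes NaN after `parseInt`, which the schema rejects, so the watcher fails fast at startup instead of running with a broken configuration.

// Fail-fast sketch: NaN from parseInt(undefined) does not satisfy Joi.number().integer().
const Joi = require('joi');

const schema = Joi.object()
  .keys({ maxDiskUsagePercent: Joi.number().integer().required() })
  .required();
const { error } = Joi.validate({ maxDiskUsagePercent: parseInt(undefined) }, schema);
console.log(error ? `config error: ${error.message}` : 'ok'); // prints a config error
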
139 changes: 139 additions & 0 deletions src/database-controller/src/watcher/cluster-event/index.js
@@ -0,0 +1,139 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

require('module-alias/register');
require('dotenv').config();
const AsyncLock = require('async-lock');
const _ = require('lodash');
const DatabaseModel = require('openpaidbsdk');
const { default: PQueue } = require('p-queue');
const interval = require('interval-promise');
require('@dbc/common/init');
const logger = require('@dbc/common/logger');
const { getEventInformer } = require('@dbc/common/k8s');
const { alwaysRetryDecorator } = require('@dbc/common/util');
const disk = require('diskusage');
const config = require('@dbc/watcher/cluster-event/config');

// Here, we use AsyncLock to control the concurrency of events with the same uid;
// e.g. if one event has ADDED, MODIFIED, and MODIFIED incidents, we use AsyncLock
// to ensure they are written to the database in order.
// At the same time, we use PQueue to control the concurrency of events with different uids;
// e.g. if there are event 1 ~ event 30000, only some of them can be processed concurrently.
const lock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER });
const queue = new PQueue({ concurrency: config.maxRpcConcurrency });
const databaseModel = new DatabaseModel(
config.dbConnectionStr,
config.maxDatabaseConnection,
);

async function synchronizeEvent(eventType, apiObject) {
// Write to the database directly instead of going through the write merger.
const uid = apiObject.metadata.uid;
const names = apiObject.involvedObject.name.split('-');

const obj = {
uid: uid,
frameworkName: names[0],
podUid: apiObject.involvedObject.uid,
taskroleName: names[1],
taskName: apiObject.involvedObject.name,
taskIndex: parseInt(names[2]),
type: apiObject.type,
reason: apiObject.reason,
message: apiObject.message,
firstTimestamp: apiObject.firstTimestamp,
lastTimestamp: apiObject.lastTimestamp,
count: apiObject.count,
sourceComponent: _.get(apiObject, 'source.component', null),
sourceHost: _.get(apiObject, 'source.host', null),
event: JSON.stringify(apiObject),
};

await databaseModel.FrameworkEvent.upsert(obj, { where: { uid: uid } });
}

const eventHandler = (eventType, apiObject) => {
/*
event uid-based lock + always retry
*/
const receivedTs = new Date().getTime();
const involvedObjKind = apiObject.involvedObject.kind;
const involvedObjName = apiObject.involvedObject.name;
const uid = apiObject.metadata.uid;
if (
involvedObjKind === 'Pod' &&
/^[a-z0-9]{32}-[A-Za-z0-9._~]+-[0-9]+$/.test(involvedObjName)
) {
logger.info(
`Cluster event type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName} received.`,
);
lock.acquire(uid, () => {
return queue.add(
alwaysRetryDecorator(
() => synchronizeEvent(eventType, apiObject),
`Sync to database type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName}`,
),
);
});
} else {
logger.info(
`Cluster Event type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName} received but ignored.`,
);
}
};

async function assertDiskUsageHealthy() {
try {
const { available, total } = await disk.check(config.diskPath);
const currentUsage = ((total - available) / total) * 100;
logger.info(`Current internal storage usage is ${currentUsage}%.`);
if (currentUsage > config.maxDiskUsagePercent) {
logger.error(
`Internal storage usage exceeds ${config.maxDiskUsagePercent}%, exit.`,
function() {
process.exit(1);
},
);
}
} catch (err) {
logger.error(`Check disk usage fails, details: ${err}`, function() {
process.exit(1);
});
}
}

function startInformer() {
const informer = getEventInformer();

informer.on('add', apiObject => {
eventHandler('ADDED', apiObject);
});
informer.on('update', apiObject => {
eventHandler('MODIFIED', apiObject);
});
informer.on('delete', apiObject => {
eventHandler('DELETED', apiObject);
});
informer.on('error', err => {
// If any error happens, the process should exit, and let Kubernetes restart it.
logger.error(err, function() {
process.exit(1);
});
});
informer.start();
}

function startDiskCheck() {
interval(assertDiskUsageHealthy, config.diskCheckIntervalSecond * 1000, {
stopOnError: false,
});
}

async function main() {
await assertDiskUsageHealthy();
startInformer();
startDiskCheck();
}

main();
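
A standalone sketch of the ordering/concurrency pattern described in the comment at the top of this file (not part of this commit, with hypothetical uids and a fake sync step): AsyncLock serializes incidents that share a uid, while PQueue caps how many syncs run at once across all uids.

// Ordering + bounded concurrency sketch (hypothetical uids, fake sync function).
const AsyncLock = require('async-lock');
const { default: PQueue } = require('p-queue');

const lock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER });
const queue = new PQueue({ concurrency: 2 }); // at most 2 syncs in flight overall

function fakeSync(uid, step) {
  return new Promise(resolve =>
    setTimeout(() => {
      console.log(`synced uid=${uid} step=${step}`);
      resolve();
    }, 10),
  );
}

// Incidents with the same uid complete in arrival order; different uids only
// compete for the two queue slots.
['a', 'a', 'b', 'c'].forEach((uid, step) => {
  lock.acquire(uid, () => queue.add(() => fakeSync(uid, step)));
});
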
1 change: 1 addition & 0 deletions src/database-controller/src/watcher/framework/index.js
@@ -6,6 +6,7 @@ require('dotenv').config();
const fetch = require('node-fetch');
const AsyncLock = require('async-lock');
const { default: PQueue } = require('p-queue');
require('@dbc/common/init');
const logger = require('@dbc/common/logger');
const { getFrameworkInformer } = require('@dbc/common/k8s');
const { alwaysRetryDecorator } = require('@dbc/common/util');