Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
fix

fix

fix

fix

fix

fix

fix

fix

Add index on task uid instead of framework name and attempt index (#4938)

fix

fix

fix

fix

fix

fix

fix
  • Loading branch information
hzy46 committed Sep 29, 2020
1 parent a3be56f commit 8693cf4
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 6 deletions.
12 changes: 11 additions & 1 deletion src/database-controller/config/database-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ write-merger-max-heap-mb: 2048
# db poller
# Polling interval of database poller. Default value is 120.
db-poller-interval-second: 120
# Max connection number to database in write merger.
# Max connection number to database in poller.
db-poller-max-db-connection: 10
# Max rpc concurrency for db poller
db-poller-max-rpc-concurrency: 50
Expand All @@ -39,3 +39,13 @@ db-poller-max-heap-mb: 4096
framework-watcher-max-rpc-concurrency: 150
# Framework watcher max heap size in MB
framework-watcher-max-heap-mb: 8192

# cluster event watcher
# Max rpc concurrency for cluster event watcher
cluster-event-watcher-max-rpc-concurrency: 40
# cluster event watcher max heap size in MB
cluster-event-watcher-max-heap-mb: 2048
# Max connection number to database in cluster event watcher.
cluster-event-max-db-connection: 40
# Max disk usage in internal storage for cluster event watcher
cluster-event-watcher-max-disk-usage-percent: 80
57 changes: 57 additions & 0 deletions src/database-controller/deploy/database-controller.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,59 @@ spec:
- name: MAX_RPC_CONCURRENCY
value: "{{ cluster_cfg['database-controller']['framework-watcher-max-rpc-concurrency'] }}"
command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['framework-watcher-max-heap-mb'] }}", "watcher/framework/index.js"]
- name: cluster-event-watcher
image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}database-controller:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }}
imagePullPolicy: Always
volumeMounts:
- name: internal-data-dir
mountPath: /paiInternal/storage
mountPropagation: "None"
env:
# Log level of all logs. You can choose from error, warn, info, http, verbose, debug, and silly. Default value is info.
- name: LOG_LEVEL
value: "{{ cluster_cfg['database-controller']['log-level'] }}"
# The global timeout for all calls to Kubernetes API server. Default value is 120.
- name: K8S_CONNECTION_TIMEOUT_SECOND
value: "{{ cluster_cfg['database-controller']['k8s-connection-timeout-second'] }}"
# The timeout for calls to write merger. Default value is 120.
- name: WRITE_MERGER_CONNECTION_TIMEOUT_SECOND
value: "{{ cluster_cfg['database-controller']['write-merger-connection-timeout-second'] }}"
# Whether to enable retain mode.
# If someone submits a framework directly without accessing database, we can find the framework in write merger.
# For these frameworks, if retain mode is on, we will ignore them.
# If retain mode is off (it is the default setting), we will delete the frameworks to maintain ground-truth in database.
- name: RETAIN_MODE_ENABLED
{% if cluster_cfg['database-controller']['retain-mode'] %}
value: "true"
{% else %}
value: "false"
{% endif %}
# If RBAC is set up in current environment.
# If RBAC_IN_CLUSTER=true, the API Server client can read all settings automatically in container.
# If RBAC_IN_CLUSTER=false, we should set CUSTOM_K8S_API_SERVER_URL.
{% if cluster_cfg['cluster']['common']['k8s-rbac'] != 'false' %}
- name: RBAC_IN_CLUSTER
value: "true"
{% else %}
- name: RBAC_IN_CLUSTER
value: "false"
- name: CUSTOM_K8S_API_SERVER_URL
value: {{ cluster_cfg['layout']['kubernetes']['api-servers-url'] }}
{% endif %}
# The database client string. It follows the format "<database type>://<user>:<password>@<host>:<port>/<database name>"
- name: DB_CONNECTION_STR
value: "{{ cluster_cfg['postgresql']['connection-str'] }}"
# Max connection number to database in write merger. Default value is 10.
- name: MAX_DB_CONNECTION
value: "{{ cluster_cfg['database-controller']['db-poller-max-db-connection'] }}"
# Max rpc concurrency for cluster event watcher
- name: MAX_RPC_CONCURRENCY
value: "{{ cluster_cfg['database-controller']['cluster-event-watcher-max-rpc-concurrency'] }}"
- name: DISK_PATH
value: "/paiInternal/storage"
- name: MAX_DISK_USAGE_PERCENT
value: "{{ cluster_cfg['database-controller']['cluster-event-watcher-max-disk-usage-percent'] }}"
command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['cluster-event-watcher-max-heap-mb'] }}", "watcher/cluster-event/index.js"]
- name: poller
image: {{ cluster_cfg["cluster"]["docker-registry"]["prefix"] }}database-controller:{{ cluster_cfg["cluster"]["docker-registry"]["tag"] }}
imagePullPolicy: Always
Expand Down Expand Up @@ -209,5 +262,9 @@ spec:
- name: MAX_RPC_CONCURRENCY
value: "{{ cluster_cfg['database-controller']['db-poller-max-rpc-concurrency'] }}"
command: ["node", "--max-old-space-size={{ cluster_cfg['database-controller']['db-poller-max-heap-mb'] }}", "poller/index.js"]
volumes:
- name: internal-data-dir
hostPath:
path: '{{ cluster_cfg["internal-storage"]["root-path"] }}/storage'
imagePullSecrets:
- name: {{ cluster_cfg["cluster"]["docker-registry"]["secret-name"] }}
12 changes: 11 additions & 1 deletion src/database-controller/sdk/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class DatabaseModel {
indexes: [
{
unique: false,
fields: ['frameworkName', 'attemptIndex'],
fields: ['taskUid'],
},
],
freezeTableName: true,
Expand Down Expand Up @@ -211,11 +211,21 @@ class DatabaseModel {
type: Sequelize.STRING(64),
allowNull: false,
},
podUid: Sequelize.STRING(36),
taskroleName: Sequelize.STRING(256),
taskName: Sequelize.STRING(256),
taskIndex: Sequelize.INTEGER,
type: {
type: Sequelize.STRING(32),
allowNull: false,
},
reason: Sequelize.STRING(64),
message: Sequelize.TEXT,
firstTimestamp: Sequelize.DATE,
lastTimestamp: Sequelize.DATE,
count: Sequelize.INTEGER,
sourceComponent: Sequelize.STRING(255),
sourceHost: Sequelize.STRING(255),
event: Sequelize.TEXT,
},
{
Expand Down
12 changes: 12 additions & 0 deletions src/database-controller/src/common/init.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const logger = require('@dbc/common/logger');

// We don't allow unhandled promise rejections.
// An unhandled rejection can (in some rare cases) leave the watcher in a
// state where it stops taking any action, so it is safer to log the reason
// and exit; Kubernetes will restart the container.
// NOTE(review): assumes the project logger accepts a callback invoked after
// the message is flushed (winston-style) — verify against @dbc/common/logger.
process.on('unhandledRejection', function(reason, p) {
  logger.error(
    `Encounter unhandled rejection of promise, reason: ${reason}`,
    function() {
      process.exit(1);
    },
  );
});
22 changes: 20 additions & 2 deletions src/database-controller/src/common/k8s.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,25 @@ function getFrameworkInformer(
return informer;
}

// Core V1 API client, created once at module load and shared by the event
// informer and the secret helpers below.
const coreV1Client = kc.makeApiClient(k8s.CoreV1Api);

// Create a Kubernetes informer that watches v1 Events in `namespace`.
// `timeoutSeconds` is encoded into the watch URL's query string (default is
// one year, i.e. effectively no timeout). Callers attach 'add'/'update'/
// 'delete'/'error' handlers and then call `informer.start()`.
function getEventInformer(timeoutSeconds = 365 * 86400, namespace = 'default') {
  /*
  The usage is very like `getFrameworkInformer`. Please see the comments of `getFrameworkInformer` for reference.
  */
  // listFn provides the initial full listing the informer reconciles against.
  const listFn = () => {
    logger.info('Cluster events are listed.');
    return coreV1Client.listNamespacedEvent(namespace);
  };
  const informer = k8s.makeInformer(
    kc,
    `/api/v1/namespaces/${namespace}/events?timeoutSeconds=${timeoutSeconds}`,
    listFn,
  );
  return informer;
}

const priorityClassClient = kc.makeApiClient(k8s.SchedulingV1Api);

async function createPriorityClass(priorityClassDef) {
Expand All @@ -162,8 +181,6 @@ async function deletePriorityClass(name) {
return res.response;
}

const coreV1Client = kc.makeApiClient(k8s.CoreV1Api);

async function createSecret(secretDef) {
const res = await coreV1Client.createNamespacedSecret(
secretDef.metadata.namespace,
Expand Down Expand Up @@ -255,4 +272,5 @@ module.exports = {
timeoutMs,
),
getFrameworkInformer: getFrameworkInformer,
getEventInformer: getEventInformer,
};
1 change: 1 addition & 0 deletions src/database-controller/src/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"body-parser": "^1.19.0",
"compression": "^1.7.4",
"cors": "^2.8.5",
"diskusage": "^1.1.3",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"http-errors": "^1.8.0",
Expand Down
41 changes: 41 additions & 0 deletions src/database-controller/src/watcher/cluster-event/config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

const basicConfig = require('@dbc/common/config');
const _ = require('lodash');
const Joi = require('joi');

// Schema for the cluster-event watcher's environment-derived configuration.
const configSchema = Joi.object()
  .keys({
    // Database connection string, e.g. "postgres://user:pwd@host:port/db".
    dbConnectionStr: Joi.string().required(),
    // Max number of concurrent connections to the database.
    maxDatabaseConnection: Joi.number()
      .integer()
      .required(),
    // Max number of concurrent RPC calls (database writes queued in PQueue).
    maxRpcConcurrency: Joi.number()
      .integer()
      .required(),
    // Path of the internal-storage mount whose usage is monitored.
    diskPath: Joi.string().required(),
    // Interval between disk-usage checks, in seconds.
    diskCheckIntervalSecond: Joi.number()
      .integer()
      .required(),
    // The watcher exits when disk usage exceeds this percentage.
    maxDiskUsagePercent: Joi.number()
      .integer()
      .required(),
  })
  .required();

const config = {
  dbConnectionStr: process.env.DB_CONNECTION_STR,
  // Always pass an explicit radix to parseInt to avoid surprises with
  // leading-zero or "0x"-prefixed values.
  maxDatabaseConnection: parseInt(process.env.MAX_DB_CONNECTION, 10),
  maxRpcConcurrency: parseInt(process.env.MAX_RPC_CONCURRENCY, 10),
  diskPath: process.env.DISK_PATH,
  diskCheckIntervalSecond: 60,
  maxDiskUsagePercent: parseInt(process.env.MAX_DISK_USAGE_PERCENT, 10),
};

// Validate at startup: fail fast on any missing or non-integer value.
const { error, value } = Joi.validate(config, configSchema);
if (error) {
  throw new Error(`Config error\n${error}`);
}

module.exports = _.assign(basicConfig, value);
139 changes: 139 additions & 0 deletions src/database-controller/src/watcher/cluster-event/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

require('module-alias/register');
require('dotenv').config();
const AsyncLock = require('async-lock');
const _ = require('lodash');
const DatabaseModel = require('openpaidbsdk');
const { default: PQueue } = require('p-queue');
const interval = require('interval-promise');
require('@dbc/common/init');
const logger = require('@dbc/common/logger');
const { getEventInformer } = require('@dbc/common/k8s');
const { alwaysRetryDecorator } = require('@dbc/common/util');
const disk = require('diskusage');
const config = require('@dbc/watcher/cluster-event/config');

// Here, we use AsyncLock to control the concurrency of events with the same uid;
// e.g. if one event has ADDED, MODIFIED, and MODIFIED incidents, we use AsyncLock
// to ensure they will be delivered to write-merger in order.
// At the same time, we use PQueue to control the concurrency of events with different uids;
// e.g. if there are event 1 ~ event 30000, only some of them can be processed concurrently.
const lock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER });
const queue = new PQueue({ concurrency: config.maxRpcConcurrency });
// Shared database model; its connection pool size is bounded by
// config.maxDatabaseConnection.
const databaseModel = new DatabaseModel(
  config.dbConnectionStr,
  config.maxDatabaseConnection,
);

// Flatten one Kubernetes event into a FrameworkEvent row and upsert it
// (keyed by the event's uid). The pod name follows
// "<32-char framework name>-<taskrole>-<task index>", which is guaranteed
// by the regex filter in `eventHandler`. `eventType` (ADDED/MODIFIED/...)
// is currently unused here; the stored `type` comes from apiObject.type.
async function synchronizeEvent(eventType, apiObject) {
  const uid = apiObject.metadata.uid;
  const names = apiObject.involvedObject.name.split('-');

  const obj = {
    uid: uid,
    frameworkName: names[0],
    podUid: apiObject.involvedObject.uid,
    taskroleName: names[1],
    taskName: apiObject.involvedObject.name,
    taskIndex: parseInt(names[2], 10),
    type: apiObject.type,
    reason: apiObject.reason,
    message: apiObject.message,
    firstTimestamp: apiObject.firstTimestamp,
    lastTimestamp: apiObject.lastTimestamp,
    count: apiObject.count,
    sourceComponent: _.get(apiObject, 'source.component', null),
    sourceHost: _.get(apiObject, 'source.host', null),
    event: JSON.stringify(apiObject),
  };

  // Fixed: await the upsert. Without the await the promise floated, so this
  // async function resolved before the write finished and a failed write was
  // neither reported nor retried by alwaysRetryDecorator.
  await databaseModel.FrameworkEvent.upsert(obj, { where: { uid: uid } });
}

// Filter incoming cluster events and schedule the matching ones for
// persistence. Only Pod events whose name matches the PAI task pod naming
// scheme ("<32-hex framework name>-<taskrole>-<index>") are kept; everything
// else is logged and dropped. Ordering per event uid is enforced by the
// uid-keyed AsyncLock, while the shared PQueue caps overall concurrency, and
// alwaysRetryDecorator retries the database sync until it succeeds.
const eventHandler = (eventType, apiObject) => {
  const receivedTs = new Date().getTime();
  const involvedObjKind = apiObject.involvedObject.kind;
  const involvedObjName = apiObject.involvedObject.name;
  const uid = apiObject.metadata.uid;

  const isTaskPodEvent =
    involvedObjKind === 'Pod' &&
    /^[a-z0-9]{32}-[A-Za-z0-9._~]+-[0-9]+$/.test(involvedObjName);

  if (!isTaskPodEvent) {
    logger.info(
      `Cluster Event type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName} received but ignored.`,
    );
    return;
  }

  logger.info(
    `Cluster event type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName} received.`,
  );
  lock.acquire(uid, () =>
    queue.add(
      alwaysRetryDecorator(
        () => synchronizeEvent(eventType, apiObject),
        `Sync to database type=${eventType} receivedTs=${receivedTs} uid=${uid} involvedObjKind=${involvedObjKind} involvedObjName=${involvedObjName}`,
      ),
    ),
  );
};

// Verify that the internal-storage mount is below the configured usage
// threshold. When the threshold is exceeded — or the check itself fails —
// log an error and exit the process so Kubernetes restarts the watcher.
async function assertDiskUsageHealthy() {
  try {
    const { available, total } = await disk.check(config.diskPath);
    const currentUsage = ((total - available) / total) * 100;
    logger.info(`Current internal storage usage is ${currentUsage}% .`);
    if (currentUsage <= config.maxDiskUsagePercent) {
      return;
    }
    // Over the limit: report and terminate once the log entry is flushed.
    logger.error(
      `Internal storage usage exceeds ${config.maxDiskUsagePercent}%, exit.`,
      function() {
        process.exit(1);
      },
    );
  } catch (err) {
    // Treat a failed check the same as an unhealthy disk.
    logger.error(`Check disk usage fails, details: ${err}`, function() {
      process.exit(1);
    });
  }
}

// Start the cluster-event informer and route each incident to eventHandler
// with a watch-style event type string.
function startInformer() {
  const informer = getEventInformer();

  informer.on('add', apiObject => {
    eventHandler('ADDED', apiObject);
  });
  informer.on('update', apiObject => {
    // Fixed typo: was 'MODIFED'. The type string only appears in log lines
    // (synchronizeEvent derives the stored `type` from apiObject.type).
    eventHandler('MODIFIED', apiObject);
  });
  informer.on('delete', apiObject => {
    eventHandler('DELETED', apiObject);
  });
  informer.on('error', err => {
    // If any error happens, the process should exit, and let Kubernetes restart it.
    logger.error(err, function() {
      process.exit(1);
    });
  });
  informer.start();
}

// Kick off the periodic disk-usage watchdog. `stopOnError: false` keeps the
// timer alive even if a single check throws.
function startDiskCheck() {
  const checkIntervalMs = config.diskCheckIntervalSecond * 1000;
  interval(assertDiskUsageHealthy, checkIntervalMs, { stopOnError: false });
}

// Entry point: refuse to start when the disk is already over the threshold,
// then run the informer and the periodic disk check side by side.
async function main() {
  await assertDiskUsageHealthy();
  startInformer();
  startDiskCheck();
}

main();
1 change: 1 addition & 0 deletions src/database-controller/src/watcher/framework/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require('dotenv').config();
const fetch = require('node-fetch');
const AsyncLock = require('async-lock');
const { default: PQueue } = require('p-queue');
require('@dbc/common/init');
const logger = require('@dbc/common/logger');
const { getFrameworkInformer } = require('@dbc/common/k8s');
const { alwaysRetryDecorator } = require('@dbc/common/util');
Expand Down
Loading

0 comments on commit 8693cf4

Please sign in to comment.