initialize e2e test suite using metadata
Issue: BB-496
Kerkesni committed Jan 10, 2025
1 parent 95df387 commit 707533a
Showing 4 changed files with 387 additions and 0 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/tests.yaml
@@ -228,3 +228,27 @@ jobs:
docker compose --profile ${{ matrix.profile }} up -d --quiet-pull
bash ../../scripts/wait_for_local_port.bash 8000 120
working-directory: .github/dockerfiles/e2e
- name: Create Zookeeper paths for tests with metadata
run: |-
# Set up the ZooKeeper paths backbeat expects, mirroring what Federation provisions
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/owners ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/leaders ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/provisions ""
# Provision raft IDs: the metadata config defines 4 raft sessions
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/provisions/0 ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/provisions/1 ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/provisions/2 ""
docker exec e2e-kafka-1 /opt/kafka_2.11-0.10.1.0/bin/zookeeper-shell.sh localhost:2181 create /backbeat/replication-populator/raft-id-dispatcher/provisions/3 ""
if: ${{ matrix.profile == 's3c' }}
- name: Run E2E tests
env:
PROFILE: ${{ matrix.profile }}
run: yarn run cover:e2e
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./coverage/e2e
flags: e2e
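
The ZooKeeper step above pre-creates the znodes the populator's raft-id dispatcher expects, one provision per configured raft session. The same paths could also be created programmatically; the following is a minimal sketch, assuming ZooKeeper is reachable on 127.0.0.1:2181 and the node-zookeeper-client and async packages are available:

// Sketch only: create the same znodes as the CI step above.
// Assumes ZooKeeper on 127.0.0.1:2181 and the node-zookeeper-client package.
const async = require('async');
const zookeeper = require('node-zookeeper-client');

const base = '/backbeat/replication-populator/raft-id-dispatcher';
const paths = [
    `${base}/owners`,
    `${base}/leaders`,
    // one provision per raft session configured in metadata (4 here)
    `${base}/provisions/0`,
    `${base}/provisions/1`,
    `${base}/provisions/2`,
    `${base}/provisions/3`,
];

const client = zookeeper.createClient('127.0.0.1:2181');
client.once('connected', () => {
    // mkdirp creates any missing intermediate nodes along each path
    async.eachSeries(paths, (p, done) => client.mkdirp(p, done), err => {
        if (err) {
            console.error('failed to create znodes', err);
        }
        client.close();
    });
});
client.connect();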
2 changes: 2 additions & 0 deletions package.json
@@ -19,6 +19,8 @@
"garbage_collector": "node extensions/gc/service.js",
"test": "mocha --recursive tests/unit/oplogPopulator --timeout 30000",
"cover:test": "nyc --clean --silent yarn run test && nyc report --report-dir ./coverage/test --reporter=lcov",
"e2e": "mocha --recursive $(find tests/e2e -name '*.js') --timeout 30000",
"cover:e2e": "nyc --clean --silent yarn run e2e && nyc report --report-dir ./coverage/e2e --reporter=lcov",
"ft_test": "mocha --recursive $(find tests/functional -name '*.js') --timeout 30000",
"ft_test:notification": "mocha --recursive $(find tests/functional/notification -name '*.js') --timeout 30000",
"ft_test:replication": "mocha --recursive $(find tests/functional/replication -name '*.js') --timeout 30000",
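With these scripts, CI runs the suite through `yarn run cover:e2e` with PROFILE taken from the job matrix (see the workflow change above); locally, `yarn run e2e` runs the same mocha suite directly once the docker compose e2e profile is up.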
171 changes: 171 additions & 0 deletions tests/e2e/queuePopulator/config/s3c-config.json
@@ -0,0 +1,171 @@
{
"zookeeper": {
"connectionString": "127.0.0.1:2181/backbeat",
"autoCreateNamespace": true
},
"kafka": {
"hosts": "127.0.0.1:9092",
"site": "0",
"backlogMetrics": {
"zkPath": "/run/kafka-backlog-metrics",
"intervalS": 60
}
},
"queuePopulator": {
"cronRule": "*/5 * * * * *",
"batchMaxRead": 10,
"batchTimeoutMs": 30,
"zookeeperPath": "/replication-populator",
"logSource": "bucketd",
"bucketd": {
"host": "localhost",
"port": 9000
},
"probeServer": {
"bindAddress": "127.0.0.1",
"port": 7900
}
},
"metrics": {
"topic": "backbeat-metrics"
},
"s3": {
"host": "127.0.0.1",
"port": 8000
},
"extensions": {
"replication": {
"source": {
"s3": {
"host": "127.0.0.1",
"port": 8000
},
"auth": {
"type": "role",
"vault": {
"host": "localhost",
"port": 8500,
"adminPort": 8600
}
}
},
"destination": {
"bootstrapList": [{
"site": "aws-location",
"servers": ["127.0.0.1:8001"],
"default": true
}],
"auth": {
"type": "role",
"vault": {}
}
},
"topic": "backbeat-replication",
"replicationStatusTopic": "backbeat-replication-status",
"replicationFailedTopic": "backbeat-replication-failed",
"monitorReplicationFailures": true,
"monitorReplicationFailureExpiryTimeS": 86400,
"queueProcessor": {
"groupId": "backbeat-replication-group",
"mpuPartsConcurrency": 10,
"concurrency": 10,
"probeServer": [],
"sourceCheckIfSizeGreaterThanMB": 100
},
"replayTopics": [
{
"topicName": "backbeat-replication-replay-0",
"retries": "5"
}
],
"replayProcessor": {
"probeServer": []
},
"replicationStatusProcessor": {
"groupId": "backbeat-replication-group",
"probeServer": {
"bindAddress": "127.0.0.1",
"port": 7700
}
}
},
"lifecycle": {
"forceLegacyListing": false,
"auth": {
"type": "assumeRole",
"roleName": "scality-internal/service-backbeat-lifecycle-1",
"sts": {
"host": "localhost",
"port": 8650,
"accessKey": "OG4509TSQHQNCSQODDTV",
"secretKey": "IxO6u+r6IR21xXfoxcHvXmHqVLyezL9x7Yx2PjHG"
},
"vault": {
"host": "localhost",
"port": 8500
}
},
"zookeeperPath": "/lifecycle",
"bucketTasksTopic": "backbeat-lifecycle-bucket-tasks",
"objectTasksTopic": "backbeat-lifecycle-object-tasks",
"conductor": {
"backlogControl": {
"enabled": true
},
"bucketSource": "bucketd",
"bucketd": {
"host": "localhost",
"port": 9000
},
"cronRule": "5 * * * * * *",
"concurrency": 1000,
"probeServer": {
"bindAddress": "127.0.0.1",
"port": 7200
}
},
"bucketProcessor": {
"groupId": "backbeat-lifecycle-bucket-processor-group",
"concurrency": 1,
"probeServer": {
"bindAddress": "127.0.0.1",
"port": 7300
}
},
"objectProcessor": {
"groupId": "backbeat-lifecycle-object-processor-group",
"concurrency": 20,
"probeServer": {
"bindAddress": "127.0.0.1",
"port": 7400
}
},
"supportedLifecycleRules": [
"expiration",
"noncurrentVersionExpiration",
"abortIncompleteMultipartUpload"
]
}
},
"log": {
"logLevel": "info",
"dumpLevel": "error"
},
"server": {
"healthChecks": {
"allowFrom": [
"127.0.0.1/8",
"::1"
]
},
"host": "localhost",
"port": 8900
},
"certFilePaths": {},
"redis": {
"name": "scality-s3",
"password": "",
"host": "127.0.0.1",
"port": 6379
}
}
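
Note how this configuration ties into the CI setup above: the ZooKeeper chroot in connectionString (/backbeat) combined with zookeeperPath (/replication-populator) matches the znodes created in the workflow step, logSource "bucketd" points the populator at the local bucketd instance on port 9000, and the four provisioned raft IDs correspond to the raft sessions configured in metadata.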
190 changes: 190 additions & 0 deletions tests/e2e/queuePopulator/queuePopulator.spec.js
@@ -0,0 +1,190 @@
process.env.BACKBEAT_CONFIG_FILE = 'tests/e2e/queuePopulator/config/s3c-config.json';

const assert = require('assert');
const async = require('async');
const AWS = require('aws-sdk');

const config = require('../../../lib/Config');
const zkConfig = config.zookeeper;
const kafkaConfig = config.kafka;
const extConfigs = config.extensions;
const qpConfig = config.queuePopulator;
const httpsConfig = config.internalHttps;
const mConfig = config.metrics;
const rConfig = config.redis;
const vConfig = config.vaultAdmin;

const QueuePopulator = require('../../../lib/queuePopulator/QueuePopulator');

const S3 = AWS.S3;
const s3config = {
endpoint: `http://${config.s3.host}:${config.s3.port}`,
s3ForcePathStyle: true,
credentials: new AWS.Credentials('accessKey1', 'verySecretKey1'),
};

const maxRead = qpConfig.batchMaxRead;
const timeoutMs = qpConfig.batchTimeoutMs;
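
// S3Helper below wraps the S3 operations the test needs: bucket creation,
// versioning, replication configuration, object/version creation, and cleanup.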

class S3Helper {
constructor(client) {
this.s3 = client;
this.bucket = undefined;

this._scenario = [
{
keyNames: ['object-1', 'object-2', 'object-3'],
},
];
}

setAndCreateBucket(name, cb) {
this.bucket = name;
this.s3.createBucket({
Bucket: name,
}, err => {
assert.ifError(err);
cb();
});
}

setBucketVersioning(status, cb) {
this.s3.putBucketVersioning({
Bucket: this.bucket,
VersioningConfiguration: {
Status: status,
},
}, cb);
}

createObjects(scenarioNumber, cb) {
async.forEachOf(this._scenario[scenarioNumber].keyNames,
(key, i, done) => {
this.s3.putObject({
Body: '',
Bucket: this.bucket,
Key: key,
}, done);
}, err => {
assert.ifError(err);
return cb();
});
}

createVersions(scenarioNumber, cb) {
async.series([
next => this.setBucketVersioning('Enabled', next),
next => this.createObjects(scenarioNumber, next),
], err => {
assert.ifError(err);
return cb();
});
}

emptyAndDeleteBucket(cb) {
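// Delete every object (including versions and delete markers when versioning
// was enabled) and then the bucket itself, so each test starts from a clean state.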
if (!this.bucket) {
return cb();
}
return async.waterfall([
next => this.s3.getBucketVersioning({ Bucket: this.bucket }, next),
(data, next) => {
if (data.Status === 'Enabled' || data.Status === 'Suspended') {
// listObjectVersions
return this.s3.listObjectVersions({
Bucket: this.bucket,
}, (err, data) => {
assert.ifError(err);

const list = [
...data.Versions.map(v => ({
Key: v.Key,
VersionId: v.VersionId,
})),
...data.DeleteMarkers.map(dm => ({
Key: dm.Key,
VersionId: dm.VersionId,
})),
];

if (list.length === 0) {
return next(null, null);
}

return this.s3.deleteObjects({
Bucket: this.bucket,
Delete: { Objects: list },
}, next);
});
}

return this.s3.listObjects({ Bucket: this.bucket },
(err, data) => {
assert.ifError(err);

const list = data.Contents.map(c => ({ Key: c.Key }));

// nothing to delete: deleteObjects rejects an empty Objects list
if (list.length === 0) {
return next(null, null);
}

return this.s3.deleteObjects({
Bucket: this.bucket,
Delete: { Objects: list },
}, next);
});
},
(data, next) => this.s3.deleteBucket({ Bucket: this.bucket }, next),
], cb);
}

setBucketReplicationConfigurations(cb) {
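// Attach a minimal replication configuration (source and destination role ARNs
// plus a catch-all rule) so that new versions written to the bucket are
// candidates for the populator's replication extension.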
const params = {
Bucket: this.bucket,
ReplicationConfiguration: {
Role: 'arn:aws:iam::0000:role/role-src,arn:aws:iam::0000:role/role-dst',
Rules: [{
Destination: {
Bucket: 'arn:aws:s3:::destination-bucket',
},
Prefix: '',
Status: 'Enabled'
}]
}
};
return this.s3.putBucketReplication(params, cb);
}
}

describe('Queue Populator', () => {
let qp;
let s3;
let s3Helper;

before(done => {
s3 = new S3(s3config);
s3Helper = new S3Helper(s3);
qp = new QueuePopulator(zkConfig, kafkaConfig, qpConfig,
httpsConfig, mConfig, rConfig, vConfig, extConfigs);
qp.open(done);
});

afterEach(done => {
s3Helper.emptyAndDeleteBucket(err => {
assert.ifError(err);
done();
});
});

after(done => {
qp.close(done);
});

it('should process log entries without failure', done => {
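// End-to-end flow: create a versioned bucket with replication enabled, write a
// few object versions, then run one populator batch over the bucketd log.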
async.series([
next => s3Helper.setAndCreateBucket('bucket-1', next),
next => s3Helper.setBucketVersioning('Enabled', next),
next => s3Helper.setBucketReplicationConfigurations(next),
next => s3Helper.createVersions(0, next),
next => qp.processLogEntries({ maxRead, timeoutMs }, next),
], err => {
assert.ifError(err);
done();
});
});
});
