Skip to content

Commit

Permalink
Add pattern to poll trigger (#114)
Browse files Browse the repository at this point in the history
Add a file filter to the "Poll Files" trigger
  • Loading branch information
shulkaolka authored Jun 8, 2022
1 parent fc8b5b6 commit 1e9094b
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.4.9 (June 03, 2022)
* Add a file filter to the `Poll Files` trigger

## 1.4.8 (April 20, 2022)
* Fix memory leak for `Download Files/File by name` actions and `Read Files` trigger
* Get rid of vulnerabilities in dependencies
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The following configuration fields are available:
* **Emit Behaviour**: Options are `Emit Individually` (default), which emits each object in a separate message, and `Fetch All`, which emits all objects in one message
* **Start Time**: Start datetime of polling. Default min date:`-271821-04-20T00:00:00.000Z`
* **End Time**: End datetime of polling. Default max date: `+275760-09-13T00:00:00.000Z`
* **Pattern**: Optional regular expression matched against file names; only files whose names match are polled. If no pattern is given, files are not filtered by name.


#### Expected output metadata
Expand Down Expand Up @@ -662,7 +663,7 @@ Schema of output metadata depends on Behaviour configuration:

`type` field represents type of the file. You can find additional information about Unix file types [below](#ssh2-sftp-client-api-and-documentation-links);

#### Known limitations
#### Action Known limitations
Action does not support `Fetch Page` mode (according to OIH standards)


Expand Down
9 changes: 8 additions & 1 deletion component.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"title": "SFTP",
"buildType": "docker",
"description": "Provides file access and transfer using SSH File Transfer Protocol",
"version": "1.4.8",
"version": "1.4.9-dev.1",
"credentials": {
"fields": {
"host": {
Expand Down Expand Up @@ -402,6 +402,7 @@
"main": "./lib/triggers/read.js",
"title": "Read Files",
"type": "polling",
"deprecated": true,
"help": {
"description": "Will continuously poll remote SFTP location for files that match given pattern. Found files will be transferred as attachments to the next component",
"link": "/components/sftp/triggers#read-files"
Expand Down Expand Up @@ -453,6 +454,12 @@
"required": true,
"placeholder": "Directory"
},
"pattern": {
"viewClass": "TextFieldView",
"label": "Pattern to Match Files",
"required": false,
"placeholder": "Pattern"
},
"emitBehaviour": {
"label": "Emit Behaviour",
"viewClass": "SelectView",
Expand Down
4 changes: 3 additions & 1 deletion lib/utils/pollingUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ class SftpPolling extends PollingTrigger {
/**
 * Lists `cfg.directory` through `this.client` and returns the entries that
 * belong to the current polling window.
 *
 * An entry is kept when its `type` is `'-'` (presumably a regular file in Unix
 * type notation), its `modifyTime` lies in the half-open interval
 * [startTime, endTime), and its `name` matches `cfg.pattern`.
 *
 * @param {object} params
 * @param {*} params.startTime - inclusive lower bound; converted with `new Date()`
 * @param {*} params.endTime - exclusive upper bound; converted with `new Date()`
 * @param {object} params.cfg - reads `cfg.directory` and the optional `cfg.pattern`
 * @returns {Promise<Array>} the filtered entries returned by `this.client.list`
 * @throws {SyntaxError} when `cfg.pattern` is not a valid regular expression
 */
async getObjects({ startTime, endTime, cfg }) {
const formattedStartTime = new Date(startTime);
const formattedEndTime = new Date(endTime);
// A missing or empty pattern compiles to an empty regex, which matches every name.
const regExp = new RegExp(cfg.pattern || '');
const fileList = await this.client.list(cfg.directory);
return fileList
.filter((file) => file.type === '-')
.filter((file) => new Date(file.modifyTime) >= formattedStartTime)
// NOTE(review): the next two lines look like the removed/added pair of the same
// filter as rendered by the diff view; only the second one (without the
// trailing `;`) appears to belong to the resulting code - confirm.
.filter((file) => new Date(file.modifyTime) < formattedEndTime);
.filter((file) => new Date(file.modifyTime) < formattedEndTime)
.filter((file) => regExp.test(file.name));
}

async emitIndividually(files) {
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 53 additions & 4 deletions spec-integration/pollingIntegration.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ require('dotenv').config();

const { expect } = require('chai');
const EventEmitter = require('events');
const { getLogger } = require('@elastic.io/component-commons-library');
const { getLogger, AttachmentProcessor } = require('@elastic.io/component-commons-library');
const nock = require('nock');
const sinon = require('sinon');
const Sftp = require('../lib/Sftp');
const upload = require('../lib/actions/upload');
const poll = require('../lib/triggers/polling');
Expand Down Expand Up @@ -36,8 +37,14 @@ describe('SFTP integration test - polling', () => {
let port;
let directory;
let sender;
let uploadAttachment;
const testNumber = Math.floor(Math.random() * 10000);

// Opens a fresh SFTP connection before every test; it is closed again in `afterEach`.
beforeEach(async () => {
sftp = new Sftp(logger, cfg);
await sftp.connect();
});

before(async () => {
if (!process.env.SFTP_HOSTNAME) {
throw new Error('Please set SFTP_HOSTNAME env variable to proceed');
Expand All @@ -54,9 +61,9 @@ describe('SFTP integration test - polling', () => {
port,
directory,
};
sftp = new Sftp(logger, cfg);
await sftp.connect();
sender = new TestEmitter();
const uploadResult = { config: { url: '/hello/world' }, data: { objectId: 1111 } };
uploadAttachment = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').resolves(uploadResult);
});

it('Uploads and poll attachment', async () => {
Expand Down Expand Up @@ -92,7 +99,49 @@ describe('SFTP integration test - polling', () => {
await sftp.rmdir(cfg.directory, false);
});

after(async () => {
// Uploads two attachments, then polls with a file-name pattern and expects the
// first emitted message to describe the matching file only.
it('Uploads and poll filtered attachment', async () => {
  nock('https://api.elastic.io/', { encodedQueryParams: true })
    .post('/v2/resources/storage/signed-url')
    .times(10)
    .reply(200, { put_url: 'http://api.io/some', get_url: 'http://api.io/some' });
  nock('http://api.io/', { encodedQueryParams: true })
    .put('/some').times(10).reply(200, { signedUrl: { put_url: 'http://api.io/some' } });

  const msg = {
    body: { },
    attachments: {
      'logo.svg': {
        url: 'https://app.elastic.io/img/logo.svg',
      },
      'logo2.svg': {
        url: 'https://app.elastic.io/img/logo.svg',
      },
    },
  };
  const result = await upload.process.call(new TestEmitter(), msg, cfg);

  expect(result.body.results).to.be.an('array');
  expect(result.body.results.length).to.equal(2);
  expect(result.body.results[0].attachment).to.equal('logo.svg');
  expect(result.body.results[1].attachment).to.equal('logo2.svg');
  const list = await sftp.list(cfg.directory);
  expect(list.length).to.equal(2);
  expect(list[1].name).to.equal('logo.svg');
  expect(list[0].name).to.equal('logo2.svg');
  expect(list[0].size).to.equal(4379);

  // Poll with a copy of the config so the pattern cannot leak into other tests
  // through the shared `cfg`. The pattern is anchored and the dot is escaped so
  // that it matches exactly `logo.svg`.
  const filteredCfg = { ...cfg, pattern: '^logo\\.svg$' };
  // A dedicated emitter guarantees that data[0] was produced by this poll and
  // not by an earlier test that used the shared `sender`.
  const filteredSender = new TestEmitter();
  await poll.process.call(filteredSender, {}, filteredCfg);

  expect(filteredSender.data[0].body.path).to.equal(`${cfg.directory}logo.svg`);
  expect(filteredSender.data[0].body.size).to.equal(4379);

  await sftp.delete(`${cfg.directory}logo2.svg`);
  await sftp.delete(`${cfg.directory}logo.svg`);
  await sftp.rmdir(cfg.directory, false);
});

// Runs after every test: closes the SFTP connection opened in `beforeEach` and
// restores the `uploadAttachment` stub.
// NOTE(review): the stub is created once in `before`, so after the first test it
// is already restored and later tests run against the real `uploadAttachment` -
// confirm this is intended, or restore it in an `after` hook instead.
afterEach(async () => {
await sftp.end();
uploadAttachment.restore();
});
});
198 changes: 198 additions & 0 deletions spec/triggers/polling.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
const sinon = require('sinon');
const chaiAsPromised = require('chai-as-promised');
const chai = require('chai');
const { AttachmentProcessor } = require('@elastic.io/component-commons-library');
const { getLogger } = require('@elastic.io/component-commons-library');

chai.use(chaiAsPromised);
const { expect } = require('chai');
const Sftp = require('../../lib/Sftp');
const trigger = require('../../lib/triggers/polling');

const logger = getLogger();
let self;

/**
 * Unit tests for the polling trigger (`lib/triggers/polling`).
 *
 * All SFTP calls are replaced by sinon stubs on `Sftp.prototype`. Stubs are
 * created inside each test and removed by `sinon.restore()` in `afterEach`, so
 * they are cleaned up even when an assertion fails half-way through a test.
 */
describe('SFTP test - polling trigger', () => {
  const buffer = Buffer.from('Hello');
  const res = { config: { url: 'https://url' }, data: { objectId: 1111 } };
  const cfg = {
    directory: 'www/test',
  };

  // Directory entry included in every listing; built fresh for each test so no
  // object is shared between tests.
  const processedDir = () => ({
    type: 'd',
    name: '.elasticio_processed',
    size: 4096,
  });

  // Stubs the connect/list/end calls used by every listing test and returns the
  // stubs so the test can assert on them.
  const stubSftpSession = (entries) => ({
    connect: sinon.stub(Sftp.prototype, 'connect').returns({}),
    list: sinon.stub(Sftp.prototype, 'list').returns(entries),
    end: sinon.stub(Sftp.prototype, 'end').returns(true),
  });

  beforeEach(() => {
    self = {
      emit: sinon.spy(),
      logger,
    };
  });

  afterEach(() => {
    sinon.restore();
  });

  it('Failed to connect', async () => {
    const connectStub = sinon.stub(Sftp.prototype, 'connect').throws(new Error('Connection failed'));

    await expect(trigger.process.call(self, {}, cfg)).to.be.rejectedWith('Connection failed');

    expect(connectStub.calledOnce).to.be.equal(true);
  });

  it('No such directory', async () => {
    const connectStub = sinon.stub(Sftp.prototype, 'connect').returns({});
    const listStub = sinon.stub(Sftp.prototype, 'list').throws(new Error('No such directory'));

    await expect(trigger.process.call(self, {}, cfg)).to.be.rejectedWith('No such directory');

    expect(connectStub.calledOnce).to.be.equal(true);
    expect(listStub.calledOnce).to.be.equal(true);
  });

  it('Invalid file pattern causes exception', async () => {
    const connectStub = sinon.stub(Sftp.prototype, 'connect').returns({});

    await expect(trigger.process.call(self, {}, { ...cfg, pattern: '***' }))
      .to.be.rejectedWith('Invalid regular expression: /***/: Nothing to repeat');

    expect(connectStub.calledOnce).to.be.equal(true);
  });

  it('No files available', async () => {
    const stubs = stubSftpSession([processedDir()]);

    await trigger.process.call(self, {}, cfg);

    expect(self.emit.called).to.be.equal(true);
    expect(self.emit.callCount).to.be.equal(1);
    expect(self.emit.firstCall.args[0]).to.be.equal('snapshot');
    expect(stubs.end.calledOnce).to.be.equal(true);
    expect(stubs.list.calledOnce).to.be.equal(true);
    expect(stubs.connect.calledOnce).to.be.equal(true);
  });

  it('File name does not match given pattern', async () => {
    const stubs = stubSftpSession([
      processedDir(),
      {
        type: '-',
        name: '1.txt',
        size: 7,
        // Timestamps must be numbers: a numeric *string* makes `new Date(...)`
        // an Invalid Date, so the entry would be dropped by the date filter and
        // this test would pass without the pattern ever being checked.
        accessTime: 1575379317000,
        modifyTime: 1575291942000,
      },
    ]);

    await trigger.process.call(self, {}, { ...cfg, pattern: 'aaa' });

    expect(self.emit.called).to.be.equal(true);
    expect(self.emit.callCount).to.be.equal(1);
    expect(self.emit.firstCall.args[0]).to.be.equal('snapshot');
    expect(stubs.connect.calledOnce).to.be.equal(true);
    expect(stubs.list.calledOnce).to.be.equal(true);
    expect(stubs.end.calledOnce).to.be.equal(true);
  });

  it('File exceeds maximal file size', async () => {
    const stubs = stubSftpSession([
      processedDir(),
      {
        type: '-',
        name: '1.txt',
        size: 204857600,
        accessTime: 1575379317000,
        modifyTime: 1575291942000,
      },
    ]);

    await trigger.process.call(self, {}, cfg);

    expect(self.emit.called).to.be.equal(true);
    expect(self.emit.callCount).to.be.equal(2);
    expect(self.emit.firstCall.args[0]).to.be.equal('error');
    expect(self.emit.firstCall.args[1].message).to.be.equal('File size is 204857600 bytes, it violates the variable MAX_FILE_SIZE, which is currently set to 104857600 bytes');
    expect(self.emit.secondCall.args[0]).to.be.equal('snapshot');
    expect(stubs.connect.calledOnce).to.be.equal(true);
    expect(stubs.end.calledOnce).to.be.equal(true);
    expect(stubs.list.calledOnce).to.be.equal(true);
  });

  it('File read successfully', async () => {
    const stubs = stubSftpSession([
      processedDir(),
      {
        type: '-',
        name: '1.txt',
        size: 7,
        accessTime: 1575379317000,
        modifyTime: 1651992942000,
      },
    ]);
    sinon.stub(Sftp.prototype, 'exists').returns(true);
    sinon.stub(Sftp.prototype, 'move').returns(true);
    sinon.stub(Sftp.prototype, 'get').returns(buffer);
    sinon.stub(Sftp.prototype, 'getReadStream').returns(buffer);
    const attachStub = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').returns(res);

    await trigger.process.call(self, {}, cfg);

    expect(self.emit.calledTwice).to.be.equal(true);
    expect(self.emit.getCall(0).args[0]).to.be.equal('data');
    expect(self.emit.getCall(0).args[1].body).to.be.deep.equal({
      accessTime: '2019-12-03T13:21:57.000Z',
      attachment_url: 'https://url1111?storage_type=maester',
      directory: 'www/test',
      modifyTime: '2022-05-08T06:55:42.000Z',
      name: '1.txt',
      path: 'www/test/1.txt',
      size: 7,
      type: '-',
    });
    expect(self.emit.getCall(1).args[0]).to.be.equal('snapshot');
    expect(stubs.connect.calledOnce).to.be.equal(true);
    expect(attachStub.calledOnce).to.be.equal(true);
  });
});

0 comments on commit 1e9094b

Please sign in to comment.