Parallelize feed parser, make it run forever #547

Merged 2 commits on Jan 20, 2020
9 changes: 7 additions & 2 deletions env.example
@@ -57,7 +57,12 @@ BLOG_INACTIVE_TIME=360
 GITHUB_TOKEN=

 # Feed job queue attempts
-FEED_QUEUE_ATTEMPTS=8
+FEED_QUEUE_ATTEMPTS=2

 # Feed job queue delay(ms)
-FEED_QUEUE_DELAY_MS=60000
+FEED_QUEUE_DELAY_MS=30000
+
+# Number of concurrent worker processors to run. Use * if you want to run
+# one per CPU. Use a number if you want to set it manually, up to a max
+# of the CPU count. If not set, we'll assume 1.
+FEED_QUEUE_PARALLEL_WORKERS=1
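
With these defaults, a failed feed is retried once more after roughly 30 seconds. A small sketch of the retry schedule the two values imply, assuming the delay roughly doubles on each successive retry (Bull's exact exponential formula may differ, so treat the numbers as approximate):

// Sketch: approximate retry schedule implied by FEED_QUEUE_ATTEMPTS and
// FEED_QUEUE_DELAY_MS, assuming the delay doubles on each retry.
const attempts = Number(process.env.FEED_QUEUE_ATTEMPTS || 2);
const delayMs = Number(process.env.FEED_QUEUE_DELAY_MS || 30000);

for (let retry = 1; retry < attempts; retry += 1) {
  console.log(`retry ${retry}: wait ~${delayMs * 2 ** (retry - 1)} ms`);
}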
31 changes: 31 additions & 0 deletions src/backend/feed/processor.js
@@ -0,0 +1,31 @@
+/**
+ * A processor function to be run concurrently, in its own process, and
+ * with potentially multiple simultaneous instances, by the feed queue.
+ * https://github.com/OptimalBits/bull#separate-processes
+ */
+
+const { parse } = require('feedparser-promised');
+
+const { logger } = require('../utils/logger');
+const Post = require('../post');
+
+module.exports = async function processor(job) {
+  const { url } = job.data;
+  const httpOptions = {
+    url,
+    // Number of ms to wait before we assume the connection has failed
+    timeout: 20 * 1000,
+    gzip: true,
+  };
+  let articles;
+
+  try {
+    articles = await parse(httpOptions);
+  } catch (err) {
+    logger.error({ err }, `Unable to process feed ${url}`);
+    throw err;
+  }
+
+  // Transform the list of articles to a list of Post objects
+  return articles.map(article => Post.fromArticle(article));
+};
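
Because Bull loads this processor by file path and runs it in a child process, it can also be exercised directly with a hand-rolled job object; the function reads nothing but job.data.url. A minimal sketch (the require path assumes the repo root, and the URL is invented):

// Sketch: calling the processor directly with a minimal fake job.
// Only job.data.url is assumed, since that is all processor() reads.
const processor = require('./src/backend/feed/processor');

const fakeJob = { data: { url: 'https://blog.example.com/feed.xml' } };

processor(fakeJob)
  .then(posts => console.log(`Parsed ${posts.length} posts`))
  .catch(err => console.error('Feed failed:', err));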
27 changes: 27 additions & 0 deletions src/backend/feed/queue.js
@@ -1,6 +1,7 @@
 const { setQueues } = require('bull-board');

 require('../lib/config');
+const { logger } = require('../utils/logger');
 const { createQueue } = require('../lib/queue');

 // Create a Bull Redis Queue
@@ -9,4 +10,30 @@ const queue = createQueue('feed-queue');
 // For visualizing queues using bull board
 setQueues(queue);

+/**
+ * Provide a helper for adding a feed with our desired default options.
+ * The `feedInfo` is an Object with `name` (i.e., name of author) and `url`
+ * (i.e., url of feed) properties, matching what we parse from the wiki.
+ */
+queue.addFeed = async function(feedInfo) {
+  const options = {
+    // Use the feed URL as the job id, so we don't double add it.
+    // Bull will not add a job if there already exists a job with the same id.
+    jobId: feedInfo.url,
+    attempts: process.env.FEED_QUEUE_ATTEMPTS || 2,
+    backoff: {
+      type: 'exponential',
+      delay: process.env.FEED_QUEUE_DELAY_MS || 30 * 1000,
+    },
+    removeOnComplete: true,
+    removeOnFail: true,
+  };
+
+  try {
+    await queue.add(feedInfo, options);
+  } catch (err) {
+    logger.error({ err, feedInfo }, 'Unable to add job to queue');
+  }
+};
+
 module.exports = queue;
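
A quick usage sketch of the new helper (the name and url are invented; real values come from the wiki feed list):

// Sketch: enqueue one feed with the default attempts/backoff options.
const feedQueue = require('./src/backend/feed/queue');

feedQueue.addFeed({
  name: 'Jane Developer',
  url: 'https://janedev.example.com/feed.xml',
});

Because the feed URL doubles as the job id, calling this twice with the same feed leaves only one job in the queue.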
47 changes: 32 additions & 15 deletions src/backend/feed/worker.js
@@ -1,33 +1,50 @@
-const { parse } = require('feedparser-promised');
+const { cpus } = require('os');
+const path = require('path');

 const feedQueue = require('./queue');
 const { logger } = require('../utils/logger');
 const Post = require('../post');

-exports.workerCallback = async function(job) {
-  const { url } = job.data;
-  let articles;
+/**
+ * We determine the number of parallel feed processor functions to run
+ * based on the value of the environment variable FEED_QUEUE_PARALLEL_WORKERS.
+ * Possible values are:
+ *
+ *   *: use the number of available CPUs
+ *   <a Number>: use the given number, up to the number of available CPUs
+ *   <not set>: use 1 by default
+ */
+function getFeedWorkersCount() {
+  const { FEED_QUEUE_PARALLEL_WORKERS } = process.env;
+  const cpuCount = cpus().length;

-  try {
-    articles = await parse(url);
-  } catch (err) {
-    logger.error({ err }, `Unable to process feed ${url}`);
-    throw err;
+  if (FEED_QUEUE_PARALLEL_WORKERS === '*') {
+    return cpuCount;
   }

-  return articles.map(article => Post.fromArticle(article));
-};
+  const count = Number(FEED_QUEUE_PARALLEL_WORKERS);
+  if (!Number.isNaN(count) && count > 0) {
+    return Math.min(count, cpuCount);
+  }

-exports.start = async function() {
-  // Start processing jobs from the feed queue...
-  feedQueue.process(exports.workerCallback);
+  return 1;
+}
+
+exports.start = function() {
+  const concurrency = getFeedWorkersCount();
+  logger.info(`Starting ${concurrency} instance${concurrency > 1 ? 's' : ''} of feed processor.`);
+  feedQueue.process(concurrency, path.resolve(__dirname, 'processor.js'));

   // When posts are returned from the queue, save them to the database
   feedQueue.on('completed', async (job, posts) => {
     try {
-      await Promise.all(posts.map(post => post.save()));
+      // The posts we get back will be Objects, and we need to convert
+      // to a full Post, then save to Redis.
+      await Promise.all(posts.map(post => Post.parse(post).save()));
     } catch (err) {
       logger.error({ err }, 'Error inserting posts into database');
     }
   });
+
+  return feedQueue;
 };
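
To sanity-check the concurrency logic: getFeedWorkersCount() is not exported, so the mirror below is illustrative only, showing how different FEED_QUEUE_PARALLEL_WORKERS values map to worker counts:

const { cpus } = require('os');

// Illustrative mirror of worker.js's getFeedWorkersCount(), which is not
// exported; used here only to show how env values map to worker counts.
function workersFor(value) {
  const cpuCount = cpus().length;
  if (value === '*') {
    return cpuCount;
  }
  const count = Number(value);
  return !Number.isNaN(count) && count > 0 ? Math.min(count, cpuCount) : 1;
}

['*', '4', '32', undefined].forEach(value => {
  console.log(`${String(value)} -> ${workersFor(value)} worker(s)`);
});
// On an 8-CPU machine: '*' -> 8, '4' -> 4, '32' -> 8 (clamped), undefined -> 1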
45 changes: 24 additions & 21 deletions src/backend/index.js
@@ -1,7 +1,7 @@
 require('./lib/config.js');
 const feedQueue = require('./feed/queue');
 const feedWorker = require('./feed/worker');
-const { logger: log } = require('./utils/logger');
+const { logger } = require('./utils/logger');
 const wikiFeed = require('./utils/wiki-feed-parser');
 const shutdown = require('./lib/shutdown');

@@ -23,26 +23,29 @@ process.on('uncaughtException', shutdown('UNCAUGHT EXCEPTION'));
  * @param {Array[Object]} feedJobs - list of feed URL jobs to be processed
  */
 async function enqueueWikiFeed() {
-  const data = await wikiFeed.parseData();
-  await Promise.all(
-    data.map(feedJob =>
-      feedQueue.add(feedJob, {
-        attempts: process.env.FEED_QUEUE_ATTEMPTS || 8,
-        backoff: {
-          type: 'exponential',
-          delay: process.env.FEED_QUEUE_DELAY_MS || 60 * 1000,
-        },
-        removeOnComplete: true,
-        removeOnFail: true,
-      })
-    )
-  ).catch(err => log.error({ err }, 'Error queuing wiki feeds'));
+  try {
+    const data = await wikiFeed.parseData();
+    await Promise.all(data.map(feedInfo => feedQueue.addFeed(feedInfo)));
+  } catch (err) {
+    logger.error({ err }, 'Error queuing wiki feeds');
+  }
 }

-enqueueWikiFeed()
-  .then(() => {
-    feedWorker.start();
-  })
-  .catch(error => {
-    log.error({ error }, 'Unable to enqueue wiki feeds');
+function loadFeedsIntoQueue() {
+  logger.info('Loading all feeds into feed queue for processing');
+  enqueueWikiFeed().catch(error => {
+    logger.error({ error }, 'Unable to enqueue wiki feeds');
   });
+}
+
+/**
+ * When the feed queue is drained (all feeds are processed in the queue),
+ * restart the process again, and repeat forever.
+ */
+feedQueue.on('drained', loadFeedsIntoQueue);
+
+/**
+ * Also load all feeds now and begin processing.
+ */
+loadFeedsIntoQueue();
+feedWorker.start();
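
The run-forever behaviour hangs off Bull's 'drained' event, which fires once the queue has no more waiting jobs. The same loop in isolation, as a sketch (the queue name and job payload are invented, and a local Redis is assumed):

// Sketch: the refill-on-drained loop by itself, assuming a Bull queue
// backed by a local Redis instance.
const Bull = require('bull');

const queue = new Bull('demo-queue'); // hypothetical queue name

queue.process(async job => job.data);

function refill() {
  // Bull emits 'drained' when all waiting jobs have been picked up,
  // so re-adding work here makes the cycle repeat forever.
  queue.add({ refilledAt: Date.now() });
}

queue.on('drained', refill);
refill();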
14 changes: 7 additions & 7 deletions test/feed-worker.test.js → test/feed-processor.test.js
@@ -1,44 +1,44 @@
 const fixtures = require('./fixtures');
-const feedWorker = require('../src/backend/feed/worker');
+const processor = require('../src/backend/feed/processor');

 test('Passing a valid Atom feed URI should pass', async () => {
   const feedURL = fixtures.getAtomUri();
   fixtures.nockValidAtomResponse();
   const job = fixtures.createMockJobObjectFromURL(feedURL);
-  await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
+  await expect(processor(job)).resolves.toBeTruthy();
 });

 test('Passing a valid RSS feed URI should pass', async () => {
   const feedURL = fixtures.getRssUri();
   fixtures.nockValidRssResponse();
   const job = fixtures.createMockJobObjectFromURL(feedURL);
-  await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
+  await expect(processor(job)).resolves.toBeTruthy();
 });

 test('Passing a valid URI, but not a feed URI should error', async () => {
   const url = fixtures.getHtmlUri();
   fixtures.nockValidHtmlResponse();
   const job = fixtures.createMockJobObjectFromURL(url);
-  await expect(feedWorker.workerCallback(job)).rejects.toThrow();
+  await expect(processor(job)).rejects.toThrow();
 });

 test('Passing an invalid RSS category feed should pass', async () => {
   const feedURL = fixtures.getRssUri();
   fixtures.nockInvalidRssResponse();
   const job = fixtures.createMockJobObjectFromURL(feedURL);
-  await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
+  await expect(processor(job)).resolves.toBeTruthy();
 });

 test('Passing a valid RSS category feed should pass', async () => {
   const feedURL = fixtures.getRssUri();
   fixtures.nockValidRssResponse();
   const job = fixtures.createMockJobObjectFromURL(feedURL);
-  await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
+  await expect(processor(job)).resolves.toBeTruthy();
 });

 test('Non existent feed failure case: 404 should error', async () => {
   const url = fixtures.getHtmlUri();
   fixtures.nock404Response();
   const job = fixtures.createMockJobObjectFromURL(url);
-  await expect(feedWorker.workerCallback(job)).rejects.toThrow();
+  await expect(processor(job)).rejects.toThrow();
 });
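
The fixtures module is not shown in this diff. The only contract the processor needs from createMockJobObjectFromURL is an object with a data.url property, so a minimal version of that helper could look like this (hypothetical; the real fixtures module likely does more):

// Hypothetical sketch of the fixture helper used above: the processor
// reads nothing but job.data.url, so a mock job can be this small.
function createMockJobObjectFromURL(url) {
  return { data: { url } };
}

module.exports = { createMockJobObjectFromURL };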
23 changes: 7 additions & 16 deletions tools/add-feed.js
@@ -23,22 +23,13 @@ async function add() {
     process.exit(1);
   }

-  const feed = { name, url };
+  const feedInfo = { name, url };

-  await feedQueue
-    .add(feed, {
-      attempts: process.env.FEED_QUEUE_ATTEMPTS || 8,
-      backoff: {
-        type: 'exponential',
-        delay: process.env.FEED_QUEUE_DELAY_MS || 60 * 1000,
-      },
-      removeOnComplete: true,
-      removeOnFail: true,
-    })
-    .catch(err => {
-      log.error({ err }, 'Error enqueuing feed');
-      process.exit(1);
-    });
-  process.exit(0);
+  try {
+    await feedQueue.addFeed(feedInfo);
+    process.exit(0);
+  } catch (err) {
+    process.exit(1);
+  }
 }
 add();
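
For reference, a typical invocation of the tool would look something like this, assuming it reads the author name and feed URL from the command line (the argument handling lives in the part of the file not shown in this diff):

node tools/add-feed.js "Jane Developer" https://janedev.example.com/feed.xml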