Merge pull request #547 from humphd/feedparser-fixes
Parallelize feed parser, make it run forever
cindyorangis authored Jan 20, 2020
2 parents aea978b + 7642ea1 commit 60eeffe
Showing 7 changed files with 135 additions and 61 deletions.
9 changes: 7 additions & 2 deletions env.example
@@ -57,7 +57,12 @@ BLOG_INACTIVE_TIME=360
GITHUB_TOKEN=

# Feed job queue attempts
FEED_QUEUE_ATTEMPTS=8
FEED_QUEUE_ATTEMPTS=2

# Feed job queue delay(ms)
FEED_QUEUE_DELAY_MS=60000
FEED_QUEUE_DELAY_MS=30000

# Number of concurrent worker processors to run. Use * if you want to run
# one per CPU. Use a number if you want to set it manually, up to a max
# of the CPU count. If not set, we'll assume 1.
FEED_QUEUE_PARALLEL_WORKERS=1
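For a local setup, the new knobs might look like the following in a .env file; the values below are illustrative, not defaults mandated by this commit:

# Give each feed two attempts, back off roughly 30s between them,
# and run one sandboxed feed processor per CPU.
FEED_QUEUE_ATTEMPTS=2
FEED_QUEUE_DELAY_MS=30000
FEED_QUEUE_PARALLEL_WORKERS=*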
31 changes: 31 additions & 0 deletions src/backend/feed/processor.js
@@ -0,0 +1,31 @@
/**
* A processor function to be run concurrently, in its own process, and
* with potentially multiple simultaneous instances, by the feed queue.
* https://github.com/OptimalBits/bull#separate-processes
*/

const { parse } = require('feedparser-promised');

const { logger } = require('../utils/logger');
const Post = require('../post');

module.exports = async function processor(job) {
const { url } = job.data;
const httpOptions = {
url,
// ms to wait before we assume the connection has failed
timeout: 20 * 1000,
gzip: true,
};
let articles;

try {
articles = await parse(httpOptions);
} catch (err) {
logger.error({ err }, `Unable to process feed ${url}`);
throw err;
}

// Transform the list of articles to a list of Post objects
return articles.map(article => Post.fromArticle(article));
};
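Because the processor is just an exported async function that receives a Bull job, it can be exercised on its own with a job-shaped object. A minimal sketch, assuming it is run from the repository root; the fake job and feed URL are illustrative, not part of this commit:

const processor = require('./src/backend/feed/processor');

// Bull normally builds the job; we only fake the one field the
// processor reads, job.data.url.
const fakeJob = { data: { url: 'https://example.com/feed.xml' } };

processor(fakeJob)
  .then(posts => console.log(`Parsed ${posts.length} posts`))
  .catch(err => console.error('Feed could not be parsed:', err));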
27 changes: 27 additions & 0 deletions src/backend/feed/queue.js
@@ -1,6 +1,7 @@
const { setQueues } = require('bull-board');

require('../lib/config');
const { logger } = require('../utils/logger');
const { createQueue } = require('../lib/queue');

// Create a Bull Redis Queue
@@ -9,4 +10,30 @@ const queue = createQueue('feed-queue');
// For visualizing queues using bull board
setQueues(queue);

/**
* Provide a helper for adding a feed with our desired default options.
* The `feedInfo` is an Object with `name` (i.e., name of author) and `url`
* (i.e., url of feed) properties, matching what we parse from the wiki.
*/
queue.addFeed = async function(feedInfo) {
const options = {
// Use the feed URL as the job key, so we don't double add it.
// Bull will not add a job if one with the same id already exists.
jobId: feedInfo.url,
attempts: process.env.FEED_QUEUE_ATTEMPTS || 2,
backoff: {
type: 'exponential',
delay: process.env.FEED_QUEUE_DELAY_MS || 30 * 1000,
},
removeOnComplete: true,
removeOnFail: true,
};

try {
await queue.add(feedInfo, options);
} catch (err) {
logger.error({ err, feedInfo }, 'Unable to add job to queue');
}
};

module.exports = queue;
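A usage sketch for the new helper; the author name and feed URL are made up:

const feedQueue = require('./src/backend/feed/queue');

// addFeed applies the retry/backoff defaults above and keys the job by
// its URL, so adding the same feed twice should not create a duplicate job.
feedQueue.addFeed({ name: 'Jane Developer', url: 'https://blog.example.com/feed.xml' });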
47 changes: 32 additions & 15 deletions src/backend/feed/worker.js
@@ -1,33 +1,50 @@
const { parse } = require('feedparser-promised');
const { cpus } = require('os');
const path = require('path');

const feedQueue = require('./queue');
const { logger } = require('../utils/logger');
const Post = require('../post');

exports.workerCallback = async function(job) {
const { url } = job.data;
let articles;
/**
* We determine the number of parallel feed processor functions to run
* based on the value of the environment variable FEED_QUEUE_PARALLEL_WORKERS.
* Possible values are:
*
* *: use the number of available CPUs
* <a Number>: use the given number, up to the number of available CPUs
* <not set>: use 1 by default
*/
function getFeedWorkersCount() {
const { FEED_QUEUE_PARALLEL_WORKERS } = process.env;
const cpuCount = cpus().length;

try {
articles = await parse(url);
} catch (err) {
logger.error({ err }, `Unable to process feed ${url}`);
throw err;
if (FEED_QUEUE_PARALLEL_WORKERS === '*') {
return cpuCount;
}

return articles.map(article => Post.fromArticle(article));
};
const count = Number(FEED_QUEUE_PARALLEL_WORKERS);
if (Number.isInteger(count) && count > 0) {
return Math.min(count, cpuCount);
}

exports.start = async function() {
// Start processing jobs from the feed queue...
feedQueue.process(exports.workerCallback);
return 1;
}

exports.start = function() {
const concurrency = getFeedWorkersCount();
logger.info(`Starting ${concurrency} instance${concurrency > 1 ? 's' : ''} of feed processor.`);
feedQueue.process(concurrency, path.resolve(__dirname, 'processor.js'));

// When posts are returned from the queue, save them to the database
feedQueue.on('completed', async (job, posts) => {
try {
await Promise.all(posts.map(post => post.save()));
// The posts we get back will be Objects, and we need to convert
// to a full Post, then save to Redis.
await Promise.all(posts.map(post => Post.parse(post).save()));
} catch (err) {
logger.error({ err }, 'Error inserting posts into database');
}
});

return feedQueue;
};
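How start() is expected to scale, assuming a 4-core machine (a sketch, not code from this commit):

const feedWorker = require('./src/backend/feed/worker');

// FEED_QUEUE_PARALLEL_WORKERS='*' -> 4 sandboxed instances of processor.js
// FEED_QUEUE_PARALLEL_WORKERS='2' -> 2 instances
// FEED_QUEUE_PARALLEL_WORKERS='8' -> 4 instances (capped at the CPU count)
// unset                           -> 1 instance
feedWorker.start();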
45 changes: 24 additions & 21 deletions src/backend/index.js
@@ -1,7 +1,7 @@
require('./lib/config.js');
const feedQueue = require('./feed/queue');
const feedWorker = require('./feed/worker');
const { logger: log } = require('./utils/logger');
const { logger } = require('./utils/logger');
const wikiFeed = require('./utils/wiki-feed-parser');
const shutdown = require('./lib/shutdown');

@@ -23,26 +23,29 @@ process.on('uncaughtException', shutdown('UNCAUGHT EXCEPTION'));
* @param {Array[Object]} feedJobs - list of feed URL jobs to be processed
*/
async function enqueueWikiFeed() {
const data = await wikiFeed.parseData();
await Promise.all(
data.map(feedJob =>
feedQueue.add(feedJob, {
attempts: process.env.FEED_QUEUE_ATTEMPTS || 8,
backoff: {
type: 'exponential',
delay: process.env.FEED_QUEUE_DELAY_MS || 60 * 1000,
},
removeOnComplete: true,
removeOnFail: true,
})
)
).catch(err => log.error({ err }, 'Error queuing wiki feeds'));
try {
const data = await wikiFeed.parseData();
await Promise.all(data.map(feedInfo => feedQueue.addFeed(feedInfo)));
} catch (err) {
logger.error({ err }, 'Error queuing wiki feeds');
}
}

enqueueWikiFeed()
.then(() => {
feedWorker.start();
})
.catch(error => {
log.error({ error }, 'Unable to enqueue wiki feeds');
function loadFeedsIntoQueue() {
logger.info('Loading all feeds into feed queue for processing');
enqueueWikiFeed().catch(error => {
logger.error({ error }, 'Unable to enqueue wiki feeds');
});
}

/**
* When the feed queue is drained (all feeds are processed in the queue),
* restart the process again, and repeat forever.
*/
feedQueue.on('drained', loadFeedsIntoQueue);

/**
* Also load all feeds now and begin processing.
*/
loadFeedsIntoQueue();
feedWorker.start();
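Taken together, these changes turn the backend into a loop that repeats on its own; roughly:

// loadFeedsIntoQueue()   enqueue every feed parsed from the wiki
//   -> the worker's sandboxed processors parse feeds in parallel
//   -> completed jobs are saved to Redis as Post objects
//   -> once every job is finished the queue emits 'drained'
//   -> 'drained' calls loadFeedsIntoQueue() again, forever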
14 changes: 7 additions & 7 deletions test/feed-worker.test.js → test/feed-processor.test.js
@@ -1,44 +1,44 @@
const fixtures = require('./fixtures');
const feedWorker = require('../src/backend/feed/worker');
const processor = require('../src/backend/feed/processor');

test('Passing a valid Atom feed URI should pass', async () => {
const feedURL = fixtures.getAtomUri();
fixtures.nockValidAtomResponse();
const job = fixtures.createMockJobObjectFromURL(feedURL);
await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
await expect(processor(job)).resolves.toBeTruthy();
});

test('Passing a valid RSS feed URI should pass', async () => {
const feedURL = fixtures.getRssUri();
fixtures.nockValidRssResponse();
const job = fixtures.createMockJobObjectFromURL(feedURL);
await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
await expect(processor(job)).resolves.toBeTruthy();
});

test('Passing a valid URI, but not a feed URI should error', async () => {
const url = fixtures.getHtmlUri();
fixtures.nockValidHtmlResponse();
const job = fixtures.createMockJobObjectFromURL(url);
await expect(feedWorker.workerCallback(job)).rejects.toThrow();
await expect(processor(job)).rejects.toThrow();
});

test('Passing an invalid RSS category feed should pass', async () => {
const feedURL = fixtures.getRssUri();
fixtures.nockInvalidRssResponse();
const job = fixtures.createMockJobObjectFromURL(feedURL);
await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
await expect(processor(job)).resolves.toBeTruthy();
});

test('Passing a valid RSS category feed should pass', async () => {
const feedURL = fixtures.getRssUri();
fixtures.nockValidRssResponse();
const job = fixtures.createMockJobObjectFromURL(feedURL);
await expect(feedWorker.workerCallback(job)).resolves.toBeTruthy();
await expect(processor(job)).resolves.toBeTruthy();
});

test('Non existent feed failure case: 404 should error', async () => {
const url = fixtures.getHtmlUri();
fixtures.nock404Response();
const job = fixtures.createMockJobObjectFromURL(url);
await expect(feedWorker.workerCallback(job)).rejects.toThrow();
await expect(processor(job)).rejects.toThrow();
});
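The tests lean on fixtures.createMockJobObjectFromURL; judging by what the processor reads, that helper presumably wraps a URL the way Bull would, something like this (an assumption, the fixture's real implementation is not shown in this diff):

const createMockJobObjectFromURL = url => ({ data: { url } });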
23 changes: 7 additions & 16 deletions tools/add-feed.js
@@ -23,22 +23,13 @@ async function add() {
process.exit(1);
}

const feed = { name, url };
const feedInfo = { name, url };

await feedQueue
.add(feed, {
attempts: process.env.FEED_QUEUE_ATTEMPTS || 8,
backoff: {
type: 'exponential',
delay: process.env.FEED_QUEUE_DELAY_MS || 60 * 1000,
},
removeOnComplete: true,
removeOnFail: true,
})
.catch(err => {
log.error({ err }, 'Error enqueuing feed');
process.exit(1);
});
process.exit(0);
try {
await feedQueue.addFeed(feedInfo);
process.exit(0);
} catch (err) {
process.exit(1);
}
}
add();
