Skip to content

Commit

Permalink
fix: Process stdin as a stream (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
juanjoDiaz authored and knownasilya committed Jun 26, 2018
1 parent 97da8e1 commit 2b186b6
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 21 deletions.
14 changes: 12 additions & 2 deletions bin/json2csv.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ function getFields() {
: undefined;
}

function getInputStream() {
if (!inputPath) {
process.stdin.resume();
process.stdin.setEncoding('utf8');
return process.stdin;
}

return fs.createReadStream(inputPath, { encoding: 'utf8' })
}

function getInput() {
if (!inputPath) {
return getInputFromStdin();
Expand Down Expand Up @@ -157,14 +167,14 @@ Promise.resolve()
withBOM: program.withBom
};

if (!inputPath || program.streaming === false) {
if (!program.streaming) {
return getInput()
.then(input => new JSON2CSVParser(opts).parse(input))
.then(processOutput);
}

const transform = new Json2csvTransform(opts);
const input = fs.createReadStream(inputPath, { encoding: 'utf8' });
const input = getInputStream();
const stream = input.pipe(transform);

if (program.output) {
Expand Down
25 changes: 23 additions & 2 deletions lib/JSON2CSVTransform.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,26 @@ class JSON2CSVTransform extends Transform {
.map(line => line.trim())
.filter(line => line !== '');

let pendingData = false;
lines
.forEach((line, i) => {
try {
transform.pushLine(JSON.parse(line));
} catch(e) {
if (i !== lines.length - 1) {
if (i === lines.length - 1) {
pendingData = true;
} else {
e.message = 'Invalid JSON (' + line + ')'
transform.emit('error', e);
}
}
});
this._data = this._data.slice(this._data.lastIndexOf('\n'));
this._data = pendingData
? this._data.slice(this._data.lastIndexOf('\n'))
: '';
},
getPendingData() {
return this._data;
}
};
}
Expand Down Expand Up @@ -104,6 +112,10 @@ class JSON2CSVTransform extends Transform {
}
}

this.parser.getPendingData = function () {
return this.value;
}

this.parser.onError = function (err) {
if(err.message.indexOf('Unexpected') > -1) {
err.message = 'Invalid JSON (' + err.message + ')';
Expand All @@ -124,6 +136,15 @@ class JSON2CSVTransform extends Transform {
done();
}

_flush(done) {
if (this.parser.getPendingData()) {
done(new Error('Invalid data received from stdin', this.parser.getPendingData()));
}

done();
}


/**
* Generate the csv header and pushes it downstream.
*/
Expand Down
26 changes: 24 additions & 2 deletions test/CLI.js
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,11 @@ module.exports = (testRunner, jsonFixtures, csvFixtures) => {

// Get input from stdin

testRunner.add('should get input from stdin', (t) => {
testRunner.add('should get input from stdin and process as stream', (t) => {
const test = child_process.exec(cli, (err, stdout, stderr) => {
t.notOk(stderr);
const csv = stdout;
t.equal(csv, csvFixtures.default + '\n'); // console.log append the new line
t.equal(csv, csvFixtures.defaultStream);
t.end();
});

Expand All @@ -648,6 +648,28 @@ module.exports = (testRunner, jsonFixtures, csvFixtures) => {
test.stdin.end();
});

testRunner.add('should get input from stdin with -s flag', (t) => {
const test = child_process.exec(cli + '-s', (err, stdout, stderr) => {
t.notOk(stderr);
const csv = stdout;
t.equal(csv, csvFixtures.default + '\n'); // console.log append the new line
t.end();
});

test.stdin.write(JSON.stringify(jsonFixtures.default));
test.stdin.end();
});

testRunner.add('should error if stdin data is not valid with -s flag', (t) => {
const test = child_process.exec(cli + '-s', (err, stdout, stderr) => {
t.ok(stderr.indexOf('Invalid data received from stdin') !== -1);
t.end();
});

test.stdin.write('{ "b": 1,');
test.stdin.end();
});

// testRunner.add('should error if stdin fails', (t) => {
// const test = child_process.exec(cli, (err, stdout, stderr) => {
// t.ok(stderr.indexOf('Could not read from stdin') !== -1);
Expand Down
29 changes: 14 additions & 15 deletions test/JSON2CSVTransform.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,20 @@ module.exports = (testRunner, jsonFixtures, csvFixtures) => {
.on('error', err => t.notOk(true, err.message));
});

// TODO infer only from first element
// testRunner.add('should parse json to csv and infer the fields automatically ', (t) => {
// const transform = new Json2csvTransform();
// const processor = jsonFixtures.default().pipe(transform);

// let csv = '';
// processor
// .on('data', chunk => (csv += chunk.toString()))
// .on('end', () => {
// t.ok(typeof csv === 'string');
// t.equal(csv, csvFixtures.default);
// t.end();
// })
// .on('error', err => t.notOk(true, err.message));
// });
testRunner.add('should parse json to csv and infer the fields automatically ', (t) => {
const transform = new Json2csvTransform();
const processor = jsonFixtures.default().pipe(transform);

let csv = '';
processor
.on('data', chunk => (csv += chunk.toString()))
.on('end', () => {
t.ok(typeof csv === 'string');
t.equal(csv, csvFixtures.defaultStream);
t.end();
})
.on('error', err => t.notOk(true, err.message));
});

testRunner.add('should parse json to csv using custom fields', (t) => {
const opts = {
Expand Down
5 changes: 5 additions & 0 deletions test/fixtures/csv/defaultStream.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"carModel","price","color"
"Audi",0,"blue"
"BMW",15000,"red"
"Mercedes",20000,"yellow"
"Porsche",30000,"green"

0 comments on commit 2b186b6

Please sign in to comment.