-
Notifications
You must be signed in to change notification settings - Fork 8
/
can-ndjson-stream.js
68 lines (60 loc) · 1.76 KB
/
can-ndjson-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"use strict";
/*exported ndjsonStream*/
var namespace = require('can-namespace');
/**
 * Turn a stream of newline-delimited JSON bytes into a stream of parsed values.
 *
 * Incoming chunks are decoded as text, split on "\n", and every non-blank
 * line is run through JSON.parse and enqueued on the returned stream. Text
 * after the last newline is buffered until more data (or end of input) arrives.
 *
 * @param {Object} response An object exposing `getReader()` (a ReadableStream)
 *   whose reads yield byte chunks accepted by `TextDecoder#decode`.
 * @return {ReadableStream} A stream that yields one parsed value per NDJSON
 *   line. It errors if a line is not valid JSON or if reading from
 *   `response` fails.
 */
var ndjsonStream = function(response) {
  // Shared between start() and cancel(): the upstream reader, and a flag that
  // records the stream is being torn down (consumer cancel or a failure).
  var is_reader, cancellationRequest = false;

  // Used to deliberately ignore a secondary failure while tearing down after
  // the primary error has already been reported through controller.error().
  var ignore = function() {};

  return new ReadableStream({
    start: function(controller) {
      var reader = response.getReader();
      is_reader = reader;
      var decoder = new TextDecoder();
      var data_buf = "";

      // Parse one piece of JSON text and enqueue the result. On failure the
      // output stream is errored and false is returned so the caller stops.
      function enqueueParsed(text) {
        try {
          controller.enqueue(JSON.parse(text));
          return true;
        } catch(e) {
          controller.error(e);
          return false;
        }
      }

      reader.read().then(function processResult(result) {
        if (cancellationRequest) {
          // Teardown already in progress; nothing more may be enqueued.
          return;
        }

        if (result.done) {
          // Flush the decoder so a truncated multi-byte sequence at the end of
          // the input is surfaced instead of being silently discarded.
          data_buf += decoder.decode();
          data_buf = data_buf.trim();
          if (data_buf.length !== 0 && !enqueueParsed(data_buf)) {
            return;
          }
          controller.close();
          return;
        }

        // {stream: true} keeps multi-byte characters that straddle two chunks
        // intact across decode() calls.
        data_buf += decoder.decode(result.value, {stream: true});
        var lines = data_buf.split("\n");

        // The last element is whatever follows the final newline (possibly an
        // incomplete line), so it is carried over rather than parsed now.
        for (var i = 0; i < lines.length - 1; ++i) {
          var l = lines[i].trim();
          if (l.length > 0 && !enqueueParsed(l)) {
            cancellationRequest = true;
            reader.cancel().catch(ignore);
            return;
          }
        }
        data_buf = lines[lines.length - 1];

        return reader.read().then(processResult);
      }).catch(function(err) {
        // A rejected read (or an unexpected throw above) would otherwise leave
        // the consumer waiting forever; forward it to the output stream.
        if (cancellationRequest) {
          return;
        }
        cancellationRequest = true;
        controller.error(err);
        reader.cancel().catch(ignore);
      });
    },

    cancel: function(reason) {
      console.log("Cancel registered due to ", reason);
      cancellationRequest = true;
      // Forward the reason upstream and return the promise so the consumer's
      // cancel() settles only once the upstream cancellation has.
      return is_reader.cancel(reason);
    }
  });
};
// Expose ndjsonStream as this module's CommonJS export and also attach it to
// the object loaded from 'can-namespace' under the `ndjsonStream` key.
module.exports = namespace.ndjsonStream = ndjsonStream;