-
-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathindex.js
125 lines (86 loc) · 2.49 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
var Worker = require('camunda-worker-node');
var Backoff = require('camunda-worker-node/lib/backoff');
var Metrics = require('camunda-worker-node/lib/metrics');
var engineEndpoint = process.env.ENGINE_URL || 'http://localhost:8080/engine-rest';
var uuid = require('uuid');
var debugWorker = require('debug')('orderProcess:worker');
var debugShipment = require('debug')('orderProcess:worker:shipment');
var debugCheckout = require('debug')('orderProcess:worker:checkout');
var worker = new Worker(engineEndpoint, {
maxTasks: 10,
use: [
[ Backoff, { maxActiveTasks: 50 } ],
Metrics
]
});
async function shipOrder(context) {
const {
variables
} = context;
var order = variables.order;
debugShipment('processing order[id=%s]', order.orderId);
await delay(Math.trunc(Math.random() * 1.5));
if (Math.random() > 0.8) {
debugShipment('failed to ship order[id=%s]', order.orderId);
throw new Error('failed to process shipment: RANDOM STUFF');
}
// do actual work here, write database, provision goods
order.shipmentId = uuid.v4();
order.shipped = true;
debugShipment('shipping order[id=%s] with shipmentId=%s', order.orderId, order.shipmentId);
// notify we are done with an updated order variable
return {
variables: {
order: order
}
};
}
async function checkout(context) {
const {
variables,
extendLock
} = context;
const goods = variables.goods;
if (!goods || goods.length === 0) {
throw new Error('no goods in basket');
}
// do actual work here, write database, reserve goods
const order = {
orderId: uuid.v4(),
goods
};
debugCheckout(
'created order[orderId=%s] with %s goods',
order.orderId,
order.goods.length
);
if (Math.random() > 0.6) {
debugCheckout('delayed order[orderId=%s]', order.orderId);
await extendLock(5000);
await delay(Math.trunc(Math.random() * 3));
}
// notify we are done with a new order variable
return {
variables: {
order: order
}
};
}
worker.subscribe('orderProcess:shipment', [ 'order' ], shipOrder);
worker.subscribe('orderProcess:checkout', [ 'goods' ], checkout);
worker.on('start', function() {
debugWorker('starting');
});
worker.on('poll', function() {
debugWorker('polling');
});
// handle worker errors
worker.on('error', function(err) {
debugWorker('error: %s', err);
});
// helpers ////////////////////
function delay(seconds) {
return new Promise(function(resolve) {
setTimeout(resolve, seconds * 1000);
});
}