FutoIn EventStream is fundamental part for efficient global state update in distributed systems. It is used for reliable event delivery, decentralized cache invalidation, efficient online segregation of active and warehouse data.
Unlike various message/event brokers, the focus of FutoIn Event Stream is integration with database transactions for reliable efficient event recording and delivery.
The design is not focused on high throughput as the primary reason is reliable association of events with database changes. Please consider using pure message brokers for throughput-intensive cases.
Documentation --> FutoIn Guide
Reference implementation of:
FTN18: FutoIn Interface - Event Stream
Version: 1.1
- Database transaction-bound event generation.
- Standalone event generation.
- Event polling for simple, but less efficient solutions.
- Event pushing for advanced efficient cases.
- MySQL
- PostgreSQL
- SQLite
- Potentially, any other SQL-compliant supported by
futoin-database
Command line:
$ yarn add futoin-eventstream
or
$ npm install futoin-eventstream --save
Each event has auto-generated ID, type, data and timestamp. Type is all upper case identifier. Data is arbitrary JSON-friendly data.
Two configurable delivery strategies are supported: polling and streaming, but consumer acts as client in both cases.
There are two delivery modes: reliable and live. The later allow messages to be skipped. To ensure that events are reliably delivered, each consumer must register first.
Two message storage types are assumed: active small high performance area and slower data warehouse for all time history. DBEventArchiver tool is provided for efficient reliable data transfer.
More detailed concept is in the FTN18 spec.
GenFace.register(as, ccm, 'evtgen', endpoint );
// ...
const evtgen = ccm.iface('evtgen');
evtgen.addEvent(as, 'NULL_EVENT');
evtgen.addEvent(as, 'INT_EVENT', 123);
evtgen.addEvent(as, 'STR_EVENT', 'Some Str');
evtgen.addEvent(as, 'ARRAY_EVENT', [1, 2, 3]);
evtgen.addEvent(as, 'OBJECT_EVENT', { a: 1, b: 2, c: 3 });
For more advanced cases, you can check source code of DBGenFace#addXferEvent() to build more tailored statements.
DBGenFace.register(as, ccm, 'evtgen', endpoint );
// ...
const evtgen = ccm.iface('evtgen');
const db = ccm.db();
const xfer = db.newXfer();
xfer.insert('SomeTable').set('name', 'Name');
evtgen.addXferEvent(xfer, 'NEW_ENTRY', {name: 'Name'});
xfer.execute(as);
Each consumer is identifier by credentials + arbitrary component name.
As special "LIVE" component can be used for unreliable delivery.
PollFace.register(as, ccm, 'evtpoll', endpoint );
// ...
const evtpoll = ccm.iface('evtpoll');
// User info polling
evtpoll.registerConsumer(as, 'Security');
evtpoll.pollEvents(as, 'Security', last_known_id_here, ['USR_ADD', 'USR_MOD', 'USR_LOGIN']);
as.add((as, events) => {
// ....
});
// Anti-Fraud processing
evtpoll.registerConsumer(as, 'AntiFraud');
evtpoll.pollEvents(as, 'AntiFraud', last_known_id_here, ['DEPOSIT', 'WITHDRAW', 'XFER']);
as.add((as, events) => {
// ....
});
A child class of ReliableReceiverService should be created to properly handle incoming events.
A bi-directional channel like WebSockets or Internal must be used.
A separate Executor instance should be created for use in endpoint callbacks.
class UserReceiver extends ReliableReceiverService
{
_onEvents( as, reqinfo, events ) {
// ...
}
}
const executor = Executor(ccm);
PollFace.register(as, ccm, 'evtpushsec', endpoint, credentials, { executor } );
UserReceiver.register(as, executor);
const evtpushsec = ccm.iface('evtpushsec');
evtpushsec.registerConsumer(as, 'Security');
evtpushsec.readyToReceive(as, 'Security', ['USR_ADD', 'USR_MOD', 'USR_LOGIN']);
There should be a single system-wide instance of DBEventArchiver tool. The tool will automatically reconnect on errors. Processing state can be monitored through events.
DBAutoConfig(as, ccm, {
evtdwh: {}
});
const archiver = new DBEventArchiver(ccm);
archiver.on('workerError', () => { ... });
archiver.on('receiverError', () => { ... });
archiver.on('newEvents', () => { ... });
// keep going until stopped
archiver.start(push_endpoint, credentials);
// to stop - automatically called on ccm.close()
archiver.stop();
For performance and disaster recovery time reasons, operation critical database should be kept as small as possible. Events delivered to all consumers including DBEventArchiver can be removed the following way.
DBAutoConfig(as, ccm, {
evt: {}
});
const discarder = new DBEventDiscarder(ccm);
archiver.on('workerError', () => { ... });
archiver.on('eventDiscard', () => { ... });
// keep going until stopped
discarder.start(ccm);
// to stop - automatically called on ccm.close()
discarder.stop();
DBPushService inherits DBPollService, so there is no need to instance both.
This case show internal communicaton channel.
const ccm = new AdvancedCCM();
DBAutoConfig(as, ccm, {
evt: {}
});
const executor = new Executor(ccm); // or NodeExecutor() for WebSockets
DBGenService.register(as, executor);
DBPushService.register(as, executor);
GenFace.register(as, ccm, 'evtgen', executor);
PollFace.register(as, ccm, 'evtpoll', executor);
const p = as.parallel();
p.loop( (as) => {
ccm.iface('evtgen').addEvent(as, 'EVT', 'data');
});
p.add( (as) => {
let last_id = null;
as.loop( (as) => {
ccm.iface('evtpoll').pollEvents(as, 'LIVE', last_id);
as.add((as, events) => {
if (events.length) {
last_id = events[events.length - 1].id;
} else {
const timer = setTimeout( () => as.success(), 1e3 );
as.setCancel((as) => clearTimeout(timer));
}
});
});
});
The concept is described in FutoIn specification: FTN18: FutoIn Interface - Event Stream v1.x
- DBEventArchiver
Database Event Archiver service.
- DBEventDiscarder
DB-specific event discarding.
It's assumed to be run against "active" database part as defined in the concept to reduce its size after all reliably delivered events are delivered to consumers.
Event are deleted based on limit_at_once to avoid too large transactions which may affect performance of realtime processes and break some DB clusters like Galera.
- DBGenFace
GenFace for DB backend.
The only difference to original GenFace is native DB-specific API.
- DBGenService
Database-specific event generation service
- DBPollService
Database-based Poll Service
- DBPushService
Database-specific Push Service
- DBServiceApp
All-in-one DB EventStream initialization
- GenFace
Event Stream - Generator Face
- GenService
Event Stream - Generator Service Base
- LiveReceiver
Reliable Event Receiver helper to minimize boilerplate code in projects.
- PollFace
Event Stream - Poll Face
- PollService
Event Stream - Poll Service Base
- PushFace
Event Stream - Push Face
- PushService
Event Stream - Push Service Base
- ReceiverFace
Event Stream - Receiver Face
- ReceiverService
Base implementation for receiver side
- ReliableReceiver
Reliable Event Receiver helper to minimize boilerplate code in projects.
- ReliableReceiverService
Base implementation for reliable receiver side.
Database Event Archiver service.
Kind: global class
Note: No more than one instance should run at once.
C-tor
Param | Type | Description |
---|---|---|
db_ccm | AdvancedCCM |
CCM instance with registered '#db.evtdwh' interface |
DB-specific event discarding.
It's assumed to be run against "active" database part as defined in the concept to reduce its size after all reliably delivered events are delivered to consumers.
Event are deleted based on limit_at_once to avoid too large transactions which may affect performance of realtime processes and break some DB clusters like Galera.
Kind: global class
Start event discarding
Kind: instance method of DBEventDiscarder
Param | Type | Default | Description |
---|---|---|---|
ccm | AdvancedCCM |
CCM with registered #db.evt interface | |
[options] | object |
{} |
options |
[options.poll_period_ms] | integer |
600e3 |
poll interval |
[options.limit_at_once] | integer |
1000 |
events to delete at once |
[options.event_table] | string |
"default" |
events table |
[options.consumer_table] | string |
"default" |
consumers table |
Stop event discarding
Kind: instance method of DBEventDiscarder
Emitted on worker errors
Kind: event emitted by DBEventDiscarder
Emitted on discarded events
Kind: event emitted by DBEventDiscarder
GenFace for DB backend.
The only difference to original GenFace is native DB-specific API.
Kind: global class
Easy access to DB event table name
Kind: instance property of DBGenFace
Returns: string
- raw table name
Helper to add event generation into DB transaction
Kind: instance method of DBGenFace
Param | Type | Default | Description |
---|---|---|---|
xb | XferBuilder |
instance of transaction builder | |
type | string |
event type | |
data | * |
any data | |
[table] | string |
"evt_queue" |
event queue |
Database-specific event generation service
Please use DBGenService.regster()
Param | Type | Default | Description |
---|---|---|---|
_as | AsyncSteps |
async step interface | |
executor | Executor |
related Executor | |
[options] | object |
{} |
options |
[options.event_table] | string |
"default" |
events table |
Database-based Poll Service
Please use DBPollService,register()
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
async step interface | |
executor | Executor |
related Executor | |
[options] | object |
{} |
options |
[options.event_table] | string |
"default" |
events table |
[options.consumer_table] | string |
"default" |
consumers table |
Database-specific Push Service
Please use DBPushService,register()
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
async step interface | |
executor | Executor |
related Executor | |
[options] | object |
{} |
options |
[options.event_table] | string |
"default" |
events table |
[options.consumer_table] | string |
"default" |
consumers table |
[options.sleep_min] | integer |
100 |
minimal sleep on lack of events |
[options.sleep_max] | integer |
3000 |
maximal sleep on lack of events |
[options.sleep_step] | integer |
100 |
sleep time increase on lack of events |
All-in-one DB EventStream initialization
Kind: global class
- DBServiceApp
- new DBServiceApp(as, options)
- .ccm() ⇒
AdvancedCCM
- .executor() ⇒
Executor
- .close([done])
C-tor
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
AsyncSteps interface | |
options | object |
{} |
options |
[options.ccm] | AdvancedCCM |
external CCM instance | |
[options.executor] | Executor |
external private executor instance | |
[options.ccmOptions] | object |
auto-CCM options | |
[options.notExpectedHandler] | callable |
'notExpected' error handler | |
[options.executorOptions] | object |
private auto-Executor options | |
[options.evtOptions] | object |
eventstream options | |
[options.discarderOptions] | object |
discarder options | |
[options.enableDiscarder] | boolean |
enable discarder, if true | |
[options.archiverOptions] | object |
discarder options | |
[options.enableArchiver] | boolean |
enable archiver, if true |
CCM instance accessor
Kind: instance method of DBServiceApp
Returns: AdvancedCCM
- instance
Executor instance accessor
Kind: instance method of DBServiceApp
Returns: Executor
- instance
Shutdown of app and related instances
Kind: instance method of DBServiceApp
Param | Type | Default | Description |
---|---|---|---|
[done] | callable |
|
done callback |
Event Stream - Generator Face
Kind: global class
Latest supported FTN17 version
Kind: static property of GenFace
Latest supported FTN4 version
Kind: static property of GenFace
CCM registration helper
Kind: static method of GenFace
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
steps interface | |
ccm | AdvancedCCM |
CCM instance | |
name | string |
CCM registration name | |
endpoint | * |
see AdvancedCCM#register | |
[credentials] | * |
|
see AdvancedCCM#register |
[options] | object |
{} |
interface options |
[options.version] | string |
"1.0" |
interface version to use |
Event Stream - Generator Service Base
GenService.register(as, executor, options) ⇒ GenService
Register futoin.evt.gen interface with Executor
Kind: static method of GenService
Returns: GenService
- instance
Param | Type | Description |
---|---|---|
as | AsyncSteps |
steps interface |
executor | Executor |
executor instance |
options | object |
implementation defined options |
Reliable Event Receiver helper to minimize boilerplate code in projects.
Kind: global class
Initialize event archiver.
Param | Type | Description |
---|---|---|
executor_ccm | AdvancedCCM |
CCM for executor |
Start receiving events for archiving
Kind: instance method of LiveReceiver
Note: options.executor is overridden
Param | Type | Default | Description |
---|---|---|---|
endpoint | * |
see PushFace | |
[credentials] | * |
|
see PushFace |
[options] | * |
{} |
see PushFace |
[options.component] | string |
component name | |
[options.want] | array |
"want" parameter for event filtering |
Stop receiving events
Kind: instance method of LiveReceiver
liveReceiver._registerReceiver(as, executor, options) ⇒ ReceiverService
Override to register custom instance of ReceiverService.
Kind: instance method of LiveReceiver
Returns: ReceiverService
- instance of service
Param | Type | Description |
---|---|---|
as | AsyncSteps |
async steps interface |
executor | Executor |
Internal Executor instance |
options | object |
passed options |
Override to catch new events here instead of using newEvents
event handler.
Kind: instance method of LiveReceiver
Param | Type | Description |
---|---|---|
as | AsyncSteps |
async steps interface |
events | array |
array of events |
Emitted on not expected receiver errors
Kind: event emitted by LiveReceiver
Emitted on worker errors
Kind: event emitted by LiveReceiver
Emitted on new events
Kind: event emitted by LiveReceiver
Emitted after event receiver is ready
Kind: event emitted by LiveReceiver
Event Stream - Poll Face
Kind: global class
Latest supported FTN17 version
Kind: static property of PollFace
Latest supported FTN4 version
Kind: static property of PollFace
CCM registration helper
Kind: static method of PollFace
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
steps interface | |
ccm | AdvancedCCM |
CCM instance | |
name | string |
CCM registration name | |
endpoint | * |
see AdvancedCCM#register | |
[credentials] | * |
|
see AdvancedCCM#register |
[options] | object |
{} |
interface options |
[options.version] | string |
"1.0" |
interface version to use |
Event Stream - Poll Service Base
PollService.register(as, executor, options) ⇒ PollService
Register futoin.evt.poll interface with Executor
Kind: static method of PollService
Returns: PollService
- instance
Note: Chunk event count is lower then protocol permits by default as there is
a typical amount 64K futoin message limit.
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
steps interface | |
executor | Executor |
executor instance | |
options | object |
implementation defined options | |
[options.allow_reliable] | boolean |
true |
allow reliable consumers |
[options.allow_polling] | boolean |
true |
allow polling calls |
[options.max_chunk_events] | integer |
100 |
maxium events per request |
Event Stream - Push Face
CCM registration helper
Kind: static method of PushFace
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
steps interface | |
ccm | AdvancedCCM |
CCM instance | |
name | string |
CCM registration name | |
endpoint | * |
see AdvancedCCM#register | |
[credentials] | * |
|
see AdvancedCCM#register |
[options] | object |
{} |
interface options |
[options.version] | string |
"1.0" |
interface version to use |
Event Stream - Push Service Base
Kind: global class
- PushService
- instance
- static
Emitted in push error handlers
Kind: event emitted by PushService
Emitted in push error handlers
Kind: event emitted by PushService
PushService.register(as, executor, options) ⇒ PushService
Register futoin.evt.push interface with Executor
Kind: static method of PushService
Returns: PushService
- instance
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
steps interface | |
executor | Executor |
executor instance | |
options | object |
implementation defined options | |
[options.allow_reliable] | boolean |
true |
allow reliable consumers |
[options.allow_polling] | boolean |
true |
allow polling calls |
Event Stream - Receiver Face
Kind: global class
Latest supported FTN17 version
Kind: static property of ReceiverFace
CCM registration helper
Kind: static method of ReceiverFace
Returns: string
- - iface:ver of registered interface
Param | Type | Default | Description |
---|---|---|---|
as | AsyncSteps |
steps interface | |
channel | ChannelContext |
Bi-Direction channel instance | |
[options] | object |
{} |
interface options |
[options.version] | string |
"1.0" |
interface version to use |
Base implementation for receiver side
Kind: global class
- ReceiverService
- instance
- static
Member to override to handle vents.
Kind: instance method of ReceiverService
Param | Type | Description |
---|---|---|
as | AsyncSteps |
AsyncSteps interface |
reqinfo | RequestInfo |
request info object |
events | array |
list of events |
ReceiverService.register(as, executor, options) ⇒ PushService
Register futoin.evt.receiver interface with Executor
Kind: static method of ReceiverService
Returns: PushService
- instance
Param | Type | Description |
---|---|---|
as | AsyncSteps |
steps interface |
executor | Executor |
executor instance |
options | object |
implementation defined options |
Reliable Event Receiver helper to minimize boilerplate code in projects.
Kind: global class
reliableReceiver._registerReceiver(as, executor, options) ⇒ ReliableReceiverService
Override to register custom instance of ReliableReceiverService.
Kind: instance method of ReliableReceiver
Returns: ReliableReceiverService
- instance of service
Param | Type | Description |
---|---|---|
as | AsyncSteps |
async steps interface |
executor | Executor |
Internal Executor instance |
options | object |
passed options |
Emitted for count of archived events in each iteration.
Kind: event emitted by ReliableReceiver
Base implementation for reliable receiver side.
Kind: global class
Note: Unlike ReceiverService, it restores proper order of events.
documented by jsdoc-to-markdown.