Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subscribePosition to options and use mysql.format #65

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@ zongji.start({
});
```

## Subscribe Binary Log Position
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a section here, create a section just above **Options Available:** for **Events Emitted:** and take out the current text for the on method to bind handlers:

Add a listener to the binlog or error event. Each handler function accepts one argument.


```javascript
var zongji = new ZongJi({ /* ... MySQL Connection Settings ... */ });

// Receive `binlog_position` events
zongji.on('binlog_position', function(pos) {
console.log(pos);
// { filename: 'mysql-bin.000001', position: 77656182 }
});

// Need both `subscribePosition` and `rotate` to take effect
zongji.start({
subscribePosition: true,
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'rotate']
});
```
> Notice: `pos.filename` won't be updated when `rotate` event is disabled
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should always receive rotate event when we subscribed position event. Otherwise, it sounds like a bug.



For a complete implementation see [`example.js`](example.js)...

## Installation
Expand Down Expand Up @@ -81,6 +101,7 @@ Option Name | Type | Description
`startAtEnd` | `boolean` | Pass `true` to only emit binlog events that occur after ZongJi's instantiation. Must be used in `start()` method for effect.<br>**Default:** `false`
`binlogName` | `string` | Begin reading events from this binlog file. If specified together with `binlogNextPos`, will take precedence over `startAtEnd`.
`binlogNextPos` | `integer` | Begin reading events from this position. Must be included with `binlogName`.
`subscribePosition` | `boolean` | Pass `true` to options `zongji.on('binlog_position', function(pos){...})` will receive an object of `{filename:..., position:...}` as argument, the filename properties won't be updated while `rotate` is disabled;
`includeEvents` | `[string]` | Array of event names to include<br>**Example:** `['writerows', 'updaterows', 'deleterows']`
`excludeEvents` | `[string]` | Array of event names to exclude<br>**Example:** `['rotate', 'tablemap']`
`includeSchema` | `object` | Object describing which databases and tables to include (Only for row events). Use database names as the key and pass an array of table names or `true` (for the entire database).<br>**Example:** ```{ 'my_database': ['allow_table', 'another_table'], 'another_db': true }```
Expand Down
50 changes: 37 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ ZongJi.prototype._init = function() {
name: '_findBinlogEnd',
callback: function(result){
if(result && self.options.startAtEnd){
binlogOptions.filename = result.Log_name;
binlogOptions.position = result.File_size;
self.binlogName = binlogOptions.filename = result.Log_name;
self.binlogNextPos = binlogOptions.position = result.File_size;
}
}
}
Expand Down Expand Up @@ -114,8 +114,14 @@ ZongJi.prototype._init = function() {
}

if(('binlogName' in self.options) && ('binlogNextPos' in self.options)) {
binlogOptions.filename = self.options.binlogName;
binlogOptions.position = self.options.binlogNextPos
self.binlogName = binlogOptions.filename = self.options.binlogName;
self.binlogNextPos = binlogOptions.position = self.options.binlogNextPos
}
if(self.options.subscribePosition){
self.emit('binlog_position', {
filename: self.binlogName,
position: self.binlogNextPos
})
}

self.binlog = generateBinlog.call(self, binlogOptions);
Expand Down Expand Up @@ -186,12 +192,12 @@ ZongJi.prototype._executeCtrlCallbacks = function() {
var tableInfoQueryTemplate = 'SELECT ' +
'COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME, ' +
'COLUMN_COMMENT, COLUMN_TYPE ' +
"FROM information_schema.columns " + "WHERE table_schema='%s' AND table_name='%s'";
"FROM information_schema.columns " + "WHERE table_schema=? AND table_name=?";

ZongJi.prototype._fetchTableInfo = function(tableMapEvent, next) {
var self = this;
var sql = util.format(tableInfoQueryTemplate,
tableMapEvent.schemaName, tableMapEvent.tableName);
var sql = mysql.format(tableInfoQueryTemplate,
[tableMapEvent.schemaName, tableMapEvent.tableName]);

this.ctrlConnection.query(sql, function(err, rows) {
if (err) {
Expand Down Expand Up @@ -233,6 +239,30 @@ ZongJi.prototype.start = function(options) {
self.connection._implyConnect();
self.connection._protocol._enqueue(new self.binlog(function(error, event){
if(error) return self.emit('error', error);
// Add before `event === undefined || event._filtered === true`, so that all nextPosition can be caught
if(event){
var changed = false;
if(event.getTypeName() == 'Rotate'){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and many other places, ==, != is used where ===, !== should be used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just like you say, '==' and '!=' is not strict. :)

if(self.binlogName != event.binlogName || self.binlogNextPos != event.position){
self.binlogName = event.binlogName;
// Use event.position because event.nextPosition is incorrect while rotate
self.binlogNextPos = event.position;
changed = true;
}
}else if(event.nextPosition){
if(self.binlogNextPos != event.nextPosition){
self.binlogNextPos = event.nextPosition;
changed = true;
}
}
// Check if need to send `binlog_position` event
if(self.options.subscribePosition === true && changed){
self.emit('binlog_position', {
filename: self.binlogName,
position: self.binlogNextPos
})
}
}
// Do not emit events that have been filtered out
if(event === undefined || event._filtered === true) return;

Expand All @@ -251,13 +281,7 @@ ZongJi.prototype.start = function(options) {
return;
}
break;
case 'Rotate':
if (self.binlogName !== event.binlogName) {
self.binlogName = event.binlogName;
}
break;
}
self.binlogNextPos = event.nextPosition;
self.emit('binlog', event);
}));
};
Expand Down
132 changes: 132 additions & 0 deletions test/position.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
var ZongJi = require('./../');
var getEventClass = require('./../lib/code_map').getEventClass;
var settings = require('./settings/mysql');
var connector = require('./helpers/connector');
var querySequence = require('./helpers/querySequence');
var util = require('util')

var conn = process.testZongJi || {};

module.exports = {
setUp: function (done) {
if (!conn.db) {
process.testZongJi = connector.call(conn, settings, done);
} else {
conn.incCount();
done();
}
},
tearDown: function (done) {
if (conn) {
conn.eventLog.splice(0, conn.eventLog.length);
conn.errorLog.splice(0, conn.errorLog.length);
conn.closeIfInactive(1000);
}
done();
},
position: function (test) {
var TEST_TABLE = 'binlog_position';
var events = [];

var zongji = new ZongJi(settings.connection);

var positions = []
zongji.on('binlog', function(event){
positions.push({
filename: zongji.binlogName,
position: zongji.binlogNextPos
})
});
zongji.start({
startAtEnd: true,
serverId: 10, // Second instance must not use same server ID
subscribePosition: true,
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'rotate']
});

setTimeout(function() {
querySequence(conn.db, [
'FLUSH LOGS',
'DROP TABLE IF EXISTS ' + conn.escId(TEST_TABLE),
'CREATE TABLE ' + conn.escId(TEST_TABLE) + ' (col INT UNSIGNED)',
'INSERT INTO ' + conn.escId(TEST_TABLE) + ' (col) VALUES (1)',
'UPDATE ' + conn.escId(TEST_TABLE) + ' SET col=2',
'DELETE FROM ' + conn.escId(TEST_TABLE),
'FLUSH LOGS',
], function (error, results) {
if (error) {
console.error(error);
zongji.stop();
test.done();
return;
}
// newer position should bigger than previous position
var lastPos;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is some spaghetti logic. If you're testing that the position increases with each event, just do it directly.

var lastPos = null;
zongji.on('position', function() {
  lastPos !== null && test.ok(zongji.binlogNextPos <= lastPos);
  lastPos = zongji.binlogNextPos;
});

You might also want to test that there are the correct number of position events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a consider of check the number of position events, but I'm not sure if the binlog event is triggered as provided number.

for(var n=0; n<positions.length; n++){
var item = positions[n];
if(lastPos){
if(item.filename == lastPos.filename){
test.ok(item.position >= lastPos.position, util.format('same filename, position should increase, but from %s to %s', lastPos.position, item.position));
}else{
test.ok(item.filename > lastPos.filename, util.format('filename should increase, but from %s to %s', lastPos.filename, item.filename));
}
}
lastPos = item;
}
zongji.stop();
test.done();
})
}, 1000);
},
subscribe_position: function (test) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between these tests?

var TEST_TABLE = 'binlog_position';
var events = [];

var zongji = new ZongJi(settings.connection);

var positions = []
zongji.on('binlog_position', function(pos){
positions.push(pos)
});
zongji.start({
startAtEnd: true,
serverId: 11, // Second instance must not use same server ID
subscribePosition: true,
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'rotate']
});

setTimeout(function() {
querySequence(conn.db, [
'FLUSH LOGS',
'DROP TABLE IF EXISTS ' + conn.escId(TEST_TABLE),
'CREATE TABLE ' + conn.escId(TEST_TABLE) + ' (col INT UNSIGNED)',
'INSERT INTO ' + conn.escId(TEST_TABLE) + ' (col) VALUES (1)',
'UPDATE ' + conn.escId(TEST_TABLE) + ' SET col=2',
'DELETE FROM ' + conn.escId(TEST_TABLE),
'FLUSH LOGS',
], function (error, results) {
if (error) {
console.error(error);
zongji.stop();
test.done();
return;
}
// newer position should bigger than previous position
var lastPos;
for(var n=0; n<positions.length; n++){
var item = positions[n];
if(lastPos){
if(item.filename == lastPos.filename){
test.ok(item.position > lastPos.position, util.format('same filename, position should increase, but from %s to %s', lastPos.position, item.position));
}else{
test.ok(item.filename > lastPos.filename, util.format('filename should increase, but from %s to %s', lastPos.filename, item.filename));
}
}
lastPos = item;
}
zongji.stop();
test.done();
})
}, 1000);
}
}