-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add subscribePosition
to options and use mysql.format
#65
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,26 @@ zongji.start({ | |
}); | ||
``` | ||
|
||
## Subscribe Binary Log Position | ||
|
||
```javascript | ||
var zongji = new ZongJi({ /* ... MySQL Connection Settings ... */ }); | ||
|
||
// Receive `binlog_position` events | ||
zongji.on('binlog_position', function(pos) { | ||
console.log(pos); | ||
// { filename: 'mysql-bin.000001', position: 77656182 } | ||
}); | ||
|
||
// Need both `subscribePosition` and `rotate` to take effect | ||
zongji.start({ | ||
subscribePosition: true, | ||
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'rotate'] | ||
}); | ||
``` | ||
> Notice: `pos.filename` won't be updated when `rotate` event is disabled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should always receive rotate event when we subscribed position event. Otherwise, it sounds like a bug. |
||
|
||
|
||
For a complete implementation see [`example.js`](example.js)... | ||
|
||
## Installation | ||
|
@@ -81,6 +101,7 @@ Option Name | Type | Description | |
`startAtEnd` | `boolean` | Pass `true` to only emit binlog events that occur after ZongJi's instantiation. Must be used in `start()` method for effect.<br>**Default:** `false` | ||
`binlogName` | `string` | Begin reading events from this binlog file. If specified together with `binlogNextPos`, will take precedence over `startAtEnd`. | ||
`binlogNextPos` | `integer` | Begin reading events from this position. Must be included with `binlogName`. | ||
`subscribePosition` | `boolean` | Pass `true` to options `zongji.on('binlog_position', function(pos){...})` will receive an object of `{filename:..., position:...}` as argument, the filename properties won't be updated while `rotate` is disabled; | ||
`includeEvents` | `[string]` | Array of event names to include<br>**Example:** `['writerows', 'updaterows', 'deleterows']` | ||
`excludeEvents` | `[string]` | Array of event names to exclude<br>**Example:** `['rotate', 'tablemap']` | ||
`includeSchema` | `object` | Object describing which databases and tables to include (Only for row events). Use database names as the key and pass an array of table names or `true` (for the entire database).<br>**Example:** ```{ 'my_database': ['allow_table', 'another_table'], 'another_db': true }``` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,8 +85,8 @@ ZongJi.prototype._init = function() { | |
name: '_findBinlogEnd', | ||
callback: function(result){ | ||
if(result && self.options.startAtEnd){ | ||
binlogOptions.filename = result.Log_name; | ||
binlogOptions.position = result.File_size; | ||
self.binlogName = binlogOptions.filename = result.Log_name; | ||
self.binlogNextPos = binlogOptions.position = result.File_size; | ||
} | ||
} | ||
} | ||
|
@@ -114,8 +114,14 @@ ZongJi.prototype._init = function() { | |
} | ||
|
||
if(('binlogName' in self.options) && ('binlogNextPos' in self.options)) { | ||
binlogOptions.filename = self.options.binlogName; | ||
binlogOptions.position = self.options.binlogNextPos | ||
self.binlogName = binlogOptions.filename = self.options.binlogName; | ||
self.binlogNextPos = binlogOptions.position = self.options.binlogNextPos | ||
} | ||
if(self.options.subscribePosition){ | ||
self.emit('binlog_position', { | ||
filename: self.binlogName, | ||
position: self.binlogNextPos | ||
}) | ||
} | ||
|
||
self.binlog = generateBinlog.call(self, binlogOptions); | ||
|
@@ -186,12 +192,12 @@ ZongJi.prototype._executeCtrlCallbacks = function() { | |
var tableInfoQueryTemplate = 'SELECT ' + | ||
'COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME, ' + | ||
'COLUMN_COMMENT, COLUMN_TYPE ' + | ||
"FROM information_schema.columns " + "WHERE table_schema='%s' AND table_name='%s'"; | ||
"FROM information_schema.columns " + "WHERE table_schema=? AND table_name=?"; | ||
|
||
ZongJi.prototype._fetchTableInfo = function(tableMapEvent, next) { | ||
var self = this; | ||
var sql = util.format(tableInfoQueryTemplate, | ||
tableMapEvent.schemaName, tableMapEvent.tableName); | ||
var sql = mysql.format(tableInfoQueryTemplate, | ||
[tableMapEvent.schemaName, tableMapEvent.tableName]); | ||
|
||
this.ctrlConnection.query(sql, function(err, rows) { | ||
if (err) { | ||
|
@@ -233,6 +239,30 @@ ZongJi.prototype.start = function(options) { | |
self.connection._implyConnect(); | ||
self.connection._protocol._enqueue(new self.binlog(function(error, event){ | ||
if(error) return self.emit('error', error); | ||
// Add before `event === undefined || event._filtered === true`, so that all nextPosition can be caught | ||
if(event){ | ||
var changed = false; | ||
if(event.getTypeName() == 'Rotate'){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here and many other places, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just like you say, '==' and '!=' is not strict. :) |
||
if(self.binlogName != event.binlogName || self.binlogNextPos != event.position){ | ||
self.binlogName = event.binlogName; | ||
// Use event.position because event.nextPosition is incorrect while rotate | ||
self.binlogNextPos = event.position; | ||
changed = true; | ||
} | ||
}else if(event.nextPosition){ | ||
if(self.binlogNextPos != event.nextPosition){ | ||
self.binlogNextPos = event.nextPosition; | ||
changed = true; | ||
} | ||
} | ||
// Check if need to send `binlog_position` event | ||
if(self.options.subscribePosition === true && changed){ | ||
self.emit('binlog_position', { | ||
filename: self.binlogName, | ||
position: self.binlogNextPos | ||
}) | ||
} | ||
} | ||
// Do not emit events that have been filtered out | ||
if(event === undefined || event._filtered === true) return; | ||
|
||
|
@@ -251,13 +281,7 @@ ZongJi.prototype.start = function(options) { | |
return; | ||
} | ||
break; | ||
case 'Rotate': | ||
if (self.binlogName !== event.binlogName) { | ||
self.binlogName = event.binlogName; | ||
} | ||
break; | ||
} | ||
self.binlogNextPos = event.nextPosition; | ||
self.emit('binlog', event); | ||
})); | ||
}; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
var ZongJi = require('./../'); | ||
var getEventClass = require('./../lib/code_map').getEventClass; | ||
var settings = require('./settings/mysql'); | ||
var connector = require('./helpers/connector'); | ||
var querySequence = require('./helpers/querySequence'); | ||
var util = require('util') | ||
|
||
var conn = process.testZongJi || {}; | ||
|
||
module.exports = { | ||
setUp: function (done) { | ||
if (!conn.db) { | ||
process.testZongJi = connector.call(conn, settings, done); | ||
} else { | ||
conn.incCount(); | ||
done(); | ||
} | ||
}, | ||
tearDown: function (done) { | ||
if (conn) { | ||
conn.eventLog.splice(0, conn.eventLog.length); | ||
conn.errorLog.splice(0, conn.errorLog.length); | ||
conn.closeIfInactive(1000); | ||
} | ||
done(); | ||
}, | ||
position: function (test) { | ||
var TEST_TABLE = 'binlog_position'; | ||
var events = []; | ||
|
||
var zongji = new ZongJi(settings.connection); | ||
|
||
var positions = [] | ||
zongji.on('binlog', function(event){ | ||
positions.push({ | ||
filename: zongji.binlogName, | ||
position: zongji.binlogNextPos | ||
}) | ||
}); | ||
zongji.start({ | ||
startAtEnd: true, | ||
serverId: 10, // Second instance must not use same server ID | ||
subscribePosition: true, | ||
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'rotate'] | ||
}); | ||
|
||
setTimeout(function() { | ||
querySequence(conn.db, [ | ||
'FLUSH LOGS', | ||
'DROP TABLE IF EXISTS ' + conn.escId(TEST_TABLE), | ||
'CREATE TABLE ' + conn.escId(TEST_TABLE) + ' (col INT UNSIGNED)', | ||
'INSERT INTO ' + conn.escId(TEST_TABLE) + ' (col) VALUES (1)', | ||
'UPDATE ' + conn.escId(TEST_TABLE) + ' SET col=2', | ||
'DELETE FROM ' + conn.escId(TEST_TABLE), | ||
'FLUSH LOGS', | ||
], function (error, results) { | ||
if (error) { | ||
console.error(error); | ||
zongji.stop(); | ||
test.done(); | ||
return; | ||
} | ||
// newer position should bigger than previous position | ||
var lastPos; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is some spaghetti logic. If you're testing that the position increases with each event, just do it directly. var lastPos = null;
zongji.on('position', function() {
lastPos !== null && test.ok(zongji.binlogNextPos <= lastPos);
lastPos = zongji.binlogNextPos;
}); You might also want to test that there are the correct number of position events. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a consider of check the number of |
||
for(var n=0; n<positions.length; n++){ | ||
var item = positions[n]; | ||
if(lastPos){ | ||
if(item.filename == lastPos.filename){ | ||
test.ok(item.position >= lastPos.position, util.format('same filename, position should increase, but from %s to %s', lastPos.position, item.position)); | ||
}else{ | ||
test.ok(item.filename > lastPos.filename, util.format('filename should increase, but from %s to %s', lastPos.filename, item.filename)); | ||
} | ||
} | ||
lastPos = item; | ||
} | ||
zongji.stop(); | ||
test.done(); | ||
}) | ||
}, 1000); | ||
}, | ||
subscribe_position: function (test) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference between these tests? |
||
var TEST_TABLE = 'binlog_position'; | ||
var events = []; | ||
|
||
var zongji = new ZongJi(settings.connection); | ||
|
||
var positions = [] | ||
zongji.on('binlog_position', function(pos){ | ||
positions.push(pos) | ||
}); | ||
zongji.start({ | ||
startAtEnd: true, | ||
serverId: 11, // Second instance must not use same server ID | ||
subscribePosition: true, | ||
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'rotate'] | ||
}); | ||
|
||
setTimeout(function() { | ||
querySequence(conn.db, [ | ||
'FLUSH LOGS', | ||
'DROP TABLE IF EXISTS ' + conn.escId(TEST_TABLE), | ||
'CREATE TABLE ' + conn.escId(TEST_TABLE) + ' (col INT UNSIGNED)', | ||
'INSERT INTO ' + conn.escId(TEST_TABLE) + ' (col) VALUES (1)', | ||
'UPDATE ' + conn.escId(TEST_TABLE) + ' SET col=2', | ||
'DELETE FROM ' + conn.escId(TEST_TABLE), | ||
'FLUSH LOGS', | ||
], function (error, results) { | ||
if (error) { | ||
console.error(error); | ||
zongji.stop(); | ||
test.done(); | ||
return; | ||
} | ||
// newer position should bigger than previous position | ||
var lastPos; | ||
for(var n=0; n<positions.length; n++){ | ||
var item = positions[n]; | ||
if(lastPos){ | ||
if(item.filename == lastPos.filename){ | ||
test.ok(item.position > lastPos.position, util.format('same filename, position should increase, but from %s to %s', lastPos.position, item.position)); | ||
}else{ | ||
test.ok(item.filename > lastPos.filename, util.format('filename should increase, but from %s to %s', lastPos.filename, item.filename)); | ||
} | ||
} | ||
lastPos = item; | ||
} | ||
zongji.stop(); | ||
test.done(); | ||
}) | ||
}, 1000); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of a section here, create a section just above
**Options Available:**
for**Events Emitted:**
and take out the current text for theon
method to bind handlers: