From 7249ef3ee776c66acc95036dc76a2d08dc3f6350 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 15 Sep 2010 11:39:35 +0700 Subject: [PATCH] WebSocket proxy support, fixed 304 code halting --- lib/node-http-proxy.js | 221 +++++++++++++++++++++++-- test/node-http-proxy-websocket-test.js | 51 ++++++ 2 files changed, 255 insertions(+), 17 deletions(-) create mode 100644 test/node-http-proxy-websocket-test.js diff --git a/lib/node-http-proxy.js b/lib/node-http-proxy.js index a9a34adbd..2c6c3861b 100644 --- a/lib/node-http-proxy.js +++ b/lib/node-http-proxy.js @@ -23,12 +23,12 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ - + var sys = require('sys'), http = require('http'), events = require('events'), pool = require('pool'), - min = 0, + min = 0, max = 100; // Setup the PoolManager @@ -42,24 +42,28 @@ exports.createServer = function () { callback = typeof args[args.length - 1] === 'function' && args.pop(); if (args[0]) port = args[0]; if (args[1]) host = args[1]; - + var server = http.createServer(function (req, res){ var proxy = new HttpProxy(req, res); - + proxy.emitter.on('proxy', function (err, body) { server.emit('proxy', err, body); }); - + // If we were passed a callback to process the request // or response in some way, then call it. if(callback) { callback(req, res, proxy); } - else { + else { proxy.proxyRequest(port, server); } }); - + + // WebSocket support + server.on('update', function() { + }); + return server; }; @@ -73,12 +77,21 @@ exports.setMax = function (value) { manager.setMaxClients(max); }; -var HttpProxy = function (req, res) { +var HttpProxy = function (req, res, /* optional */ head) { this.emitter = new(events.EventEmitter); this.events = {}; this.req = req; - this.res = res; - this.watch(req); + // If this request is upgrade request + // No response will be passed + if (!req.headers.upgrade) { + this.res = res; + this.watch(req); + } else { + // Second argument will be socket + this.sock = res; + this.head = head; + this.watch(res); + } }; HttpProxy.prototype = { @@ -90,7 +103,7 @@ HttpProxy.prototype = { } return arr; }, - + watch: function (req) { this.events = []; var self = this; @@ -105,11 +118,11 @@ HttpProxy.prototype = { req.addListener('data', this.onData); req.addListener('end', this.onEnd); }, - + unwatch: function (req) { req.removeListener('data', this.onData); req.removeListener('end', this.onEnd); - + // Rebroadcast any events that have been buffered for (var i = 0, len = this.events.length; i < len; ++i) { req.emit.apply(req, this.events[i]); @@ -120,13 +133,13 @@ HttpProxy.prototype = { // Remark: nodeProxy.body exists solely for testability var self = this, req = this.req, res = this.res; self.body = ''; - + // Open new HTTP request to internal resource with will act as a reverse proxy pass var p = manager.getPool(port, server); - + p.on('error', function (err) { // Remark: We should probably do something here - // but this is a hot-fix because I don't think 'pool' + // but this is a hot-fix because I don't think 'pool' // should be emitting this event. }); @@ -141,7 +154,7 @@ HttpProxy.prototype = { res.end(); }; - + // Add a listener for the connection timeout event reverse_proxy.addListener('error', error); @@ -155,6 +168,13 @@ HttpProxy.prototype = { // Set the response headers of the client response res.writeHead(response.statusCode, response.headers); + // Status code = 304 + // No 'data' event and no 'end' + if (response.statusCode === 304) { + res.end(); + return; + } + // Add event handler for the proxied response in chunks response.addListener('data', function (chunk) { if(req.method !== 'HEAD') { @@ -184,6 +204,173 @@ HttpProxy.prototype = { self.unwatch(req); }); + }, + + /** + * WebSocket Tunnel realization + * Copyright (c) 2010 Fedor Indutny : http://github.com/donnerjack13589 + */ + proxyWebSocketRequest: function (port, server, host /* optional */) { + var self = this, update = self.update, req = self.req, socket = self.sock, + head = self.head, headers = new _headers(req.headers), CRLF = '\r\n', + listeners = {}; + + // Will generate clone of headers + // To not change original + function _headers(headers) { + var h = {}; + for (var i in headers) { + h[i] = headers[i]; + } + return h; + } + + // WebSocket requests has + // method = GET + if (req.method !== 'GET' || headers.upgrade.toLowerCase() !== 'websocket') { + // This request is not WebSocket request + return; + } + + // Turn of all bufferings + // For server set KeepAlive + // For client set encoding + function _socket(socket, server) { + socket.setTimeout(0); + socket.setNoDelay(true); + if (server) { + socket.setKeepAlive(true, 0); + } else { + socket.setEncoding('utf8'); + } + } + + // Client socket + _socket(socket); + + // If host is undefined + // Get it from headers + if (!host) { + host = headers.Host; + } + // Remote host address + var remote_host = server + (port - 80 === 0 ? '' : ':' + port); + + // Change headers + headers.Host = remote_host; + headers.Origin = 'http://' + remote_host; + + // Open request + var p = manager.getPool(port, server); + + p.getClient(function(client) { + // Based on 'pool/main.js' + var request = client.request('GET', req.url, headers); + + var errorListener = function (error) { + p.emit('error', error); + request.emit('error', error); + socket.end(); + } + + // Not disconnect on update + client.on('upgrade', function(request, remote_socket, head) { + // Prepare socket + _socket(remote_socket, true); + + // Emit event + onUpgrade(remote_socket); + }); + + client.on('error', errorListener); + request.on('response', function (response) { + response.on('end', function () { + client.removeListener('error', errorListener); + client.busy = false; + p.onFree(client); + }) + }) + client.busy = true; + + var t; + request.socket.on('data', t = function(data) { + // Handshaking + + // Ok, kind of harmfull part of code + // Socket.IO is sending hash at the end of handshake + // If protocol = 76 + // But we need to replace 'host' and 'origin' in response + // So we split data to printable data and to non-printable + // (Non-printable will come after double-CRLF) + var sdata = data.toString(); + + // Get Printable + sdata = sdata + .substr(0, sdata.search(CRLF + CRLF)); + + // Get Non-Printable + data = data.slice(Buffer.byteLength(sdata), data.length); + + // Replace host and origin + sdata = sdata + .replace(remote_host, host) + .replace(remote_host, host); + + // Write printable + socket.write(sdata); + + // Write non-printable + socket.write(data); + + // Remove data listener + request.socket.removeListener('data', t); + }); + + // Write upgrade-head + request.write(head); + self.unwatch(socket); + }); + + // Request + + function onUpgrade(reverse_proxy) { + // We're now connected to the server + // So lets change server socket + + reverse_proxy.on('data', listeners._r_data = function(data) { + // Pass data to client + if (socket.writable) { + socket.write(data); + } + }); + + socket.on('data', listeners._data = function(data){ + // Pass data from client to server + // Socket thougth that it isn't writable + reverse_proxy.write(data); + }); + + // Detach event listeners from reverse_proxy + function detach() { + reverse_proxy.removeListener('close', listeners._r_close); + reverse_proxy.removeListener('data', listeners._r_data); + socket.removeListener('data', listeners._data); + socket.removeListener('close', listeners._close); + } + + // Hook disconnections + reverse_proxy.on('close', listeners._r_close = function() { + socket.end(); + detach(); + }); + + socket.on('close', listeners._close = function() { + reverse_proxy.end(); + detach(); + }); + + }; + } }; diff --git a/test/node-http-proxy-websocket-test.js b/test/node-http-proxy-websocket-test.js new file mode 100644 index 000000000..c56370e56 --- /dev/null +++ b/test/node-http-proxy-websocket-test.js @@ -0,0 +1,51 @@ +/* + node-http-proxy-websocket-test.js: http proxy for node.js + + Copyright (c) 2010 Fedor Indutny + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +*/ + +var vows = require('vows'), + sys = require('sys'), + assert = require('assert'), + http = require('http'); + +var httpProxy = require('./../lib/node-http-proxy'); +var testServers = {}; + +var server = httpProxy.createServer(function(req, res) { + var p = new httpProxy.HttpProxy(req, res); + + sys.puts('http request'); + + p.proxyRequest(8080, 'localhost'); +}); + +server.on('upgrade', function(req, socket, head) { + var p = new httpProxy.HttpProxy(req, socket, head); + + sys.puts('websocket request'); + + p.proxyWebSocketRequest(8080, 'localhost'); +}); + +server.listen(80);