-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
317 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
-- | ||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||
-- contributor license agreements. See the NOTICE file distributed with | ||
-- this work for additional information regarding copyright ownership. | ||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||
-- (the "License"); you may not use this file except in compliance with | ||
-- the License. You may obtain a copy of the License at | ||
-- | ||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||
-- | ||
-- Unless required by applicable law or agreed to in writing, software | ||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
-- See the License for the specific language governing permissions and | ||
-- limitations under the License. | ||
-- | ||
|
||
local ws_client = require "resty.websocket.client" | ||
local protoc = require("protoc") | ||
local pb = require("pb") | ||
|
||
local _M = {} | ||
local mt = { __index = _M } | ||
|
||
|
||
local pb_state | ||
local function load_proto() | ||
pb.state(nil) | ||
protoc.reload() | ||
pb.option("int64_as_string") | ||
local pubsub_protoc = protoc.new() | ||
pubsub_protoc:addpath("apisix/include/apisix/model") | ||
local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") | ||
if not ok then | ||
ngx.log(ngx.ERR, "failed to load protocol: "..err) | ||
return err | ||
end | ||
pb_state = pb.state(nil) | ||
end | ||
|
||
|
||
local function init_websocket_client(endpoint) | ||
local ws, err = ws_client:new() | ||
if not ws then | ||
ngx.log(ngx.ERR, "failed to create websocket client: "..err) | ||
return nil, err | ||
end | ||
local ok, err = ws:connect(endpoint) | ||
if not ok then | ||
ngx.log(ngx.ERR, "failed to connect: "..err) | ||
return nil, err | ||
end | ||
return ws | ||
end | ||
|
||
|
||
function _M.new_ws(server) | ||
local err = load_proto() | ||
if err then | ||
return nil, err | ||
end | ||
local ws, err = init_websocket_client(server) | ||
if not ws then | ||
return nil, err | ||
end | ||
|
||
local obj = setmetatable({ | ||
type = "ws", | ||
ws_client = ws, | ||
}, mt) | ||
|
||
return obj | ||
end | ||
|
||
|
||
function _M.send_recv_ws(self, data) | ||
pb.state(pb_state) | ||
local ws = self.ws_client | ||
local _, err = ws:send_binary(pb.encode("PubSubReq", data)) | ||
if err then | ||
return nil, err | ||
end | ||
local raw_data, _, err = ws:recv_frame() | ||
if not raw_data then | ||
ngx.log(ngx.ERR, "failed to receive the frame: ", err) | ||
return nil, err | ||
end | ||
local data, err = pb.decode("PubSubResp", raw_data) | ||
if not data then | ||
ngx.log(ngx.ERR, "failed to decode the frame: ", err) | ||
return nil, err | ||
end | ||
|
||
return data | ||
end | ||
|
||
|
||
function _M.close_ws(self) | ||
self.ws_client:send_close() | ||
end | ||
|
||
|
||
return _M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
use t::APISIX 'no_plan'; | ||
|
||
repeat_each(1); | ||
no_long_string(); | ||
no_root_location(); | ||
|
||
add_block_preprocessor(sub { | ||
my ($block) = @_; | ||
|
||
if ((!defined $block->error_log) && (!defined $block->no_error_log)) { | ||
$block->set_value("no_error_log", "[error]"); | ||
} | ||
|
||
if (!defined $block->request) { | ||
$block->set_value("request", "GET /t"); | ||
} | ||
}); | ||
|
||
run_tests(); | ||
|
||
__DATA__ | ||
=== TEST 1: setup all-in-one test | ||
--- config | ||
location /t { | ||
content_by_lua_block { | ||
local data = { | ||
{ | ||
url = "/apisix/admin/routes/kafka", | ||
data = [[{ | ||
"upstream": { | ||
"nodes": { | ||
"127.0.0.1:9092": 1 | ||
}, | ||
"type": "none", | ||
"scheme": "kafka" | ||
}, | ||
"uri": "/kafka" | ||
}]], | ||
}, | ||
{ | ||
url = "/apisix/admin/routes/kafka-tlsv", | ||
data = [[{ | ||
"upstream": { | ||
"nodes": { | ||
"127.0.0.1:9093": 1 | ||
}, | ||
"type": "none", | ||
"scheme": "kafka", | ||
"tls": { | ||
"verify": true | ||
} | ||
}, | ||
"uri": "/kafka-tlsv" | ||
}]], | ||
}, | ||
{ | ||
url = "/apisix/admin/routes/kafka-tls", | ||
data = [[{ | ||
"upstream": { | ||
"nodes": { | ||
"127.0.0.1:9093": 1 | ||
}, | ||
"type": "none", | ||
"scheme": "kafka", | ||
"tls": { | ||
"verify": false | ||
} | ||
}, | ||
"uri": "/kafka-tls" | ||
}]], | ||
}, | ||
{ | ||
url = "/apisix/admin/routes/kafka-sasl", | ||
data = [[{ | ||
"upstream": { | ||
"nodes": { | ||
"127.0.0.1:9094": 1 | ||
}, | ||
"type": "none", | ||
"scheme": "kafka" | ||
}, | ||
"uri": "/kafka-sasl", | ||
"plugins": { | ||
"kafka-proxy": { | ||
"enable_sasl": true, | ||
"sasl": { | ||
"username": "admin", | ||
"password": "admin-secret" | ||
} | ||
} | ||
} | ||
}]], | ||
} | ||
} | ||
local t = require("lib.test_admin").test | ||
for _, data in ipairs(data) do | ||
local code, body = t(data.url, ngx.HTTP_PUT, data.data) | ||
ngx.say(code..body) | ||
end | ||
} | ||
} | ||
--- response_body eval | ||
"201passed\n"x4 | ||
=== TEST 2: hit route (with HTTP request) | ||
--- request | ||
GET /kafka | ||
--- error_code: 400 | ||
--- error_log | ||
failed to initialize pub-sub module, err: bad "upgrade" request header: nil | ||
=== TEST 3: hit route (normal Kafka) | ||
--- config | ||
location /t { | ||
content_by_lua_block { | ||
local lib_pubsub = require("lib.pubsub") | ||
local test_pubsub, err = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka") | ||
if not test_pubsub then | ||
ngx.say(err) | ||
return | ||
end | ||
local data = { | ||
{ | ||
sequence = 0, | ||
cmd_kafka_list_offset = { | ||
topic = "not-exist", | ||
partition = 0, | ||
timestamp = -1, | ||
}, | ||
}, | ||
{ | ||
sequence = 1, | ||
cmd_kafka_fetch = { | ||
topic = "not-exist", | ||
partition = 0, | ||
offset = 0, | ||
}, | ||
}, | ||
{ | ||
sequence = 2, | ||
cmd_kafka_list_offset = { | ||
topic = "test-consumer", | ||
partition = 0, | ||
timestamp = -2, | ||
}, | ||
}, | ||
{ | ||
sequence = 3, | ||
cmd_kafka_list_offset = { | ||
topic = "test-consumer", | ||
partition = 0, | ||
timestamp = -1, | ||
}, | ||
}, | ||
{ | ||
sequence = 4, | ||
cmd_kafka_fetch = { | ||
topic = "test-consumer", | ||
partition = 0, | ||
offset = 14, | ||
}, | ||
} | ||
} | ||
for i = 1, #data do | ||
local data, err = test_pubsub:send_recv_ws(data[i]) | ||
if not data then | ||
ngx.say(err) | ||
return | ||
end | ||
if data.error_resp then | ||
ngx.say(data.sequence..data.error_resp.message) | ||
end | ||
if data.kafka_list_offset_resp then | ||
ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset) | ||
end | ||
if data.kafka_fetch_resp then | ||
ngx.say(data.sequence.."offset: "..data.kafka_fetch_resp.messages[1].offset.. | ||
" msg: "..data.kafka_fetch_resp.messages[1].value) | ||
end | ||
end | ||
test_pubsub:close_ws() | ||
} | ||
} | ||
--- response_body | ||
0failed to list offset, topic: not-exist, partition: 0, err: not found topic | ||
1failed to fetch message, topic: not-exist, partition: 0, err: not found topic | ||
2offset: 0 | ||
3offset: 30 | ||
4offset: 14 msg: testmsg15 |