Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk_load): add start bulk load http interface (#693)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Dec 17, 2020
1 parent 67e06d2 commit 1d88c1d
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void bulk_load_service::initialize_bulk_load_service()
// ThreadPool: THREAD_POOL_META_SERVER
void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc)
{
FAIL_POINT_INJECT_F("meta_on_start_bulk_load",
[=](dsn::string_view) { rpc.response().err = ERR_OK; });

const auto &request = rpc.request();
auto &response = rpc.response();
response.err = ERR_OK;
Expand Down
56 changes: 55 additions & 1 deletion src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <string>

#include <dsn/c/api_layer1.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/duplication_common.h>
Expand Down Expand Up @@ -611,6 +610,61 @@ void meta_http_service::query_duplication_handler(const http_request &req, http_
resp.body = duplication_query_response_to_string(rpc_resp);
}

void meta_http_service::start_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}

if (_service->_bulk_load_svc == nullptr) {
resp.body = "bulk load is not enabled";
resp.status_code = http_status_code::not_found;
return;
}

start_bulk_load_request request;
bool ret = json::json_forwarder<start_bulk_load_request>::decode(req.body, request);
if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.cluster_name.empty()) {
resp.body = "cluster_name should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.file_provider_type.empty()) {
resp.body = "file_provider_type should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}
if (request.remote_root_path.empty()) {
resp.body = "remote_root_path should not be empty";
resp.status_code = http_status_code::bad_request;
return;
}

auto rpc_req = dsn::make_unique<start_bulk_load_request>(request);
start_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK);
_service->_bulk_load_svc->on_start_bulk_load(rpc);

auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("hint_msg", rpc_resp.hint_msg);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::ok;
}

void meta_http_service::query_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
Expand Down
12 changes: 12 additions & 0 deletions src/meta/meta_http_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

#include <algorithm>

#include <dsn/cpp/json_helper.h>
#include <dsn/http/http_server.h>

namespace dsn {
namespace replication {

NON_MEMBER_JSON_SERIALIZATION(
start_bulk_load_request, app_name, cluster_name, file_provider_type, remote_root_path)

class meta_service;
class meta_http_service : public http_service
{
Expand Down Expand Up @@ -59,6 +63,13 @@ class meta_http_service : public http_service
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/backup_policy");
// request body should be start_bulk_load_request
register_handler("app/start_bulk_load",
std::bind(&meta_http_service::start_bulk_load_handler,
this,
std::placeholders::_1,
std::placeholders::_2),
"ip:port/meta/start_bulk_load");
register_handler("app/query_bulk_load",
std::bind(&meta_http_service::query_bulk_load_handler,
this,
Expand All @@ -76,6 +87,7 @@ class meta_http_service : public http_service
void get_app_envs_handler(const http_request &req, http_response &resp);
void query_backup_policy_handler(const http_request &req, http_response &resp);
void query_duplication_handler(const http_request &req, http_response &resp);
void start_bulk_load_handler(const http_request &req, http_response &resp);
void query_bulk_load_handler(const http_request &req, http_response &resp);

private:
Expand Down
42 changes: 42 additions & 0 deletions src/meta/test/meta_http_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <gtest/gtest.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/http/http_server.h>
#include <dsn/utility/fail_point.h>

#include "meta/meta_http_service.h"
#include "meta/meta_service.h"
Expand Down Expand Up @@ -195,6 +196,15 @@ class meta_bulk_load_http_test : public meta_test_base
meta_test_base::TearDown();
}

http_response test_start_bulk_load(std::string req_body_json)
{
http_request req;
http_response resp;
req.body = blob::create_from_bytes(std::move(req_body_json));
_mhs->start_bulk_load_handler(req, resp);
return resp;
}

std::string test_query_bulk_load(const std::string &app_name)
{
http_request req;
Expand Down Expand Up @@ -230,6 +240,38 @@ class meta_bulk_load_http_test : public meta_test_base
std::string APP_NAME = "test_bulk_load";
};

TEST_F(meta_bulk_load_http_test, start_bulk_load_request)
{
fail::setup();
fail::cfg("meta_on_start_bulk_load", "return()");
struct start_bulk_load_test
{
std::string request_json;
http_status_code expected_code;
std::string expected_response_json;
} tests[] = {
{"{\"app\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
http_status_code::bad_request,
"invalid request structure"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"\", \"remote_root_path\":\"bulk_load_root\"}",
http_status_code::bad_request,
"file_provider_type should not be empty"},
{"{\"app_name\":\"test_bulk_load\",\"cluster_name\":\"onebox\", "
"\"file_provider_type\":\"local_service\", \"remote_root_path\":\"bulk_load_root\"}",
http_status_code::ok,
"{\"error\":\"ERR_OK\",\"hint_msg\":\"\"}\n"},
};

for (const auto &test : tests) {
http_response resp = test_start_bulk_load(test.request_json);
ASSERT_EQ(resp.status_code, test.expected_code);
ASSERT_EQ(resp.body, test.expected_response_json);
}
fail::teardown();
}

TEST_F(meta_bulk_load_http_test, query_bulk_load_request)
{
const std::string NOT_BULK_LOAD = "not_bulk_load_app";
Expand Down

0 comments on commit 1d88c1d

Please sign in to comment.