forked from ydb-platform/ydb
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add tstool and tsserver into ydb/tools (ydb-platform#545)
- Loading branch information
Showing
9 changed files
with
393 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
#pragma once | ||
|
||
#include <ydb/core/test_tablet/processor.h> | ||
#include <ydb/library/actors/interconnect/interconnect_stream.h> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
#include <stdio.h> | ||
#include <sys/epoll.h> | ||
#include <ydb/core/protos/test_shard.pb.h> | ||
#include "socket_context.h" | ||
|
||
int main(int argc, char *argv[]) { | ||
if (argc != 2) { | ||
printf("Usage: %s <port>\n", argv[0]); | ||
return 1; | ||
} | ||
|
||
auto server = NInterconnect::TStreamSocket::Make(AF_INET6); | ||
Y_ABORT_UNLESS(*server != -1, "failed to create server socket"); | ||
SetNonBlock(*server); | ||
SetSockOpt(*server, SOL_SOCKET, SO_REUSEADDR, 1); | ||
SetSockOpt(*server, SOL_SOCKET, SO_REUSEPORT, 1); | ||
|
||
int res = server->Bind(NInterconnect::TAddress("::", atoi(argv[1]))); | ||
Y_ABORT_UNLESS(!res, "failed to bind server socket: %s", strerror(-res)); | ||
res = server->Listen(10); | ||
Y_ABORT_UNLESS(!res, "failed to listen on server socket: %s", strerror(-res)); | ||
|
||
int epfd = epoll_create(1); | ||
if (epfd == -1) { | ||
Y_ABORT("failed to create epoll: %s", strerror(errno)); | ||
} | ||
|
||
epoll_event e; | ||
e.events = EPOLLIN; | ||
e.data.u64 = 0; | ||
if (epoll_ctl(epfd, EPOLL_CTL_ADD, *server, &e) == -1) { | ||
Y_ABORT("failed to add listening socket to epoll: %s", strerror(errno)); | ||
} | ||
|
||
std::unordered_map<ui64, TSocketContext> Clients; | ||
ui64 LastClientId = 0; | ||
|
||
TProcessor processor; | ||
|
||
for (;;) { | ||
static constexpr size_t N = 64; | ||
epoll_event events[N]; | ||
int res = epoll_wait(epfd, events, N, -1); | ||
if (res == -1) { | ||
if (errno == EINTR) { | ||
continue; | ||
} else { | ||
Y_ABORT("epoll_wait failed: %s", strerror(errno)); | ||
} | ||
} | ||
for (int i = 0; i < res; ++i) { | ||
epoll_event& e = events[i]; | ||
if (!e.data.u64) { | ||
NInterconnect::TAddress addr; | ||
int fd = server->Accept(addr); | ||
Y_ABORT_UNLESS(fd >= 0, "failed to accept client socket: %s", strerror(-fd)); | ||
auto client = MakeIntrusive<NInterconnect::TStreamSocket>(fd); | ||
SetNonBlock(*client); | ||
SetNoDelay(*client, true); | ||
const ui64 clientId = ++LastClientId; | ||
auto&& [it, inserted] = Clients.try_emplace(clientId, clientId, epfd, client, processor); | ||
Y_ABORT_UNLESS(inserted); | ||
} else if (auto it = Clients.find(e.data.u64); it != Clients.end()) { | ||
try { | ||
if (e.events & EPOLLIN) { | ||
it->second.Read(); | ||
} | ||
if (e.events & EPOLLOUT) { | ||
it->second.Write(); | ||
} | ||
it->second.UpdateEpoll(); | ||
} catch (const TExError&) { | ||
Clients.erase(it); | ||
} | ||
} | ||
} | ||
} | ||
|
||
close(epfd); | ||
|
||
return 0; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
#pragma once | ||
|
||
#include "defs.h" | ||
#include "types.h" | ||
|
||
using TProcessor = NKikimr::NTestShard::TProcessor; | ||
|
||
class TSocketContext { | ||
const ui64 ClientId; | ||
const int EpollFd; | ||
TIntrusivePtr<NInterconnect::TStreamSocket> Socket; | ||
TProcessor& Processor; | ||
|
||
enum class EReadStep { | ||
LEN, | ||
DATA, | ||
} ReadStep = EReadStep::LEN; | ||
TString ReadBuffer = TString::Uninitialized(sizeof(ui32)); | ||
char *ReadBegin = ReadBuffer.Detach(), *ReadEnd = ReadBegin + ReadBuffer.size(); | ||
|
||
std::deque<TString> OutQ; | ||
size_t WriteOffset = 0; | ||
|
||
std::optional<ui32> StoredEvents; | ||
ui32 Events = 0; | ||
|
||
public: | ||
TSocketContext(ui64 clientId, int epollFd, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TProcessor& processor) | ||
: ClientId(clientId) | ||
, EpollFd(epollFd) | ||
, Socket(std::move(socket)) | ||
, Processor(processor) | ||
{ | ||
printf("[%" PRIu64 "] created\n", ClientId); | ||
Read(); | ||
Write(); | ||
UpdateEpoll(); | ||
} | ||
|
||
~TSocketContext() { | ||
if (StoredEvents) { | ||
Epoll(EPOLL_CTL_DEL, 0); | ||
} | ||
} | ||
|
||
void Epoll(int op, ui32 events) { | ||
epoll_event e; | ||
e.events = events; | ||
e.data.u64 = ClientId; | ||
if (epoll_ctl(EpollFd, op, *Socket, &e) == -1) { | ||
Y_ABORT("epoll_ctl failed: %s", strerror(errno)); | ||
} | ||
} | ||
|
||
void UpdateEpoll() { | ||
std::optional<int> mode = !StoredEvents ? std::make_optional(EPOLL_CTL_ADD) : | ||
*StoredEvents != Events ? std::make_optional(EPOLL_CTL_MOD) : std::nullopt; | ||
if (mode) { | ||
Epoll(*mode, Events); | ||
} | ||
StoredEvents = Events; | ||
} | ||
|
||
void Read() { | ||
for (;;) { | ||
ssize_t n = Socket->Recv(ReadBegin, ReadEnd - ReadBegin); | ||
if (n > 0) { | ||
ReadBegin += n; | ||
if (ReadBegin == ReadEnd) { | ||
switch (ReadStep) { | ||
case EReadStep::LEN: { | ||
ui32 n; | ||
Y_ABORT_UNLESS(ReadBuffer.size() == sizeof(n)); | ||
memcpy(&n, ReadBuffer.data(), ReadBuffer.size()); // strict aliasing | ||
Y_ABORT_UNLESS(n <= 64 * 1024 * 1024); | ||
ReadBuffer = TString::Uninitialized(n); | ||
ReadStep = EReadStep::DATA; | ||
break; | ||
} | ||
|
||
case EReadStep::DATA: | ||
ProcessReadBuffer(); | ||
ReadBuffer = TString::Uninitialized(sizeof(ui32)); | ||
ReadStep = EReadStep::LEN; | ||
break; | ||
} | ||
ReadBegin = ReadBuffer.Detach(); | ||
ReadEnd = ReadBegin + ReadBuffer.size(); | ||
} | ||
} else if (-n == EAGAIN || -n == EWOULDBLOCK) { | ||
Events |= EPOLLIN; | ||
break; | ||
} else if (-n == EINTR) { | ||
continue; | ||
} else { | ||
printf("[%" PRIu64 "] failed to receive data: %s\n", ClientId, strerror(-n)); | ||
throw TExError(); | ||
} | ||
} | ||
} | ||
|
||
void ProcessReadBuffer() { | ||
::NTestShard::TStateServer::TRequest request; | ||
if (!request.ParseFromString(ReadBuffer)) { | ||
throw TExError(); | ||
} | ||
auto send = [this](const auto& proto) { | ||
TString buffer; | ||
const bool success = proto.SerializeToString(&buffer); | ||
Y_ABORT_UNLESS(success); | ||
Y_ABORT_UNLESS(buffer.size() <= 64 * 1024 * 1024); | ||
ui32 len = buffer.size(); | ||
TString w = TString::Uninitialized(sizeof(ui32) + len); | ||
char *p = w.Detach(); | ||
memcpy(p, &len, sizeof(len)); | ||
memcpy(p + sizeof(len), buffer.data(), buffer.size()); | ||
OutQ.push_back(std::move(w)); | ||
Write(); | ||
}; | ||
switch (request.GetCommandCase()) { | ||
case ::NTestShard::TStateServer::TRequest::kWrite: | ||
send(Processor.Execute(request.GetWrite())); | ||
break; | ||
|
||
case ::NTestShard::TStateServer::TRequest::kRead: | ||
send(Processor.Execute(request.GetRead())); | ||
break; | ||
|
||
case ::NTestShard::TStateServer::TRequest::kTabletInfo: | ||
case ::NTestShard::TStateServer::TRequest::COMMAND_NOT_SET: | ||
printf("[%" PRIu64 "] incorrect request received\n", ClientId); | ||
throw TExError(); | ||
} | ||
} | ||
|
||
void Write() { | ||
while (!OutQ.empty()) { | ||
auto& buffer = OutQ.front(); | ||
if (WriteOffset == buffer.size()) { | ||
OutQ.pop_front(); | ||
WriteOffset = 0; | ||
continue; | ||
} | ||
ssize_t n = Socket->Send(buffer.data() + WriteOffset, buffer.size() - WriteOffset); | ||
if (n > 0) { | ||
WriteOffset += n; | ||
} else if (-n == EWOULDBLOCK || -n == EAGAIN) { | ||
break; | ||
} else if (-n == EINTR) { | ||
continue; | ||
} else { | ||
printf("[%" PRIu64 "] failed to send data: %s\n", ClientId, strerror(-n)); | ||
} | ||
} | ||
if (OutQ.empty()) { | ||
Events &= ~EPOLLOUT; | ||
} else { | ||
Events |= EPOLLOUT; | ||
} | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#pragma once | ||
|
||
#include "defs.h" | ||
|
||
struct TExError : std::exception {}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
PROGRAM() | ||
|
||
OWNER(alexvru) | ||
|
||
SRCS( | ||
main.cpp | ||
) | ||
|
||
PEERDIR( | ||
ydb/library/actors/interconnect | ||
ydb/core/protos | ||
) | ||
|
||
END() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
StorageServerPort: 35000 | ||
MaxDataBytes: 1500000000 | ||
MinDataBytes: 1000000000 | ||
MaxInFlight: 5 | ||
Sizes { Weight: 1 Min: 1 Max: 1024 Inline: true } | ||
Sizes { Weight: 3 Min: 10000 Max: 1000000 } | ||
Sizes { Weight: 7 Min: 2357120 Max: 8388608 } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from argparse import ArgumentParser, FileType | ||
from ydb.core.protos.grpc_pb2_grpc import TGRpcServerStub | ||
from ydb.core.protos.msgbus_pb2 import THiveCreateTablet, TTestShardControlRequest | ||
from ydb.core.protos.tablet_pb2 import TTabletTypes | ||
from ydb.core.protos.base_pb2 import EReplyStatus | ||
from google.protobuf import text_format | ||
import grpc | ||
import socket | ||
import sys | ||
import time | ||
import multiprocessing | ||
import random | ||
|
||
grpc_host = None | ||
grpc_port = None | ||
domain_uid = None | ||
default_tsserver_port = 35000 | ||
|
||
|
||
def invoke_grpc(func, *params): | ||
with grpc.insecure_channel('%s:%d' % (grpc_host, grpc_port), []) as channel: | ||
stub = TGRpcServerStub(channel) | ||
return getattr(stub, func)(*params) | ||
|
||
|
||
def create_tablet(owner_idx, channels, count): | ||
request = THiveCreateTablet(DomainUid=domain_uid) | ||
for i in range(count): | ||
cmd = request.CmdCreateTablet.add(OwnerId=0, OwnerIdx=owner_idx + i, TabletType=TTabletTypes.TestShard, ChannelsProfile=0) | ||
for channel in channels: | ||
cmd.BindedChannels.add(StoragePoolName=channel, PhysicalGroupsOnly=False) | ||
|
||
for _ in range(10): | ||
res = invoke_grpc('HiveCreateTablet', request) | ||
if res.Status == 1: | ||
assert all(r.Status in [EReplyStatus.OK, EReplyStatus.ALREADY] for r in res.CreateTabletResult) | ||
return [r.TabletId for r in res.CreateTabletResult] | ||
else: | ||
print(res, file=sys.stderr) | ||
time.sleep(3) | ||
|
||
assert False | ||
|
||
|
||
def init_tablet(args): | ||
tablet_id, cmd = args | ||
request = TTestShardControlRequest(TabletId=tablet_id, Initialize=cmd) | ||
|
||
for _ in range(100): | ||
res = invoke_grpc('TestShardControl', request) | ||
if res.Status == 1: | ||
return None | ||
else: | ||
time.sleep(random.uniform(1.0, 2.0)) | ||
|
||
return str(tablet_id) + ': ' + str(res) | ||
|
||
|
||
def main(): | ||
parser = ArgumentParser(description='YDB TestShard control tool') | ||
parser.add_argument('--grpc-host', type=str, required=True, help='gRPC endpoint hostname') | ||
parser.add_argument('--grpc-port', type=int, default=2135, help='gRPC endpoint port') | ||
parser.add_argument('--domain-uid', type=int, default=1, help='domain UID') | ||
parser.add_argument('--owner-idx', type=int, required=True, help='unique instance id for tablet creation') | ||
parser.add_argument('--channels', type=str, nargs='+', required=True, help='channel storage pool names') | ||
parser.add_argument('--count', type=int, default=1, help='create desired amount of tablets at once') | ||
|
||
subparsers = parser.add_subparsers(help='Action', dest='action', required=True) | ||
|
||
p = subparsers.add_parser('initialize', help='initialize test shard state') | ||
p.add_argument('--proto-file', type=FileType(), required=True, help='path to protobuf containing TCmdInitialize') | ||
p.add_argument('--tsserver', type=str, help='FQDN:port pair for tsserver') | ||
|
||
args = parser.parse_args() | ||
|
||
global grpc_host, grpc_port, domain_uid | ||
grpc_host = args.grpc_host | ||
grpc_port = args.grpc_port | ||
domain_uid = args.domain_uid | ||
|
||
tablet_ids = create_tablet(args.owner_idx, args.channels, args.count) | ||
print('TabletIds# %s' % ', '.join(map(str, tablet_ids))) | ||
|
||
if args.action == 'initialize': | ||
cmd = text_format.Parse(args.proto_file.read(), TTestShardControlRequest.TCmdInitialize()) | ||
if args.tsserver is not None: | ||
host, sep, port = args.tsserver.partition(':') | ||
port = int(port) if sep else default_tsserver_port | ||
sockaddr = None | ||
for _, _, _, _, sockaddr in socket.getaddrinfo(host, port, socket.AF_INET6): | ||
break | ||
if sockaddr is None: | ||
print('Failed to resolve hostname %s' % host, file=sys.stderr) | ||
sys.exit(1) | ||
cmd.StorageServerHost = sockaddr[0] | ||
with multiprocessing.Pool(None) as p: | ||
status = 0 | ||
for r in p.imap_unordered(init_tablet, ((tablet_id, cmd) for tablet_id in tablet_ids), 1): | ||
if r is not None: | ||
sys.stderr.write(r) | ||
status = 1 | ||
sys.exit(status) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
PY3_PROGRAM(tstool) | ||
|
||
OWNER(alexvru) | ||
|
||
PY_MAIN(tstool) | ||
|
||
PY_SRCS( | ||
TOP_LEVEL | ||
tstool.py | ||
) | ||
|
||
PEERDIR( | ||
ydb/core/protos | ||
) | ||
|
||
END() |
Oops, something went wrong.