diff --git a/ydb/tools/tsserver/defs.h b/ydb/tools/tsserver/defs.h new file mode 100644 index 000000000000..e9dfa8a60a1c --- /dev/null +++ b/ydb/tools/tsserver/defs.h @@ -0,0 +1,4 @@ +#pragma once + +#include +#include diff --git a/ydb/tools/tsserver/main.cpp b/ydb/tools/tsserver/main.cpp new file mode 100644 index 000000000000..925e3380c495 --- /dev/null +++ b/ydb/tools/tsserver/main.cpp @@ -0,0 +1,82 @@ +#include +#include +#include +#include "socket_context.h" + +int main(int argc, char *argv[]) { + if (argc != 2) { + printf("Usage: %s \n", argv[0]); + return 1; + } + + auto server = NInterconnect::TStreamSocket::Make(AF_INET6); + Y_ABORT_UNLESS(*server != -1, "failed to create server socket"); + SetNonBlock(*server); + SetSockOpt(*server, SOL_SOCKET, SO_REUSEADDR, 1); + SetSockOpt(*server, SOL_SOCKET, SO_REUSEPORT, 1); + + int res = server->Bind(NInterconnect::TAddress("::", atoi(argv[1]))); + Y_ABORT_UNLESS(!res, "failed to bind server socket: %s", strerror(-res)); + res = server->Listen(10); + Y_ABORT_UNLESS(!res, "failed to listen on server socket: %s", strerror(-res)); + + int epfd = epoll_create(1); + if (epfd == -1) { + Y_ABORT("failed to create epoll: %s", strerror(errno)); + } + + epoll_event e; + e.events = EPOLLIN; + e.data.u64 = 0; + if (epoll_ctl(epfd, EPOLL_CTL_ADD, *server, &e) == -1) { + Y_ABORT("failed to add listening socket to epoll: %s", strerror(errno)); + } + + std::unordered_map Clients; + ui64 LastClientId = 0; + + TProcessor processor; + + for (;;) { + static constexpr size_t N = 64; + epoll_event events[N]; + int res = epoll_wait(epfd, events, N, -1); + if (res == -1) { + if (errno == EINTR) { + continue; + } else { + Y_ABORT("epoll_wait failed: %s", strerror(errno)); + } + } + for (int i = 0; i < res; ++i) { + epoll_event& e = events[i]; + if (!e.data.u64) { + NInterconnect::TAddress addr; + int fd = server->Accept(addr); + Y_ABORT_UNLESS(fd >= 0, "failed to accept client socket: %s", strerror(-fd)); + auto client = MakeIntrusive(fd); + SetNonBlock(*client); + SetNoDelay(*client, true); + const ui64 clientId = ++LastClientId; + auto&& [it, inserted] = Clients.try_emplace(clientId, clientId, epfd, client, processor); + Y_ABORT_UNLESS(inserted); + } else if (auto it = Clients.find(e.data.u64); it != Clients.end()) { + try { + if (e.events & EPOLLIN) { + it->second.Read(); + } + if (e.events & EPOLLOUT) { + it->second.Write(); + } + it->second.UpdateEpoll(); + } catch (const TExError&) { + Clients.erase(it); + } + } + } + } + + close(epfd); + + return 0; +} diff --git a/ydb/tools/tsserver/socket_context.h b/ydb/tools/tsserver/socket_context.h new file mode 100644 index 000000000000..e2c04b8d4da8 --- /dev/null +++ b/ydb/tools/tsserver/socket_context.h @@ -0,0 +1,161 @@ +#pragma once + +#include "defs.h" +#include "types.h" + +using TProcessor = NKikimr::NTestShard::TProcessor; + +class TSocketContext { + const ui64 ClientId; + const int EpollFd; + TIntrusivePtr Socket; + TProcessor& Processor; + + enum class EReadStep { + LEN, + DATA, + } ReadStep = EReadStep::LEN; + TString ReadBuffer = TString::Uninitialized(sizeof(ui32)); + char *ReadBegin = ReadBuffer.Detach(), *ReadEnd = ReadBegin + ReadBuffer.size(); + + std::deque OutQ; + size_t WriteOffset = 0; + + std::optional StoredEvents; + ui32 Events = 0; + +public: + TSocketContext(ui64 clientId, int epollFd, TIntrusivePtr socket, TProcessor& processor) + : ClientId(clientId) + , EpollFd(epollFd) + , Socket(std::move(socket)) + , Processor(processor) + { + printf("[%" PRIu64 "] created\n", ClientId); + Read(); + Write(); + UpdateEpoll(); + } + + ~TSocketContext() { + if (StoredEvents) { + Epoll(EPOLL_CTL_DEL, 0); + } + } + + void Epoll(int op, ui32 events) { + epoll_event e; + e.events = events; + e.data.u64 = ClientId; + if (epoll_ctl(EpollFd, op, *Socket, &e) == -1) { + Y_ABORT("epoll_ctl failed: %s", strerror(errno)); + } + } + + void UpdateEpoll() { + std::optional mode = !StoredEvents ? std::make_optional(EPOLL_CTL_ADD) : + *StoredEvents != Events ? std::make_optional(EPOLL_CTL_MOD) : std::nullopt; + if (mode) { + Epoll(*mode, Events); + } + StoredEvents = Events; + } + + void Read() { + for (;;) { + ssize_t n = Socket->Recv(ReadBegin, ReadEnd - ReadBegin); + if (n > 0) { + ReadBegin += n; + if (ReadBegin == ReadEnd) { + switch (ReadStep) { + case EReadStep::LEN: { + ui32 n; + Y_ABORT_UNLESS(ReadBuffer.size() == sizeof(n)); + memcpy(&n, ReadBuffer.data(), ReadBuffer.size()); // strict aliasing + Y_ABORT_UNLESS(n <= 64 * 1024 * 1024); + ReadBuffer = TString::Uninitialized(n); + ReadStep = EReadStep::DATA; + break; + } + + case EReadStep::DATA: + ProcessReadBuffer(); + ReadBuffer = TString::Uninitialized(sizeof(ui32)); + ReadStep = EReadStep::LEN; + break; + } + ReadBegin = ReadBuffer.Detach(); + ReadEnd = ReadBegin + ReadBuffer.size(); + } + } else if (-n == EAGAIN || -n == EWOULDBLOCK) { + Events |= EPOLLIN; + break; + } else if (-n == EINTR) { + continue; + } else { + printf("[%" PRIu64 "] failed to receive data: %s\n", ClientId, strerror(-n)); + throw TExError(); + } + } + } + + void ProcessReadBuffer() { + ::NTestShard::TStateServer::TRequest request; + if (!request.ParseFromString(ReadBuffer)) { + throw TExError(); + } + auto send = [this](const auto& proto) { + TString buffer; + const bool success = proto.SerializeToString(&buffer); + Y_ABORT_UNLESS(success); + Y_ABORT_UNLESS(buffer.size() <= 64 * 1024 * 1024); + ui32 len = buffer.size(); + TString w = TString::Uninitialized(sizeof(ui32) + len); + char *p = w.Detach(); + memcpy(p, &len, sizeof(len)); + memcpy(p + sizeof(len), buffer.data(), buffer.size()); + OutQ.push_back(std::move(w)); + Write(); + }; + switch (request.GetCommandCase()) { + case ::NTestShard::TStateServer::TRequest::kWrite: + send(Processor.Execute(request.GetWrite())); + break; + + case ::NTestShard::TStateServer::TRequest::kRead: + send(Processor.Execute(request.GetRead())); + break; + + case ::NTestShard::TStateServer::TRequest::kTabletInfo: + case ::NTestShard::TStateServer::TRequest::COMMAND_NOT_SET: + printf("[%" PRIu64 "] incorrect request received\n", ClientId); + throw TExError(); + } + } + + void Write() { + while (!OutQ.empty()) { + auto& buffer = OutQ.front(); + if (WriteOffset == buffer.size()) { + OutQ.pop_front(); + WriteOffset = 0; + continue; + } + ssize_t n = Socket->Send(buffer.data() + WriteOffset, buffer.size() - WriteOffset); + if (n > 0) { + WriteOffset += n; + } else if (-n == EWOULDBLOCK || -n == EAGAIN) { + break; + } else if (-n == EINTR) { + continue; + } else { + printf("[%" PRIu64 "] failed to send data: %s\n", ClientId, strerror(-n)); + } + } + if (OutQ.empty()) { + Events &= ~EPOLLOUT; + } else { + Events |= EPOLLOUT; + } + } +}; diff --git a/ydb/tools/tsserver/types.h b/ydb/tools/tsserver/types.h new file mode 100644 index 000000000000..444e26986082 --- /dev/null +++ b/ydb/tools/tsserver/types.h @@ -0,0 +1,5 @@ +#pragma once + +#include "defs.h" + +struct TExError : std::exception {}; diff --git a/ydb/tools/tsserver/ya.make b/ydb/tools/tsserver/ya.make new file mode 100644 index 000000000000..a43950c5bf33 --- /dev/null +++ b/ydb/tools/tsserver/ya.make @@ -0,0 +1,14 @@ +PROGRAM() + +OWNER(alexvru) + +SRCS( + main.cpp +) + +PEERDIR( + ydb/library/actors/interconnect + ydb/core/protos +) + +END() diff --git a/ydb/tools/tstool/testshard.txt b/ydb/tools/tstool/testshard.txt new file mode 100644 index 000000000000..018307bd55da --- /dev/null +++ b/ydb/tools/tstool/testshard.txt @@ -0,0 +1,7 @@ +StorageServerPort: 35000 +MaxDataBytes: 1500000000 +MinDataBytes: 1000000000 +MaxInFlight: 5 +Sizes { Weight: 1 Min: 1 Max: 1024 Inline: true } +Sizes { Weight: 3 Min: 10000 Max: 1000000 } +Sizes { Weight: 7 Min: 2357120 Max: 8388608 } diff --git a/ydb/tools/tstool/tstool.py b/ydb/tools/tstool/tstool.py new file mode 100644 index 000000000000..8c6568d59eae --- /dev/null +++ b/ydb/tools/tstool/tstool.py @@ -0,0 +1,102 @@ +from argparse import ArgumentParser, FileType +from ydb.core.protos.grpc_pb2_grpc import TGRpcServerStub +from ydb.core.protos.msgbus_pb2 import THiveCreateTablet, TTestShardControlRequest +from ydb.core.protos.tablet_pb2 import TTabletTypes +from ydb.core.protos.base_pb2 import EReplyStatus +from google.protobuf import text_format +import grpc +import socket +import sys +import time +import multiprocessing +import random + +grpc_host = None +grpc_port = None +domain_uid = None +default_tsserver_port = 35000 + + +def invoke_grpc(func, *params): + with grpc.insecure_channel('%s:%d' % (grpc_host, grpc_port), []) as channel: + stub = TGRpcServerStub(channel) + return getattr(stub, func)(*params) + + +def create_tablet(owner_idx, channels, count): + request = THiveCreateTablet(DomainUid=domain_uid) + for i in range(count): + cmd = request.CmdCreateTablet.add(OwnerId=0, OwnerIdx=owner_idx + i, TabletType=TTabletTypes.TestShard, ChannelsProfile=0) + for channel in channels: + cmd.BindedChannels.add(StoragePoolName=channel, PhysicalGroupsOnly=False) + + for _ in range(10): + res = invoke_grpc('HiveCreateTablet', request) + if res.Status == 1: + assert all(r.Status in [EReplyStatus.OK, EReplyStatus.ALREADY] for r in res.CreateTabletResult) + return [r.TabletId for r in res.CreateTabletResult] + else: + print(res, file=sys.stderr) + time.sleep(3) + + assert False + + +def init_tablet(args): + tablet_id, cmd = args + request = TTestShardControlRequest(TabletId=tablet_id, Initialize=cmd) + + for _ in range(100): + res = invoke_grpc('TestShardControl', request) + if res.Status == 1: + return None + else: + time.sleep(random.uniform(1.0, 2.0)) + + return str(tablet_id) + ': ' + str(res) + + +def main(): + parser = ArgumentParser(description='YDB TestShard control tool') + parser.add_argument('--grpc-host', type=str, required=True, help='gRPC endpoint hostname') + parser.add_argument('--grpc-port', type=int, default=2135, help='gRPC endpoint port') + parser.add_argument('--domain-uid', type=int, default=1, help='domain UID') + parser.add_argument('--owner-idx', type=int, required=True, help='unique instance id for tablet creation') + parser.add_argument('--channels', type=str, nargs='+', required=True, help='channel storage pool names') + parser.add_argument('--count', type=int, default=1, help='create desired amount of tablets at once') + + subparsers = parser.add_subparsers(help='Action', dest='action', required=True) + + p = subparsers.add_parser('initialize', help='initialize test shard state') + p.add_argument('--proto-file', type=FileType(), required=True, help='path to protobuf containing TCmdInitialize') + p.add_argument('--tsserver', type=str, help='FQDN:port pair for tsserver') + + args = parser.parse_args() + + global grpc_host, grpc_port, domain_uid + grpc_host = args.grpc_host + grpc_port = args.grpc_port + domain_uid = args.domain_uid + + tablet_ids = create_tablet(args.owner_idx, args.channels, args.count) + print('TabletIds# %s' % ', '.join(map(str, tablet_ids))) + + if args.action == 'initialize': + cmd = text_format.Parse(args.proto_file.read(), TTestShardControlRequest.TCmdInitialize()) + if args.tsserver is not None: + host, sep, port = args.tsserver.partition(':') + port = int(port) if sep else default_tsserver_port + sockaddr = None + for _, _, _, _, sockaddr in socket.getaddrinfo(host, port, socket.AF_INET6): + break + if sockaddr is None: + print('Failed to resolve hostname %s' % host, file=sys.stderr) + sys.exit(1) + cmd.StorageServerHost = sockaddr[0] + with multiprocessing.Pool(None) as p: + status = 0 + for r in p.imap_unordered(init_tablet, ((tablet_id, cmd) for tablet_id in tablet_ids), 1): + if r is not None: + sys.stderr.write(r) + status = 1 + sys.exit(status) diff --git a/ydb/tools/tstool/ya.make b/ydb/tools/tstool/ya.make new file mode 100644 index 000000000000..217284a1f19b --- /dev/null +++ b/ydb/tools/tstool/ya.make @@ -0,0 +1,16 @@ +PY3_PROGRAM(tstool) + +OWNER(alexvru) + +PY_MAIN(tstool) + +PY_SRCS( + TOP_LEVEL + tstool.py +) + +PEERDIR( + ydb/core/protos +) + +END() diff --git a/ydb/tools/ya.make b/ydb/tools/ya.make index e01fbd619c73..25389126d01e 100644 --- a/ydb/tools/ya.make +++ b/ydb/tools/ya.make @@ -1,4 +1,6 @@ RECURSE( cfg ydbd_slice + tsserver + tstool )