Skip to content

Commit

Permalink
Add tstool and tsserver into ydb/tools
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed Dec 18, 2023
1 parent 9f6e1bc commit ddb91c2
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 0 deletions.
4 changes: 4 additions & 0 deletions ydb/tools/tsserver/defs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#pragma once

#include <ydb/core/test_tablet/processor.h>
#include <ydb/library/actors/interconnect/interconnect_stream.h>
82 changes: 82 additions & 0 deletions ydb/tools/tsserver/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include <stdio.h>
#include <sys/epoll.h>
#include <ydb/core/protos/test_shard.pb.h>
#include "socket_context.h"

int main(int argc, char *argv[]) {
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
return 1;
}

auto server = NInterconnect::TStreamSocket::Make(AF_INET6);
Y_ABORT_UNLESS(*server != -1, "failed to create server socket");
SetNonBlock(*server);
SetSockOpt(*server, SOL_SOCKET, SO_REUSEADDR, 1);
SetSockOpt(*server, SOL_SOCKET, SO_REUSEPORT, 1);

int res = server->Bind(NInterconnect::TAddress("::", atoi(argv[1])));
Y_ABORT_UNLESS(!res, "failed to bind server socket: %s", strerror(-res));
res = server->Listen(10);
Y_ABORT_UNLESS(!res, "failed to listen on server socket: %s", strerror(-res));

int epfd = epoll_create(1);
if (epfd == -1) {
Y_ABORT("failed to create epoll: %s", strerror(errno));
}

epoll_event e;
e.events = EPOLLIN;
e.data.u64 = 0;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, *server, &e) == -1) {
Y_ABORT("failed to add listening socket to epoll: %s", strerror(errno));
}

std::unordered_map<ui64, TSocketContext> Clients;
ui64 LastClientId = 0;

TProcessor processor;

for (;;) {
static constexpr size_t N = 64;
epoll_event events[N];
int res = epoll_wait(epfd, events, N, -1);
if (res == -1) {
if (errno == EINTR) {
continue;
} else {
Y_ABORT("epoll_wait failed: %s", strerror(errno));
}
}
for (int i = 0; i < res; ++i) {
epoll_event& e = events[i];
if (!e.data.u64) {
NInterconnect::TAddress addr;
int fd = server->Accept(addr);
Y_ABORT_UNLESS(fd >= 0, "failed to accept client socket: %s", strerror(-fd));
auto client = MakeIntrusive<NInterconnect::TStreamSocket>(fd);
SetNonBlock(*client);
SetNoDelay(*client, true);
const ui64 clientId = ++LastClientId;
auto&& [it, inserted] = Clients.try_emplace(clientId, clientId, epfd, client, processor);
Y_ABORT_UNLESS(inserted);
} else if (auto it = Clients.find(e.data.u64); it != Clients.end()) {
try {
if (e.events & EPOLLIN) {
it->second.Read();
}
if (e.events & EPOLLOUT) {
it->second.Write();
}
it->second.UpdateEpoll();
} catch (const TExError&) {
Clients.erase(it);
}
}
}
}

close(epfd);

return 0;
}
161 changes: 161 additions & 0 deletions ydb/tools/tsserver/socket_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#pragma once

#include "defs.h"
#include "types.h"

using TProcessor = NKikimr::NTestShard::TProcessor;

class TSocketContext {
const ui64 ClientId;
const int EpollFd;
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
TProcessor& Processor;

enum class EReadStep {
LEN,
DATA,
} ReadStep = EReadStep::LEN;
TString ReadBuffer = TString::Uninitialized(sizeof(ui32));
char *ReadBegin = ReadBuffer.Detach(), *ReadEnd = ReadBegin + ReadBuffer.size();

std::deque<TString> OutQ;
size_t WriteOffset = 0;

std::optional<ui32> StoredEvents;
ui32 Events = 0;

public:
TSocketContext(ui64 clientId, int epollFd, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TProcessor& processor)
: ClientId(clientId)
, EpollFd(epollFd)
, Socket(std::move(socket))
, Processor(processor)
{
printf("[%" PRIu64 "] created\n", ClientId);
Read();
Write();
UpdateEpoll();
}

~TSocketContext() {
if (StoredEvents) {
Epoll(EPOLL_CTL_DEL, 0);
}
}

void Epoll(int op, ui32 events) {
epoll_event e;
e.events = events;
e.data.u64 = ClientId;
if (epoll_ctl(EpollFd, op, *Socket, &e) == -1) {
Y_ABORT("epoll_ctl failed: %s", strerror(errno));
}
}

void UpdateEpoll() {
std::optional<int> mode = !StoredEvents ? std::make_optional(EPOLL_CTL_ADD) :
*StoredEvents != Events ? std::make_optional(EPOLL_CTL_MOD) : std::nullopt;
if (mode) {
Epoll(*mode, Events);
}
StoredEvents = Events;
}

void Read() {
for (;;) {
ssize_t n = Socket->Recv(ReadBegin, ReadEnd - ReadBegin);
if (n > 0) {
ReadBegin += n;
if (ReadBegin == ReadEnd) {
switch (ReadStep) {
case EReadStep::LEN: {
ui32 n;
Y_ABORT_UNLESS(ReadBuffer.size() == sizeof(n));
memcpy(&n, ReadBuffer.data(), ReadBuffer.size()); // strict aliasing
Y_ABORT_UNLESS(n <= 64 * 1024 * 1024);
ReadBuffer = TString::Uninitialized(n);
ReadStep = EReadStep::DATA;
break;
}

case EReadStep::DATA:
ProcessReadBuffer();
ReadBuffer = TString::Uninitialized(sizeof(ui32));
ReadStep = EReadStep::LEN;
break;
}
ReadBegin = ReadBuffer.Detach();
ReadEnd = ReadBegin + ReadBuffer.size();
}
} else if (-n == EAGAIN || -n == EWOULDBLOCK) {
Events |= EPOLLIN;
break;
} else if (-n == EINTR) {
continue;
} else {
printf("[%" PRIu64 "] failed to receive data: %s\n", ClientId, strerror(-n));
throw TExError();
}
}
}

void ProcessReadBuffer() {
::NTestShard::TStateServer::TRequest request;
if (!request.ParseFromString(ReadBuffer)) {
throw TExError();
}
auto send = [this](const auto& proto) {
TString buffer;
const bool success = proto.SerializeToString(&buffer);
Y_ABORT_UNLESS(success);
Y_ABORT_UNLESS(buffer.size() <= 64 * 1024 * 1024);
ui32 len = buffer.size();
TString w = TString::Uninitialized(sizeof(ui32) + len);
char *p = w.Detach();
memcpy(p, &len, sizeof(len));
memcpy(p + sizeof(len), buffer.data(), buffer.size());
OutQ.push_back(std::move(w));
Write();
};
switch (request.GetCommandCase()) {
case ::NTestShard::TStateServer::TRequest::kWrite:
send(Processor.Execute(request.GetWrite()));
break;

case ::NTestShard::TStateServer::TRequest::kRead:
send(Processor.Execute(request.GetRead()));
break;

case ::NTestShard::TStateServer::TRequest::kTabletInfo:
case ::NTestShard::TStateServer::TRequest::COMMAND_NOT_SET:
printf("[%" PRIu64 "] incorrect request received\n", ClientId);
throw TExError();
}
}

void Write() {
while (!OutQ.empty()) {
auto& buffer = OutQ.front();
if (WriteOffset == buffer.size()) {
OutQ.pop_front();
WriteOffset = 0;
continue;
}
ssize_t n = Socket->Send(buffer.data() + WriteOffset, buffer.size() - WriteOffset);
if (n > 0) {
WriteOffset += n;
} else if (-n == EWOULDBLOCK || -n == EAGAIN) {
break;
} else if (-n == EINTR) {
continue;
} else {
printf("[%" PRIu64 "] failed to send data: %s\n", ClientId, strerror(-n));
}
}
if (OutQ.empty()) {
Events &= ~EPOLLOUT;
} else {
Events |= EPOLLOUT;
}
}
};
5 changes: 5 additions & 0 deletions ydb/tools/tsserver/types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#pragma once

#include "defs.h"

struct TExError : std::exception {};
14 changes: 14 additions & 0 deletions ydb/tools/tsserver/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
PROGRAM()

OWNER(alexvru)

SRCS(
main.cpp
)

PEERDIR(
ydb/library/actors/interconnect
ydb/core/protos
)

END()
7 changes: 7 additions & 0 deletions ydb/tools/tstool/testshard.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
StorageServerPort: 35000
MaxDataBytes: 1500000000
MinDataBytes: 1000000000
MaxInFlight: 5
Sizes { Weight: 1 Min: 1 Max: 1024 Inline: true }
Sizes { Weight: 3 Min: 10000 Max: 1000000 }
Sizes { Weight: 7 Min: 2357120 Max: 8388608 }
102 changes: 102 additions & 0 deletions ydb/tools/tstool/tstool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from argparse import ArgumentParser, FileType
from ydb.core.protos.grpc_pb2_grpc import TGRpcServerStub
from ydb.core.protos.msgbus_pb2 import THiveCreateTablet, TTestShardControlRequest
from ydb.core.protos.tablet_pb2 import TTabletTypes
from ydb.core.protos.base_pb2 import EReplyStatus
from google.protobuf import text_format
import grpc
import socket
import sys
import time
import multiprocessing
import random

grpc_host = None
grpc_port = None
domain_uid = None
default_tsserver_port = 35000


def invoke_grpc(func, *params):
with grpc.insecure_channel('%s:%d' % (grpc_host, grpc_port), []) as channel:
stub = TGRpcServerStub(channel)
return getattr(stub, func)(*params)


def create_tablet(owner_idx, channels, count):
request = THiveCreateTablet(DomainUid=domain_uid)
for i in range(count):
cmd = request.CmdCreateTablet.add(OwnerId=0, OwnerIdx=owner_idx + i, TabletType=TTabletTypes.TestShard, ChannelsProfile=0)
for channel in channels:
cmd.BindedChannels.add(StoragePoolName=channel, PhysicalGroupsOnly=False)

for _ in range(10):
res = invoke_grpc('HiveCreateTablet', request)
if res.Status == 1:
assert all(r.Status in [EReplyStatus.OK, EReplyStatus.ALREADY] for r in res.CreateTabletResult)
return [r.TabletId for r in res.CreateTabletResult]
else:
print(res, file=sys.stderr)
time.sleep(3)

assert False


def init_tablet(args):
tablet_id, cmd = args
request = TTestShardControlRequest(TabletId=tablet_id, Initialize=cmd)

for _ in range(100):
res = invoke_grpc('TestShardControl', request)
if res.Status == 1:
return None
else:
time.sleep(random.uniform(1.0, 2.0))

return str(tablet_id) + ': ' + str(res)


def main():
parser = ArgumentParser(description='YDB TestShard control tool')
parser.add_argument('--grpc-host', type=str, required=True, help='gRPC endpoint hostname')
parser.add_argument('--grpc-port', type=int, default=2135, help='gRPC endpoint port')
parser.add_argument('--domain-uid', type=int, default=1, help='domain UID')
parser.add_argument('--owner-idx', type=int, required=True, help='unique instance id for tablet creation')
parser.add_argument('--channels', type=str, nargs='+', required=True, help='channel storage pool names')
parser.add_argument('--count', type=int, default=1, help='create desired amount of tablets at once')

subparsers = parser.add_subparsers(help='Action', dest='action', required=True)

p = subparsers.add_parser('initialize', help='initialize test shard state')
p.add_argument('--proto-file', type=FileType(), required=True, help='path to protobuf containing TCmdInitialize')
p.add_argument('--tsserver', type=str, help='FQDN:port pair for tsserver')

args = parser.parse_args()

global grpc_host, grpc_port, domain_uid
grpc_host = args.grpc_host
grpc_port = args.grpc_port
domain_uid = args.domain_uid

tablet_ids = create_tablet(args.owner_idx, args.channels, args.count)
print('TabletIds# %s' % ', '.join(map(str, tablet_ids)))

if args.action == 'initialize':
cmd = text_format.Parse(args.proto_file.read(), TTestShardControlRequest.TCmdInitialize())
if args.tsserver is not None:
host, sep, port = args.tsserver.partition(':')
port = int(port) if sep else default_tsserver_port
sockaddr = None
for _, _, _, _, sockaddr in socket.getaddrinfo(host, port, socket.AF_INET6):
break
if sockaddr is None:
print('Failed to resolve hostname %s' % host, file=sys.stderr)
sys.exit(1)
cmd.StorageServerHost = sockaddr[0]
with multiprocessing.Pool(None) as p:
status = 0
for r in p.imap_unordered(init_tablet, ((tablet_id, cmd) for tablet_id in tablet_ids), 1):
if r is not None:
sys.stderr.write(r)
status = 1
sys.exit(status)
16 changes: 16 additions & 0 deletions ydb/tools/tstool/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
PY3_PROGRAM(tstool)

OWNER(alexvru)

PY_MAIN(tstool)

PY_SRCS(
TOP_LEVEL
tstool.py
)

PEERDIR(
ydb/core/protos
)

END()
Loading

0 comments on commit ddb91c2

Please sign in to comment.