Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tstool and tsserver into ydb/tools #545

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/tools/tsserver/defs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#pragma once

#include <ydb/core/test_tablet/processor.h>
#include <ydb/library/actors/interconnect/interconnect_stream.h>
82 changes: 82 additions & 0 deletions ydb/tools/tsserver/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include <stdio.h>
#include <sys/epoll.h>
#include <ydb/core/protos/test_shard.pb.h>
#include "socket_context.h"

int main(int argc, char *argv[]) {
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
return 1;
}

auto server = NInterconnect::TStreamSocket::Make(AF_INET6);
Y_ABORT_UNLESS(*server != -1, "failed to create server socket");
SetNonBlock(*server);
SetSockOpt(*server, SOL_SOCKET, SO_REUSEADDR, 1);
SetSockOpt(*server, SOL_SOCKET, SO_REUSEPORT, 1);

int res = server->Bind(NInterconnect::TAddress("::", atoi(argv[1])));
Y_ABORT_UNLESS(!res, "failed to bind server socket: %s", strerror(-res));
res = server->Listen(10);
Y_ABORT_UNLESS(!res, "failed to listen on server socket: %s", strerror(-res));

int epfd = epoll_create(1);
if (epfd == -1) {
Y_ABORT("failed to create epoll: %s", strerror(errno));
}

epoll_event e;
e.events = EPOLLIN;
e.data.u64 = 0;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, *server, &e) == -1) {
Y_ABORT("failed to add listening socket to epoll: %s", strerror(errno));
}

std::unordered_map<ui64, TSocketContext> Clients;
ui64 LastClientId = 0;

TProcessor processor;

for (;;) {
static constexpr size_t N = 64;
epoll_event events[N];
int res = epoll_wait(epfd, events, N, -1);
if (res == -1) {
if (errno == EINTR) {
continue;
} else {
Y_ABORT("epoll_wait failed: %s", strerror(errno));
}
}
for (int i = 0; i < res; ++i) {
epoll_event& e = events[i];
if (!e.data.u64) {
NInterconnect::TAddress addr;
int fd = server->Accept(addr);
Y_ABORT_UNLESS(fd >= 0, "failed to accept client socket: %s", strerror(-fd));
auto client = MakeIntrusive<NInterconnect::TStreamSocket>(fd);
SetNonBlock(*client);
SetNoDelay(*client, true);
const ui64 clientId = ++LastClientId;
auto&& [it, inserted] = Clients.try_emplace(clientId, clientId, epfd, client, processor);
Y_ABORT_UNLESS(inserted);
} else if (auto it = Clients.find(e.data.u64); it != Clients.end()) {
try {
if (e.events & EPOLLIN) {
it->second.Read();
}
if (e.events & EPOLLOUT) {
it->second.Write();
}
it->second.UpdateEpoll();
} catch (const TExError&) {
Clients.erase(it);
}
}
}
}

close(epfd);

return 0;
}
161 changes: 161 additions & 0 deletions ydb/tools/tsserver/socket_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#pragma once

#include "defs.h"
#include "types.h"

using TProcessor = NKikimr::NTestShard::TProcessor;

class TSocketContext {
const ui64 ClientId;
const int EpollFd;
TIntrusivePtr<NInterconnect::TStreamSocket> Socket;
TProcessor& Processor;

enum class EReadStep {
LEN,
DATA,
} ReadStep = EReadStep::LEN;
TString ReadBuffer = TString::Uninitialized(sizeof(ui32));
char *ReadBegin = ReadBuffer.Detach(), *ReadEnd = ReadBegin + ReadBuffer.size();

std::deque<TString> OutQ;
size_t WriteOffset = 0;

std::optional<ui32> StoredEvents;
ui32 Events = 0;

public:
TSocketContext(ui64 clientId, int epollFd, TIntrusivePtr<NInterconnect::TStreamSocket> socket, TProcessor& processor)
: ClientId(clientId)
, EpollFd(epollFd)
, Socket(std::move(socket))
, Processor(processor)
{
printf("[%" PRIu64 "] created\n", ClientId);
Read();
Write();
UpdateEpoll();
}

~TSocketContext() {
if (StoredEvents) {
Epoll(EPOLL_CTL_DEL, 0);
}
}

void Epoll(int op, ui32 events) {
epoll_event e;
e.events = events;
e.data.u64 = ClientId;
if (epoll_ctl(EpollFd, op, *Socket, &e) == -1) {
Y_ABORT("epoll_ctl failed: %s", strerror(errno));
}
}

void UpdateEpoll() {
std::optional<int> mode = !StoredEvents ? std::make_optional(EPOLL_CTL_ADD) :
*StoredEvents != Events ? std::make_optional(EPOLL_CTL_MOD) : std::nullopt;
if (mode) {
Epoll(*mode, Events);
}
StoredEvents = Events;
}

void Read() {
for (;;) {
ssize_t n = Socket->Recv(ReadBegin, ReadEnd - ReadBegin);
if (n > 0) {
ReadBegin += n;
if (ReadBegin == ReadEnd) {
switch (ReadStep) {
case EReadStep::LEN: {
ui32 n;
Y_ABORT_UNLESS(ReadBuffer.size() == sizeof(n));
memcpy(&n, ReadBuffer.data(), ReadBuffer.size()); // strict aliasing
Y_ABORT_UNLESS(n <= 64 * 1024 * 1024);
ReadBuffer = TString::Uninitialized(n);
ReadStep = EReadStep::DATA;
break;
}

case EReadStep::DATA:
ProcessReadBuffer();
ReadBuffer = TString::Uninitialized(sizeof(ui32));
ReadStep = EReadStep::LEN;
break;
}
ReadBegin = ReadBuffer.Detach();
ReadEnd = ReadBegin + ReadBuffer.size();
}
} else if (-n == EAGAIN || -n == EWOULDBLOCK) {
Events |= EPOLLIN;
break;
} else if (-n == EINTR) {
continue;
} else {
printf("[%" PRIu64 "] failed to receive data: %s\n", ClientId, strerror(-n));
throw TExError();
}
}
}

void ProcessReadBuffer() {
::NTestShard::TStateServer::TRequest request;
if (!request.ParseFromString(ReadBuffer)) {
throw TExError();
}
auto send = [this](const auto& proto) {
TString buffer;
const bool success = proto.SerializeToString(&buffer);
Y_ABORT_UNLESS(success);
Y_ABORT_UNLESS(buffer.size() <= 64 * 1024 * 1024);
ui32 len = buffer.size();
TString w = TString::Uninitialized(sizeof(ui32) + len);
char *p = w.Detach();
memcpy(p, &len, sizeof(len));
memcpy(p + sizeof(len), buffer.data(), buffer.size());
OutQ.push_back(std::move(w));
Write();
};
switch (request.GetCommandCase()) {
case ::NTestShard::TStateServer::TRequest::kWrite:
send(Processor.Execute(request.GetWrite()));
break;

case ::NTestShard::TStateServer::TRequest::kRead:
send(Processor.Execute(request.GetRead()));
break;

case ::NTestShard::TStateServer::TRequest::kTabletInfo:
case ::NTestShard::TStateServer::TRequest::COMMAND_NOT_SET:
printf("[%" PRIu64 "] incorrect request received\n", ClientId);
throw TExError();
}
}

void Write() {
while (!OutQ.empty()) {
auto& buffer = OutQ.front();
if (WriteOffset == buffer.size()) {
OutQ.pop_front();
WriteOffset = 0;
continue;
}
ssize_t n = Socket->Send(buffer.data() + WriteOffset, buffer.size() - WriteOffset);
if (n > 0) {
WriteOffset += n;
} else if (-n == EWOULDBLOCK || -n == EAGAIN) {
break;
} else if (-n == EINTR) {
continue;
} else {
printf("[%" PRIu64 "] failed to send data: %s\n", ClientId, strerror(-n));
}
}
if (OutQ.empty()) {
Events &= ~EPOLLOUT;
} else {
Events |= EPOLLOUT;
}
}
};
5 changes: 5 additions & 0 deletions ydb/tools/tsserver/types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#pragma once

#include "defs.h"

// Exception used by the tsserver sources to signal that a client connection
// must be dropped (I/O failure or malformed/unsupported request). It carries
// no payload of its own.
struct TExError : public std::exception {
};
14 changes: 14 additions & 0 deletions ydb/tools/tsserver/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Build description of the tsserver tool: a program target built from main.cpp.
PROGRAM()

OWNER(alexvru)

# main.cpp is the only source listed; the headers next to it are pulled in via #include.
SRCS(
main.cpp
)

# Modules this target declares as dependencies.
PEERDIR(
ydb/library/actors/interconnect
ydb/core/protos
)

END()
7 changes: 7 additions & 0 deletions ydb/tools/tstool/testshard.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
StorageServerPort: 35000
MaxDataBytes: 1500000000
MinDataBytes: 1000000000
MaxInFlight: 5
Sizes { Weight: 1 Min: 1 Max: 1024 Inline: true }
Sizes { Weight: 3 Min: 10000 Max: 1000000 }
Sizes { Weight: 7 Min: 2357120 Max: 8388608 }
102 changes: 102 additions & 0 deletions ydb/tools/tstool/tstool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from argparse import ArgumentParser, FileType
from ydb.core.protos.grpc_pb2_grpc import TGRpcServerStub
from ydb.core.protos.msgbus_pb2 import THiveCreateTablet, TTestShardControlRequest
from ydb.core.protos.tablet_pb2 import TTabletTypes
from ydb.core.protos.base_pb2 import EReplyStatus
from google.protobuf import text_format
import grpc
import socket
import sys
import time
import multiprocessing
import random

# Connection settings shared by the helper functions of this module. They start
# out as None and are assigned in main() from the parsed command-line arguments.
grpc_host = None
grpc_port = None
domain_uid = None
# Port used for --tsserver when the argument has no explicit ":port" part.
default_tsserver_port = 35000


def invoke_grpc(func, *params):
    """Call the method named `func` on a TGRpcServerStub and return its result.

    A fresh insecure channel to the module-level `grpc_host`:`grpc_port`
    endpoint is opened for the call and closed when the call returns.
    `params` are forwarded to the stub method unchanged.
    """
    endpoint = '%s:%d' % (grpc_host, grpc_port)
    with grpc.insecure_channel(endpoint, []) as channel:
        method = getattr(TGRpcServerStub(channel), func)
        return method(*params)


def create_tablet(owner_idx, channels, count):
    """Create `count` TestShard tablets through Hive and return their tablet ids.

    owner_idx -- owner index of the first tablet; tablet i uses owner_idx + i
    channels  -- storage pool names, one bound channel is added per name
    count     -- number of tablets to request in a single HiveCreateTablet call

    The request is attempted up to 10 times with a 3 second pause between
    attempts; every unsuccessful response is printed to stderr.

    Raises AssertionError if any tablet in a successful response has a status
    other than OK/ALREADY, or if no attempt succeeds. The errors are raised
    explicitly (not via `assert`) so that they are not stripped under
    `python -O`, where the function would otherwise silently return None.
    """
    request = THiveCreateTablet(DomainUid=domain_uid)
    for i in range(count):
        cmd = request.CmdCreateTablet.add(OwnerId=0, OwnerIdx=owner_idx + i, TabletType=TTabletTypes.TestShard, ChannelsProfile=0)
        for channel in channels:
            cmd.BindedChannels.add(StoragePoolName=channel, PhysicalGroupsOnly=False)

    attempts = 10
    for attempt in range(attempts):
        res = invoke_grpc('HiveCreateTablet', request)
        # NOTE(review): Status == 1 is treated as success here; presumably the
        # message bus "OK" status -- confirm against the proto definition.
        if res.Status == 1:
            failed = [r for r in res.CreateTabletResult if r.Status not in (EReplyStatus.OK, EReplyStatus.ALREADY)]
            if failed:
                raise AssertionError('unexpected tablet creation status in response: %s' % res)
            return [r.TabletId for r in res.CreateTabletResult]

        print(res, file=sys.stderr)
        if attempt + 1 < attempts:
            # No point in sleeping after the final attempt.
            time.sleep(3)

    raise AssertionError('HiveCreateTablet did not succeed after %d attempts' % attempts)


def init_tablet(args):
    """Send the Initialize command to one tablet, retrying on failure.

    `args` is a `(tablet_id, cmd)` pair (a single argument so the function can
    be mapped over a multiprocessing pool). The TestShardControl request is
    issued up to 100 times, sleeping a random 1-2 seconds after each
    unsuccessful attempt.

    Returns None on success, otherwise a string of the form
    "<tablet_id>: <last response>".
    """
    tablet_id, cmd = args
    request = TTestShardControlRequest(TabletId=tablet_id, Initialize=cmd)

    remaining = 100
    while remaining > 0:
        remaining -= 1
        res = invoke_grpc('TestShardControl', request)
        if res.Status == 1:
            return None
        time.sleep(random.uniform(1.0, 2.0))

    return '{}: {}'.format(tablet_id, res)


def main():
    """Command-line entry point of the TestShard control tool.

    Parses the arguments, stores the gRPC endpoint and domain UID in the module
    globals, creates (or looks up) the requested tablets and prints their ids.
    For the `initialize` action it then reads a text-format TCmdInitialize from
    --proto-file, optionally points it at the tsserver given by --tsserver, and
    sends it to every tablet using a multiprocessing pool.

    Exits with status 0 when every tablet was initialized and 1 when any
    tablet failed or the --tsserver argument could not be parsed/resolved.
    """
    global grpc_host, grpc_port, domain_uid

    parser = ArgumentParser(description='YDB TestShard control tool')
    parser.add_argument('--grpc-host', type=str, required=True, help='gRPC endpoint hostname')
    parser.add_argument('--grpc-port', type=int, default=2135, help='gRPC endpoint port')
    parser.add_argument('--domain-uid', type=int, default=1, help='domain UID')
    parser.add_argument('--owner-idx', type=int, required=True, help='unique instance id for tablet creation')
    parser.add_argument('--channels', type=str, nargs='+', required=True, help='channel storage pool names')
    parser.add_argument('--count', type=int, default=1, help='create desired amount of tablets at once')

    subparsers = parser.add_subparsers(help='Action', dest='action', required=True)

    init_parser = subparsers.add_parser('initialize', help='initialize test shard state')
    init_parser.add_argument('--proto-file', type=FileType(), required=True, help='path to protobuf containing TCmdInitialize')
    init_parser.add_argument('--tsserver', type=str, help='FQDN:port pair for tsserver')

    args = parser.parse_args()

    grpc_host = args.grpc_host
    grpc_port = args.grpc_port
    domain_uid = args.domain_uid

    tablet_ids = create_tablet(args.owner_idx, args.channels, args.count)
    print('TabletIds# %s' % ', '.join(map(str, tablet_ids)))

    if args.action == 'initialize':
        # argparse opened the file for us; make sure it gets closed again.
        with args.proto_file as proto_file:
            cmd = text_format.Parse(proto_file.read(), TTestShardControlRequest.TCmdInitialize())

        if args.tsserver is not None:
            host, sep, port = args.tsserver.partition(':')
            try:
                port = int(port) if sep else default_tsserver_port
            except ValueError:
                print('Invalid tsserver port %r' % port, file=sys.stderr)
                sys.exit(1)

            # getaddrinfo() reports resolution failures by raising gaierror rather
            # than by returning an empty list, so both cases are handled.
            try:
                addrinfo = socket.getaddrinfo(host, port, socket.AF_INET6)
            except socket.gaierror as e:
                print('Failed to resolve hostname %s: %s' % (host, e), file=sys.stderr)
                sys.exit(1)
            if not addrinfo:
                print('Failed to resolve hostname %s' % host, file=sys.stderr)
                sys.exit(1)

            # Each entry is (family, type, proto, canonname, sockaddr); take the
            # address string of the first result.
            cmd.StorageServerHost = addrinfo[0][4][0]
            if sep:
                # Honour an explicitly given port; without one the value from the
                # proto file (if any) is left untouched.
                cmd.StorageServerPort = port

        # NOTE(review): init_tablet reads the module globals assigned above inside
        # the worker processes; this presumably relies on fork-style process start
        # -- confirm for platforms that spawn fresh interpreters.
        status = 0
        with multiprocessing.Pool(None) as pool:
            for r in pool.imap_unordered(init_tablet, ((tablet_id, cmd) for tablet_id in tablet_ids), 1):
                if r is not None:
                    sys.stderr.write(r)
                    status = 1
        sys.exit(status)
16 changes: 16 additions & 0 deletions ydb/tools/tstool/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Build description of the tstool utility: a Python 3 program target named tstool.
PY3_PROGRAM(tstool)

OWNER(alexvru)

# Names the tstool module as the program's entry module.
PY_MAIN(tstool)

# tstool.py is the only Python source listed.
PY_SRCS(
TOP_LEVEL
tstool.py
)

# Module this target declares as a dependency.
PEERDIR(
ydb/core/protos
)

END()
Loading
Loading