Skip to content

Commit

Permalink
Merge 643e5bb into e90285a
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Feb 13, 2024
2 parents e90285a + 643e5bb commit a9651c0
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 15 deletions.
1 change: 1 addition & 0 deletions ydb/apps/dstool/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def apply_args(self, args, with_localhost=True):

if args.token_file:
self.token = args.token_file.readline().rstrip('\r\n')
args.token_file.close()
if self.token is None:
self.token = os.getenv('YDB_TOKEN')
if self.token is not None:
Expand Down
71 changes: 64 additions & 7 deletions ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ def add_options(p):
p.add_argument('--disable-evicts', action='store_true', help='Disable VDisk evicts')
p.add_argument('--disable-restarts', action='store_true', help='Disable node restarts')
p.add_argument('--enable-pdisk-encryption-keys-changes', action='store_true', help='Enable changes of PDisk encryption keys')
p.add_argument('--enable-kill-tablets', action='store_true', help='Enable tablet killer')
p.add_argument('--enable-kill-blob-depot', action='store_true', help='Enable BlobDepot killer')
p.add_argument('--kill-signal', type=str, default='KILL', help='Kill signal to send to restart node')


def fetch_start_time_map(base_config):
Expand Down Expand Up @@ -84,6 +87,8 @@ def do(args):
config_retries -= 1
continue

tablets = common.fetch_json_info('tabletinfo') if args.enable_kill_tablets or args.enable_kill_blob_depot else {}

config_retries = None

for vslot in base_config.VSlot:
Expand Down Expand Up @@ -135,7 +140,7 @@ def do_restart(node_id):
host = node_fqdn_map[node_id]
if args.enable_pdisk_encryption_keys_changes:
update_pdisk_key_config(node_fqdn_map, pdisk_keys, node_id)
subprocess.call(['ssh', host, 'sudo', 'killall', '-9', 'kikimr'])
subprocess.call(['ssh', host, 'sudo', 'killall', '-%s' % args.kill_signal, 'kikimr'])
if args.enable_pdisk_encryption_keys_changes:
remove_old_pdisk_keys(pdisk_keys, pdisk_key_versions, node_id)

Expand Down Expand Up @@ -185,6 +190,29 @@ def do_add_pdisk_key(node_id):
"version" : v,
"file" : "keynumber" + str(v)})

def do_kill_tablet():
    """Pick one active leader tablet at random and request its restart.

    Candidates are the entries of the enclosing `tablets` mapping whose
    'State' is 'Active' and whose 'Leader' flag is truthy. The restart is
    requested through `common.fetch('tablets', RestartTabletID=<id>)`.

    Does nothing when there is no candidate, mirroring do_kill_blob_depot().
    """
    candidates = [
        info
        for info in tablets.values()
        if info['State'] == 'Active' and info['Leader']
    ]
    if not candidates:
        # random.choice() raises IndexError on an empty sequence; with no
        # active leader there is simply nothing to kill this round.
        return
    item = random.choice(candidates)
    tablet_id = int(item['TabletId'])
    print('Killing tablet %d of type %s' % (tablet_id, item['Type']))
    # cache=False: every request must actually be sent.
    common.fetch('tablets', dict(RestartTabletID=tablet_id), fmt='raw', cache=False)

def do_kill_blob_depot():
    """Request a restart of one randomly chosen active leader BlobDepot tablet.

    Looks through the enclosing `tablets` mapping for entries whose 'State'
    is 'Active', whose 'Leader' flag is truthy and whose 'Type' is
    'BlobDepot'. Returns without doing anything when none match.
    """
    blob_depots = []
    for info in tablets.values():
        if info['State'] == 'Active' and info['Leader'] and info['Type'] == 'BlobDepot':
            blob_depots.append(info)

    if not blob_depots:
        return

    victim = random.choice(blob_depots)
    victim_id = int(victim['TabletId'])
    print('Killing tablet %d of type %s' % (victim_id, victim['Type']))
    common.fetch('tablets', {'RestartTabletID': victim_id}, fmt='raw', cache=False)

################################################################################################################

now = datetime.utcnow()
Expand All @@ -193,19 +221,45 @@ def do_add_pdisk_key(node_id):

possible_actions = []

if args.enable_kill_tablets:
possible_actions.append(('kill tablet', (do_kill_tablet,)))
if args.enable_kill_blob_depot:
possible_actions.append(('kill blob depot', (do_kill_blob_depot,)))

evicts = []
wipes = []
readonlies = []
unreadonlies = []

for vslot in base_config.VSlot:
if common.is_dynamic_group(vslot.GroupId):
vslot_id = common.get_vslot_id(vslot.VSlotId)
vdisk_id = '[%08x:%d:%d:%d]' % (vslot.GroupId, vslot.FailRealmIdx, vslot.FailDomainIdx, vslot.VDiskIdx)
if vslot_id in vslot_readonly and not args.disable_readonly:
possible_actions.append(('un-readonly vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_readonly, vslot, False)))
unreadonlies.append(('un-readonly vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_readonly, vslot, False)))
if can_act_on_vslot(*vslot_id) and (recent_restarts or args.disable_restarts):
if not args.disable_evicts:
possible_actions.append(('evict vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_evict, vslot_id)))
evicts.append(('evict vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_evict, vslot_id)))
if not args.disable_wipes:
possible_actions.append(('wipe vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_wipe, vslot)))
wipes.append(('wipe vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_wipe, vslot)))
if not args.disable_readonly:
possible_actions.append(('readonly vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_readonly, vslot, True)))
# Queue the action in `readonlies` (the list created next to evicts/wipes); the
# misspelt name `readolies` raised NameError as soon as this line was reached.
readonlies.append(('readonly vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_readonly, vslot, True)))

def pick(v):
    """Execute one randomly selected entry of `v`.

    Each entry of `v` is a `(label, call_spec)` pair, where `call_spec` is a
    sequence holding a callable followed by its positional arguments. The
    label is printed, then the callable is invoked with those arguments.
    """
    label, call_spec = random.choice(v)
    print(label)
    handler = call_spec[0]
    params = call_spec[1:]
    handler(*params)

if evicts:
possible_actions.append(('evict', (pick, evicts)))
if wipes:
possible_actions.append(('wipe', (pick, wipes)))
if readonlies:
possible_actions.append(('readonly', (pick, readonlies)))
if unreadonlies:
possible_actions.append(('un-readonly', (pick, unreadonlies)))

restarts = []

if start_time_map and len(recent_restarts) < 3:
# sort so that the latest restarts come first
Expand All @@ -216,7 +270,10 @@ def do_add_pdisk_key(node_id):
if args.enable_pdisk_encryption_keys_changes:
possible_actions.append(('add new pdisk key to node with id: %d' % node_id, (do_add_pdisk_key, node_id)))
if not args.disable_restarts:
possible_actions.append(('restart node with id: %d' % node_id, (do_restart, node_id)))
restarts.append(('restart node with id: %d' % node_id, (do_restart, node_id)))

if restarts:
possible_actions.append(('restart', (pick, restarts)))

if not possible_actions:
common.print_if_not_quiet(args, 'Waiting for the next round...', file=sys.stdout)
Expand All @@ -226,7 +283,7 @@ def do_add_pdisk_key(node_id):
################################################################################################################

action_name, action = random.choice(possible_actions)
common.print_if_not_quiet(args, '%s' % action_name, file=sys.stdout)
print('%s %s' % (action_name, datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')))

try:
action[0](*action[1:])
Expand Down
15 changes: 8 additions & 7 deletions ydb/apps/dstool/lib/dstool_cmd_group_take_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ def get_endpoints():
global output_file
output_file = args.output

threads = []
for p in get_endpoints():
thread = Thread(target=fetch_blobs_from_vdisk, args=p, daemon=True)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
with output_file:
threads = []
for p in get_endpoints():
thread = Thread(target=fetch_blobs_from_vdisk, args=p, daemon=True)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
2 changes: 1 addition & 1 deletion ydb/core/keyvalue/channel_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace NKikimr::NKeyValue {
const size_t index = (LatencyQueue.size() - 1) * 99 / 100;
const TDuration perc = latencies[index];
weight = MeanExpectedLatency.GetValue() * weight / Max(perc, TDuration::MilliSeconds(1)).GetValue();
Y_DEBUG_ABORT_UNLESS(weight);
//Y_DEBUG_ABORT_UNLESS(weight);
if (!weight) {
weight = 1;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/keyvalue/keyvalue_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,9 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
}

THelpers::DbEraseCollect(db, ctx);
if (IsEmptyDbStart) {
THelpers::DbUpdateState(StoredState, db, ctx);
}

// corner case, if no CollectGarbage events were sent
if (InitialCollectsSent == 0) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/keyvalue/keyvalue_storage_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ class TKeyValueStorageRequest : public TActorBootstrapped<TKeyValueStorageReques
IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs?
read.Value.Write(readItem.ValueOffset, std::move(response.Buffer));
} else {
Y_VERIFY_DEBUG_S(response.Status != NKikimrProto::NODATA, "NODATA received for TEvGet"
<< " TabletId# " << TabletInfo->TabletID
<< " Id# " << response.Id
<< " Key# " << read.Key);

TStringStream err;
if (read.Message.size()) {
err << read.Message << Endl;
Expand Down

0 comments on commit a9651c0

Please sign in to comment.