From fb829ce9e6e7e8a9cd2b7098c4531e535902e687 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Mon, 7 Oct 2024 09:31:47 -0400 Subject: [PATCH 01/14] feat(prometheus,shred-collector): metrics improvements + add metrics to shred collector --- src/accountsdb/db.zig | 109 +++++++--------- src/cmd/cmd.zig | 2 + src/geyser/core.zig | 20 +-- src/gossip/service.zig | 34 ++--- src/ledger/cleanup_service.zig | 2 +- src/ledger/insert_shred.zig | 33 +++-- src/ledger/insert_shreds_working_state.zig | 10 +- src/ledger/reader.zig | 41 ++---- src/ledger/writer.zig | 34 ++--- src/prometheus/counter.zig | 20 ++- src/prometheus/gauge.zig | 19 ++- src/prometheus/gauge_fn.zig | 14 +- src/prometheus/histogram.zig | 29 ++++- src/prometheus/lib.zig | 3 +- src/prometheus/metric.zig | 22 +++- src/prometheus/registry.zig | 86 +++++++++++- src/prometheus/variant_counter.zig | 145 +++++++++++++++++++++ src/shred_collector/repair_service.zig | 79 ++++++++++- src/shred_collector/service.zig | 14 +- src/shred_collector/shred_processor.zig | 38 +++++- src/shred_collector/shred_receiver.zig | 101 +++++++++----- src/shred_collector/shred_tracker.zig | 28 +++- src/shred_collector/shred_verifier.zig | 57 ++++++-- src/transaction_sender/service.zig | 14 +- 24 files changed, 679 insertions(+), 275 deletions(-) create mode 100644 src/prometheus/variant_counter.zig diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index 7fcf81ff3..c9e5e04a5 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -35,6 +35,7 @@ const Level = sig.trace.level.Level; const NestedHashTree = sig.common.merkle_tree.NestedHashTree; const GetMetricError = sig.prometheus.registry.GetMetricError; const Counter = sig.prometheus.counter.Counter; +const Gauge = sig.prometheus.Gauge; const Histogram = sig.prometheus.histogram.Histogram; const ClientVersion = sig.version.ClientVersion; const StatusCache = sig.accounts_db.StatusCache; @@ -58,37 +59,6 @@ pub const ACCOUNT_FILE_SHRINK_THRESHOLD = 70; // shrink account files with more pub const DELETE_ACCOUNT_FILES_MIN = 100; pub const AccountsDBStats = struct { - const HistogramKind = enum { - flush_account_file_size, - shrink_file_shrunk_by, - shrink_alive_accounts, - shrink_dead_accounts, - time_flush, - time_clean, - time_shrink, - time_purge, - - fn buckets(self: HistogramKind) []const f64 { - const account_size_buckets = &.{ - // 10 bytes -> 10MB (solana max account size) - 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, - }; - const account_count_buckets = &.{ 1, 5, 10, 50, 100, 500, 1_000, 10_000 }; - const nanosecond_buckets = &.{ - // 0.01ms -> 10ms - 1_000, 10_000, 100_000, 1_000_000, 10_000_000, - // 50ms -> 1000ms - 50_000_000, 100_000_000, 200_000_000, 400_000_000, 1_000_000_000, - }; - - return switch (self) { - .flush_account_file_size, .shrink_file_shrunk_by => account_size_buckets, - .shrink_alive_accounts, .shrink_dead_accounts => account_count_buckets, - .time_flush, .time_clean, .time_shrink, .time_purge => nanosecond_buckets, - }; - } - }; - number_files_flushed: *Counter, number_files_cleaned: *Counter, number_files_shrunk: *Counter, @@ -102,11 +72,11 @@ pub const AccountsDBStats = struct { flush_account_file_size: *Histogram, flush_accounts_written: *Counter, - clean_references_deleted: *Counter, - clean_files_queued_deletion: *Counter, - clean_files_queued_shrink: *Counter, - clean_slot_old_state: *Counter, - clean_slot_zero_lamports: *Counter, + clean_references_deleted: *Gauge(u64), + clean_files_queued_deletion: *Gauge(u64), + clean_files_queued_shrink: 
*Gauge(u64), + clean_slot_old_state: *Gauge(u64), + clean_slot_zero_lamports: *Gauge(u64), shrink_file_shrunk_by: *Histogram, shrink_alive_accounts: *Histogram, @@ -115,25 +85,38 @@ pub const AccountsDBStats = struct { const Self = @This(); pub fn init() GetMetricError!Self { - var self: Self = undefined; - const registry = globalRegistry(); - const stats_struct_info = @typeInfo(Self).Struct; - inline for (stats_struct_info.fields) |field| { - @field(self, field.name) = switch (field.type) { - *Counter => try registry.getOrCreateCounter(field.name), - *Histogram => blk: { - @setEvalBranchQuota(2000); // stringToEnum requires a little more than default - const histogram_kind = comptime std.meta.stringToEnum( - HistogramKind, - field.name, - ) orelse @compileError("no matching HistogramKind for AccountsDBStats *Histogram field"); - - break :blk try registry.getOrCreateHistogram(field.name, histogram_kind.buckets()); - }, - else => @compileError("Unsupported field type"), - }; - } - return self; + return globalRegistry().initStruct(Self); + } + + pub fn buckets(comptime field_name: []const u8) []const f64 { + const HistogramKind = enum { + flush_account_file_size, + shrink_file_shrunk_by, + shrink_alive_accounts, + shrink_dead_accounts, + time_flush, + time_clean, + time_shrink, + time_purge, + }; + + const account_size_buckets = &.{ + // 10 bytes -> 10MB (solana max account size) + 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, + }; + const account_count_buckets = &.{ 1, 5, 10, 50, 100, 500, 1_000, 10_000 }; + const nanosecond_buckets = &.{ + // 0.01ms -> 10ms + 1_000, 10_000, 100_000, 1_000_000, 10_000_000, + // 50ms -> 1000ms + 50_000_000, 100_000_000, 200_000_000, 400_000_000, 1_000_000_000, + }; + + return switch (@field(HistogramKind, field_name)) { + .flush_account_file_size, .shrink_file_shrunk_by => account_size_buckets, + .shrink_alive_accounts, .shrink_dead_accounts => account_count_buckets, + .time_flush, .time_clean, .time_shrink, .time_purge => nanosecond_buckets, + }; } }; @@ -1441,7 +1424,7 @@ pub const AccountsDB = struct { for (accounts) |*account| { const account_size_in_file = account.getSizeInFile(); size += account_size_in_file; - self.stats.flush_account_file_size.observe(@floatFromInt(account_size_in_file)); + self.stats.flush_account_file_size.observe(account_size_in_file); } const file, const file_id, const memory = try self.createAccountFile(size, slot); @@ -1510,7 +1493,7 @@ pub const AccountsDB = struct { self.allocator.free(pubkeys); } - self.stats.time_flush.observe(@floatFromInt(timer.read().asNanos())); + self.stats.time_flush.observe(timer.read().asNanos()); // return to queue for cleaning return file_id; @@ -1671,7 +1654,7 @@ pub const AccountsDB = struct { self.stats.clean_slot_old_state.set(num_old_states); self.stats.clean_slot_zero_lamports.set(num_zero_lamports); - self.stats.time_clean.observe(@floatFromInt(timer.read().asNanos())); + self.stats.time_clean.observe(timer.read().asNanos()); return .{ .num_zero_lamports = num_zero_lamports, .num_old_states = num_old_states, @@ -1834,9 +1817,9 @@ pub const AccountsDB = struct { std.debug.assert(accounts_dead_count > 0); total_accounts_deleted += accounts_dead_count; - self.stats.shrink_alive_accounts.observe(@floatFromInt(accounts_alive_count)); - self.stats.shrink_dead_accounts.observe(@floatFromInt(accounts_dead_count)); - self.stats.shrink_file_shrunk_by.observe(@floatFromInt(accounts_dead_size)); + self.stats.shrink_alive_accounts.observe(accounts_alive_count); + 
self.stats.shrink_dead_accounts.observe(accounts_dead_count); + self.stats.shrink_file_shrunk_by.observe(accounts_dead_size); self.logger.debug().logf("n alive accounts: {}", .{accounts_alive_count}); self.logger.debug().logf("n dead accounts: {}", .{accounts_dead_count}); @@ -1947,7 +1930,7 @@ pub const AccountsDB = struct { } } - self.stats.time_shrink.observe(@floatFromInt(timer.read().asNanos())); + self.stats.time_shrink.observe(timer.read().asNanos()); return .{ .num_accounts_deleted = total_accounts_deleted, @@ -1995,7 +1978,7 @@ pub const AccountsDB = struct { self.allocator.free(accounts); self.allocator.free(pubkeys); - self.stats.time_purge.observe(@floatFromInt(timer.read().asNanos())); + self.stats.time_purge.observe(timer.read().asNanos()); } // NOTE: we need to acquire locks which requires `self: *Self` but we never modify any data diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 5e89a1627..5119dd25c 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -735,6 +735,7 @@ fn validator() !void { .allocator = allocator, .logger = app_base.logger, .random = rng.random(), + .registry = app_base.metrics_registry, .my_keypair = &app_base.my_keypair, .exit = &app_base.exit, .gossip_table_rw = &gossip_service.gossip_table_rw, @@ -831,6 +832,7 @@ fn shredCollector() !void { .allocator = allocator, .logger = app_base.logger, .random = rng.random(), + .registry = app_base.metrics_registry, .my_keypair = &app_base.my_keypair, .exit = &app_base.exit, .gossip_table_rw = &gossip_service.gossip_table_rw, diff --git a/src/geyser/core.zig b/src/geyser/core.zig index a0a8fadc0..c67cd09bd 100644 --- a/src/geyser/core.zig +++ b/src/geyser/core.zig @@ -77,13 +77,7 @@ pub const GeyserWriterStats = struct { geyser_writer_total_bytes: *Counter, pub fn init() !GeyserWriterStats { - var self: GeyserWriterStats = undefined; - const registry = globalRegistry(); - const stats_struct_info = @typeInfo(GeyserWriterStats).Struct; - inline for (stats_struct_info.fields) |field| { - @field(self, field.name) = try registry.getOrCreateCounter(field.name); - } - return self; + return try globalRegistry().initStruct(GeyserWriterStats); } }; @@ -260,17 +254,7 @@ pub const GeyserReaderStats = struct { const GaugeU64 = Gauge(u64); pub fn init() !GeyserReaderStats { - var self: GeyserReaderStats = undefined; - const registry = globalRegistry(); - const stats_struct_info = @typeInfo(GeyserReaderStats).Struct; - inline for (stats_struct_info.fields) |field| { - @field(self, field.name) = switch (field.type) { - *Counter => try registry.getOrCreateCounter(field.name), - *GaugeU64 => try registry.getOrCreateGauge(field.name, u64), - else => @compileError("Unhandled field type: " ++ field.name ++ ": " ++ @typeName(field.type)), - }; - } - return self; + return try globalRegistry().initStruct(GeyserReaderStats); } }; diff --git a/src/gossip/service.zig b/src/gossip/service.zig index b838858c5..b8296f59f 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -704,7 +704,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPushMessages failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_push_time.observe(@floatFromInt(elapsed)); + self.stats.handle_batch_push_time.observe(elapsed); push_messages.clearRetainingCapacity(); } @@ -713,7 +713,7 @@ pub const GossipService = struct { var x_timer = try sig.time.Timer.start(); self.handleBatchPruneMessages(&prune_messages); const elapsed = x_timer.read().asMillis(); - 
self.stats.handle_batch_prune_time.observe(@floatFromInt(elapsed)); + self.stats.handle_batch_prune_time.observe(elapsed); prune_messages.clearRetainingCapacity(); } @@ -724,7 +724,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPullRequest failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_pull_req_time.observe(@floatFromInt(elapsed)); + self.stats.handle_batch_pull_req_time.observe(elapsed); pull_requests.clearRetainingCapacity(); } @@ -735,7 +735,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPullResponses failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_pull_resp_time.observe(@floatFromInt(elapsed)); + self.stats.handle_batch_pull_resp_time.observe(elapsed); pull_responses.clearRetainingCapacity(); } @@ -746,7 +746,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPingMessages failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_ping_time.observe(@floatFromInt(elapsed)); + self.stats.handle_batch_ping_time.observe(elapsed); ping_messages.clearRetainingCapacity(); } @@ -755,7 +755,7 @@ pub const GossipService = struct { var x_timer = try sig.time.Timer.start(); self.handleBatchPongMessages(&pong_messages); const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_pong_time.observe(@floatFromInt(elapsed)); + self.stats.handle_batch_pong_time.observe(elapsed); pong_messages.clearRetainingCapacity(); } @@ -794,7 +794,7 @@ pub const GossipService = struct { break :err_blk 0; }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_trim_table_time.observe(@floatFromInt(elapsed)); + self.stats.handle_trim_table_time.observe(elapsed); break :blk n_pubkeys_dropped; } else 0; @@ -1578,7 +1578,7 @@ pub const GossipService = struct { var timer = try sig.time.Timer.start(); defer { const elapsed = timer.read().asMillis(); - self.stats.push_messages_time_to_insert.observe(@floatFromInt(elapsed)); + self.stats.push_messages_time_to_insert.observe(elapsed); } var gossip_table, var gossip_table_lg = self.gossip_table_rw.writeWithLock(); @@ -1681,7 +1681,7 @@ pub const GossipService = struct { var timer = try sig.time.Timer.start(); defer { const elapsed = timer.read().asMillis(); - self.stats.push_messages_time_build_prune.observe(@floatFromInt(elapsed)); + self.stats.push_messages_time_build_prune.observe(elapsed); } var pubkey_to_failed_origins_iter = pubkey_to_failed_origins.iterator(); @@ -2032,28 +2032,18 @@ pub const GossipStats = struct { const Self = @This(); - const HANDLE_TIME_BUCKETS_MS: [10]f64 = .{ + pub const buckets: [10]f64 = .{ 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, }; + pub fn init(logger: Logger) GetMetricError!Self { var self: Self = undefined; const registry = globalRegistry(); - const stats_struct_info = @typeInfo(GossipStats).Struct; - inline for (stats_struct_info.fields) |field| { - if (field.name[0] != '_') { - @field(self, field.name) = switch (field.type) { - *Counter => try registry.getOrCreateCounter(field.name), - *GaugeU64 => try registry.getOrCreateGauge(field.name, u64), - *Histogram => try registry.getOrCreateHistogram(field.name, &HANDLE_TIME_BUCKETS_MS), - else => @compileError("Unhandled field type: " ++ field.name ++ ": " ++ @typeName(field.type)), - }; - } - } - + std.debug.assert(try registry.initFields(&self) == 1); self._logging_fields = .{ .logger = logger }; return self; } diff --git a/src/ledger/cleanup_service.zig 
b/src/ledger/cleanup_service.zig index 8dfe8edc3..e07d77133 100644 --- a/src/ledger/cleanup_service.zig +++ b/src/ledger/cleanup_service.zig @@ -385,7 +385,7 @@ test "purgeSlots" { .logger = logger, .lowest_cleanup_slot = &lowest_cleanup_slot, .max_root = &max_root, - .scan_and_fix_roots_metrics = try ledger.writer.ScanAndFixRootsMetrics.init(registry), + .scan_and_fix_roots_metrics = try registry.initStruct(ledger.writer.ScanAndFixRootsMetrics), }; // write some roots diff --git a/src/ledger/insert_shred.zig b/src/ledger/insert_shred.zig index 884c50cf9..18b6fab76 100644 --- a/src/ledger/insert_shred.zig +++ b/src/ledger/insert_shred.zig @@ -68,7 +68,7 @@ pub const ShredInserter = struct { .db = db, .lock = .{}, .max_root = Atomic(u64).init(0), // TODO read this from the database - .metrics = try BlockstoreInsertionMetrics.init(registry), + .metrics = try registry.initStruct(BlockstoreInsertionMetrics), }; } @@ -160,7 +160,12 @@ pub const ShredInserter = struct { // const allocator = self.allocator; var total_timer = try Timer.start(); - var state = try PendingInsertShredsState.init(self.allocator, self.logger, &self.db); + var state = try PendingInsertShredsState.init( + self.allocator, + self.logger, + &self.db, + self.metrics, + ); defer state.deinit(); var write_batch = state.write_batch; @@ -1102,14 +1107,7 @@ pub const BlockstoreInsertionMetrics = struct { num_code_shreds_invalid_erasure_config: *Counter, // usize num_code_shreds_inserted: *Counter, // usize - pub fn init(registry: *sig.prometheus.Registry(.{})) !BlockstoreInsertionMetrics { - var self: BlockstoreInsertionMetrics = undefined; - inline for (@typeInfo(BlockstoreInsertionMetrics).Struct.fields) |field| { - const name = "shred_inserter_" ++ field.name; - @field(self, field.name) = try registry.getOrCreateCounter(name); - } - return self; - } + pub const prefix = "shred_inserter"; }; ////////// @@ -1324,6 +1322,7 @@ test "merkle root metas coding" { var state = try ShredInserterTestState.initWithLogger(std.testing.allocator, "handle chaining basic", .noop); defer state.deinit(); const allocator = state.allocator(); + const metrics = try sig.prometheus.globalRegistry().initStruct(BlockstoreInsertionMetrics); const slot = 1; const start_index = 0; @@ -1338,7 +1337,12 @@ test "merkle root metas coding" { var write_batch = try state.db.initWriteBatch(); defer write_batch.deinit(); const this_shred = shreds[0]; - var insert_state = try PendingInsertShredsState.init(state.allocator(), .noop, &state.db); + var insert_state = try PendingInsertShredsState.init( + state.allocator(), + .noop, + &state.db, + metrics, + ); defer insert_state.deinit(); const merkle_root_metas = &insert_state.merkle_root_metas; @@ -1369,7 +1373,12 @@ test "merkle root metas coding" { try state.db.commit(write_batch); } - var insert_state = try PendingInsertShredsState.init(state.allocator(), .noop, &state.db); + var insert_state = try PendingInsertShredsState.init( + state.allocator(), + .noop, + &state.db, + metrics, + ); defer insert_state.deinit(); { // second shred (same index as first, should conflict with merkle root) diff --git a/src/ledger/insert_shreds_working_state.zig b/src/ledger/insert_shreds_working_state.zig index db90556d7..14d60678f 100644 --- a/src/ledger/insert_shreds_working_state.zig +++ b/src/ledger/insert_shreds_working_state.zig @@ -89,8 +89,12 @@ pub const PendingInsertShredsState = struct { const Self = @This(); - // TODO add param for metrics - pub fn init(allocator: Allocator, logger: sig.trace.Logger, db: *BlockstoreDB) 
!Self { + pub fn init( + allocator: Allocator, + logger: sig.trace.Logger, + db: *BlockstoreDB, + metrics: BlockstoreInsertionMetrics, + ) !Self { return .{ .allocator = allocator, .db = db, @@ -102,7 +106,7 @@ pub const PendingInsertShredsState = struct { .slot_meta_working_set = AutoHashMap(u64, SlotMetaWorkingSetEntry).init(allocator), .index_working_set = AutoHashMap(u64, IndexMetaWorkingSetEntry).init(allocator), .duplicate_shreds = ArrayList(PossibleDuplicateShred).init(allocator), - .metrics = try BlockstoreInsertionMetrics.init(sig.prometheus.globalRegistry()), + .metrics = metrics, }; } diff --git a/src/ledger/reader.zig b/src/ledger/reader.zig index 4df9719e5..f0d5ca6d8 100644 --- a/src/ledger/reader.zig +++ b/src/ledger/reader.zig @@ -72,8 +72,8 @@ pub const BlockstoreReader = struct { .allocator = allocator, .logger = logger, .db = db, - .rpc_api_metrics = try BlockstoreRpcApiMetrics.init(registry), - .metrics = try BlockstoreReaderMetrics.init(registry), + .rpc_api_metrics = try registry.initStruct(BlockstoreRpcApiMetrics), + .metrics = try registry.initStruct(BlockstoreReaderMetrics), .lowest_cleanup_slot = lowest_cleanup_slot, .max_root = max_root, }; @@ -774,7 +774,7 @@ pub const BlockstoreReader = struct { } else .{ highest_slot, AutoHashMap(Signature, void).init(self.allocator) }; defer before_excluded_signatures.deinit(); self.metrics.get_before_slot_us - .observe(@floatFromInt(get_before_slot_timer.read().asMicros())); + .observe(get_before_slot_timer.read().asMicros()); // Generate a HashSet of signatures that should be excluded from the results based on // `until` signature @@ -801,7 +801,7 @@ pub const BlockstoreReader = struct { }; defer until_excluded_signatures.deinit(); self.metrics.get_until_slot_us - .observe(@floatFromInt(get_until_slot_timer.read().asMicros())); + .observe(get_until_slot_timer.read().asMicros()); // Fetch the list of signatures that affect the given address var address_signatures = ArrayList(struct { Slot, Signature }).init(self.allocator); @@ -819,7 +819,7 @@ pub const BlockstoreReader = struct { } } self.metrics.get_initial_slot_us - .observe(@floatFromInt(get_initial_slot_timer.read().asMicros())); + .observe(get_initial_slot_timer.read().asMicros()); var address_signatures_iter_timer = try Timer.start(); // Regardless of whether a `before` signature is provided, the latest relevant @@ -848,7 +848,7 @@ pub const BlockstoreReader = struct { } } self.metrics.address_signatures_iter_us - .observe(@floatFromInt(address_signatures_iter_timer.read().asMicros())); + .observe(address_signatures_iter_timer.read().asMicros()); address_signatures.items.len = @min(address_signatures.items.len, limit); @@ -877,7 +877,7 @@ pub const BlockstoreReader = struct { }); } self.metrics.get_status_info_us - .observe(@floatFromInt(get_status_info_timer.read().asMicros())); + .observe(get_status_info_timer.read().asMicros()); return .{ .infos = infos, @@ -1454,22 +1454,8 @@ const BlockstoreReaderMetrics = struct { get_status_info_us: *Histogram, get_until_slot_us: *Histogram, - pub fn init(registry: *Registry(.{})) GetMetricError!BlockstoreReaderMetrics { - var self: BlockstoreReaderMetrics = undefined; - inline for (@typeInfo(BlockstoreReaderMetrics).Struct.fields) |field| { - const name = "blockstore_reader_" ++ field.name; - @field(self, field.name) = try registry.getOrCreateHistogram(name, &buckets); - } - return self; - } - - const buckets: [11]f64 = blk: { - var bs: [11]f64 = undefined; - for (0..11) |i| { - bs[i] = std.math.pow(f64, 5.0, @as(f64, 
@floatFromInt(i)) - 1.0); - } - break :blk bs; - }; + pub const prefix = "blockstore_reader"; + pub const buckets = sig.prometheus.histogram.exponentialBuckets(5, -1, 10); }; const BlockstoreRpcApiMetrics = struct { @@ -1484,14 +1470,7 @@ const BlockstoreRpcApiMetrics = struct { num_get_rooted_block_with_entries: *Counter, num_get_transaction_status: *Counter, - pub fn init(registry: *Registry(.{})) GetMetricError!BlockstoreRpcApiMetrics { - var self: BlockstoreRpcApiMetrics = undefined; - inline for (@typeInfo(BlockstoreRpcApiMetrics).Struct.fields) |field| { - const name = "blockstore_rpc_api_" ++ field.name; - @field(self, field.name) = try registry.getOrCreateCounter(name); - } - return self; - } + pub const prefix = "blockstore_rpc_api"; }; pub const AncestorIterator = struct { diff --git a/src/ledger/writer.zig b/src/ledger/writer.zig index 2fa212963..1990604fe 100644 --- a/src/ledger/writer.zig +++ b/src/ledger/writer.zig @@ -53,7 +53,7 @@ pub const BlockstoreWriter = struct { .db = db, .lowest_cleanup_slot = lowest_cleanup_slot, .max_root = max_root, - .scan_and_fix_roots_metrics = try ScanAndFixRootsMetrics.init(registry), + .scan_and_fix_roots_metrics = try registry.initStruct(ScanAndFixRootsMetrics), }; } @@ -239,9 +239,9 @@ pub const BlockstoreWriter = struct { const fix_roots_us = fix_roots_timer.read().asMicros(); const num_roots_fixed = roots_to_fix.items.len; - self.scan_and_fix_roots_metrics.fix_roots_us.observe(@floatFromInt(fix_roots_us)); - self.scan_and_fix_roots_metrics.find_missing_roots_us.observe(@floatFromInt(find_missing_roots_us)); - self.scan_and_fix_roots_metrics.num_roots_to_fix.observe(@floatFromInt(roots_to_fix.items.len)); + self.scan_and_fix_roots_metrics.fix_roots_us.observe(fix_roots_us); + self.scan_and_fix_roots_metrics.find_missing_roots_us.observe(find_missing_roots_us); + self.scan_and_fix_roots_metrics.num_roots_to_fix.observe(roots_to_fix.items.len); return num_roots_fixed; } @@ -310,22 +310,8 @@ pub const ScanAndFixRootsMetrics = struct { num_roots_to_fix: *Histogram, fix_roots_us: *Histogram, - pub fn init(registry: *Registry(.{})) GetMetricError!ScanAndFixRootsMetrics { - var self: ScanAndFixRootsMetrics = undefined; - inline for (@typeInfo(ScanAndFixRootsMetrics).Struct.fields) |field| { - const name = "scan_and_fix_roots_" ++ field.name; - @field(self, field.name) = try registry.getOrCreateHistogram(name, &buckets); - } - return self; - } - - const buckets: [11]f64 = blk: { - var bs: [11]f64 = undefined; - for (0..11) |i| { - bs[i] = std.math.pow(f64, 5.0, @as(f64, @floatFromInt(i)) - 1.0); - } - break :blk bs; - }; + pub const prefix = "scan_and_fix_roots"; + pub const buckets = sig.prometheus.histogram.exponentialBuckets(5, -1, 10); }; const TestDB = sig.ledger.tests.TestDB("writer"); @@ -346,7 +332,7 @@ test "setRoots" { .logger = logger, .lowest_cleanup_slot = &lowest_cleanup_slot, .max_root = &max_root, - .scan_and_fix_roots_metrics = try ScanAndFixRootsMetrics.init(registry), + .scan_and_fix_roots_metrics = try registry.initStruct(ScanAndFixRootsMetrics), }; const roots: [5]Slot = .{ 1, 2, 3, 4, 5 }; @@ -374,7 +360,7 @@ test "scanAndFixRoots" { .logger = logger, .lowest_cleanup_slot = &lowest_cleanup_slot, .max_root = &max_root, - .scan_and_fix_roots_metrics = try ScanAndFixRootsMetrics.init(registry), + .scan_and_fix_roots_metrics = try registry.initStruct(ScanAndFixRootsMetrics), }; // slot = 2 is not a root, but should be! 
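For reference, the declarative pattern that replaces the hand-rolled init loops throughout this patch: declare a struct of metric pointers plus optional prefix/buckets declarations, then let the registry register every field. A minimal sketch; the struct, field, and metric names here are illustrative, not taken from the codebase:

    const sig = @import("../sig.zig");
    const Counter = sig.prometheus.Counter;
    const Histogram = sig.prometheus.Histogram;

    const ExampleMetrics = struct {
        items_processed: *Counter,
        batch_size: *Histogram,

        // Optional: prepended to every metric name ("example_items_processed", ...).
        pub const prefix = "example";
        // Required whenever a *Histogram field is present.
        pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, 0, 10);
    };

    fn initExampleMetrics() !ExampleMetrics {
        return sig.prometheus.globalRegistry().initStruct(ExampleMetrics);
    }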
@@ -416,7 +402,7 @@ test "setAndChainConnectedOnRootAndNextSlots" { .logger = logger, .lowest_cleanup_slot = &lowest_cleanup_slot, .max_root = &max_root, - .scan_and_fix_roots_metrics = try ScanAndFixRootsMetrics.init(registry), + .scan_and_fix_roots_metrics = try registry.initStruct(ScanAndFixRootsMetrics), }; // 1 is a root @@ -488,7 +474,7 @@ test "setAndChainConnectedOnRootAndNextSlots: disconnected" { .logger = logger, .lowest_cleanup_slot = &lowest_cleanup_slot, .max_root = &max_root, - .scan_and_fix_roots_metrics = try ScanAndFixRootsMetrics.init(registry), + .scan_and_fix_roots_metrics = try registry.initStruct(ScanAndFixRootsMetrics), }; // 1 is a root and full diff --git a/src/prometheus/counter.zig b/src/prometheus/counter.zig index db28a1dec..952641f04 100644 --- a/src/prometheus/counter.zig +++ b/src/prometheus/counter.zig @@ -1,15 +1,21 @@ const std = @import("std"); +const prometheus = @import("lib.zig"); + const mem = std.mem; const testing = std.testing; -const Metric = @import("metric.zig").Metric; +const Metric = prometheus.metric.Metric; +const MetricType = prometheus.metric.MetricType; +/// Monotonically increasing value. pub const Counter = struct { - const Self = @This(); - metric: Metric = Metric{ .getResultFn = getResult }, value: std.atomic.Value(u64) = std.atomic.Value(u64).init(0), + const Self = @This(); + + pub const metric_type: MetricType = .counter; + pub fn inc(self: *Self) void { _ = self.value.fetchAdd(1, .monotonic); } @@ -31,14 +37,6 @@ pub const Counter = struct { _ = self.value.store(0, .monotonic); } - pub fn set(self: *Self, value: anytype) void { - switch (@typeInfo(@TypeOf(value))) { - .Int, .Float, .ComptimeInt, .ComptimeFloat => {}, - else => @compileError("can't set a non-number"), - } - self.value.store(@intCast(value), .monotonic); - } - fn getResult(metric: *Metric, _: mem.Allocator) Metric.Error!Metric.Result { const self: *Self = @fieldParentPtr("metric", metric); return Metric.Result{ .counter = self.get() }; diff --git a/src/prometheus/gauge.zig b/src/prometheus/gauge.zig index caadccb31..50bffc3d9 100644 --- a/src/prometheus/gauge.zig +++ b/src/prometheus/gauge.zig @@ -1,6 +1,8 @@ const std = @import("std"); +const prometheus = @import("lib.zig"); -const Metric = @import("metric.zig").Metric; +const Metric = prometheus.metric.Metric; +const MetricType = prometheus.metric.MetricType; /// A gauge that stores the value it reports. /// Read and write operations are atomic and monotonic. 
@@ -11,6 +13,9 @@ pub fn Gauge(comptime T: type) type { const Self = @This(); + pub const metric_type: MetricType = .gauge; + pub const Data = T; + pub fn inc(self: *Self) void { self.value.fetchAdd(1, .monotonic); } @@ -24,11 +29,11 @@ pub fn Gauge(comptime T: type) type { } pub fn dec(self: *Self) void { - self.value.fetchSub(1, .monotonic); + _ = self.value.fetchSub(1, .monotonic); } pub fn sub(self: *Self, v: T) void { - self.value.fetchAdd(v, .monotonic); + _ = self.value.fetchAdd(v, .monotonic); } pub fn set(self: *Self, v: T) void { @@ -39,6 +44,14 @@ pub fn Gauge(comptime T: type) type { return self.value.load(.monotonic); } + pub fn max(self: *Self, v: T) void { + _ = self.value.fetchMax(v, .monotonic); + } + + pub fn min(self: *Self, v: T) void { + _ = self.value.fetchMin(v, .monotonic); + } + fn getResult(metric: *Metric, allocator: std.mem.Allocator) Metric.Error!Metric.Result { _ = allocator; diff --git a/src/prometheus/gauge_fn.zig b/src/prometheus/gauge_fn.zig index 5e971f19b..5e83d9bcd 100644 --- a/src/prometheus/gauge_fn.zig +++ b/src/prometheus/gauge_fn.zig @@ -1,8 +1,11 @@ const std = @import("std"); +const prometheus = @import("lib.zig"); + const mem = std.mem; const testing = std.testing; -const Metric = @import("metric.zig").Metric; +const Metric = prometheus.metric.Metric; +const MetricType = prometheus.metric.MetricType; pub fn GaugeCallFnType(comptime StateType: type, comptime Return: type) type { const CallFnArgType = switch (@typeInfo(StateType)) { @@ -16,15 +19,16 @@ pub fn GaugeCallFnType(comptime StateType: type, comptime Return: type) type { } pub fn GaugeFn(comptime StateType: type, comptime Return: type) type { - const CallFnType = GaugeCallFnType(StateType, Return); - return struct { - const Self = @This(); - metric: Metric = .{ .getResultFn = getResult }, callFn: CallFnType = undefined, state: StateType = undefined, + const Self = @This(); + + pub const CallFnType = GaugeCallFnType(StateType, Return); + pub const metric_type: MetricType = .gauge_fn; + pub fn init(callFn: CallFnType, state: StateType) Self { return .{ .callFn = callFn, diff --git a/src/prometheus/histogram.zig b/src/prometheus/histogram.zig index d6efa364a..6ec7fab93 100644 --- a/src/prometheus/histogram.zig +++ b/src/prometheus/histogram.zig @@ -1,12 +1,26 @@ const std = @import("std"); +const prometheus = @import("lib.zig"); + const Allocator = std.mem.Allocator; const ArrayList = std.ArrayList; const Atomic = std.atomic.Value; -const Metric = @import("metric.zig").Metric; +const Metric = prometheus.metric.Metric; +const MetricType = prometheus.metric.MetricType; pub const DEFAULT_BUCKETS: [11]f64 = .{ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0 }; +pub fn exponentialBuckets(base: i64, comptime start: i64, comptime end: i64) [end - start]f64 { + std.debug.assert(end > start); + const base_float = @as(f64, @floatFromInt(base)); + var buckets: [end - start]f64 = undefined; + for (0..end - start) |i| { + const exponent = @as(f64, @floatFromInt(start + @as(i64, @intCast(i)))); + buckets[i] = std.math.pow(f64, base_float, exponent); + } + return buckets; +} + /// Histogram optimized for fast concurrent writes. /// Reads and writes are thread-safe if you use the public methods. /// Writes are lock-free. Reads are locked with a mutex because they occupy a shard. 
@@ -42,6 +56,8 @@ pub const Histogram = struct { /// Used by registry to report the histogram metric: Metric = .{ .getResultFn = getResult }, + pub const metric_type: MetricType = .histogram; + const ShardSync = packed struct { /// The total count of events that have started to be recorded (including those that finished). /// If this is larger than the shard count, it means a write is in progress. @@ -89,16 +105,21 @@ pub const Histogram = struct { } /// Writes a value into the histogram. - pub fn observe(self: *Self, value: f64) void { + pub fn observe( + self: *Self, + /// Must be f64 or int + value: anytype, + ) void { + const float: f64 = if (@typeInfo(@TypeOf(value)) == .Int) @floatFromInt(value) else value; const shard_sync = self.incrementCount(.acquire); // acquires lock. must be first step. const shard = &self.shards[shard_sync.shard]; for (self.upper_bounds.items, 0..) |bound, i| { - if (value <= bound) { + if (float <= bound) { _ = shard.buckets.items[i].fetchAdd(1, .monotonic); break; } } - _ = shard.sum.fetchAdd(value, .monotonic); + _ = shard.sum.fetchAdd(float, .monotonic); _ = shard.count.fetchAdd(1, .release); // releases lock. must be last step. } diff --git a/src/prometheus/lib.zig b/src/prometheus/lib.zig index 9f80c3dc4..4048b5d90 100644 --- a/src/prometheus/lib.zig +++ b/src/prometheus/lib.zig @@ -1,4 +1,5 @@ pub const counter = @import("counter.zig"); +pub const variant_counter = @import("variant_counter.zig"); pub const gauge_fn = @import("gauge_fn.zig"); pub const gauge = @import("gauge.zig"); pub const histogram = @import("histogram.zig"); @@ -7,11 +8,11 @@ pub const metric = @import("metric.zig"); pub const registry = @import("registry.zig"); pub const Counter = counter.Counter; +pub const VariantCounter = variant_counter.VariantCounter; pub const GaugeFn = gauge_fn.GaugeFn; pub const Gauge = gauge.Gauge; pub const GetMetricError = registry.GetMetricError; pub const Histogram = histogram.Histogram; -pub const DEFAULT_HISTOGRAM_BUCKETS = histogram.DEFAULT_BUCKETS; pub const Registry = registry.Registry; pub const globalRegistry = registry.globalRegistry; diff --git a/src/prometheus/metric.zig b/src/prometheus/metric.zig index fd367a757..01f74590d 100644 --- a/src/prometheus/metric.zig +++ b/src/prometheus/metric.zig @@ -1,9 +1,12 @@ const std = @import("std"); +const prometheus = @import("lib.zig"); + const fmt = std.fmt; const mem = std.mem; const testing = std.testing; -const HistogramSnapshot = @import("histogram.zig").HistogramSnapshot; +const HistogramSnapshot = prometheus.histogram.HistogramSnapshot; +const VariantCounts = prometheus.variant_counter.VariantCounts; pub const Metric = struct { pub const Error = error{OutOfMemory} || std.posix.WriteError || std.http.Server.Response.WriteError; @@ -15,6 +18,7 @@ pub const Metric = struct { gauge: f64, gauge_int: u64, histogram: HistogramSnapshot, + variant_counter: VariantCounts, pub fn deinit(self: Self, allocator: mem.Allocator) void { switch (self) { @@ -39,6 +43,14 @@ pub const Metric = struct { .gauge => |v| { return try writer.print("{s} {d:.6}\n", .{ name, v }); }, + .variant_counter => |counts| { + for (counts.counts, counts.names) |counter, label| { + try writer.print( + "{s}_{s}_count {d}\n", + .{ name, label, counter.load(.monotonic) }, + ); + } + }, .histogram => |v| { if (v.buckets.len <= 0) return; @@ -85,6 +97,14 @@ pub const Metric = struct { } }; +pub const MetricType = enum { + counter, + variant_counter, + gauge, + gauge_fn, + histogram, +}; + /// Converts a float into an anonymous type that 
can be formatted properly for prometheus. pub fn floatMetric(value: anytype) struct { value: @TypeOf(value), diff --git a/src/prometheus/registry.zig b/src/prometheus/registry.zig index 1b033be00..dd718b53a 100644 --- a/src/prometheus/registry.zig +++ b/src/prometheus/registry.zig @@ -10,6 +10,7 @@ const OnceCell = @import("../sync/once_cell.zig").OnceCell; const Metric = @import("metric.zig").Metric; const Counter = @import("counter.zig").Counter; +const VariantCounter = @import("variant_counter.zig").VariantCounter; const Gauge = @import("gauge.zig").Gauge; const GaugeFn = @import("gauge_fn.zig").GaugeFn; const GaugeCallFnType = @import("gauge_fn.zig").GaugeCallFnType; @@ -47,6 +48,10 @@ const RegistryOptions = struct { pub fn Registry(comptime options: RegistryOptions) type { return struct { + arena_state: heap.ArenaAllocator, + mutex: std.Thread.Mutex, + metrics: MetricMap, + const Self = @This(); const MetricMap = hash_map.StringHashMapUnmanaged(struct { @@ -55,10 +60,6 @@ pub fn Registry(comptime options: RegistryOptions) type { metric: *Metric, }); - arena_state: heap.ArenaAllocator, - mutex: std.Thread.Mutex, - metrics: MetricMap, - pub fn init(allocator: mem.Allocator) Self { return .{ .arena_state = heap.ArenaAllocator.init(allocator), @@ -71,6 +72,75 @@ pub fn Registry(comptime options: RegistryOptions) type { self.arena_state.deinit(); } + /// Initialize a struct full of metrics. + /// Every field must be a supported metric type. + pub fn initStruct(self: *Self, comptime Struct: type) GetMetricError!Struct { + var metrics_struct: Struct = undefined; + inline for (@typeInfo(Struct).Struct.fields) |field| { + try self.initMetric(Struct, &@field(metrics_struct, field.name), field.name); + } + return metrics_struct; + } + + /// Initialize any fields within the struct that are supported metric types. + /// Leaves other fields untouched. + /// + /// Returns the number of fields that were *not* initialized. + pub fn initFields( + self: *Self, + /// Mutable pointer to a struct containing metrics. + metrics_struct: anytype, + ) GetMetricError!usize { + const Struct = @typeInfo(@TypeOf(metrics_struct)).Pointer.child; + const fields = @typeInfo(Struct).Struct.fields; + var num_fields_skipped: usize = fields.len; + inline for (@typeInfo(Struct).Struct.fields) |field| { + if (@typeInfo(field.type) == .Pointer) { + const MetricType = @typeInfo(field.type).Pointer.child; + if (@hasDecl(MetricType, "metric_type")) { + try self.initMetric(Struct, &@field(metrics_struct, field.name), field.name); + num_fields_skipped -= 1; + } + } + } + return num_fields_skipped; + } + + /// Assumes the metric type is **SomeMetric and initializes it. + /// + /// Uses the following declarations from Config: + /// - `prefix` if it exists + /// - `buckets` - required if metric is a *Histogram + /// + /// NOTE: does not support GaugeFn + /// + /// If expectations are violated, throws a compile error. 
+ fn initMetric( + self: *Self, + Config: type, + /// Should be a mutable pointer to the location containing the + /// pointer to the metric, so `**SomeMetric` (ptr to a ptr) + metric: anytype, + comptime local_name: []const u8, + ) GetMetricError!void { + const MetricType = @typeInfo(@typeInfo(@TypeOf(metric)).Pointer.child).Pointer.child; + const prefix = if (@hasDecl(Config, "prefix")) Config.prefix ++ "_" else ""; + const name = prefix ++ local_name; + metric.* = switch (MetricType.metric_type) { + .counter => try self.getOrCreateCounter(name), + .variant_counter => try self.getOrCreateVariantCounter(name, MetricType.Type), + .gauge => try self.getOrCreateGauge(name, MetricType.Data), + .gauge_fn => @compileError("GaugeFn does not support auto-init."), + .histogram => try self.getOrCreateHistogram( + name, + if (@typeInfo(@TypeOf(Config.buckets)) == .Fn) + Config.buckets(name) + else + &Config.buckets, + ), + }; + } + fn nbMetrics(self: *const Self) usize { return self.metrics.count(); } @@ -104,6 +174,14 @@ pub fn Registry(comptime options: RegistryOptions) type { return self.getOrCreateMetric(name, Histogram, .{buckets}); } + pub fn getOrCreateVariantCounter( + self: *Self, + name: []const u8, + ErrorSet: type, + ) GetMetricError!*VariantCounter(ErrorSet) { + return self.getOrCreateMetric(name, VariantCounter(ErrorSet), .{}); + } + /// MetricType must be initializable in one of these ways: /// - try MetricType.init(allocator, ...args) /// - MetricType.init(...args) diff --git a/src/prometheus/variant_counter.zig b/src/prometheus/variant_counter.zig new file mode 100644 index 000000000..f16b7f075 --- /dev/null +++ b/src/prometheus/variant_counter.zig @@ -0,0 +1,145 @@ +const std = @import("std"); +const prometheus = @import("lib.zig"); + +const Atomic = std.atomic.Value; + +const Counter = prometheus.Counter; +const Metric = prometheus.metric.Metric; +const MetricType = prometheus.metric.MetricType; +const Registry = prometheus.Registry; + +/// Separately count the occurrence of each variant within an enum or error set. +/// +/// This is a composite metric, similar to a histogram, except the "buckets" are +/// discrete, unordered, and identified by name, instead of representing numeric ranges. +pub fn VariantCounter(comptime EnumOrError: type) type { + const indexer = VariantIndexer.init(EnumOrError); + const names = indexer.names(); + + return struct { + counts: [names.len]Atomic(u64) = .{Atomic(u64).init(0)} ** names.len, + metric: Metric = .{ .getResultFn = getResult }, + + const Self = @This(); + + pub const metric_type: MetricType = .variant_counter; + pub const Type = EnumOrError; + + pub fn observe(self: *Self, err: EnumOrError) void { + _ = self.counts[indexer.index(err)].fetchAdd(1, .monotonic); + } + + pub fn getResult(metric: *Metric, _: std.mem.Allocator) Metric.Error!Metric.Result { + const self: *Self = @fieldParentPtr("metric", metric); + return .{ .variant_counter = .{ + .counts = &self.counts, + .names = &names, + } }; + } + }; +} + +pub const VariantCounts = struct { + counts: []const Atomic(u64), + names: []const []const u8, +}; + +/// Assigns a continuous sequence of integers starting at 0 to the +/// variants. For example, if you have an variant set with 10 variants, +/// it will assign the numbers 0-9 to each of the variants. +/// +/// Enums usually already have numbers with this property, but it's +/// not guaranteed. This is particularly useful for variants, whose +/// integers are likely not continuous. 
+/// +/// You can provide an variant to determine its index, or provide an +/// index to get the respective variant. +/// +/// This struct must be initialized at comptime, but it can be used +/// at any time. +const VariantIndexer = struct { + EnumOrError: type, + offset: u16, + index_to_int: []const u16, + int_to_index: []const u16, + len: usize, + + const Self = @This(); + + pub fn init(comptime EnumOrError: type) Self { + const variants = switch (@typeInfo(EnumOrError)) { + .ErrorSet => |es| es.?, + .Enum => |e| e.fields, + else => @compileError(@typeName(EnumOrError) ++ " is neither error nor enum"), + }; + + // get min and max to determine array bounds and offset + var max: u16 = 0; + var min: u16 = std.math.maxInt(u16); + for (variants) |variant| { + const int = toInt(@field(EnumOrError, variant.name)); + max = @max(max, int); + min = @min(min, int); + } + const offset = min; + + // populate maps translating between the index and the variant's int representation + var init_index_to_int: [variants.len]u16 = undefined; + var init_int_to_index: [1 + max - min]u16 = undefined; + for (variants, 0..) |variant, i| { + const int = toInt(@field(EnumOrError, variant.name)); + init_index_to_int[i] = int; + init_int_to_index[int - offset] = @intCast(i); + } + const index_to_int = init_index_to_int; + const int_to_index = init_int_to_index; + + return .{ + .EnumOrError = EnumOrError, + .offset = offset, + .index_to_int = &index_to_int, + .int_to_index = &int_to_index, + .len = index_to_int.len, + }; + } + + pub fn index(self: Self, err: self.EnumOrError) usize { + return self.int_to_index[toInt(err)]; + } + + pub fn get(self: Self, index_: usize) self.EnumOrError { + return self.fromInt(self.index_to_int[index_]); + } + + pub fn names(self: Self) [self.len][]const u8 { + var name_array: [self.len][]const u8 = undefined; + for (0..self.len) |i| { + name_array[i] = self.toName(self.fromInt(self.index_to_int[i])); + } + return name_array; + } + + fn toInt(err: anytype) u16 { + return switch (@typeInfo(@TypeOf(err))) { + .ErrorSet => @intFromError(err), + .Enum => @intFromEnum(err), + else => unreachable, // init prevents this + }; + } + + fn fromInt(self: Self, int: u16) self.EnumOrError { + return switch (@typeInfo(self.EnumOrError)) { + .ErrorSet => @errorCast(@errorFromInt(int)), + .Enum => @enumFromInt(int), + else => unreachable, // init prevents this + }; + } + + fn toName(self: Self, item: self.EnumOrError) []const u8 { + return switch (@typeInfo(self.EnumOrError)) { + .ErrorSet => @errorName(item), + .Enum => @tagName(item), + else => unreachable, // init prevents this + }; + } +}; diff --git a/src/shred_collector/repair_service.zig b/src/shred_collector/repair_service.zig index 5085280b4..c303b9e6c 100644 --- a/src/shred_collector/repair_service.zig +++ b/src/shred_collector/repair_service.zig @@ -14,6 +14,8 @@ const Socket = zig_network.Socket; const BasicShredTracker = shred_collector.shred_tracker.BasicShredTracker; const ContactInfo = sig.gossip.ContactInfo; +const Counter = sig.prometheus.Counter; +const Gauge = sig.prometheus.Gauge; const GossipTable = sig.gossip.GossipTable; const HomogeneousThreadPool = sig.utils.thread.HomogeneousThreadPool; const Logger = sig.trace.Logger; @@ -22,6 +24,7 @@ const MultiSlotReport = shred_collector.shred_tracker.MultiSlotReport; const Nonce = sig.core.Nonce; const Packet = sig.net.Packet; const Pubkey = sig.core.Pubkey; +const Registry = sig.prometheus.Registry; const RwMux = sig.sync.RwMux; const SignedGossipData = sig.gossip.SignedGossipData; 
const SocketAddr = sig.net.SocketAddr; @@ -48,11 +51,10 @@ pub const RepairService = struct { logger: Logger, exit: *Atomic(bool), last_big_request_timestamp_ms: i64 = 0, - /// memory to re-use across iterations. initialized to empty report: MultiSlotReport, - thread_pool: RequestBatchThreadPool, + metrics: Metrics, pub const RequestBatchThreadPool = HomogeneousThreadPool(struct { requester: *RepairRequester, @@ -63,16 +65,28 @@ pub const RepairService = struct { } }); + const Metrics = struct { + repair_request_count: *Counter, + requests_in_latest_batch: *Gauge(u64), + oldest_slot_needing_repair: *Gauge(u64), + newest_slot_needing_repair: *Gauge(u64), + newest_slot_to_request: *Gauge(u64), + oldest_slot_to_request: *Gauge(u64), + + const prefix = "repair"; + }; + const Self = @This(); pub fn init( allocator: Allocator, logger: Logger, exit: *Atomic(bool), + registry: *Registry(.{}), requester: RepairRequester, peer_provider: RepairPeerProvider, shred_tracker: *BasicShredTracker, - ) Self { + ) !Self { return RepairService{ .allocator = allocator, .requester = requester, @@ -82,6 +96,7 @@ pub const RepairService = struct { .exit = exit, .report = MultiSlotReport.init(allocator), .thread_pool = RequestBatchThreadPool.init(allocator, NUM_REQUESTER_THREADS), + .metrics = try registry.initStruct(Metrics), }; } @@ -122,10 +137,12 @@ pub const RepairService = struct { pub fn sendNecessaryRepairs(self: *Self) !void { const repair_requests = try self.getRepairs(); defer repair_requests.deinit(); + self.metrics.repair_request_count.add(repair_requests.items.len); const addressed_requests = try self.assignRequestsToPeers(repair_requests.items); defer addressed_requests.deinit(); if (addressed_requests.items.len < 4) { + self.metrics.requests_in_latest_batch.set(addressed_requests.items.len); try self.requester.sendRepairRequestBatch(addressed_requests.items); } else { for (0..4) |i| { @@ -153,6 +170,8 @@ pub const RepairService = struct { const MAX_HIGHEST_REPAIRS = 200; fn getRepairs(self: *Self) !ArrayList(RepairRequest) { + var oldest_slot_needing_repair: u64 = 0; + var newest_slot_needing_repair: u64 = 0; var repairs = ArrayList(RepairRequest).init(self.allocator); if (!try self.shred_tracker.identifyMissing(&self.report)) { return repairs; @@ -169,6 +188,8 @@ pub const RepairService = struct { for (self.report.items()) |*report| outer: { slot = report.slot; + oldest_slot_needing_repair = @min(slot, oldest_slot_needing_repair); + newest_slot_needing_repair = @max(slot, newest_slot_needing_repair); for (report.missing_shreds.items) |shred_window| { if (shred_window.end) |end| { for (shred_window.start..end) |i| { @@ -186,11 +207,21 @@ pub const RepairService = struct { } } + var newest_slot_to_request: u64 = newest_slot_needing_repair; + var oldest_slot_to_request: u64 = oldest_slot_needing_repair; if (highest_count < num_highest_repairs) { for (slot..slot + num_highest_repairs - highest_count) |s| { + newest_slot_to_request = @max(slot, newest_slot_to_request); + oldest_slot_to_request = @min(slot, oldest_slot_to_request); try repairs.append(.{ .HighestShred = .{ s, 0 } }); } } + + self.metrics.oldest_slot_needing_repair.set(oldest_slot_needing_repair); + self.metrics.newest_slot_needing_repair.set(newest_slot_needing_repair); + self.metrics.newest_slot_to_request.set(newest_slot_to_request); + self.metrics.oldest_slot_to_request.set(oldest_slot_to_request); + return repairs; } @@ -220,13 +251,22 @@ pub const RepairRequester = struct { rng: Random, keypair: *const KeyPair, sender: SocketThread, 
+ metrics: Metrics, const Self = @This(); + const Metrics = struct { + sent_request_count: *Counter, + pending_requests: *Gauge(u64), + + const prefix = "repair"; + }; + pub fn init( allocator: Allocator, logger: Logger, rng: Random, + registry: *Registry(.{}), keypair: *const KeyPair, udp_send_socket: Socket, exit: *Atomic(bool), @@ -238,6 +278,7 @@ pub const RepairRequester = struct { .rng = rng, .keypair = keypair, .sender = sndr, + .metrics = try registry.initStruct(Metrics), }; } @@ -249,6 +290,8 @@ pub const RepairRequester = struct { self: *const Self, requests: []const AddressedRepairRequest, ) !void { + self.metrics.pending_requests.add(requests.len); + defer self.metrics.pending_requests.set(0); const timestamp = std.time.milliTimestamp(); for (requests) |request| { var packet: Packet = .{ @@ -266,6 +309,8 @@ pub const RepairRequester = struct { ); packet.size = data.len; try self.sender.channel.send(packet); + self.metrics.pending_requests.dec(); + self.metrics.sent_request_count.inc(); } } }; @@ -309,6 +354,16 @@ pub const RepairPeerProvider = struct { cache: LruCacheCustom(.non_locking, Slot, RepairPeers, Allocator, RepairPeers.deinit), my_pubkey: Pubkey, my_shred_version: *const Atomic(u16), + metrics: Metrics, + + pub const Metrics = struct { + latest_count_from_gossip: *Gauge(u64), + cache_hit_count: *Counter, + cache_miss_count: *Counter, + cache_expired_count: *Counter, + + const prefix = "repair_peers"; + }; const Self = @This(); @@ -324,10 +379,11 @@ pub const RepairPeerProvider = struct { pub fn init( allocator: Allocator, rng: Random, + registry: *Registry(.{}), gossip: *RwMux(GossipTable), my_pubkey: Pubkey, my_shred_version: *const Atomic(u16), - ) error{OutOfMemory}!RepairPeerProvider { + ) !RepairPeerProvider { return .{ .allocator = allocator, .gossip_table_rw = gossip, @@ -336,6 +392,7 @@ pub const RepairPeerProvider = struct { .my_pubkey = my_pubkey, .my_shred_version = my_shred_version, .rng = rng, + .metrics = try registry.initStruct(Metrics), }; } @@ -364,11 +421,14 @@ pub const RepairPeerProvider = struct { if (self.cache.get(slot)) |peers| { if (now - peers.insertion_time_secs <= REPAIR_PEERS_CACHE_TTL_SECONDS) { + self.metrics.cache_hit_count.inc(); return peers.peers; } - } + self.metrics.cache_expired_count.inc(); + } else self.metrics.cache_miss_count.inc(); const peers = try self.getRepairPeersFromGossip(self.allocator, slot); + self.metrics.latest_count_from_gossip.set(peers.len); try self.cache.insert(slot, .{ .insertion_time_secs = now, .peers = peers, @@ -422,6 +482,7 @@ pub const RepairPeerProvider = struct { test "RepairService sends repair request to gossip peer" { const allocator = std.testing.allocator; + const registry = sig.prometheus.globalRegistry(); var rand = std.rand.DefaultPrng.init(4328095); var random = rand.random(); const TestLogger = sig.trace.DirectPrintLogger; @@ -465,20 +526,23 @@ test "RepairService sends repair request to gossip peer" { const peers = try RepairPeerProvider.init( allocator, random, + registry, &gossip_mux, Pubkey.fromPublicKey(&keypair.public_key), &my_shred_version, ); - var tracker = BasicShredTracker.init(13579, .noop); - var service = RepairService.init( + var tracker = try BasicShredTracker.init(13579, .noop, registry); + var service = try RepairService.init( allocator, logger, &exit, + registry, try RepairRequester.init( allocator, logger, random, + registry, &keypair, repair_socket, &exit, @@ -536,6 +600,7 @@ test "RepairPeerProvider selects correct peers" { var peers = try RepairPeerProvider.init( 
allocator, random, + sig.prometheus.globalRegistry(), &gossip_mux, Pubkey.fromPublicKey(&keypair.public_key), &my_shred_version, diff --git a/src/shred_collector/service.zig b/src/shred_collector/service.zig index ed0257ab6..522cb97fc 100644 --- a/src/shred_collector/service.zig +++ b/src/shred_collector/service.zig @@ -15,6 +15,7 @@ const Logger = sig.trace.Logger; const Packet = sig.net.Packet; const Pubkey = sig.core.Pubkey; const RwMux = sig.sync.RwMux; +const Registry = sig.prometheus.Registry; const ServiceManager = sig.utils.service_manager.ServiceManager; const Slot = sig.core.Slot; const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider; @@ -39,6 +40,7 @@ pub const ShredCollectorDependencies = struct { allocator: Allocator, logger: Logger, random: Random, + registry: *Registry(.{}), /// This validator's keypair my_keypair: *const KeyPair, /// Shared exit indicator, used to shutdown the Shred Collector. @@ -90,7 +92,7 @@ pub fn start( .turbine_socket = turbine_socket, .unverified_shred_sender = unverified_shred_channel, .shred_version = deps.my_shred_version, - .metrics = try ShredReceiverMetrics.init(), + .metrics = try deps.registry.initStruct(ShredReceiverMetrics), .root_slot = if (conf.start_slot) |s| s - 1 else 0, }; try service_manager.spawn( @@ -106,6 +108,7 @@ pub fn start( shred_collector.shred_verifier.runShredVerifier, .{ deps.exit, + deps.registry, unverified_shred_channel, verified_shred_channel, deps.leader_schedule, @@ -115,9 +118,10 @@ pub fn start( // tracker (shared state, internal to Shred Collector) const shred_tracker = try arena.create(BasicShredTracker); - shred_tracker.* = BasicShredTracker.init( + shred_tracker.* = try BasicShredTracker.init( conf.start_slot, deps.logger, + deps.registry, ); // processor (thread) @@ -128,6 +132,7 @@ pub fn start( deps.allocator, deps.exit, deps.logger, + deps.registry, verified_shred_channel, shred_tracker, deps.shred_inserter, @@ -140,6 +145,7 @@ pub fn start( const repair_peer_provider = try RepairPeerProvider.init( deps.allocator, deps.random, + deps.registry, deps.gossip_table_rw, Pubkey.fromPublicKey(&deps.my_keypair.public_key), deps.my_shred_version, @@ -148,16 +154,18 @@ pub fn start( deps.allocator, deps.logger, deps.random, + deps.registry, deps.my_keypair, repair_socket, deps.exit, ); const repair_svc = try arena.create(RepairService); try service_manager.defers.deferCall(RepairService.deinit, .{repair_svc}); - repair_svc.* = RepairService.init( + repair_svc.* = try RepairService.init( deps.allocator, deps.logger, deps.exit, + deps.registry, repair_requester, repair_peer_provider, shred_tracker, diff --git a/src/shred_collector/shred_processor.zig b/src/shred_collector/shred_processor.zig index 0be342185..38309298a 100644 --- a/src/shred_collector/shred_processor.zig +++ b/src/shred_collector/shred_processor.zig @@ -10,10 +10,15 @@ const Atomic = std.atomic.Value; const BasicShredTracker = shred_collector.shred_tracker.BasicShredTracker; const Channel = sig.sync.Channel; +const Counter = sig.prometheus.Counter; +const Histogram = sig.prometheus.Histogram; const Logger = sig.trace.Logger; const Packet = sig.net.Packet; +const Registry = sig.prometheus.Registry; const Shred = sig.ledger.shred.Shred; const ShredInserter = sig.ledger.ShredInserter; +const SlotOutOfBounds = shred_collector.shred_tracker.SlotOutOfBounds; +const VariantCounter = sig.prometheus.VariantCounter; // TODO: add metrics (e.g. 
total count of shreds processed) @@ -22,6 +27,7 @@ pub fn runShredProcessor( allocator: Allocator, exit: *Atomic(bool), logger: Logger, + registry: *Registry(.{}), // shred verifier --> me verified_shred_receiver: *Channel(Packet), tracker: *BasicShredTracker, @@ -32,6 +38,7 @@ pub fn runShredProcessor( var shreds: ArrayListUnmanaged(Shred) = .{}; var is_repaired: ArrayListUnmanaged(bool) = .{}; var error_context: ErrorContext = .{}; + const metrics = try registry.initStruct(Metrics); while (!exit.load(.acquire) or verified_shred_receiver.len() != 0) @@ -42,6 +49,7 @@ pub fn runShredProcessor( processShred( allocator, tracker, + metrics, &packet, &shreds, &is_repaired, @@ -54,6 +62,8 @@ pub fn runShredProcessor( error_context = .{}; }; } + metrics.insertion_batch_size.observe(shreds.items.len); + metrics.passed_to_inserter_count.add(shreds.items.len); _ = try shred_inserter.insertShreds( shreds.items, is_repaired.items, @@ -69,6 +79,7 @@ const ErrorContext = struct { slot: ?u64 = null, index: ?u32 = null }; fn processShred( allocator: Allocator, tracker: *BasicShredTracker, + metrics: Metrics, packet: *const Packet, shreds: *ArrayListUnmanaged(Shred), is_repaired: *ArrayListUnmanaged(bool), @@ -81,7 +92,10 @@ fn processShred( errdefer error_context.index = index; tracker.registerShred(slot, index) catch |err| switch (err) { - error.SlotUnderflow, error.SlotOverflow => return, + error.SlotUnderflow, error.SlotOverflow => { + metrics.register_shred_error.observe(err); + return; + }, }; var shred = try shreds.addOne(allocator); @@ -94,14 +108,32 @@ fn processShred( if (shred.* == .data) { const parent = try shred.data.parent(); if (parent + 1 != slot) { + metrics.skipped_slot_count.add(slot - parent); tracker.skipSlots(parent, slot) catch |err| switch (err) { - error.SlotUnderflow, error.SlotOverflow => {}, + error.SlotUnderflow, error.SlotOverflow => { + metrics.skip_slots_error.observe(err); + }, }; } } if (shred.isLastInSlot()) { tracker.setLastShred(slot, index) catch |err| switch (err) { - error.SlotUnderflow, error.SlotOverflow => return, + error.SlotUnderflow, error.SlotOverflow => { + metrics.set_last_shred_error.observe(err); + return; + }, }; } } + +const Metrics = struct { + passed_to_inserter_count: *Counter, + skipped_slot_count: *Counter, + insertion_batch_size: *Histogram, + register_shred_error: *VariantCounter(SlotOutOfBounds), + skip_slots_error: *VariantCounter(SlotOutOfBounds), + set_last_shred_error: *VariantCounter(SlotOutOfBounds), + + pub const prefix = "shred_processor"; + pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); +}; diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index d08eec469..3c4708683 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -12,6 +12,8 @@ const KeyPair = std.crypto.sign.Ed25519.KeyPair; const Socket = network.Socket; const Channel = sig.sync.Channel; +const Counter = sig.prometheus.Counter; +const Histogram = sig.prometheus.Histogram; const Logger = sig.trace.Logger; const Packet = sig.net.Packet; const Ping = sig.gossip.Ping; @@ -19,6 +21,7 @@ const Pong = sig.gossip.Pong; const RepairMessage = shred_collector.repair_message.RepairMessage; const Slot = sig.core.Slot; const SocketThread = sig.net.SocketThread; +const VariantCounter = sig.prometheus.VariantCounter; const NUM_TVU_RECEIVERS = 2; @@ -91,9 +94,16 @@ pub const ShredReceiver = struct { ) !void { while (!self.exit.load(.acquire)) { for (receivers) |receiver| { + var 
count: usize = 0; while (receiver.receive()) |packet| { + count += 1; try self.handlePacket(packet, response_sender, is_repair); } + if (is_repair) { + self.metrics.repair_batch_size.observe(count); + } else { + self.metrics.turbine_batch_size.observe(count); + } } } } @@ -106,12 +116,16 @@ pub const ShredReceiver = struct { comptime is_repair: bool, ) !void { if (packet.size == REPAIR_RESPONSE_SERIALIZED_PING_BYTES) { - if (try self.handlePing(&packet)) |p| try response_sender.send(p); + if (try self.handlePing(&packet)) |p| { + try response_sender.send(p); + self.metrics.pong_sent_count.inc(); + } } else { const max_slot = std.math.maxInt(Slot); // TODO agave uses BankForks for this - if (shouldDiscardShred(&packet, self.root_slot, self.shred_version, max_slot)) { + validateShred(&packet, self.root_slot, self.shred_version, max_slot) catch |err| { + self.metrics.discard.observe(err); return; - } + }; var our_packet = packet; if (is_repair) our_packet.flags.set(.repair); try self.unverified_shred_sender.send(our_packet); @@ -121,14 +135,15 @@ pub const ShredReceiver = struct { /// Handle a ping message and returns the repair message. fn handlePing(self: *const Self, packet: *const Packet) !?Packet { const repair_ping = bincode.readFromSlice(self.allocator, RepairPing, &packet.data, .{}) catch { - self.metrics.invalid_repair_pings.inc(); + self.metrics.ping_deserialize_fail_count.inc(); return null; }; const ping = repair_ping.Ping; ping.verify() catch { - self.metrics.invalid_repair_pings.inc(); + self.metrics.ping_verify_fail_count.inc(); return null; }; + self.metrics.valid_ping_count.inc(); const reply: RepairMessage = .{ .Pong = try Pong.init(&ping, self.keypair) }; var reply_packet = Packet.default(); @@ -139,30 +154,36 @@ pub const ShredReceiver = struct { } }; -fn shouldDiscardShred( +fn validateShred( packet: *const Packet, root: Slot, shred_version: *const Atomic(u16), max_slot: Slot, -) bool { - const shred = layout.getShred(packet) orelse return true; - const version = layout.getVersion(shred) orelse return true; - const slot = layout.getSlot(shred) orelse return true; - const index = layout.getIndex(shred) orelse return true; - const variant = layout.getShredVariant(shred) orelse return true; - - if (version != shred_version.load(.acquire)) return true; - if (slot > max_slot) return true; +) ShredValidationError!void { + const shred = layout.getShred(packet) orelse return error.insufficient_shred_size; + const version = layout.getVersion(shred) orelse return error.missing_version; + const slot = layout.getSlot(shred) orelse return error.slot_missing; + const index = layout.getIndex(shred) orelse return error.index_missing; + const variant = layout.getShredVariant(shred) orelse return error.variant_missing; + + if (version != shred_version.load(.acquire)) return error.wrong_version; + if (slot > max_slot) return error.slot_too_new; switch (variant.shred_type) { .code => { - if (index >= sig.ledger.shred.code_shred_constants.max_per_slot) return true; - if (slot <= root) return true; + if (index >= sig.ledger.shred.code_shred_constants.max_per_slot) { + return error.code_index_too_high; + } + if (slot <= root) return error.rooted_slot; }, .data => { - if (index >= sig.ledger.shred.data_shred_constants.max_per_slot) return true; - const parent_offset = layout.getParentOffset(shred) orelse return true; + if (index >= sig.ledger.shred.data_shred_constants.max_per_slot) { + return error.data_index_too_high; + } + const parent_offset = layout.getParentOffset(shred) orelse { + return 
error.parent_offset_missing; + }; const parent = slot -| @as(Slot, @intCast(parent_offset)); - if (!verifyShredSlots(slot, parent, root)) return true; + if (!verifyShredSlots(slot, parent, root)) return error.slot_verification_failed; }, } @@ -171,10 +192,8 @@ fn shouldDiscardShred( // https://github.com/solana-labs/solana/pull/34916 // https://github.com/solana-labs/solana/pull/35076 - _ = layout.getSignature(shred) orelse return true; - _ = layout.getSignedData(shred) orelse return true; - - return false; + _ = layout.getSignature(shred) orelse return error.signature_missing; + _ = layout.getSignedData(shred) orelse return error.signed_data_missing; } /// TODO: this may need to move to blockstore @@ -192,10 +211,34 @@ const REPAIR_RESPONSE_SERIALIZED_PING_BYTES = 132; const RepairPing = union(enum) { Ping: Ping }; pub const ShredReceiverMetrics = struct { - invalid_repair_pings: *sig.prometheus.Counter, + received_count: *Counter, + satisfactory_shred_count: *Counter, + valid_ping_count: *Counter, + ping_deserialize_fail_count: *Counter, + ping_verify_fail_count: *Counter, + pong_sent_count: *Counter, + repair_batch_size: *Histogram, + turbine_batch_size: *Histogram, + discard: *VariantCounter(ShredValidationError), + + pub const prefix = "shred_receiver"; + pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); +}; - pub fn init() !ShredReceiverMetrics { - const registry = sig.prometheus.globalRegistry(); - return .{ .invalid_repair_pings = try registry.getOrCreateCounter("invalid_repair_pings") }; - } +/// Something about the shred was unexpected, so we will discard it. +pub const ShredValidationError = error{ + insufficient_shred_size, + missing_version, + slot_missing, + index_missing, + variant_missing, + wrong_version, + slot_too_new, + code_index_too_high, + rooted_slot, + data_index_too_high, + parent_offset_missing, + slot_verification_failed, + signature_missing, + signed_data_missing, }; diff --git a/src/shred_collector/shred_tracker.zig b/src/shred_collector/shred_tracker.zig index b0438d765..3dc5874f6 100644 --- a/src/shred_collector/shred_tracker.zig +++ b/src/shred_collector/shred_tracker.zig @@ -6,6 +6,8 @@ const Allocator = std.mem.Allocator; const ArrayList = std.ArrayList; const Mutex = std.Thread.Mutex; +const Gauge = sig.prometheus.Gauge; +const Registry = sig.prometheus.Registry; const Slot = sig.core.Slot; const MAX_SHREDS_PER_SLOT: usize = sig.ledger.shred.MAX_SHREDS_PER_SLOT; @@ -33,16 +35,25 @@ pub const BasicShredTracker = struct { max_slot_seen: Slot = 0, /// ring buffer slots: [num_slots]MonitoredSlot = .{.{}} ** num_slots, + metrics: Metrics, const num_slots: usize = 1024; + const Metrics = struct { + finished_slots_through: *Gauge(u64), + max_slot_processed: *Gauge(u64), + + pub const prefix = "shred_tracker"; + }; + const Self = @This(); - pub fn init(slot: ?Slot, logger: sig.trace.Logger) Self { + pub fn init(slot: ?Slot, logger: sig.trace.Logger, registry: *Registry(.{})) !Self { return .{ .start_slot = slot, .current_bottom_slot = slot orelse 0, .logger = logger, + .metrics = try registry.initStruct(Metrics), }; } @@ -66,7 +77,10 @@ pub const BasicShredTracker = struct { if (!monitored_slot.is_complete) { monitored_slot.is_complete = true; self.logger.info().logf("skipping slot: {}", .{slot}); - self.max_slot_processed = @max(self.max_slot_processed, slot); + if (slot > self.max_slot_processed) { + self.max_slot_processed = slot; + self.metrics.max_slot_processed.set(slot); + } } } } @@ -82,7 +96,10 @@ pub const BasicShredTracker = 
struct { const monitored_slot = try self.observeSlot(slot); const new = monitored_slot.record(shred_index); if (new) self.logger.debug().logf("new slot: {}", .{slot}); - self.max_slot_processed = @max(self.max_slot_processed, slot); + if (slot > self.max_slot_processed) { + self.max_slot_processed = slot; + self.metrics.max_slot_processed.set(slot); + } } pub fn setLastShred(self: *Self, slot: Slot, index: usize) SlotOutOfBounds!void { @@ -127,6 +144,7 @@ pub const BasicShredTracker = struct { self.logger.debug().logf("shred tracker: received all shreds up to slot {}", .{slot}); } self.current_bottom_slot = @max(self.current_bottom_slot, slot + 1); + self.metrics.finished_slots_through.set(slot); monitored_slot.* = .{}; } } @@ -239,7 +257,7 @@ test "trivial happy path" { var msr = MultiSlotReport.init(allocator); defer msr.deinit(); - var tracker = BasicShredTracker.init(13579, .noop); + var tracker = try BasicShredTracker.init(13579, .noop, sig.prometheus.globalRegistry()); _ = try tracker.identifyMissing(&msr); @@ -257,7 +275,7 @@ test "1 registered shred is identified" { var msr = MultiSlotReport.init(allocator); defer msr.deinit(); - var tracker = BasicShredTracker.init(13579, .noop); + var tracker = try BasicShredTracker.init(13579, .noop, sig.prometheus.globalRegistry()); try tracker.registerShred(13579, 123); std.time.sleep(210 * std.time.ns_per_ms); diff --git a/src/shred_collector/shred_verifier.zig b/src/shred_collector/shred_verifier.zig index 0fbeeb00d..80e8bb5e4 100644 --- a/src/shred_collector/shred_verifier.zig +++ b/src/shred_collector/shred_verifier.zig @@ -7,38 +7,71 @@ const shred_layout = sig.ledger.shred.layout; const Atomic = std.atomic.Value; const Channel = sig.sync.Channel; -const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider; +const Counter = sig.prometheus.Counter; +const Histogram = sig.prometheus.Histogram; const Packet = sig.net.Packet; +const Registry = sig.prometheus.Registry; +const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider; +const VariantCounter = sig.prometheus.VariantCounter; /// Analogous to [run_shred_sigverify](https://github.com/anza-xyz/agave/blob/8c5a33a81a0504fd25d0465bed35d153ff84819f/turbine/src/sigverify_shreds.rs#L82) pub fn runShredVerifier( exit: *Atomic(bool), + registry: *Registry(.{}), /// shred receiver --> me unverified_shred_receiver: *Channel(Packet), /// me --> shred processor verified_shred_sender: *Channel(Packet), leader_schedule: SlotLeaderProvider, ) !void { + const metrics = try registry.initStruct(Metrics); while (!exit.load(.acquire) or unverified_shred_receiver.len() != 0) { + var count: usize = 0; while (unverified_shred_receiver.receive()) |packet| { - // TODO parallelize this once it's actually verifying signatures - if (verifyShred(&packet, leader_schedule)) { + count += 1; + metrics.received_count.inc(); + if (verifyShred(&packet, leader_schedule)) |_| { + metrics.verified_count.inc(); try verified_shred_sender.send(packet); + } else |err| { + metrics.fail.observe(err); } } + metrics.batch_size.observe(count); } } -/// verify_shred_cpu -fn verifyShred(packet: *const Packet, leader_schedule: SlotLeaderProvider) bool { - const shred = shred_layout.getShred(packet) orelse return false; - const slot = shred_layout.getSlot(shred) orelse return false; - const signature = shred_layout.getSignature(shred) orelse return false; - const signed_data = shred_layout.getSignedData(shred) orelse return false; - - const leader = leader_schedule.call(slot) orelse return false; +/// Analogous to 
[verify_shred_cpu](https://github.com/anza-xyz/agave/blob/83e7d84bcc4cf438905d07279bc07e012a49afd9/ledger/src/sigverify_shreds.rs#L35) +fn verifyShred( + packet: *const Packet, + leader_schedule: SlotLeaderProvider, +) ShredVerificationFailure!void { + const shred = shred_layout.getShred(packet) orelse return error.insufficient_shred_size; + const slot = shred_layout.getSlot(shred) orelse return error.slot_missing; + const signature = shred_layout.getSignature(shred) orelse return error.signature_missing; + const signed_data = shred_layout.getSignedData(shred) orelse return error.signed_data_missing; + const leader = leader_schedule.call(slot) orelse return error.leader_unknown; - return signature.verify(leader, &signed_data.data); + _ = signature.verify(leader, &signed_data.data) or return error.failed_verification; } + +pub const ShredVerificationFailure = error{ + insufficient_shred_size, + slot_missing, + signature_missing, + signed_data_missing, + leader_unknown, + failed_verification, +}; + +const Metrics = struct { + received_count: *Counter, + verified_count: *Counter, + batch_size: *Histogram, + fail: *VariantCounter(ShredVerificationFailure), + + pub const prefix = "shred_verifier"; + pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); +}; diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index bd0dab7c1..b5a64ec7f 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -349,19 +349,7 @@ pub const Stats = struct { rpc_signature_statuses_latency_millis: *Gauge(u64), pub fn init() GetMetricError!Stats { - var self: Stats = undefined; - const registry = globalRegistry(); - const stats_struct_info = @typeInfo(Stats).Struct; - inline for (stats_struct_info.fields) |field| { - if (field.name[0] != '_') { - @field(self, field.name) = switch (field.type) { - *Counter => try registry.getOrCreateCounter(field.name), - *Gauge(u64) => try registry.getOrCreateGauge(field.name, u64), - else => @compileError("Unhandled field type: " ++ field.name ++ ": " ++ @typeName(field.type)), - }; - } - } - return self; + return globalRegistry().initStruct(Stats); } pub fn log(self: *const Stats, logger: Logger) void { From 1fbf474700eb3a4462acbc4b2d3d88dce534dc0f Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Mon, 7 Oct 2024 09:33:49 -0400 Subject: [PATCH 02/14] refactor: remove unused imports --- src/ledger/reader.zig | 1 - src/ledger/writer.zig | 2 -- src/prometheus/variant_counter.zig | 2 -- 3 files changed, 5 deletions(-) diff --git a/src/ledger/reader.zig b/src/ledger/reader.zig index f0d5ca6d8..7e1b9662c 100644 --- a/src/ledger/reader.zig +++ b/src/ledger/reader.zig @@ -10,7 +10,6 @@ const AutoHashMap = std.AutoHashMap; // sig common const Counter = sig.prometheus.Counter; const Entry = sig.core.Entry; -const GetMetricError = sig.prometheus.GetMetricError; const Hash = sig.core.Hash; const Histogram = sig.prometheus.Histogram; const Logger = sig.trace.Logger; diff --git a/src/ledger/writer.zig b/src/ledger/writer.zig index 1990604fe..658c580d1 100644 --- a/src/ledger/writer.zig +++ b/src/ledger/writer.zig @@ -7,7 +7,6 @@ const Allocator = std.mem.Allocator; const ArrayList = std.ArrayList; // sig common -const GetMetricError = sig.prometheus.GetMetricError; const Hash = sig.core.Hash; const Histogram = sig.prometheus.Histogram; const Logger = sig.trace.Logger; @@ -15,7 +14,6 @@ const Pubkey = sig.core.Pubkey; const RwMux = sig.sync.RwMux; const Signature = sig.core.Signature; const Slot = sig.core.Slot; 
-const Registry = sig.prometheus.Registry; const Timer = sig.time.Timer; // ledger diff --git a/src/prometheus/variant_counter.zig b/src/prometheus/variant_counter.zig index f16b7f075..77d15d8cc 100644 --- a/src/prometheus/variant_counter.zig +++ b/src/prometheus/variant_counter.zig @@ -3,10 +3,8 @@ const prometheus = @import("lib.zig"); const Atomic = std.atomic.Value; -const Counter = prometheus.Counter; const Metric = prometheus.metric.Metric; const MetricType = prometheus.metric.MetricType; -const Registry = prometheus.Registry; /// Separately count the occurrence of each variant within an enum or error set. /// From 22c2feb4839758c834698f298f8b7b0d4a506f35 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Mon, 7 Oct 2024 10:37:23 -0400 Subject: [PATCH 03/14] feat(prometheus): use labels for variants in variant counter, instead of separate metrics --- src/prometheus/metric.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prometheus/metric.zig b/src/prometheus/metric.zig index 01f74590d..e7a2ad8f8 100644 --- a/src/prometheus/metric.zig +++ b/src/prometheus/metric.zig @@ -46,7 +46,7 @@ pub const Metric = struct { .variant_counter => |counts| { for (counts.counts, counts.names) |counter, label| { try writer.print( - "{s}_{s}_count {d}\n", + "{s}_count{{variant=\"{s}\"}} {d}\n", .{ name, label, counter.load(.monotonic) }, ); } From 9ce0baeb60dd60fcce68e874148a5eabea1669d8 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Tue, 8 Oct 2024 22:02:09 -0400 Subject: [PATCH 04/14] refactor(prometheus): remove unused VariantIndex.get --- src/prometheus/variant_counter.zig | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/prometheus/variant_counter.zig b/src/prometheus/variant_counter.zig index 77d15d8cc..c0e768641 100644 --- a/src/prometheus/variant_counter.zig +++ b/src/prometheus/variant_counter.zig @@ -105,10 +105,6 @@ const VariantIndexer = struct { return self.int_to_index[toInt(err)]; } - pub fn get(self: Self, index_: usize) self.EnumOrError { - return self.fromInt(self.index_to_int[index_]); - } - pub fn names(self: Self) [self.len][]const u8 { var name_array: [self.len][]const u8 = undefined; for (0..self.len) |i| { From b373a1d9f44f8758d904170ae4ce6d70f8765ba3 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Tue, 8 Oct 2024 22:06:21 -0400 Subject: [PATCH 05/14] refactor(prometheus): use standard import style --- src/prometheus/registry.zig | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/prometheus/registry.zig b/src/prometheus/registry.zig index dd718b53a..eadcf5294 100644 --- a/src/prometheus/registry.zig +++ b/src/prometheus/registry.zig @@ -1,23 +1,25 @@ const std = @import("std"); const sig = @import("../sig.zig"); +const prometheus = @import("lib.zig"); + const fmt = std.fmt; const hash_map = std.hash_map; const heap = std.heap; const mem = std.mem; const testing = std.testing; -const OnceCell = @import("../sync/once_cell.zig").OnceCell; +const OnceCell = sig.sync.OnceCell; +const ReturnType = sig.utils.types.ReturnType; -const Metric = @import("metric.zig").Metric; -const Counter = @import("counter.zig").Counter; -const VariantCounter = @import("variant_counter.zig").VariantCounter; -const Gauge = @import("gauge.zig").Gauge; -const GaugeFn = @import("gauge_fn.zig").GaugeFn; -const GaugeCallFnType = @import("gauge_fn.zig").GaugeCallFnType; -const Histogram = @import("histogram.zig").Histogram; -const DEFAULT_BUCKETS = @import("histogram.zig").DEFAULT_BUCKETS; +const Metric = prometheus.metric.Metric; +const 
Counter = prometheus.counter.Counter; +const VariantCounter = prometheus.variant_counter.VariantCounter; +const Gauge = prometheus.gauge.Gauge; +const GaugeFn = prometheus.gauge_fn.GaugeFn; +const GaugeCallFnType = prometheus.gauge_fn.GaugeCallFnType; +const Histogram = prometheus.histogram.Histogram; -const ReturnType = sig.utils.types.ReturnType; +const DEFAULT_BUCKETS = prometheus.histogram.DEFAULT_BUCKETS; pub const GetMetricError = error{ /// Returned when trying to add a metric to an already full registry. From f5c2e3ce1717cb55b7e69671b6fa6816297bdf7d Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Tue, 8 Oct 2024 22:10:04 -0400 Subject: [PATCH 06/14] refactor(geyser): use metrics prefix for reader and writer --- src/geyser/core.zig | 44 ++++++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/src/geyser/core.zig b/src/geyser/core.zig index c67cd09bd..440873a6a 100644 --- a/src/geyser/core.zig +++ b/src/geyser/core.zig @@ -71,10 +71,12 @@ pub const AccountPayloadV1 = struct { }; pub const GeyserWriterStats = struct { - geyser_writer_recycle_fba_empty_loop_count: *Counter, - geyser_writer_pipe_full_count: *Counter, - geyser_writer_n_payloads: *Counter, - geyser_writer_total_bytes: *Counter, + recycle_fba_empty_loop_count: *Counter, + pipe_full_count: *Counter, + n_payloads: *Counter, + total_bytes: *Counter, + + pub const prefix = "geyser_writer"; pub fn init() !GeyserWriterStats { return try globalRegistry().initStruct(GeyserWriterStats); @@ -157,7 +159,7 @@ pub const GeyserWriter = struct { return err; } }; - self.stats.geyser_writer_n_payloads.inc(); + self.stats.n_payloads.inc(); self.io_allocator.free(payload); } } @@ -190,7 +192,7 @@ pub const GeyserWriter = struct { const buf = blk: while (true) { const buf = self.io_allocator.alloc(u8, total_len) catch { // no memory available rn - unlock and wait - self.stats.geyser_writer_recycle_fba_empty_loop_count.inc(); + self.stats.recycle_fba_empty_loop_count.inc(); std.time.sleep(std.time.ns_per_ms); if (self.exit.load(.acquire)) { return error.MemoryBlockedWithExitSignaled; @@ -224,7 +226,7 @@ pub const GeyserWriter = struct { return WritePipeError.PipeBlockedWithExitSignaled; } else { // pipe is full but we dont need to exit, so we try again - self.stats.geyser_writer_pipe_full_count.inc(); + self.stats.pipe_full_count.inc(); continue; } } else { @@ -236,7 +238,7 @@ pub const GeyserWriter = struct { return WritePipeError.PipeClosed; } n_bytes_written_total += n_bytes_written; - self.stats.geyser_writer_total_bytes.add(n_bytes_written); + self.stats.total_bytes.add(n_bytes_written); } std.debug.assert(n_bytes_written_total == buf.len); @@ -245,14 +247,16 @@ pub const GeyserWriter = struct { }; pub const GeyserReaderStats = struct { - geyser_reader_io_buf_size: *GaugeU64, - geyser_reader_bincode_buf_size: *GaugeU64, - geyser_reader_pipe_empty_count: *Counter, - geyser_reader_total_payloads: *Counter, - geyser_reader_total_bytes: *Counter, + io_buf_size: *GaugeU64, + bincode_buf_size: *GaugeU64, + pipe_empty_count: *Counter, + total_payloads: *Counter, + total_bytes: *Counter, const GaugeU64 = Gauge(u64); + pub const prefix = "geyser_reader"; + pub fn init() !GeyserReaderStats { return try globalRegistry().initStruct(GeyserReaderStats); } @@ -297,8 +301,8 @@ pub const GeyserReader = struct { const fba = std.heap.FixedBufferAllocator.init(bincode_buf); const stats = try GeyserReaderStats.init(); - stats.geyser_reader_io_buf_size.set(allocator_config.io_buf_len); - 
stats.geyser_reader_bincode_buf_size.set(allocator_config.bincode_buf_len); + stats.io_buf_size.set(allocator_config.io_buf_len); + stats.bincode_buf_size.set(allocator_config.bincode_buf_len); return .{ .file = file, @@ -330,7 +334,7 @@ pub const GeyserReader = struct { pub fn readPayload(self: *Self) !struct { u64, VersionedAccountPayload } { const len = try self.readType(u64, 8); const versioned_payload = try self.readType(VersionedAccountPayload, len); - self.stats.geyser_reader_total_payloads.inc(); + self.stats.total_payloads.inc(); return .{ 8 + len, versioned_payload }; } @@ -342,7 +346,7 @@ pub const GeyserReader = struct { const new_buf = try self.allocator.alloc(u8, expected_n_bytes); self.allocator.free(self.io_buf); self.io_buf = new_buf; - self.stats.geyser_reader_io_buf_size.set(expected_n_bytes); + self.stats.io_buf_size.set(expected_n_bytes); } var total_bytes_read: u64 = 0; @@ -353,7 +357,7 @@ pub const GeyserReader = struct { return error.PipeBlockedWithExitSignaled; } else { // pipe is empty but we dont need to exit, so we try again - self.stats.geyser_reader_pipe_empty_count.inc(); + self.stats.pipe_empty_count.inc(); continue; } } else { @@ -366,7 +370,7 @@ pub const GeyserReader = struct { } total_bytes_read += n_bytes_read; } - self.stats.geyser_reader_total_bytes.add(expected_n_bytes); + self.stats.total_bytes.add(expected_n_bytes); while (true) { const data = bincode.readFromSlice( @@ -383,7 +387,7 @@ pub const GeyserReader = struct { self.bincode_buf = new_buf; self.bincode_allocator = std.heap.FixedBufferAllocator.init(self.bincode_buf); - self.stats.geyser_reader_bincode_buf_size.set(new_size); + self.stats.bincode_buf_size.set(new_size); continue; } else { return err; From a0d785d8f28fd092c0e56e1d187f9c22c4eaeb28 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 9 Oct 2024 15:04:15 -0400 Subject: [PATCH 07/14] docs(prometheus): fix typos/inaccuracies in VariantIndexer docs --- src/prometheus/variant_counter.zig | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/prometheus/variant_counter.zig b/src/prometheus/variant_counter.zig index c0e768641..01883484a 100644 --- a/src/prometheus/variant_counter.zig +++ b/src/prometheus/variant_counter.zig @@ -47,11 +47,10 @@ pub const VariantCounts = struct { /// it will assign the numbers 0-9 to each of the variants. /// /// Enums usually already have numbers with this property, but it's -/// not guaranteed. This is particularly useful for variants, whose +/// not guaranteed. This is particularly useful for errors, whose /// integers are likely not continuous. /// -/// You can provide an variant to determine its index, or provide an -/// index to get the respective variant. +/// Call `index` to determine the index for a variant. /// /// This struct must be initialized at comptime, but it can be used /// at any time. 
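For readers following along, the metrics pattern these patches converge on (a plain struct of metric pointers with a `prefix` declaration, initialized through the registry's initStruct, and a VariantCounter bumped via observe on each error) can be sketched roughly as below. This is an illustrative sketch only: ExampleMetrics, ExampleError, validate, and exampleUsage are hypothetical names, while Counter, VariantCounter, inc, observe, globalRegistry, and initStruct are assumed to behave as shown in the surrounding diffs.

const sig = @import("../sig.zig");

const Counter = sig.prometheus.Counter;
const VariantCounter = sig.prometheus.VariantCounter;

// Hypothetical error set, used only for illustration.
const ExampleError = error{ too_small, too_large };

// Hypothetical metrics struct following the pattern above.
const ExampleMetrics = struct {
    processed_count: *Counter,
    failure: *VariantCounter(ExampleError),

    // The registry is expected to prepend "example_" to each field name.
    pub const prefix = "example";
};

// Hypothetical fallible operation whose failures we want to count per variant.
fn validate(value: u64) ExampleError!void {
    if (value < 10) return error.too_small;
    if (value > 100) return error.too_large;
}

fn exampleUsage(value: u64) !void {
    // One metric is registered per field of the struct.
    const metrics = try sig.prometheus.globalRegistry().initStruct(ExampleMetrics);
    metrics.processed_count.inc();
    validate(value) catch |err| metrics.failure.observe(err);
}

Under this sketch the exported names would be expected to look like example_processed_count, and, with the label format from patch 03, something like example_failure_count{variant="too_small"} for the variant counts.
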
From 38c9d9c6c17beea7b56a12925f7552845c5a41d1 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 9 Oct 2024 23:33:55 -0400 Subject: [PATCH 08/14] feat(prometheus): more descriptive histogram bucket configuration and error messages --- src/accountsdb/db.zig | 2 +- src/gossip/service.zig | 2 +- src/ledger/reader.zig | 2 +- src/ledger/writer.zig | 2 +- src/prometheus/registry.zig | 55 +++++++++++++++++++++---- src/shred_collector/shred_processor.zig | 2 +- src/shred_collector/shred_receiver.zig | 2 +- src/shred_collector/shred_verifier.zig | 2 +- 8 files changed, 55 insertions(+), 14 deletions(-) diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index c9e5e04a5..cc9c4b869 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -88,7 +88,7 @@ pub const AccountsDBStats = struct { return globalRegistry().initStruct(Self); } - pub fn buckets(comptime field_name: []const u8) []const f64 { + pub fn histogramBucketsForField(comptime field_name: []const u8) []const f64 { const HistogramKind = enum { flush_account_file_size, shrink_file_shrunk_by, diff --git a/src/gossip/service.zig b/src/gossip/service.zig index b8296f59f..8465a3db3 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -2032,7 +2032,7 @@ pub const GossipStats = struct { const Self = @This(); - pub const buckets: [10]f64 = .{ + pub const histogram_buckets: [10]f64 = .{ 10, 25, 50, 100, 250, 500, diff --git a/src/ledger/reader.zig b/src/ledger/reader.zig index 7e1b9662c..01ffab8e9 100644 --- a/src/ledger/reader.zig +++ b/src/ledger/reader.zig @@ -1454,7 +1454,7 @@ const BlockstoreReaderMetrics = struct { get_until_slot_us: *Histogram, pub const prefix = "blockstore_reader"; - pub const buckets = sig.prometheus.histogram.exponentialBuckets(5, -1, 10); + pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(5, -1, 10); }; const BlockstoreRpcApiMetrics = struct { diff --git a/src/ledger/writer.zig b/src/ledger/writer.zig index 658c580d1..a42466a1d 100644 --- a/src/ledger/writer.zig +++ b/src/ledger/writer.zig @@ -309,7 +309,7 @@ pub const ScanAndFixRootsMetrics = struct { fix_roots_us: *Histogram, pub const prefix = "scan_and_fix_roots"; - pub const buckets = sig.prometheus.histogram.exponentialBuckets(5, -1, 10); + pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(5, -1, 10); }; const TestDB = sig.ledger.tests.TestDB("writer"); diff --git a/src/prometheus/registry.zig b/src/prometheus/registry.zig index eadcf5294..1369e6b49 100644 --- a/src/prometheus/registry.zig +++ b/src/prometheus/registry.zig @@ -133,13 +133,54 @@ pub fn Registry(comptime options: RegistryOptions) type { .variant_counter => try self.getOrCreateVariantCounter(name, MetricType.Type), .gauge => try self.getOrCreateGauge(name, MetricType.Data), .gauge_fn => @compileError("GaugeFn does not support auto-init."), - .histogram => try self.getOrCreateHistogram( - name, - if (@typeInfo(@TypeOf(Config.buckets)) == .Fn) - Config.buckets(name) - else - &Config.buckets, - ), + .histogram => try self + .getOrCreateHistogram(name, histogramBuckets(Config, local_name)), + }; + } + + fn histogramBuckets( + comptime Config: type, + comptime local_histogram_name: []const u8, + ) []const f64 { + const has_fn = @hasDecl(Config, "histogramBucketsForField"); + const has_const = @hasDecl(Config, "histogram_buckets"); + if (has_const and has_fn) { + @compileError(@typeName(Config) ++ " has both histogramBucketsForField and" ++ + " histogram_buckets, but it should only have one."); + } else if (has_const) { + 
comptime if (!isSlicable(@TypeOf(Config.histogram_buckets), f64)) { + @compileError(@typeName(Config) ++ + ".histogram_buckets should be a slice or array of f64"); + }; + return Config.histogram_buckets[0..]; + } else if (has_fn) { + const info = @typeInfo(@TypeOf(Config.histogramBucketsForField)); + comptime if (info != .Fn or + info.Fn.params.len != 1 or + info.Fn.params[0].type != []const u8 or + !isSlicable(info.Fn.return_type.?, f64)) + { + @compileError(@typeName(Config) ++ + ".histogramBucketsForField should take one param `[]const u8` and " ++ + "return either a slice or array of f64"); + }; + return Config.histogramBucketsForField(local_histogram_name)[0..]; + } else { + @compileError(@typeName(Config) ++ " must provide the histogram buckets for " ++ + local_histogram_name ++ ", either with a const `histogram_buckets` " ++ + "that defines the buckets to use for all histograms in the struct, or with " ++ + "a function histogramBucketsForField that accepts the local histogram name " ++ + "as an input and returns the buckets for that histogram. In either case, " ++ + "the buckets should be provided as either a slice or array of f64."); + } + } + + fn isSlicable(comptime T: type, comptime DesiredChild: type) bool { + return switch (@typeInfo(T)) { + .Array => |a| a.child == DesiredChild, + .Pointer => |p| p.size != .One and p.child == DesiredChild or + p.size == .One and isSlicable(p.child, DesiredChild), + else => false, }; } diff --git a/src/shred_collector/shred_processor.zig b/src/shred_collector/shred_processor.zig index 38309298a..bef42df2a 100644 --- a/src/shred_collector/shred_processor.zig +++ b/src/shred_collector/shred_processor.zig @@ -135,5 +135,5 @@ const Metrics = struct { set_last_shred_error: *VariantCounter(SlotOutOfBounds), pub const prefix = "shred_processor"; - pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); + pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); }; diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index 3c4708683..dd2f9db27 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -222,7 +222,7 @@ pub const ShredReceiverMetrics = struct { discard: *VariantCounter(ShredValidationError), pub const prefix = "shred_receiver"; - pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); + pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); }; /// Something about the shred was unexpected, so we will discard it. 
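After this change a metrics struct can supply histogram buckets in either of the two ways the new error message describes. Below is a rough sketch of both options, assuming the signatures are exactly those checked by histogramBuckets above; SharedBucketsMetrics, PerFieldBucketsMetrics, and their field names are invented for illustration, while exponentialBuckets is the helper already used in these diffs.

const std = @import("std");
const sig = @import("../sig.zig");

const Histogram = sig.prometheus.Histogram;

// Option 1 (hypothetical): one bucket layout shared by every histogram in the struct.
const SharedBucketsMetrics = struct {
    batch_size: *Histogram,
    queue_depth: *Histogram,

    pub const prefix = "shared_example";
    pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8);
};

// Option 2 (hypothetical): buckets chosen per field name.
const PerFieldBucketsMetrics = struct {
    item_bytes: *Histogram,
    latency_ns: *Histogram,

    pub const prefix = "per_field_example";

    pub fn histogramBucketsForField(comptime field_name: []const u8) []const f64 {
        if (std.mem.eql(u8, field_name, "item_bytes")) {
            // byte-sized buckets for the size histogram
            return &.{ 10, 100, 1_000, 10_000 };
        }
        // nanosecond buckets for the latency histogram
        return &.{ 1_000, 1_000_000, 1_000_000_000 };
    }
};

The shared constant keeps small structs terse; the per-field function suits structs that mix size and timing histograms, as the accountsdb metrics struct in this series does.
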
diff --git a/src/shred_collector/shred_verifier.zig b/src/shred_collector/shred_verifier.zig index 80e8bb5e4..a1043cd2b 100644 --- a/src/shred_collector/shred_verifier.zig +++ b/src/shred_collector/shred_verifier.zig @@ -73,5 +73,5 @@ const Metrics = struct { fail: *VariantCounter(ShredVerificationFailure), pub const prefix = "shred_verifier"; - pub const buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); + pub const histogram_buckets = sig.prometheus.histogram.exponentialBuckets(2, -1, 8); }; From b3e96e55a69bef8f930b77d36a15786d5c0ec405 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 9 Oct 2024 23:49:57 -0400 Subject: [PATCH 09/14] refactor(shred-collector): rename count variable to packet_count --- src/shred_collector/shred_verifier.zig | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/shred_collector/shred_verifier.zig b/src/shred_collector/shred_verifier.zig index a1043cd2b..033336048 100644 --- a/src/shred_collector/shred_verifier.zig +++ b/src/shred_collector/shred_verifier.zig @@ -28,9 +28,9 @@ pub fn runShredVerifier( while (!exit.load(.acquire) or unverified_shred_receiver.len() != 0) { - var count: usize = 0; + var packet_count: usize = 0; while (unverified_shred_receiver.receive()) |packet| { - count += 1; + packet_count += 1; metrics.received_count.inc(); if (verifyShred(&packet, leader_schedule)) |_| { metrics.verified_count.inc(); @@ -39,7 +39,7 @@ pub fn runShredVerifier( metrics.fail.observe(err); } } - metrics.batch_size.observe(count); + metrics.batch_size.observe(packet_count); } } From e6312ca0a63ec918fb67d3a891fcaab44e9464be Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 9 Oct 2024 23:55:20 -0400 Subject: [PATCH 10/14] refactor(shred-collector): more descriptive variable names in shred receiver --- src/shred_collector/shred_receiver.zig | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index dd2f9db27..f37c8b353 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -94,15 +94,15 @@ pub const ShredReceiver = struct { ) !void { while (!self.exit.load(.acquire)) { for (receivers) |receiver| { - var count: usize = 0; + var packet_count: usize = 0; while (receiver.receive()) |packet| { - count += 1; + packet_count += 1; try self.handlePacket(packet, response_sender, is_repair); } if (is_repair) { - self.metrics.repair_batch_size.observe(count); + self.metrics.repair_batch_size.observe(packet_count); } else { - self.metrics.turbine_batch_size.observe(count); + self.metrics.turbine_batch_size.observe(packet_count); } } } @@ -116,8 +116,8 @@ pub const ShredReceiver = struct { comptime is_repair: bool, ) !void { if (packet.size == REPAIR_RESPONSE_SERIALIZED_PING_BYTES) { - if (try self.handlePing(&packet)) |p| { - try response_sender.send(p); + if (try self.handlePing(&packet)) |pong_packet| { + try response_sender.send(pong_packet); self.metrics.pong_sent_count.inc(); } } else { From ca6e0de009a42df00787291b21ee1654b46514d0 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 9 Oct 2024 23:56:34 -0400 Subject: [PATCH 11/14] refactor(shred-collector): remove todo about metrics --- src/shred_collector/shred_processor.zig | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/shred_collector/shred_processor.zig b/src/shred_collector/shred_processor.zig index bef42df2a..cf79e961e 100644 --- a/src/shred_collector/shred_processor.zig +++ 
b/src/shred_collector/shred_processor.zig @@ -20,8 +20,6 @@ const ShredInserter = sig.ledger.ShredInserter; const SlotOutOfBounds = shred_collector.shred_tracker.SlotOutOfBounds; const VariantCounter = sig.prometheus.VariantCounter; -// TODO: add metrics (e.g. total count of shreds processed) - /// Analogous to [WindowService](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/window_service.rs#L395) pub fn runShredProcessor( allocator: Allocator, From 90689ef28bc892cbea8d903212b7d3633e008e77 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Wed, 9 Oct 2024 23:59:03 -0400 Subject: [PATCH 12/14] refactor: rename Stats structs to Metrics --- src/accountsdb/db.zig | 8 ++++---- src/accountsdb/snapshots.zig | 14 +++++++------- src/geyser/core.zig | 20 ++++++++++---------- src/gossip/service.zig | 8 ++++---- src/transaction_sender/service.zig | 12 ++++++------ 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index cc9c4b869..f0610a6e2 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -41,7 +41,7 @@ const ClientVersion = sig.version.ClientVersion; const StatusCache = sig.accounts_db.StatusCache; const BankFields = sig.accounts_db.snapshots.BankFields; const BankHashInfo = sig.accounts_db.snapshots.BankHashInfo; -const BankHashStats = sig.accounts_db.snapshots.BankHashStats; +const BankHashStats = sig.accounts_db.snapshots.BankHashMetrics; const PubkeyBinCalculator = sig.accounts_db.index.PubkeyBinCalculator; const GeyserWriter = sig.geyser.GeyserWriter; @@ -58,7 +58,7 @@ pub const ACCOUNT_INDEX_BINS: usize = 8192; pub const ACCOUNT_FILE_SHRINK_THRESHOLD = 70; // shrink account files with more than X% dead bytes pub const DELETE_ACCOUNT_FILES_MIN = 100; -pub const AccountsDBStats = struct { +pub const AccountsDBMetrics = struct { number_files_flushed: *Counter, number_files_cleaned: *Counter, number_files_shrunk: *Counter, @@ -173,7 +173,7 @@ pub const AccountsDB = struct { // TODO: move to Bank struct bank_hash_stats: RwMux(BankHashStatsMap) = RwMux(BankHashStatsMap).init(.{}), - stats: AccountsDBStats, + stats: AccountsDBMetrics, logger: Logger, config: InitConfig, @@ -236,7 +236,7 @@ pub const AccountsDB = struct { ); errdefer account_index.deinit(true); - const stats = try AccountsDBStats.init(); + const stats = try AccountsDBMetrics.init(); return .{ .allocator = allocator, diff --git a/src/accountsdb/snapshots.zig b/src/accountsdb/snapshots.zig index f805cc4e6..c05f855a1 100644 --- a/src/accountsdb/snapshots.zig +++ b/src/accountsdb/snapshots.zig @@ -981,26 +981,26 @@ pub const AccountFileInfo = struct { pub const BankHashInfo = struct { accounts_delta_hash: Hash, accounts_hash: Hash, - stats: BankHashStats, + stats: BankHashMetrics, pub fn random(rand: std.Random) BankHashInfo { return .{ .accounts_delta_hash = Hash.random(rand), .accounts_hash = Hash.random(rand), - .stats = BankHashStats.random(rand), + .stats = BankHashMetrics.random(rand), }; } }; /// Analogous to [BankHashStats](https://github.com/anza-xyz/agave/blob/4c921ca276bbd5997f809dec1dd3937fb06463cc/accounts-db/src/accounts_db.rs#L1299) -pub const BankHashStats = struct { +pub const BankHashMetrics = struct { num_updated_accounts: u64, num_removed_accounts: u64, num_lamports_stored: u64, total_data_len: u64, num_executable_accounts: u64, - pub const zero_init: BankHashStats = .{ + pub const zero_init: BankHashMetrics = .{ .num_updated_accounts = 0, .num_removed_accounts = 0, .num_lamports_stored = 0, @@ -1008,7 
+1008,7 @@ pub const BankHashStats = struct { .num_executable_accounts = 0, }; - pub fn random(rand: std.Random) BankHashStats { + pub fn random(rand: std.Random) BankHashMetrics { return .{ .num_updated_accounts = rand.int(u64), .num_removed_accounts = rand.int(u64), @@ -1023,7 +1023,7 @@ pub const BankHashStats = struct { data_len: u64, executable: bool, }; - pub fn update(stats: *BankHashStats, account: AccountData) void { + pub fn update(stats: *BankHashMetrics, account: AccountData) void { if (account.lamports == 0) { stats.num_removed_accounts += 1; } else { @@ -1034,7 +1034,7 @@ pub const BankHashStats = struct { stats.num_lamports_stored +%= account.lamports; } - pub fn accumulate(stats: *BankHashStats, other: BankHashStats) void { + pub fn accumulate(stats: *BankHashMetrics, other: BankHashMetrics) void { stats.num_updated_accounts += other.num_updated_accounts; stats.num_removed_accounts += other.num_removed_accounts; stats.total_data_len +%= other.total_data_len; diff --git a/src/geyser/core.zig b/src/geyser/core.zig index 440873a6a..2190bfb14 100644 --- a/src/geyser/core.zig +++ b/src/geyser/core.zig @@ -70,7 +70,7 @@ pub const AccountPayloadV1 = struct { } }; -pub const GeyserWriterStats = struct { +pub const GeyserWriterMetrics = struct { recycle_fba_empty_loop_count: *Counter, pipe_full_count: *Counter, n_payloads: *Counter, @@ -78,8 +78,8 @@ pub const GeyserWriterStats = struct { pub const prefix = "geyser_writer"; - pub fn init() !GeyserWriterStats { - return try globalRegistry().initStruct(GeyserWriterStats); + pub fn init() !GeyserWriterMetrics { + return try globalRegistry().initStruct(GeyserWriterMetrics); } }; @@ -95,7 +95,7 @@ pub const GeyserWriter = struct { /// channel which data is streamed into and then written to the pipe io_channel: *sig.sync.Channel([]u8), exit: *std.atomic.Value(bool), - stats: GeyserWriterStats, + stats: GeyserWriterMetrics, /// set when the writer thread is running io_handle: ?std.Thread = null, @@ -119,7 +119,7 @@ pub const GeyserWriter = struct { const io_channel = try sig.sync.Channel([]u8).create(allocator); const io_allocator_state = try allocator.create(RecycleFBA(.{})); io_allocator_state.* = try RecycleFBA(.{}).init(allocator, io_fba_bytes); - const stats = try GeyserWriterStats.init(); + const stats = try GeyserWriterMetrics.init(); return .{ .allocator = allocator, @@ -246,7 +246,7 @@ pub const GeyserWriter = struct { } }; -pub const GeyserReaderStats = struct { +pub const GeyserReaderMetrics = struct { io_buf_size: *GaugeU64, bincode_buf_size: *GaugeU64, pipe_empty_count: *Counter, @@ -257,8 +257,8 @@ pub const GeyserReaderStats = struct { pub const prefix = "geyser_reader"; - pub fn init() !GeyserReaderStats { - return try globalRegistry().initStruct(GeyserReaderStats); + pub fn init() !GeyserReaderMetrics { + return try globalRegistry().initStruct(GeyserReaderMetrics); } }; @@ -271,7 +271,7 @@ pub const GeyserReader = struct { bincode_buf: []u8, /// NOTE: not thread-safe bincode_allocator: std.heap.FixedBufferAllocator, - stats: GeyserReaderStats, + stats: GeyserReaderMetrics, exit: ?*std.atomic.Value(bool), const Self = @This(); @@ -300,7 +300,7 @@ pub const GeyserReader = struct { const fba = std.heap.FixedBufferAllocator.init(bincode_buf); - const stats = try GeyserReaderStats.init(); + const stats = try GeyserReaderMetrics.init(); stats.io_buf_size.set(allocator_config.io_buf_len); stats.bincode_buf_size.set(allocator_config.bincode_buf_len); diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 
8465a3db3..0f4704bdd 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -167,7 +167,7 @@ pub const GossipService = struct { thread_pool: *ThreadPool, echo_server: EchoServer, - stats: GossipStats, + stats: GossipMetrics, const Self = @This(); @@ -226,7 +226,7 @@ pub const GossipService = struct { for (eps) |ep| entrypoint_list.appendAssumeCapacity(.{ .addr = ep }); } - const stats = try GossipStats.init(logger); + const stats = try GossipMetrics.init(logger); const ping_cache_ptr = try allocator.create(PingCache); ping_cache_ptr.* = try PingCache.init( @@ -1942,7 +1942,7 @@ pub const GossipService = struct { }; /// stats that we publish to prometheus -pub const GossipStats = struct { +pub const GossipMetrics = struct { gossip_packets_received_total: *Counter, gossip_packets_verified_total: *Counter, gossip_packets_processed_total: *Counter, @@ -2049,7 +2049,7 @@ pub const GossipStats = struct { } pub fn reset(self: *Self) void { - inline for (@typeInfo(GossipStats).Struct.fields) |field| { + inline for (@typeInfo(GossipMetrics).Struct.fields) |field| { if (field.name[0] != '_') { @field(self, field.name).reset(); } diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index b5a64ec7f..622b28d7c 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -41,7 +41,7 @@ const globalRegistry = sig.prometheus.globalRegistry; pub const Service = struct { allocator: std.mem.Allocator, config: Config, - stats: Stats, + stats: Metrics, transaction_pool: TransactionPool, leader_info_rw: RwMux(LeaderInfo), send_socket: UdpSocket, @@ -61,7 +61,7 @@ pub const Service = struct { return .{ .allocator = allocator, .config = config, - .stats = try Stats.init(), + .stats = try Metrics.init(), .transaction_pool = TransactionPool.init( allocator, config.pool_max_size, @@ -330,7 +330,7 @@ pub const Config = struct { rpc_retries: usize = 3, }; -pub const Stats = struct { +pub const Metrics = struct { transactions_pending: *Gauge(u64), transactions_received_count: *Counter, transactions_retry_count: *Counter, @@ -348,11 +348,11 @@ pub const Stats = struct { rpc_block_height_latency_millis: *Gauge(u64), rpc_signature_statuses_latency_millis: *Gauge(u64), - pub fn init() GetMetricError!Stats { - return globalRegistry().initStruct(Stats); + pub fn init() GetMetricError!Metrics { + return globalRegistry().initStruct(Metrics); } - pub fn log(self: *const Stats, logger: Logger) void { + pub fn log(self: *const Metrics, logger: Logger) void { logger.info().logf("transaction-sender: {} received, {} pending, {} rooted, {} failed, {} expired, {} exceeded_retries", .{ self.transactions_received_count.get(), self.transactions_pending.get(), From 9b17bfb177eb1cd3e24ae8e42269091e8eb0f601 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Thu, 10 Oct 2024 09:41:58 -0400 Subject: [PATCH 13/14] docs(contributing): add metrics naming to style guide --- docs/CONTRIBUTING.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md index 9588f7580..41d2be907 100644 --- a/docs/CONTRIBUTING.md +++ b/docs/CONTRIBUTING.md @@ -100,3 +100,16 @@ fn do_something(maybe_foo: ?Foo) void { ### Linting - run `zig fmt src/` in the top-level directory to run the zig linter + + +### Metrics + +It's common for a single context to deal with multiple prometheus metrics. In that case, it may be useful to group them together in their own struct. 
Any metrics structs following this pattern must end their names with the word `Metrics`. If practical, `Registry.initStruct` or `Registry.initFields` is recommended for the struct's initialization. + +The name of the struct should be prefixed with a name describing the scope where it is used, as in `GossipMetrics`. The prefix may be omitted if the context is already obvious, and the following are true about the metrics struct: +- It is a private const. +- It is the only metrics struct defined in that context. +- It is expected to contain all metrics that would ever be used in any context where it the struct is accessible. +- The context's name would be redundantly expressed in its name. For example, shred_processor.zig encloses a metrics struct, for which the only sensible names would be ShredProcessorMetrics or Metrics. In this case, the context would be redundantly expressed, so it may be omitted. + +If the Metrics struct is simple, and only used within one struct, it can be defined within that struct, otherwise it can be defined directly underneath the struct. From 0c561b44328d999705a0237be957edbddfbc7c6b Mon Sep 17 00:00:00 2001 From: x19 <100000306+0xNineteen@users.noreply.github.com> Date: Thu, 10 Oct 2024 10:13:00 -0400 Subject: [PATCH 14/14] fix: naming --- src/accountsdb/db.zig | 169 ++++++++++++++--------------- src/accountsdb/snapshots.zig | 14 +-- src/geyser/core.zig | 34 +++--- src/gossip/service.zig | 112 +++++++++---------- src/transaction_sender/service.zig | 34 +++--- 5 files changed, 181 insertions(+), 182 deletions(-) diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index f0610a6e2..b98f190d7 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -41,7 +41,7 @@ const ClientVersion = sig.version.ClientVersion; const StatusCache = sig.accounts_db.StatusCache; const BankFields = sig.accounts_db.snapshots.BankFields; const BankHashInfo = sig.accounts_db.snapshots.BankHashInfo; -const BankHashStats = sig.accounts_db.snapshots.BankHashMetrics; +const BankHashStats = sig.accounts_db.snapshots.BankHashStats; const PubkeyBinCalculator = sig.accounts_db.index.PubkeyBinCalculator; const GeyserWriter = sig.geyser.GeyserWriter; @@ -58,68 +58,6 @@ pub const ACCOUNT_INDEX_BINS: usize = 8192; pub const ACCOUNT_FILE_SHRINK_THRESHOLD = 70; // shrink account files with more than X% dead bytes pub const DELETE_ACCOUNT_FILES_MIN = 100; -pub const AccountsDBMetrics = struct { - number_files_flushed: *Counter, - number_files_cleaned: *Counter, - number_files_shrunk: *Counter, - number_files_deleted: *Counter, - - time_flush: *Histogram, - time_clean: *Histogram, - time_shrink: *Histogram, - time_purge: *Histogram, - - flush_account_file_size: *Histogram, - flush_accounts_written: *Counter, - - clean_references_deleted: *Gauge(u64), - clean_files_queued_deletion: *Gauge(u64), - clean_files_queued_shrink: *Gauge(u64), - clean_slot_old_state: *Gauge(u64), - clean_slot_zero_lamports: *Gauge(u64), - - shrink_file_shrunk_by: *Histogram, - shrink_alive_accounts: *Histogram, - shrink_dead_accounts: *Histogram, - - const Self = @This(); - - pub fn init() GetMetricError!Self { - return globalRegistry().initStruct(Self); - } - - pub fn histogramBucketsForField(comptime field_name: []const u8) []const f64 { - const HistogramKind = enum { - flush_account_file_size, - shrink_file_shrunk_by, - shrink_alive_accounts, - shrink_dead_accounts, - time_flush, - time_clean, - time_shrink, - time_purge, - }; - - const account_size_buckets = &.{ - // 10 bytes -> 10MB (solana max account size) - 
10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, - }; - const account_count_buckets = &.{ 1, 5, 10, 50, 100, 500, 1_000, 10_000 }; - const nanosecond_buckets = &.{ - // 0.01ms -> 10ms - 1_000, 10_000, 100_000, 1_000_000, 10_000_000, - // 50ms -> 1000ms - 50_000_000, 100_000_000, 200_000_000, 400_000_000, 1_000_000_000, - }; - - return switch (@field(HistogramKind, field_name)) { - .flush_account_file_size, .shrink_file_shrunk_by => account_size_buckets, - .shrink_alive_accounts, .shrink_dead_accounts => account_count_buckets, - .time_flush, .time_clean, .time_shrink, .time_purge => nanosecond_buckets, - }; - } -}; - /// database for accounts /// /// Analogous to [AccountsDb](https://github.com/anza-xyz/agave/blob/4c921ca276bbd5997f809dec1dd3937fb06463cc/accounts-db/src/accounts_db.rs#L1363) @@ -173,7 +111,7 @@ pub const AccountsDB = struct { // TODO: move to Bank struct bank_hash_stats: RwMux(BankHashStatsMap) = RwMux(BankHashStatsMap).init(.{}), - stats: AccountsDBMetrics, + metrics: AccountsDBMetrics, logger: Logger, config: InitConfig, @@ -236,8 +174,7 @@ pub const AccountsDB = struct { ); errdefer account_index.deinit(true); - const stats = try AccountsDBMetrics.init(); - + const metrics = try AccountsDBMetrics.init(); return .{ .allocator = allocator, .disk_allocator_ptr = maybe_disk_allocator_ptr, @@ -247,7 +184,7 @@ pub const AccountsDB = struct { .account_cache = RwMux(AccountCache).init(AccountCache.init(allocator)), .snapshot_dir = snapshot_dir, .dead_accounts_counter = RwMux(DeadAccountsCounter).init(DeadAccountsCounter.init(allocator)), - .stats = stats, + .metrics = metrics, .geyser_writer = geyser_writer, }; } @@ -1406,7 +1343,7 @@ pub const AccountsDB = struct { pub fn flushSlot(self: *Self, slot: Slot) !FileId { var timer = try sig.time.Timer.start(); - defer self.stats.number_files_flushed.inc(); + defer self.metrics.number_files_flushed.inc(); const pubkeys, const accounts: []const Account = blk: { // NOTE: flush should be the only function to delete/free cache slices of a flushed slot @@ -1424,7 +1361,7 @@ pub const AccountsDB = struct { for (accounts) |*account| { const account_size_in_file = account.getSizeInFile(); size += account_size_in_file; - self.stats.flush_account_file_size.observe(account_size_in_file); + self.metrics.flush_account_file_size.observe(account_size_in_file); } const file, const file_id, const memory = try self.createAccountFile(size, slot); @@ -1452,7 +1389,7 @@ pub const AccountsDB = struct { try file_map.putNoClobber(self.allocator, file_id, account_file); } - self.stats.flush_accounts_written.add(account_file.number_of_accounts); + self.metrics.flush_accounts_written.add(account_file.number_of_accounts); // update the reference AFTER the data exists for (pubkeys, offsets) |pubkey, offset| { @@ -1493,7 +1430,7 @@ pub const AccountsDB = struct { self.allocator.free(pubkeys); } - self.stats.time_flush.observe(timer.read().asNanos()); + self.metrics.time_flush.observe(timer.read().asNanos()); // return to queue for cleaning return file_id; @@ -1519,7 +1456,7 @@ pub const AccountsDB = struct { defer { const number_of_files = unclean_account_files.len; - self.stats.number_files_cleaned.add(number_of_files); + self.metrics.number_files_cleaned.add(number_of_files); } var num_zero_lamports: usize = 0; @@ -1642,19 +1579,19 @@ pub const AccountsDB = struct { } } references_to_delete.clearRetainingCapacity(); - self.stats.clean_references_deleted.set(references_to_delete.items.len); + 
self.metrics.clean_references_deleted.set(references_to_delete.items.len); self.logger.debug().logf( "cleaned slot {} - old_state: {}, zero_lamports: {}", .{ account_file.slot, num_old_states, num_zero_lamports }, ); } - self.stats.clean_files_queued_deletion.set(delete_account_files.count()); - self.stats.clean_files_queued_shrink.set(delete_account_files.count()); - self.stats.clean_slot_old_state.set(num_old_states); - self.stats.clean_slot_zero_lamports.set(num_zero_lamports); + self.metrics.clean_files_queued_deletion.set(delete_account_files.count()); + self.metrics.clean_files_queued_shrink.set(delete_account_files.count()); + self.metrics.clean_slot_old_state.set(num_old_states); + self.metrics.clean_slot_zero_lamports.set(num_zero_lamports); - self.stats.time_clean.observe(timer.read().asNanos()); + self.metrics.time_clean.observe(timer.read().asNanos()); return .{ .num_zero_lamports = num_zero_lamports, .num_old_states = num_old_states, @@ -1669,7 +1606,7 @@ pub const AccountsDB = struct { ) !void { const number_of_files = delete_account_files.len; defer { - self.stats.number_files_deleted.add(number_of_files); + self.metrics.number_files_deleted.add(number_of_files); } var delete_queue = try std.ArrayList(AccountFile).initCapacity( @@ -1755,7 +1692,7 @@ pub const AccountsDB = struct { defer { const number_of_files = shrink_account_files.len; - self.stats.number_files_shrunk.add(number_of_files); + self.metrics.number_files_shrunk.add(number_of_files); } var alive_pubkeys = std.AutoArrayHashMap(Pubkey, void).init(self.allocator); @@ -1817,9 +1754,9 @@ pub const AccountsDB = struct { std.debug.assert(accounts_dead_count > 0); total_accounts_deleted += accounts_dead_count; - self.stats.shrink_alive_accounts.observe(accounts_alive_count); - self.stats.shrink_dead_accounts.observe(accounts_dead_count); - self.stats.shrink_file_shrunk_by.observe(accounts_dead_size); + self.metrics.shrink_alive_accounts.observe(accounts_alive_count); + self.metrics.shrink_dead_accounts.observe(accounts_dead_count); + self.metrics.shrink_file_shrunk_by.observe(accounts_dead_size); self.logger.debug().logf("n alive accounts: {}", .{accounts_alive_count}); self.logger.debug().logf("n dead accounts: {}", .{accounts_dead_count}); @@ -1930,7 +1867,7 @@ pub const AccountsDB = struct { } } - self.stats.time_shrink.observe(timer.read().asNanos()); + self.metrics.time_shrink.observe(timer.read().asNanos()); return .{ .num_accounts_deleted = total_accounts_deleted, @@ -1978,7 +1915,7 @@ pub const AccountsDB = struct { self.allocator.free(accounts); self.allocator.free(pubkeys); - self.stats.time_purge.observe(timer.read().asNanos()); + self.metrics.time_purge.observe(timer.read().asNanos()); } // NOTE: we need to acquire locks which requires `self: *Self` but we never modify any data @@ -2814,6 +2751,68 @@ pub const AccountsDB = struct { } }; +pub const AccountsDBMetrics = struct { + number_files_flushed: *Counter, + number_files_cleaned: *Counter, + number_files_shrunk: *Counter, + number_files_deleted: *Counter, + + time_flush: *Histogram, + time_clean: *Histogram, + time_shrink: *Histogram, + time_purge: *Histogram, + + flush_account_file_size: *Histogram, + flush_accounts_written: *Counter, + + clean_references_deleted: *Gauge(u64), + clean_files_queued_deletion: *Gauge(u64), + clean_files_queued_shrink: *Gauge(u64), + clean_slot_old_state: *Gauge(u64), + clean_slot_zero_lamports: *Gauge(u64), + + shrink_file_shrunk_by: *Histogram, + shrink_alive_accounts: *Histogram, + shrink_dead_accounts: *Histogram, + + 
const Self = @This(); + + pub fn init() GetMetricError!Self { + return globalRegistry().initStruct(Self); + } + + pub fn histogramBucketsForField(comptime field_name: []const u8) []const f64 { + const HistogramKind = enum { + flush_account_file_size, + shrink_file_shrunk_by, + shrink_alive_accounts, + shrink_dead_accounts, + time_flush, + time_clean, + time_shrink, + time_purge, + }; + + const account_size_buckets = &.{ + // 10 bytes -> 10MB (solana max account size) + 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, + }; + const account_count_buckets = &.{ 1, 5, 10, 50, 100, 500, 1_000, 10_000 }; + const nanosecond_buckets = &.{ + // 0.01ms -> 10ms + 1_000, 10_000, 100_000, 1_000_000, 10_000_000, + // 50ms -> 1000ms + 50_000_000, 100_000_000, 200_000_000, 400_000_000, 1_000_000_000, + }; + + return switch (@field(HistogramKind, field_name)) { + .flush_account_file_size, .shrink_file_shrunk_by => account_size_buckets, + .shrink_alive_accounts, .shrink_dead_accounts => account_count_buckets, + .time_flush, .time_clean, .time_shrink, .time_purge => nanosecond_buckets, + }; + } +}; + /// this is used when loading from a snapshot. it uses a fixed buffer allocator /// to allocate memory which is used to clone account data slices of account files. /// the memory is serialized into bincode and sent through the pipe. diff --git a/src/accountsdb/snapshots.zig b/src/accountsdb/snapshots.zig index c05f855a1..f805cc4e6 100644 --- a/src/accountsdb/snapshots.zig +++ b/src/accountsdb/snapshots.zig @@ -981,26 +981,26 @@ pub const AccountFileInfo = struct { pub const BankHashInfo = struct { accounts_delta_hash: Hash, accounts_hash: Hash, - stats: BankHashMetrics, + stats: BankHashStats, pub fn random(rand: std.Random) BankHashInfo { return .{ .accounts_delta_hash = Hash.random(rand), .accounts_hash = Hash.random(rand), - .stats = BankHashMetrics.random(rand), + .stats = BankHashStats.random(rand), }; } }; /// Analogous to [BankHashStats](https://github.com/anza-xyz/agave/blob/4c921ca276bbd5997f809dec1dd3937fb06463cc/accounts-db/src/accounts_db.rs#L1299) -pub const BankHashMetrics = struct { +pub const BankHashStats = struct { num_updated_accounts: u64, num_removed_accounts: u64, num_lamports_stored: u64, total_data_len: u64, num_executable_accounts: u64, - pub const zero_init: BankHashMetrics = .{ + pub const zero_init: BankHashStats = .{ .num_updated_accounts = 0, .num_removed_accounts = 0, .num_lamports_stored = 0, @@ -1008,7 +1008,7 @@ pub const BankHashMetrics = struct { .num_executable_accounts = 0, }; - pub fn random(rand: std.Random) BankHashMetrics { + pub fn random(rand: std.Random) BankHashStats { return .{ .num_updated_accounts = rand.int(u64), .num_removed_accounts = rand.int(u64), @@ -1023,7 +1023,7 @@ pub const BankHashMetrics = struct { data_len: u64, executable: bool, }; - pub fn update(stats: *BankHashMetrics, account: AccountData) void { + pub fn update(stats: *BankHashStats, account: AccountData) void { if (account.lamports == 0) { stats.num_removed_accounts += 1; } else { @@ -1034,7 +1034,7 @@ pub const BankHashMetrics = struct { stats.num_lamports_stored +%= account.lamports; } - pub fn accumulate(stats: *BankHashMetrics, other: BankHashMetrics) void { + pub fn accumulate(stats: *BankHashStats, other: BankHashStats) void { stats.num_updated_accounts += other.num_updated_accounts; stats.num_removed_accounts += other.num_removed_accounts; stats.total_data_len +%= other.total_data_len; diff --git a/src/geyser/core.zig b/src/geyser/core.zig index 2190bfb14..5eb75cbc6 100644 --- 
a/src/geyser/core.zig +++ b/src/geyser/core.zig @@ -95,7 +95,7 @@ pub const GeyserWriter = struct { /// channel which data is streamed into and then written to the pipe io_channel: *sig.sync.Channel([]u8), exit: *std.atomic.Value(bool), - stats: GeyserWriterMetrics, + metrics: GeyserWriterMetrics, /// set when the writer thread is running io_handle: ?std.Thread = null, @@ -119,7 +119,7 @@ pub const GeyserWriter = struct { const io_channel = try sig.sync.Channel([]u8).create(allocator); const io_allocator_state = try allocator.create(RecycleFBA(.{})); io_allocator_state.* = try RecycleFBA(.{}).init(allocator, io_fba_bytes); - const stats = try GeyserWriterMetrics.init(); + const metrics = try GeyserWriterMetrics.init(); return .{ .allocator = allocator, @@ -127,7 +127,7 @@ pub const GeyserWriter = struct { .io_allocator_state = io_allocator_state, .io_channel = io_channel, .file = file, - .stats = stats, + .metrics = metrics, .exit = exit, }; } @@ -159,7 +159,7 @@ pub const GeyserWriter = struct { return err; } }; - self.stats.n_payloads.inc(); + self.metrics.n_payloads.inc(); self.io_allocator.free(payload); } } @@ -192,7 +192,7 @@ pub const GeyserWriter = struct { const buf = blk: while (true) { const buf = self.io_allocator.alloc(u8, total_len) catch { // no memory available rn - unlock and wait - self.stats.recycle_fba_empty_loop_count.inc(); + self.metrics.recycle_fba_empty_loop_count.inc(); std.time.sleep(std.time.ns_per_ms); if (self.exit.load(.acquire)) { return error.MemoryBlockedWithExitSignaled; @@ -226,7 +226,7 @@ pub const GeyserWriter = struct { return WritePipeError.PipeBlockedWithExitSignaled; } else { // pipe is full but we dont need to exit, so we try again - self.stats.pipe_full_count.inc(); + self.metrics.pipe_full_count.inc(); continue; } } else { @@ -238,7 +238,7 @@ pub const GeyserWriter = struct { return WritePipeError.PipeClosed; } n_bytes_written_total += n_bytes_written; - self.stats.total_bytes.add(n_bytes_written); + self.metrics.total_bytes.add(n_bytes_written); } std.debug.assert(n_bytes_written_total == buf.len); @@ -271,7 +271,7 @@ pub const GeyserReader = struct { bincode_buf: []u8, /// NOTE: not thread-safe bincode_allocator: std.heap.FixedBufferAllocator, - stats: GeyserReaderMetrics, + metrics: GeyserReaderMetrics, exit: ?*std.atomic.Value(bool), const Self = @This(); @@ -300,9 +300,9 @@ pub const GeyserReader = struct { const fba = std.heap.FixedBufferAllocator.init(bincode_buf); - const stats = try GeyserReaderMetrics.init(); - stats.io_buf_size.set(allocator_config.io_buf_len); - stats.bincode_buf_size.set(allocator_config.bincode_buf_len); + const metrics = try GeyserReaderMetrics.init(); + metrics.io_buf_size.set(allocator_config.io_buf_len); + metrics.bincode_buf_size.set(allocator_config.bincode_buf_len); return .{ .file = file, @@ -310,7 +310,7 @@ pub const GeyserReader = struct { .io_buf = io_buf, .bincode_buf = bincode_buf, .bincode_allocator = fba, - .stats = stats, + .metrics = metrics, .exit = exit, }; } @@ -334,7 +334,7 @@ pub const GeyserReader = struct { pub fn readPayload(self: *Self) !struct { u64, VersionedAccountPayload } { const len = try self.readType(u64, 8); const versioned_payload = try self.readType(VersionedAccountPayload, len); - self.stats.total_payloads.inc(); + self.metrics.total_payloads.inc(); return .{ 8 + len, versioned_payload }; } @@ -346,7 +346,7 @@ pub const GeyserReader = struct { const new_buf = try self.allocator.alloc(u8, expected_n_bytes); self.allocator.free(self.io_buf); self.io_buf = new_buf; - 
self.stats.io_buf_size.set(expected_n_bytes); + self.metrics.io_buf_size.set(expected_n_bytes); } var total_bytes_read: u64 = 0; @@ -357,7 +357,7 @@ pub const GeyserReader = struct { return error.PipeBlockedWithExitSignaled; } else { // pipe is empty but we dont need to exit, so we try again - self.stats.pipe_empty_count.inc(); + self.metrics.pipe_empty_count.inc(); continue; } } else { @@ -370,7 +370,7 @@ pub const GeyserReader = struct { } total_bytes_read += n_bytes_read; } - self.stats.total_bytes.add(expected_n_bytes); + self.metrics.total_bytes.add(expected_n_bytes); while (true) { const data = bincode.readFromSlice( @@ -387,7 +387,7 @@ pub const GeyserReader = struct { self.bincode_buf = new_buf; self.bincode_allocator = std.heap.FixedBufferAllocator.init(self.bincode_buf); - self.stats.bincode_buf_size.set(new_size); + self.metrics.bincode_buf_size.set(new_size); continue; } else { return err; diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 0f4704bdd..4378b57b6 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -167,7 +167,7 @@ pub const GossipService = struct { thread_pool: *ThreadPool, echo_server: EchoServer, - stats: GossipMetrics, + metrics: GossipMetrics, const Self = @This(); @@ -226,7 +226,7 @@ pub const GossipService = struct { for (eps) |ep| entrypoint_list.appendAssumeCapacity(.{ .addr = ep }); } - const stats = try GossipMetrics.init(logger); + const metrics = try GossipMetrics.init(logger); const ping_cache_ptr = try allocator.create(PingCache); ping_cache_ptr.* = try PingCache.init( @@ -261,7 +261,7 @@ pub const GossipService = struct { .echo_server = echo_server, .logger = logger, .thread_pool = thread_pool, - .stats = stats, + .metrics = metrics, }; } @@ -479,7 +479,7 @@ pub const GossipService = struct { // PERF: investigate CPU pinning var task_search_start_idx: usize = 0; while (self.packet_incoming_channel.receive()) |packet| { - defer self.stats.gossip_packets_received_total.inc(); + defer self.metrics.gossip_packets_received_total.inc(); const acquired_task_idx = VerifyMessageTask.awaitAndAcquireFirstAvailableTask(tasks, task_search_start_idx); task_search_start_idx = (acquired_task_idx + 1) % tasks.len; @@ -597,7 +597,7 @@ pub const GossipService = struct { // Allow spy nodes with shred-verion == 0 to pull from other nodes. if (data.shred_version != 0 and data.shred_version != self.my_shred_version.load(.monotonic)) { // non-matching shred version - self.stats.pull_requests_dropped.add(1); + self.metrics.pull_requests_dropped.add(1); should_process_value = false; } }, @@ -609,13 +609,13 @@ pub const GossipService = struct { // Allow spy nodes with shred-verion == 0 to pull from other nodes. 
if (data.shred_version != 0 and data.shred_version != self.my_shred_version.load(.monotonic)) { // non-matching shred version - self.stats.pull_requests_dropped.add(1); + self.metrics.pull_requests_dropped.add(1); should_process_value = false; } }, // only contact info supported else => { - self.stats.pull_requests_dropped.add(1); + self.metrics.pull_requests_dropped.add(1); should_process_value = false; }, } @@ -623,7 +623,7 @@ pub const GossipService = struct { const from_addr = SocketAddr.fromEndpoint(&message.from_endpoint); if (from_addr.isUnspecified() or from_addr.port() == 0) { // unable to respond to these messages - self.stats.pull_requests_dropped.add(1); + self.metrics.pull_requests_dropped.add(1); should_process_value = false; } @@ -646,7 +646,7 @@ pub const GossipService = struct { const too_old = prune_wallclock < now -| PRUNE_MSG_TIMEOUT.asMillis(); const incorrect_destination = !prune_data.destination.equals(&self.my_pubkey); if (too_old or incorrect_destination) { - self.stats.prune_messages_dropped.add(1); + self.metrics.prune_messages_dropped.add(1); continue; } try prune_messages.append(prune_data); @@ -655,7 +655,7 @@ pub const GossipService = struct { const from_addr = SocketAddr.fromEndpoint(&message.from_endpoint); if (from_addr.isUnspecified() or from_addr.port() == 0) { // unable to respond to these messages - self.stats.ping_messages_dropped.add(1); + self.metrics.ping_messages_dropped.add(1); continue; } @@ -676,13 +676,13 @@ pub const GossipService = struct { if (msg_count == 0) continue; // track metrics - self.stats.gossip_packets_verified_total.add(msg_count); - self.stats.ping_messages_recv.add(ping_messages.items.len); - self.stats.pong_messages_recv.add(pong_messages.items.len); - self.stats.push_messages_recv.add(push_messages.items.len); - self.stats.pull_requests_recv.add(pull_requests.items.len); - self.stats.pull_responses_recv.add(pull_responses.items.len); - self.stats.prune_messages_recv.add(prune_messages.items.len); + self.metrics.gossip_packets_verified_total.add(msg_count); + self.metrics.ping_messages_recv.add(ping_messages.items.len); + self.metrics.pong_messages_recv.add(pong_messages.items.len); + self.metrics.push_messages_recv.add(push_messages.items.len); + self.metrics.pull_requests_recv.add(pull_requests.items.len); + self.metrics.pull_responses_recv.add(pull_responses.items.len); + self.metrics.prune_messages_recv.add(prune_messages.items.len); var gossip_packets_processed_total: usize = 0; gossip_packets_processed_total += ping_messages.items.len; @@ -693,9 +693,9 @@ pub const GossipService = struct { gossip_packets_processed_total += prune_messages.items.len; // only add the count once we've finished processing - defer self.stats.gossip_packets_processed_total.add(gossip_packets_processed_total); + defer self.metrics.gossip_packets_processed_total.add(gossip_packets_processed_total); - self.stats.maybeLog(); + self.metrics.maybeLog(); // handle batch messages if (push_messages.items.len > 0) { @@ -704,7 +704,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPushMessages failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_push_time.observe(elapsed); + self.metrics.handle_batch_push_time.observe(elapsed); push_messages.clearRetainingCapacity(); } @@ -713,7 +713,7 @@ pub const GossipService = struct { var x_timer = try sig.time.Timer.start(); self.handleBatchPruneMessages(&prune_messages); const elapsed = x_timer.read().asMillis(); - 
self.stats.handle_batch_prune_time.observe(elapsed); + self.metrics.handle_batch_prune_time.observe(elapsed); prune_messages.clearRetainingCapacity(); } @@ -724,7 +724,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPullRequest failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_pull_req_time.observe(elapsed); + self.metrics.handle_batch_pull_req_time.observe(elapsed); pull_requests.clearRetainingCapacity(); } @@ -735,7 +735,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPullResponses failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_pull_resp_time.observe(elapsed); + self.metrics.handle_batch_pull_resp_time.observe(elapsed); pull_responses.clearRetainingCapacity(); } @@ -746,7 +746,7 @@ pub const GossipService = struct { self.logger.err().logf("handleBatchPingMessages failed: {}", .{err}); }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_ping_time.observe(elapsed); + self.metrics.handle_batch_ping_time.observe(elapsed); ping_messages.clearRetainingCapacity(); } @@ -755,7 +755,7 @@ pub const GossipService = struct { var x_timer = try sig.time.Timer.start(); self.handleBatchPongMessages(&pong_messages); const elapsed = x_timer.read().asMillis(); - self.stats.handle_batch_pong_time.observe(elapsed); + self.metrics.handle_batch_pong_time.observe(elapsed); pong_messages.clearRetainingCapacity(); } @@ -794,12 +794,12 @@ pub const GossipService = struct { break :err_blk 0; }; const elapsed = x_timer.read().asMillis(); - self.stats.handle_trim_table_time.observe(elapsed); + self.metrics.handle_trim_table_time.observe(elapsed); break :blk n_pubkeys_dropped; } else 0; - self.stats.table_pubkeys_dropped.add(n_pubkeys_dropped); + self.metrics.table_pubkeys_dropped.add(n_pubkeys_dropped); } /// main gossip loop for periodically sending new GossipMessagemessages. 
@@ -842,7 +842,7 @@ pub const GossipService = struct { for (packets.items) |packet| { try self.packet_outgoing_channel.send(packet); } - self.stats.pull_requests_sent.add(packets.items.len); + self.metrics.pull_requests_sent.add(packets.items.len); } // new push msgs @@ -852,7 +852,7 @@ pub const GossipService = struct { break :blk null; }; if (maybe_push_packets) |push_packets| { - self.stats.push_messages_sent.add(push_packets.items.len); + self.metrics.push_messages_sent.add(push_packets.items.len); for (push_packets.items) |push_packet| { try self.packet_outgoing_channel.send(push_packet); } @@ -917,16 +917,16 @@ pub const GossipService = struct { const n_entries = gossip_table.store.count(); const n_pubkeys = gossip_table.pubkey_to_values.count(); - self.stats.table_n_values.set(n_entries); - self.stats.table_n_pubkeys.set(n_pubkeys); + self.metrics.table_n_values.set(n_entries); + self.metrics.table_n_pubkeys.set(n_pubkeys); const incoming_channel_length = self.packet_incoming_channel.len(); - self.stats.incoming_channel_length.set(incoming_channel_length); + self.metrics.incoming_channel_length.set(incoming_channel_length); const outgoing_channel_length = self.packet_outgoing_channel.len(); - self.stats.outgoing_channel_length.set(outgoing_channel_length); + self.metrics.outgoing_channel_length.set(outgoing_channel_length); - self.stats.verified_channel_length.set(self.verified_incoming_channel.len()); + self.metrics.verified_channel_length.set(self.verified_incoming_channel.len()); } pub fn rotateActiveSet(self: *Self, rand: std.Random) !void { @@ -1364,7 +1364,7 @@ pub const GossipService = struct { if (task.output.items.len > 0) { for (task.output.items) |output| { try self.packet_outgoing_channel.send(output); - self.stats.pull_responses_sent.add(1); + self.metrics.pull_responses_sent.add(1); } task.output_consumed.store(true, .release); } @@ -1417,7 +1417,7 @@ pub const GossipService = struct { .log("gossip: recv ping"); try self.packet_outgoing_channel.send(packet); - self.stats.pong_messages_sent.add(1); + self.metrics.pong_messages_sent.add(1); } } @@ -1461,15 +1461,15 @@ pub const GossipService = struct { for (insert_results.items) |result| { switch (result) { - .InsertedNewEntry => self.stats.pull_response_n_new_inserts.inc(), - .OverwroteExistingEntry => self.stats.pull_response_n_overwrite_existing.inc(), - .IgnoredOldValue => self.stats.pull_response_n_old_value.inc(), - .IgnoredDuplicateValue => self.stats.pull_response_n_duplicate_value.inc(), - .IgnoredTimeout => self.stats.pull_response_n_timeouts.inc(), + .InsertedNewEntry => self.metrics.pull_response_n_new_inserts.inc(), + .OverwroteExistingEntry => self.metrics.pull_response_n_overwrite_existing.inc(), + .IgnoredOldValue => self.metrics.pull_response_n_old_value.inc(), + .IgnoredDuplicateValue => self.metrics.pull_response_n_duplicate_value.inc(), + .IgnoredTimeout => self.metrics.pull_response_n_timeouts.inc(), .GossipTableFull => {}, } } - self.stats.pull_response_n_invalid_shred_version.add(invalid_shred_count); + self.metrics.pull_response_n_invalid_shred_version.add(invalid_shred_count); for (insert_results.items, 0..) 
|result, index| { if (result.wasInserted()) { @@ -1578,7 +1578,7 @@ pub const GossipService = struct { var timer = try sig.time.Timer.start(); defer { const elapsed = timer.read().asMillis(); - self.stats.push_messages_time_to_insert.observe(elapsed); + self.metrics.push_messages_time_to_insert.observe(elapsed); } var gossip_table, var gossip_table_lg = self.gossip_table_rw.writeWithLock(); @@ -1605,22 +1605,22 @@ pub const GossipService = struct { var insert_fail_count: u64 = 0; for (insert_results.items) |result| { switch (result) { - .InsertedNewEntry => self.stats.push_message_n_new_inserts.inc(), + .InsertedNewEntry => self.metrics.push_message_n_new_inserts.inc(), .OverwroteExistingEntry => |old_data| { - self.stats.push_message_n_overwrite_existing.inc(); + self.metrics.push_message_n_overwrite_existing.inc(); // if the value was overwritten, we need to free the old value bincode.free(self.gossip_value_allocator, old_data); }, - .IgnoredOldValue => self.stats.push_message_n_old_value.inc(), - .IgnoredDuplicateValue => self.stats.push_message_n_duplicate_value.inc(), - .IgnoredTimeout => self.stats.push_message_n_timeouts.inc(), + .IgnoredOldValue => self.metrics.push_message_n_old_value.inc(), + .IgnoredDuplicateValue => self.metrics.push_message_n_duplicate_value.inc(), + .IgnoredTimeout => self.metrics.push_message_n_timeouts.inc(), .GossipTableFull => {}, } if (!result.wasInserted()) { insert_fail_count += 1; } } - self.stats.push_message_n_invalid_shred_version.add(invalid_shred_count); + self.metrics.push_message_n_invalid_shred_version.add(invalid_shred_count); // logging this message takes too long and causes a bottleneck // self.logger @@ -1681,7 +1681,7 @@ pub const GossipService = struct { var timer = try sig.time.Timer.start(); defer { const elapsed = timer.read().asMillis(); - self.stats.push_messages_time_build_prune.observe(elapsed); + self.metrics.push_messages_time_build_prune.observe(elapsed); } var pubkey_to_failed_origins_iter = pubkey_to_failed_origins.iterator(); @@ -1718,7 +1718,7 @@ pub const GossipService = struct { packet.addr = from_endpoint; try self.packet_outgoing_channel.send(packet); - self.stats.prune_messages_sent.add(1); + self.metrics.prune_messages_sent.add(1); } } @@ -1746,7 +1746,7 @@ pub const GossipService = struct { // - if all nodes have zero stake: epoch duration (TODO: this might be unreasonably large) // - if any other nodes have non-zero stake: DATA_TIMEOUT (15s) const n_values_removed = try gossip_table.removeOldLabels(now, DEFAULT_EPOCH_DURATION.asMillis()); - self.stats.table_old_values_removed.add(n_values_removed); + self.metrics.table_old_values_removed.add(n_values_removed); } const failed_insert_cutoff_timestamp = now -| FAILED_INSERTS_RETENTION.asMillis(); @@ -1834,7 +1834,7 @@ pub const GossipService = struct { packet.addr = ping_and_addr.socket.toEndpoint(); try self.packet_outgoing_channel.send(packet); - self.stats.ping_messages_sent.add(1); + self.metrics.ping_messages_sent.add(1); } } @@ -2544,7 +2544,7 @@ test "handle old prune & pull request message" { gossip_service.shutdown(); handle.join(); - try std.testing.expect(gossip_service.stats.pull_requests_dropped.get() == 2); + try std.testing.expect(gossip_service.metrics.pull_requests_dropped.get() == 2); } test "handle pull request" { @@ -3251,7 +3251,7 @@ pub const BenchmarkGossipServiceGeneral = struct { ); gossip_service.echo_server.kill(); // we dont need this rn defer { - gossip_service.stats.reset(); + gossip_service.metrics.reset(); gossip_service.deinit(); } @@ 
-3366,7 +3366,7 @@ pub const BenchmarkGossipServicePullRequests = struct { ); gossip_service.echo_server.kill(); // we dont need this rn defer { - gossip_service.stats.reset(); + gossip_service.metrics.reset(); gossip_service.deinit(); } diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index 622b28d7c..a475ed253 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -41,7 +41,7 @@ const globalRegistry = sig.prometheus.globalRegistry; pub const Service = struct { allocator: std.mem.Allocator, config: Config, - stats: Metrics, + metrics: Metrics, transaction_pool: TransactionPool, leader_info_rw: RwMux(LeaderInfo), send_socket: UdpSocket, @@ -61,7 +61,7 @@ pub const Service = struct { return .{ .allocator = allocator, .config = config, - .stats = try Metrics.init(), + .metrics = try Metrics.init(), .transaction_pool = TransactionPool.init( allocator, config.pool_max_size, @@ -124,7 +124,7 @@ pub const Service = struct { while (!self.exit.load(.unordered)) { const transaction = self.receive_channel.receive() orelse break; - self.stats.transactions_received_count.add(1); + self.metrics.transactions_received_count.add(1); if (!transaction_batch.contains(transaction.signature) and !self.transaction_pool.contains(transaction.signature)) @@ -169,15 +169,15 @@ pub const Service = struct { var timer = try Timer.start(); try self.processTransactions(&rpc_client); - self.stats.process_transactions_latency_millis.set(timer.lap().asMillis()); + self.metrics.process_transactions_latency_millis.set(timer.lap().asMillis()); try self.retryTransactions(); - self.stats.retry_transactions_latency_millis.set(timer.lap().asMillis()); + self.metrics.retry_transactions_latency_millis.set(timer.lap().asMillis()); self.transaction_pool.purge(); - self.stats.transactions_pending.set(self.transaction_pool.count()); + self.metrics.transactions_pending.set(self.transaction_pool.count()); - self.stats.log(self.logger); + self.metrics.log(self.logger); } } @@ -190,7 +190,7 @@ pub const Service = struct { ); defer block_height_response.deinit(); const block_height = try block_height_response.result(); - self.stats.rpc_block_height_latency_millis.set(block_height_timer.read().asMillis()); + self.metrics.rpc_block_height_latency_millis.set(block_height_timer.read().asMillis()); // We need to hold a read lock until we are finished using the signatures and transactions, otherwise // the receiver thread could add new transactions and corrupt the underlying array @@ -205,37 +205,37 @@ pub const Service = struct { ); defer signature_statuses_response.deinit(); const signature_statuses = try signature_statuses_response.result(); - self.stats.rpc_signature_statuses_latency_millis.set(signature_statuses_timer.read().asMillis()); + self.metrics.rpc_signature_statuses_latency_millis.set(signature_statuses_timer.read().asMillis()); for (signature_statuses.value, signatures, transactions) |maybe_signature_status, signature, transaction_info| { if (maybe_signature_status) |signature_status| { if (signature_status.confirmations == null) { try self.transaction_pool.drop_signatures.append(signature); - self.stats.transactions_rooted_count.add(1); + self.metrics.transactions_rooted_count.add(1); continue; } if (signature_status.err) |_| { try self.transaction_pool.drop_signatures.append(signature); - self.stats.transactions_failed_count.add(1); + self.metrics.transactions_failed_count.add(1); continue; } if (transaction_info.isExpired(block_height)) { try 
self.transaction_pool.drop_signatures.append(signature); - self.stats.transactions_expired_count.add(1); + self.metrics.transactions_expired_count.add(1); continue; } } else { if (transaction_info.exceededMaxRetries(self.config.default_max_retries)) { try self.transaction_pool.drop_signatures.append(signature); - self.stats.transactions_exceeded_max_retries_count.add(1); + self.metrics.transactions_exceeded_max_retries_count.add(1); continue; } if (transaction_info.shouldRetry(self.config.retry_rate)) { try self.transaction_pool.retry_signatures.append(signature); - self.stats.transactions_retry_count.add(1); + self.metrics.transactions_retry_count.add(1); } } } @@ -273,8 +273,8 @@ pub const Service = struct { defer leader_info_lg.unlock(); break :blk try leader_info.getLeaderAddresses(self.allocator); }; - self.stats.get_leader_addresses_latency_millis.set(get_leader_addresses_timer.read().asMillis()); - self.stats.number_of_leaders_identified.set(leader_addresses.items.len); + self.metrics.get_leader_addresses_latency_millis.set(get_leader_addresses_timer.read().asMillis()); + self.metrics.number_of_leaders_identified.set(leader_addresses.items.len); return leader_addresses; } @@ -301,7 +301,7 @@ pub const Service = struct { tx.last_sent_time = last_sent_time; } - self.stats.transactions_sent_count.add(transactions.len); + self.metrics.transactions_sent_count.add(transactions.len); } };
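
Note (illustrative, not part of the patches above): the series converges on one pattern for per-component metrics — a plain struct whose fields are *Counter, *Gauge(u64), or *Histogram pointers, registered in a single call through globalRegistry().initStruct(...), with histogram buckets supplied per field. The sketch below is a minimal example of that pattern: ExampleMetrics and its fields are invented, the import path is a placeholder, and the exact initStruct/bucket-lookup contract is assumed from the hunks above rather than taken from the registry source.

const sig = @import("../sig.zig"); // placeholder path; use whatever resolves to the sig module
const Counter = sig.prometheus.counter.Counter;
const Gauge = sig.prometheus.Gauge;
const Histogram = sig.prometheus.histogram.Histogram;
const GetMetricError = sig.prometheus.registry.GetMetricError;
const globalRegistry = sig.prometheus.globalRegistry;

/// Hypothetical metrics for a made-up component; field names become metric names.
pub const ExampleMetrics = struct {
    packets_processed: *Counter, // monotonically increasing event count
    queue_length: *Gauge(u64), // last observed queue depth
    process_time: *Histogram, // per-batch latency in nanoseconds

    pub fn init() GetMetricError!ExampleMetrics {
        // one call registers (or fetches) every field by name in the global registry
        return globalRegistry().initStruct(ExampleMetrics);
    }

    /// Assumed hook, mirroring AccountsDBMetrics.histogramBucketsForField above:
    /// initStruct is expected to consult this for each *Histogram field.
    pub fn histogramBucketsForField(comptime field_name: []const u8) []const f64 {
        _ = field_name;
        // 0.01ms -> 10ms, expressed in nanoseconds
        const nanosecond_buckets = &.{ 10_000, 100_000, 1_000_000, 10_000_000 };
        return nanosecond_buckets;
    }
};

Usage then follows the call sites in the hunks: const metrics = try ExampleMetrics.init(); at construction, and metrics.packets_processed.inc(), metrics.queue_length.set(n), or metrics.process_time.observe(timer.read().asNanos()) on the hot path.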