diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index bf4ad0a8e..40abf48fb 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -43,6 +43,7 @@ const Pubkey = sig.core.Pubkey; const Slot = sig.core.Slot; const NestedHashTree = sig.utils.merkle_tree.NestedHashTree; +const Logger = sig.trace.log.Logger; const GeyserWriter = sig.geyser.GeyserWriter; @@ -55,8 +56,6 @@ const WeightedAliasSampler = sig.rand.WeightedAliasSampler; const RwMux = sig.sync.RwMux; -const Logger = sig.trace.log.Logger; - const parallelUnpackZstdTarBall = sig.accounts_db.snapshots.parallelUnpackZstdTarBall; const spawnThreadTasks = sig.utils.thread.spawnThreadTasks; const printTimeEstimate = sig.time.estimate.printTimeEstimate; @@ -634,7 +633,7 @@ pub const AccountsDB = struct { if (print_progress and progress_timer.read().asNanos() > DB_LOG_RATE.asNanos()) { printTimeEstimate( - self.logger, + self.logger.withScope(@typeName(Self)), &timer, n_account_files, file_count, @@ -669,7 +668,7 @@ pub const AccountsDB = struct { if (print_progress and progress_timer.read().asNanos() > DB_LOG_RATE.asNanos()) { printTimeEstimate( - self.logger, + self.logger.withScope(@typeName(Self)), &timer, n_accounts_total, ref_count, @@ -695,7 +694,7 @@ pub const AccountsDB = struct { .data_len = self.account_index.pubkey_ref_map.numberOfShards(), .max_threads = n_threads, .params = .{ - self.logger, + self.logger.unscoped(), &self.account_index, thread_dbs, }, @@ -758,11 +757,12 @@ pub const AccountsDB = struct { /// combines multiple thread indexes into the given index. /// each bin is also sorted by pubkey. pub fn combineThreadIndexesMultiThread( - logger: Logger, + logger_: Logger, index: *AccountIndex, thread_dbs: []const AccountsDB, task: sig.utils.thread.TaskParams, ) !void { + const logger = logger_.withScope(@typeName(Self)); const shard_start_index = task.start_index; const shard_end_index = task.end_index; @@ -1142,7 +1142,7 @@ pub const AccountsDB = struct { if (print_progress and progress_timer.read() > DB_LOG_RATE.asNanos()) { printTimeEstimate( - self.logger, + self.logger.withScope(@typeName(Self)), &timer, shards.len, count, diff --git a/src/accountsdb/download.zig b/src/accountsdb/download.zig index f52795386..894ce735e 100644 --- a/src/accountsdb/download.zig +++ b/src/accountsdb/download.zig @@ -10,9 +10,13 @@ const GossipTable = sig.gossip.GossipTable; const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo; const GossipService = sig.gossip.GossipService; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const DOWNLOAD_PROGRESS_UPDATES_NS = 6 * std.time.ns_per_s; +// The identifier for the scoped logger used in this file. +const LOG_SCOPE = "accountsdb.download"; + /// Analogous to [PeerSnapshotHash](https://github.com/anza-xyz/agave/blob/f868aa38097094e4fb78a885b6fb27ce0e43f5c7/validator/src/bootstrap.rs#L342) const PeerSnapshotHash = struct { contact_info: ThreadSafeContactInfo, @@ -157,13 +161,14 @@ pub fn findPeersToDownloadFromAssumeCapacity( /// note: gossip_service must be running. 
pub fn downloadSnapshotsFromGossip( allocator: std.mem.Allocator, - logger: Logger, + logger_: Logger, // if null, then we trust any peer for snapshot download maybe_trusted_validators: ?[]const Pubkey, gossip_service: *GossipService, output_dir: std.fs.Dir, min_mb_per_sec: usize, ) !void { + const logger = logger_.withScope(LOG_SCOPE); logger .info() .logf("starting snapshot download with min download speed: {d} MB/s", .{min_mb_per_sec}); @@ -240,7 +245,7 @@ pub fn downloadSnapshotsFromGossip( downloadFile( allocator, - logger, + logger.unscoped(), snapshot_url, output_dir, snapshot_filename, @@ -281,7 +286,7 @@ pub fn downloadSnapshotsFromGossip( logger.info().logf("downloading inc_snapshot from: {s}", .{inc_snapshot_url}); _ = downloadFile( allocator, - logger, + logger.unscoped(), inc_snapshot_url, output_dir, inc_snapshot_filename, @@ -304,7 +309,7 @@ pub fn downloadSnapshotsFromGossip( const DownloadProgress = struct { file: std.fs.File, min_mb_per_second: ?usize, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), progress_timer: sig.time.Timer, bytes_read: u64 = 0, @@ -325,7 +330,7 @@ const DownloadProgress = struct { try file.setEndPos(download_size); return .{ - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .file = file, .min_mb_per_second = min_mb_per_second, .progress_timer = try sig.time.Timer.start(), @@ -476,12 +481,13 @@ fn enableProgress( /// the main errors include {HeaderRequestFailed, NoContentLength, TooSlow} or a curl-related error pub fn downloadFile( allocator: std.mem.Allocator, - logger: Logger, + logger_: Logger, url: [:0]const u8, output_dir: std.fs.Dir, filename: []const u8, min_mb_per_second: ?usize, ) !void { + const logger = logger_.withScope(LOG_SCOPE); var easy = try curl.Easy.init(allocator, .{}); defer easy.deinit(); @@ -503,7 +509,7 @@ pub fn downloadFile( // timeout will need to be larger easy.timeout_ms = std.time.ms_per_hour * 5; // 5 hours is probs too long but its ok var download_progress = try DownloadProgress.init( - logger, + logger.unscoped(), output_dir, filename, download_size, diff --git a/src/accountsdb/index.zig b/src/accountsdb/index.zig index 67e642f03..968b81839 100644 --- a/src/accountsdb/index.zig +++ b/src/accountsdb/index.zig @@ -66,11 +66,12 @@ pub const AccountIndex = struct { pub fn init( allocator: std.mem.Allocator, - logger: sig.trace.Logger, + logger_: sig.trace.Logger, allocator_config: AllocatorConfig, /// number of shards for the pubkey_ref_map number_of_shards: usize, ) !Self { + const logger = logger_.withScope(@typeName(Self)); const reference_allocator: ReferenceAllocator = switch (allocator_config) { .ram => |ram| blk: { logger.info().logf("using ram memory for account index", .{}); @@ -80,7 +81,7 @@ pub const AccountIndex = struct { var index_dir = try disk.accountsdb_dir.makeOpenPath("index", .{}); errdefer index_dir.close(); const disk_allocator = try allocator.create(DiskMemoryAllocator); - disk_allocator.* = .{ .dir = index_dir, .logger = logger }; + disk_allocator.* = .{ .dir = index_dir, .logger = logger.withScope(@typeName(DiskMemoryAllocator)) }; logger.info().logf("using disk memory (@{s}) for account index", .{sig.utils.fmt.tryRealPath(index_dir, ".")}); break :blk .{ .disk = .{ .dma = disk_allocator, .ptr_allocator = allocator } }; }, diff --git a/src/accountsdb/snapshots.zig b/src/accountsdb/snapshots.zig index 73a01c988..0c9d71289 100644 --- a/src/accountsdb/snapshots.zig +++ b/src/accountsdb/snapshots.zig @@ -1725,10 +1725,11 @@ pub const AllSnapshotFields = struct { pub fn
fromFiles( allocator: std.mem.Allocator, - logger: Logger, + logger_: Logger, snapshot_dir: std.fs.Dir, files: SnapshotFiles, ) !Self { + const logger = logger_.withScope(@typeName(Self)); // unpack const full_fields = blk: { const rel_path_bounded = sig.utils.fmt.boundedFmt("snapshots/{0}/{0}", .{files.full_snapshot.slot}); diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 6c9ecf83c..3a31ab206 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -20,6 +20,7 @@ const GenesisConfig = sig.accounts_db.GenesisConfig; const GossipService = sig.gossip.GossipService; const IpAddr = sig.net.IpAddr; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const Network = config.Network; const ChannelPrintLogger = sig.trace.ChannelPrintLogger; const Pubkey = sig.core.Pubkey; @@ -56,6 +57,9 @@ else const base58Encoder = base58.Encoder.init(.{}); +// The identifier for the scoped logger used in this file. +const LOG_SCOPE = "cmd"; + pub fn run() !void { defer { // _ = gpa.deinit(); TODO: this causes literally thousands of leaks @@ -427,8 +431,8 @@ pub fn run() !void { \\ NOTE: this means that this command *requires* a leader schedule to be provided \\ (which would usually be derived from the accountsdb snapshot). \\ - \\ NOTE: this command also requires `start_slot` (`--test-repair-for-slot`) to be given as well - \\ (which is usually derived from the accountsdb snapshot). + \\ NOTE: this command also requires `start_slot` (`--test-repair-for-slot`) to be given as well + \\ (which is usually derived from the accountsdb snapshot). \\ This can be done with `--test-repair-for-slot $(solana slot -u testnet)` \\ for testnet or another `-u` for mainnet/devnet. }, @@ -685,7 +689,7 @@ fn validator() !void { service_manager.deinit(); } - const geyser_writer = try buildGeyserWriter(allocator, app_base.logger); + const geyser_writer = try buildGeyserWriter(allocator, app_base.logger.unscoped()); defer { if (geyser_writer) |geyser| { geyser.deinit(); @@ -695,7 +699,7 @@ fn validator() !void { const snapshot = try loadSnapshot( allocator, - app_base.logger, + app_base.logger.unscoped(), gossip_service, true, geyser_writer, @@ -716,12 +720,12 @@ fn validator() !void { // blockstore var blockstore_db = try sig.ledger.BlockstoreDB.open( allocator, - app_base.logger, + app_base.logger.unscoped(), sig.VALIDATOR_DIR ++ "blockstore", ); const shred_inserter = try sig.ledger.ShredInserter.init( allocator, - app_base.logger, + app_base.logger.unscoped(), app_base.metrics_registry, blockstore_db, ); @@ -739,7 +743,7 @@ fn validator() !void { defer allocator.destroy(blockstore_reader); blockstore_reader.* = try BlockstoreReader.init( allocator, - app_base.logger, + app_base.logger.unscoped(), blockstore_db, app_base.metrics_registry, lowest_cleanup_slot, @@ -747,7 +751,7 @@ fn validator() !void { ); var cleanup_service_handle = try std.Thread.spawn(.{}, sig.ledger.cleanup_service.run, .{ - app_base.logger, + app_base.logger.unscoped(), blockstore_reader, &blockstore_db, lowest_cleanup_slot, @@ -764,7 +768,7 @@ fn validator() !void { shred_col_conf, ShredCollectorDependencies{ .allocator = allocator, - .logger = app_base.logger, + .logger = app_base.logger.unscoped(), .registry = app_base.metrics_registry, .random = prng.random(), .my_keypair = &app_base.my_keypair, @@ -817,12 +821,12 @@ fn shredCollector() !void { // blockstore var blockstore_db = try sig.ledger.BlockstoreDB.open( allocator, - app_base.logger, + app_base.logger.unscoped(), sig.VALIDATOR_DIR ++ "blockstore", ); const shred_inserter
= try sig.ledger.ShredInserter.init( allocator, - app_base.logger, + app_base.logger.unscoped(), app_base.metrics_registry, blockstore_db, ); @@ -840,7 +844,7 @@ fn shredCollector() !void { defer allocator.destroy(blockstore_reader); blockstore_reader.* = try BlockstoreReader.init( allocator, - app_base.logger, + app_base.logger.unscoped(), blockstore_db, app_base.metrics_registry, lowest_cleanup_slot, @@ -848,7 +852,7 @@ fn shredCollector() !void { ); var cleanup_service_handle = try std.Thread.spawn(.{}, sig.ledger.cleanup_service.run, .{ - app_base.logger, + app_base.logger.unscoped(), blockstore_reader, &blockstore_db, lowest_cleanup_slot, @@ -865,7 +869,7 @@ fn shredCollector() !void { shred_col_conf, .{ .allocator = allocator, - .logger = app_base.logger, + .logger = app_base.logger.unscoped(), .registry = app_base.metrics_registry, .random = prng.random(), .my_keypair = &app_base.my_keypair, @@ -884,7 +888,8 @@ fn shredCollector() !void { const GeyserWriter = sig.geyser.GeyserWriter; -fn buildGeyserWriter(allocator: std.mem.Allocator, logger: Logger) !?*GeyserWriter { +fn buildGeyserWriter(allocator: std.mem.Allocator, logger_: Logger) !?*GeyserWriter { + const logger = logger_.withScope(LOG_SCOPE); var geyser_writer: ?*GeyserWriter = null; if (config.current.geyser.enable) { logger.info().log("Starting GeyserWriter..."); @@ -925,7 +930,7 @@ fn printManifest() !void { var snapshots = try AllSnapshotFields.fromFiles( allocator, - app_base.logger, + app_base.logger.unscoped(), snapshot_dir, snapshot_file_info, ); @@ -951,7 +956,7 @@ fn createSnapshot() !void { const snapshot_result = try loadSnapshot( allocator, - app_base.logger, + app_base.logger.unscoped(), null, false, null, @@ -997,7 +1002,7 @@ fn validateSnapshot() !void { var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_dir_str, .{}); defer snapshot_dir.close(); - const geyser_writer = try buildGeyserWriter(allocator, app_base.logger); + const geyser_writer = try buildGeyserWriter(allocator, app_base.logger.unscoped()); defer { if (geyser_writer) |geyser| { geyser.deinit(); @@ -1007,7 +1012,7 @@ fn validateSnapshot() !void { const snapshot_result = try loadSnapshot( allocator, - app_base.logger, + app_base.logger.unscoped(), null, true, geyser_writer, @@ -1028,7 +1033,7 @@ fn printLeaderSchedule() !void { app_base.logger.info().log("Downloading a snapshot to calculate the leader schedule."); const loaded_snapshot = loadSnapshot( allocator, - app_base.logger, + app_base.logger.unscoped(), null, true, null, @@ -1106,7 +1111,7 @@ pub fn testTransactionSenderService() !void { // this handles transactions and forwards them to leaders TPU ports var transaction_sender_service = try sig.transaction_sender.Service.init( allocator, - app_base.logger, + app_base.logger.unscoped(), .{ .cluster = cluster, .socket = SocketAddr.init(app_base.my_ip, 0) }, transaction_channel, &gossip_service.gossip_table_rw, @@ -1120,7 +1125,7 @@ pub fn testTransactionSenderService() !void { ); // rpc is used to get blockhashes and other balance information - var rpc_client = sig.rpc.Client.init(allocator, cluster, .{ .logger = app_base.logger }); + var rpc_client = sig.rpc.Client.init(allocator, cluster, .{ .logger = app_base.logger.unscoped() }); defer rpc_client.deinit(); // this sends mock txs to the transaction sender @@ -1129,7 +1134,7 @@ pub fn testTransactionSenderService() !void { transaction_channel, rpc_client, &app_base.exit, - app_base.logger, + app_base.logger.unscoped(), ); // send and confirm mock transactions try 
mock_transfer_service.run( @@ -1149,7 +1154,7 @@ const AppBase = struct { counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), closed: bool, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), metrics_registry: *sig.prometheus.Registry(.{}), metrics_thread: std.Thread, my_keypair: KeyPair, @@ -1158,8 +1163,10 @@ const AppBase = struct { my_ip: IpAddr, my_port: u16, + const Self = @This(); + fn init(allocator: Allocator) !AppBase { - const logger = try spawnLogger(); + const logger = (try spawnLogger()).withScope(@typeName(Self)); errdefer logger.deinit(); const metrics_registry = globalRegistry(); @@ -1167,12 +1174,12 @@ const AppBase = struct { const metrics_thread = try spawnMetrics(gpa_allocator, config.current.metrics_port); errdefer metrics_thread.detach(); - const my_keypair = try getOrInitIdentity(allocator, logger); + const my_keypair = try getOrInitIdentity(allocator, logger.unscoped()); - const entrypoints = try getEntrypoints(logger); + const entrypoints = try getEntrypoints(logger.unscoped()); errdefer entrypoints.deinit(); - const ip_echo_data = try getMyDataFromIpEcho(logger, entrypoints.items); + const ip_echo_data = try getMyDataFromIpEcho(logger.unscoped(), entrypoints.items); const my_port = config.current.gossip.port; return .{ @@ -1207,7 +1214,7 @@ const AppBase = struct { /// Initialize an instance of GossipService and configure with CLI arguments fn initGossip( - logger: Logger, + logger_: Logger, my_keypair: KeyPair, exit: *Atomic(usize), entrypoints: []const SocketAddr, @@ -1215,6 +1222,7 @@ fn initGossip( gossip_host_ip: IpAddr, sockets: []const struct { tag: SocketTag, port: u16 }, ) !GossipService { + const logger = logger_.withScope(LOG_SCOPE); const gossip_port: u16 = config.current.gossip.port; logger.info().logf("gossip host: {any}", .{gossip_host_ip}); logger.info().logf("gossip port: {d}", .{gossip_port}); @@ -1233,7 +1241,7 @@ fn initGossip( my_keypair, entrypoints, exit, - logger, + logger.unscoped(), ); } @@ -1255,7 +1263,7 @@ fn startGossip( var manager = sig.utils.service_manager.ServiceManager.init( allocator, - app_base.logger, + app_base.logger.unscoped(), &app_base.exit, "gossip", .{}, @@ -1270,7 +1278,7 @@ fn startGossip( app_base.my_keypair, // TODO: consider security implication of passing keypair by value app_base.entrypoints.items, &app_base.counter, - app_base.logger, + app_base.logger.unscoped(), ); try manager.defers.deferCall(GossipService.deinit, .{service}); @@ -1293,9 +1301,10 @@ fn runGossipWithConfigValues(gossip_service: *GossipService) !void { /// determine our shred version and ip. in the solana-labs client, the shred version /// comes from the snapshot, and ip echo is only used to validate it. 
fn getMyDataFromIpEcho( - logger: Logger, + logger_: Logger, entrypoints: []SocketAddr, ) !struct { shred_version: u16, ip: IpAddr } { + const logger = logger_.withScope(LOG_SCOPE); var my_ip_from_entrypoint: ?IpAddr = null; const my_shred_version = loop: for (entrypoints) |entrypoint| { if (requestIpEcho(gpa_allocator, entrypoint.toAddress(), .{})) |response| { @@ -1364,7 +1373,8 @@ fn resolveSocketAddr(entrypoint: []const u8, logger: Logger) !SocketAddr { return socket_addr; } -fn getEntrypoints(logger: Logger) !std.ArrayList(SocketAddr) { +fn getEntrypoints(logger_: Logger) !std.ArrayList(SocketAddr) { + const logger = logger_.withScope(LOG_SCOPE); var entrypoints = std.ArrayList(SocketAddr).init(gpa_allocator); errdefer entrypoints.deinit(); @@ -1385,7 +1395,7 @@ fn getEntrypoints(logger: Logger) !std.ArrayList(SocketAddr) { for (config.current.gossip.entrypoints) |entrypoint| { const socket_addr = SocketAddr.parse(entrypoint) catch brk: { - break :brk try resolveSocketAddr(entrypoint, logger); + break :brk try resolveSocketAddr(entrypoint, logger.unscoped()); }; const gop = try entrypoint_set.getOrPut(socket_addr); @@ -1429,7 +1439,7 @@ const LoadedSnapshot = struct { fn loadSnapshot( allocator: Allocator, - logger: Logger, + logger_: Logger, /// optional service to download a fresh snapshot from gossip. if null, will read from the snapshot_dir maybe_gossip_service: ?*GossipService, /// whether to validate the snapshot account data against the metadata @@ -1437,6 +1447,7 @@ fn loadSnapshot( /// optional geyser to write snapshot data to geyser_writer: ?*GeyserWriter, ) !*LoadedSnapshot { + const logger = logger_.withScope(@typeName(@This())); const result = try allocator.create(LoadedSnapshot); errdefer allocator.destroy(result); result.allocator = allocator; @@ -1448,7 +1459,7 @@ fn loadSnapshot( var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_dir_str, .{ .iterate = true }); defer snapshot_dir.close(); - var all_snapshot_fields, const snapshot_files = try getOrDownloadSnapshots(allocator, logger, maybe_gossip_service, .{ + var all_snapshot_fields, const snapshot_files = try getOrDownloadSnapshots(allocator, logger.unscoped(), maybe_gossip_service, .{ .snapshot_dir = snapshot_dir, .force_unpack_snapshot = config.current.accounts_db.force_unpack_snapshot, .force_new_snapshot_download = config.current.accounts_db.force_new_snapshot_download, @@ -1480,7 +1491,7 @@ fn loadSnapshot( result.accounts_db = try AccountsDB.init(.{ .allocator = allocator, - .logger = logger, + .logger = logger.unscoped(), .snapshot_dir = snapshot_dir, .geyser_writer = geyser_writer, .gossip_view = if (maybe_gossip_service) |service| AccountsDB.GossipView.fromService(service) else null, @@ -1600,7 +1611,7 @@ fn downloadSnapshot() !void { try downloadSnapshotsFromGossip( gpa_allocator, - logger, + logger.unscoped(), if (trusted_validators) |trusted| trusted.items else null, &gossip_service, snapshot_dir, @@ -1627,7 +1638,7 @@ fn getTrustedValidators(allocator: Allocator) !?std.ArrayList(Pubkey) { fn getOrDownloadSnapshots( allocator: Allocator, - logger: Logger, + logger_: Logger, gossip_service: ?*GossipService, // accounts_db_config: config.AccountsDBConfig, options: struct { @@ -1638,6 +1649,7 @@ fn getOrDownloadSnapshots( min_snapshot_download_speed_mbs: usize, }, ) !struct { AllSnapshotFields, SnapshotFiles } { + const logger = logger_.withScope(LOG_SCOPE); // arg parsing const snapshot_dir = options.snapshot_dir; const force_unpack_snapshot = options.force_unpack_snapshot; @@ -1667,7 +1679,7 @@ fn 
getOrDownloadSnapshots( const min_mb_per_sec = options.min_snapshot_download_speed_mbs; try downloadSnapshotsFromGossip( allocator, - logger, + logger.unscoped(), if (trusted_validators) |trusted| trusted.items else null, gossip_service orelse return error.SnapshotsNotFoundAndNoGossipService, snapshot_dir, @@ -1723,7 +1735,7 @@ fn getOrDownloadSnapshots( defer archive_file.close(); try parallelUnpackZstdTarBall( allocator, - logger, + logger.unscoped(), archive_file, snapshot_dir, n_threads_snapshot_unpack, @@ -1742,7 +1754,7 @@ fn getOrDownloadSnapshots( try parallelUnpackZstdTarBall( allocator, - logger, + logger.unscoped(), archive_file, snapshot_dir, n_threads_snapshot_unpack, @@ -1756,7 +1768,7 @@ fn getOrDownloadSnapshots( timer.reset(); logger.info().log("reading snapshot metadata..."); - const snapshots = try AllSnapshotFields.fromFiles(allocator, logger, snapshot_dir, snapshot_files); + const snapshots = try AllSnapshotFields.fromFiles(allocator, logger.unscoped(), snapshot_dir, snapshot_files); logger.info().logf("read snapshot metdata in {s}", .{std.fmt.fmtDuration(timer.read())}); return .{ snapshots, snapshot_files }; diff --git a/src/gossip/dump_service.zig b/src/gossip/dump_service.zig index 0c109ddc5..12ae9b036 100644 --- a/src/gossip/dump_service.zig +++ b/src/gossip/dump_service.zig @@ -6,14 +6,14 @@ const Atomic = std.atomic.Value; const SignedGossipData = sig.gossip.data.SignedGossipData; const GossipTable = sig.gossip.table.GossipTable; const Duration = sig.time.Duration; -const Logger = sig.trace.log.Logger; +const ScopedLogger = sig.trace.log.ScopedLogger; const RwMux = sig.sync.mux.RwMux; pub const DUMP_INTERVAL = Duration.fromSecs(10); pub const GossipDumpService = struct { allocator: Allocator, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), gossip_table_rw: *RwMux(GossipTable), counter: *Atomic(usize), diff --git a/src/gossip/service.zig b/src/gossip/service.zig index 11ce593f9..86af37acb 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -18,6 +18,7 @@ const UdpSocket = network.Socket; const Pubkey = sig.core.Pubkey; const Hash = sig.core.Hash; const Logger = sig.trace.log.Logger; +const ScopedLogger = sig.trace.log.ScopedLogger; const Packet = sig.net.Packet; const EchoServer = sig.net.echo.Server; const SocketAddr = sig.net.SocketAddr; @@ -164,7 +165,7 @@ pub const GossipService = struct { /// only be read by that thread, or it needs a synchronization mechanism. 
entrypoints: ArrayList(Entrypoint), ping_cache_rw: RwMux(*PingCache), - logger: Logger, + logger: ScopedLogger(@typeName(GossipService)), thread_pool: *ThreadPool, echo_server: EchoServer, @@ -186,6 +187,7 @@ pub const GossipService = struct { counter: *Atomic(usize), logger: Logger, ) !Self { + const gossip_logger = logger.withScope(@typeName(GossipService)); var packet_incoming_channel = try Channel(Packet).create(allocator); errdefer packet_incoming_channel.deinit(); @@ -201,7 +203,7 @@ pub const GossipService = struct { .max_threads = @intCast(n_threads), .stack_size = 2 * 1024 * 1024, }); - logger.debug().logf("using n_threads in gossip: {}", .{n_threads}); + gossip_logger.debug().logf("using n_threads in gossip: {}", .{n_threads}); var gossip_table = try GossipTable.init(gossip_value_allocator, thread_pool); errdefer gossip_table.deinit(); @@ -226,7 +228,7 @@ pub const GossipService = struct { for (eps) |ep| entrypoint_list.appendAssumeCapacity(.{ .addr = ep }); } - const metrics = try GossipMetrics.init(logger); + const metrics = try GossipMetrics.init(gossip_logger); const ping_cache_ptr = try allocator.create(PingCache); ping_cache_ptr.* = try PingCache.init( @@ -259,7 +261,7 @@ pub const GossipService = struct { .entrypoints = entrypoint_list, .ping_cache_rw = RwMux(*PingCache).init(ping_cache_ptr), .echo_server = echo_server, - .logger = logger, + .logger = gossip_logger, .thread_pool = thread_pool, .metrics = metrics, }; @@ -342,7 +344,7 @@ pub const GossipService = struct { pub fn run(self: *Self, params: RunThreadsParams) !void { var manager = ServiceManager.init( self.allocator, - self.logger, + self.logger.unscoped(), &self.service_exit, "gossip", .{}, @@ -375,7 +377,7 @@ pub const GossipService = struct { try manager.spawn("gossip readSocket", socket_utils.readSocket, .{ self.gossip_socket, self.packet_incoming_channel, - self.logger, + self.logger.unscoped(), true, self.counter, }, true); @@ -390,7 +392,7 @@ pub const GossipService = struct { try manager.spawn("gossip sendSocket", socket_utils.sendSocket, .{ self.gossip_socket, self.packet_outgoing_channel, - self.logger, + self.logger.unscoped(), true, self.counter, }, true); @@ -398,7 +400,7 @@ pub const GossipService = struct { if (params.dump) { try manager.spawn("GossipDumpService", GossipDumpService.run, .{.{ .allocator = self.allocator, - .logger = self.logger, + .logger = self.logger.withScope(@typeName(GossipDumpService)), .gossip_table_rw = &self.gossip_table_rw, .counter = self.counter, }}, true); @@ -410,7 +412,7 @@ pub const GossipService = struct { gossip_value_allocator: std.mem.Allocator, packet: Packet, verified_incoming_channel: *Channel(GossipMessageWithEndpoint), - logger: Logger, + logger: ScopedLogger(@typeName(VerifyMessageEntry)), pub fn callback(self: *VerifyMessageEntry) !void { const packet = self.packet; @@ -466,7 +468,7 @@ pub const GossipService = struct { .gossip_value_allocator = self.gossip_value_allocator, .verified_incoming_channel = self.verified_incoming_channel, .packet = undefined, - .logger = self.logger, + .logger = self.logger.withScope(@typeName(VerifyMessageEntry)), }; } @@ -2023,7 +2025,8 @@ pub const GossipMetrics = struct { // logging details _logging_fields: struct { - logger: Logger, + // Scoping to GossipService instead of logging fields struct. 
+ logger: ScopedLogger(@typeName(GossipService)), log_interval_micros: i64 = 10 * std.time.us_per_s, last_log: i64 = 0, last_logged_snapshot: StatsToLog = .{}, @@ -2060,7 +2063,7 @@ pub const GossipMetrics = struct { 5000, 10000, }; - pub fn init(logger: Logger) GetMetricError!Self { + pub fn init(logger: ScopedLogger(@typeName(GossipService))) GetMetricError!Self { var self: Self = undefined; const registry = globalRegistry(); std.debug.assert(try registry.initFields(&self) == 1); diff --git a/src/ledger/cleanup_service.zig b/src/ledger/cleanup_service.zig index e75609ff0..8696ff515 100644 --- a/src/ledger/cleanup_service.zig +++ b/src/ledger/cleanup_service.zig @@ -30,20 +30,24 @@ const DEFAULT_CLEANUP_SLOT_INTERVAL: u64 = 512; // a long wait incase a check occurs just before the interval has elapsed const LOOP_LIMITER = Duration.fromMillis(DEFAULT_CLEANUP_SLOT_INTERVAL * DEFAULT_MS_PER_SLOT / 10); +// The identifier for the scoped logger used in this file. +const LOG_SCOPE: []const u8 = "ledger.cleanup_service"; + pub fn run( - logger: sig.trace.Logger, + logger_: sig.trace.Logger, blockstore_reader: *BlockstoreReader, db: *BlockstoreDB, lowest_cleanup_slot: *sig.sync.RwMux(Slot), max_ledger_shreds: u64, exit: *AtomicBool, ) !void { + const logger = logger_.withScope(LOG_SCOPE); var last_purge_slot: Slot = 0; logger.info().log("Starting blockstore cleanup service"); while (!exit.load(.acquire)) { last_purge_slot = try cleanBlockstore( - logger, + logger.unscoped(), blockstore_reader, db, lowest_cleanup_slot, @@ -76,7 +80,7 @@ pub fn run( /// /// Analogous to the [`cleanup_ledger`](https://github.com/anza-xyz/agave/blob/6476d5fac0c30d1f49d13eae118b89be78fb15d2/ledger/src/blockstore_cleanup_service.rs#L198) in agave: pub fn cleanBlockstore( - logger: sig.trace.Logger, + logger_: sig.trace.Logger, blockstore_reader: *BlockstoreReader, db: *BlockstoreDB, lowest_cleanup_slot: *sig.sync.RwMux(Slot), @@ -84,6 +88,7 @@ pub fn cleanBlockstore( last_purge_slot: u64, purge_interval: u64, ) !Slot { + const logger = logger_.withScope(LOG_SCOPE); // // TODO: add back when max_root is implemented with consensus // const root = blockstore_reader.max_root.load(.acquire); // if (root - last_purge_slot <= purge_interval) return last_purge_slot; diff --git a/src/ledger/database/rocksdb.zig b/src/ledger/database/rocksdb.zig index c043ecb84..f2167e1af 100644 --- a/src/ledger/database/rocksdb.zig +++ b/src/ledger/database/rocksdb.zig @@ -10,22 +10,27 @@ const BytesRef = database.interface.BytesRef; const ColumnFamily = database.interface.ColumnFamily; const IteratorDirection = database.interface.IteratorDirection; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const ReturnType = sig.utils.types.ReturnType; const key_serializer = database.interface.key_serializer; const value_serializer = database.interface.value_serializer; +// The identifier for the scoped logger used in this file. 
+const LOG_SCOPE: []const u8 = "rocksdb"; + pub fn RocksDB(comptime column_families: []const ColumnFamily) type { return struct { allocator: Allocator, db: rocks.DB, - logger: Logger, + logger: ScopedLogger(LOG_SCOPE), cf_handles: []const rocks.ColumnFamilyHandle, path: []const u8, const Self = @This(); - pub fn open(allocator: Allocator, logger: Logger, path: []const u8) Error!Self { + pub fn open(allocator: Allocator, logger_: Logger, path: []const u8) Error!Self { + const logger = logger_.withScope(LOG_SCOPE); logger.info().log("Initializing RocksDB"); const owned_path = try allocator.dupe(u8, path); @@ -277,7 +282,7 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { return struct { allocator: Allocator, inner: rocks.Iterator, - logger: Logger, + logger: ScopedLogger(LOG_SCOPE), /// Calling this will free all slices returned by the iterator pub fn deinit(self: *@This()) void { @@ -333,7 +338,7 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { }; } -fn callRocks(logger: Logger, comptime func: anytype, args: anytype) ReturnType(@TypeOf(func)) { +fn callRocks(logger: ScopedLogger(LOG_SCOPE), comptime func: anytype, args: anytype) ReturnType(@TypeOf(func)) { var err_str: ?rocks.Data = null; return @call(.auto, func, args ++ .{&err_str}) catch |e| { logger.err().logf("{} - {s}", .{ e, err_str.? }); diff --git a/src/ledger/reader.zig b/src/ledger/reader.zig index 41e3b0d00..0a4e31d59 100644 --- a/src/ledger/reader.zig +++ b/src/ledger/reader.zig @@ -13,6 +13,7 @@ const Entry = sig.core.Entry; const Hash = sig.core.Hash; const Histogram = sig.prometheus.Histogram; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const Pubkey = sig.core.Pubkey; const Registry = sig.prometheus.Registry; const RwMux = sig.sync.RwMux; @@ -48,7 +49,7 @@ const DEFAULT_TICKS_PER_SECOND = sig.core.time.DEFAULT_TICKS_PER_SECOND; pub const BlockstoreReader = struct { allocator: Allocator, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), db: BlockstoreDB, // TODO: change naming to 'highest_slot_cleaned' lowest_cleanup_slot: *RwMux(Slot), @@ -69,7 +70,7 @@ pub const BlockstoreReader = struct { ) !Self { return .{ .allocator = allocator, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .db = db, .rpc_api_metrics = try registry.initStruct(BlockstoreRpcApiMetrics), .metrics = try registry.initStruct(BlockstoreReaderMetrics), diff --git a/src/ledger/result_writer.zig b/src/ledger/result_writer.zig index ff31b339b..b48e3e830 100644 --- a/src/ledger/result_writer.zig +++ b/src/ledger/result_writer.zig @@ -10,6 +10,7 @@ const ArrayList = std.ArrayList; const Hash = sig.core.Hash; const Histogram = sig.prometheus.Histogram; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const Pubkey = sig.core.Pubkey; const RwMux = sig.sync.RwMux; const Signature = sig.core.Signature; @@ -30,7 +31,7 @@ const schema = ledger.schema.schema; /// or reaching consensus on a block. 
pub const LedgerResultWriter = struct { allocator: Allocator, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), db: BlockstoreDB, // TODO: change naming to 'highest_slot_cleaned' lowest_cleanup_slot: *RwMux(Slot), @@ -49,7 +50,7 @@ pub const LedgerResultWriter = struct { ) !LedgerResultWriter { return .{ .allocator = allocator, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .db = db, .lowest_cleanup_slot = lowest_cleanup_slot, .max_root = max_root, diff --git a/src/ledger/shred_inserter/merkle_root_checks.zig b/src/ledger/shred_inserter/merkle_root_checks.zig index 1047af9c7..2bab97408 100644 --- a/src/ledger/shred_inserter/merkle_root_checks.zig +++ b/src/ledger/shred_inserter/merkle_root_checks.zig @@ -9,7 +9,7 @@ const Allocator = std.mem.Allocator; const ErasureSetId = sig.ledger.shred.ErasureSetId; const Hash = sig.core.Hash; -const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const Slot = sig.core.Slot; const CodeShred = ledger.shred.CodeShred; @@ -27,7 +27,7 @@ const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces; pub const MerkleRootValidator = struct { allocator: Allocator, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), shreds: ShredWorkingStore, duplicate_shreds: DuplicateShredsWorkingStore, @@ -36,7 +36,7 @@ pub const MerkleRootValidator = struct { pub fn init(pending_state: *PendingInsertShredsState) Self { return .{ .allocator = pending_state.allocator, - .logger = pending_state.logger, + .logger = pending_state.logger.withScope(@typeName(Self)), .shreds = pending_state.shreds(), .duplicate_shreds = pending_state.duplicateShreds(), }; @@ -88,9 +88,9 @@ pub const MerkleRootValidator = struct { }); } else { self.logger.err().logf(&newlinesToSpaces( - \\Shred {any} indiciated by merkle root meta {any} is - \\missing from blockstore. This should only happen in extreme cases where - \\blockstore cleanup has caught up to the root. Skipping the merkle root + \\Shred {any} indicated by merkle root meta {any} is + \\missing from blockstore. This should only happen in extreme cases where + \\blockstore cleanup has caught up to the root. 
Skipping the merkle root \\consistency check ), .{ shred_id, merkle_root_meta }); return true; diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig index 5a26fae30..b3536783d 100644 --- a/src/ledger/shred_inserter/shred_inserter.zig +++ b/src/ledger/shred_inserter/shred_inserter.zig @@ -52,7 +52,7 @@ const DEFAULT_TICKS_PER_SECOND = sig.core.time.DEFAULT_TICKS_PER_SECOND; pub const ShredInserter = struct { allocator: Allocator, - logger: sig.trace.Logger, + logger: sig.trace.ScopedLogger(@typeName(Self)), db: BlockstoreDB, lock: Mutex, max_root: Atomic(u64), // TODO shared @@ -68,7 +68,7 @@ pub const ShredInserter = struct { ) GetMetricError!Self { return .{ .allocator = allocator, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .db = db, .lock = .{}, .max_root = Atomic(u64).init(0), // TODO read this from the database @@ -166,7 +166,7 @@ pub const ShredInserter = struct { var total_timer = try Timer.start(); var state = try PendingInsertShredsState.init( self.allocator, - self.logger, + self.logger.unscoped(), &self.db, self.metrics, ); diff --git a/src/ledger/shred_inserter/working_state.zig b/src/ledger/shred_inserter/working_state.zig index 11edced48..112a9ecb5 100644 --- a/src/ledger/shred_inserter/working_state.zig +++ b/src/ledger/shred_inserter/working_state.zig @@ -76,7 +76,7 @@ const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces; /// database and working sets. pub const PendingInsertShredsState = struct { allocator: Allocator, - logger: sig.trace.Logger, + logger: sig.trace.ScopedLogger(@typeName(Self)), db: *BlockstoreDB, write_batch: WriteBatch, just_inserted_shreds: AutoHashMap(ShredId, Shred), @@ -100,7 +100,7 @@ pub const PendingInsertShredsState = struct { return .{ .allocator = allocator, .db = db, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .write_batch = try db.initWriteBatch(), .just_inserted_shreds = AutoHashMap(ShredId, Shred).init(allocator), // TODO capacity = shreds.len .erasure_metas = SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)).init(allocator), @@ -170,7 +170,7 @@ pub const PendingInsertShredsState = struct { pub fn shreds(self: *Self) ShredWorkingStore { return .{ - .logger = self.logger, + .logger = self.logger.withScope(@typeName(ShredWorkingStore)), .db = self.db, .just_inserted_shreds = &self.just_inserted_shreds, }; @@ -490,7 +490,7 @@ const ShredConflict = struct { }; pub const ShredWorkingStore = struct { - logger: sig.trace.Logger, + logger: sig.trace.ScopedLogger(@typeName(Self)), db: *BlockstoreDB, just_inserted_shreds: *const AutoHashMap(ShredId, Shred), diff --git a/src/net/socket_utils.zig b/src/net/socket_utils.zig index 6ef4cfa4e..a50811972 100644 --- a/src/net/socket_utils.zig +++ b/src/net/socket_utils.zig @@ -14,14 +14,18 @@ const UdpSocket = @import("zig-network").Socket; pub const SOCKET_TIMEOUT_US: usize = 1 * std.time.us_per_s; pub const PACKETS_PER_BATCH: usize = 64; +// The identifier for the scoped logger used in this file. 
+const LOG_SCOPE: []const u8 = "socket_utils"; + pub fn readSocket( socket_: UdpSocket, incoming_channel: *Channel(Packet), - logger: Logger, + logger_: Logger, comptime needs_exit_order: bool, counter: *Atomic(if (needs_exit_order) usize else bool), idx: if (needs_exit_order) usize else void, ) !void { + const logger = logger_.withScope(LOG_SCOPE); defer { logger.info().logf( "leaving with: {}, {}, {}", @@ -55,11 +59,12 @@ pub fn readSocket( pub fn sendSocket( socket: UdpSocket, outgoing_channel: *Channel(Packet), - logger: Logger, + logger_: Logger, comptime needs_exit_order: bool, counter: *Atomic(if (needs_exit_order) usize else bool), idx: if (needs_exit_order) usize else void, ) !void { + const logger = logger_.withScope(LOG_SCOPE); defer { if (needs_exit_order) { // exit the next service in the chain diff --git a/src/rpc/client.zig b/src/rpc/client.zig index 55f687fc0..35bc21909 100644 --- a/src/rpc/client.zig +++ b/src/rpc/client.zig @@ -10,6 +10,7 @@ const Signature = sig.core.Signature; const Request = sig.rpc.Request; const Response = sig.rpc.Response; const Logger = sig.trace.log.Logger; +const ScopedLogger = sig.trace.log.ScopedLogger; const Transaction = sig.core.transaction.Transaction; const ClusterType = sig.rpc.ClusterType; @@ -17,7 +18,9 @@ pub const Client = struct { http_endpoint: []const u8, http_client: std.http.Client, max_retries: usize, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), + + const Self = @This(); pub const Options = struct { max_retries: usize = 0, @@ -37,7 +40,7 @@ pub const Client = struct { .http_endpoint = http_endpoint, .http_client = std.http.Client{ .allocator = allocator }, .max_retries = options.max_retries, - .logger = options.logger, + .logger = options.logger.withScope(@typeName(Client)), }; client.logVersion(allocator) catch |err| { diff --git a/src/shred_collector/repair_service.zig b/src/shred_collector/repair_service.zig index 1381ca73c..96b5c280a 100644 --- a/src/shred_collector/repair_service.zig +++ b/src/shred_collector/repair_service.zig @@ -19,6 +19,7 @@ const Gauge = sig.prometheus.Gauge; const GossipTable = sig.gossip.GossipTable; const HomogeneousThreadPool = sig.utils.thread.HomogeneousThreadPool; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const LruCacheCustom = sig.utils.lru.LruCacheCustom; const MultiSlotReport = shred_collector.shred_tracker.MultiSlotReport; const Nonce = sig.core.Nonce; @@ -48,7 +49,7 @@ pub const RepairService = struct { requester: RepairRequester, peer_provider: RepairPeerProvider, shred_tracker: *BasicShredTracker, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), exit: *Atomic(bool), last_big_request_timestamp_ms: i64 = 0, /// memory to re-use across iterations. initialized to empty @@ -92,7 +93,7 @@ pub const RepairService = struct { .requester = requester, .peer_provider = peer_provider, .shred_tracker = shred_tracker, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .exit = exit, .report = MultiSlotReport.init(allocator), .thread_pool = RequestBatchThreadPool.init(allocator, NUM_REQUESTER_THREADS), @@ -247,7 +248,7 @@ pub const RepairService = struct { /// Signs and serializes repair requests. Sends them over the network. 
pub const RepairRequester = struct { allocator: Allocator, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), random: Random, keypair: *const KeyPair, sender: SocketThread, @@ -274,7 +275,7 @@ pub const RepairRequester = struct { const sndr = try SocketThread.initSender(allocator, logger, udp_send_socket, exit); return .{ .allocator = allocator, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .random = random, .keypair = keypair, .sender = sndr, diff --git a/src/shred_collector/service.zig b/src/shred_collector/service.zig index 522cb97fc..aca30ca78 100644 --- a/src/shred_collector/service.zig +++ b/src/shred_collector/service.zig @@ -68,7 +68,7 @@ pub fn start( ) !ServiceManager { var service_manager = ServiceManager.init( deps.allocator, - deps.logger, + deps.logger.unscoped(), deps.exit, "shred collector", .{}, @@ -87,7 +87,7 @@ pub fn start( .allocator = deps.allocator, .keypair = deps.my_keypair, .exit = deps.exit, - .logger = deps.logger, + .logger = deps.logger.withScope(@typeName(ShredReceiver)), .repair_socket = repair_socket, .turbine_socket = turbine_socket, .unverified_shred_sender = unverified_shred_channel, @@ -120,7 +120,7 @@ pub fn start( const shred_tracker = try arena.create(BasicShredTracker); shred_tracker.* = try BasicShredTracker.init( conf.start_slot, - deps.logger, + deps.logger.unscoped(), deps.registry, ); @@ -131,7 +131,7 @@ pub fn start( .{ deps.allocator, deps.exit, - deps.logger, + deps.logger.unscoped(), deps.registry, verified_shred_channel, shred_tracker, @@ -152,7 +152,7 @@ pub fn start( ); const repair_requester = try RepairRequester.init( deps.allocator, - deps.logger, + deps.logger.unscoped(), deps.random, deps.registry, deps.my_keypair, @@ -163,7 +163,7 @@ pub fn start( try service_manager.defers.deferCall(RepairService.deinit, .{repair_svc}); repair_svc.* = try RepairService.init( deps.allocator, - deps.logger, + deps.logger.unscoped(), deps.exit, deps.registry, repair_requester, diff --git a/src/shred_collector/shred_processor.zig b/src/shred_collector/shred_processor.zig index cf79e961e..453ccb88d 100644 --- a/src/shred_collector/shred_processor.zig +++ b/src/shred_collector/shred_processor.zig @@ -20,11 +20,14 @@ const ShredInserter = sig.ledger.ShredInserter; const SlotOutOfBounds = shred_collector.shred_tracker.SlotOutOfBounds; const VariantCounter = sig.prometheus.VariantCounter; +// The identifier for the scoped logger used in this file. 
+const LOG_SCOPE = "shred_processor"; + /// Analogous to [WindowService](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/window_service.rs#L395) pub fn runShredProcessor( allocator: Allocator, exit: *Atomic(bool), - logger: Logger, + logger_: Logger, registry: *Registry(.{}), // shred verifier --> me verified_shred_receiver: *Channel(Packet), @@ -32,6 +35,7 @@ pub fn runShredProcessor( shred_inserter_: ShredInserter, leader_schedule: sig.core.leader_schedule.SlotLeaderProvider, ) !void { + const logger = logger_.withScope(LOG_SCOPE); var shred_inserter = shred_inserter_; var shreds: ArrayListUnmanaged(Shred) = .{}; var is_repaired: ArrayListUnmanaged(bool) = .{}; diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index 83e556dd8..c0fb41e71 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -14,7 +14,7 @@ const Socket = network.Socket; const Channel = sig.sync.Channel; const Counter = sig.prometheus.Counter; const Histogram = sig.prometheus.Histogram; -const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const Packet = sig.net.Packet; const Ping = sig.gossip.Ping; const Pong = sig.gossip.Pong; @@ -30,7 +30,7 @@ pub const ShredReceiver = struct { allocator: Allocator, keypair: *const KeyPair, exit: *Atomic(bool), - logger: Logger, + logger: ScopedLogger(@typeName(Self)), repair_socket: Socket, turbine_socket: Socket, /// me --> shred verifier @@ -48,17 +48,17 @@ pub const ShredReceiver = struct { errdefer self.logger.err().log("error in shred receiver"); var response_sender = try SocketThread - .initSender(self.allocator, self.logger, self.repair_socket, self.exit); + .initSender(self.allocator, self.logger.unscoped(), self.repair_socket, self.exit); defer response_sender.deinit(self.allocator); var repair_receiver = try SocketThread - .initReceiver(self.allocator, self.logger, self.repair_socket, self.exit); + .initReceiver(self.allocator, self.logger.unscoped(), self.repair_socket, self.exit); defer repair_receiver.deinit(self.allocator); var turbine_receivers: [NUM_TVU_RECEIVERS]SocketThread = undefined; for (0..NUM_TVU_RECEIVERS) |i| { turbine_receivers[i] = try SocketThread.initReceiver( self.allocator, - self.logger, + self.logger.unscoped(), self.turbine_socket, self.exit, ); diff --git a/src/shred_collector/shred_tracker.zig b/src/shred_collector/shred_tracker.zig index 2f2c50ea1..1a07dac72 100644 --- a/src/shred_collector/shred_tracker.zig +++ b/src/shred_collector/shred_tracker.zig @@ -22,7 +22,7 @@ pub const Range = struct { /// once it is implemented. This struct tracks shreds linearly with no regard /// for forking. The Blockstore will fix this by tracking forks. 
pub const BasicShredTracker = struct { - logger: sig.trace.Logger, + logger: sig.trace.ScopedLogger(@typeName(Self)), mux: Mutex = .{}, /// The slot that this struct was initialized with at index 0 start_slot: ?Slot, @@ -51,7 +51,7 @@ pub const BasicShredTracker = struct { return .{ .start_slot = slot, .current_bottom_slot = slot orelse 0, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .metrics = try registry.initStruct(Metrics), }; } diff --git a/src/time/estimate.zig b/src/time/estimate.zig index 95c62b6ca..07ee22dc5 100644 --- a/src/time/estimate.zig +++ b/src/time/estimate.zig @@ -1,10 +1,9 @@ const std = @import("std"); const sig = @import("../sig.zig"); -const Logger = sig.trace.Logger; // TODO: change to writer interface when logger has improved pub fn printTimeEstimate( - logger: Logger, + logger: anytype, // timer should be started at the beginning of the loop timer: *sig.time.Timer, total: usize, diff --git a/src/trace/lib.zig b/src/trace/lib.zig index 0850e3d1d..c0bdf60ab 100644 --- a/src/trace/lib.zig +++ b/src/trace/lib.zig @@ -4,6 +4,7 @@ pub const logfmt = @import("logfmt.zig"); pub const entry = @import("entry.zig"); pub const Logger = log.Logger; +pub const ScopedLogger = log.ScopedLogger; pub const Level = level.Level; pub const DirectPrintLogger = log.DirectPrintLogger; pub const ChannelPrintLogger = log.ChannelPrintLogger; diff --git a/src/transaction_sender/leader_info.zig b/src/transaction_sender/leader_info.zig index 5c3979e43..043918e61 100644 --- a/src/transaction_sender/leader_info.zig +++ b/src/transaction_sender/leader_info.zig @@ -16,6 +16,7 @@ const RpcClient = sig.rpc.Client; const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache; const EpochSchedule = sig.core.epoch_schedule.EpochSchedule; const Logger = sig.trace.log.Logger; +const ScopedLogger = sig.trace.log.ScopedLogger; const Config = sig.transaction_sender.service.Config; const LeaderSchedule = sig.core.leader_schedule.LeaderSchedule; @@ -29,12 +30,14 @@ const LeaderSchedule = sig.core.leader_schedule.LeaderSchedule; pub const LeaderInfo = struct { allocator: Allocator, config: Config, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), rpc_client: RpcClient, leader_schedule_cache: LeaderScheduleCache, leader_addresses_cache: std.AutoArrayHashMapUnmanaged(Pubkey, SocketAddr), gossip_table_rw: *RwMux(GossipTable), + const Self = @This(); + pub fn init( allocator: Allocator, logger: Logger, @@ -45,7 +48,7 @@ pub const LeaderInfo = struct { return .{ .allocator = allocator, .config = config, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .rpc_client = RpcClient.init( allocator, config.cluster, diff --git a/src/transaction_sender/mock_transfer_generator.zig b/src/transaction_sender/mock_transfer_generator.zig index 5b9ab24de..074e2f241 100644 --- a/src/transaction_sender/mock_transfer_generator.zig +++ b/src/transaction_sender/mock_transfer_generator.zig @@ -3,6 +3,7 @@ const sig = @import("../sig.zig"); const types = @import("../rpc/types.zig"); const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; const AtomicBool = std.atomic.Value(bool); const KeyPair = std.crypto.sign.Ed25519.KeyPair; const Channel = sig.sync.Channel; @@ -56,9 +57,11 @@ pub const MockTransferService = struct { sender: *Channel(TransactionInfo), rpc_client: RpcClient, exit: *AtomicBool, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), accounts: MockAccounts = MockAccounts.DEFAULT, + const Self = @This(); + pub fn init( allocator: 
std.mem.Allocator, sender: *Channel(TransactionInfo), @@ -71,7 +74,7 @@ pub const MockTransferService = struct { .sender = sender, .rpc_client = rpc_client, .exit = exit, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), }; } diff --git a/src/transaction_sender/service.zig b/src/transaction_sender/service.zig index d065232d8..e34194af8 100644 --- a/src/transaction_sender/service.zig +++ b/src/transaction_sender/service.zig @@ -24,6 +24,7 @@ const Duration = sig.time.Duration; const Instant = sig.time.Instant; const Timer = sig.time.Timer; const Logger = sig.trace.log.Logger; +const ScopedLogger = sig.trace.log.ScopedLogger; const LeaderInfo = sig.transaction_sender.LeaderInfo; const TransactionInfo = sig.transaction_sender.TransactionInfo; const TransactionPool = sig.transaction_sender.TransactionPool; @@ -49,7 +50,9 @@ pub const Service = struct { send_channel: *Channel(Packet), receive_channel: *Channel(TransactionInfo), exit: *AtomicBool, - logger: Logger, + logger: ScopedLogger(@typeName(Self)), + + const Self = @This(); pub fn init( allocator: std.mem.Allocator, @@ -81,7 +84,7 @@ pub const Service = struct { ), .send_channel = try Channel(Packet).create(allocator), .receive_channel = receive_channel, - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .exit = exit, }; } @@ -93,7 +96,7 @@ pub const Service = struct { .{ self.send_socket, self.send_channel, - self.logger, + self.logger.unscoped(), false, self.exit, {}, @@ -162,7 +165,7 @@ pub const Service = struct { var rpc_client = RpcClient.init( self.allocator, self.config.cluster, - .{ .max_retries = self.config.rpc_retries, .logger = self.logger }, + .{ .max_retries = self.config.rpc_retries, .logger = self.logger.unscoped() }, ); defer rpc_client.deinit(); @@ -180,7 +183,7 @@ pub const Service = struct { self.transaction_pool.purge(); self.metrics.transactions_pending.set(self.transaction_pool.count()); - self.metrics.log(self.logger); + self.metrics.log(self.logger.unscoped()); } } diff --git a/src/utils/allocators.zig b/src/utils/allocators.zig index 37baac2bb..589cf1095 100644 --- a/src/utils/allocators.zig +++ b/src/utils/allocators.zig @@ -476,7 +476,7 @@ pub fn RecycleFBA(config: struct { /// thread safe disk memory allocator pub const DiskMemoryAllocator = struct { dir: std.fs.Dir, - logger: sig.trace.Logger, + logger: sig.trace.ScopedLogger(@typeName(Self)), /// The amount of memory mmap'd for a particular allocation will be `file_size * mmap_ratio`. /// See `alignedFileSize` and its usages to understand the relationship between an allocation /// size, and the size of a file, and by extension, how this then relates to the allocated diff --git a/src/utils/service.zig b/src/utils/service.zig index be3a66c2b..84f92df92 100644 --- a/src/utils/service.zig +++ b/src/utils/service.zig @@ -9,6 +9,7 @@ const Atomic = std.atomic.Value; const Lazy = sig.utils.lazy.Lazy; const Logger = sig.trace.Logger; +const ScopedLogger = sig.trace.ScopedLogger; /// High level manager for long-running threads and the state /// shared by those threads. @@ -16,7 +17,7 @@ const Logger = sig.trace.Logger; /// You can add threads or state, then await all threads and /// clean up their state. pub const ServiceManager = struct { - logger: Logger, + logger: ScopedLogger(@typeName(Self)), /// Threads to join. 
threads: ArrayListUnmanaged(std.Thread), exit: *Atomic(bool), @@ -39,7 +40,7 @@ pub const ServiceManager = struct { default_spawn_config: std.Thread.SpawnConfig, ) Self { return .{ - .logger = logger, + .logger = logger.withScope(@typeName(Self)), .exit = exit, .threads = .{}, .arena = ArenaAllocator.init(backing_allocator), @@ -54,7 +55,7 @@ pub const ServiceManager = struct { /// The function may be restarted periodically, according to default_run_config. pub fn spawn( self: *Self, - name: ?[]const u8, + comptime name: []const u8, comptime function: anytype, args: anytype, comptime needs_exit_order: bool, @@ -82,7 +83,7 @@ pub const ServiceManager = struct { /// The function may be restarted periodically, according to the provided config. fn spawnCustom( self: *Self, - maybe_name: ?[]const u8, + comptime name: []const u8, run_config: ?RunConfig, spawn_config: std.Thread.SpawnConfig, comptime function: anytype, @@ -96,14 +97,14 @@ pub const ServiceManager = struct { .{ self.logger, self.exit, - maybe_name, + name, run_config orelse self.default_run_config, function, args, }, ); - if (maybe_name) |name| thread.setName(name) catch {}; + thread.setName(name) catch {}; try self.threads.append(allocator, thread); } @@ -111,7 +112,7 @@ pub const ServiceManager = struct { /// in the shutdown chain to the arguments. fn spawnCustomIdx( self: *Self, - maybe_name: ?[]const u8, + comptime name: []const u8, run_config: ?RunConfig, spawn_config: std.Thread.SpawnConfig, comptime function: anytype, @@ -125,14 +126,14 @@ pub const ServiceManager = struct { .{ self.logger, self.exit, - maybe_name, + name, run_config orelse self.default_run_config, function, args ++ .{(self.threads.items.len + 1)}, }, ); - if (maybe_name) |name| thread.setName(name) catch {}; + thread.setName(name) catch {}; try self.threads.append(allocator, thread); } @@ -186,7 +187,7 @@ pub const ReturnHandler = struct { /// It's guaranteed to run at least once in order to not race initialization with /// the `exit` flag. pub fn runService( - logger: Logger, + logger: ScopedLogger(@typeName(ServiceManager)), exit: *Atomic(bool), maybe_name: ?[]const u8, config: RunConfig, diff --git a/src/utils/tar.zig b/src/utils/tar.zig index a43ed4c00..78f7f2558 100644 --- a/src/utils/tar.zig +++ b/src/utils/tar.zig @@ -9,6 +9,9 @@ const printTimeEstimate = sig.time.estimate.printTimeEstimate; /// Unpack tarball is related to accounts_db so we reuse it's progress bar const TAR_PROGRESS_UPDATES = @import("../accountsdb/db.zig").DB_LOG_RATE; +// The identifier for the scoped logger used in this file. +const LOG_SCOPE = "utils.tar"; + fn stripComponents(path: []const u8, count: u32) ![]const u8 { var i: usize = 0; var c = count; @@ -64,12 +67,13 @@ const Logger = @import("../trace/log.zig").Logger; pub fn parallelUntarToFileSystem( allocator: std.mem.Allocator, - logger: Logger, + logger_: Logger, dir: std.fs.Dir, reader: anytype, n_threads: usize, n_files_estimate: ?usize, ) !void { + const logger = logger_.withScope(LOG_SCOPE); var thread_pool = ThreadPool.init(.{ .max_threads = @intCast(n_threads), });