diff --git a/gossip_map/src/lib.rs b/gossip_map/src/lib.rs index d8c5bcd..cfd399a 100644 --- a/gossip_map/src/lib.rs +++ b/gossip_map/src/lib.rs @@ -1,8 +1,6 @@ -#![allow(dead_code)] use std::collections::HashMap; use std::fs::File; -use std::io; -use std::io::{BufReader, Error, ErrorKind}; +use std::io::{BufReader, Read, Seek, SeekFrom}; use fundamentals::core::FromWire; use fundamentals::types::ShortChannelId; @@ -13,7 +11,6 @@ mod bolt7; mod flags; mod gossip_stor_wiregen; mod gossip_types; -mod peekable_stream; use crate::bolt7::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}; use crate::flags::{ @@ -86,29 +83,38 @@ impl GossipMap { fn refresh(&mut self) -> anyhow::Result<()> { let gossip_store = File::open(self.path.clone())?; - let stream = BufReader::new(gossip_store); - let mut stream = peekable_stream::PeekableStream::new(stream); + let mut stream = BufReader::new(gossip_store); let version = u8::from_wire(&mut stream)? as u16; if (version & GOSSIP_STORE_MAJOR_VERSION_MASK) != GOSSIP_STORE_MAJOR_VERSION { anyhow::bail!("Invalid gossip store version {version}"); } self.version = version as u8; - + log::info!("Gossip map version: v{}", self.version); let mut last_short_channel_id: Option = None; + while let Ok(header) = GossipStoredHeader::from_wire(&mut stream) { log::debug!("header {:?}", header); if (header.flag() & flags::GOSSIP_STORE_LEN_DELETED_BIT) != 0 { - log::debug!("flags::GOSSIP_STORE_LEN_DELETED_BIT"); + log::warn!("flags::GOSSIP_STORE_LEN_DELETED_BIT"); + // Consume the buffer + let mut inner_stream: Vec = vec![0; header.len.into()]; + stream.read_exact(&mut inner_stream)?; continue; } - let typmsg = stream.peek_msgtype()?; + + let typmsg = u16::from_wire(&mut stream)?; + // fake lookup, because the message will decode the type. + stream.seek(SeekFrom::Current(-2))?; log::info!("type: {typmsg}"); match typmsg { // channel announcement! 256 => { - let channel_announcement = ChannelAnnouncement::from_wire(&mut stream)?; - log::trace!("{:?}", channel_announcement); + log::info!("channel announcement"); + let mut inner_stream: Vec = vec![0; header.len.into()]; + stream.read_exact(&mut inner_stream)?; + let mut inner_stream = inner_stream.as_slice(); + let channel_announcement = ChannelAnnouncement::from_wire(&mut inner_stream)?; let node_one = GossipNodeId::from_bytes(&channel_announcement.node_id_1.to_vec()).unwrap(); let node_two = @@ -138,8 +144,11 @@ impl GossipMap { unimplemented!(); } WIRE_GOSSIP_STORE_CHANNEL_AMOUNT => { - let channel_amount = GossipStoreChannelAmount::from_wire(&mut stream)?; - log::trace!("{:?}", channel_amount); + log::info!("gossip store amount"); + let mut inner_stream: Vec = vec![0; header.len.into()]; + stream.read_exact(&mut inner_stream)?; + let mut inner_stream = inner_stream.as_slice(); + let channel_amount = GossipStoreChannelAmount::from_wire(&mut inner_stream)?; //FIXME: remove the unwrap(). let channel = self .channels @@ -162,7 +171,15 @@ impl GossipMap { break; } 257 => { - let node_announcement = NodeAnnouncement::from_wire(&mut stream).unwrap(); + let mut inner_stream: Vec = vec![0; header.len.into()]; + stream.read_exact(&mut inner_stream)?; + let mut inner_stream = inner_stream.as_slice(); + log::info!( + "buffer len {} vs expected {}", + inner_stream.len(), + header.len + ); + let node_announcement = NodeAnnouncement::from_wire(&mut inner_stream).unwrap(); log::trace!("{:?}", node_announcement); let node_id = GossipNodeId::from_bytes(&node_announcement.node_id)?; if !self.nodes.contains_key(&node_id) { @@ -172,7 +189,10 @@ impl GossipMap { } 258 => { log::info!("found channel update"); - let channel_update = ChannelUpdate::from_wire(&mut stream)?; + let mut inner_stream: Vec = vec![0; header.len.into()]; + stream.read_exact(&mut inner_stream)?; + let mut inner_stream = inner_stream.as_slice(); + let channel_update = ChannelUpdate::from_wire(&mut inner_stream)?; if let Some(channel) = self.channels.get_mut(&channel_update.short_channel_id) { log::info!( "found channel with short id `{}`", @@ -184,7 +204,15 @@ impl GossipMap { .insert(channel_update.short_channel_id, channel_update); } } - _ => anyhow::bail!("Unexpected message with type `{typmsg}`"), + _ => { + log::error!( + "Unexpected message with type `{typmsg}`, breaking: size of nodes {}", + self.nodes.len() + ); + let mut inner_stream: Vec = vec![0; header.len.into()]; + stream.read_exact(&mut inner_stream)?; + continue; + } } } log::info!("{:#?}", self.nodes); @@ -213,7 +241,7 @@ mod tests { #[test] fn read_gossipmap_from_file() { init(); - let path = "/run/media/vincent/VincentSSD/.lightning/signet/gossip_store"; + let path = "/run/media/vincent/VincentSSD/.lightning/testnet/gossip_store"; let pubkey = "03b39d1ddf13ce486de74e9e44e0538f960401a9ec75534ba9cfe4100d65426880"; let map = GossipMap::from_file(path); assert!(map.is_ok(), "{:?}", map); diff --git a/gossip_map/src/peekable_stream.rs b/gossip_map/src/peekable_stream.rs deleted file mode 100644 index 915a6c7..0000000 --- a/gossip_map/src/peekable_stream.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::io; -use std::io::{Read, Seek, SeekFrom}; - -pub struct PeekableStream { - inner: R, - peek_buffer: Vec, -} - -impl PeekableStream { - pub fn new(inner: R) -> Self { - PeekableStream { - inner, - peek_buffer: Vec::new(), - } - } - - // Peek and return a u16 without consuming it - // Assumes big-endian byte order for this example; adjust as necessary - pub fn peek_msgtype(&mut self) -> io::Result { - while self.peek_buffer.len() < 2 { - let mut buf = [0; 1]; - let n = self.inner.read(&mut buf)?; - if n == 0 { - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "EOF reached")); - } - self.peek_buffer.push(buf[0]); - } - - Ok(((self.peek_buffer[0] as u16) << 8) | (self.peek_buffer[1] as u16)) - } - - // Reset peek buffer - pub fn reset_peek(&mut self) { - self.peek_buffer.clear(); - } - - // Adds seeking functionality relative to the current position - // Note: This simplistic implementation only supports positive offsets - pub fn seek_relative(&mut self, offset: i64) -> io::Result { - let peek_len = self.peek_buffer.len() as i64; - // Calculate how much we need to seek in the underlying stream - let seek_offset = offset - peek_len; - if seek_offset < 0 { - // If seeking backwards within the peek buffer, this would require a different approach - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Seeking backwards not supported", - )) - } else { - // Clear the peek buffer since we're seeking past it - self.peek_buffer.clear(); - // Seek in the underlying stream - self.inner.seek(SeekFrom::Current(seek_offset)) - } - } -} - -impl Read for PeekableStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if self.peek_buffer.len() >= buf.len() { - let n = buf.len(); - for (i, byte) in self.peek_buffer.drain(0..n).enumerate() { - buf[i] = byte; - } - return Ok(n); - } - - if !self.peek_buffer.is_empty() { - let n = self.peek_buffer.len(); - for (i, byte) in self.peek_buffer.drain(..).enumerate() { - buf[i] = byte; - } - let m = self.inner.read(&mut buf[n..])?; - return Ok(n + m); - } - - self.inner.read(buf) - } -}