Skip to content

Commit

Permalink
gossip_map: fix the decoding reading chunk
Browse files Browse the repository at this point in the history
A lightning network message is a chunk of memory, and when
a single stream contains more than a single message it is
required split this by chunk.

This commit is implementing the following logic

Signed-off-by: Vincenzo Palazzo <vincenzopalazzodev@gmail.com>
  • Loading branch information
vincenzopalazzo committed Jul 26, 2024
1 parent a06785b commit c64ed57
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 96 deletions.
62 changes: 45 additions & 17 deletions gossip_map/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#![allow(dead_code)]
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{BufReader, Error, ErrorKind};
use std::io::{BufReader, Read, Seek, SeekFrom};

use fundamentals::core::FromWire;
use fundamentals::types::ShortChannelId;
Expand All @@ -13,7 +11,6 @@ mod bolt7;
mod flags;
mod gossip_stor_wiregen;
mod gossip_types;
mod peekable_stream;

use crate::bolt7::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement};
use crate::flags::{
Expand Down Expand Up @@ -86,29 +83,38 @@ impl GossipMap {

fn refresh(&mut self) -> anyhow::Result<()> {
let gossip_store = File::open(self.path.clone())?;
let stream = BufReader::new(gossip_store);
let mut stream = peekable_stream::PeekableStream::new(stream);
let mut stream = BufReader::new(gossip_store);

let version = u8::from_wire(&mut stream)? as u16;
if (version & GOSSIP_STORE_MAJOR_VERSION_MASK) != GOSSIP_STORE_MAJOR_VERSION {
anyhow::bail!("Invalid gossip store version {version}");
}
self.version = version as u8;

log::info!("Gossip map version: v{}", self.version);
let mut last_short_channel_id: Option<ShortChannelId> = None;

while let Ok(header) = GossipStoredHeader::from_wire(&mut stream) {
log::debug!("header {:?}", header);
if (header.flag() & flags::GOSSIP_STORE_LEN_DELETED_BIT) != 0 {
log::debug!("flags::GOSSIP_STORE_LEN_DELETED_BIT");
log::warn!("flags::GOSSIP_STORE_LEN_DELETED_BIT");
// Consume the buffer
let mut inner_stream: Vec<u8> = vec![0; header.len.into()];
stream.read_exact(&mut inner_stream)?;
continue;
}
let typmsg = stream.peek_msgtype()?;

let typmsg = u16::from_wire(&mut stream)?;
// fake lookup, because the message will decode the type.
stream.seek(SeekFrom::Current(-2))?;
log::info!("type: {typmsg}");
match typmsg {
// channel announcement!
256 => {
let channel_announcement = ChannelAnnouncement::from_wire(&mut stream)?;
log::trace!("{:?}", channel_announcement);
log::info!("channel announcement");
let mut inner_stream: Vec<u8> = vec![0; header.len.into()];
stream.read_exact(&mut inner_stream)?;
let mut inner_stream = inner_stream.as_slice();
let channel_announcement = ChannelAnnouncement::from_wire(&mut inner_stream)?;
let node_one =
GossipNodeId::from_bytes(&channel_announcement.node_id_1.to_vec()).unwrap();
let node_two =
Expand Down Expand Up @@ -138,8 +144,11 @@ impl GossipMap {
unimplemented!();
}
WIRE_GOSSIP_STORE_CHANNEL_AMOUNT => {
let channel_amount = GossipStoreChannelAmount::from_wire(&mut stream)?;
log::trace!("{:?}", channel_amount);
log::info!("gossip store amount");
let mut inner_stream: Vec<u8> = vec![0; header.len.into()];
stream.read_exact(&mut inner_stream)?;
let mut inner_stream = inner_stream.as_slice();
let channel_amount = GossipStoreChannelAmount::from_wire(&mut inner_stream)?;
//FIXME: remove the unwrap().
let channel = self
.channels
Expand All @@ -162,7 +171,15 @@ impl GossipMap {
break;
}
257 => {
let node_announcement = NodeAnnouncement::from_wire(&mut stream).unwrap();
let mut inner_stream: Vec<u8> = vec![0; header.len.into()];
stream.read_exact(&mut inner_stream)?;
let mut inner_stream = inner_stream.as_slice();
log::info!(
"buffer len {} vs expected {}",
inner_stream.len(),
header.len
);
let node_announcement = NodeAnnouncement::from_wire(&mut inner_stream).unwrap();
log::trace!("{:?}", node_announcement);
let node_id = GossipNodeId::from_bytes(&node_announcement.node_id)?;
if !self.nodes.contains_key(&node_id) {
Expand All @@ -172,7 +189,10 @@ impl GossipMap {
}
258 => {
log::info!("found channel update");
let channel_update = ChannelUpdate::from_wire(&mut stream)?;
let mut inner_stream: Vec<u8> = vec![0; header.len.into()];
stream.read_exact(&mut inner_stream)?;
let mut inner_stream = inner_stream.as_slice();
let channel_update = ChannelUpdate::from_wire(&mut inner_stream)?;
if let Some(channel) = self.channels.get_mut(&channel_update.short_channel_id) {
log::info!(
"found channel with short id `{}`",
Expand All @@ -184,7 +204,15 @@ impl GossipMap {
.insert(channel_update.short_channel_id, channel_update);
}
}
_ => anyhow::bail!("Unexpected message with type `{typmsg}`"),
_ => {
log::error!(
"Unexpected message with type `{typmsg}`, breaking: size of nodes {}",
self.nodes.len()
);
let mut inner_stream: Vec<u8> = vec![0; header.len.into()];
stream.read_exact(&mut inner_stream)?;
continue;
}
}
}
log::info!("{:#?}", self.nodes);
Expand Down Expand Up @@ -213,7 +241,7 @@ mod tests {
#[test]
fn read_gossipmap_from_file() {
init();
let path = "/run/media/vincent/VincentSSD/.lightning/signet/gossip_store";
let path = "/run/media/vincent/VincentSSD/.lightning/testnet/gossip_store";
let pubkey = "03b39d1ddf13ce486de74e9e44e0538f960401a9ec75534ba9cfe4100d65426880";
let map = GossipMap::from_file(path);
assert!(map.is_ok(), "{:?}", map);
Expand Down
79 changes: 0 additions & 79 deletions gossip_map/src/peekable_stream.rs

This file was deleted.

0 comments on commit c64ed57

Please sign in to comment.