Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Send repairman shreds to the repair socket (#6671)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-solana authored Nov 1, 2019
1 parent 2e30926 commit 2d67962
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
10 changes: 9 additions & 1 deletion core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,7 @@ impl ClusterInfo {
daddr,
daddr,
daddr,
daddr,
timestamp(),
);
(node, gossip_socket, Some(ip_echo))
Expand All @@ -1527,6 +1528,7 @@ impl ClusterInfo {
daddr,
daddr,
daddr,
daddr,
timestamp(),
);
(node, gossip_socket, None)
Expand Down Expand Up @@ -1610,6 +1612,7 @@ impl Node {
gossip.local_addr().unwrap(),
tvu.local_addr().unwrap(),
tvu_forwards.local_addr().unwrap(),
repair.local_addr().unwrap(),
empty,
empty,
storage.local_addr().unwrap(),
Expand Down Expand Up @@ -1658,6 +1661,7 @@ impl Node {
tvu_forwards.local_addr().unwrap(),
tpu.local_addr().unwrap(),
tpu_forwards.local_addr().unwrap(),
repair.local_addr().unwrap(),
storage.local_addr().unwrap(),
rpc_addr,
rpc_pubsub_addr,
Expand Down Expand Up @@ -1717,14 +1721,15 @@ impl Node {
let (_, retransmit_sockets) =
multi_bind_in_range(port_range, 8).expect("retransmit multi_bind");

let (_, repair) = Self::bind(port_range);
let (repair_port, repair) = Self::bind(port_range);
let (_, broadcast) = Self::bind(port_range);

let info = ContactInfo::new(
pubkey,
SocketAddr::new(gossip_addr.ip(), gossip_port),
SocketAddr::new(gossip_addr.ip(), tvu_port),
SocketAddr::new(gossip_addr.ip(), tvu_forwards_port),
SocketAddr::new(gossip_addr.ip(), repair_port),
SocketAddr::new(gossip_addr.ip(), tpu_port),
SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
socketaddr_any!(),
Expand Down Expand Up @@ -1882,6 +1887,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
socketaddr!([127, 0, 0, 1], 1241),
socketaddr!([127, 0, 0, 1], 1242),
0,
);
cluster_info.insert_info(nxt.clone());
Expand All @@ -1902,6 +1908,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
socketaddr!([127, 0, 0, 1], 1241),
socketaddr!([127, 0, 0, 1], 1242),
0,
);
cluster_info.insert_info(nxt);
Expand Down Expand Up @@ -1939,6 +1946,7 @@ mod tests {
socketaddr!("127.0.0.1:1239"),
socketaddr!("127.0.0.1:1240"),
socketaddr!("127.0.0.1:1241"),
socketaddr!("127.0.0.1:1242"),
0,
);
let rv = ClusterInfo::run_window_request(
Expand Down
14 changes: 7 additions & 7 deletions core/src/cluster_info_repair_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ impl ClusterInfoRepairListener {
for (repairee_pubkey, repairee_epoch_slots) in repairees {
let repairee_root = repairee_epoch_slots.root;

let repairee_tvu = {
let repairee_repair_addr = {
let r_cluster_info = cluster_info.read().unwrap();
let contact_info = r_cluster_info.get_contact_info_for_node(repairee_pubkey);
contact_info.map(|c| c.tvu)
contact_info.map(|c| c.repair)
};

if let Some(repairee_tvu) = repairee_tvu {
if let Some(repairee_addr) = repairee_repair_addr {
// For every repairee, get the set of repairmen who are responsible for
let mut eligible_repairmen = Self::find_eligible_repairmen(
my_pubkey,
Expand All @@ -242,7 +242,7 @@ impl ClusterInfoRepairListener {
&repairee_epoch_slots,
&eligible_repairmen,
socket,
&repairee_tvu,
&repairee_addr,
NUM_SLOTS_PER_UPDATE,
epoch_schedule,
);
Expand All @@ -261,7 +261,7 @@ impl ClusterInfoRepairListener {
repairee_epoch_slots: &EpochSlots,
eligible_repairmen: &[&Pubkey],
socket: &UdpSocket,
repairee_tvu: &SocketAddr,
repairee_addr: &SocketAddr,
num_slots_to_repair: usize,
epoch_schedule: &EpochSchedule,
) -> Result<()> {
Expand Down Expand Up @@ -320,15 +320,15 @@ impl ClusterInfoRepairListener {
.get_data_shred(slot, blob_index as u64)
.expect("Failed to read data blob from blocktree")
{
socket.send_to(&blob_data[..], repairee_tvu)?;
socket.send_to(&blob_data[..], repairee_addr)?;
total_data_blobs_sent += 1;
}

if let Some(coding_bytes) = blocktree
.get_coding_shred(slot, blob_index as u64)
.expect("Failed to read coding blob from blocktree")
{
socket.send_to(&coding_bytes[..], repairee_tvu)?;
socket.send_to(&coding_bytes[..], repairee_addr)?;
total_coding_blobs_sent += 1;
}
}
Expand Down
14 changes: 13 additions & 1 deletion core/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ pub struct ContactInfo {
pub gossip: SocketAddr,
/// address to connect to for replication
pub tvu: SocketAddr,
/// address to forward blobs to
/// address to forward shreds to
pub tvu_forwards: SocketAddr,
/// address to send repairs to
pub repair: SocketAddr,
/// transactions address
pub tpu: SocketAddr,
/// address to forward unprocessed transactions to
Expand Down Expand Up @@ -80,6 +82,7 @@ impl Default for ContactInfo {
gossip: socketaddr_any!(),
tvu: socketaddr_any!(),
tvu_forwards: socketaddr_any!(),
repair: socketaddr_any!(),
tpu: socketaddr_any!(),
tpu_forwards: socketaddr_any!(),
storage_addr: socketaddr_any!(),
Expand All @@ -98,6 +101,7 @@ impl ContactInfo {
gossip: SocketAddr,
tvu: SocketAddr,
tvu_forwards: SocketAddr,
repair: SocketAddr,
tpu: SocketAddr,
tpu_forwards: SocketAddr,
storage_addr: SocketAddr,
Expand All @@ -111,6 +115,7 @@ impl ContactInfo {
gossip,
tvu,
tvu_forwards,
repair,
tpu,
tpu_forwards,
storage_addr,
Expand All @@ -131,6 +136,7 @@ impl ContactInfo {
socketaddr!("127.0.0.1:1239"),
socketaddr!("127.0.0.1:1240"),
socketaddr!("127.0.0.1:1241"),
socketaddr!("127.0.0.1:1242"),
now,
)
}
Expand All @@ -150,6 +156,7 @@ impl ContactInfo {
addr,
addr,
addr,
addr,
0,
)
}
Expand All @@ -167,13 +174,15 @@ impl ContactInfo {
let tvu_addr = next_port(&bind_addr, 2);
let tpu_forwards_addr = next_port(&bind_addr, 3);
let tvu_forwards_addr = next_port(&bind_addr, 4);
let repair = next_port(&bind_addr, 5);
let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT);
let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
Self::new(
pubkey,
gossip_addr,
tvu_addr,
tvu_forwards_addr,
repair,
tpu_addr,
tpu_forwards_addr,
"0.0.0.0:0".parse().unwrap(),
Expand Down Expand Up @@ -202,6 +211,7 @@ impl ContactInfo {
daddr,
daddr,
daddr,
daddr,
timestamp(),
)
}
Expand Down Expand Up @@ -245,6 +255,7 @@ impl Signable for ContactInfo {
tvu: SocketAddr,
tpu: SocketAddr,
tpu_forwards: SocketAddr,
repair: SocketAddr,
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
Expand All @@ -259,6 +270,7 @@ impl Signable for ContactInfo {
tpu: me.tpu,
storage_addr: me.storage_addr,
tpu_forwards: me.tpu_forwards,
repair: me.repair,
rpc: me.rpc,
rpc_pubsub: me.rpc_pubsub,
wallclock: me.wallclock,
Expand Down

0 comments on commit 2d67962

Please sign in to comment.