Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Streamer #57

Merged
merged 4 commits into from
Mar 11, 2018
Merged

Streamer #57

merged 4 commits into from
Mar 11, 2018

Conversation

aeyakovenko
Copy link
Member

No description provided.

@aeyakovenko aeyakovenko requested a review from garious March 9, 2018 07:43
@aeyakovenko
Copy link
Member Author

this is 10x slower then what is in the loom repo

@aeyakovenko
Copy link
Member Author

aeyakovenko commented Mar 9, 2018

woot!
performance: 767008.4235675965
@garious !!!!

want to give a quick look on the design? ill merge and squash and add a test for dup/sender

@codecov-io
Copy link

codecov-io commented Mar 9, 2018

Codecov Report

Merging #57 into master will increase coverage by 0.27%.
The diff coverage is 92.2%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master      #57      +/-   ##
==========================================
+ Coverage   91.57%   91.85%   +0.27%     
==========================================
  Files          12       14       +2     
  Lines         724     1031     +307     
==========================================
+ Hits          663      947     +284     
- Misses         61       84      +23
Impacted Files Coverage Δ
src/lib.rs 100% <ø> (ø) ⬆️
src/streamer.rs 91.63% <91.63%> (ø)
src/result.rs 94.2% <94.2%> (ø)
src/signature.rs 100% <0%> (+7.14%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1e07014...069f9f0. Read the comment docs.

@garious
Copy link
Contributor

garious commented Mar 10, 2018

Anatoly, can you rebase on the latest?

fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(BLOCK_SIZE, Packet::default());
let mut i = 0;
for p in self.packets.iter_mut() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enumerate()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really dumb, but, if i used an enumerate, i would still need a mut to return the value outside of the loop. If i had a return instead of a break, then the return statement at the end of the function would be unreachable.

IO(std::io::Error),
JSON(serde_json::Error),
AddrParse(std::net::AddrParseError),
JoinError(Box<Any + Send + 'static>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems odd. What's going on here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am converting the join error type into the result type. It has a really weird signature because its a return value of the closure that is passed into spawn.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That `static most likely comes from the possibility of the thread outliving the parent thread. You can get that out of there by either using an explicit lifetime or using scoped threads from crossbeam.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a big change just for the type. for a return value the 'static' isn't a huge problem because all-most all the apis just return ()

src/result.rs Outdated

fn addr_parse_error() -> Result<SocketAddr> {
let r = "12fdfasfsafsadfs".parse()?;
return Ok(r);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two lines can be reduced to:

"12fdfasfsafsadfs".parse()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't do the conversion

  --> src/result.rs:74:9                                                                                                                                      
   |                                                                                                                                                          
73 |     fn join_error() -> Result<()> {                                                                                                                      
   |                        ---------- expected `std::result::Result<(), resul                                                                                
t::Error>` because of return type                                                                                                                             
74 |         thread::spawn(|| panic!("hi")).join()                                                                                                            
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `result::Erro                                                                                
r`, found struct `std::boxed::Box`                                                                                                                            
   |                                                                                                                                                          
   = note: expected type `std::result::Result<(), result::Error>`                                                                                             
              found type `std::result::Result<_, std::boxed::Box<std::any::Any                                                                                
 + std::marker::Send + 'static>>`   
    fn join_error() -> Result<()> {
        thread::spawn(|| panic!("hi")).join()                                                                                                                 
    }

or

    fn join_error() -> Result<()> {
        Result::from(thread::spawn(|| panic!("hi")).join())                                                                                                   
    }

doesn't work either

src/result.rs Outdated
}

fn join_error() -> Result<()> {
thread::spawn(|| panic!("hi")).join()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with this and the next few

src/streamer.rs Outdated
fn read_from(&mut self, socket: &UdpSocket) -> Result<()> {
let sz = self.run_read_from(socket)?;
self.packets.resize(sz, Packet::default());
return Ok(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last line of a function should just be an expression, not a statement. In most of you functions, can you replace return Ok(()); with just Ok(())?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yar, you cant teach an old c-dog new tricks

src/streamer.rs Outdated
r: Receiver<SharedPacketData>,
) -> JoinHandle<()> {
spawn(move || loop {
match recv_send(&sock, recycler.clone(), &r) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this could reduce to:

if recv_send(&sock, recycler.clone(), &r).is_err() && exit.lock().unwrap() {
   break;
}


pub type Result<T> = std::result::Result<T, Error>;

impl std::convert::From<std::sync::mpsc::RecvError> for Error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool way to do it.

* message needs to fit into 256 bytes
* allocator to keep track of blocks of messages
* udp socket receiver server that fills up the block as fast as possible
* udp socket sender server that sends out the block as fast as possible
@aeyakovenko
Copy link
Member Author

@garious do you know how to enable ipv6 on travis?

@garious
Copy link
Contributor

garious commented Mar 11, 2018

No, I don't know how to enable ipv6, but if it's not easy to do and you want to keep moving forward, guard your new functionality with a feature flag.

Are you planning to close this PR in favor of the other?

@aeyakovenko
Copy link
Member Author

no, pipelining

@garious garious merged commit a7ecf4a into solana-labs:master Mar 11, 2018
vkomenda pushed a commit to vkomenda/solana that referenced this pull request Aug 29, 2021
steviez pushed a commit to steviez/solana that referenced this pull request Mar 6, 2024
willhickey pushed a commit that referenced this pull request Mar 9, 2024
segfaultdoc pushed a commit to jito-labs/solana that referenced this pull request Aug 7, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants