From 0a5c7a114abe7e7bff88118fcfaa462f69f1a31e Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Aug 2019 14:14:04 +0300 Subject: [PATCH 01/10] implement simultaneous open extension --- client.go | 215 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) diff --git a/client.go b/client.go index 1de2443..4857400 100644 --- a/client.go +++ b/client.go @@ -2,8 +2,11 @@ package multistream import ( "bytes" + "crypto/rand" "errors" "io" + "math/big" + "strings" ) // ErrNotSupported is the error returned when the muxer does not support @@ -74,6 +77,218 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { return "", ErrNotSupported } +// Performs protocol negotiation with the simultaneous open extension; the returned boolean +// indicator will be true if we should act as a server. +func SelectWithSimopen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { + if len(protos) == 0 { + return "", false, ErrNoProtocols + } + + var buf bytes.Buffer + delimWrite(&buf, []byte(ProtocolID)) + delimWrite(&buf, []byte("iamclient")) + delimWrite(&buf, []byte(protos[0])) + _, err := io.Copy(rwc, &buf) + if err != nil { + return "", false, err + } + + err = readMultistreamHeader(rwc) + if err != nil { + return "", false, err + } + + tok, err := ReadNextToken(rwc) + if err != nil { + return "", false, err + } + + switch tok { + case "iamclient": + // simultaneous open + return simOpen(protos, rwc) + + case "na": + // client open + proto, err := clientOpen(protos, rwc) + if err != nil { + return "", false, err + } + + return proto, false, nil + + default: + return "", false, errors.New("unexpected response: " + tok) + } +} + +func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) { + // check to see if we selected the pipelined protocol + tok, err := ReadNextToken(rwc) + if err != nil { + return "", err + } + + switch tok { + case protos[0]: + return tok, nil + case "na": + // try the other protos + for _, p := range protos[1:] { + err = trySelect(p, rwc) + switch err { + case nil: + return p, nil + case ErrNotSupported: + default: + return "", err + } + } + + return "", ErrNotSupported + default: + return "", errors.New("unexpected response: " + tok) + } +} + +func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { + retries := 3 + +again: + mynonce := make([]byte, 32) + _, err := rand.Read(mynonce) + if err != nil { + return "", false, err + } + + myselect := []byte("select:" + string(mynonce)) + err = delimWriteBuffered(rwc, myselect) + if err != nil { + return "", false, err + } + + var peerselect string + for { + tok, err := ReadNextToken(rwc) + if err != nil { + return "", false, err + } + + // this skips pipelined protocol negoatiation + // keep reading until the token starts with select: + if strings.HasPrefix(tok, "select:") { + peerselect = tok + break + } + } + + peernonce := []byte(peerselect[7:]) + + var mybig, peerbig big.Int + var iamserver bool + mybig.SetBytes(mynonce) + peerbig.SetBytes(peernonce) + + switch mybig.Cmp(&peerbig) { + case -1: + // peer nonce bigger, he is client + iamserver = true + + case 1: + // my nonce bigger, i am client + iamserver = false + + case 0: + // wtf, the world is ending! try again. + if retries > 0 { + retries-- + goto again + } + + return "", false, errors.New("failed client selection; identical nonces!") + + default: + return "", false, errors.New("wut? bigint.Cmp returned unexpected value") + } + + var proto string + if iamserver { + proto, err = simOpenSelectServer(protos, rwc) + } else { + proto, err = simOpenSelectClient(protos, rwc) + } + + return proto, iamserver, err +} + +func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) { + err := delimWriteBuffered(rwc, []byte("responder")) + if err != nil { + return "", err + } + + tok, err := ReadNextToken(rwc) + if err != nil { + return "", err + } + if tok != "initiator" { + return "", errors.New("unexpected response: " + tok) + } + + for { + tok, err = ReadNextToken(rwc) + if err != nil { + return "", err + } + + for _, p := range protos { + if tok == p { + err = delimWriteBuffered(rwc, []byte(p)) + if err != nil { + return "", err + } + + return p, nil + } + } + + err = delimWriteBuffered(rwc, []byte("na")) + if err != nil { + return "", err + } + } + +} + +func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) { + err := delimWriteBuffered(rwc, []byte("initiator")) + if err != nil { + return "", err + } + + tok, err := ReadNextToken(rwc) + if err != nil { + return "", err + } + if tok != "responder" { + return "", errors.New("unexpected response: " + tok) + } + + for _, p := range protos { + err = trySelect(p, rwc) + switch err { + case nil: + return p, nil + + case ErrNotSupported: + default: + return "", err + } + } + + return "", ErrNotSupported +} + func handshake(rw io.ReadWriter) error { errCh := make(chan error, 1) go func() { From 5da27f85b87a29de063bf397f3a5affb5369414d Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 21 Sep 2019 13:07:52 +0300 Subject: [PATCH 02/10] handle protcol mismatch correctly in server path of simultaneous open --- client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/client.go b/client.go index 4857400..5560f4f 100644 --- a/client.go +++ b/client.go @@ -237,6 +237,11 @@ func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error for { tok, err = ReadNextToken(rwc) + + if err == io.EOF { + return "", ErrNotSupported + } + if err != nil { return "", err } From 276e57ef5924d1d28ffe2e7f022771ac3680cd60 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 21 Sep 2019 13:08:03 +0300 Subject: [PATCH 03/10] add simultaneous open tests --- multistream_test.go | 171 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/multistream_test.go b/multistream_test.go index 46f5661..6e6ef3d 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -743,3 +743,174 @@ func TestNegotiateFail(t *testing.T) { t.Fatal("got wrong protocol") } } + +func TestSimopenClientServer(t *testing.T) { + a, b := newPipe(t) + + mux := NewMultistreamMuxer() + mux.AddHandler("/a", nil) + + done := make(chan struct{}) + go func() { + selected, _, err := mux.Negotiate(a) + if err != nil { + t.Fatal(err) + } + if selected != "/a" { + t.Fatal("incorrect protocol selected") + } + close(done) + }() + + proto, server, err := SelectWithSimopen([]string{"/a"}, b) + if err != nil { + t.Fatal(err) + } + + if proto != "/a" { + t.Fatal("wrong protocol selected") + } + + if server { + t.Fatal("expected to be client") + } + + select { + case <-time.After(time.Second): + t.Fatal("protocol negotiation didn't complete") + case <-done: + } + + verifyPipe(t, a, b) +} + +func TestSimopenClientServerFail(t *testing.T) { + a, b := newPipe(t) + + mux := NewMultistreamMuxer() + mux.AddHandler("/a", nil) + + done := make(chan struct{}) + go func() { + _, _, err := mux.Negotiate(a) + if err != io.EOF { + t.Fatal(err) + } + close(done) + }() + + _, _, err := SelectWithSimopen([]string{"/b"}, b) + if err != ErrNotSupported { + t.Fatal(err) + } + b.Close() + + select { + case <-time.After(time.Second): + t.Fatal("protocol negotiation didn't complete") + case <-done: + } +} + +func TestSimopenClientClient(t *testing.T) { + a, b := newPipe(t) + + done := make(chan bool, 1) + go func() { + proto, server, err := SelectWithSimopen([]string{"/a"}, b) + if err != nil { + t.Fatal(err) + } + if proto != "/a" { + t.Fatal("wrong protocol selected") + } + done <- server + }() + + proto, servera, err := SelectWithSimopen([]string{"/a"}, a) + if err != nil { + t.Fatal(err) + } + if proto != "/a" { + t.Fatal("wrong protocol selected") + } + + var serverb bool + select { + case <-time.After(time.Second): + t.Fatal("protocol negotiation didn't complete") + + case serverb = <-done: + } + + if servera == serverb { + t.Fatal("client selection failed") + } + + verifyPipe(t, a, b) +} + +func TestSimopenClientClient2(t *testing.T) { + a, b := newPipe(t) + + done := make(chan bool, 1) + go func() { + proto, server, err := SelectWithSimopen([]string{"/a", "/b"}, b) + if err != nil { + t.Fatal(err) + } + if proto != "/b" { + t.Fatal("wrong protocol selected") + } + done <- server + }() + + proto, servera, err := SelectWithSimopen([]string{"/b"}, a) + if err != nil { + t.Fatal(err) + } + if proto != "/b" { + t.Fatal("wrong protocol selected") + } + + var serverb bool + select { + case <-time.After(time.Second): + t.Fatal("protocol negotiation didn't complete") + + case serverb = <-done: + } + + if servera == serverb { + t.Fatal("client selection failed") + } + + verifyPipe(t, a, b) +} + +func TestSimopenClientClientFail(t *testing.T) { + a, b := newPipe(t) + + done := make(chan struct{}) + go func() { + _, _, err := SelectWithSimopen([]string{"/a"}, b) + if err != ErrNotSupported { + t.Error(err) + } + b.Close() + close(done) + }() + + _, _, err := SelectWithSimopen([]string{"/b"}, a) + if err != ErrNotSupported { + t.Fatal(err) + } + a.Close() + + select { + case <-time.After(time.Second): + t.Fatal("protocol negotiation didn't complete") + + case <-done: + } +} From ef959dab5ee1725243f82a4055bd93cd1addc511 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 21 Sep 2019 13:24:38 +0300 Subject: [PATCH 04/10] concurrent read/write goroutines for simultaneous open --- client.go | 60 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 5560f4f..360c585 100644 --- a/client.go +++ b/client.go @@ -84,22 +84,27 @@ func SelectWithSimopen(protos []string, rwc io.ReadWriteCloser) (string, bool, e return "", false, ErrNoProtocols } - var buf bytes.Buffer - delimWrite(&buf, []byte(ProtocolID)) - delimWrite(&buf, []byte("iamclient")) - delimWrite(&buf, []byte(protos[0])) - _, err := io.Copy(rwc, &buf) + werrCh := make(chan error, 1) + go func() { + var buf bytes.Buffer + delimWrite(&buf, []byte(ProtocolID)) + delimWrite(&buf, []byte("iamclient")) + delimWrite(&buf, []byte(protos[0])) + _, err := io.Copy(rwc, &buf) + werrCh <- err + }() + + err := readMultistreamHeader(rwc) if err != nil { return "", false, err } - err = readMultistreamHeader(rwc) + tok, err := ReadNextToken(rwc) if err != nil { return "", false, err } - tok, err := ReadNextToken(rwc) - if err != nil { + if err = <-werrCh; err != nil { return "", false, err } @@ -161,11 +166,12 @@ again: return "", false, err } - myselect := []byte("select:" + string(mynonce)) - err = delimWriteBuffered(rwc, myselect) - if err != nil { - return "", false, err - } + werrCh := make(chan error, 1) + go func() { + myselect := []byte("select:" + string(mynonce)) + err := delimWriteBuffered(rwc, myselect) + werrCh <- err + }() var peerselect string for { @@ -182,6 +188,10 @@ again: } } + if err = <-werrCh; err != nil { + return "", false, err + } + peernonce := []byte(peerselect[7:]) var mybig, peerbig big.Int @@ -222,10 +232,11 @@ again: } func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) { - err := delimWriteBuffered(rwc, []byte("responder")) - if err != nil { - return "", err - } + werrCh := make(chan error, 1) + go func() { + err := delimWriteBuffered(rwc, []byte("responder")) + werrCh <- err + }() tok, err := ReadNextToken(rwc) if err != nil { @@ -234,6 +245,9 @@ func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error if tok != "initiator" { return "", errors.New("unexpected response: " + tok) } + if err = <-werrCh; err != nil { + return "", err + } for { tok, err = ReadNextToken(rwc) @@ -266,10 +280,11 @@ func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error } func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) { - err := delimWriteBuffered(rwc, []byte("initiator")) - if err != nil { - return "", err - } + werrCh := make(chan error, 1) + go func() { + err := delimWriteBuffered(rwc, []byte("initiator")) + werrCh <- err + }() tok, err := ReadNextToken(rwc) if err != nil { @@ -278,6 +293,9 @@ func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error if tok != "responder" { return "", errors.New("unexpected response: " + tok) } + if err = <-werrCh; err != nil { + return "", err + } for _, p := range protos { err = trySelect(p, rwc) From b087b1b3f4804d546eccfaf01548b18449d5025b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 24 Sep 2019 12:50:03 +0300 Subject: [PATCH 05/10] encode simopen selection nonces in base64 --- client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 360c585..f70f74d 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package multistream import ( "bytes" "crypto/rand" + "encoding/base64" "errors" "io" "math/big" @@ -168,7 +169,7 @@ again: werrCh := make(chan error, 1) go func() { - myselect := []byte("select:" + string(mynonce)) + myselect := []byte("select:" + base64.StdEncoding.EncodeToString(mynonce)) err := delimWriteBuffered(rwc, myselect) werrCh <- err }() @@ -192,7 +193,10 @@ again: return "", false, err } - peernonce := []byte(peerselect[7:]) + peernonce, err := base64.StdEncoding.DecodeString(peerselect[7:]) + if err != nil { + return "", false, err + } var mybig, peerbig big.Int var iamserver bool From 2c5076b351df0f1a0c2e5c3c2ed39e199c492528 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Sat, 16 Jan 2021 13:05:03 +0530 Subject: [PATCH 06/10] changes as per review --- client.go | 88 +++++++++++++++------------------------------ multistream.go | 11 ++++++ multistream_test.go | 16 ++++----- 3 files changed, 48 insertions(+), 67 deletions(-) diff --git a/client.go b/client.go index f70f74d..5bd4de2 100644 --- a/client.go +++ b/client.go @@ -6,7 +6,6 @@ import ( "encoding/base64" "errors" "io" - "math/big" "strings" ) @@ -26,8 +25,10 @@ func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) error { errCh := make(chan error, 1) go func() { var buf bytes.Buffer - delimWrite(&buf, []byte(ProtocolID)) - delimWrite(&buf, []byte(proto)) + if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte(proto)); err != nil { + errCh <- err + return + } _, err := io.Copy(rwc, &buf) errCh <- err }() @@ -65,22 +66,12 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { default: return "", err } - for _, p := range protos[1:] { - err := trySelect(p, rwc) - switch err { - case nil: - return p, nil - case ErrNotSupported: - default: - return "", err - } - } - return "", ErrNotSupported + return selectProtosOrFail(protos[1:], rwc) } // Performs protocol negotiation with the simultaneous open extension; the returned boolean // indicator will be true if we should act as a server. -func SelectWithSimopen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { +func SelectWithSimopenOrFail(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { if len(protos) == 0 { return "", false, ErrNoProtocols } @@ -88,9 +79,11 @@ func SelectWithSimopen(protos []string, rwc io.ReadWriteCloser) (string, bool, e werrCh := make(chan error, 1) go func() { var buf bytes.Buffer - delimWrite(&buf, []byte(ProtocolID)) - delimWrite(&buf, []byte("iamclient")) - delimWrite(&buf, []byte(protos[0])) + if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte("iamclient"), []byte(protos[0])); err != nil { + werrCh <- err + return + } + _, err := io.Copy(rwc, &buf) werrCh <- err }() @@ -139,28 +132,27 @@ func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) { case protos[0]: return tok, nil case "na": - // try the other protos - for _, p := range protos[1:] { - err = trySelect(p, rwc) - switch err { - case nil: - return p, nil - case ErrNotSupported: - default: - return "", err - } - } - - return "", ErrNotSupported + return selectProtosOrFail(protos[1:], rwc) default: return "", errors.New("unexpected response: " + tok) } } -func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { - retries := 3 +func selectProtosOrFail(protos []string, rwc io.ReadWriteCloser) (string, error) { + for _, p := range protos { + err := trySelect(p, rwc) + switch err { + case nil: + return p, nil + case ErrNotSupported: + default: + return "", err + } + } + return "", ErrNotSupported +} -again: +func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { mynonce := make([]byte, 32) _, err := rand.Read(mynonce) if err != nil { @@ -181,7 +173,7 @@ again: return "", false, err } - // this skips pipelined protocol negoatiation + // this skips pipelined protocol negotiation // keep reading until the token starts with select: if strings.HasPrefix(tok, "select:") { peerselect = tok @@ -198,12 +190,8 @@ again: return "", false, err } - var mybig, peerbig big.Int var iamserver bool - mybig.SetBytes(mynonce) - peerbig.SetBytes(peernonce) - - switch mybig.Cmp(&peerbig) { + switch bytes.Compare(mynonce, peernonce) { case -1: // peer nonce bigger, he is client iamserver = true @@ -213,12 +201,6 @@ again: iamserver = false case 0: - // wtf, the world is ending! try again. - if retries > 0 { - retries-- - goto again - } - return "", false, errors.New("failed client selection; identical nonces!") default: @@ -301,19 +283,7 @@ func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error return "", err } - for _, p := range protos { - err = trySelect(p, rwc) - switch err { - case nil: - return p, nil - - case ErrNotSupported: - default: - return "", err - } - } - - return "", ErrNotSupported + return selectProtosOrFail(protos, rwc) } func handshake(rw io.ReadWriter) error { diff --git a/multistream.go b/multistream.go index 73ed5ec..5b4ba7d 100644 --- a/multistream.go +++ b/multistream.go @@ -7,6 +7,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "io" "sync" @@ -81,6 +82,16 @@ func delimWriteBuffered(w io.Writer, mes []byte) error { return bw.Flush() } +func delitmWriteAll(w io.Writer, messages ...[]byte) error { + for _, mes := range messages { + if err := delimWrite(w, mes); err != nil { + return fmt.Errorf("failed to write messages %s, err: %w", string(mes), err) + } + } + + return nil +} + func delimWrite(w io.Writer, mes []byte) error { err := writeUvarint(w, uint64(len(mes)+1)) if err != nil { diff --git a/multistream_test.go b/multistream_test.go index 6e6ef3d..a0e4260 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -762,7 +762,7 @@ func TestSimopenClientServer(t *testing.T) { close(done) }() - proto, server, err := SelectWithSimopen([]string{"/a"}, b) + proto, server, err := SelectWithSimopenOrFail([]string{"/a"}, b) if err != nil { t.Fatal(err) } @@ -799,7 +799,7 @@ func TestSimopenClientServerFail(t *testing.T) { close(done) }() - _, _, err := SelectWithSimopen([]string{"/b"}, b) + _, _, err := SelectWithSimopenOrFail([]string{"/b"}, b) if err != ErrNotSupported { t.Fatal(err) } @@ -817,7 +817,7 @@ func TestSimopenClientClient(t *testing.T) { done := make(chan bool, 1) go func() { - proto, server, err := SelectWithSimopen([]string{"/a"}, b) + proto, server, err := SelectWithSimopenOrFail([]string{"/a"}, b) if err != nil { t.Fatal(err) } @@ -827,7 +827,7 @@ func TestSimopenClientClient(t *testing.T) { done <- server }() - proto, servera, err := SelectWithSimopen([]string{"/a"}, a) + proto, servera, err := SelectWithSimopenOrFail([]string{"/a"}, a) if err != nil { t.Fatal(err) } @@ -855,7 +855,7 @@ func TestSimopenClientClient2(t *testing.T) { done := make(chan bool, 1) go func() { - proto, server, err := SelectWithSimopen([]string{"/a", "/b"}, b) + proto, server, err := SelectWithSimopenOrFail([]string{"/a", "/b"}, b) if err != nil { t.Fatal(err) } @@ -865,7 +865,7 @@ func TestSimopenClientClient2(t *testing.T) { done <- server }() - proto, servera, err := SelectWithSimopen([]string{"/b"}, a) + proto, servera, err := SelectWithSimopenOrFail([]string{"/b"}, a) if err != nil { t.Fatal(err) } @@ -893,7 +893,7 @@ func TestSimopenClientClientFail(t *testing.T) { done := make(chan struct{}) go func() { - _, _, err := SelectWithSimopen([]string{"/a"}, b) + _, _, err := SelectWithSimopenOrFail([]string{"/a"}, b) if err != ErrNotSupported { t.Error(err) } @@ -901,7 +901,7 @@ func TestSimopenClientClientFail(t *testing.T) { close(done) }() - _, _, err := SelectWithSimopen([]string{"/b"}, a) + _, _, err := SelectWithSimopenOrFail([]string{"/b"}, a) if err != ErrNotSupported { t.Fatal(err) } From 85f04aad069c1c96a0e22242be17bf469ff951b3 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 21 Jan 2021 11:39:16 +0530 Subject: [PATCH 07/10] skip exactly once protocol --- client.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/client.go b/client.go index 5bd4de2..cd6aaee 100644 --- a/client.go +++ b/client.go @@ -17,6 +17,10 @@ var ErrNotSupported = errors.New("protocol not supported") // specified. var ErrNoProtocols = errors.New("no protocols specified") +var ( + tieBreakerPrefix = "select:" +) + // SelectProtoOrFail performs the initial multistream handshake // to inform the muxer of the protocol that will be used to communicate // on this ReadWriteCloser. It returns an error if, for example, @@ -161,31 +165,32 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { werrCh := make(chan error, 1) go func() { - myselect := []byte("select:" + base64.StdEncoding.EncodeToString(mynonce)) + myselect := []byte(tieBreakerPrefix + base64.StdEncoding.EncodeToString(mynonce)) err := delimWriteBuffered(rwc, myselect) werrCh <- err }() - var peerselect string - for { - tok, err := ReadNextToken(rwc) - if err != nil { - return "", false, err - } + // skip exactly one protocol + // see https://github.com/multiformats/go-multistream/pull/42#discussion_r558757135 + _, err = ReadNextToken(rwc) + if err != nil { + return "", false, err + } - // this skips pipelined protocol negotiation - // keep reading until the token starts with select: - if strings.HasPrefix(tok, "select:") { - peerselect = tok - break - } + // read the tie breaker nonce + tok, err := ReadNextToken(rwc) + if err != nil { + return "", false, err + } + if !strings.HasPrefix(tok, tieBreakerPrefix) { + return "", false, errors.New("tie breaker nonce not sent with the correct prefix") } if err = <-werrCh; err != nil { return "", false, err } - peernonce, err := base64.StdEncoding.DecodeString(peerselect[7:]) + peernonce, err := base64.StdEncoding.DecodeString(tok[7:]) if err != nil { return "", false, err } From 31c07c1ea50b6f01d3ad8e9bff834113ae2170da Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 21 Jan 2021 13:43:22 +0530 Subject: [PATCH 08/10] fixed formatting --- multistream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/multistream.go b/multistream.go index 5b4ba7d..26d28c3 100644 --- a/multistream.go +++ b/multistream.go @@ -85,7 +85,7 @@ func delimWriteBuffered(w io.Writer, mes []byte) error { func delitmWriteAll(w io.Writer, messages ...[]byte) error { for _, mes := range messages { if err := delimWrite(w, mes); err != nil { - return fmt.Errorf("failed to write messages %s, err: %w", string(mes), err) + return fmt.Errorf("failed to write messages %s, err: %v ", string(mes), err) } } From 890f0662d04458dd53e02f5ce7e535ee55821ed4 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 22 Jan 2021 16:15:49 +0530 Subject: [PATCH 09/10] changes as per review --- client.go | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/client.go b/client.go index cd6aaee..1dfebd3 100644 --- a/client.go +++ b/client.go @@ -3,9 +3,10 @@ package multistream import ( "bytes" "crypto/rand" - "encoding/base64" + "encoding/binary" "errors" "io" + "strconv" "strings" ) @@ -17,8 +18,10 @@ var ErrNotSupported = errors.New("protocol not supported") // specified. var ErrNoProtocols = errors.New("no protocols specified") -var ( +const ( tieBreakerPrefix = "select:" + initiator = "initiator" + responder = "responder" ) // SelectProtoOrFail performs the initial multistream handshake @@ -157,15 +160,16 @@ func selectProtosOrFail(protos []string, rwc io.ReadWriteCloser) (string, error) } func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { - mynonce := make([]byte, 32) - _, err := rand.Read(mynonce) + randBytes := make([]byte, 8) + _, err := rand.Read(randBytes) if err != nil { return "", false, err } + myNonce := binary.LittleEndian.Uint64(randBytes) werrCh := make(chan error, 1) go func() { - myselect := []byte(tieBreakerPrefix + base64.StdEncoding.EncodeToString(mynonce)) + myselect := []byte(tieBreakerPrefix + strconv.FormatUint(myNonce, 10)) err := delimWriteBuffered(rwc, myselect) werrCh <- err }() @@ -190,26 +194,20 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { return "", false, err } - peernonce, err := base64.StdEncoding.DecodeString(tok[7:]) + peerNone, err := strconv.ParseUint(tok[len(tieBreakerPrefix):], 10, 64) if err != nil { return "", false, err } var iamserver bool - switch bytes.Compare(mynonce, peernonce) { - case -1: + if peerNone > myNonce { // peer nonce bigger, he is client iamserver = true - - case 1: + } else if peerNone < myNonce { // my nonce bigger, i am client iamserver = false - - case 0: + } else { return "", false, errors.New("failed client selection; identical nonces!") - - default: - return "", false, errors.New("wut? bigint.Cmp returned unexpected value") } var proto string @@ -225,7 +223,7 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) { werrCh := make(chan error, 1) go func() { - err := delimWriteBuffered(rwc, []byte("responder")) + err := delimWriteBuffered(rwc, []byte(responder)) werrCh <- err }() @@ -233,7 +231,7 @@ func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error if err != nil { return "", err } - if tok != "initiator" { + if tok != initiator { return "", errors.New("unexpected response: " + tok) } if err = <-werrCh; err != nil { @@ -273,7 +271,7 @@ func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) { werrCh := make(chan error, 1) go func() { - err := delimWriteBuffered(rwc, []byte("initiator")) + err := delimWriteBuffered(rwc, []byte(initiator)) werrCh <- err }() @@ -281,7 +279,7 @@ func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error if err != nil { return "", err } - if tok != "responder" { + if tok != responder { return "", errors.New("unexpected response: " + tok) } if err = <-werrCh; err != nil { From 4c3567f00c1e3069cb1ed9497f2ca4b321f04790 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 25 Jan 2021 15:32:01 +0530 Subject: [PATCH 10/10] fix code --- client.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index 1dfebd3..6677a14 100644 --- a/client.go +++ b/client.go @@ -194,21 +194,17 @@ func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { return "", false, err } - peerNone, err := strconv.ParseUint(tok[len(tieBreakerPrefix):], 10, 64) + peerNonce, err := strconv.ParseUint(tok[len(tieBreakerPrefix):], 10, 64) if err != nil { return "", false, err } var iamserver bool - if peerNone > myNonce { - // peer nonce bigger, he is client - iamserver = true - } else if peerNone < myNonce { - // my nonce bigger, i am client - iamserver = false - } else { + + if peerNonce == myNonce { return "", false, errors.New("failed client selection; identical nonces!") } + iamserver = peerNonce > myNonce var proto string if iamserver {