Skip to content

Commit

Permalink
implement simultaneous open extension
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Sep 21, 2019
1 parent b2ba9e9 commit 36a530c
Showing 1 changed file with 215 additions and 0 deletions.
215 changes: 215 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package multistream

import (
"bytes"
"crypto/rand"
"errors"
"io"
"math/big"
"strings"
)

// ErrNotSupported is the error returned when the muxer does not support
Expand Down Expand Up @@ -74,6 +77,218 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) {
return "", ErrNotSupported
}

// Performs protocol negotiation with the simultaneous open extension; the returned boolean
// indicator will be true if we should act as a server.
func SelectWithSimopen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) {
if len(protos) == 0 {
return "", false, ErrNoProtocols
}

var buf bytes.Buffer
delimWrite(&buf, []byte(ProtocolID))
delimWrite(&buf, []byte("iamclient"))
delimWrite(&buf, []byte(protos[0]))
_, err := io.Copy(rwc, &buf)
if err != nil {
return "", false, err
}

err = readMultistreamHeader(rwc)
if err != nil {
return "", false, err
}

tok, err := ReadNextToken(rwc)
if err != nil {
return "", false, err
}

switch tok {
case "iamclient":
// simultaneous open
return simOpen(protos, rwc)

case "na":
// client open
proto, err := clientOpen(protos, rwc)
if err != nil {
return "", false, err
}

return proto, false, nil

default:
return "", false, errors.New("unexpected response: " + tok)
}
}

func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) {
// check to see if we selected the pipelined protocol
tok, err := ReadNextToken(rwc)
if err != nil {
return "", err
}

switch tok {
case protos[0]:
return tok, nil
case "na":
// try the other protos
for _, p := range protos[1:] {
err = trySelect(p, rwc)
switch err {
case nil:
return p, nil
case ErrNotSupported:
default:
return "", err
}
}

return "", ErrNotSupported
default:
return "", errors.New("unexpected response: " + tok)
}
}

func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) {
retries := 3

again:
mynonce := make([]byte, 32)
_, err := rand.Read(mynonce)
if err != nil {
return "", false, err
}

myselect := []byte("select:" + string(mynonce))
err = delimWriteBuffered(rwc, myselect)
if err != nil {
return "", false, err
}

var peerselect string
for {
tok, err := ReadNextToken(rwc)
if err != nil {
return "", false, err
}

// this skips pipelined protocol negoatiation
// keep reading until the token starts with select:
if strings.HasPrefix(tok, "select:") {
peerselect = tok
break
}
}

peernonce := []byte(peerselect[7:])

var mybig, peerbig big.Int
var iamserver bool
mybig.SetBytes(mynonce)
peerbig.SetBytes(peernonce)

switch mybig.Cmp(&peerbig) {
case -1:
// peer nonce bigger, he is client
iamserver = true

case 1:
// my nonce bigger, i am client
iamserver = false

case 0:
// wtf, the world is ending! try again.
if retries > 0 {
retries--
goto again
}

return "", false, errors.New("failed client selection; identical nonces!")

default:
return "", false, errors.New("wut? bigint.Cmp returned unexpected value")
}

var proto string
if iamserver {
proto, err = simOpenSelectServer(protos, rwc)
} else {
proto, err = simOpenSelectClient(protos, rwc)
}

return proto, iamserver, err
}

func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) {
err := delimWriteBuffered(rwc, []byte("responder"))
if err != nil {
return "", err
}

tok, err := ReadNextToken(rwc)
if err != nil {
return "", err
}
if tok != "initiator" {
return "", errors.New("unexpected response: " + tok)
}

for {
tok, err = ReadNextToken(rwc)
if err != nil {
return "", err
}

for _, p := range protos {
if tok == p {
err = delimWriteBuffered(rwc, []byte(p))
if err != nil {
return "", err
}

return p, nil
}
}

err = delimWriteBuffered(rwc, []byte("na"))
if err != nil {
return "", err
}
}

}

func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) {
err := delimWriteBuffered(rwc, []byte("initiator"))
if err != nil {
return "", err
}

tok, err := ReadNextToken(rwc)
if err != nil {
return "", err
}
if tok != "responder" {
return "", errors.New("unexpected response: " + tok)
}

for _, p := range protos {
err = trySelect(p, rwc)
switch err {
case nil:
return p, nil

case ErrNotSupported:
default:
return "", err
}
}

return "", ErrNotSupported
}

func handshake(rw io.ReadWriter) error {
errCh := make(chan error, 1)
go func() {
Expand Down

0 comments on commit 36a530c

Please sign in to comment.