Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace obsolete websocket library -- allows WebAssembly #433

Merged
merged 19 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ably/realtime_conn_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestRealtimeConn_SendErrorReconnects(t *testing.T) {
ablytest.Instantly.NoRecv(t, nil, publishErr, t.Fatalf)

// Reconnect should happen instantly as a result of transport closure.
ablytest.Instantly.Send(t, allowDial, struct{}{}, t.Fatalf)
allowDial <- struct{}{}

// After reconnection, message should be published.
ablytest.Soon.Recv(t, &err, publishErr, t.Fatalf)
Expand Down
97 changes: 64 additions & 33 deletions ably/websocket.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,94 @@
package ably

import (
"context"
"encoding/json"
"errors"
"net"
"net/http"
"net/url"
"time"

"github.com/ably/ably-go/ably/internal/ablyutil"
"golang.org/x/net/websocket"
"nhooyr.io/websocket"
)

type proto int

const (
jsonProto proto = iota
msgpackProto
)

type websocketConn struct {
conn *websocket.Conn
codec websocket.Codec
proto proto
}

func (ws *websocketConn) Send(msg *protocolMessage) error {
return ws.codec.Send(ws.conn, msg)
switch ws.proto {
case jsonProto:
p, err := json.Marshal(msg)
if err != nil {
return err
}
return ws.conn.Write(context.Background(), websocket.MessageText, p)
case msgpackProto:
p, err := ablyutil.MarshalMsgpack(msg)
if err != nil {
return err
}
return ws.conn.Write(context.Background(), websocket.MessageBinary, p)
}
return nil
}

func (ws *websocketConn) Receive(deadline time.Time) (*protocolMessage, error) {
msg := &protocolMessage{}
if !deadline.IsZero() {
err := ws.conn.SetReadDeadline(deadline)
if err != nil {
return nil, err
}
var (
ctx context.Context
cancel context.CancelFunc
)
if deadline.IsZero() {
ctx = context.Background()
} else {
ctx, cancel = context.WithDeadline(context.Background(), deadline)
Jmgr marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()
}
err := ws.codec.Receive(ws.conn, &msg)
_, data, err := ws.conn.Read(ctx)
if err != nil {
return nil, err
}
switch ws.proto {
case jsonProto:
err := json.Unmarshal(data, msg)
if err != nil {
return nil, err
}
case msgpackProto:
err := ablyutil.UnmarshalMsgpack(data, msg)
if err != nil {
return nil, err
}
}
return msg, nil
}

func (ws *websocketConn) Close() error {
return ws.conn.Close()
return ws.conn.Close(websocket.StatusNormalClosure, "")
}

func dialWebsocket(proto string, u *url.URL, timeout time.Duration) (*websocketConn, error) {
ws := &websocketConn{}
switch proto {
case "application/json":
ws.codec = websocket.JSON
ws.proto = jsonProto
case "application/x-msgpack":
ws.codec = msgpackCodec
ws.proto = msgpackProto
default:
return nil, errors.New(`invalid protocol "` + proto + `"`)
}
// Starts a raw websocket connection with server
conn, err := dialWebsocketTimeout(u.String(), "", "https://"+u.Host, timeout)
conn, err := dialWebsocketTimeout(u.String(), "https://"+u.Host, timeout)
if err != nil {
return nil, err
}
Expand All @@ -58,27 +97,19 @@ func dialWebsocket(proto string, u *url.URL, timeout time.Duration) (*websocketC
}

// dialWebsocketTimeout dials the websocket with a timeout.
func dialWebsocketTimeout(uri, protocol, origin string, timeout time.Duration) (*websocket.Conn, error) {
config, err := websocket.NewConfig(uri, origin)
func dialWebsocketTimeout(uri, origin string, timeout time.Duration) (*websocket.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

var ops websocket.DialOptions
ops.HTTPHeader = make(http.Header)
ops.HTTPHeader.Add(ablyAgentHeader, ablyAgentIdentifier)

c, _, err := websocket.Dial(ctx, uri, &ops)

if err != nil {
return nil, err
}
config.Header.Set(ablyAgentHeader, ablyAgentIdentifier)
if protocol != "" {
config.Protocol = []string{protocol}
}
config.Dialer = &net.Dialer{
Timeout: timeout,
}
return websocket.DialConfig(config)
}

var msgpackCodec = websocket.Codec{
Marshal: func(v interface{}) ([]byte, byte, error) {
p, err := ablyutil.MarshalMsgpack(v)
return p, websocket.BinaryFrame, err
},
Unmarshal: func(p []byte, _ byte, v interface{}) error {
return ablyutil.UnmarshalMsgpack(p, v)
},
return c, nil
}
39 changes: 13 additions & 26 deletions ably/websocket_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,32 +95,24 @@ func TestWebsocketDial(t *testing.T) {
websocketUrl := fmt.Sprintf("ws%s", strings.TrimPrefix(ts.URL, "http"))
testServerURL, _ := url.Parse(websocketUrl)

// Calculate the expected Ably-Agent header used by tests.
expectedAgentHeader := AblySDKIdentifier + " " + GoRuntimeIdentifier + " " + GoOSIdentifier()

tests := map[string]struct {
dialProtocol string
expectedPayloadType byte
expectedSubprotocol []string
expectedResult *websocketConn
expectedErr error
dialProtocol string
expectedErr error
expectedProto proto
}{
"Can dial for protocol application/json": {
dialProtocol: "application/json",
expectedPayloadType: uint8(1),
expectedSubprotocol: []string(nil),
expectedErr: nil,
dialProtocol: "application/json",
expectedErr: nil,
expectedProto: jsonProto,
},
"Can dial for protocol application/x-msgpack": {
dialProtocol: "application/x-msgpack",
expectedPayloadType: uint8(1),
expectedSubprotocol: []string(nil),
expectedErr: nil,
dialProtocol: "application/x-msgpack",
expectedErr: nil,
expectedProto: msgpackProto,
},
"Can handle an error when dialing for an invalid protocol": {
dialProtocol: "aProtocol",
expectedResult: nil,
expectedErr: errors.New(`invalid protocol "aProtocol"`),
dialProtocol: "aProtocol",
expectedErr: errors.New(`invalid protocol "aProtocol"`),
},
}

Expand All @@ -131,11 +123,8 @@ func TestWebsocketDial(t *testing.T) {
assert.Equal(t, test.expectedErr, err)

if result != nil {
assert.NotNil(t, result.codec)
Jmgr marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, expectedAgentHeader, result.conn.Config().Header.Get("Ably-Agent"))
assert.Equal(t, test.expectedPayloadType, result.conn.PayloadType)
assert.Equal(t, test.expectedSubprotocol, result.conn.Config().Protocol)
assert.True(t, result.conn.IsClientConn())
assert.NotNil(t, result.conn)
assert.Equal(t, test.expectedProto, result.proto)
}
})
}
Expand All @@ -153,8 +142,6 @@ func TestWebsocketSendAndReceive(t *testing.T) {
tests := map[string]struct {
dialProtocol string
expectedMessageType string
expectedPayloadType byte
expectedResult *websocketConn
expectedErr error
}{
"Can send and receive a message using protocol application/json": {
Expand Down
2 changes: 1 addition & 1 deletion ablytest/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"
)

var Instantly = Before(10 * time.Millisecond)
var Instantly = Before(40 * time.Millisecond)
Jmgr marked this conversation as resolved.
Show resolved Hide resolved

var Soon = Before(Timeout)

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/ably/ably-go
require (
github.com/stretchr/testify v1.7.1
github.com/ugorji/go/codec v1.1.9
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654
nhooyr.io/websocket v1.8.7
)
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,10 @@ github.com/ugorji/go v1.1.9/go.mod h1:chLrngdsg43geAaeId+nXO57YsDdl5OZqd/QtBiD19
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.1.9 h1:J/7hhpkQwgypRNvaeh/T5gzJ2gEI/l8S3qyRrdEa1fA=
github.com/ugorji/go/codec v1.1.9/go.mod h1:+SWgpdqOgdW5sBaiDfkHilQ1SxQ1hBkq/R+kHfL7Suo=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down