Skip to content

Commit

Permalink
chore: delete 'stream' field (#9)
Browse files Browse the repository at this point in the history
* docs:delete 'stream' field

* chore:adapt standard.NewDialer()
  • Loading branch information
ViolaPioggia authored Nov 19, 2023
1 parent 1adf40e commit 95179ca
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 31 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func main() {
events := make(chan *sse.Event)
errChan := make(chan error)
go func() {
cErr := c.Subscribe("client1", func(msg *sse.Event) {
cErr := c.Subscribe(func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
Expand Down Expand Up @@ -138,7 +138,7 @@ func main() {
events := make(chan *sse.Event)
errChan := make(chan error)
go func() {
cErr := c.Subscribe("client2", func(msg *sse.Event) {
cErr := c.Subscribe( func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
Expand Down
4 changes: 2 additions & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func main() {
events := make(chan *sse.Event)
errChan := make(chan error)
go func() {
cErr := c.Subscribe("client1", func(msg *sse.Event) {
cErr := c.Subscribe(func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
Expand Down Expand Up @@ -136,7 +136,7 @@ func main() {
events := make(chan *sse.Event)
errChan := make(chan error)
go func() {
cErr := c.Subscribe("client2", func(msg *sse.Event) {
cErr := c.Subscribe(func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
Expand Down
28 changes: 8 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"io"
"sync/atomic"

"github.com/cloudwego/hertz/pkg/network/standard"

"github.com/cloudwego/hertz/pkg/app/client"
"github.com/cloudwego/hertz/pkg/protocol"
"github.com/cloudwego/hertz/pkg/protocol/consts"
Expand Down Expand Up @@ -58,7 +60,7 @@ type Client struct {
LastEventID atomic.Value // []byte
}

var defaultClient, _ = client.NewClient(client.WithResponseBodyStream(true))
var defaultClient, _ = client.NewClient(client.WithDialer(standard.NewDialer()), client.WithResponseBodyStream(true))

// NewClient creates a new client
func NewClient(url string) *Client {
Expand All @@ -74,14 +76,14 @@ func NewClient(url string) *Client {
}

// Subscribe to a data stream
func (c *Client) Subscribe(stream string, handler func(msg *Event)) error {
return c.SubscribeWithContext(context.Background(), stream, handler)
func (c *Client) Subscribe(handler func(msg *Event)) error {
return c.SubscribeWithContext(context.Background(), handler)
}

// SubscribeWithContext to a data stream with context
func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error {
func (c *Client) SubscribeWithContext(ctx context.Context, handler func(msg *Event)) error {
req, resp := protocol.AcquireRequest(), protocol.AcquireResponse()
err := c.request(ctx, req, resp, stream)
err := c.request(ctx, req, resp)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,16 +160,6 @@ func (c *Client) readLoop(ctx context.Context, reader *EventStreamReader, outCh
}
}

// SubscribeRaw to an sse endpoint
func (c *Client) SubscribeRaw(handler func(msg *Event)) error {
return c.Subscribe("", handler)
}

// SubscribeRawWithContext to an sse endpoint with context
func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error {
return c.SubscribeWithContext(ctx, "", handler)
}

// OnDisconnect specifies the function to run when the connection disconnects
func (c *Client) OnDisconnect(fn ConnCallback) {
c.disconnectCallback = fn
Expand All @@ -183,13 +175,9 @@ func (c *Client) SetMaxBufferSize(size int) {
c.maxBufferSize = size
}

func (c *Client) request(ctx context.Context, req *protocol.Request, resp *protocol.Response, stream string) error {
func (c *Client) request(ctx context.Context, req *protocol.Request, resp *protocol.Response) error {
req.SetMethod(c.Method)
req.SetRequestURI(c.URL)
// Setup request, specify stream to connect to
if stream != "" {
req.URI().QueryArgs().Add("stream", stream)
}

req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Accept", "text/event-stream")
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestClientSubscribe(t *testing.T) {
events := make(chan *Event)
var cErr error
go func() {
cErr = c.Subscribe("test", func(msg *Event) {
cErr = c.Subscribe(func(msg *Event) {
if msg.Data != nil {
events <- msg
return
Expand All @@ -182,7 +182,7 @@ func TestClientSubscribeMultiline(t *testing.T) {
var cErr error

go func() {
cErr = c.Subscribe("test", func(msg *Event) {
cErr = c.Subscribe(func(msg *Event) {
if msg.Data != nil {
events <- msg
return
Expand All @@ -209,7 +209,7 @@ func TestClientOnConnect(t *testing.T) {
called <- struct{}{}
})

go c.Subscribe("test", func(msg *Event) {})
go c.Subscribe(func(msg *Event) {})

time.Sleep(time.Second)
assert.DeepEqual(t, struct{}{}, <-called)
Expand All @@ -220,7 +220,7 @@ func TestClientUnsubscribe401(t *testing.T) {
time.Sleep(time.Second)
c := NewClient("http://127.0.0.1:9009/sse")

err := c.SubscribeRaw(func(ev *Event) {
err := c.Subscribe(func(ev *Event) {
// this shouldn't run
assert.False(t, true)
})
Expand All @@ -241,7 +241,7 @@ func TestClientLargeData(t *testing.T) {
ec := make(chan *Event, 1)

go func() {
c.Subscribe("test", func(ev *Event) {
c.Subscribe(func(ev *Event) {
ec <- ev
})
}()
Expand Down
4 changes: 2 additions & 2 deletions examples/client/quickstart/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
events := make(chan *sse.Event)
errChan := make(chan error)
go func() {
cErr := c.Subscribe("client1", func(msg *sse.Event) {
cErr := c.Subscribe(func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
Expand Down Expand Up @@ -101,7 +101,7 @@ func main() {
events := make(chan *sse.Event)
errChan := make(chan error)
go func() {
cErr := c.Subscribe("client2", func(msg *sse.Event) {
cErr := c.Subscribe(func(msg *sse.Event) {
if msg.Data != nil {
events <- msg
return
Expand Down

0 comments on commit 95179ca

Please sign in to comment.