Skip to content

Commit

Permalink
internal/jsonrpc2_v2: remove “Box” suffix from channel field names
Browse files Browse the repository at this point in the history
With the suffixes I end up a little lost in the “box” noise while I'm
reading — the channel ops alone suffice to make the storage mechanism
clear.

(To me, the mechanism of storing a value in a 1-buffered channel is
conceptually similar to storing it in a pointer, atomic.Pointer, or
similar — and we don't generally name those with a suffix either.)

For golang/go#46047.
For golang/go#46520.
For golang/go#49387.

Change-Id: I7f58a9ac532f597fe49ed70606d89bd8cbe33b55
Reviewed-on: https://go-review.googlesource.com/c/tools/+/443355
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Bryan Mills <bcmills@google.com>
Auto-Submit: Bryan Mills <bcmills@google.com>
Reviewed-by: Alan Donovan <adonovan@google.com>
gopls-CI: kokoro <noreply+kokoro@google.com>
  • Loading branch information
Bryan C. Mills authored and gopherbot committed Oct 17, 2022
1 parent fd32990 commit 5935531
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 48 deletions.
82 changes: 41 additions & 41 deletions internal/jsonrpc2_v2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ type Connection struct {
closeOnce sync.Once
closer io.Closer

writerBox chan Writer
outgoingBox chan map[ID]chan<- *Response
incomingBox chan map[ID]*incoming
async *async
writer chan Writer
outgoing chan map[ID]chan<- *Response
incoming chan map[ID]*incoming
async *async
}

type AsyncCall struct {
id ID
response chan *Response // the channel a response will be delivered on
resultBox chan asyncResult
endSpan func() // close the tracing span when all processing for the message is complete
id ID
response chan *Response // the channel a response will be delivered on
result chan asyncResult
endSpan func() // close the tracing span when all processing for the message is complete
}

type asyncResult struct {
Expand All @@ -87,11 +87,11 @@ func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions
// This is used by the Dial and Serve functions to build the actual connection.
func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
c := &Connection{
closer: rwc,
writerBox: make(chan Writer, 1),
outgoingBox: make(chan map[ID]chan<- *Response, 1),
incomingBox: make(chan map[ID]*incoming, 1),
async: newAsync(),
closer: rwc,
writer: make(chan Writer, 1),
outgoing: make(chan map[ID]chan<- *Response, 1),
incoming: make(chan map[ID]*incoming, 1),
async: newAsync(),
}

options, err := binder.Bind(ctx, c)
Expand All @@ -107,8 +107,8 @@ func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (
if options.Handler == nil {
options.Handler = defaultHandler{}
}
c.outgoingBox <- make(map[ID]chan<- *Response)
c.incomingBox <- make(map[ID]*incoming)
c.outgoing <- make(map[ID]chan<- *Response)
c.incoming <- make(map[ID]*incoming)
// the goroutines started here will continue until the underlying stream is closed
reader := options.Framer.Reader(rwc)
readToQueue := make(chan *incoming)
Expand All @@ -119,7 +119,7 @@ func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (

// releaseing the writer must be the last thing we do in case any requests
// are blocked waiting for the connection to be ready
c.writerBox <- options.Framer.Writer(rwc)
c.writer <- options.Framer.Writer(rwc)
return c, nil
}

Expand Down Expand Up @@ -154,14 +154,14 @@ func (c *Connection) Notify(ctx context.Context, method string, params interface
// If sending the call failed, the response will be ready and have the error in it.
func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
result := &AsyncCall{
id: Int64ID(atomic.AddInt64(&c.seq, 1)),
resultBox: make(chan asyncResult, 1),
id: Int64ID(atomic.AddInt64(&c.seq, 1)),
result: make(chan asyncResult, 1),
}
// generate a new request identifier
call, err := NewCall(result.id, method, params)
if err != nil {
//set the result to failed
result.resultBox <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)}
result.result <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)}
return result
}
ctx, endSpan := event.Start(ctx, method,
Expand All @@ -175,7 +175,7 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
// are racing the response.
// rchan is buffered in case the response arrives without a listener.
result.response = make(chan *Response, 1)
outgoing, ok := <-c.outgoingBox
outgoing, ok := <-c.outgoing
if !ok {
// If the call failed due to (say) an I/O error or broken pipe, attribute it
// as such. (If the error is nil, then the connection must have been shut
Expand All @@ -193,7 +193,7 @@ func (c *Connection) Call(ctx context.Context, method string, params interface{}
return result
}
outgoing[result.id] = result.response
c.outgoingBox <- outgoing
c.outgoing <- outgoing
// now we are ready to send
if err := c.write(ctx, call); err != nil {
// sending failed, we will never get a response, so deliver a fake one
Expand All @@ -212,8 +212,8 @@ func (a *AsyncCall) ID() ID { return a.id }
// returned, or a call that failed to send in the first place.
func (a *AsyncCall) IsReady() bool {
select {
case r := <-a.resultBox:
a.resultBox <- r
case r := <-a.result:
a.result <- r
return true
default:
return false
Expand All @@ -236,14 +236,14 @@ func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
r.result = response.Result
event.Label(ctx, tag.StatusCode.Of("OK"))
}
case r = <-a.resultBox:
case r = <-a.result:
// result already available
case <-ctx.Done():
event.Label(ctx, tag.StatusCode.Of("CANCELLED"))
return ctx.Err()
}
// refill the box for the next caller
a.resultBox <- r
a.result <- r
// and unpack the result
if r.err != nil {
return r.err
Expand All @@ -259,8 +259,8 @@ func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
// Respond must be called exactly once for any message for which a handler
// returns ErrAsyncResponse. It must not be called for any other message.
func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
pending := <-c.incomingBox
defer func() { c.incomingBox <- pending }()
pending := <-c.incoming
defer func() { c.incoming <- pending }()
entry, found := pending[id]
if !found {
return nil
Expand All @@ -277,8 +277,8 @@ func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
// not cause any messages that have not arrived yet with that ID to be
// cancelled.
func (c *Connection) Cancel(id ID) {
pending := <-c.incomingBox
defer func() { c.incomingBox <- pending }()
pending := <-c.incoming
defer func() { c.incoming <- pending }()
if entry, found := pending[id]; found && entry.cancel != nil {
entry.cancel()
entry.cancel = nil
Expand Down Expand Up @@ -310,10 +310,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
defer func() {
// Retire any outgoing requests that were still in flight.
// With the Reader no longer being processed, they necessarily cannot receive a response.
outgoing := <-c.outgoingBox
close(c.outgoingBox) // Prevent new outgoing requests, which would deadlock.
for id, responseBox := range outgoing {
responseBox <- &Response{ID: id, Error: err}
outgoing := <-c.outgoing
close(c.outgoing) // Prevent new outgoing requests, which would deadlock.
for id, response := range outgoing {
response <- &Response{ID: id, Error: err}
}

close(toQueue)
Expand Down Expand Up @@ -352,9 +352,9 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch
// if the request is a call, add it to the incoming map so it can be
// cancelled by id
if msg.IsCall() {
pending := <-c.incomingBox
pending := <-c.incoming
pending[msg.ID] = entry
c.incomingBox <- pending
c.incoming <- pending
}
// send the message to the incoming queue
toQueue <- entry
Expand All @@ -368,10 +368,10 @@ func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue ch

func (c *Connection) incomingResponse(msg *Response) {
var response chan<- *Response
if outgoing, ok := <-c.outgoingBox; ok {
if outgoing, ok := <-c.outgoing; ok {
response = outgoing[msg.ID]
delete(outgoing, msg.ID)
c.outgoingBox <- outgoing
c.outgoing <- outgoing
}
if response != nil {
response <- msg
Expand Down Expand Up @@ -468,8 +468,8 @@ func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQ
func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
if entry.request.IsCall() {
// we have a call finishing, remove it from the incoming map
pending := <-c.incomingBox
defer func() { c.incomingBox <- pending }()
pending := <-c.incoming
defer func() { c.incoming <- pending }()
delete(pending, entry.request.ID)
}
if err := c.respond(entry, result, rerr); err != nil {
Expand Down Expand Up @@ -527,8 +527,8 @@ func (c *Connection) respond(entry *incoming, result interface{}, rerr error) er
// write is used by all things that write outgoing messages, including replies.
// it makes sure that writes are atomic
func (c *Connection) write(ctx context.Context, msg Message) error {
writer := <-c.writerBox
defer func() { c.writerBox <- writer }()
writer := <-c.writer
defer func() { c.writer <- writer }()
n, err := writer.Write(ctx, msg)
event.Metric(ctx, tag.SentBytes.Of(n))
return err
Expand Down
14 changes: 7 additions & 7 deletions internal/jsonrpc2_v2/jsonrpc2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type binder struct {
type handler struct {
conn *jsonrpc2.Connection
accumulator int
waitersBox chan map[string]chan struct{}
waiters chan map[string]chan struct{}
calls map[string]*jsonrpc2.AsyncCall
}

Expand Down Expand Up @@ -256,11 +256,11 @@ func verifyResults(t *testing.T, method string, results interface{}, expect inte

func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) {
h := &handler{
conn: conn,
waitersBox: make(chan map[string]chan struct{}, 1),
calls: make(map[string]*jsonrpc2.AsyncCall),
conn: conn,
waiters: make(chan map[string]chan struct{}, 1),
calls: make(map[string]*jsonrpc2.AsyncCall),
}
h.waitersBox <- make(map[string]chan struct{})
h.waiters <- make(map[string]chan struct{})
if b.runTest != nil {
go b.runTest(h)
}
Expand All @@ -272,8 +272,8 @@ func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.C
}

func (h *handler) waiter(name string) chan struct{} {
waiters := <-h.waitersBox
defer func() { h.waitersBox <- waiters }()
waiters := <-h.waiters
defer func() { h.waiters <- waiters }()
waiter, found := waiters[name]
if !found {
waiter = make(chan struct{})
Expand Down

0 comments on commit 5935531

Please sign in to comment.