This repository has been archived by the owner on Mar 11, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 43
/
transport.go
247 lines (209 loc) · 5.7 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package p9p
import (
"errors"
"fmt"
"log"
"net"
"sync"
"context"
)
// roundTripper manages the request and response from the client-side. A
// roundTripper must abide by similar rules to the http.RoundTripper.
// Typically, the roundTripper will manage tag assignment and message
// serialization.
type roundTripper interface {
// send submits msg and returns either the Message received in reply or an
// error. ctx is supplied by the caller; implementations are expected to
// honor its cancellation.
send(ctx context.Context, msg Message) (Message, error)
}
// transport plays the role of being a client channel manager. It multiplexes
// function calls onto the wire and dispatches responses to blocking calls to
// send. On the whole, transport is thread-safe for calling send.
type transport struct {
// ctx is the context given to newTransport. handle uses it for reads off
// the channel and stops when it is done.
ctx context.Context
// ch is the channel that fcalls are written to and read from.
ch Channel
// requests carries requests from send to the handle loop.
requests chan *fcallRequest
// shutdown is closed by close to ask the handle loop to stop.
shutdown chan struct{}
once sync.Once // protect closure of shutdown
// closed is closed by handle when its loop exits; send and Close wait on it.
closed chan struct{}
// NOTE(review): tags is not referenced by any code visible in this file;
// confirm whether it is still needed.
tags uint16
}
// Compile-time check that *transport satisfies roundTripper.
var _ roundTripper = (*transport)(nil)
// newTransport builds a transport over ch, starts its handle loop in a new
// goroutine and returns it as a roundTripper. ctx is retained by the
// transport and governs the lifetime of that loop.
func newTransport(ctx context.Context, ch Channel) roundTripper {
	tr := new(transport)
	tr.ctx = ctx
	tr.ch = ch
	tr.requests = make(chan *fcallRequest)
	tr.shutdown = make(chan struct{})
	tr.closed = make(chan struct{})

	go tr.handle()

	return tr
}
// fcallRequest encompasses the request to send a message via fcall.
type fcallRequest struct {
// ctx is the context of the send call that created the request; handle
// passes it to WriteFcall.
ctx context.Context
// message is the payload that handle wraps in an fcall.
message Message
// response receives the fcall whose tag matches this request.
response chan *Fcall
// err receives a tag-allocation or write failure reported by handle.
err chan error
}
// newFcallRequest wraps msg and ctx in a request ready to be handed to the
// transport loop. Both result channels have room for one value, so the loop
// can deliver an outcome without waiting for the caller to receive it.
func newFcallRequest(ctx context.Context, msg Message) *fcallRequest {
	req := new(fcallRequest)
	req.ctx = ctx
	req.message = msg
	req.response = make(chan *Fcall, 1)
	req.err = make(chan error, 1)
	return req
}
// send hands msg to the transport loop and blocks until a reply arrives.
//
// It returns ErrClosed if the transport closes, or ctx.Err() if ctx is done,
// while waiting in either phase. An error reported by the loop is returned
// as-is. An Rerror reply is returned as the error value itself; any other
// reply is returned as the resulting Message.
func (t *transport) send(ctx context.Context, msg Message) (Message, error) {
	req := newFcallRequest(ctx, msg)

	// Phase one: get the request accepted by the loop.
	select {
	case t.requests <- req:
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-t.closed:
		return nil, ErrClosed
	}

	// Phase two: wait for the outcome.
	var reply *Fcall
	select {
	case reply = <-req.response:
	case err := <-req.err:
		return nil, err
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-t.closed:
		return nil, ErrClosed
	}

	if reply.Type != Rerror {
		return reply.Message, nil
	}

	// Surface the remote error as a Go error value.
	rerr, ok := reply.Message.(MessageRerror)
	if !ok {
		return nil, fmt.Errorf("invalid error response: %v", reply)
	}
	return nil, rerr
}
// allocateTag picks a tag that is not currently a key of m. The search begins
// at the tag following hint, skips NOTAG by restarting from zero, and
// examines at most 0xFFFF candidates. An error is returned when m already
// holds 0xFFFF or more entries, or when no free tag turns up during the scan.
// The provided map must not contain NOTAG as a key. The request r is accepted
// but not consulted.
func allocateTag(r *fcallRequest, m map[Tag]*fcallRequest, hint Tag) (Tag, error) {
	// Every usable tag is taken once the map holds 65535 (0xFFFF) entries.
	if len(m) >= 0xFFFF {
		return 0, errors.New("tag pool depleted")
	}

	candidate := hint
	for remaining := 0xFFFF; remaining > 0; remaining-- {
		candidate++
		if candidate == NOTAG {
			candidate = 0
		}
		if _, taken := m[candidate]; taken {
			continue
		}
		return candidate, nil
	}

	return 0, errors.New("allocateTag: unexpected error")
}
// handle runs the transport's dispatch loop until shutdown is signalled or
// the transport context is done. It assigns a tag to every request received
// from send, writes the resulting fcall to the channel, and routes each fcall
// read off the wire to the request waiting on its tag. t.closed is closed on
// return so that blocked callers of send and Close are released.
//
// A response whose tag matches no outstanding request is logged and dropped
// instead of panicking, so a stray or stale reply from the peer cannot take
// down the whole process.
func (t *transport) handle() {
	defer close(t.closed)

	// The following variables are owned exclusively by this goroutine.
	var (
		responses = make(chan *Fcall)

		// outstanding provides a map of tags to outstanding requests.
		outstanding = map[Tag]*fcallRequest{}

		// selected is the most recently allocated tag; it seeds the next
		// allocation search.
		selected Tag
	)

	// Reader goroutine: pull fcalls off the connection and pass them to the
	// dispatch loop below.
	go func() {
		// Any exit of the reader starts the transport shutdown.
		defer t.close()

		for {
			fcall := new(Fcall)
			if err := t.ch.ReadFcall(t.ctx, fcall); err != nil {
				if nerr, ok := err.(net.Error); ok && (nerr.Timeout() || nerr.Temporary()) {
					// BUG(stevvooe): There may be partial reads under
					// timeout errors where this is actually fatal.
					// We can only retry if we haven't offset the frame.
					continue
				}

				log.Println("p9p: fatal error reading msg:", err)
				return
			}

			select {
			case <-t.ctx.Done():
				return
			case <-t.closed:
				return
			case responses <- fcall:
			}
		}
	}()

	for {
		select {
		case req := <-t.requests:
			var err error
			selected, err = allocateTag(req, outstanding, selected)
			if err != nil {
				// req.err is buffered, so this never blocks the loop.
				req.err <- err
				continue
			}

			outstanding[selected] = req
			fcall := newFcall(selected, req.message)

			// TODO(stevvooe): Consider the case of requests that never
			// receive a response. We need to remove the fcall context from
			// the tag map and dealloc the tag. We may also want to send a
			// flush for the tag.
			if err := t.ch.WriteFcall(req.ctx, fcall); err != nil {
				// The request never made it out; release its tag.
				delete(outstanding, fcall.Tag)
				req.err <- err
			}
		case b := <-responses:
			req, ok := outstanding[b.Tag]
			if !ok {
				// An unknown tag is not necessarily fatal to the session: it
				// may belong to a message the client no longer cares about.
				// Drop it and keep serving the remaining requests.
				log.Printf("p9p: dropping response with unknown tag: %v", b)
				continue
			}

			// BUG(stevvooe): Must detect duplicate tag and ensure that we are
			// waking up the right caller. If a duplicate is received, the
			// entry should not be deleted.
			delete(outstanding, b.Tag)

			// req.response is buffered, so delivery does not block even if
			// the caller has already given up waiting.
			req.response <- b
		case <-t.shutdown:
			return
		case <-t.ctx.Done():
			return
		}
	}
}
// flush is an unimplemented placeholder: calling it always panics with
// "not implemented". Per the TODO below, it is meant to send a flush for tag
// once a call's context has been cancelled.
func (t *transport) flush(ctx context.Context, tag Tag) error {
// TODO(stevvooe): We need to fire and forget flush messages when a call
// context gets cancelled.
panic("not implemented")
}
// Close signals the transport to shut down and then waits for the handle
// loop to exit. It returns nil once the loop has closed t.closed, or the
// transport context's error if that context is done first.
func (t *transport) Close() error {
	t.close()

	select {
	case <-t.ctx.Done():
		return t.ctx.Err()
	case <-t.closed:
		return nil
	}
}
// close begins the shutdown process by closing the shutdown channel. The
// sync.Once guard makes repeated calls harmless.
func (t *transport) close() {
	signalShutdown := func() {
		close(t.shutdown)
	}
	t.once.Do(signalShutdown)
}