-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipe.go
203 lines (178 loc) · 5.18 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.
package ctxio
import (
"context"
"io"
"sync"
)
// onceError is an object that will only store an error once.
type onceError struct {
sync.Mutex // guards following
err error
}
func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err
}
func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err
}
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
var ErrClosedPipe = io.ErrClosedPipe
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan []byte
rdCh chan int
once sync.Once // Protects closing done
done chan struct{}
rerr onceError
werr onceError
}
func (p *pipe) read(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.readCloseError()
default:
}
select {
case bw := <-p.wrCh:
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
func (p *pipe) closeRead(err error) error {
if err == nil {
err = ErrClosedPipe
}
p.rerr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}
func (p *pipe) write(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.writeCloseError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()
}
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b:
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
func (p *pipe) closeWrite(err error) error {
if err == nil {
err = EOF
}
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil
}
// readCloseError is considered internal to the pipe type.
func (p *pipe) readCloseError() error {
rerr := p.rerr.Load()
if werr := p.werr.Load(); rerr == nil && werr != nil {
return werr
}
return ErrClosedPipe
}
// writeCloseError is considered internal to the pipe type.
func (p *pipe) writeCloseError() error {
werr := p.werr.Load()
if rerr := p.rerr.Load(); werr == nil && rerr != nil {
return rerr
}
return ErrClosedPipe
}
// A PipeReader is the read half of a pipe.
type PipeReader struct{ pipe }
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *PipeReader) Read(ctx context.Context, data []byte) (n int, err error) {
return r.pipe.read(data)
}
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error [ErrClosedPipe].
func (r *PipeReader) Close(ctx context.Context) error {
return r.CloseWithError(nil)
}
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
//
// CloseWithError never overwrites the previous error if it exists
// and always returns nil.
func (r *PipeReader) CloseWithError(err error) error {
return r.pipe.closeRead(err)
}
// A PipeWriter is the write half of a pipe.
type PipeWriter struct{ r PipeReader }
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until one or more readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is [ErrClosedPipe].
func (w *PipeWriter) Write(ctx context.Context, data []byte) (n int, err error) {
return w.r.pipe.write(data)
}
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func (w *PipeWriter) Close(ctx context.Context) error {
return w.CloseWithError(nil)
}
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err,
// or EOF if err is nil.
//
// CloseWithError never overwrites the previous error if it exists
// and always returns nil.
func (w *PipeWriter) CloseWithError(err error) error {
return w.r.pipe.closeWrite(err)
}
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an [io.Reader]
// with code expecting an [io.Writer].
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the [PipeWriter] blocks until it has satisfied
// one or more Reads from the [PipeReader] that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func Pipe() (*PipeReader, *PipeWriter) {
pw := &PipeWriter{r: PipeReader{pipe: pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}}}
return &pw.r, pw
}