-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconn.go
292 lines (257 loc) · 7.74 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package rwproxy
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"strings"
)
// ConnCloseError is provided when conn.Close() fails, encapsulating the errors
// reported by one or both proxied connections.
type ConnCloseError struct {
	errors []error
}

// Error implements the error interface. The message states how many proxied
// connections failed to close, followed by each underlying error message
// separated by ", ".
func (e ConnCloseError) Error() string {
	msgs := make([]string, 0, len(e.errors))
	for _, err := range e.errors {
		msgs = append(msgs, err.Error())
	}
	return fmt.Sprintf("rwproxy: failed to close %d proxied connections: %s", len(e.errors), strings.Join(msgs, ", "))
}

// Unwrap exposes the underlying close errors so that errors.Is and errors.As
// (Go 1.20 and later) can match against any of them.
func (e ConnCloseError) Unwrap() []error {
	return e.errors
}
// Sentinel errors reported by the proxy connection.
var (
	// ErrConnBeginTxUnsupported is provided when conn.BeginTx() is used but
	// the underlying driver connection does not implement it.
	ErrConnBeginTxUnsupported = errors.New("rwproxy: driver doesn't support BeginTx")

	// ErrUnexpectedTxClose is provided when a proxied transaction reports
	// being closed but it is not the one the connection currently tracks.
	ErrUnexpectedTxClose = errors.New("rwproxy: unexpected proxied transaction close")
)
// conn is a virtual connection to a read/write cluster of connections.
// The proxied writer and reader connections are established on demand by
// writer() and reader() rather than when the conn is created.
type conn struct {
// driver supplies debug logging, the proxied driver and the reader selector
driver *Driver
// writerDSN is the DSN passed to the proxied driver to open the writer
writerDSN string
// readerDSNs are the candidate DSNs passed to the driver's reader selector
readerDSNs []string
// writerConn is the cached writer connection; nil until first opened
writerConn *proxiedConn
// readerConn is the cached reader connection; nil until first selected.
// It may be the same connection as writerConn when no reader is usable.
readerConn *proxiedConn
// tx is the transaction currently open on this connection, or nil
tx *tx
}
// writer returns the proxied connection that write statements should run on.
//
// While a transaction is open, that transaction's connection is returned.
// Otherwise the writer connection is opened from writerDSN on first use and
// cached for later calls. ctx is accepted for symmetry with reader() but is
// not consulted here.
func (c *conn) writer(ctx context.Context) (*proxiedConn, error) {
	// an open transaction pins every statement to its own connection
	if c.tx != nil {
		return c.tx.driverConn, nil
	}

	// reuse the writer once it has been opened
	if c.writerConn != nil {
		return c.writerConn, nil
	}

	c.driver.debugf("opening writer connection to: %s", c.writerDSN)
	opened, err := c.driver.proxiedDriver.Open(c.writerDSN)
	if err != nil {
		return nil, err
	}
	c.writerConn = &proxiedConn{Conn: opened, role: "writer"}
	return c.writerConn, nil
}
// reader returns the proxied connection that read statements should run on.
//
// While a transaction is open, that transaction's connection is returned.
// Otherwise a reader is chosen via the driver's selector on first use and
// cached. When no reader DSNs are configured, or the selector fails, the
// writer connection is cached and returned in the reader's place.
func (c *conn) reader(ctx context.Context) (*proxiedConn, error) {
	// an open transaction pins every statement to its own connection
	if c.tx != nil {
		return c.tx.driverConn, nil
	}

	// reuse whichever connection was previously chosen for reads
	if c.readerConn != nil {
		return c.readerConn, nil
	}

	// substitute caches whatever writer() yields as the reader connection
	substitute := func() (*proxiedConn, error) {
		w, err := c.writer(ctx)
		c.readerConn = w
		return w, err
	}

	// if there's no readers, signal the caller to use a writer instead
	if len(c.readerDSNs) == 0 {
		c.driver.debugf("no readers specified; substituting with writer")
		return substitute()
	}

	// pick a reader
	c.driver.debugf("selecting reader connection from: [ %s ]", strings.Join(c.readerDSNs, "; "))
	picked, err := c.driver.selector(ctx, c.driver.proxiedDriver, c.readerDSNs)
	if err != nil {
		// fall back to signalling the caller to use a writer instead
		c.driver.debugf("no readers available; substituting with writer: %s", err)
		return substitute()
	}

	c.readerConn = &proxiedConn{Conn: picked, role: "reader"}
	return c.readerConn, nil
}
// Prepare returns a lazily prepared statement for query. No underlying
// connection is touched here; the statement is not yet bound to one.
func (c *conn) Prepare(query string) (driver.Stmt, error) {
	c.driver.debugf("preparing: %s", query)
	lazy := newStmt(c, query)
	return lazy, nil
}
// Close closes the underlying writer and reader connections that have been
// opened. Every close is attempted even if an earlier one fails; any failures
// are returned together as a ConnCloseError.
func (c *conn) Close() error {
	var failures []error

	if w := c.writerConn; w != nil {
		c.driver.debugf("closing writer")
		if err := w.Close(); err != nil {
			failures = append(failures, err)
		}
	}

	// the reader may be the writer standing in for it; don't close it twice
	if r := c.readerConn; r != nil && r != c.writerConn {
		c.driver.debugf("closing reader")
		if err := r.Close(); err != nil {
			failures = append(failures, err)
		}
	}

	if len(failures) == 0 {
		return nil
	}
	return ConnCloseError{errors: failures}
}
// Begin starts and returns a new transaction on the writer connection.
// It returns driver.ErrBadConn if this connection already has a transaction
// open.
func (c *conn) Begin() (driver.Tx, error) {
	if c.tx != nil {
		// already in a transaction
		c.driver.debugf("begin called while already in a transaction")
		return nil, driver.ErrBadConn
	}

	pc, err := c.writer(context.Background())
	if err != nil {
		return nil, err
	}

	started, err := pc.Begin()
	if err != nil {
		return nil, err
	}

	// remember the transaction so writer()/reader() route through it
	c.tx = &tx{conn: c, driverConn: pc, proxiedTx: started}
	return c.tx, nil
}
// BeginTx starts and returns a new transaction.
//
// Read-only transactions are attempted on the reader first; if the reader
// cannot be obtained or cannot start the transaction, the writer is used
// instead. All other transactions go to the writer.
//
// If the writer's connection does not implement driver.ConnBeginTx, plain
// Begin() is used, but only when opts asks for nothing beyond the defaults
// (default isolation level and not read-only). Otherwise
// ErrConnBeginTxUnsupported is returned, because the requested options could
// not be honoured.
//
// It returns driver.ErrBadConn if this connection already has a transaction
// open.
func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
	if c.tx != nil {
		// already in a transaction
		c.driver.debugf("begin called while already in a transaction")
		return nil, driver.ErrBadConn
	}

	// read only transactions can be sent to a reader
	if opts.ReadOnly {
		c.driver.debugf("begin readonly transaction; using reader")
		r, err := c.reader(ctx)
		if err == nil {
			err = c.beginTx(ctx, r, opts)
		}
		if err == nil {
			// transacting on the reader
			return c.tx, nil
		}
		// if any part of the reader transaction setup fails, fall back to the writer
		c.driver.debugf("readonly transaction setup on reader failed; falling back to writer: %s", err)
	}

	// by default, force transactions to the writer
	w, err := c.writer(ctx)
	if err != nil {
		return nil, err
	}
	err = c.beginTx(ctx, w, opts)
	if err == nil {
		// transacting on the writer
		return c.tx, nil
	}

	// Plain Begin() cannot carry any options, so it is only a valid
	// substitute when none were requested. A read-only request must not be
	// silently turned into a read-write transaction.
	optionsUnused := !opts.ReadOnly && opts.Isolation == driver.IsolationLevel(sql.LevelDefault)
	if err == ErrConnBeginTxUnsupported && optionsUnused {
		return c.Begin()
	}
	return nil, err
}
// beginTx starts a transaction with opts on pc and records it as the
// connection's current transaction. It returns ErrConnBeginTxUnsupported when
// pc's underlying connection does not implement driver.ConnBeginTx, or the
// error reported by the underlying BeginTx call.
func (c *conn) beginTx(ctx context.Context, pc *proxiedConn, opts driver.TxOptions) error {
	starter, ok := pc.Conn.(driver.ConnBeginTx)
	if !ok {
		return ErrConnBeginTxUnsupported
	}

	started, err := starter.BeginTx(ctx, opts)
	if err != nil {
		return err
	}

	// no errors, track the transaction together with the connection it runs on
	c.tx = &tx{conn: c, driverConn: pc, proxiedTx: started}
	return nil
}
// closeTx clears the connection's current transaction when closed is that
// transaction. If closed is any other transaction, the mismatch is logged and
// ErrUnexpectedTxClose is returned, leaving the current transaction in place.
func (c *conn) closeTx(closed *tx) error {
	if c.tx != closed {
		c.driver.debugf("closed tx mismatch: expected %v; got %v", c.tx, closed)
		return ErrUnexpectedTxClose
	}
	c.tx = nil
	return nil
}
// PrepareContext returns a lazily prepared statement for query, not yet bound
// to an underlying connection. If ctx is already done, its error is returned
// instead.
func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
	// The statement is prepared lazily, so this is the only point at which an
	// already expired context can be reported to the caller.
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	c.driver.debugf("preparing: %s", query)
	return newStmt(c, query), nil
}
// Exec attempts to fast-path conn.Exec() against the writer. It returns
// driver.ErrSkip when the writer's underlying connection does not implement
// driver.Execer.
func (c *conn) Exec(query string, args []driver.Value) (driver.Result, error) {
	// Exec always goes to the writer
	pc, err := c.writer(context.Background())
	if err != nil {
		return nil, err
	}

	execer, ok := pc.Conn.(driver.Execer)
	if !ok {
		return nil, driver.ErrSkip
	}
	return execer.Exec(query, args)
}
// ExecContext attempts to fast-path conn.ExecContext() against the writer. It
// returns driver.ErrSkip when the writer's underlying connection does not
// implement driver.ExecerContext.
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	// Exec always goes to the writer
	pc, err := c.writer(ctx)
	if err != nil {
		return nil, err
	}

	execer, ok := pc.Conn.(driver.ExecerContext)
	if !ok {
		return nil, driver.ErrSkip
	}
	return execer.ExecContext(ctx, query, args)
}
// Ping forces writer and reader connections to be established and verified.
//
// The writer is obtained (opened if necessary) and pinged first, then the
// reader. The reader is pinged only when it is a different connection from
// the writer, so a writer standing in for the reader, or a single connection
// pinned by an open transaction, is pinged once. The first error encountered
// is returned.
//
// NOTE(review): ping() type-asserts the *proxiedConn wrapper itself, whereas
// Exec/Query assert on its .Conn field. Confirm proxiedConn implements
// driver.Pinger, otherwise these pings are no-ops.
func (c *conn) Ping(ctx context.Context) error {
	// Ping all subconnections (so they can be reconnected if necessary)
	w, err := c.writer(ctx)
	if err != nil {
		return err
	}
	if err := ping(ctx, w); err != nil {
		return err
	}

	r, err := c.reader(ctx)
	if err != nil {
		return err
	}
	// Compare the connections actually handed out rather than the cached
	// fields: during a transaction both helpers return the transaction's
	// connection regardless of what the fields hold.
	if r == w {
		return nil
	}
	return ping(ctx, r)
}
// Query attempts to fast-path conn.Query() against the reader. It returns
// driver.ErrSkip when the reader's underlying connection does not implement
// driver.Queryer.
func (c *conn) Query(query string, args []driver.Value) (driver.Rows, error) {
	// Query always goes to the reader
	r, err := c.reader(context.Background())
	if err != nil {
		return nil, err
	}

	queryer, ok := r.Conn.(driver.Queryer)
	if !ok {
		return nil, driver.ErrSkip
	}
	return queryer.Query(query, args)
}
// QueryContext attempts to fast-path conn.QueryContext() against the reader.
// It returns driver.ErrSkip when the reader's underlying connection does not
// implement driver.QueryerContext.
func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
	// Query always goes to the reader
	r, err := c.reader(ctx)
	if err != nil {
		return nil, err
	}

	queryer, ok := r.Conn.(driver.QueryerContext)
	if !ok {
		return nil, driver.ErrSkip
	}
	return queryer.QueryContext(ctx, query, args)
}
// ping pings conn when it implements driver.Pinger and returns the result.
// Connections that do not implement driver.Pinger are treated as healthy and
// nil is returned.
func ping(ctx context.Context, conn driver.Conn) error {
	pinger, ok := conn.(driver.Pinger)
	if !ok {
		return nil
	}
	return pinger.Ping(ctx)
}