forked from skeema/tengo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
instance.go
1176 lines (1108 loc) · 42.6 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package tengo
import (
"context"
"database/sql"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/VividCortex/mysqlerr"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
)
// Instance represents a single database server running on a specific host or
// address. It holds the connection settings parsed from a DSN, a cache of
// connection pools keyed by default schema and params, and flavor/version
// information that is populated on first use.
type Instance struct {
BaseDSN string // DSN ending in trailing slash; i.e. no schema name or params
Driver string // database/sql driver name; NewInstance only accepts "mysql"
User string // user name parsed from the DSN
Password string // password parsed from the DSN
Host string // host name or address; "localhost" when the DSN uses a UNIX socket
Port int // TCP port; String() omits the port when this is 0
SocketPath string // UNIX domain socket path; only set when the DSN network is "unix"
defaultParams map[string]string // params taken from the DSN; merged into the params of every Connect call
connectionPool map[string]*sqlx.DB // key is in format "schema?params"
*sync.RWMutex // protects connectionPool for concurrent operations
flavor Flavor // FlavorUnknown until hydrated from the server or set via SetFlavor/ForceFlavor
version [3]int // major, minor, patch; Version() treats a zero major as "not yet hydrated"
}
// NewInstance builds an Instance from the supplied driver name and DSN. The
// only driver accepted at present is "mysql"; any other value yields an error.
// The dsn must follow the driver's own format. A schema name embedded in the
// dsn is ignored, while any params it contains become default params that are
// applied to every connection (merged with whatever is passed to Connect).
func NewInstance(driver, dsn string) (*Instance, error) {
	if driver != "mysql" {
		return nil, fmt.Errorf("Unsupported driver \"%s\"", driver)
	}

	base, params := baseDSN(dsn), paramMap(dsn)
	cfg, err := mysql.ParseDSN(dsn)
	if err != nil {
		return nil, err
	}

	inst := &Instance{
		BaseDSN:        base,
		Driver:         driver,
		User:           cfg.User,
		Password:       cfg.Passwd,
		defaultParams:  params,
		connectionPool: make(map[string]*sqlx.DB),
		flavor:         FlavorUnknown,
		RWMutex:        new(sync.RWMutex),
	}

	// The meaning of the parsed address depends on the network type.
	if cfg.Net == "unix" {
		// For UNIX domain sockets the address is the socket path.
		inst.Host = "localhost"
		inst.SocketPath = cfg.Addr
	} else if cfg.Net == "cloudsql" {
		inst.Host = cfg.Addr
	} else {
		host, port, err := SplitHostOptionalPort(cfg.Addr)
		if err != nil {
			return nil, err
		}
		inst.Host, inst.Port = host, port
	}
	return inst, nil
}
// String renders the instance as "host:port". When a UNIX domain socket is in
// use the result is "host:/path/to/socket" instead, and when no port is set
// (Port is 0) only the host is returned.
func (instance *Instance) String() string {
	switch {
	case instance.SocketPath != "":
		return fmt.Sprintf("%s:%s", instance.Host, instance.SocketPath)
	case instance.Port == 0:
		return instance.Host
	default:
		return fmt.Sprintf("%s:%d", instance.Host, instance.Port)
	}
}
// HostAndOptionalPort behaves like String(), except that only the host is
// returned when the port is 3306 or when a UNIX domain socket is in use.
func (instance *Instance) HostAndOptionalPort() string {
	usesSocket := instance.SocketPath != ""
	if !usesSocket && instance.Port != 3306 {
		return instance.String()
	}
	return instance.Host
}
// buildParamString merges the instance's default params with the supplied
// URL-encoded params string and returns the URL-encoded result. A param named
// in params replaces any default of the same name. An error from parsing
// params is ignored; whatever url.ParseQuery returned is still applied.
func (instance *Instance) buildParamString(params string) string {
	merged := url.Values{}
	for key, val := range instance.defaultParams {
		merged.Set(key, val)
	}
	supplied, _ := url.ParseQuery(params)
	for key := range supplied {
		// Only the first value of a repeated param is used
		first := supplied.Get(key)
		merged.Set(key, first)
	}
	return merged.Encode()
}
// Connect returns a connection pool (sqlx.DB) for this instance's host/port/
// user/pass with the supplied default schema and params string. If a connection
// pool already exists for this combination, it will be returned; otherwise, one
// will be initialized and a connection attempt is made to confirm access.
// defaultSchema may be "" if it is not relevant.
// params should be supplied in format "foo=bar&fizz=buzz" with URL escaping
// already applied. Do not include a prefix of "?". params will be merged with
// instance.defaultParams, with params supplied here taking precedence.
// To avoid problems with unexpected disconnection, the connection pool will
// automatically have a max conn lifetime of at most 30sec, or less if a lower
// session-level wait_timeout was set in params or instance.defaultParams.
//
// Connect may be called concurrently. If several goroutines race to create the
// pool for the same schema and params, only one pool is kept and returned to
// all of them; the redundant pools are closed rather than leaked.
func (instance *Instance) Connect(defaultSchema string, params string) (*sqlx.DB, error) {
	fullParams := instance.buildParamString(params)
	key := fmt.Sprintf("%s?%s", defaultSchema, fullParams)

	// Fast path: reuse an existing pool while holding only the read lock
	instance.RLock()
	pool, ok := instance.connectionPool[key]
	instance.RUnlock()
	if ok {
		return pool, nil
	}

	// The network round-trip below is intentionally performed without holding
	// the lock, so that slow connection attempts don't block other callers.
	fullDSN := instance.BaseDSN + key
	db, err := sqlx.Connect(instance.Driver, fullDSN)
	if err != nil {
		return nil, err
	}

	// Determine max conn lifetime, ensuring it is less than wait_timeout. If
	// wait_timeout wasn't supplied explicitly in params, query it from the server.
	// Then set conn lifetime to a value less than wait_timeout, but no less than
	// 900ms and no more than 30s.
	maxLifetime := 30 * time.Second
	parsedParams, _ := url.ParseQuery(fullParams)
	waitTimeout, _ := strconv.Atoi(parsedParams.Get("wait_timeout"))
	if waitTimeout == 0 {
		// Ignoring errors here, since this will keep maxLifetime at 30s sane default
		db.QueryRow("SELECT @@wait_timeout").Scan(&waitTimeout)
	}
	if waitTimeout > 1 && waitTimeout <= 30 {
		maxLifetime = time.Duration(waitTimeout-1) * time.Second
	} else if waitTimeout == 1 {
		maxLifetime = 900 * time.Millisecond
	}
	db.SetConnMaxLifetime(maxLifetime)

	instance.Lock()
	defer instance.Unlock()
	// Re-check under the write lock: another goroutine may have stored a pool
	// for this key while we were connecting. Overwriting it would orphan that
	// pool (and its open connections), so keep the existing one and discard ours.
	if existing, ok := instance.connectionPool[key]; ok {
		db.Close()
		return existing, nil
	}
	pool = db.Unsafe()
	instance.connectionPool[key] = pool
	return pool, nil
}
// CanConnect reports whether a new connection to the Instance can be
// established. The boolean is true exactly when the returned error is nil.
func (instance *Instance) CanConnect() (bool, error) {
	instance.Lock()

	// The pool of interest is the one with no default schema and only the
	// instance's default params.
	poolKey := fmt.Sprintf("?%s", instance.buildParamString(""))
	pool, cached := instance.connectionPool[poolKey]
	if !cached {
		// No such pool yet: Instance.Connect creates one, which verifies
		// connectivity as a side effect.
		instance.Unlock()
		_, connErr := instance.Connect("", "")
		return connErr == nil, connErr
	}

	// A pool already exists. To make sure a brand new connection is opened
	// rather than an idle one being reused, drop all idle connections first and
	// then request an explicit Conn.
	var (
		conn *sql.Conn
		err  error
	)
	pool.SetMaxIdleConns(0)
	conn, err = pool.Conn(context.Background())
	if conn != nil {
		conn.Close()
	}
	pool.SetMaxIdleConns(2) // default in database/sql, current as of Go 1.11
	instance.Unlock()
	return err == nil, err
}
// CloseAll closes every connection pool held by the instance and removes each
// one from the pool cache. Calling it as part of a graceful shutdown can avoid
// aborted-connection counters/logging in some versions of MySQL.
func (instance *Instance) CloseAll() {
	instance.Lock()
	for poolKey := range instance.connectionPool {
		instance.connectionPool[poolKey].Close()
		delete(instance.connectionPool, poolKey)
	}
	instance.Unlock()
}
// Flavor reports the instance's flavor: the database distribution/fork/vendor
// together with its major and minor version. The value is looked up from the
// server the first time it is needed. FlavorUnknown is returned if it cannot
// be determined or an error occurs.
func (instance *Instance) Flavor() Flavor {
	if instance.flavor != FlavorUnknown {
		return instance.flavor
	}
	instance.hydrateFlavorAndVersion()
	return instance.flavor
}
// SetFlavor assigns a flavor to the instance, but only if no known flavor has
// been recorded yet. When the current flavor is already known, the instance is
// left untouched and an error is returned.
func (instance *Instance) SetFlavor(flavor Flavor) error {
	if current := instance.flavor; current.Known() {
		return fmt.Errorf("SetFlavor: instance %s already detected as flavor %s", instance, current)
	}
	instance.ForceFlavor(flavor)
	return nil
}
// ForceFlavor unconditionally replaces the instance's flavor, and resets its
// version to the flavor's major and minor numbers with a patch number of 0.
// This method is intended for tests only; all other callers should go through
// SetFlavor and check its error return value.
func (instance *Instance) ForceFlavor(flavor Flavor) {
	instance.version[0], instance.version[1], instance.version[2] = flavor.Major, flavor.Minor, 0
	instance.flavor = flavor
}
// Version reports the database's major, minor, and patch version numbers, in
// that order. The values are looked up from the server when the major version
// is still 0. If the version cannot be determined, all three results are 0.
func (instance *Instance) Version() (int, int, int) {
	if instance.version[0] == 0 {
		instance.hydrateFlavorAndVersion()
	}
	v := instance.version
	return v[0], v[1], v[2]
}
// hydrateFlavorAndVersion queries the server for its version comment and
// version string, and stores the resulting version and flavor on the instance.
// If connecting or querying fails, the instance is left unmodified.
func (instance *Instance) hydrateFlavorAndVersion() {
	db, err := instance.Connect("", "")
	if err != nil {
		return
	}

	var comment, rawVersion string
	row := db.QueryRow("SELECT @@global.version_comment, @@global.version")
	if scanErr := row.Scan(&comment, &rawVersion); scanErr != nil {
		return
	}

	instance.version = ParseVersion(rawVersion)
	major, minor := instance.version[0], instance.version[1]
	instance.flavor = NewFlavor(comment, major, minor)
	if instance.flavor.Vendor != VendorUnknown {
		return
	}
	// The version comment did not reveal the vendor, so fall back to looking
	// for it in the version string instead.
	instance.flavor = NewFlavor(rawVersion, major, minor)
}
// SchemaNames lists the names of all schemas on the instance that are visible
// to the user, leaving out the system schemas.
func (instance *Instance) SchemaNames() ([]string, error) {
	db, err := instance.Connect("information_schema", "")
	if err != nil {
		return nil, err
	}
	query := `
SELECT schema_name
FROM schemata
WHERE schema_name NOT IN ('information_schema', 'performance_schema', 'mysql', 'test', 'sys')`
	var names []string
	err = db.Select(&names, query)
	if err != nil {
		return nil, err
	}
	return names, nil
}
// Schemas returns a slice of schemas on the instance visible to the user. If
// called with no args, all non-system schemas will be returned. Or pass one or
// more schema names as args to filter the result to just those schemas.
// Each returned schema has its tables and routines populated.
// Note that the ordering of the resulting slice is not guaranteed.
// An error is returned if connecting fails, if the name filter cannot be
// expanded into the query, or if any of the underlying queries fail.
func (instance *Instance) Schemas(onlyNames ...string) ([]*Schema, error) {
	db, err := instance.Connect("information_schema", "")
	if err != nil {
		return nil, err
	}
	var rawSchemas []struct {
		Name      string `db:"schema_name"`
		CharSet   string `db:"default_character_set_name"`
		Collation string `db:"default_collation_name"`
	}

	var args []interface{}
	var query string

	// Note on these queries: MySQL 8.0 changes information_schema column names to
	// come back from queries in all caps, so we need to explicitly use AS clauses
	// in order to get them back as lowercase and have sqlx Select() work
	if len(onlyNames) == 0 {
		query = `
SELECT schema_name AS schema_name, default_character_set_name AS default_character_set_name,
default_collation_name AS default_collation_name
FROM schemata
WHERE schema_name NOT IN ('information_schema', 'performance_schema', 'mysql', 'test', 'sys')`
	} else {
		query = `
SELECT schema_name AS schema_name, default_character_set_name AS default_character_set_name,
default_collation_name AS default_collation_name
FROM schemata
WHERE schema_name IN (?)`
		// Expand the single placeholder into one placeholder per name. A failure
		// here must be surfaced instead of running a query built from bad input.
		if query, args, err = sqlx.In(query, onlyNames); err != nil {
			return nil, err
		}
	}
	if err := db.Select(&rawSchemas, query, args...); err != nil {
		return nil, err
	}

	schemas := make([]*Schema, len(rawSchemas))
	for n, rawSchema := range rawSchemas {
		schemas[n] = &Schema{
			Name:      rawSchema.Name,
			CharSet:   rawSchema.CharSet,
			Collation: rawSchema.Collation,
		}
		if schemas[n].Tables, err = instance.querySchemaTables(rawSchema.Name); err != nil {
			return nil, err
		}
		if schemas[n].Routines, err = instance.querySchemaRoutines(rawSchema.Name); err != nil {
			return nil, err
		}
	}
	return schemas, nil
}
// SchemasByName is like Schemas, but returns the result as a map keyed by
// schema name. With no args, all non-system schemas are included; otherwise
// only the schemas named in the args are included.
func (instance *Instance) SchemasByName(onlyNames ...string) (map[string]*Schema, error) {
	list, err := instance.Schemas(onlyNames...)
	if err != nil {
		return nil, err
	}
	byName := make(map[string]*Schema, len(list))
	for i := range list {
		byName[list[i].Name] = list[i]
	}
	return byName, nil
}
// Schema looks up one schema by name. When no schema of that name is found,
// the result is nil together with a sql.ErrNoRows error.
func (instance *Instance) Schema(name string) (*Schema, error) {
	matches, err := instance.Schemas(name)
	switch {
	case err != nil:
		return nil, err
	case len(matches) == 0:
		return nil, sql.ErrNoRows
	}
	return matches[0], nil
}
// HasSchema reports whether a schema with the supplied name exists on this
// instance and is visible to the user. A non-nil error is returned only when
// the connection or query failed outright, meaning the schema's existence
// could not be determined.
func (instance *Instance) HasSchema(name string) (bool, error) {
	db, err := instance.Connect("information_schema", "")
	if err != nil {
		return false, err
	}
	query := `
SELECT 1
FROM schemata
WHERE schema_name = ?`
	var found int
	switch err = db.Get(&found, query, name); err {
	case nil:
		return true, nil
	case sql.ErrNoRows:
		// Query worked, but there is no such schema
		return false, nil
	default:
		return false, err
	}
}
// ShowCreateTable obtains the CREATE TABLE statement that the instance reports
// for the specified table, using a connection whose default schema is the
// supplied schema.
func (instance *Instance) ShowCreateTable(schema, table string) (string, error) {
	if db, err := instance.Connect(schema, ""); err != nil {
		return "", err
	} else {
		return showCreateTable(db, table)
	}
}
// showCreateTable runs SHOW CREATE TABLE for the named table on the supplied
// connection pool and returns the resulting CREATE TABLE statement. If the
// query does not yield exactly one row, the error is sql.ErrNoRows.
func showCreateTable(db *sqlx.DB, table string) (string, error) {
	type createRow struct {
		TableName       string `db:"Table"`
		CreateStatement string `db:"Create Table"`
	}
	var rows []createRow
	stmt := fmt.Sprintf("SHOW CREATE TABLE %s", EscapeIdentifier(table))
	err := db.Select(&rows, stmt)
	if err != nil {
		return "", err
	}
	if len(rows) == 1 {
		return rows[0].CreateStatement, nil
	}
	return "", sql.ErrNoRows
}
// TableSize estimates how much space the table occupies on disk, by summing
// the data length, index length, and free data space reported by
// information_schema. If the table or schema does not exist on this instance,
// the error will be sql.ErrNoRows.
// Please note that use of innodb_stats_persistent may negatively impact the
// accuracy. For example, see https://bugs.mysql.com/bug.php?id=75428.
func (instance *Instance) TableSize(schema, table string) (int64, error) {
	db, err := instance.Connect("information_schema", "")
	if err != nil {
		return 0, err
	}
	query := `
SELECT data_length + index_length + data_free
FROM tables
WHERE table_schema = ? and table_name = ?`
	var size int64
	err = db.Get(&size, query, schema, table)
	return size, err
}
// TableHasRows reports whether the table contains at least one row. When an
// error prevents the check from being made, the result is true (along with the
// error), because a false positive is generally less dangerous here than a
// false negative.
func (instance *Instance) TableHasRows(schema, table string) (bool, error) {
	if db, err := instance.Connect(schema, ""); err != nil {
		return true, err
	} else {
		return tableHasRows(db, table)
	}
}
// tableHasRows selects at most one row from the named table using the supplied
// connection pool, and reports whether a row came back. If the query fails,
// true is returned along with the error.
func tableHasRows(db *sqlx.DB, table string) (bool, error) {
	stmt := fmt.Sprintf("SELECT 1 FROM %s LIMIT 1", EscapeIdentifier(table))
	var rows []int
	err := db.Select(&rows, stmt)
	if err != nil {
		return true, err
	}
	return len(rows) > 0, nil
}
// CreateSchema creates a new database schema named name on the instance and
// returns a Schema value describing it. charSet and collation set the schema's
// defaults; pass an empty string for either one to use the server default.
func (instance *Instance) CreateSchema(name, charSet, collation string) (*Schema, error) {
	db, err := instance.Connect("", "")
	if err != nil {
		return nil, err
	}

	// The server would apply its defaults for blank values by itself, but they
	// are resolved up front so that the returned Schema carries the real values
	// without having to query the new schema afterwards.
	if charSet == "" || collation == "" {
		serverCharSet, serverCollation, err := instance.DefaultCharSetAndCollation()
		if err != nil {
			return nil, err
		}
		if charSet == "" {
			charSet = serverCharSet
		}
		if collation == "" {
			collation = serverCollation
		}
	}

	created := &Schema{
		Name:      name,
		CharSet:   charSet,
		Collation: collation,
		Tables:    []*Table{},
	}
	if _, err = db.Exec(created.CreateStatement()); err != nil {
		return nil, err
	}
	return created, nil
}
// DropSchema removes a schema from the instance: its tables are dropped first,
// followed by the database schema itself. If onlyIfEmpty==true, an error is
// returned if any of the tables have any rows. Once the schema is gone, any
// cached connection pools that used it as their default schema are closed and
// forgotten.
func (instance *Instance) DropSchema(schema string, onlyIfEmpty bool) error {
	if err := instance.DropTablesInSchema(schema, onlyIfEmpty); err != nil {
		return err
	}

	db, err := instance.Connect("", "")
	if err != nil {
		return err
	}

	// A Schema value carrying just the name is enough to generate the drop
	// statement; its tables were already removed by the call above, so there is
	// no need to fetch the fully hydrated schema.
	target := &Schema{Name: schema}
	if _, err = db.Exec(target.DropStatement()); err != nil {
		return err
	}

	// Pool keys have the form "schema?params"
	stalePrefix := fmt.Sprintf("%s?", schema)
	instance.Lock()
	for poolKey, pool := range instance.connectionPool {
		if !strings.HasPrefix(poolKey, stalePrefix) {
			continue
		}
		pool.Close()
		delete(instance.connectionPool, poolKey)
	}
	instance.Unlock()
	return nil
}
// AlterSchema modifies the default character set and/or collation of an
// existing schema on the instance. Pass an empty newCharSet to change just the
// collation, or an empty newCollation to use the default collation of
// newCharSet. Passing empty strings for both is permitted, but nothing is
// changed. No statement is executed when the schema generates an empty ALTER
// statement.
func (instance *Instance) AlterSchema(schema, newCharSet, newCollation string) error {
	current, err := instance.Schema(schema)
	if err != nil {
		return err
	}
	alterSQL := current.AlterStatement(newCharSet, newCollation)
	if alterSQL == "" {
		return nil
	}
	db, err := instance.Connect("", "")
	if err != nil {
		return err
	}
	_, err = db.Exec(alterSQL)
	return err
}
// DropTablesInSchema drops every base table in the named schema. If
// onlyIfEmpty==true, all tables are first checked for rows, and an error is
// returned without dropping anything if any table has at least one row.
// The work is performed concurrently over a connection pool that has
// foreign_key_checks disabled.
func (instance *Instance) DropTablesInSchema(schema string, onlyIfEmpty bool) error {
	db, err := instance.Connect(schema, "foreign_key_checks=0")
	if err != nil {
		return err
	}

	// Only the table names are needed, so query them directly; this is faster
	// than going through instance.Schema(schema).
	var tableNames []string
	query := `
SELECT table_name
FROM information_schema.tables
WHERE table_schema = ?
AND table_type = 'BASE TABLE'`
	if err := db.Select(&tableNames, query, schema); err != nil {
		return err
	}
	if len(tableNames) == 0 {
		return nil
	}

	var group errgroup.Group
	// Lift the open-connection cap again on the way out (0 means unlimited)
	defer db.SetMaxOpenConns(0)

	if onlyIfEmpty {
		db.SetMaxOpenConns(15)
		for _, tableName := range tableNames {
			tableName := tableName
			group.Go(func() error {
				hasRows, err := tableHasRows(db, tableName)
				if err != nil {
					return err
				}
				if hasRows {
					return fmt.Errorf("DropTablesInSchema: table %s.%s has at least one row", EscapeIdentifier(schema), EscapeIdentifier(tableName))
				}
				return nil
			})
		}
		if err := group.Wait(); err != nil {
			return err
		}
	}

	db.SetMaxOpenConns(10)
	// Buffered to hold every table name, so goroutines never block on send
	deadlocked := make(chan string, len(tableNames))
	for _, tableName := range tableNames {
		tableName := tableName
		group.Go(func() error {
			_, err := db.Exec(fmt.Sprintf("DROP TABLE %s", EscapeIdentifier(tableName)))
			// Since the introduction of the new data dictionary in MySQL 8.0,
			// concurrently dropping two tables that are linked by a foreign key
			// constraint can deadlock. Such tables are queued up to be retried
			// serially instead of being treated as failures.
			if IsDatabaseError(err, mysqlerr.ER_LOCK_DEADLOCK) {
				deadlocked <- tableName
				return nil
			}
			return err
		})
	}
	err = group.Wait()
	close(deadlocked)

	// Retry the deadlocked drops one at a time
	for tableName := range deadlocked {
		if _, retryErr := db.Exec(fmt.Sprintf("DROP TABLE %s", EscapeIdentifier(tableName))); retryErr != nil {
			return retryErr
		}
	}
	return err
}
// DefaultCharSetAndCollation looks up the server-level default character set
// and collation of the instance, as given by the global character_set_server
// and collation_server variables.
func (instance *Instance) DefaultCharSetAndCollation() (serverCharSet, serverCollation string, err error) {
	db, connErr := instance.Connect("information_schema", "")
	if connErr != nil {
		return "", "", connErr
	}
	row := db.QueryRow("SELECT @@global.character_set_server, @@global.collation_server")
	err = row.Scan(&serverCharSet, &serverCollation)
	return serverCharSet, serverCollation, err
}
// StrictModeCompliant reports whether every table in the supplied schemas
// would, if re-created on instance, comply with innodb_strict_mode and a
// sql_mode including STRICT_TRANS_TABLES,NO_ZERO_DATE.
// Invalid-but-nonzero dates in default values are not currently detected by
// this method, although that may change in the future.
func (instance *Instance) StrictModeCompliant(schemas []*Schema) (bool, error) {
	// Server settings relevant to row formats. These are only queried if some
	// table actually needs them, and at most once if the query succeeds.
	var filePerTable, barracuda, cached bool
	lookupFormatVars := func() (bool, bool, error) {
		if cached {
			return filePerTable, barracuda, nil
		}
		db, err := instance.Connect("", "")
		if err != nil {
			return false, false, err
		}
		var rawFilePerTable, rawFileFormat string
		if instance.Flavor().HasInnoFileFormat() {
			err = db.QueryRow("SELECT @@global.innodb_file_per_table, @@global.innodb_file_format").Scan(&rawFilePerTable, &rawFileFormat)
			barracuda = (strings.ToLower(rawFileFormat) == "barracuda")
		} else {
			// The flavor has no innodb_file_format variable to consult, so
			// Barracuda is treated as available.
			err = db.QueryRow("SELECT @@global.innodb_file_per_table").Scan(&rawFilePerTable)
			barracuda = true
		}
		filePerTable = (rawFilePerTable == "1")
		cached = (err == nil)
		return filePerTable, barracuda, err
	}

	for _, s := range schemas {
		for _, t := range s.Tables {
			// Zero dates as defaults of temporal columns
			for _, col := range t.Columns {
				temporal := strings.HasPrefix(col.TypeInDB, "timestamp") || strings.HasPrefix(col.TypeInDB, "date")
				if temporal && strings.HasPrefix(col.Default.Value, "0000-00-00") {
					return false, nil
				}
			}

			// Row formats whose requirements the server does not meet
			format := t.RowFormatClause()
			if format == "" {
				continue
			}
			needFilePerTable, needBarracuda := instance.Flavor().InnoRowFormatReqs(format)
			if !needFilePerTable && !needBarracuda {
				continue
			}
			haveFilePerTable, haveBarracuda, err := lookupFormatVars()
			if err != nil {
				return false, err
			}
			if (needFilePerTable && !haveFilePerTable) || (needBarracuda && !haveBarracuda) {
				return false, nil
			}
		}
	}
	return true, nil
}
func (instance *Instance) querySchemaTables(schema string) ([]*Table, error) {
db, err := instance.Connect("information_schema", "")
if err != nil {
return nil, err
}
// Obtain flavor and version info. MariaDB changed how default values are
// represented in information_schema in 10.2+.
flavor := instance.Flavor()
// Note on these queries: MySQL 8.0 changes information_schema column names to
// come back from queries in all caps, so we need to explicitly use AS clauses
// in order to get them back as lowercase and have sqlx Select() work
// Obtain the tables in the schema
var rawTables []struct {
Name string `db:"table_name"`
Type string `db:"table_type"`
Engine sql.NullString `db:"engine"`
AutoIncrement sql.NullInt64 `db:"auto_increment"`
TableCollation sql.NullString `db:"table_collation"`
CreateOptions sql.NullString `db:"create_options"`
Comment string `db:"table_comment"`
CharSet string `db:"character_set_name"`
CollationIsDefault string `db:"is_default"`
}
query := `
SELECT t.table_name AS table_name, t.table_type AS table_type, t.engine AS engine,
t.auto_increment AS auto_increment, t.table_collation AS table_collation,
UPPER(t.create_options) AS create_options, t.table_comment AS table_comment,
c.character_set_name AS character_set_name, c.is_default AS is_default
FROM tables t
JOIN collations c ON t.table_collation = c.collation_name
WHERE t.table_schema = ?
AND t.table_type = 'BASE TABLE'`
if err := db.Select(&rawTables, query, schema); err != nil {
return nil, fmt.Errorf("Error querying information_schema.tables for schema %s: %s", schema, err)
}
if len(rawTables) == 0 {
return []*Table{}, nil
}
tables := make([]*Table, len(rawTables))
for n, rawTable := range rawTables {
tables[n] = &Table{
Name: rawTable.Name,
Engine: rawTable.Engine.String,
CharSet: rawTable.CharSet,
Collation: rawTable.TableCollation.String,
CollationIsDefault: rawTable.CollationIsDefault != "",
Comment: rawTable.Comment,
}
if rawTable.AutoIncrement.Valid {
tables[n].NextAutoIncrement = uint64(rawTable.AutoIncrement.Int64)
}
if rawTable.CreateOptions.Valid && rawTable.CreateOptions.String != "" && rawTable.CreateOptions.String != "PARTITIONED" {
// information_schema.tables.create_options annoyingly contains "partitioned"
// if the table is partitioned, despite this not being present as-is in the
// table table definition. All other create_options are present verbatim.
// Currently in mysql-server/sql/sql_show.cc, it's always at the *end* of
// create_options... but just to code defensively we handle any location.
if strings.HasPrefix(rawTable.CreateOptions.String, "PARTITIONED ") {
tables[n].CreateOptions = strings.Replace(rawTable.CreateOptions.String, "PARTITIONED ", "", 1)
} else {
tables[n].CreateOptions = strings.Replace(rawTable.CreateOptions.String, " PARTITIONED", "", 1)
}
}
}
// Obtain the columns in all tables in the schema
var rawColumns []struct {
Name string `db:"column_name"`
TableName string `db:"table_name"`
Type string `db:"column_type"`
IsNullable string `db:"is_nullable"`
Default sql.NullString `db:"column_default"`
Extra string `db:"extra"`
Comment string `db:"column_comment"`
CharSet sql.NullString `db:"character_set_name"`
Collation sql.NullString `db:"collation_name"`
CollationIsDefault sql.NullString `db:"is_default"`
}
query = `
SELECT c.table_name AS table_name, c.column_name AS column_name,
c.column_type AS column_type, c.is_nullable AS is_nullable,
c.column_default AS column_default, c.extra AS extra,
c.column_comment AS column_comment,
c.character_set_name AS character_set_name,
c.collation_name AS collation_name, co.is_default AS is_default
FROM columns c
LEFT JOIN collations co ON co.collation_name = c.collation_name
WHERE c.table_schema = ?
ORDER BY c.table_name, c.ordinal_position`
if err := db.Select(&rawColumns, query, schema); err != nil {
return nil, fmt.Errorf("Error querying information_schema.columns for schema %s: %s", schema, err)
}
columnsByTableName := make(map[string][]*Column)
columnsByTableAndName := make(map[string]*Column)
for _, rawColumn := range rawColumns {
col := &Column{
Name: rawColumn.Name,
TypeInDB: rawColumn.Type,
Nullable: strings.ToUpper(rawColumn.IsNullable) == "YES",
AutoIncrement: strings.Contains(rawColumn.Extra, "auto_increment"),
Comment: rawColumn.Comment,
}
if !rawColumn.Default.Valid {
col.Default = ColumnDefaultNull
} else if flavor.AllowDefaultExpression() {
if rawColumn.Default.String[0] == '\'' {
col.Default = ColumnDefaultValue(strings.Trim(rawColumn.Default.String, "'"))
} else if rawColumn.Default.String == "NULL" {
col.Default = ColumnDefaultNull
} else {
col.Default = ColumnDefaultExpression(rawColumn.Default.String)
}
} else if strings.HasPrefix(rawColumn.Default.String, "CURRENT_TIMESTAMP") && (strings.HasPrefix(rawColumn.Type, "timestamp") || strings.HasPrefix(rawColumn.Type, "datetime")) {
col.Default = ColumnDefaultExpression(rawColumn.Default.String)
} else if strings.HasPrefix(rawColumn.Type, "bit") && strings.HasPrefix(rawColumn.Default.String, "b'") {
col.Default = ColumnDefaultExpression(rawColumn.Default.String)
} else {
col.Default = ColumnDefaultValue(rawColumn.Default.String)
}
if strings.HasPrefix(strings.ToLower(rawColumn.Extra), "on update ") {
col.OnUpdate = rawColumn.Extra[10:]
// Some flavors omit fractional precision from ON UPDATE in
// information_schema only, despite it being present everywhere else
if openParen := strings.IndexByte(rawColumn.Type, '('); openParen > -1 && !strings.Contains(col.OnUpdate, "(") {
col.OnUpdate = fmt.Sprintf("%s%s", col.OnUpdate, rawColumn.Type[openParen:])
}
}
if rawColumn.Collation.Valid { // only text-based column types have a notion of charset and collation
col.CharSet = rawColumn.CharSet.String
col.Collation = rawColumn.Collation.String
col.CollationIsDefault = (rawColumn.CollationIsDefault.String != "")
}
if columnsByTableName[rawColumn.TableName] == nil {
columnsByTableName[rawColumn.TableName] = make([]*Column, 0)
}
columnsByTableName[rawColumn.TableName] = append(columnsByTableName[rawColumn.TableName], col)
fullNameStr := fmt.Sprintf("%s.%s.%s", schema, rawColumn.TableName, rawColumn.Name)
columnsByTableAndName[fullNameStr] = col
}
for n, t := range tables {
// Put the columns into the table
tables[n].Columns = columnsByTableName[t.Name]
// Avoid issues from data dictionary weirdly caching a NULL next auto-inc
if t.NextAutoIncrement == 0 && t.HasAutoIncrement() {
tables[n].NextAutoIncrement = 1
}
}
// Obtain the indexes of all tables in the schema. Since multi-column indexes
// have multiple rows in the result set, we do two passes over the result: one
// to figure out which indexes exist, and one to stitch together the col info.
// We cannot use an ORDER BY on this query, since only the unsorted result
// matches the same order of secondary indexes as the CREATE TABLE statement.
var rawIndexes []struct {
Name string `db:"index_name"`
TableName string `db:"table_name"`
NonUnique uint8 `db:"non_unique"`
SeqInIndex uint8 `db:"seq_in_index"`
ColumnName string `db:"column_name"`
SubPart sql.NullInt64 `db:"sub_part"`
Comment sql.NullString `db:"index_comment"`
}
query = `
SELECT index_name AS index_name, table_name AS table_name,
non_unique AS non_unique, seq_in_index AS seq_in_index,
column_name AS column_name, sub_part AS sub_part,
index_comment AS index_comment
FROM statistics
WHERE table_schema = ?`
if err := db.Select(&rawIndexes, query, schema); err != nil {
return nil, fmt.Errorf("Error querying information_schema.statistics for schema %s: %s", schema, err)
}
primaryKeyByTableName := make(map[string]*Index)
secondaryIndexesByTableName := make(map[string][]*Index)
indexesByTableAndName := make(map[string]*Index)
for _, rawIndex := range rawIndexes {
if rawIndex.SeqInIndex > 1 {
continue
}
index := &Index{
Name: rawIndex.Name,
Unique: rawIndex.NonUnique == 0,
Columns: make([]*Column, 0),
SubParts: make([]uint16, 0),
Comment: rawIndex.Comment.String,
}
if strings.ToUpper(index.Name) == "PRIMARY" {
primaryKeyByTableName[rawIndex.TableName] = index
index.PrimaryKey = true
} else {
if secondaryIndexesByTableName[rawIndex.TableName] == nil {
secondaryIndexesByTableName[rawIndex.TableName] = make([]*Index, 0)
}
secondaryIndexesByTableName[rawIndex.TableName] = append(secondaryIndexesByTableName[rawIndex.TableName], index)
}
fullNameStr := fmt.Sprintf("%s.%s.%s", schema, rawIndex.TableName, rawIndex.Name)
indexesByTableAndName[fullNameStr] = index
}
for _, rawIndex := range rawIndexes {
fullIndexNameStr := fmt.Sprintf("%s.%s.%s", schema, rawIndex.TableName, rawIndex.Name)
index, ok := indexesByTableAndName[fullIndexNameStr]
if !ok {
panic(fmt.Errorf("Cannot find index %s", fullIndexNameStr))
}
fullColNameStr := fmt.Sprintf("%s.%s.%s", schema, rawIndex.TableName, rawIndex.ColumnName)
col, ok := columnsByTableAndName[fullColNameStr]
if !ok {
panic(fmt.Errorf("Cannot find indexed column %s for index %s", fullColNameStr, fullIndexNameStr))
}
for len(index.Columns) < int(rawIndex.SeqInIndex) {
index.Columns = append(index.Columns, new(Column))
index.SubParts = append(index.SubParts, 0)
}
index.Columns[rawIndex.SeqInIndex-1] = col
if rawIndex.SubPart.Valid {
index.SubParts[rawIndex.SeqInIndex-1] = uint16(rawIndex.SubPart.Int64)
}
}
for _, t := range tables {
t.PrimaryKey = primaryKeyByTableName[t.Name]
t.SecondaryIndexes = secondaryIndexesByTableName[t.Name]
}
// Obtain the foreign keys of the tables in the schema
var rawForeignKeys []struct {
Name string `db:"constraint_name"`
TableName string `db:"table_name"`
UpdateRule string `db:"update_rule"`
DeleteRule string `db:"delete_rule"`
ReferencedTableName string `db:"referenced_table_name"`
ReferencedSchemaName string `db:"referenced_schema"`
ReferencedColumnName string `db:"referenced_column_name"`
ColumnLookupKey string `db:"col_lookup_key"`
}
query = `
SELECT rc.constraint_name AS constraint_name, rc.table_name AS table_name,
rc.update_rule AS update_rule, rc.delete_rule AS delete_rule,
rc.referenced_table_name AS referenced_table_name,
IF(rc.constraint_schema=rc.unique_constraint_schema, '', rc.unique_constraint_schema) AS referenced_schema,
kcu.referenced_column_name AS referenced_column_name,
CONCAT(kcu.constraint_schema, '.', kcu.table_name, '.', kcu.column_name) AS col_lookup_key
FROM referential_constraints rc
JOIN key_column_usage kcu ON kcu.constraint_name = rc.constraint_name AND
kcu.constraint_schema = rc.constraint_schema AND
kcu.referenced_column_name IS NOT NULL
WHERE rc.constraint_schema = ?
ORDER BY BINARY rc.constraint_name, kcu.ordinal_position`
if err := db.Select(&rawForeignKeys, query, schema); err != nil {
return nil, fmt.Errorf("Error querying foreign key constraints for schema %s: %s", schema, err)
}
foreignKeysByTableName := make(map[string][]*ForeignKey)
foreignKeysByName := make(map[string]*ForeignKey)
for _, rawForeignKey := range rawForeignKeys {
col := columnsByTableAndName[rawForeignKey.ColumnLookupKey]
if fk, already := foreignKeysByName[rawForeignKey.Name]; already {
fk.Columns = append(fk.Columns, col)
fk.ReferencedColumnNames = append(fk.ReferencedColumnNames, rawForeignKey.ReferencedColumnName)
} else {
foreignKey := &ForeignKey{
Name: rawForeignKey.Name,
ReferencedSchemaName: rawForeignKey.ReferencedSchemaName,
ReferencedTableName: rawForeignKey.ReferencedTableName,
UpdateRule: rawForeignKey.UpdateRule,
DeleteRule: rawForeignKey.DeleteRule,
Columns: []*Column{col},
ReferencedColumnNames: []string{rawForeignKey.ReferencedColumnName},
}
foreignKeysByName[rawForeignKey.Name] = foreignKey
foreignKeysByTableName[rawForeignKey.TableName] = append(foreignKeysByTableName[rawForeignKey.TableName], foreignKey)
}
}
for _, t := range tables {
t.ForeignKeys = foreignKeysByTableName[t.Name]
}
// Obtain actual SHOW CREATE TABLE output and store in each table. Since
// there's no way in MySQL to bulk fetch this for multiple tables at once,
// use multiple goroutines to make this faster.
db, err = instance.Connect(schema, "")
if err != nil {
return nil, err
}
defer db.SetMaxOpenConns(0)
db.SetMaxOpenConns(10)
var g errgroup.Group
for _, t := range tables {
t := t
g.Go(func() (err error) {
if t.CreateStatement, err = showCreateTable(db, t.Name); err != nil {
return fmt.Errorf("Error executing SHOW CREATE TABLE for %s.%s: %s", EscapeIdentifier(schema), EscapeIdentifier(t.Name), err)
}
if t.Engine == "InnoDB" {
t.CreateStatement = NormalizeCreateOptions(t.CreateStatement)
}
// Index order is unpredictable with new MySQL 8 data dictionary, so reorder
// indexes based on parsing SHOW CREATE TABLE if needed
if flavor.HasDataDictionary() && len(t.SecondaryIndexes) > 1 {
fixIndexOrder(t)
}
// Compare what we expect the create DDL to be, to determine if we support
// diffing for the table. Ignore next-auto-increment differences in this
// comparison, since the value may have changed between our previous
// information_schema introspection and our current SHOW CREATE TABLE call!
actual, _ := ParseCreateAutoInc(t.CreateStatement)
expected, _ := ParseCreateAutoInc(t.GeneratedCreateStatement(flavor))
if actual != expected {
t.UnsupportedDDL = true
}
return nil
})
}
return tables, g.Wait()
}
// reIndexLine matches a single line of CREATE TABLE output that defines a
// secondary index, i.e. leading whitespace followed by "KEY" or "UNIQUE KEY",
// a backtick-quoted index name, and the start of the parenthesized column
// list. Submatch 1 is the index name exactly as it appears between the
// backticks (any embedded backtick is still doubled). PRIMARY KEY lines and
// column definitions do not match.
//
// The name is captured as a run of non-backtick characters or doubled
// backticks, rather than a greedy ".+", so the capture always stops at the
// first unescaped closing backtick. A greedy capture would extend to the last
// "` (`" on the line, which is wrong if that sequence also appears later on
// the line, for example inside an index COMMENT string.
var reIndexLine = regexp.MustCompile("^\\s+(?:UNIQUE )?KEY `((?:[^`]|``)+)` \\(`")