Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e9d8e58

Browse files
authoredJul 11, 2022
Merge pull request volatiletech#1143 from pavel-krush/master
Fetch column and table info in parallel
2 parents c758432 + 1538f76 commit e9d8e58

File tree

7 files changed

+176
-75
lines changed

7 files changed

+176
-75
lines changed
 

‎drivers/interface.go

+166-69
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package drivers
44

55
import (
66
"sort"
7+
"sync"
78

89
"github.com/friendsofgo/errors"
910
"github.com/volatiletech/sqlboiler/v4/importers"
@@ -17,13 +18,17 @@ const (
1718
ConfigSchema = "schema"
1819
ConfigAddEnumTypes = "add-enum-types"
1920
ConfigEnumNullPrefix = "enum-null-prefix"
21+
ConfigConcurrency = "concurrency"
2022

2123
ConfigUser = "user"
2224
ConfigPass = "pass"
2325
ConfigHost = "host"
2426
ConfigPort = "port"
2527
ConfigDBName = "dbname"
2628
ConfigSSLMode = "sslmode"
29+
30+
// DefaultConcurrency defines the default amount of threads to use when loading tables info
31+
DefaultConcurrency = 10
2732
)
2833

2934
// Interface abstracts either a side-effect imported driver or a binary
@@ -102,77 +107,122 @@ type TableColumnTypeTranslator interface {
102107
// Tables returns the metadata for all tables, minus the tables
103108
// specified in the blacklist.
104109
func Tables(c Constructor, schema string, whitelist, blacklist []string) ([]Table, error) {
110+
return TablesConcurrently(c, schema, whitelist, blacklist, 1)
111+
}
112+
113+
// TablesConcurrently is a concurrent version of Tables. It returns the
114+
// metadata for all tables, minus the tables specified in the blacklist.
115+
func TablesConcurrently(c Constructor, schema string, whitelist, blacklist []string, concurrency int) ([]Table, error) {
105116
var err error
117+
var ret []Table
106118

107-
names, err := c.TableNames(schema, whitelist, blacklist)
119+
ret, err = tables(c, schema, whitelist, blacklist, concurrency)
108120
if err != nil {
109-
return nil, errors.Wrap(err, "unable to get table names")
121+
return nil, errors.Wrap(err, "unable to load tables")
110122
}
111123

112-
sort.Strings(names)
113-
114-
var tables []Table
115-
for _, name := range names {
116-
t := Table{
117-
Name: name,
124+
if vc, ok := c.(ViewConstructor); ok {
125+
v, err := views(vc, schema, whitelist, blacklist, concurrency)
126+
if err != nil {
127+
return nil, errors.Wrap(err, "unable to load views")
118128
}
129+
ret = append(ret, v...)
130+
}
119131

120-
if t.Columns, err = c.Columns(schema, name, whitelist, blacklist); err != nil {
121-
return nil, errors.Wrapf(err, "unable to fetch table column info (%s)", name)
122-
}
132+
return ret, nil
133+
}
123134

124-
tr, ok := c.(TableColumnTypeTranslator)
125-
if ok {
126-
for i, col := range t.Columns {
127-
t.Columns[i] = tr.TranslateTableColumnType(col, name)
128-
}
129-
} else {
130-
for i, col := range t.Columns {
131-
t.Columns[i] = c.TranslateColumnType(col)
132-
}
133-
}
135+
func tables(c Constructor, schema string, whitelist, blacklist []string, concurrency int) ([]Table, error) {
136+
var err error
134137

135-
if t.PKey, err = c.PrimaryKeyInfo(schema, name); err != nil {
136-
return nil, errors.Wrapf(err, "unable to fetch table pkey info (%s)", name)
137-
}
138+
names, err := c.TableNames(schema, whitelist, blacklist)
139+
if err != nil {
140+
return nil, errors.Wrap(err, "unable to get table names")
141+
}
138142

139-
if t.FKeys, err = c.ForeignKeyInfo(schema, name); err != nil {
140-
return nil, errors.Wrapf(err, "unable to fetch table fkey info (%s)", name)
141-
}
143+
sort.Strings(names)
142144

143-
filterPrimaryKey(&t, whitelist, blacklist)
144-
filterForeignKeys(&t, whitelist, blacklist)
145+
ret := make([]Table, len(names))
146+
147+
limiter := newConcurrencyLimiter(concurrency)
148+
wg := sync.WaitGroup{}
149+
errs := make(chan error, len(names))
150+
for i, name := range names {
151+
wg.Add(1)
152+
limiter.get()
153+
go func(i int, name string) {
154+
defer wg.Done()
155+
defer limiter.put()
156+
t, err := table(c, schema, name, whitelist, blacklist)
157+
if err != nil {
158+
errs <- err
159+
return
160+
}
161+
ret[i] = t
162+
}(i, name)
163+
}
145164

146-
setIsJoinTable(&t)
165+
wg.Wait()
147166

148-
tables = append(tables, t)
167+
// return first error occurred if any
168+
if len(errs) > 0 {
169+
return nil, <-errs
149170
}
150171

151172
// Relationships have a dependency on foreign key nullability.
152-
for i := range tables {
153-
tbl := &tables[i]
154-
setForeignKeyConstraints(tbl, tables)
173+
for i := range ret {
174+
tbl := &ret[i]
175+
setForeignKeyConstraints(tbl, ret)
155176
}
156-
for i := range tables {
157-
tbl := &tables[i]
158-
setRelationships(tbl, tables)
177+
for i := range ret {
178+
tbl := &ret[i]
179+
setRelationships(tbl, ret)
159180
}
160181

161-
if vc, ok := c.(ViewConstructor); ok {
162-
viewTables, err := views(vc, schema, whitelist, blacklist)
163-
if err != nil {
164-
return nil, errors.Wrap(err, "unable to load views")
182+
return ret, nil
183+
}
184+
185+
// table returns columns info for a given table
186+
func table(c Constructor, schema string, name string, whitelist, blacklist []string) (Table, error) {
187+
var err error
188+
t := &Table{
189+
Name: name,
190+
}
191+
192+
if t.Columns, err = c.Columns(schema, name, whitelist, blacklist); err != nil {
193+
return Table{}, errors.Wrapf(err, "unable to fetch table column info (%s)", name)
194+
}
195+
196+
tr, ok := c.(TableColumnTypeTranslator)
197+
if ok {
198+
for i, col := range t.Columns {
199+
t.Columns[i] = tr.TranslateTableColumnType(col, name)
165200
}
201+
} else {
202+
for i, col := range t.Columns {
203+
t.Columns[i] = c.TranslateColumnType(col)
204+
}
205+
}
166206

167-
tables = append(tables, viewTables...)
207+
if t.PKey, err = c.PrimaryKeyInfo(schema, name); err != nil {
208+
return Table{}, errors.Wrapf(err, "unable to fetch table pkey info (%s)", name)
168209
}
169210

170-
return tables, nil
211+
if t.FKeys, err = c.ForeignKeyInfo(schema, name); err != nil {
212+
return Table{}, errors.Wrapf(err, "unable to fetch table fkey info (%s)", name)
213+
}
214+
215+
filterPrimaryKey(t, whitelist, blacklist)
216+
filterForeignKeys(t, whitelist, blacklist)
217+
218+
setIsJoinTable(t)
219+
220+
return *t, nil
171221
}
172222

173223
// views returns the metadata for all views, minus the views
174224
// specified in the blacklist.
175-
func views(c ViewConstructor, schema string, whitelist, blacklist []string) ([]Table, error) {
225+
func views(c ViewConstructor, schema string, whitelist, blacklist []string, concurrency int) ([]Table, error) {
176226
var err error
177227

178228
names, err := c.ViewNames(schema, whitelist, blacklist)
@@ -182,44 +232,71 @@ func views(c ViewConstructor, schema string, whitelist, blacklist []string) ([]T
182232

183233
sort.Strings(names)
184234

185-
var views []Table
186-
for _, name := range names {
187-
t := Table{
188-
IsView: true,
189-
Name: name,
190-
}
235+
ret := make([]Table, len(names))
236+
237+
limiter := newConcurrencyLimiter(concurrency)
238+
wg := sync.WaitGroup{}
239+
errs := make(chan error, len(names))
240+
for i, name := range names {
241+
wg.Add(1)
242+
limiter.get()
243+
go func(i int, name string) {
244+
defer wg.Done()
245+
defer limiter.put()
246+
t, err := view(c, schema, name, whitelist, blacklist)
247+
if err != nil {
248+
errs <- err
249+
return
250+
}
251+
ret[i] = t
252+
}(i, name)
253+
}
191254

192-
if t.ViewCapabilities, err = c.ViewCapabilities(schema, name); err != nil {
193-
return nil, errors.Wrapf(err, "unable to fetch view capabilities info (%s)", name)
194-
}
255+
wg.Wait()
195256

196-
if t.Columns, err = c.ViewColumns(schema, name, whitelist, blacklist); err != nil {
197-
return nil, errors.Wrapf(err, "unable to fetch view column info (%s)", name)
198-
}
257+
// return first error occurred if any
258+
if len(errs) > 0 {
259+
return nil, <-errs
260+
}
199261

200-
tr, ok := c.(TableColumnTypeTranslator)
201-
if ok {
202-
for i, col := range t.Columns {
203-
t.Columns[i] = tr.TranslateTableColumnType(col, name)
204-
}
205-
} else {
206-
for i, col := range t.Columns {
207-
t.Columns[i] = c.TranslateColumnType(col)
208-
}
209-
}
262+
return ret, nil
263+
}
210264

211-
views = append(views, t)
265+
// view returns columns info for a given view
266+
func view(c ViewConstructor, schema string, name string, whitelist, blacklist []string) (Table, error) {
267+
var err error
268+
t := Table{
269+
IsView: true,
270+
Name: name,
212271
}
213272

214-
return views, nil
273+
if t.ViewCapabilities, err = c.ViewCapabilities(schema, name); err != nil {
274+
return Table{}, errors.Wrapf(err, "unable to fetch view capabilities info (%s)", name)
275+
}
276+
277+
if t.Columns, err = c.ViewColumns(schema, name, whitelist, blacklist); err != nil {
278+
return Table{}, errors.Wrapf(err, "unable to fetch view column info (%s)", name)
279+
}
280+
281+
tr, ok := c.(TableColumnTypeTranslator)
282+
if ok {
283+
for i, col := range t.Columns {
284+
t.Columns[i] = tr.TranslateTableColumnType(col, name)
285+
}
286+
} else {
287+
for i, col := range t.Columns {
288+
t.Columns[i] = c.TranslateColumnType(col)
289+
}
290+
}
291+
292+
return t, nil
215293
}
216294

217295
func knownColumn(table string, column string, whitelist, blacklist []string) bool {
218296
return (len(whitelist) == 0 ||
219297
strmangle.SetInclude(table, whitelist) ||
220298
strmangle.SetInclude(table+"."+column, whitelist) ||
221299
strmangle.SetInclude("*."+column, whitelist)) &&
222-
223300
(len(blacklist) == 0 || (!strmangle.SetInclude(table, blacklist) &&
224301
!strmangle.SetInclude(table+"."+column, blacklist) &&
225302
!strmangle.SetInclude("*."+column, blacklist)))
@@ -294,3 +371,23 @@ func setRelationships(t *Table, tables []Table) {
294371
t.ToOneRelationships = toOneRelationships(*t, tables)
295372
t.ToManyRelationships = toManyRelationships(*t, tables)
296373
}
374+
375+
// concurrencyCounter is a helper structure that can limit amount of concurrently processed requests
376+
type concurrencyLimiter chan struct{}
377+
378+
func newConcurrencyLimiter(capacity int) concurrencyLimiter {
379+
ret := make(concurrencyLimiter, capacity)
380+
for i := 0; i < capacity; i++ {
381+
ret <- struct{}{}
382+
}
383+
384+
return ret
385+
}
386+
387+
func (c concurrencyLimiter) get() {
388+
<-c
389+
}
390+
391+
func (c concurrencyLimiter) put() {
392+
c <- struct{}{}
393+
}

‎drivers/interface_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (m testMockDriver) UseIndexPlaceholders() bool {
115115
func TestTables(t *testing.T) {
116116
t.Parallel()
117117

118-
tables, err := Tables(testMockDriver{}, "public", nil, nil)
118+
tables, err := TablesConcurrently(testMockDriver{}, "public", nil, nil, 1)
119119
if err != nil {
120120
t.Error(err)
121121
}

‎drivers/mocks/mock.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (m *MockDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, er
6666
whitelist, _ := config.StringSlice(drivers.ConfigWhitelist)
6767
blacklist, _ := config.StringSlice(drivers.ConfigBlacklist)
6868

69-
dbinfo.Tables, err = drivers.Tables(m, schema, whitelist, blacklist)
69+
dbinfo.Tables, err = drivers.TablesConcurrently(m, schema, whitelist, blacklist, 1)
7070
if err != nil {
7171
return nil, err
7272
}

‎drivers/sqlboiler-mssql/driver/mssql.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func (m *MSSQLDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, e
8181
schema := config.DefaultString(drivers.ConfigSchema, "dbo")
8282
whitelist, _ := config.StringSlice(drivers.ConfigWhitelist)
8383
blacklist, _ := config.StringSlice(drivers.ConfigBlacklist)
84+
concurrency := config.DefaultInt(drivers.ConfigConcurrency, drivers.DefaultConcurrency)
8485

8586
m.connStr = MSSQLBuildQueryString(user, pass, dbname, host, port, sslmode)
8687
m.conn, err = sql.Open("mssql", m.connStr)
@@ -110,7 +111,7 @@ func (m *MSSQLDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, e
110111
UseCaseWhenExistsClause: true,
111112
},
112113
}
113-
dbinfo.Tables, err = drivers.Tables(m, schema, whitelist, blacklist)
114+
dbinfo.Tables, err = drivers.TablesConcurrently(m, schema, whitelist, blacklist, concurrency)
114115
if err != nil {
115116
return nil, err
116117
}

‎drivers/sqlboiler-mysql/driver/mysql.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func (m *MySQLDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, e
8383
schema := dbname
8484
whitelist, _ := config.StringSlice(drivers.ConfigWhitelist)
8585
blacklist, _ := config.StringSlice(drivers.ConfigBlacklist)
86+
concurrency := config.DefaultInt(drivers.ConfigConcurrency, drivers.DefaultConcurrency)
8687

8788
tinyIntAsIntIntf, ok := config["tinyint_as_int"]
8889
if ok {
@@ -116,7 +117,7 @@ func (m *MySQLDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, e
116117
},
117118
}
118119

119-
dbinfo.Tables, err = drivers.Tables(m, schema, whitelist, blacklist)
120+
dbinfo.Tables, err = drivers.TablesConcurrently(m, schema, whitelist, blacklist, concurrency)
120121
if err != nil {
121122
return nil, err
122123
}

‎drivers/sqlboiler-psql/driver/psql.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (p *PostgresDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo
9696
schema := config.DefaultString(drivers.ConfigSchema, "public")
9797
whitelist, _ := config.StringSlice(drivers.ConfigWhitelist)
9898
blacklist, _ := config.StringSlice(drivers.ConfigBlacklist)
99+
concurrency := config.DefaultInt(drivers.ConfigConcurrency, drivers.DefaultConcurrency)
99100

100101
useSchema := schema != "public"
101102

@@ -130,7 +131,7 @@ func (p *PostgresDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo
130131
UseDefaultKeyword: true,
131132
},
132133
}
133-
dbinfo.Tables, err = drivers.Tables(p, schema, whitelist, blacklist)
134+
dbinfo.Tables, err = drivers.TablesConcurrently(p, schema, whitelist, blacklist, concurrency)
134135
if err != nil {
135136
return nil, err
136137
}

‎drivers/sqlboiler-sqlite3/driver/sqlite3.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (s SQLiteDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, e
7070
dbname := config.MustString(drivers.ConfigDBName)
7171
whitelist, _ := config.StringSlice(drivers.ConfigWhitelist)
7272
blacklist, _ := config.StringSlice(drivers.ConfigBlacklist)
73+
concurrency := config.DefaultInt(drivers.ConfigConcurrency, drivers.DefaultConcurrency)
7374

7475
s.connStr = SQLiteBuildQueryString(dbname)
7576
s.dbConn, err = sql.Open("sqlite", s.connStr)
@@ -95,7 +96,7 @@ func (s SQLiteDriver) Assemble(config drivers.Config) (dbinfo *drivers.DBInfo, e
9596
},
9697
}
9798

98-
dbinfo.Tables, err = drivers.Tables(s, "", whitelist, blacklist)
99+
dbinfo.Tables, err = drivers.TablesConcurrently(s, "", whitelist, blacklist, concurrency)
99100
if err != nil {
100101
return nil, err
101102
}

0 commit comments

Comments
 (0)