-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathimport.go
112 lines (89 loc) · 2 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package sqlimporter
import (
"database/sql"
libcsv "encoding/csv"
"fmt"
"log"
"path"
"strings"
"github.com/chop-dbhi/sql-importer/profile/csv"
"github.com/chop-dbhi/sql-importer/reader"
)
// Request describes one import job: the input file to read, the database
// table to load it into, and how the input should be interpreted.
type Request struct {
	// Path is the location of the input file. It is used to detect the
	// file type and compression, and its base name (up to the first ".")
	// serves as the table name when Table is empty.
	Path string

	// Database is the data source name handed to sql.Open together with
	// the "postgres" driver name.
	Database string
	// Schema is the target schema for the load.
	Schema string
	// Table is the target table; when empty, Import derives it from Path.
	Table string

	// AppendTable makes Import call Append instead of Replace on the
	// database loader.
	AppendTable bool
	// CStore is copied onto the generated schema's Cstore flag
	// (presumably selecting a columnar table layout — TODO confirm).
	CStore bool

	// CSV forces the input to be treated as CSV regardless of the type
	// detected from Path.
	CSV bool
	// Compression names the compression passed to reader.Open; when empty,
	// Import uses the compression detected from Path.
	Compression string

	// Delimiter is the field separator for the CSV profiler and reader.
	// Only its first byte is used.
	Delimiter string
	// Header is passed through to the CSV profiler (presumably marks the
	// first row as column names — TODO confirm against the profiler).
	Header bool
}
// Import profiles the delimited file described by r and loads it into a
// table of the database named by r.Database, using the "postgres" driver.
//
// The input is opened twice. The first pass runs the CSV profiler to derive
// a table schema. The second pass streams the rows into the database, using
// Append when r.AppendTable is set and Replace otherwise.
//
// Before the database or the input is opened, unset fields of r are filled
// in place:
//
//   - CSV is set to true; input that is neither flagged as CSV by the caller
//     nor detected as CSV from r.Path is rejected.
//   - Compression defaults to the compression detected from r.Path.
//   - Table defaults to the base name of r.Path up to its first ".".
//   - Delimiter defaults to "," (only its first byte is used).
//
// A non-nil error names the step that failed: unsupported file type,
// opening the database handle, opening the input, profiling, or loading.
func Import(r *Request) error {
	fileType, fileComp := reader.DetectType(r.Path)

	// Only CSV input is supported, either detected or forced by the caller.
	if !r.CSV && fileType != "csv" {
		return fmt.Errorf("file type not supported: %s", fileType)
	}
	r.CSV = true

	if r.Compression == "" {
		r.Compression = fileComp
	}

	if r.Table == "" {
		_, base := path.Split(r.Path)
		r.Table = strings.Split(base, ".")[0]
	}

	// The delimiter's first byte is indexed below, so an empty value would
	// panic with index out of range. Fall back to the conventional comma.
	if r.Delimiter == "" {
		r.Delimiter = ","
	}
	delim := r.Delimiter[0]

	// Obtain a database handle. sql.Open only prepares the handle; actual
	// connection errors surface when the database is first used.
	db, err := sql.Open("postgres", r.Database)
	if err != nil {
		return fmt.Errorf("cannot open db connection: %s", err)
	}
	defer db.Close()

	// First pass: profile the input to infer the table schema.
	profIn, err := reader.Open(r.Path, r.Compression)
	if err != nil {
		return fmt.Errorf("cannot open input: %s", err)
	}
	cp := csv.NewProfiler(profIn)
	cp.Delimiter = delim
	cp.Header = r.Header
	prof, err := cp.Profile()
	// Close the profiling stream here, exactly once, on both the success and
	// the error path. Deferring it as well would close it a second time when
	// Import returns, because a defer binds the stream value at defer time.
	profIn.Close()
	if err != nil {
		return fmt.Errorf("profile error: %s", err)
	}
	log.Print("Done profiling")

	// Second pass: reopen the input from the beginning for loading.
	loadIn, err := reader.Open(r.Path, r.Compression)
	if err != nil {
		return fmt.Errorf("cannot open input: %s", err)
	}
	defer loadIn.Close()

	schema := NewSchema(prof)
	if r.CStore {
		schema.Cstore = true
	}

	// Load into the database.
	log.Printf(`Begin load into "%s"."%s"`, r.Schema, r.Table)
	cr := libcsv.NewReader(loadIn)
	cr.Comma = rune(delim)

	dbc := New(db)
	var n int64
	if r.AppendTable {
		n, err = dbc.Append(r.Schema, r.Table, schema, cr)
	} else {
		n, err = dbc.Replace(r.Schema, r.Table, schema, cr)
	}
	if err != nil {
		return fmt.Errorf("error loading: %s", err)
	}
	log.Printf("Loaded %d records", n)
	return nil
}