-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
160 lines (147 loc) · 3.79 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"sync"
)
// DataElement is one record of the "data" array stored in the JSON files this
// program reads and writes. The struct tags define the JSON key of each field.
type DataElement struct {
	ID        int    `json:"id"`
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
}
// OutputDataStructure is the top-level JSON document handled by this program:
// an object whose "data" key holds a list of DataElement values. Parse decodes
// input files into it and main encodes the merged result from it.
type OutputDataStructure struct {
	Data []DataElement `json:"data"`
}
// Concurrency is the number of worker goroutines that main starts to parse
// input files in parallel.
const Concurrency = 12
// Package-level state shared between main, ReadDirectory and Parse.
var (
	// Output holds the collected elements keyed by their ID. It is allocated
	// and filled by main.
	Output map[int]DataElement

	// channelForFilesToParse carries the paths of the files that ReadDirectory
	// selected for parsing.
	channelForFilesToParse chan string

	// channelForDataElementsToSave carries the elements that Parse decoded
	// from the input files.
	channelForDataElementsToSave chan DataElement

	// inputDirectory is the value of the -source-dir flag.
	inputDirectory string

	// outputFileName is the value of the -out-file flag.
	outputFileName string

	// ignoreDuplicates is the value of the -ignore-duplicates flag.
	ignoreDuplicates bool
)
// ReadDirectory sends the path of every non-directory entry with a ".json"
// extension located directly inside pathToDirectory to channelForFilesToParse.
// Entries in nested directories are not reported, and nested directories are
// not descended into.
//
// If the walk reports a "does not exist" error the function prints
// "Source directory not found" and terminates the process with exit code 10.
// Any other walk error is returned to the caller.
//
// Each send blocks while channelForFilesToParse is full, so the channel needs
// either a concurrent receiver or enough buffer space for all matching files.
func ReadDirectory(pathToDirectory string) (err error) {
	// filepath.Walk builds child paths with filepath.Join, which cleans them,
	// so the root must be cleaned too or a trailing separator ("dir/") would
	// make the parent comparison below fail for every file. The empty string
	// is left alone: Clean would turn it into "." and silently walk the
	// current directory instead of reporting a missing source directory.
	root := pathToDirectory
	if root != "" {
		root = filepath.Clean(root)
	}

	return filepath.Walk(root, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			if os.IsNotExist(walkErr) {
				fmt.Println("Source directory not found")
				os.Exit(10)
			}
			return walkErr
		}

		if info.IsDir() {
			// Only direct children are of interest, so do not descend into
			// subdirectories. This also keeps an unreadable nested
			// directory from failing the whole walk.
			if path != root {
				return filepath.SkipDir
			}
			return nil
		}

		// The parent check keeps a root that is itself a file from being
		// reported as an input.
		if filepath.Dir(path) == root && filepath.Ext(info.Name()) == ".json" {
			channelForFilesToParse <- path
		}
		return nil
	})
}
// Parse reads the file at pathToFile, decodes its content as an
// OutputDataStructure and sends every decoded element, in file order, to
// channelForDataElementsToSave.
//
// A read or decode error is returned unchanged and nothing is sent in that
// case. Each send blocks while channelForDataElementsToSave is full.
func Parse(pathToFile string) (err error) {
	raw, readErr := ioutil.ReadFile(pathToFile)
	if readErr != nil {
		return readErr
	}

	var decoded OutputDataStructure
	if decodeErr := json.Unmarshal(raw, &decoded); decodeErr != nil {
		return decodeErr
	}

	for i := range decoded.Data {
		channelForDataElementsToSave <- decoded.Data[i]
	}
	return nil
}
// main merges the "data" arrays of all files selected by ReadDirectory in
// -source-dir into a single JSON document, sorted by ascending id, and writes
// it to -out-file.
//
// Exit codes:
//
//	10 - -source-dir is empty, or ReadDirectory found that it does not exist
//	20 - two elements share an id and -ignore-duplicates is not set
//	 1 - -out-file is empty, or any failure reported through log.Fatalf
//	 0 - success
//
// The work is organised as a pipeline: one goroutine walks the directory,
// Concurrency goroutines parse files, and main collects the elements. Every
// channel is closed by its sending side, so each stage ends when its input is
// exhausted instead of guessing from the channel length.
func main() {
	flag.StringVar(&inputDirectory, "source-dir", "", "The path to the directory to read files from")
	flag.StringVar(&outputFileName, "out-file", "", "The path to the file to write the final data to")
	flag.BoolVar(&ignoreDuplicates, "ignore-duplicates", false, "Whether or not to ignore duplicate keys")
	flag.Parse()

	if inputDirectory == "" {
		fmt.Println("Source directory not found")
		os.Exit(10)
	}
	if outputFileName == "" {
		fmt.Println("Output file be empty")
		os.Exit(1)
	}

	abs, err := filepath.Abs(inputDirectory)
	if err != nil {
		log.Fatalf("%s : while reading absolute path for %s", err, inputDirectory)
	}

	// The buffers only decouple the stages; correctness no longer depends on
	// their size because all stages run concurrently.
	channelForFilesToParse = make(chan string, 1000)
	channelForDataElementsToSave = make(chan DataElement, 1000)

	// Stage 2: parsers. Each worker runs until the file channel is closed and
	// keeps its error local so the goroutines do not share a variable.
	var wg sync.WaitGroup
	wg.Add(Concurrency)
	for i := 0; i < Concurrency; i++ {
		go func() {
			defer wg.Done()
			for pathToFileToParse := range channelForFilesToParse {
				if parseErr := Parse(pathToFileToParse); parseErr != nil {
					log.Fatalf("%s : while parsing %s", parseErr, pathToFileToParse)
				}
			}
		}()
	}

	// Stage 1: directory walk. It runs in its own goroutine so that main can
	// drain the element channel at the same time; otherwise a full element
	// buffer would stall the parsers and, in turn, the walk.
	go func() {
		if readErr := ReadDirectory(abs); readErr != nil {
			log.Fatalf("%s : while reading input directory %s", readErr, inputDirectory)
		}
		close(channelForFilesToParse)
	}()

	// Once every parser has finished there are no more senders, so the
	// element channel can be closed to end the collection loop below.
	go func() {
		wg.Wait()
		close(channelForDataElementsToSave)
	}()

	// Stage 3: collect. Duplicates are only recorded here and acted upon
	// after the channel is closed, i.e. after every file parsed cleanly, so a
	// parse failure still takes precedence over the duplicate exit code.
	// NOTE: when duplicates are ignored the element that arrived first wins,
	// and arrival order depends on goroutine scheduling.
	Output = make(map[int]DataElement)
	duplicateFound := false
	for de := range channelForDataElementsToSave {
		if _, found := Output[de.ID]; found {
			duplicateFound = true
			continue
		}
		Output[de.ID] = de
	}
	if duplicateFound && !ignoreDuplicates {
		fmt.Println("Duplicate data found")
		os.Exit(20)
	}

	// Map iteration order is random, so sort by ID for a stable output.
	// The slice is created with make so that an empty result is encoded as
	// [] rather than null.
	outputSlice := make([]DataElement, 0, len(Output))
	for _, v := range Output {
		outputSlice = append(outputSlice, v)
	}
	sort.Slice(outputSlice, func(i, j int) bool {
		return outputSlice[i].ID < outputSlice[j].ID
	})

	payload, err := json.MarshalIndent(OutputDataStructure{Data: outputSlice}, "", " ")
	if err != nil {
		log.Fatalf("%s : while marshaling output data to json", err)
	}
	if err = ioutil.WriteFile(outputFileName, payload, 0644); err != nil {
		log.Fatalf("%s : while writing output into %s", err, outputFileName)
	}
}