Skip to content

Commit

Permalink
updated file stream methods
Browse files Browse the repository at this point in the history
adamstruck committed Nov 22, 2019
1 parent b037525 commit f958057
Showing 7 changed files with 200 additions and 144 deletions.
44 changes: 33 additions & 11 deletions cmd/kvload/main.go
Original file line number Diff line number Diff line change
@@ -2,10 +2,10 @@ package kvload

import (
"fmt"
"strings"
"sync"
"time"

"github.com/bmeg/golib"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/kvgraph"
"github.com/bmeg/grip/kvi"
@@ -49,7 +49,12 @@ var Cmd = &cobra.Command{
db := kvgraph.NewKVGraph(kv)
defer db.Close()

_ = db.AddGraph(graph)
err = db.AddGraph(graph)
if err != nil {
if strings.Contains(err.Error(), "invalid graph name") {
return err
}
}
kgraph, err := db.Graph(graph)
if err != nil {
return err
@@ -59,17 +64,24 @@ var Cmd = &cobra.Command{
edgeFileArray := []string{}

if vertexManifestFile != "" {
reader, err := golib.ReadFileLines(vertexManifestFile)
if err == nil {
for line := range reader {
reader, err := util.StreamLines(vertexManifestFile, 10)
if err != nil {
return err
}
for line := range reader {
if line != "" {
vertexFileArray = append(vertexFileArray, string(line))
}
}
}

if edgeManifestFile != "" {
reader, err := golib.ReadFileLines(edgeManifestFile)
if err == nil {
for line := range reader {
reader, err := util.StreamLines(edgeManifestFile, 10)
if err != nil {
return err
}
for line := range reader {
if line != "" {
edgeFileArray = append(edgeFileArray, string(line))
}
}
@@ -82,7 +94,7 @@ var Cmd = &cobra.Command{
edgeFileArray = append(edgeFileArray, edgeFile)
}

graphChan := make(chan *gripql.GraphElement, 1000)
graphChan := make(chan *gripql.GraphElement, 10)
wg := &sync.WaitGroup{}
go func() {
wg.Add(1)
@@ -96,7 +108,12 @@ var Cmd = &cobra.Command{
for _, vertexFile := range vertexFileArray {
log.Infof("Loading %s", vertexFile)
count := 0
for v := range util.StreamVerticesFromFile(vertexFile) {
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Error reading file: %s", vertexFile)
continue
}
for v := range vertChan {
graphChan <- &gripql.GraphElement{Graph: graph, Vertex: v}
count++
vertexCounter.Incr(1)
@@ -111,7 +128,12 @@ var Cmd = &cobra.Command{
for _, edgeFile := range edgeFileArray {
log.Infof("Loading %s", edgeFile)
count := 0
for e := range util.StreamEdgesFromFile(edgeFile) {
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Error reading file: %s", edgeFile)
continue
}
for e := range edgeChan {
graphChan <- &gripql.GraphElement{Graph: graph, Edge: e}
count++
edgeCounter.Incr(1)
12 changes: 10 additions & 2 deletions cmd/load/main.go
Original file line number Diff line number Diff line change
@@ -69,7 +69,11 @@ var Cmd = &cobra.Command{
if vertexFile != "" {
log.Infof("Loading vertex file: %s", vertexFile)
count := 0
for v := range util.StreamVerticesFromFile(vertexFile) {
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
return err
}
for v := range vertChan {
count++
if count%1000 == 0 {
log.Infof("Loaded %d vertices", count)
@@ -82,7 +86,11 @@ var Cmd = &cobra.Command{
if edgeFile != "" {
log.Infof("Loading edge file: %s", edgeFile)
count := 0
for e := range util.StreamEdgesFromFile(edgeFile) {
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
return err
}
for e := range edgeChan {
count++
if count%1000 == 0 {
log.Infof("Loaded %d edges", count)
129 changes: 70 additions & 59 deletions cmd/mongoload/main.go
Original file line number Diff line number Diff line change
@@ -106,94 +106,105 @@ var Cmd = &cobra.Command{

if vertexFile != "" {
log.Infof("Loading vertex file: %s", vertexFile)
count := 0

docChan := make(chan []map[string]interface{}, 100)
bulkVertChan := make(chan []map[string]interface{}, 5)
docBatch := make([]map[string]interface{}, 0, batchSize)

go func() {
defer close(docChan)
for v := range util.StreamVerticesFromFile(vertexFile) {
data := mongo.PackVertex(v)
docBatch = append(docBatch, data)
if len(docBatch) > batchSize {
docChan <- docBatch
docBatch = make([]map[string]interface{}, 0, batchSize)
count := 0
for batch := range bulkVertChan {
for i := 0; i < maxRetries; i++ {
bulk := vertexCo.Bulk()
bulk.Unordered()
for _, data := range batch {
bulk.Upsert(bson.M{"_id": data["_id"]}, data)
count++
}
_, err = bulk.Run()
if err == nil || !isNetError(err) {
i = maxRetries
} else {
log.Infof("Refreshing Connection")
session.Refresh()
}
}
count++
if count%1000 == 0 {
log.Infof("Loaded %d vertices", count)
}
}
if len(docBatch) > 0 {
docChan <- docBatch
}
log.Infof("Loaded %d vertices", count)
}()

for batch := range docChan {
for i := 0; i < maxRetries; i++ {
bulk := vertexCo.Bulk()
bulk.Unordered()
for _, data := range batch {
bulk.Upsert(bson.M{"_id": data["_id"]}, data)
}
_, err = bulk.Run()
if err == nil || !isNetError(err) {
i = maxRetries
} else {
log.Infof("Refreshing Connection")
session.Refresh()
}
vertChan, err := util.StreamVerticesFromFile(vertexFile)
if err != nil {
return err
}
for v := range vertChan {
data := mongo.PackVertex(v)
docBatch = append(docBatch, data)
if len(docBatch) > batchSize {
bulkVertChan <- docBatch
docBatch = make([]map[string]interface{}, 0, batchSize)
}
}
log.Infof("Loaded %d vertices", count)
if len(docBatch) > 0 {
bulkVertChan <- docBatch
}
close(bulkVertChan)
}

if edgeFile != "" {
log.Infof("Loading edge file: %s", edgeFile)
count := 0

docChan := make(chan []map[string]interface{}, 100)
bulkEdgeChan := make(chan []map[string]interface{}, 5)
docBatch := make([]map[string]interface{}, 0, batchSize)

go func() {
defer close(docChan)
for e := range util.StreamEdgesFromFile(edgeFile) {
data := mongo.PackEdge(e)
if data["_id"] == "" {
data["_id"] = bson.NewObjectId().Hex()
}
docBatch = append(docBatch, data)
if len(docBatch) > batchSize {
docChan <- docBatch
docBatch = make([]map[string]interface{}, 0, batchSize)
count := 0
for batch := range bulkEdgeChan {
for i := 0; i < maxRetries; i++ {
bulk := edgeCo.Bulk()
bulk.Unordered()
for _, data := range batch {
bulk.Upsert(bson.M{"_id": data["_id"]}, data)
count++
}
_, err = bulk.Run()
if err == nil || !isNetError(err) {
i = maxRetries
} else {
log.Infof("Refreshing Connection")
session.Refresh()
}
}
count++
if count%1000 == 0 {
log.Infof("Loaded %d edges", count)
}
}
if len(docBatch) > 0 {
docChan <- docBatch
}
log.Infof("Loaded %d edges", count)
}()

for batch := range docChan {
for i := 0; i < maxRetries; i++ {
bulk := edgeCo.Bulk()
bulk.Unordered()
for _, data := range batch {
bulk.Upsert(bson.M{"_id": data["_id"]}, data)
}
_, err = bulk.Run()
if err == nil || !isNetError(err) {
i = maxRetries
} else {
log.Infof("Refreshing Connection")
session.Refresh()
}
edgeChan, err := util.StreamEdgesFromFile(edgeFile)
if err != nil {
return err
}
for e := range edgeChan {
data := mongo.PackEdge(e)
if data["_id"] == "" {
data["_id"] = bson.NewObjectId().Hex()
}
docBatch = append(docBatch, data)
if len(docBatch) > batchSize {
bulkEdgeChan <- docBatch
docBatch = make([]map[string]interface{}, 0, batchSize)
}
}
log.Infof("Loaded %d edges", count)
if len(docBatch) > 0 {
bulkEdgeChan <- docBatch
}
close(bulkEdgeChan)
}

return nil
},
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ go 1.12

require (
github.com/Shopify/sarama v1.22.1
github.com/bmeg/golib v0.0.0-20170626075926-82a1e1d7a0b2
github.com/bmeg/protoc-gen-grcp-rest-direct v0.0.0-20190228222353-4d40e8b9d305 // indirect
github.com/boltdb/bolt v1.3.1
github.com/ckaznocha/protoc-gen-lint v0.2.1 // indirect
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -17,8 +17,6 @@ github.com/VictoriaMetrics/fastcache v1.5.1/go.mod h1:+jv9Ckb+za/P1ZRg/sulP5Ni1v
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/bmeg/golib v0.0.0-20170626075926-82a1e1d7a0b2 h1:pR4J0Eq9bLu2kRq3Xh3iGDaiOHELIKJLPBhB5K8yTJg=
github.com/bmeg/golib v0.0.0-20170626075926-82a1e1d7a0b2/go.mod h1:UYeNUXxiubPvUu5x/Sm/lIIbdhtkFOAdR0P0NTIQrPc=
github.com/bmeg/protoc-gen-grcp-rest-direct v0.0.0-20190228222353-4d40e8b9d305 h1:jFKpPaxW0TZKlh7u4fYtdzoX+JZV/czsi2j6d8Bh21A=
github.com/bmeg/protoc-gen-grcp-rest-direct v0.0.0-20190228222353-4d40e8b9d305/go.mod h1:1S3ijn0o+w7RmSdfX3EQttlrF9csSziK8dpeh0nP4LU=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
30 changes: 14 additions & 16 deletions kvgraph/graph.go
Original file line number Diff line number Diff line change
@@ -45,20 +45,23 @@ type kvAddData struct {
// in the graph, it is replaced
func (kgdb *KVInterfaceGDB) AddVertex(vertices []*gripql.Vertex) error {
err := kgdb.kvg.kv.BulkWrite(func(tx kvi.KVBulkWrite) error {
var anyErr error
var bulkErr *multierror.Error
for _, vert := range vertices {
if err := insertVertex(tx, kgdb.kvg.idx, kgdb.graph, vert); err != nil {
anyErr = err
log.Errorf("AddVertex Error %s", err)
bulkErr = multierror.Append(bulkErr, err)
}
}
kgdb.kvg.ts.Touch(kgdb.graph)
return anyErr
return bulkErr.ErrorOrNil()
})
return err
}

func insertVertex(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, vertex *gripql.Vertex) error {
if err := vertex.Validate(); err != nil {
return err
}

key := VertexKey(graph, vertex.Gid)
value, err := proto.Marshal(vertex)
if err != nil {
@@ -79,6 +82,10 @@ func insertEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gr
var err error
var data []byte

if err = edge.Validate(); err != nil {
return err
}

data, err = proto.Marshal(edge)
if err != nil {
return err
@@ -113,14 +120,14 @@ func insertEdge(tx kvi.KVBulkWrite, idx *kvindex.KVIndex, graph string, edge *gr
// in the graph, it is replaced
func (kgdb *KVInterfaceGDB) AddEdge(edges []*gripql.Edge) error {
err := kgdb.kvg.kv.BulkWrite(func(tx kvi.KVBulkWrite) error {
var anyErr error
var bulkErr *multierror.Error
for _, edge := range edges {
if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, edge); err != nil {
anyErr = err
bulkErr = multierror.Append(bulkErr, err)
}
}
kgdb.kvg.ts.Touch(kgdb.graph)
return anyErr
return bulkErr.ErrorOrNil()
})
return err
}
@@ -130,21 +137,12 @@ func (kgdb *KVInterfaceGDB) BulkAdd(stream <-chan *gripql.GraphElement) error {
var bulkErr *multierror.Error
for elem := range stream {
if elem.Vertex != nil {
if err := elem.Vertex.Validate(); err != nil {
bulkErr = multierror.Append(bulkErr, err)
continue
}
if err := insertVertex(tx, kgdb.kvg.idx, kgdb.graph, elem.Vertex); err != nil {
bulkErr = multierror.Append(bulkErr, err)
}
continue
}

if elem.Edge != nil {
if err := elem.Edge.Validate(); err != nil {
bulkErr = multierror.Append(bulkErr, err)
continue
}
if err := insertEdge(tx, kgdb.kvg.idx, kgdb.graph, elem.Edge); err != nil {
bulkErr = multierror.Append(bulkErr, err)
}
Loading

0 comments on commit f958057

Please sign in to comment.