Bulk multi-shard DBs fix (#3065)
Change summary:

* Each shard's schema now contains only the predicates found in that shard, rather than all predicates.

* Test case for the above fix.

* More user-friendly "file not found" messages, rather than stack traces, when the bulk schema or data paths are incorrect.
codexnull authored and Javier Alvarado committed Feb 27, 2019
1 parent 0b561bb commit 6699b22
Showing 4 changed files with 84 additions and 23 deletions.
27 changes: 25 additions & 2 deletions dgraph/cmd/bulk/loader.go
@@ -22,6 +22,7 @@ import (
 	"compress/gzip"
 	"context"
 	"fmt"
+	"hash/adler32"
 	"io"
 	"io/ioutil"
 	"os"
@@ -282,8 +283,30 @@ func (ld *loader) reduceStage() {
 }
 
 func (ld *loader) writeSchema() {
-	for _, db := range ld.dbs {
-		ld.schema.write(db)
+	numDBs := uint32(len(ld.dbs))
+	preds := make([][]string, numDBs)
+
+	// Get all predicates that have data in some DB.
+	m := make(map[string]struct{})
+	for i, db := range ld.dbs {
+		preds[i] = ld.schema.getPredicates(db)
+		for _, p := range preds[i] {
+			m[p] = struct{}{}
+		}
+	}
+
+	// Find any predicates that don't have data in any DB
+	// and distribute them among all the DBs.
+	for p := range ld.schema.m {
+		if _, ok := m[p]; !ok {
+			i := adler32.Checksum([]byte(p)) % numDBs
+			preds[i] = append(preds[i], p)
+		}
+	}
+
+	// Write out each DB's final predicate list.
+	for i, db := range ld.dbs {
+		ld.schema.write(db, preds[i])
 	}
 }
 
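For illustration, here is a minimal, self-contained sketch of the distribution step in writeSchema above. The predicate names and shard count are hypothetical stand-ins for what the loader derives from ld.schema.m and ld.dbs:

package main

import (
	"fmt"
	"hash/adler32"
)

func main() {
	const numShards = 2

	// Hypothetical predicates that appear in the schema file but have
	// no data in any shard.
	dataless := []string{"language", "revenue", "starring"}

	shards := make([][]string, numShards)
	for _, p := range dataless {
		// The checksum is deterministic, so a given predicate always
		// maps to the same shard, and to exactly one shard.
		i := adler32.Checksum([]byte(p)) % numShards
		shards[i] = append(shards[i], p)
	}

	for i, preds := range shards {
		fmt.Printf("shard %d gets %v\n", i, preds)
	}
}

Because the placement is deterministic and single-shard, the new test below can assert that every predicate appears in exactly one shard's schema.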
6 changes: 6 additions & 0 deletions dgraph/cmd/bulk/run.go
@@ -123,6 +123,9 @@ func run() {
 	if opt.SchemaFile == "" {
 		fmt.Fprint(os.Stderr, "schema file must be specified.\n")
 		os.Exit(1)
+	} else if _, err := os.Stat(opt.SchemaFile); err != nil && os.IsNotExist(err) {
+		fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
+		os.Exit(1)
 	}
 	if opt.RDFDir == "" && opt.JSONDir == "" {
 		fmt.Fprint(os.Stderr, "RDF or JSON file(s) must be specified.\n")
@@ -132,6 +135,9 @@
 		fmt.Fprintf(os.Stderr, "Invalid flags: only one of rdfs(%q) of jsons(%q) may be specified.\n",
 			opt.RDFDir, opt.JSONDir)
 		os.Exit(1)
+	} else if _, err := os.Stat(opt.RDFDir); err != nil && os.IsNotExist(err) {
+		fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", opt.RDFDir)
+		os.Exit(1)
 	}
 	if opt.ReduceShards > opt.MapShards {
 		fmt.Fprintf(os.Stderr, "Invalid flags: reduce_shards(%d) should be <= map_shards(%d)\n",
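Both checks above follow the same pattern: require the flag, then stat the path and exit with a readable message instead of letting a later open fail with a stack trace. A standalone sketch of that pattern; the helper name requirePath and the sample paths are made up for illustration:

package main

import (
	"fmt"
	"os"
)

// requirePath exits with a friendly message when a required input is
// empty or names a path that does not exist.
func requirePath(label, path string) {
	if path == "" {
		fmt.Fprintf(os.Stderr, "%s must be specified.\n", label)
		os.Exit(1)
	}
	if _, err := os.Stat(path); err != nil && os.IsNotExist(err) {
		fmt.Fprintf(os.Stderr, "%s path(%v) does not exist.\n", label, path)
		os.Exit(1)
	}
}

func main() {
	requirePath("Schema", "fixture.schema")
	requirePath("Data", "fixture.rdf")
	fmt.Println("inputs look OK")
}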
23 changes: 4 additions & 19 deletions dgraph/cmd/bulk/schema.go
@@ -117,25 +117,7 @@ func (s *schemaStore) getPredicates(db *badger.DB) []string {
 	return preds
 }
 
-func (s *schemaStore) write(db *badger.DB) {
-	// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
-	// if bulk loader was restarted or other similar scenarios.
-
-	// Get predicates from the schema store so that the db includes all
-	// predicates from the schema file.
-	preds := make([]string, 0, len(s.m))
-	for pred := range s.m {
-		preds = append(preds, pred)
-	}
-
-	// Add predicates from the db so that final schema includes predicates
-	// used in the rdf file but not included in the schema file.
-	for _, pred := range s.getPredicates(db) {
-		if _, ok := s.m[pred]; !ok {
-			preds = append(preds, pred)
-		}
-	}
-
+func (s *schemaStore) write(db *badger.DB, preds []string) {
 	txn := db.NewTransactionAt(math.MaxUint64, true)
 	defer txn.Discard()
 	for _, pred := range preds {
@@ -148,5 +130,8 @@ func (s *schemaStore) write(db *badger.DB) {
 		x.Check(err)
 		x.Check(txn.SetWithMeta(k, v, posting.BitCompletePosting))
 	}
+
+	// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
+	// if bulk loader was restarted or other similar scenarios.
 	x.Check(txn.CommitAt(1, nil))
 }
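For context on the CommitAt(1, nil) call: here is a minimal sketch of writing at a fixed timestamp with Badger's managed-transaction API, assuming the v1-era calls this file uses (OpenManaged, NewTransactionAt, SetWithMeta, CommitAt). The directory, key, value, and meta byte are made up:

package main

import (
	"log"
	"math"

	"github.com/dgraph-io/badger"
)

func main() {
	opts := badger.DefaultOptions
	opts.Dir, opts.ValueDir = "/tmp/p", "/tmp/p" // hypothetical path
	db, err := badger.OpenManaged(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Read at the maximum timestamp so every existing version is visible.
	txn := db.NewTransactionAt(math.MaxUint64, true)
	defer txn.Discard()

	if err := txn.SetWithMeta([]byte("key"), []byte("value"), 0x01); err != nil {
		log.Fatal(err)
	}
	// Commit at a fixed, known timestamp (1), as the schema write above
	// does, so readers see the entry regardless of the loader's writeTs.
	if err := txn.CommitAt(1, nil); err != nil {
		log.Fatal(err)
	}
}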
51 changes: 49 additions & 2 deletions dgraph/cmd/bulk/systest/test-bulk-schema.sh
@@ -15,12 +15,19 @@ FATAL() { ERROR "$@"; exit 1; }
 
 set -e
 
+INFO "rebuilding dgraph"
+
+cd $SRCROOT
+make install >/dev/null
+
 INFO "running bulk load schema test"
 
 WORKDIR=$(mktemp --tmpdir -d $ME.tmp-XXXXXX)
 INFO "using workdir $WORKDIR"
 cd $WORKDIR
 
+LOGFILE=$WORKDIR/output.log
+
 trap ForceClean EXIT
 
 function ForceClean
@@ -118,7 +125,8 @@ function BulkLoadExportedData
   dgraph bulk -z localhost:$ZERO_PORT \
     -s ../dir1/export/*/g01.schema.gz \
     -r ../dir1/export/*/g01.rdf.gz \
-    >bulk.log 2>&1 </dev/null
+    >$LOGFILE 2>&1 </dev/null
+  mv $LOGFILE $LOGFILE.export
 }
 
 function BulkLoadFixtureData
@@ -146,7 +154,42 @@ _:et <revenue> "792.9" .
 EOF
 
   dgraph bulk -z localhost:$ZERO_PORT -s fixture.schema -r fixture.rdf \
-    >bulk.log 2>&1 </dev/null
+    >$LOGFILE 2>&1 </dev/null
+  mv $LOGFILE $LOGFILE.fixture
 }
 
+function TestBulkLoadMultiShard
+{
+  INFO "bulk loading into multiple shards"
+
+  cat >fixture.schema <<EOF
+name:string @index(term) .
+genre:default .
+language:string .
+EOF
+
+  cat >fixture.rdf <<EOF
+_:et <name> "E.T. the Extra-Terrestrial" .
+_:et <genre> "Science Fiction" .
+_:et <revenue> "792.9" .
+EOF
+
+  dgraph bulk -z localhost:$ZERO_PORT -s fixture.schema -r fixture.rdf \
+    --map_shards 2 --reduce_shards 2 \
+    >$LOGFILE 2>&1 </dev/null
+  mv $LOGFILE $LOGFILE.multi
+
+  INFO "checking that each predicate appears in only one shard"
+
+  dgraph debug -p out/0/p 2>|/dev/null | grep '{s}' | cut -d' ' -f4  > all_dbs.out
+  dgraph debug -p out/1/p 2>|/dev/null | grep '{s}' | cut -d' ' -f4 >> all_dbs.out
+  diff <(LC_ALL=C sort all_dbs.out | uniq -c) - <<EOF
+      1 _predicate_
+      1 genre
+      1 language
+      1 name
+      1 revenue
+EOF
+}
+
 function StopServers
@@ -228,6 +271,10 @@ diff -b - dir3/schema.out <<EOF || FATAL "schema incorrect"
 }
 EOF
 
+StartZero
+TestBulkLoadMultiShard
+StopServers
+
 INFO "schema is correct"
 
 Cleanup
