Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk multi-shard DBs fix #3065

Merged
merged 11 commits into from
Feb 26, 2019
27 changes: 25 additions & 2 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"compress/gzip"
"context"
"fmt"
"hash/adler32"
"io"
"io/ioutil"
"os"
Expand Down Expand Up @@ -238,8 +239,30 @@ func (ld *loader) reduceStage() {
}

func (ld *loader) writeSchema() {
for _, db := range ld.dbs {
ld.schema.write(db)
numDBs := uint32(len(ld.dbs))
preds := make([][]string, numDBs)

// Get all predicates that have data in some DB.
m := make(map[string]struct{})
for i, db := range ld.dbs {
preds[i] = ld.schema.getPredicates(db)
for _, p := range preds[i] {
m[p] = struct{}{}
}
}

// Find any predicates that don't have data in any DB
// and distribute them among all the DBs.
for p := range ld.schema.m {
if _, ok := m[p]; !ok {
i := adler32.Checksum([]byte(p)) % numDBs
preds[i] = append(preds[i], p)
}
}

// Write out each DB's final predicate list.
for i, db := range ld.dbs {
ld.schema.write(db, preds[i])
}
}

Expand Down
6 changes: 6 additions & 0 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,16 @@ func run() {
if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
} else if _, err := os.Stat(opt.SchemaFile); err != nil && os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
os.Exit(1)
}
if opt.DataFiles == "" {
fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
os.Exit(1)
} else if _, err := os.Stat(opt.DataFiles); err != nil && os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", opt.DataFiles)
os.Exit(1)
}
if opt.ReduceShards > opt.MapShards {
fmt.Fprintf(os.Stderr, "Invalid flags: reduce_shards(%d) should be <= map_shards(%d)\n",
Expand Down
23 changes: 4 additions & 19 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,7 @@ func (s *schemaStore) getPredicates(db *badger.DB) []string {
return preds
}

func (s *schemaStore) write(db *badger.DB) {
// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
// if bulk loader was restarted or other similar scenarios.

// Get predicates from the schema store so that the db includes all
// predicates from the schema file.
preds := make([]string, 0, len(s.m))
for pred := range s.m {
preds = append(preds, pred)
}

// Add predicates from the db so that final schema includes predicates
// used in the rdf file but not included in the schema file.
for _, pred := range s.getPredicates(db) {
if _, ok := s.m[pred]; !ok {
preds = append(preds, pred)
}
}

func (s *schemaStore) write(db *badger.DB, preds []string) {
txn := db.NewTransactionAt(math.MaxUint64, true)
defer txn.Discard()
for _, pred := range preds {
Expand All @@ -148,5 +130,8 @@ func (s *schemaStore) write(db *badger.DB) {
x.Check(err)
x.Check(txn.SetWithMeta(k, v, posting.BitCompletePosting))
}

// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
// if bulk loader was restarted or other similar scenarios.
x.Check(txn.CommitAt(1, nil))
}
48 changes: 45 additions & 3 deletions dgraph/cmd/bulk/systest/test-bulk-schema.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ FATAL() { ERROR "$@"; exit 1; }

set -e

INFO "rebuilding dgraph"

cd $SRCROOT
make install >/dev/null

INFO "running bulk load schema test"

WORKDIR=$(mktemp --tmpdir -d $ME.tmp-XXXXXX)
Expand Down Expand Up @@ -118,7 +123,6 @@ function DoExport
sleep 2
docker cp bank-dg1:/data/dg1/export .
sleep 1
set +x
}

function BulkLoadExportedData
Expand All @@ -128,7 +132,7 @@ function BulkLoadExportedData
-s ../dir1/export/*/g01.schema.gz \
-f ../dir1/export/*/g01.rdf.gz \
>$LOGFILE 2>&1 </dev/null
rm -f $LOGFILE
mv $LOGFILE $LOGFILE.export
}

function BulkLoadFixtureData
Expand Down Expand Up @@ -157,7 +161,41 @@ EOF

dgraph bulk -z localhost:$ZERO_PORT -s fixture.schema -f fixture.rdf \
>$LOGFILE 2>&1 </dev/null
rm -f $LOGFILE
mv $LOGFILE $LOGFILE.fixture
}

# Bulk-load a small fixture into two reduce shards and verify that the
# resulting schema is split across them with no predicate duplicated.
function TestBulkLoadMultiShard
{
INFO "bulk loading into multiple shards"

# NOTE(review): <revenue> appears in the RDF below but not in this schema,
# so it must be picked up from the data and still land in exactly one shard.
cat >fixture.schema <<EOF
name:string @index(term) .
genre:default .
language:string .
EOF

cat >fixture.rdf <<EOF
_:et <name> "E.T. the Extra-Terrestrial" .
_:et <genre> "Science Fiction" .
_:et <revenue> "792.9" .
EOF

# Two map shards reduced into two output DBs (out/0/p and out/1/p).
# Stdout/stderr go to $LOGFILE, which is kept under a distinct suffix.
dgraph bulk -z localhost:$ZERO_PORT -s fixture.schema -f fixture.rdf \
--map_shards 2 --reduce_shards 2 \
>$LOGFILE 2>&1 </dev/null
mv $LOGFILE $LOGFILE.multi

INFO "checking that each predicate appears in only one shard"

# Dump the schema entries ('{s}' lines) of both shard DBs and extract the
# predicate field; a count of 1 per predicate in the combined, sorted output
# means no predicate was written to more than one shard. The script runs
# under 'set -e', so a diff mismatch fails the test.
dgraph debug -p out/0/p 2>|/dev/null | grep '{s}' | cut -d' ' -f4 > all_dbs.out
dgraph debug -p out/1/p 2>|/dev/null | grep '{s}' | cut -d' ' -f4 >> all_dbs.out
diff <(LC_ALL=C sort all_dbs.out | uniq -c) - <<EOF
1 _predicate_
1 genre
1 language
1 name
1 revenue
EOF
}

function StopServers
Expand Down Expand Up @@ -239,6 +277,10 @@ diff -b - dir3/schema.out <<EOF || FATAL "schema incorrect"
}
EOF

StartZero
TestBulkLoadMultiShard
StopServers

INFO "schema is correct"

Cleanup
Expand Down