From 93887e5f64c61028da105b39c953a9158c5d2a12 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 19 Jun 2018 19:29:38 -0700 Subject: [PATCH] Fix bugs in bulk loader. Add Dgraph debug tool. (#2449) When building multiple shards, we want each predicate to only lie to one shard, and nowhere else. Bulk loader was doing that, but then adding the schema for all predicates on all shards, which causes confusion about ownership of the predicates. This changes that to only add schema for the predicates that the shard holds. Similarly, when outputting _predicate_ edge, the shard being used was the one corresponding to the original predicate. Instead, we should use the shard corresponding to _predicate_. This PR fixes #2129 . Added a new debug tool which can iterate over posting store and spit out stats per predicate. Useful for debugging. --- dgraph/cmd/bulk/loader.go | 19 ++--- dgraph/cmd/bulk/mapper.go | 13 ++-- dgraph/cmd/bulk/merge_shards.go | 6 +- dgraph/cmd/bulk/schema.go | 37 +++++++++- dgraph/cmd/bulk/shuffle.go | 5 +- dgraph/cmd/debug/run.go | 123 ++++++++++++++++++++++++++++++++ dgraph/cmd/root.go | 3 +- 7 files changed, 182 insertions(+), 24 deletions(-) create mode 100644 dgraph/cmd/debug/run.go diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index 054b9259583..ab6394abbf5 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -202,15 +202,6 @@ func (ld *loader) mapStage() { LRUSize: 1 << 19, }) - var mapperWg sync.WaitGroup - mapperWg.Add(len(ld.mappers)) - for _, m := range ld.mappers { - go func(m *mapper) { - m.run() - mapperWg.Done() - }(m) - } - var readers []*bufio.Reader for _, rdfFile := range findRDFFiles(ld.opt.RDFDir) { f, err := os.Open(rdfFile) @@ -230,6 +221,16 @@ func (ld *loader) mapStage() { os.Exit(1) } + var mapperWg sync.WaitGroup + mapperWg.Add(len(ld.mappers)) + for _, m := range ld.mappers { + go func(m *mapper) { + m.run() + mapperWg.Done() + }(m) + } + + // This is the main map loop. thr := x.NewThrottle(ld.opt.NumGoroutines) for _, r := range readers { thr.Start() diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index 62e09797cc0..a045c267280 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -70,6 +70,8 @@ func less(lhs, rhs *intern.MapEntry) bool { } func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) { + defer m.shards[shardIdx].mu.Unlock() // Locked by caller. + buf := entriesBuf var entries []*intern.MapEntry for len(buf) > 0 { @@ -105,7 +107,6 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) { ) x.Check(os.MkdirAll(filepath.Dir(filename), 0755)) x.Check(x.WriteFileSync(filename, entriesBuf, 0644)) - m.shards[shardIdx].mu.Unlock() // Locked by caller. } func (m *mapper) run() { @@ -121,7 +122,7 @@ func (m *mapper) run() { } rdf = strings.TrimSpace(rdf) - x.Check(m.parseRDF(rdf)) + x.Check(m.processRDF(rdf)) atomic.AddInt64(&m.prog.rdfCount, 1) for i := range m.shards { sh := &m.shards[i] @@ -162,7 +163,7 @@ func (m *mapper) addMapEntry(key []byte, p *intern.Posting, shard int) { x.Check(err) } -func (m *mapper) parseRDF(rdfLine string) error { +func (m *mapper) processRDF(rdfLine string) error { nq, err := parseNQuad(rdfLine) if err != nil { if err == rdf.ErrEmpty { @@ -199,14 +200,14 @@ func (m *mapper) processNQuad(nq gql.NQuad) { key = x.ReverseKey(nq.Predicate, oid) m.addMapEntry(key, rev, shard) } + m.addIndexMapEntries(nq, de) if m.opt.ExpandEdges { + shard := m.state.shards.shardFor("_predicate_") key = x.DataKey("_predicate_", sid) pp := m.createPredicatePosting(nq.Predicate) m.addMapEntry(key, pp, shard) } - - m.addIndexMapEntries(nq, de) } func (m *mapper) lookupUid(xid string) uint64 { @@ -287,9 +288,7 @@ func (m *mapper) addIndexMapEntries(nq gql.NQuad, de *intern.DirectedEdge) { } sch := m.schema.getSchema(nq.GetPredicate()) - for _, tokerName := range sch.GetTokenizer() { - // Find tokeniser. toker, ok := tok.GetTokenizer(tokerName) if !ok { diff --git a/dgraph/cmd/bulk/merge_shards.go b/dgraph/cmd/bulk/merge_shards.go index fc9670f383a..7129538db8b 100644 --- a/dgraph/cmd/bulk/merge_shards.go +++ b/dgraph/cmd/bulk/merge_shards.go @@ -31,8 +31,10 @@ func mergeMapShardsIntoReduceShards(opt options) { // until there are no more map shards left. Should be a good approximation. for _, shard := range mapShards { sortBySize(reduceShards) - x.Check(os.Rename(shard, filepath.Join( - reduceShards[len(reduceShards)-1], filepath.Base(shard)))) + reduceShard := filepath.Join( + reduceShards[len(reduceShards)-1], filepath.Base(shard)) + x.Printf("Shard %s -> Reduce %s\n", shard, reduceShard) + x.Check(os.Rename(shard, reduceShard)) } } diff --git a/dgraph/cmd/bulk/schema.go b/dgraph/cmd/bulk/schema.go index 7af839f9da4..58a8c5db371 100644 --- a/dgraph/cmd/bulk/schema.go +++ b/dgraph/cmd/bulk/schema.go @@ -10,6 +10,7 @@ package bulk import ( "fmt" "log" + "math" "sync" "github.com/dgraph-io/badger" @@ -82,11 +83,43 @@ func (s *schemaStore) validateType(de *intern.DirectedEdge, objectIsUID bool) { } } +func (s *schemaStore) getPredicates(db *badger.ManagedDB) []string { + txn := db.NewTransactionAt(math.MaxUint64, false) + defer txn.Discard() + + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + itr := txn.NewIterator(opts) + defer itr.Close() + + m := make(map[string]struct{}) + for itr.Rewind(); itr.Valid(); { + item := itr.Item() + pk := x.Parse(item.Key()) + m[pk.Attr] = struct{}{} + itr.Seek(pk.SkipPredicate()) + continue + } + + var preds []string + for pred := range m { + preds = append(preds, pred) + } + return preds +} + func (s *schemaStore) write(db *badger.ManagedDB) { // Write schema always at timestamp 1, s.state.writeTs may not be equal to 1 // if bulk loader was restarted or other similar scenarios. - txn := db.NewTransactionAt(1, true) - for pred, sch := range s.m { + preds := s.getPredicates(db) + + txn := db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + for _, pred := range preds { + sch, ok := s.m[pred] + if !ok { + continue + } k := x.SchemaKey(pred) v, err := sch.Marshal() x.Check(err) diff --git a/dgraph/cmd/bulk/shuffle.go b/dgraph/cmd/bulk/shuffle.go index 0e13ae4a977..f248e1fa29b 100644 --- a/dgraph/cmd/bulk/shuffle.go +++ b/dgraph/cmd/bulk/shuffle.go @@ -36,8 +36,8 @@ func (s *shuffler) run() { thr := x.NewThrottle(s.opt.NumShufflers) for i := 0; i < s.opt.ReduceShards; i++ { thr.Start() - go func(i int, db *badger.ManagedDB) { - mapFiles := filenamesInTree(shardDirs[i]) + go func(shardId int, db *badger.ManagedDB) { + mapFiles := filenamesInTree(shardDirs[shardId]) shuffleInputChs := make([]chan *intern.MapEntry, len(mapFiles)) for i, mapFile := range mapFiles { shuffleInputChs[i] = make(chan *intern.MapEntry, 1000) @@ -98,7 +98,6 @@ func readMapOutput(filename string, mapEntryCh chan<- *intern.MapEntry) { } func (s *shuffler) shufflePostings(mapEntryChs []chan *intern.MapEntry, ci *countIndexer) { - var ph postingHeap for _, ch := range mapEntryChs { heap.Push(&ph, heapNode{mapEntry: <-ch, ch: ch}) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go new file mode 100644 index 00000000000..905569041e7 --- /dev/null +++ b/dgraph/cmd/debug/run.go @@ -0,0 +1,123 @@ +/* + * Copyright 2017-2018 Dgraph Labs, Inc. + * + * This file is available under the Apache License, Version 2.0, + * with the Commons Clause restriction. + */ + +package debug + +import ( + "fmt" + "log" + "math" + "sort" + + "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/dgraph/x" + "github.com/spf13/cobra" +) + +var Debug x.SubCommand + +func init() { + Debug.Cmd = &cobra.Command{ + Use: "debug", + Short: "Debug Dgraph instance", + Run: func(cmd *cobra.Command, args []string) { + run() + }, + } + + flag := Debug.Cmd.Flags() + flag.StringP("postings", "p", "", "Directory where posting lists are stored.") + flag.BoolP("predicates", "s", false, "List all the predicates.") + flag.BoolP("readonly", "o", true, "Open in read only mode.") +} + +type Stats struct { + Data int + Index int + Schema int + Reverse int + Count int + Total int +} + +func run() { + opts := badger.DefaultOptions + opts.Dir = Debug.Conf.GetString("postings") + opts.ValueDir = Debug.Conf.GetString("postings") + opts.TableLoadingMode = options.MemoryMap + opts.ReadOnly = Debug.Conf.GetBool("readonly") + + x.AssertTruef(len(opts.Dir) > 0, "No posting dir specified.") + fmt.Printf("Opening DB: %s\n", opts.Dir) + db, err := badger.OpenManaged(opts) + x.Check(err) + defer db.Close() + + if Debug.Conf.GetBool("predicates") { + txn := db.NewTransactionAt(math.MaxUint64, false) + defer txn.Discard() + + iopts := badger.DefaultIteratorOptions + iopts.PrefetchValues = false + itr := txn.NewIterator(iopts) + defer itr.Close() + + var loop int + m := make(map[string]*Stats) + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + pk := x.Parse(item.Key()) + stats, ok := m[pk.Attr] + if !ok { + stats = new(Stats) + m[pk.Attr] = stats + } + stats.Total += 1 + // Don't use a switch case here. Because multiple of these can be true. In particular, + // IsSchema can be true alongside IsData. + if pk.IsData() { + stats.Data += 1 + } + if pk.IsIndex() { + stats.Index += 1 + } + if pk.IsCount() { + stats.Count += 1 + } + if pk.IsSchema() { + stats.Schema += 1 + } + if pk.IsReverse() { + stats.Reverse += 1 + } + loop++ + } + + type C struct { + pred string + stats *Stats + } + + var counts []C + for pred, stats := range m { + counts = append(counts, C{pred, stats}) + } + sort.Slice(counts, func(i, j int) bool { + return counts[i].stats.Total > counts[j].stats.Total + }) + for _, c := range counts { + st := c.stats + fmt.Printf("Total: %-8d. Predicate: %-20s\n", st.Total, c.pred) + fmt.Printf(" Data: %d Index: %d Reverse: %d Schema: %d Count: %d Predicate: %s\n\n", + st.Data, st.Index, st.Reverse, st.Schema, st.Count, c.pred) + } + fmt.Printf("Found %d keys\n", loop) + return + } + log.Fatalln("Please provide a valid option for diagnosis.") +} diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index c052ad2aaa2..0b5f06728f8 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -12,6 +12,7 @@ import ( "os" "github.com/dgraph-io/dgraph/dgraph/cmd/bulk" + "github.com/dgraph-io/dgraph/dgraph/cmd/debug" "github.com/dgraph-io/dgraph/dgraph/cmd/live" "github.com/dgraph-io/dgraph/dgraph/cmd/server" "github.com/dgraph-io/dgraph/dgraph/cmd/version" @@ -58,7 +59,7 @@ func init() { rootConf.BindPFlags(RootCmd.PersistentFlags()) var subcommands = []*x.SubCommand{ - &bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, + &bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug, } for _, sc := range subcommands { RootCmd.AddCommand(sc.Cmd)