Live & bulk loader changes #2961

Merged: 62 commits, Feb 6, 2019
Commits
6145526
Accept directory as argument to --rdfs and look for .rdf and .json fi…
Jan 12, 2019
6e395be
Merge branch 'master' into javier/issue2889_live_load_json
Jan 12, 2019
98e0ea1
Add x.FileReader() function to return a reader that decompresses if n…
Jan 14, 2019
3ca188a
Forward file to the RDF or JSON processor as appropriate.
Jan 14, 2019
de3da8f
Working version?
Jan 15, 2019
c4291db
Add live test stub.
Jan 17, 2019
5866bd7
Ensure pending nquads are submitted after EOF.
Jan 17, 2019
86c421f
Change references to RDFs to N-Quads to be more correct.
Jan 17, 2019
9cb6dca
Merge master.
Jan 17, 2019
bdfcd85
Export TMPDIR in test.sh so that hopefully tests will use it. Change …
Jan 17, 2019
1e9c530
Change test data schema to reflect changes merged from master.
Jan 17, 2019
c495a01
Live JSON load test.
Jan 17, 2019
f499943
Auto-detect JSON in load files with no .rdf or .json extension in the…
Jan 18, 2019
7b5ad3b
Fix bug reading streamed JSON. Add test of loading streamed JSON.
Jan 18, 2019
405c60a
Live load testing improvements.
Jan 18, 2019
37c3437
First attempt at autogenerating uid from key fields.
Jan 19, 2019
afe44d4
Replace RDFs in message with N-Quads.
Jan 19, 2019
9da9d8e
Add debugging code.
Jan 19, 2019
fce4ff6
Fix bug assigning blank ids. Move more testing code to testing package.
Jan 21, 2019
fbad4cf
Add test of live loading multiple JSON files.
Jan 21, 2019
8c073d5
Add test of live loading JSON without UID field. Testing improvements.
Jan 21, 2019
f828d57
Add tests of live JSON load with auto-uid.
Jan 22, 2019
44b1a73
Merge branch 'master' into javier/issue2889_live_load_json
Jan 22, 2019
cc59eed
Small restartCluster fix.
Jan 22, 2019
f6b25ca
Merge master.
Jan 22, 2019
06fd748
Rename fields to be more correct or specific.
Jan 22, 2019
a23a169
Improved comments.
Jan 22, 2019
4495702
Remove test.sh change that should be part of another branch.
Jan 22, 2019
2a94576
Fix bugs catching errors.
Jan 23, 2019
c9833fd
Make auto-added blank uid fields per file.
Jan 23, 2019
cee8c8d
Don't bother hashing the concatenated key values to generate a blank …
Jan 24, 2019
3b7b648
Refactor processRdfFile
Jan 25, 2019
ad641a2
Minor changes.
Jan 28, 2019
5b4c423
Add JsonToNquads to replace NquadsFromJson later.
Jan 28, 2019
4747690
Rename some functions. Don't dump stack trace for input error.
Jan 28, 2019
6f01e18
Move chunk test from bulk package to x package.
Jan 28, 2019
0cb8030
Change processRdf to use a chunker as well.
Jan 28, 2019
ce62a5a
Move chunker from x package to a new loadfile package. Remove bulk ch…
Jan 29, 2019
7286176
Merge master.
Jan 30, 2019
1a409cf
Refactor more bulk/live code.
Jan 31, 2019
0454e92
Minor testing changes.
Jan 31, 2019
77d029b
Merge master.
Jan 31, 2019
50f5744
Merge master.
Jan 31, 2019
500fa12
Add a small visual indicator of when a long-running test starts.
Jan 31, 2019
05ff572
Fix rdfChunker.Parse to parse all RDF lines in chunk instead of only …
Jan 31, 2019
487dcc5
Add check for EOF in RDF chunk parser.
Jan 31, 2019
17125d9
Add --key option to bulk loader for parity with live loader.
Jan 31, 2019
8e7b581
Fix batching, which was broken by live/bulk refactoring.
Feb 1, 2019
1c2c9ef
Parse key fields earlier in the process.
Feb 1, 2019
116a794
Cleanup suggestions from PR.
Feb 1, 2019
8f16e04
PR review fixes.
Feb 5, 2019
7685150
Fix warning.
Feb 5, 2019
dd237b1
Remove --key support since it requires more thought.
Feb 5, 2019
db7f34f
Remove two-line functions processJsonFile and processRdfFile.
Feb 6, 2019
915eef8
Inline nextNquads() and finalNquads() into processLoadFile().
Feb 6, 2019
d09e925
Rename package loadfile to chunker.
Feb 6, 2019
94f355b
Move package rdf to chunker/rdf.
Feb 6, 2019
9f39ac9
WIP...
Feb 6, 2019
6872833
Move json and rdf nquad parsing under chunker.
Feb 6, 2019
ec990e2
Move FileReader() and IsJSONData() from x package to chunker.
Feb 6, 2019
ac41b58
Rename live --rdfs option to --files since it can load more than *.rd…
Feb 6, 2019
6143636
Fix a bug where batch slice was being modified after sending to a cha…
manishrjain Feb 6, 2019
214 changes: 135 additions & 79 deletions dgraph/cmd/bulk/chunk.go → chunker/chunk.go
@@ -14,54 +14,61 @@
* limitations under the License.
*/

package bulk
package chunker

import (
"bufio"
"bytes"
"compress/gzip"
encjson "encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"unicode"

"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgo/x"
"github.com/dgraph-io/dgraph/chunker/json"
"github.com/dgraph-io/dgraph/chunker/rdf"

"github.com/pkg/errors"
)

type chunker interface {
begin(r *bufio.Reader) error
chunk(r *bufio.Reader) (*bytes.Buffer, error)
end(r *bufio.Reader) error
parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error)
type Chunker interface {
Begin(r *bufio.Reader) error
Chunk(r *bufio.Reader) (*bytes.Buffer, error)
End(r *bufio.Reader) error
Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error)
}

type rdfChunker struct{}
type jsonChunker struct{}

const (
rdfInput int = iota
jsonInput
RdfInput int = iota
JsonInput
)

func newChunker(inputFormat int) chunker {
func NewChunker(inputFormat int) Chunker {
switch inputFormat {
case rdfInput:
case RdfInput:
return &rdfChunker{}
case jsonInput:
case JsonInput:
return &jsonChunker{}
default:
panic("unknown loader type")
panic("unknown chunker type")
}
}

func (rdfChunker) begin(r *bufio.Reader) error {
// RDF files don't require any special processing at the beginning of the file.
func (rdfChunker) Begin(r *bufio.Reader) error {
return nil
}

func (rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
func (rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(1 << 20)
for lineCount := 0; lineCount < 1e5; lineCount++ {
@@ -93,63 +100,36 @@ func (rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
return batch, nil
}

func (rdfChunker) end(r *bufio.Reader) error {
return nil
}

func (rdfChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
str, readErr := chunkBuf.ReadString('\n')
if readErr != nil && readErr != io.EOF {
x.Check(readErr)
}

nq, parseErr := rdf.Parse(strings.TrimSpace(str))
if parseErr == rdf.ErrEmpty {
return nil, readErr
} else if parseErr != nil {
return nil, errors.Wrapf(parseErr, "while parsing line %q", str)
func (rdfChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
if chunkBuf.Len() == 0 {
return nil, io.EOF
}

return []gql.NQuad{{NQuad: &nq}}, readErr
}

func slurpSpace(r *bufio.Reader) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
nqs := make([]*api.NQuad, 0)
for chunkBuf.Len() > 0 {
str, err := chunkBuf.ReadString('\n')
if err != nil && err != io.EOF {
x.Check(err)
}
if !unicode.IsSpace(ch) {
x.Check(r.UnreadRune())
return nil

nq, err := rdf.Parse(strings.TrimSpace(str))
if err == rdf.ErrEmpty {
continue // blank line or comment
} else if err != nil {
return nil, errors.Wrapf(err, "while parsing line %q", str)
}
nqs = append(nqs, &nq)
}
}

func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(ch))
return nqs, nil
}

if ch == '\\' {
// Pick one more rune.
esc, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(esc))
continue
}
if ch == '"' {
return nil
}
}
// RDF files don't require any special processing at the end of the file.
func (rdfChunker) End(r *bufio.Reader) error {
return nil
}

func (jsonChunker) begin(r *bufio.Reader) error {
func (jsonChunker) Begin(r *bufio.Reader) error {
// The JSON file to load must be an array of maps (that is, '[ { ... }, { ... }, ... ]').
// This function must be called before calling readJSONChunk for the first time to advance
// the Reader past the array start token ('[') so that calls to readJSONChunk can read
@@ -167,7 +147,7 @@ func (jsonChunker) begin(r *bufio.Reader) error {
return nil
}

func (jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
func (jsonChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
out := new(bytes.Buffer)
out.Grow(1 << 20)

@@ -231,27 +211,103 @@ func (jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
return out, nil
}

func (jsonChunker) end(r *bufio.Reader) error {
func (jsonChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
if chunkBuf.Len() == 0 {
return nil, io.EOF
}

nqs, err := json.Parse(chunkBuf.Bytes(), json.SetNquads)
if err != nil && err != io.EOF {
x.Check(err)
}
chunkBuf.Reset()

return nqs, err
}

func (jsonChunker) End(r *bufio.Reader) error {
if slurpSpace(r) == io.EOF {
return nil
}
return errors.New("Not all of json file consumed")
return errors.New("Not all of JSON file consumed")
}

func (jsonChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
if chunkBuf.Len() == 0 {
return nil, io.EOF
func slurpSpace(r *bufio.Reader) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
}
if !unicode.IsSpace(ch) {
x.Check(r.UnreadRune())
return nil
}
}
}

nqs, err := edgraph.NquadsFromJson(chunkBuf.Bytes())
if err != nil && err != io.EOF {
func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(ch))

if ch == '\\' {
// Pick one more rune.
esc, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(esc))
continue
}
if ch == '"' {
return nil
}
}
}

// FileReader returns an open reader and file on the given file. Gzip-compressed input is detected
// and decompressed automatically even without the gz extension. The caller is responsible for
// calling the returned cleanup function when done with the reader.
func FileReader(file string) (rd *bufio.Reader, cleanup func()) {
f, err := os.Open(file)
x.Check(err)

cleanup = func() { f.Close() }

if filepath.Ext(file) == ".gz" {
gzr, err := gzip.NewReader(f)
x.Check(err)
rd = bufio.NewReader(gzr)
cleanup = func() { f.Close(); gzr.Close() }
} else {
rd = bufio.NewReader(f)
buf, _ := rd.Peek(512)

typ := http.DetectContentType(buf)
if typ == "application/x-gzip" {
gzr, err := gzip.NewReader(rd)
x.Check(err)
rd = bufio.NewReader(gzr)
cleanup = func() { f.Close(); gzr.Close() }
}
}
chunkBuf.Reset()

gqlNq := make([]gql.NQuad, len(nqs))
for i, nq := range nqs {
gqlNq[i] = gql.NQuad{NQuad: nq}
return rd, cleanup
}

// IsJSONData returns true if the reader, which should be at the start of the stream, is reading
// a JSON stream, false otherwise.
func IsJSONData(r *bufio.Reader) (bool, error) {
buf, err := r.Peek(512)
if err != nil && err != io.EOF {
return false, err
}
return gqlNq, err

de := encjson.NewDecoder(bytes.NewReader(buf))
_, err = de.Token()

return err == nil, nil
}
27 changes: 14 additions & 13 deletions dgraph/cmd/bulk/chunk_test.go → chunker/chunk_test.go
@@ -13,7 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bulk

package chunker

import (
"bufio"
@@ -45,8 +46,8 @@ func TestJSONLoadStart(t *testing.T) {
}

for _, test := range tests {
chunker := newChunker(jsonInput)
require.Error(t, chunker.begin(bufioReader(test.json)), test.desc)
chunker := NewChunker(JsonInput)
require.Error(t, chunker.Begin(bufioReader(test.json)), test.desc)
}
}

Expand All @@ -63,11 +64,11 @@ func TestJSONLoadReadNext(t *testing.T) {
{"[{}", "malformed array"},
}
for _, test := range tests {
chunker := newChunker(jsonInput)
chunker := NewChunker(JsonInput)
reader := bufioReader(test.json)
require.NoError(t, chunker.begin(reader), test.desc)
require.NoError(t, chunker.Begin(reader), test.desc)

json, err := chunker.chunk(reader)
json, err := chunker.Chunk(reader)
//fmt.Fprintf(os.Stderr, "err = %v, json = %v\n", err, json)
require.Nil(t, json, test.desc)
require.Error(t, err, test.desc)
@@ -112,11 +113,11 @@ func TestJSONLoadSuccessFirst(t *testing.T) {
},
}
for _, test := range tests {
chunker := newChunker(jsonInput)
chunker := NewChunker(JsonInput)
reader := bufioReader(test.json)
require.NoError(t, chunker.begin(reader), test.desc)
require.NoError(t, chunker.Begin(reader), test.desc)

json, err := chunker.chunk(reader)
json, err := chunker.Chunk(reader)
if err == io.EOF {
// pass
} else {
@@ -175,23 +176,23 @@ func TestJSONLoadSuccessAll(t *testing.T) {
}`,
}

chunker := newChunker(jsonInput)
chunker := NewChunker(JsonInput)
reader := bufioReader(testDoc)

var json *bytes.Buffer
var idx int

err := chunker.begin(reader)
err := chunker.Begin(reader)
require.NoError(t, err, "begin reading JSON document")
for idx = 0; err == nil; idx++ {
desc := fmt.Sprintf("reading chunk #%d", idx+1)
json, err = chunker.chunk(reader)
json, err = chunker.Chunk(reader)
//fmt.Fprintf(os.Stderr, "err = %v, json = %v\n", err, json)
if err != io.EOF {
require.NoError(t, err, desc)
require.Equal(t, testChunks[idx], json.String(), desc)
}
}
err = chunker.end(reader)
err = chunker.End(reader)
require.NoError(t, err, "end reading JSON document")
}