Skip to content

Commit

Permalink
Live & bulk loader changes (hypermodeinc#2961)
Browse files Browse the repository at this point in the history
Fixes hypermodeinc#2889, hypermodeinc#2927.

Summary of loader changes:

    * Support live loading JSON files

    * Support loading RDF or JSON stream instead of requiring files in live loader

    * Auto-detect compressed load data instead of requiring extension in filename in both loaders

    * Auto-detect JSON load data instead of requiring extension in filename in both loaders

    * Lots of refactoring of live and bulk loaders to share code for the same functionality
  • Loading branch information
codexnull authored and dna2github committed Jul 19, 2019
1 parent b6a19a2 commit a323fdd
Show file tree
Hide file tree
Showing 32 changed files with 1,287 additions and 620 deletions.
214 changes: 135 additions & 79 deletions dgraph/cmd/bulk/chunk.go → chunker/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,61 @@
* limitations under the License.
*/

package bulk
package chunker

import (
"bufio"
"bytes"
"compress/gzip"
encjson "encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"unicode"

"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgo/x"
"github.com/dgraph-io/dgraph/chunker/json"
"github.com/dgraph-io/dgraph/chunker/rdf"

"github.com/pkg/errors"
)

type chunker interface {
begin(r *bufio.Reader) error
chunk(r *bufio.Reader) (*bytes.Buffer, error)
end(r *bufio.Reader) error
parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error)
type Chunker interface {
Begin(r *bufio.Reader) error
Chunk(r *bufio.Reader) (*bytes.Buffer, error)
End(r *bufio.Reader) error
Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error)
}

// rdfChunker chunks and parses RDF (N-Quad) input.
type rdfChunker struct{}

// jsonChunker chunks and parses JSON input.
type jsonChunker struct{}

// Input formats supported by NewChunker.
const (
	// RdfInput selects the RDF (N-Quad) chunker.
	RdfInput int = iota
	// JsonInput selects the JSON chunker.
	JsonInput
)

func newChunker(inputFormat int) chunker {
func NewChunker(inputFormat int) Chunker {
switch inputFormat {
case rdfInput:
case RdfInput:
return &rdfChunker{}
case jsonInput:
case JsonInput:
return &jsonChunker{}
default:
panic("unknown loader type")
panic("unknown chunker type")
}
}

func (rdfChunker) begin(r *bufio.Reader) error {
// Begin is a no-op: RDF files don't require any special processing at the
// beginning of the file.
func (rdfChunker) Begin(r *bufio.Reader) error {
	return nil
}

func (rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
func (rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
batch := new(bytes.Buffer)
batch.Grow(1 << 20)
for lineCount := 0; lineCount < 1e5; lineCount++ {
Expand Down Expand Up @@ -93,63 +100,36 @@ func (rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
return batch, nil
}

func (rdfChunker) end(r *bufio.Reader) error {
return nil
}

func (rdfChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
str, readErr := chunkBuf.ReadString('\n')
if readErr != nil && readErr != io.EOF {
x.Check(readErr)
}

nq, parseErr := rdf.Parse(strings.TrimSpace(str))
if parseErr == rdf.ErrEmpty {
return nil, readErr
} else if parseErr != nil {
return nil, errors.Wrapf(parseErr, "while parsing line %q", str)
func (rdfChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
if chunkBuf.Len() == 0 {
return nil, io.EOF
}

return []gql.NQuad{{NQuad: &nq}}, readErr
}

func slurpSpace(r *bufio.Reader) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
nqs := make([]*api.NQuad, 0)
for chunkBuf.Len() > 0 {
str, err := chunkBuf.ReadString('\n')
if err != nil && err != io.EOF {
x.Check(err)
}
if !unicode.IsSpace(ch) {
x.Check(r.UnreadRune())
return nil

nq, err := rdf.Parse(strings.TrimSpace(str))
if err == rdf.ErrEmpty {
continue // blank line or comment
} else if err != nil {
return nil, errors.Wrapf(err, "while parsing line %q", str)
}
nqs = append(nqs, &nq)
}
}

func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
for {
ch, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(ch))
return nqs, nil
}

if ch == '\\' {
// Pick one more rune.
esc, _, err := r.ReadRune()
if err != nil {
return err
}
x.Check2(out.WriteRune(esc))
continue
}
if ch == '"' {
return nil
}
}
// End is a no-op: RDF files don't require any special processing at the end
// of the file.
func (rdfChunker) End(r *bufio.Reader) error {
	return nil
}

func (jsonChunker) begin(r *bufio.Reader) error {
func (jsonChunker) Begin(r *bufio.Reader) error {
// The JSON file to load must be an array of maps (that is, '[ { ... }, { ... }, ... ]').
// This function must be called before calling readJSONChunk for the first time to advance
// the Reader past the array start token ('[') so that calls to readJSONChunk can read
Expand All @@ -167,7 +147,7 @@ func (jsonChunker) begin(r *bufio.Reader) error {
return nil
}

func (jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
func (jsonChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
out := new(bytes.Buffer)
out.Grow(1 << 20)

Expand Down Expand Up @@ -231,27 +211,103 @@ func (jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
return out, nil
}

func (jsonChunker) end(r *bufio.Reader) error {
// Parse converts a chunk of JSON (as produced by Chunk) into NQuads.
// An empty chunk yields io.EOF to signal that there is nothing left to parse.
func (jsonChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
	if chunkBuf.Len() == 0 {
		return nil, io.EOF
	}

	// Any error other than io.EOF from the JSON parser is fatal here
	// (x.Check panics on a non-nil error).
	nqs, err := json.Parse(chunkBuf.Bytes(), json.SetNquads)
	if err != nil && err != io.EOF {
		x.Check(err)
	}
	// The whole chunk has been consumed; reset so the buffer can be reused.
	chunkBuf.Reset()

	// NOTE(review): err may still be io.EOF here and is propagated alongside
	// the parsed NQuads — callers presumably treat EOF as "last chunk".
	return nqs, err
}

// End consumes any trailing whitespace after the closing array bracket and
// verifies the entire input was consumed, returning an error otherwise.
// (The scraped diff left two copies of the return line — the second was
// unreachable; only the committed version is kept.)
func (jsonChunker) End(r *bufio.Reader) error {
	if slurpSpace(r) == io.EOF {
		return nil
	}
	return errors.New("Not all of JSON file consumed")
}

func (jsonChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
if chunkBuf.Len() == 0 {
return nil, io.EOF
// slurpSpace advances r past any run of Unicode whitespace, leaving the
// reader positioned at the first non-space rune. It returns any read error,
// including io.EOF when the stream ends before a non-space rune is found.
func slurpSpace(r *bufio.Reader) error {
	for {
		c, _, err := r.ReadRune()
		switch {
		case err != nil:
			return err
		case unicode.IsSpace(c):
			continue
		default:
			// Push the non-space rune back so the caller sees it.
			x.Check(r.UnreadRune())
			return nil
		}
	}
}

nqs, err := edgraph.NquadsFromJson(chunkBuf.Bytes())
if err != nil && err != io.EOF {
// slurpQuoted copies the body of a double-quoted string from r into out,
// including the closing quote. Backslash escapes are copied verbatim so that
// an escaped quote (\") does not terminate the string. The opening '"' is
// assumed to have been consumed already.
func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
	for {
		c, _, err := r.ReadRune()
		if err != nil {
			return err
		}
		x.Check2(out.WriteRune(c))

		switch c {
		case '\\':
			// Copy the escaped rune through unchanged.
			esc, _, err := r.ReadRune()
			if err != nil {
				return err
			}
			x.Check2(out.WriteRune(esc))
		case '"':
			return nil
		}
	}
}

// FileReader returns an open reader and file on the given file. Gzip-compressed input is detected
// and decompressed automatically even without the gz extension. The caller is responsible for
// calling the returned cleanup function when done with the reader.
func FileReader(file string) (rd *bufio.Reader, cleanup func()) {
f, err := os.Open(file)
x.Check(err)

cleanup = func() { f.Close() }

if filepath.Ext(file) == ".gz" {
gzr, err := gzip.NewReader(f)
x.Check(err)
rd = bufio.NewReader(gzr)
cleanup = func() { f.Close(); gzr.Close() }
} else {
rd = bufio.NewReader(f)
buf, _ := rd.Peek(512)

typ := http.DetectContentType(buf)
if typ == "application/x-gzip" {
gzr, err := gzip.NewReader(rd)
x.Check(err)
rd = bufio.NewReader(gzr)
cleanup = func() { f.Close(); gzr.Close() }
}
}
chunkBuf.Reset()

gqlNq := make([]gql.NQuad, len(nqs))
for i, nq := range nqs {
gqlNq[i] = gql.NQuad{NQuad: nq}
return rd, cleanup
}

// IsJSONData returns true if the reader, which should be at the start of the stream, is reading
// a JSON stream, false otherwise.
func IsJSONData(r *bufio.Reader) (bool, error) {
buf, err := r.Peek(512)
if err != nil && err != io.EOF {
return false, err
}
return gqlNq, err

de := encjson.NewDecoder(bytes.NewReader(buf))
_, err = de.Token()

return err == nil, nil
}
27 changes: 14 additions & 13 deletions dgraph/cmd/bulk/chunk_test.go → chunker/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bulk

package chunker

import (
"bufio"
Expand Down Expand Up @@ -45,8 +46,8 @@ func TestJSONLoadStart(t *testing.T) {
}

for _, test := range tests {
chunker := newChunker(jsonInput)
require.Error(t, chunker.begin(bufioReader(test.json)), test.desc)
chunker := NewChunker(JsonInput)
require.Error(t, chunker.Begin(bufioReader(test.json)), test.desc)
}
}

Expand All @@ -63,11 +64,11 @@ func TestJSONLoadReadNext(t *testing.T) {
{"[{}", "malformed array"},
}
for _, test := range tests {
chunker := newChunker(jsonInput)
chunker := NewChunker(JsonInput)
reader := bufioReader(test.json)
require.NoError(t, chunker.begin(reader), test.desc)
require.NoError(t, chunker.Begin(reader), test.desc)

json, err := chunker.chunk(reader)
json, err := chunker.Chunk(reader)
//fmt.Fprintf(os.Stderr, "err = %v, json = %v\n", err, json)
require.Nil(t, json, test.desc)
require.Error(t, err, test.desc)
Expand Down Expand Up @@ -112,11 +113,11 @@ func TestJSONLoadSuccessFirst(t *testing.T) {
},
}
for _, test := range tests {
chunker := newChunker(jsonInput)
chunker := NewChunker(JsonInput)
reader := bufioReader(test.json)
require.NoError(t, chunker.begin(reader), test.desc)
require.NoError(t, chunker.Begin(reader), test.desc)

json, err := chunker.chunk(reader)
json, err := chunker.Chunk(reader)
if err == io.EOF {
// pass
} else {
Expand Down Expand Up @@ -175,23 +176,23 @@ func TestJSONLoadSuccessAll(t *testing.T) {
}`,
}

chunker := newChunker(jsonInput)
chunker := NewChunker(JsonInput)
reader := bufioReader(testDoc)

var json *bytes.Buffer
var idx int

err := chunker.begin(reader)
err := chunker.Begin(reader)
require.NoError(t, err, "begin reading JSON document")
for idx = 0; err == nil; idx++ {
desc := fmt.Sprintf("reading chunk #%d", idx+1)
json, err = chunker.chunk(reader)
json, err = chunker.Chunk(reader)
//fmt.Fprintf(os.Stderr, "err = %v, json = %v\n", err, json)
if err != io.EOF {
require.NoError(t, err, desc)
require.Equal(t, testChunks[idx], json.String(), desc)
}
}
err = chunker.end(reader)
err = chunker.End(reader)
require.NoError(t, err, "end reading JSON document")
}
Loading

0 comments on commit a323fdd

Please sign in to comment.