Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support import data file from internet #64

Merged
merged 8 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*.dll
*.so
*.dylib
nebula-importer

# Test binary, build with `go test -c`
*.test
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
| version | Configuration file version | v1rc2 |
| description | Description of this configuration file | "" |
| clientSettings | Graph client settings | - |
| clientSettings.retrying | Number of graph clients retry to execute failed nGQL | 1 |
| clientSettings.retry | Number of times graph clients retry executing a failed nGQL statement | 1 |
| clientSettings.concurrency | Number of graph clients | 4 |
| clientSettings.channelBufferSize | Buffer size of client channels | 128 |
| clientSettings.space | Space name of all data to be inserted | "" |
Expand Down
3 changes: 2 additions & 1 deletion examples/example.ngql
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
DROP SPACE test;
CREATE SPACE test(partition_num=10, replica_factor=1);
CREATE SPACE test(partition_num=5, replica_factor=1);
USE test;
CREATE TAG course(name string, credits int);
CREATE TAG building(name string);
CREATE TAG student(name string, age int, gender string);
CREATE TAG child_mental_health(years string, historical_data string, target string);
CREATE EDGE follow(likeness double);
CREATE EDGE choose(grade int);
SHOW TAGS;
Expand Down
27 changes: 27 additions & 0 deletions examples/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,30 @@ files:
edge:
name: follow
withRanking: true

- path: https://data.ok.gov/sites/default/files/res_child_mental_health_treatment_w929-ctrc.csv
failDataPath: /tmp/err_child_mental_health_treatment.csv
batchSize: 2
type: csv
csv:
withHeader: false
withLabel: false
delimiter: ","
schema:
type: vertex
vertex:
vid:
index: 0
function: hash
tags:
- name: child_mental_health
props:
- name: years
type: string
index: 0
- name: historical_data
type: string
index: 1
- name: target
type: string
index: 2
4 changes: 4 additions & 0 deletions pkg/base/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ func IsValidType(t string) bool {
}
return true
}

// HasHttpPrefix reports whether path begins with the "https://" or
// "http://" scheme prefix. The comparison is case-sensitive.
func HasHttpPrefix(path string) bool {
	for _, scheme := range []string{"https://", "http://"} {
		if strings.HasPrefix(path, scheme) {
			return true
		}
	}
	return false
}
43 changes: 27 additions & 16 deletions pkg/cmd/cmd.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cmd

import (
"errors"
"fmt"
"strings"
"time"

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/client"
"github.com/vesoft-inc/nebula-importer/pkg/config"
"github.com/vesoft-inc/nebula-importer/pkg/errhandler"
Expand All @@ -13,22 +16,30 @@ import (
)

type Runner struct {
err error
errs []error
Readers []*reader.FileReader
NumFailed int64
}

func (r *Runner) Error() error {
return r.err
if len(r.errs) == 0 {
return nil
}

var msg []string
for _, e := range r.errs {
msg = append(msg, e.Error())
}
return errors.New(strings.Join(msg, "\n"))
}

func (r *Runner) Run(yaml *config.YAMLConfig) {
now := time.Now()
defer func() {
if re := recover(); re != nil {
r.err = fmt.Errorf("%v", re)
r.errs = append(r.errs, fmt.Errorf("%v", re))
} else {
if r.err == nil {
if len(r.errs) == 0 {
logger.Infof("Finish import data, consume time: %.2fs", time.Since(now).Seconds())
}
}
Expand All @@ -41,7 +52,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {

clientMgr, err := client.NewNebulaClientMgr(yaml.NebulaClientSettings, statsMgr.StatsCh)
if err != nil {
r.err = err
r.errs = append(r.errs, err)
return
}
defer clientMgr.Close()
Expand All @@ -51,22 +62,24 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
freaders := make([]*reader.FileReader, len(yaml.Files))

for i, file := range yaml.Files {
// TODO: skip files with error
errCh, err := errHandler.Init(file, clientMgr.GetNumConnections())
if err != nil {
r.err = err
return
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(*file.Path)
continue
}

if fr, err := reader.New(i, file, clientMgr.GetRequestChans(), errCh); err != nil {
r.err = err
return
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(*file.Path)
continue
} else {
go func() {
go func(fr *reader.FileReader, filename string) {
if err := fr.Read(); err != nil {
logger.Error(err)
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(filename)
}
}()
}(fr, *file.Path)
freaders[i] = fr
}
}
Expand All @@ -79,8 +92,6 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
r.NumFailed = statsMgr.NumFailed

if statsMgr.NumFailed > 0 {
r.err = fmt.Errorf("Total %d lines fail to insert to nebula", statsMgr.NumFailed)
} else {
r.err = nil
r.errs = append(r.errs, fmt.Errorf("Total %d lines fail to insert to nebula", statsMgr.NumFailed))
}
}
21 changes: 15 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net/url"
"path/filepath"
"strings"

Expand Down Expand Up @@ -201,14 +202,22 @@ func (f *File) validateAndReset(dir, prefix string) error {
if f.Path == nil {
return fmt.Errorf("Please configure file path in: %s.path", prefix)
}
if !base.FileExists(*f.Path) {
path := filepath.Join(dir, *f.Path)
if !base.FileExists(path) {
return fmt.Errorf("File(%s) doesn't exist", *f.Path)
} else {
f.Path = &path

if base.HasHttpPrefix(*f.Path) {
if _, err := url.ParseRequestURI(*f.Path); err != nil {
return err
}
} else {
if !base.FileExists(*f.Path) {
path := filepath.Join(dir, *f.Path)
if !base.FileExists(path) {
return fmt.Errorf("File(%s) doesn't exist", *f.Path)
} else {
f.Path = &path
}
}
}

if f.FailDataPath == nil {
if d, err := filepath.Abs(filepath.Dir(*f.Path)); err != nil {
return err
Expand Down
61 changes: 59 additions & 2 deletions pkg/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package reader
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strings"

"github.com/vesoft-inc/nebula-importer/pkg/base"
Expand Down Expand Up @@ -57,8 +61,61 @@ func (r *FileReader) Stop() {
r.StopFlag = true
}

// extractFilenameFromURL returns the unescaped last path segment of uri,
// ignoring any query string.
//
// The query is stripped before the base name is taken. A query value may
// itself contain '/' (for example "?next=/a/b"), and taking the base of the
// whole URI first would then return a fragment of the query instead of the
// file name.
//
// It returns an error if the segment contains an invalid percent-escape.
func extractFilenameFromURL(uri string) (string, error) {
	if i := strings.Index(uri, "?"); i != -1 {
		uri = uri[:i]
	}
	return url.QueryUnescape(path.Base(uri))
}

func (r *FileReader) prepareDataFile() (*string, error) {
if !base.HasHttpPrefix(*r.File.Path) {
// This is a local path
return r.File.Path, nil
}

if _, err := url.ParseRequestURI(*r.File.Path); err != nil {
return nil, err
}

// Download data file from internet to `/tmp` directory and return the path
filename, err := extractFilenameFromURL(*r.File.Path)
if err != nil {
return nil, err
}

file, err := ioutil.TempFile("", fmt.Sprintf("*_%s", filename))
if err != nil {
return nil, err
}
defer file.Close()

resp, err := http.Get(*r.File.Path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default HTTP client doesn't have a timeout. Please set a timeout for this HTTP client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return nil, err
}
defer resp.Body.Close()

n, err := io.Copy(file, resp.Body)
if err != nil {
return nil, err
}

filepath := file.Name()

logger.Infof("File(%s) has been downloaded to \"%s\", size: %d", *r.File.Path, filepath, n)

return &filepath, nil
}

func (r *FileReader) Read() error {
file, err := os.Open(*r.File.Path)
filePath, err := r.prepareDataFile()
if err != nil {
return err
}
file, err := os.Open(*filePath)
if err != nil {
return err
}
Expand Down Expand Up @@ -95,7 +152,7 @@ func (r *FileReader) Read() error {
}

if err != nil {
logger.Errorf("Fail to read line %d, error: %s", lineNum, err.Error())
logger.Errorf("Fail to read file(%s) line %d, error: %s", *r.File.Path, lineNum, err.Error())
numErrorLines++
}

Expand Down