Skip to content

Commit 3afe34c

Browse files
fix live loader and add tests for dropall, drop namespace, live load
1 parent 5b69c8b commit 3afe34c

File tree

11 files changed

+990
-370
lines changed

11 files changed

+990
-370
lines changed

chunker/rdf_parser.go

+1
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ var typeMap = map[string]types.TypeID{
366366
"xs:float": types.FloatID,
367367
"xs:base64Binary": types.BinaryID,
368368
"geo:geojson": types.GeoID,
369+
"xs:[]float32": types.VFloatID,
369370
"http://www.w3.org/2001/XMLSchema#string": types.StringID,
370371
"http://www.w3.org/2001/XMLSchema#dateTime": types.DateTimeID,
371372
"http://www.w3.org/2001/XMLSchema#date": types.DateTimeID,

dgraphtest/cluster.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ loop2:
528528
return errors.Errorf("restore wasn't started on at least 1 alpha")
529529
}
530530

531-
func (hc *HTTPClient) Export(dest string, namespace int) error {
531+
func (hc *HTTPClient) Export(dest, format string, namespace int) error {
532532
const exportRequest = `mutation export($dest: String!, $f: String!, $ns: Int) {
533533
export(input: {destination: $dest, format: $f, namespace: $ns}) {
534534
response {
@@ -540,7 +540,7 @@ func (hc *HTTPClient) Export(dest string, namespace int) error {
540540
Query: exportRequest,
541541
Variables: map[string]interface{}{
542542
"dest": dest,
543-
"f": "rdf",
543+
"f": format,
544544
"ns": namespace,
545545
},
546546
}

dgraphtest/load.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
386386
}()
387387

388388
// .rdf.gz, .schema.gz,.gql_schema.gz
389-
var rdfFiles, schemaFiles, gqlSchemaFiles []string
389+
var rdfFiles, schemaFiles, gqlSchemaFiles, jsonFiles []string
390390
tr := tar.NewReader(ts)
391391
for {
392392
header, err := tr.Next()
@@ -404,6 +404,8 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
404404
switch {
405405
case strings.HasSuffix(fileName, ".rdf.gz"):
406406
rdfFiles = append(rdfFiles, hostFile)
407+
case strings.HasSuffix(fileName, ".json.gz"):
408+
jsonFiles = append(jsonFiles, hostFile)
407409
case strings.HasSuffix(fileName, ".schema.gz"):
408410
schemaFiles = append(schemaFiles, hostFile)
409411
case strings.HasSuffix(fileName, ".gql_schema.gz"):
@@ -441,10 +443,14 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
441443
}
442444

443445
opts := LiveOpts{
444-
DataFiles: rdfFiles,
445446
SchemaFiles: schemaFiles,
446447
GqlSchemaFiles: gqlSchemaFiles,
447448
}
449+
if len(rdfFiles) == 0 {
450+
opts.DataFiles = jsonFiles
451+
} else {
452+
opts.DataFiles = rdfFiles
453+
}
448454
if err := c.LiveLoad(opts); err != nil {
449455
return errors.Wrapf(err, "error running live loader: %v", err)
450456
}

dgraphtest/local_cluster.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
666666
}
667667
}
668668
// using -1 as namespace exports all the namespaces
669-
if err := hc.Export(DefaultExportDir, -1); err != nil {
669+
if err := hc.Export(DefaultExportDir, "rdf", -1); err != nil {
670670
return errors.Wrap(err, "error taking export during upgrade")
671671
}
672672
if err := c.Stop(); err != nil {
@@ -747,6 +747,26 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) {
747747
return &GrpcClient{Dgraph: client}, cleanup, nil
748748
}
749749

750+
func (c *LocalCluster) AlphaClient(id int) (*GrpcClient, func(), error) {
751+
alpha := c.alphas[id]
752+
url, err := alpha.alphaURL(c)
753+
if err != nil {
754+
return nil, nil, errors.Wrap(err, "error getting health URL")
755+
}
756+
conn, err := grpc.Dial(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
757+
if err != nil {
758+
return nil, nil, errors.Wrap(err, "error connecting to alpha")
759+
}
760+
761+
client := dgo.NewDgraphClient(api.NewDgraphClient(conn))
762+
cleanup := func() {
763+
if err := conn.Close(); err != nil {
764+
log.Printf("[WARNING] error closing connection: %v", err)
765+
}
766+
}
767+
return &GrpcClient{Dgraph: client}, cleanup, nil
768+
}
769+
750770
// HTTPClient creates an HTTP client
751771
func (c *LocalCluster) HTTPClient() (*HTTPClient, error) {
752772
adminURL, err := c.serverURL("alpha", "/admin")

dgraphtest/vector.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2023 Dgraph Labs, Inc. and Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dgraphtest
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"math/rand"
23+
"strings"
24+
25+
"github.com/dgraph-io/dgo/v230/protos/api"
26+
)
27+
28+
func GenerateRandomVector(size int) []float32 {
29+
vector := make([]float32, size)
30+
for i := 0; i < size; i++ {
31+
vector[i] = rand.Float32() * 10
32+
}
33+
return vector
34+
}
35+
36+
func formatVector(label string, vector []float32, index int) string {
37+
vectorString := fmt.Sprintf(`"[%s]"`, strings.Trim(strings.Join(strings.Fields(fmt.Sprint(vector)), ", "), "[]"))
38+
return fmt.Sprintf("<0x%x> <%s> %s . \n", index+10, label, vectorString)
39+
}
40+
41+
func GenerateRandomVectors(lowerLimit, uppermLimit, vectorSize int, label string) (string, [][]float32) {
42+
var builder strings.Builder
43+
var vectors [][]float32
44+
// builder.WriteString("`")
45+
for i := lowerLimit; i < uppermLimit; i++ {
46+
randomVector := GenerateRandomVector(vectorSize)
47+
vectors = append(vectors, randomVector)
48+
formattedVector := formatVector(label, randomVector, i)
49+
builder.WriteString(formattedVector)
50+
}
51+
52+
return builder.String(), vectors
53+
}
54+
55+
func (gc *GrpcClient) QueryMultipleVectorsUsingSimilarTo(vector []float32, pred string, topK int) ([][]float32, error) {
56+
vectorQuery := fmt.Sprintf(`
57+
{
58+
vector(func: similar_to(%v, %v, "%v")) {
59+
uid
60+
%v
61+
}
62+
}`, pred, topK, vector, pred)
63+
resp, err := gc.Query(vectorQuery)
64+
65+
if err != nil {
66+
return [][]float32{}, err
67+
}
68+
69+
return UnmarshalVectorResp(resp)
70+
}
71+
72+
func (gc *GrpcClient) QuerySingleVectorsUsingUid(uid, pred string) ([][]float32, error) {
73+
vectorQuery := fmt.Sprintf(`
74+
{
75+
vector(func: uid(%v)) {
76+
uid
77+
%v
78+
}
79+
}`, uid[1:len(uid)-1], pred)
80+
81+
resp, err := gc.Query(vectorQuery)
82+
if err != nil {
83+
return [][]float32{}, err
84+
}
85+
86+
return UnmarshalVectorResp(resp)
87+
}
88+
89+
func UnmarshalVectorResp(resp *api.Response) ([][]float32, error) {
90+
type Data struct {
91+
Vector []struct {
92+
UID string `json:"uid"`
93+
ProjectDescriptionV []float32 `json:"project_discription_v"`
94+
} `json:"vector"`
95+
}
96+
var data Data
97+
if err := json.Unmarshal([]byte(resp.Json), &data); err != nil {
98+
return nil, err
99+
}
100+
101+
var vectors [][]float32
102+
for _, item := range data.Vector {
103+
vectors = append(vectors, item.ProjectDescriptionV)
104+
}
105+
return vectors, nil
106+
}

protos/pb.proto

+2
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,8 @@ message SchemaNode {
483483
bool lang = 9;
484484
bool no_conflict = 10;
485485
bool unique = 11;
486+
repeated VectorIndexSpec index_specs = 12;
487+
486488
}
487489

488490
message SchemaResult {

0 commit comments

Comments
 (0)