Skip to content

Commit

Permalink
Utilize consistency and consistencytoken protos for zookie support
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan Marcantonio <jmarcant@redhat.com>

Update info on changed tests

Signed-off-by: Jonathan Marcantonio <jmarcant@redhat.com>
  • Loading branch information
lennysgarage committed Feb 3, 2025
1 parent 687cc61 commit 1e4cfad
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 316 deletions.
50 changes: 14 additions & 36 deletions api/kessel/relations/v1beta1/common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions api/kessel/relations/v1beta1/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ message Consistency {
// as found in the ConsistencyToken. More recent data might be used
// if available or faster.
ConsistencyToken at_least_as_fresh = 2;

// All data used in the API call *must* be at the most recent
// snapshot available.
bool fully_consistent = 3;
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/biz/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *GetSubjectsUsecase) Get(ctx context.Context, req *v1beta1.LookupSubject
subs, errs, err := s.repo.LookupSubjects(ctx, req.SubjectType, subjectRelation, req.Relation, &v1beta1.ObjectReference{
Type: req.Resource.Type,
Id: req.Resource.Id,
}, limit, continuation, req.Zookie)
}, limit, continuation, req.GetConsistency())

if err != nil {
return nil, nil, err
Expand All @@ -72,7 +72,7 @@ func (r *GetResourcesUsecase) Get(ctx context.Context, req *v1beta1.LookupResour
continuation = ContinuationToken(*req.Pagination.ContinuationToken)
}
}
resources, errs, err := r.repo.LookupResources(ctx, req.ResourceType, req.Relation, req.Subject, limit, continuation, req.Zookie)
resources, errs, err := r.repo.LookupResources(ctx, req.ResourceType, req.Relation, req.Subject, limit, continuation, req.GetConsistency())
if err != nil {
return nil, nil, err
}
Expand Down
26 changes: 13 additions & 13 deletions internal/biz/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@ type TouchSemantics bool

type ContinuationToken string
type SubjectResult struct {
Subject *v1beta1.SubjectReference
Continuation ContinuationToken
Zookie *v1beta1.Zookie
Subject *v1beta1.SubjectReference
Continuation ContinuationToken
ConsistencyToken *v1beta1.ConsistencyToken
}
type ResourceResult struct {
Resource *v1beta1.ObjectReference
Continuation ContinuationToken
Zookie *v1beta1.Zookie
Resource *v1beta1.ObjectReference
Continuation ContinuationToken
ConsistencyToken *v1beta1.ConsistencyToken
}

type RelationshipResult struct {
Relationship *v1beta1.Relationship
Continuation ContinuationToken
Zookie *v1beta1.Zookie
Relationship *v1beta1.Relationship
Continuation ContinuationToken
ConsistencyToken *v1beta1.ConsistencyToken
}

type ZanzibarRepository interface {
Check(ctx context.Context, request *v1beta1.CheckRequest) (*v1beta1.CheckResponse, error)
CheckForUpdate(ctx context.Context, request *v1beta1.CheckForUpdateRequest) (*v1beta1.CheckForUpdateResponse, error)
CreateRelationships(context.Context, []*v1beta1.Relationship, TouchSemantics) (*v1beta1.CreateTuplesResponse, error)
ReadRelationships(ctx context.Context, filter *v1beta1.RelationTupleFilter, limit uint32, continuation ContinuationToken, zookie *v1beta1.Zookie) (chan *RelationshipResult, chan error, error)
ReadRelationships(ctx context.Context, filter *v1beta1.RelationTupleFilter, limit uint32, continuation ContinuationToken, consistency *v1beta1.Consistency) (chan *RelationshipResult, chan error, error)
DeleteRelationships(context.Context, *v1beta1.RelationTupleFilter) (*v1beta1.DeleteTuplesResponse, error)
LookupSubjects(ctx context.Context, subjectType *v1beta1.ObjectType, subject_relation, relation string, resource *v1beta1.ObjectReference, limit uint32, continuation ContinuationToken, zookie *v1beta1.Zookie) (chan *SubjectResult, chan error, error)
LookupResources(ctx context.Context, resouce_type *v1beta1.ObjectType, relation string, subject *v1beta1.SubjectReference, limit uint32, continuation ContinuationToken, zookie *v1beta1.Zookie) (chan *ResourceResult, chan error, error)
LookupSubjects(ctx context.Context, subjectType *v1beta1.ObjectType, subject_relation, relation string, resource *v1beta1.ObjectReference, limit uint32, continuation ContinuationToken, consistency *v1beta1.Consistency) (chan *SubjectResult, chan error, error)
LookupResources(ctx context.Context, resouce_type *v1beta1.ObjectType, relation string, subject *v1beta1.SubjectReference, limit uint32, continuation ContinuationToken, consistency *v1beta1.Consistency) (chan *ResourceResult, chan error, error)
IsBackendAvailable() error
ImportBulkTuples(stream grpc.ClientStreamingServer[v1beta1.ImportBulkTuplesRequest, v1beta1.ImportBulkTuplesResponse]) error
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (rc *ReadRelationshipsUsecase) ReadRelationships(ctx context.Context, req *
}
}

relationships, errs, err := rc.repo.ReadRelationships(ctx, req.Filter, limit, continuation, req.Zookie)
relationships, errs, err := rc.repo.ReadRelationships(ctx, req.Filter, limit, continuation, req.GetConsistency())

if err != nil {
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/data/LocalSpiceDbContainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (l *LocalSpiceDbContainer) Close() {
}

// CheckForRelationship returns true if the given subject has the given relationship to the given resource, otherwise false
func CheckForRelationship(client biz.ZanzibarRepository, subjectID string, subjectNamespace string, subjectType string, subjectRelationship string, relationship string, resourceNamespace string, resourceType string, resourceID string, zookie *v1beta1.Zookie) bool {
func CheckForRelationship(client biz.ZanzibarRepository, subjectID string, subjectNamespace string, subjectType string, subjectRelationship string, relationship string, resourceNamespace string, resourceType string, resourceID string, consistency *v1beta1.Consistency) bool {
ctx := context.TODO()

var subjectRelationRef *string = nil //Relation is optional
Expand All @@ -209,7 +209,7 @@ func CheckForRelationship(client biz.ZanzibarRepository, subjectID string, subje
SubjectId: &subjectID,
Relation: subjectRelationRef,
},
}, 1, biz.ContinuationToken(""), zookie)
}, 1, biz.ContinuationToken(""), consistency)

if err != nil {
panic(err)
Expand Down
61 changes: 35 additions & 26 deletions internal/data/spicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *SpiceDbRepository) initialize() error {
return nil
}

func (s *SpiceDbRepository) LookupSubjects(ctx context.Context, subject_type *apiV1beta1.ObjectType, subject_relation, relation string, object *apiV1beta1.ObjectReference, limit uint32, continuation biz.ContinuationToken, zookie *apiV1beta1.Zookie) (chan *biz.SubjectResult, chan error, error) {
func (s *SpiceDbRepository) LookupSubjects(ctx context.Context, subject_type *apiV1beta1.ObjectType, subject_relation, relation string, object *apiV1beta1.ObjectReference, limit uint32, continuation biz.ContinuationToken, consistency *apiV1beta1.Consistency) (chan *biz.SubjectResult, chan error, error) {
if err := s.initialize(); err != nil {
return nil, nil, err
}
Expand All @@ -129,7 +129,7 @@ func (s *SpiceDbRepository) LookupSubjects(ctx context.Context, subject_type *ap
}

req := &v1.LookupSubjectsRequest{
Consistency: s.determineConsistency(zookie),
Consistency: s.determineConsistency(consistency),
Resource: &v1.ObjectReference{
ObjectType: kesselTypeToSpiceDBType(object.Type),
ObjectId: object.Id,
Expand Down Expand Up @@ -176,16 +176,16 @@ func (s *SpiceDbRepository) LookupSubjects(ctx context.Context, subject_type *ap
Id: subj.SubjectObjectId,
},
},
Continuation: continuation,
Zookie: &apiV1beta1.Zookie{Token: msg.GetLookedUpAt().GetToken()},
Continuation: continuation,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: msg.GetLookedUpAt().GetToken()},
}
}
}()

return subjects, errs, nil
}

func (s *SpiceDbRepository) LookupResources(ctx context.Context, resouce_type *apiV1beta1.ObjectType, relation string, subject *apiV1beta1.SubjectReference, limit uint32, continuation biz.ContinuationToken, zookie *apiV1beta1.Zookie) (chan *biz.ResourceResult, chan error, error) {
func (s *SpiceDbRepository) LookupResources(ctx context.Context, resouce_type *apiV1beta1.ObjectType, relation string, subject *apiV1beta1.SubjectReference, limit uint32, continuation biz.ContinuationToken, consistency *apiV1beta1.Consistency) (chan *biz.ResourceResult, chan error, error) {
if err := s.initialize(); err != nil {
return nil, nil, err
}
Expand All @@ -197,7 +197,7 @@ func (s *SpiceDbRepository) LookupResources(ctx context.Context, resouce_type *a
}
}
client, err := s.client.LookupResources(ctx, &v1.LookupResourcesRequest{
Consistency: s.determineConsistency(zookie),
Consistency: s.determineConsistency(consistency),
ResourceObjectType: kesselTypeToSpiceDBType(resouce_type),
Permission: relation,
Subject: &v1.SubjectReference{
Expand Down Expand Up @@ -240,8 +240,8 @@ func (s *SpiceDbRepository) LookupResources(ctx context.Context, resouce_type *a
Type: resouce_type,
Id: resId,
},
Continuation: continuation,
Zookie: &apiV1beta1.Zookie{Token: msg.GetLookedUpAt().GetToken()},
Continuation: continuation,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: msg.GetLookedUpAt().GetToken()},
}
}
}()
Expand Down Expand Up @@ -324,10 +324,10 @@ func (s *SpiceDbRepository) CreateRelationships(ctx context.Context, rels []*api
return nil, fmt.Errorf("error writing relationships to SpiceDB: %w", err)
}

return &apiV1beta1.CreateTuplesResponse{CreatedAt: &apiV1beta1.Zookie{Token: resp.GetWrittenAt().GetToken()}}, nil
return &apiV1beta1.CreateTuplesResponse{ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: resp.GetWrittenAt().GetToken()}}, nil
}

func (s *SpiceDbRepository) ReadRelationships(ctx context.Context, filter *apiV1beta1.RelationTupleFilter, limit uint32, continuation biz.ContinuationToken, zookie *apiV1beta1.Zookie) (chan *biz.RelationshipResult, chan error, error) {
func (s *SpiceDbRepository) ReadRelationships(ctx context.Context, filter *apiV1beta1.RelationTupleFilter, limit uint32, continuation biz.ContinuationToken, consistency *apiV1beta1.Consistency) (chan *biz.RelationshipResult, chan error, error) {
if err := s.initialize(); err != nil {
return nil, nil, err
}
Expand All @@ -353,7 +353,7 @@ func (s *SpiceDbRepository) ReadRelationships(ctx context.Context, filter *apiV1
}

req := &v1.ReadRelationshipsRequest{
Consistency: s.determineConsistency(zookie),
Consistency: s.determineConsistency(consistency),
RelationshipFilter: relationshipFilter,
OptionalLimit: limit,
OptionalCursor: cursor,
Expand Down Expand Up @@ -401,8 +401,8 @@ func (s *SpiceDbRepository) ReadRelationships(ctx context.Context, filter *apiV1
},
},
},
Continuation: continuation,
Zookie: &apiV1beta1.Zookie{Token: msg.ReadAt.GetToken()},
Continuation: continuation,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: msg.ReadAt.GetToken()},
}
}
}()
Expand Down Expand Up @@ -435,7 +435,7 @@ func (s *SpiceDbRepository) DeleteRelationships(ctx context.Context, filter *api
return nil, fmt.Errorf("error invoking DeleteRelationships in SpiceDB %w", err)
}

return &apiV1beta1.DeleteTuplesResponse{DeletedAt: &apiV1beta1.Zookie{Token: resp.GetDeletedAt().GetToken()}}, nil
return &apiV1beta1.DeleteTuplesResponse{ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: resp.GetDeletedAt().GetToken()}}, nil
}

func (s *SpiceDbRepository) Check(ctx context.Context, check *apiV1beta1.CheckRequest) (*apiV1beta1.CheckResponse, error) {
Expand All @@ -456,7 +456,7 @@ func (s *SpiceDbRepository) Check(ctx context.Context, check *apiV1beta1.CheckRe
ObjectId: check.GetResource().GetId(),
}
req := &v1.CheckPermissionRequest{
Consistency: s.determineConsistency(check.Zookie),
Consistency: s.determineConsistency(check.Consistency),
Resource: resource,
Permission: check.GetRelation(),
Subject: subject,
Expand All @@ -468,14 +468,14 @@ func (s *SpiceDbRepository) Check(ctx context.Context, check *apiV1beta1.CheckRe

if checkResponse.Permissionship == v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION {
return &apiV1beta1.CheckResponse{
Allowed: apiV1beta1.CheckResponse_ALLOWED_TRUE,
CheckedAt: &apiV1beta1.Zookie{Token: checkResponse.GetCheckedAt().GetToken()},
Allowed: apiV1beta1.CheckResponse_ALLOWED_TRUE,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: checkResponse.GetCheckedAt().GetToken()},
}, nil
}

return &apiV1beta1.CheckResponse{
Allowed: apiV1beta1.CheckResponse_ALLOWED_FALSE,
CheckedAt: &apiV1beta1.Zookie{Token: checkResponse.GetCheckedAt().GetToken()},
Allowed: apiV1beta1.CheckResponse_ALLOWED_FALSE,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: checkResponse.GetCheckedAt().GetToken()},
}, nil
}

Expand Down Expand Up @@ -509,14 +509,14 @@ func (s *SpiceDbRepository) CheckForUpdate(ctx context.Context, check *apiV1beta

if checkResponse.Permissionship == v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION {
return &apiV1beta1.CheckForUpdateResponse{
Allowed: apiV1beta1.CheckForUpdateResponse_ALLOWED_TRUE,
CheckedAt: &apiV1beta1.Zookie{Token: checkResponse.GetCheckedAt().GetToken()},
Allowed: apiV1beta1.CheckForUpdateResponse_ALLOWED_TRUE,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: checkResponse.GetCheckedAt().GetToken()},
}, nil
}

return &apiV1beta1.CheckForUpdateResponse{
Allowed: apiV1beta1.CheckForUpdateResponse_ALLOWED_FALSE,
CheckedAt: &apiV1beta1.Zookie{Token: checkResponse.GetCheckedAt().GetToken()},
Allowed: apiV1beta1.CheckForUpdateResponse_ALLOWED_FALSE,
ConsistencyToken: &apiV1beta1.ConsistencyToken{Token: checkResponse.GetCheckedAt().GetToken()},
}, nil
}

Expand Down Expand Up @@ -665,19 +665,28 @@ func readFile(file string) (string, error) {
return string(bytes), nil
}

func (s *SpiceDbRepository) determineConsistency(zookie *apiV1beta1.Zookie) *v1.Consistency {
func (s *SpiceDbRepository) determineConsistency(consistency *apiV1beta1.Consistency) *v1.Consistency {
if s.fullyConsistent {
// will ensure that all data used is fully consistent with the latest data available within the SpiceDB datastore.
return &v1.Consistency{Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}}
}

if zookie != nil {
if consistency.GetAtLeastAsFresh() != nil {
return &v1.Consistency{
Requirement: &v1.Consistency_AtLeastAsFresh{
AtLeastAsFresh: &v1.ZedToken{Token: zookie.GetToken()},
AtLeastAsFresh: &v1.ZedToken{Token: consistency.GetAtLeastAsFresh().GetToken()},
},
}
}

if consistency.GetMinimizeLatency() {
return &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{
MinimizeLatency: true,
},
}
}

// Default consistency for read APIs is minimize_latency
return &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{
Expand Down
Loading

0 comments on commit 1e4cfad

Please sign in to comment.