Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Feb 1, 2024
1 parent a9d2652 commit b4505e8
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 33 deletions.
28 changes: 13 additions & 15 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,21 +952,19 @@ func (a *actorsRuntime) executeStateStoreTransaction(ctx context.Context, operat

func (a *actorsRuntime) IsActorHosted(ctx context.Context, req *ActorHostedRequest) bool {
key := req.ActorKey()
_, exists := a.actorsTable.Load(key)
return exists
// policyDef := a.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
// policyRunner := resiliency.NewRunner[any](ctx, policyDef)
//
// _, err := policyRunner(func(ctx context.Context) (any, error) {
// _, exists := a.actorsTable.Load(key)
// if !exists {
// // Error message isn't used - we just need to have an error
// return nil, errors.New("")
// }
// return nil, nil
// })
//
// return err == nil
policyDef := a.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
policyRunner := resiliency.NewRunner[any](ctx, policyDef)

_, err := policyRunner(func(ctx context.Context) (any, error) {
_, exists := a.actorsTable.Load(key)
if !exists {
// Error message isn't used - we just need to have an error
return nil, errors.New("")
}
return nil, nil
})

return err == nil
}

func (a *actorsRuntime) constructActorStateKey(actorKey, key string) string {
Expand Down
27 changes: 13 additions & 14 deletions pkg/actors/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,20 +342,19 @@ func (p *actorPlacement) WaitUntilReady(ctx context.Context) error {
// LookupActor resolves to actor service instance address using consistent hashing table.
func (p *actorPlacement) LookupActor(ctx context.Context, req internal.LookupActorRequest) (internal.LookupActorResponse, error) {
// Retry here to allow placement table dissemination/rebalancing to happen.
// policyDef := p.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
// policyRunner := resiliency.NewRunner[internal.LookupActorResponse](ctx, policyDef)
var res internal.LookupActorResponse
rAddr, rAppID, rErr := p.doLookupActor(ctx, req.ActorType, req.ActorID)
if rErr != nil {
return res, fmt.Errorf("error finding address for actor %s/%s: %w", req.ActorType, req.ActorID, rErr)
} else if rAddr == "" {
return res, fmt.Errorf("did not find address for actor %s/%s", req.ActorType, req.ActorID)
}
res.Address = rAddr
res.AppID = rAppID
return res, nil
// return policyRunner(func(ctx context.Context) (res internal.LookupActorResponse, rErr error) {
// })
policyDef := p.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
policyRunner := resiliency.NewRunner[internal.LookupActorResponse](ctx, policyDef)
return policyRunner(func(ctx context.Context) (res internal.LookupActorResponse, rErr error) {
rAddr, rAppID, rErr := p.doLookupActor(ctx, req.ActorType, req.ActorID)
if rErr != nil {
return res, fmt.Errorf("error finding address for actor %s/%s: %w", req.ActorType, req.ActorID, rErr)
} else if rAddr == "" {
return res, fmt.Errorf("did not find address for actor %s/%s", req.ActorType, req.ActorID)
}
res.Address = rAddr
res.AppID = rAppID
return res, nil
})
}

func (p *actorPlacement) doLookupActor(ctx context.Context, actorType, actorID string) (string, string, error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/resiliency/resiliency.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,10 @@ func (r *Resiliency) addBuiltInPolicies() {

if _, ok := r.retries[string(BuiltInActorNotFoundRetries)]; !ok {
r.retries[string(BuiltInActorNotFoundRetries)] = &retry.Config{
Policy: retry.PolicyConstant,
MaxRetries: 5,
Duration: time.Second,
Policy: retry.PolicyConstant,
MaxRetries: 5,
Duration: time.Second,
InitialInterval: time.Second,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/framework/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// The returned client will call CloseIdleConnections on test cleanup.
func HTTPClient(t *testing.T) *http.Client {
client := &http.Client{
Timeout: time.Second * 10,
Timeout: time.Second * 20,
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}

Expand Down

0 comments on commit b4505e8

Please sign in to comment.