Skip to content

Commit

Permalink
Merge branch 'dev/demo-run-a-simple-flow'
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-dobermann committed Jan 20, 2025
2 parents cf8780f + cafa21c commit a6520c2
Show file tree
Hide file tree
Showing 45 changed files with 732 additions and 487 deletions.
8 changes: 5 additions & 3 deletions internal/eventproc/eventhub/waiters/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,23 @@ func parseEDef(
eDef.Id(), name, err)
}

ctx := context.Background()

switch name {
case "Time":
tw.next, ok = tm.Get().(time.Time)
tw.next, ok = tm.Get(ctx).(time.Time)
if ok && tw.next.Before(time.Now()) {
return fmt.Errorf("couldn't use past time as a timer")
}

case "Cycle":
tw.cyclesLeft, ok = tm.Get().(int)
tw.cyclesLeft, ok = tm.Get(ctx).(int)
if ok && tw.cyclesLeft <= 0 {
return fmt.Errorf("cycle isn't defined")
}

case "Duration":
tw.duration, ok = tm.Get().(time.Duration)
tw.duration, ok = tm.Get(ctx).(time.Duration)
if ok && tw.duration <= 0 {
return fmt.Errorf("duration isn't defined")
}
Expand Down
114 changes: 47 additions & 67 deletions internal/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package instance
import (
"context"
"errors"
"fmt"
"reflect"
"slices"
"strings"
Expand Down Expand Up @@ -128,7 +129,7 @@ type Instance struct {
// Instance's run starting time.
startTime time.Time

Monitors []monitor.Writer
monitors []monitor.Writer
}

// New creates a new Instance from the Snapshot s and sets state to Ready.
Expand Down Expand Up @@ -164,7 +165,7 @@ func New(
parentScope: parentScope,
parentEventProducer: ep,
rr: rr,
Monitors: []monitor.Writer{},
monitors: []monitor.Writer{},
}

if mon != nil {
Expand Down Expand Up @@ -204,15 +205,12 @@ func (inst *Instance) loadProperties(parentScope scope.Scope) error {

inst.rootScope, err = inst.rootScope.Append(inst.s.ProcessName)
if err != nil {
return errs.New(
errs.M("couldn't create Instance Scope data path"),
errs.E(err))
return fmt.Errorf("couldn't create instance's scope data path: %w", err)
}

inst.runtimeScope, err = inst.rootScope.Append(runtimeVars)
if err != nil {
return errs.New(
errs.M("couldn't create Instance Scope runtime variables data path"),
errs.E(err))
return fmt.Errorf("couldn't create instance's scope runtime variables data path: %w", err)
}

inst.scopes[inst.rootScope] = map[string]data.Data{}
Expand Down Expand Up @@ -261,11 +259,12 @@ func (inst *Instance) Run(
errs.C(errorClass, errs.EmptyNotAllowed))
}

if inst.State() != Ready {
if inst.state != Ready {
return errs.New(
errs.M("invalid instance state to run (want: Ready, has: %s)",
errs.M("invalid instance state to run",
inst.state),
errs.C(errorClass, errs.InvalidState))
errs.C(errorClass, errs.InvalidState),
errs.D("current_state", inst.state))
}

inst.m.Lock()
Expand Down Expand Up @@ -299,18 +298,18 @@ func (inst *Instance) Run(

// runTracks runs all tracks of the instance.
func (inst *Instance) runTracks(ctx context.Context) error {
if inst.state != Ready {
return errs.New(
errs.M("invalid instance state to run (want: Ready, have: %s)",
inst.state),
errs.C(errorClass, errs.InvalidState))
}

inst.state = Runned

// run only registered tracks, not created by runned track's forks.
tracks := append([]*track{}, maps.Values(inst.tracks)...)
for _, t := range tracks {
inst.show(
"INSTANCE.RUN",
"track runned",
map[string]any{
"start_node": t.currentStep().node.Name(),
})

inst.wg.Add(1)

go func(t *track) {
Expand All @@ -320,40 +319,36 @@ func (inst *Instance) runTracks(ctx context.Context) error {
}(t)
}

inst.show(
"INSTANCE.RUN",
"all tracks runned",
map[string]any{
"tracks": len(tracks),
})

return nil
}

// addTrack adds a new track into the track pool.
// If instance is running, added track also runs.
func (inst *Instance) addTrack(ctx context.Context, nt *track) error {
if nt == nil {
return errs.New(
errs.M("couldn't add empty track to instance %q", inst.Id()),
errs.C(errorClass, errs.EmptyNotAllowed))
return fmt.Errorf("couldn't add empty track to instance %q", inst.Id())
}

inst.m.Lock()
defer inst.m.Unlock()

if _, ok := inst.tracks[nt.Id()]; ok {
return errs.New(
errs.M("track from node %q(%s) already registered in instance %q",
inst.tracks[nt.Id()].steps[0].node.Name(),
inst.tracks[nt.Id()].steps[0].node.Id(),
inst.Id()),
errs.C(errorClass, errs.DuplicateObject))
return fmt.Errorf("track from node %q(%s) already registered in instance %q",
inst.tracks[nt.Id()].steps[0].node.Name(),
inst.tracks[nt.Id()].steps[0].node.Id(),
inst.Id())
}

inst.tracks[nt.Id()] = nt

if inst.state == Runned {
inst.show(
"INSTANCE.RUN",
"track runned",
map[string]any{
"start_node": nt.currentStep().node.Name(),
})

inst.wg.Add(1)

go func() {
Expand Down Expand Up @@ -399,14 +394,12 @@ func (inst *Instance) addData(path scope.DataPath, dd ...data.Data) error {

for _, d := range dd {
if d == nil {
return errs.New(
errs.M("data is empty"))
return fmt.Errorf("data is empty")
}

dn := strings.TrimSpace(d.Name())
if dn == "" {
return errs.New(
errs.M("couldn't add data with no name"))
return fmt.Errorf("couldn't add data with no name")
}

vv[dn] = d
Expand Down Expand Up @@ -449,17 +442,13 @@ func (inst *Instance) getData(
path, err = path.DropTail()
if err != nil {
return nil,
errs.New(
errs.M("couldn't get upper level for Scope %q:",
path.String()),
errs.E(err))
fmt.Errorf("couldn't get upper level for Scope %q: %w",
path.String(), err)
}
}

return nil,
errs.New(
errs.M("data not found"),
errs.C(errs.ObjectNotFound))
fmt.Errorf("data not found")
}

// getRuntimeVar tries to find the Instance's runtime variable by its name.
Expand All @@ -479,40 +468,31 @@ func (inst *Instance) getRuntimeVar(name string) (data.Data, error) {

default:
return nil,
errs.New(
errs.M("invalid runtime variable name"),
errs.C(errorClass, errs.InvalidParameter),
errs.D("name", name))
fmt.Errorf("invalid runtime variable name %q", name)
}

id, err := data.NewItemDefinition(d)
if err != nil {
return nil,
errs.New(
errs.M("couldn't create an ItemDefinition for runtime variable",
errs.C(errorClass, errs.BulidingFailed)),
errs.E(err),
errs.D("name", name))
fmt.Errorf(
"couldn't create an ItemDefinition for runtime variable %q: %w",
name, err)
}

iae, err := data.NewItemAwareElement(id, data.ReadyDataState)
if err != nil {
return nil,
errs.New(
errs.M("couldn't create an ItemAwareElement for runtime variable",
errs.C(errorClass, errs.BulidingFailed)),
errs.E(err),
errs.D("name", name))
fmt.Errorf(
"couldn't create an ItemAwareElement for runtime variable %q: %w",
name, err)
}

p, err := data.NewParameter(name, iae)
if err != nil {
return nil,
errs.New(
errs.M("couldn't create an ItemDefinition for runtime variable",
errs.C(errorClass, errs.BulidingFailed)),
errs.E(err),
errs.D("name", name))
fmt.Errorf(
"couldn't create an ItemDefinition for runtime variable %q: %w",
name, err)
}

return p, nil
Expand Down Expand Up @@ -599,7 +579,7 @@ func (inst *Instance) show(

inst.monId.Add(1)

for _, m := range inst.Monitors {
for _, m := range inst.monitors {
go func(w monitor.Writer) {
w.Write(&ev)
}(m)
Expand Down Expand Up @@ -987,8 +967,8 @@ func (inst *Instance) RegisterWriter(m monitor.Writer) {
inst.m.Lock()
defer inst.m.Unlock()

if idx := slices.Index(inst.Monitors, m); idx == -1 {
inst.Monitors = append(inst.Monitors, m)
if idx := slices.Index(inst.monitors, m); idx == -1 {
inst.monitors = append(inst.monitors, m)
}
}

Expand Down
12 changes: 7 additions & 5 deletions internal/instance/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,19 @@ func TestMonitoring(t *testing.T) {
_, err = inst.GetData(rvs, "INVALID_NAME")
require.Error(t, err)

ctx := context.Background()

tc, err := inst.GetData(rvs, instance.TracksCount)
require.NoError(t, err)
require.Equal(t, 1, tc.Value().Get().(int))
require.Equal(t, 1, tc.Value().Get(ctx).(int))

st, err := inst.GetData(rvs, instance.CurrState)
require.NoError(t, err)
require.Equal(t, instance.Ready, st.Value().Get().(instance.State))
require.Equal(t, instance.Ready, st.Value().Get(ctx).(instance.State))

start, err := inst.GetData(rvs, instance.StartedAt)
require.NoError(t, err)
require.True(t, start.Value().Get().(time.Time).IsZero())
require.True(t, start.Value().Get(ctx).(time.Time).IsZero())

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -131,7 +133,7 @@ func getSnapshot(pname string) (*snapshot.Snapshot, error) {
}

helloFunc, err := gooper.New(
func(in *data.ItemDefinition) (*data.ItemDefinition, error) {
func(ctx context.Context, in *data.ItemDefinition) (*data.ItemDefinition, error) {
const inId = "user_name"

if in == nil {
Expand All @@ -150,7 +152,7 @@ func getSnapshot(pname string) (*snapshot.Snapshot, error) {
errs.D("got_id", in.Id()))
}

userName, ok := in.Structure().Get().(string)
userName, ok := in.Structure().Get(context.Background()).(string)
if !ok {
return nil,
errs.New(
Expand Down
Loading

0 comments on commit a6520c2

Please sign in to comment.