Skip to content

Commit

Permalink
feat(sync) configurable artificial delay for cassandra deployments
Browse files Browse the repository at this point in the history
decK first creates all the services and then creates related routes.
With a distributed like Cassandra, route creation happens before
the DB rows for new services are propagated to other nodes in the
cassandra cluster. This results in a 409 from kong because kong ensures
that service specified in a route is actually present in the db (c* has
no notion of foreign relations, kong does a read on the service to
verify and enforce foreign key relations).

With the new configurable flag, users of cassandra can inject an
aritifical between insert operations of related entities to avoid failures
described above. The use of the flag is discouraged and is only
implemented to provide a stop-gap solution.

Fix #160
Fix #154
  • Loading branch information
hbagdi authored and Travis Raines committed Apr 21, 2021
1 parent bf4ba54 commit 41061c9
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
3 changes: 2 additions & 1 deletion cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func checkWorkspace(config utils.KongClientConfig) error {
return nil
}

func syncMain(filenames []string, dry bool, parallelism int) error {
func syncMain(filenames []string, dry bool, parallelism, delay int) error {

// load Kong version before workspace
kongVersion, err := kongVersion(config)
Expand Down Expand Up @@ -118,6 +118,7 @@ func syncMain(filenames []string, dry bool, parallelism int) error {
}

s, _ := diff.NewSyncer(currentState, targetState)
s.StageDelaySec = delay
stats, errs := solver.Solve(stopChannel, s, client, parallelism, dry)
if errs != nil {
return utils.ErrArray{Errors: errs}
Expand Down
2 changes: 1 addition & 1 deletion cmd/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ that will be created or updated or deleted.
`,
Args: validateNoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return sync(diffCmdKongStateFile, true)
return syncMain(diffCmdKongStateFile, true, diffCmdParallelism, 0)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
if len(diffCmdKongStateFile) == 0 {
Expand Down
7 changes: 6 additions & 1 deletion cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
var (
syncCmdKongStateFile []string
syncCmdParallelism int
syncCmdDBUpdateDelay int
)

// syncCmd represents the sync command
Expand All @@ -19,7 +20,7 @@ var syncCmd = &cobra.Command{
to get Kong's state in sync with the input state.`,
Args: validateNoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return sync(syncCmdKongStateFile, false)
return syncMain(syncCmdKongStateFile, false, syncCmdParallelism, syncCmdDBUpdateDelay)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
if len(syncCmdKongStateFile) == 0 {
Expand All @@ -45,4 +46,8 @@ func init() {
"select-tag", []string{},
"only entities matching tags specified via this flag are synced.\n"+
"Multiple tags are ANDed together.")
syncCmd.Flags().IntVar(&syncCmdDBUpdateDelay, "db-update-propagation-delay",
0, "aritificial delay in seconds that is injected between insert operations \n"+
"for related entities (usually for cassandra deployments).\n"+
"See 'db_update_propagation' in kong.conf.")
}
10 changes: 6 additions & 4 deletions diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type Syncer struct {
errChan chan error
stopChan chan struct{}

InFlightOps int32
inFlightOps int32

SilenceWarnings bool
StageDelaySec int

once sync.Once
}
Expand Down Expand Up @@ -253,7 +254,7 @@ func (sc *Syncer) createUpdate() error {
}

func (sc *Syncer) queueEvent(e Event) error {
atomic.AddInt32(&sc.InFlightOps, 1)
atomic.AddInt32(&sc.inFlightOps, 1)
select {
case sc.eventChan <- e:
return nil
Expand All @@ -263,11 +264,12 @@ func (sc *Syncer) queueEvent(e Event) error {
}

func (sc *Syncer) eventCompleted() {
atomic.AddInt32(&sc.InFlightOps, -1)
atomic.AddInt32(&sc.inFlightOps, -1)
}

func (sc *Syncer) wait() {
for atomic.LoadInt32(&sc.InFlightOps) != 0 {
time.Sleep(time.Duration(sc.StageDelaySec) * time.Second)
for atomic.LoadInt32(&sc.inFlightOps) != 0 {
// TODO hack?
time.Sleep(1 * time.Millisecond)
}
Expand Down

0 comments on commit 41061c9

Please sign in to comment.