Skip to content

Commit

Permalink
Introduce dedicated event publisher per document (#1052)
Browse files Browse the repository at this point in the history
Before:

- PushPullChanges and WatchDocument routines directly published events
- Event publishing was not controllable per document
- Network issues could block event publishing routines

This change introduces a dedicated event publisher per document.

- Centralize event publishing control
- Prevent publishing routines from being blocked
- Future improvements: Event throttling, Duplicate event prevention

---------

Co-authored-by: Yourim Cha <yourim.cha@navercorp.com>
  • Loading branch information
hackerwins and chacha912 authored Nov 1, 2024
1 parent 4485a28 commit 6c15a3b
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 84 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
c.attachments[doc.Key()].watchCtx = watchCtx
c.attachments[doc.Key()].closeWatchStream = cancelFunc

if !opts.IsManual {
if opts.IsRealtime {
err = c.runWatchLoop(watchCtx, doc)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type AttachOptions struct {
// Presence is the presence of the client.
Presence innerpresence.Presence
InitialRoot map[string]any
IsManual bool
IsRealtime bool
}

// WithPresence configures the presence of the client.
Expand All @@ -111,9 +111,9 @@ func WithInitialRoot(root map[string]any) AttachOption {
}
}

// WithManualSync configures the manual sync of the client.
func WithManualSync() AttachOption {
return func(o *AttachOptions) { o.IsManual = true }
// WithRealtimeSync configures the manual sync of the client.
func WithRealtimeSync() AttachOption {
return func(o *AttachOptions) { o.IsRealtime = true }
}

// DetachOption configures DetachOptions.
Expand Down
131 changes: 131 additions & 0 deletions server/backend/sync/memory/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package memory

import (
"strconv"
gosync "sync"
"sync/atomic"
time "time"

"go.uber.org/zap"

"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/logging"
)

var id loggerID

type loggerID int32

func (c *loggerID) next() string {
next := atomic.AddInt32((*int32)(c), 1)
return "p" + strconv.Itoa(int(next))
}

// BatchPublisher is a publisher that publishes events in batch.
type BatchPublisher struct {
logger *zap.SugaredLogger
mutex gosync.Mutex
events []sync.DocEvent

window time.Duration
closeChan chan struct{}
subs *Subscriptions
}

// NewBatchPublisher creates a new BatchPublisher instance.
func NewBatchPublisher(subs *Subscriptions, window time.Duration) *BatchPublisher {
bp := &BatchPublisher{
logger: logging.New(id.next()),
window: window,
closeChan: make(chan struct{}),
subs: subs,
}

go bp.processLoop()
return bp
}

// Publish adds the given event to the batch. If the batch is full, it publishes
// the batch.
func (bp *BatchPublisher) Publish(event sync.DocEvent) {
bp.mutex.Lock()
defer bp.mutex.Unlock()

// TODO(hackerwins): If DocumentChangedEvent is already in the batch, we don't
// need to add it again.
bp.events = append(bp.events, event)
}

func (bp *BatchPublisher) processLoop() {
ticker := time.NewTicker(bp.window)
defer ticker.Stop()

for {
select {
case <-ticker.C:
bp.publish()
case <-bp.closeChan:
return
}
}
}

func (bp *BatchPublisher) publish() {
bp.mutex.Lock()

if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}

events := bp.events
bp.events = nil

bp.mutex.Unlock()

if logging.Enabled(zap.DebugLevel) {
bp.logger.Infof(
"Publishing batch of %d events for document %s",
len(bp.events),
bp.subs.docKey,
)
}

for _, sub := range bp.subs.Values() {
for _, event := range events {
if sub.Subscriber().Compare(event.Publisher) == 0 {
continue
}

if ok := sub.Publish(event); !ok {
bp.logger.Infof(
"Publish(%s,%s) to %s timeout or closed",
event.Type,
event.Publisher,
sub.Subscriber(),
)
}
}
}
}

// Close stops the batch publisher
func (bp *BatchPublisher) Close() {
close(bp.closeChan)
}
50 changes: 14 additions & 36 deletions server/backend/sync/memory/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package memory

import (
"context"
gotime "time"

"go.uber.org/zap"

Expand All @@ -32,13 +33,16 @@ import (
type Subscriptions struct {
docKey types.DocRefKey
internalMap *cmap.Map[string, *sync.Subscription]
publisher *BatchPublisher
}

func newSubscriptions(docKey types.DocRefKey) *Subscriptions {
return &Subscriptions{
s := &Subscriptions{
docKey: docKey,
internalMap: cmap.New[string, *sync.Subscription](),
}
s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond)
return s
}

// Set adds the given subscription.
Expand All @@ -52,40 +56,8 @@ func (s *Subscriptions) Values() []*sync.Subscription {
}

// Publish publishes the given event.
func (s *Subscriptions) Publish(ctx context.Context, event sync.DocEvent) {
// TODO(hackerwins): Introduce batch publish to reduce lock contention.
// Problem:
// - High lock contention when publishing events frequently.
// - Redundant events being published in short time windows.
// Solution:
// - Collect events to publish in configurable time window.
// - Keep only the latest event for the same event type.
// - Run dedicated publish loop in a single goroutine.
// - Batch publish collected events when the time window expires.
for _, sub := range s.internalMap.Values() {
if sub.Subscriber().Compare(event.Publisher) == 0 {
continue
}

if logging.Enabled(zap.DebugLevel) {
logging.From(ctx).Debugf(
`Publish %s(%s,%s) to %s`,
event.Type,
s.docKey,
event.Publisher,
sub.Subscriber(),
)
}

if ok := sub.Publish(event); !ok {
logging.From(ctx).Warnf(
`Publish(%s,%s) to %s timeout or closed`,
s.docKey,
event.Publisher,
sub.Subscriber(),
)
}
}
func (s *Subscriptions) Publish(event sync.DocEvent) {
s.publisher.Publish(event)
}

// Delete deletes the subscription of the given id.
Expand All @@ -103,6 +75,11 @@ func (s *Subscriptions) Len() int {
return s.internalMap.Len()
}

// Close closes the subscriptions.
func (s *Subscriptions) Close() {
s.publisher.Close()
}

// PubSub is the memory implementation of PubSub, used for single server.
type PubSub struct {
subscriptionsMap *cmap.Map[types.DocRefKey, *Subscriptions]
Expand Down Expand Up @@ -170,6 +147,7 @@ func (m *PubSub) Unsubscribe(

if subs.Len() == 0 {
m.subscriptionsMap.Delete(docKey, func(subs *Subscriptions, exists bool) bool {
subs.Close()
return exists
})
}
Expand Down Expand Up @@ -200,7 +178,7 @@ func (m *PubSub) Publish(
}

if subs, ok := m.subscriptionsMap.Get(docKey); ok {
subs.Publish(ctx, event)
subs.Publish(event)
}

if logging.Enabled(zap.DebugLevel) {
Expand Down
9 changes: 7 additions & 2 deletions server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/time"
)

const (
// publishTimeout is the timeout for publishing an event.
publishTimeout = 100 * gotime.Millisecond
)

// Subscription represents a subscription of a subscriber to documents.
type Subscription struct {
id string
Expand Down Expand Up @@ -88,12 +93,12 @@ func (s *Subscription) Publish(event DocEvent) bool {
return false
}

// NOTE: When a subscription is being closed by a subscriber,
// NOTE(hackerwins): When a subscription is being closed by a subscriber,
// the subscriber may not receive messages.
select {
case s.Events() <- event:
return true
case <-gotime.After(100 * gotime.Millisecond):
case <-gotime.After(publishTimeout):
return false
}
}
4 changes: 2 additions & 2 deletions test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func BenchmarkRPC(b *testing.B) {
ctx := context.Background()

d1 := document.New(helper.TestDocKey(b))
err := c1.Attach(ctx, d1)
err := c1.Attach(ctx, d1, client.WithRealtimeSync())
assert.NoError(b, err)
testKey1 := "testKey1"
err = d1.Update(func(root *json.Object, p *presence.Presence) error {
Expand All @@ -212,7 +212,7 @@ func BenchmarkRPC(b *testing.B) {
assert.NoError(b, err)

d2 := document.New(helper.TestDocKey(b))
err = c2.Attach(ctx, d2)
err = c2.Attach(ctx, d2, client.WithRealtimeSync())
assert.NoError(b, err)
testKey2 := "testKey2"
err = d2.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestAdmin(t *testing.T) {

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
assert.NoError(t, c1.Attach(ctx, d1, client.WithRealtimeSync()))
wg := sync.WaitGroup{}
wg.Add(1)
rch, cancel, err := c1.Subscribe(d1)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/auth_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestProjectAuthWebhook(t *testing.T) {
assert.NoError(t, err)

doc := document.New(helper.TestDocKey(t))
err = cli.Attach(ctx, doc)
err = cli.Attach(ctx, doc, client.WithRealtimeSync())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))

_, _, err = cli.Subscribe(doc)
Expand Down
Loading

0 comments on commit 6c15a3b

Please sign in to comment.