This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Only fetch events once for all rooms (#3311)
This refactors `PDUStreamProvider` so that it no longer triggers a
database query per room, but instead fetches the recent events for all
rooms in a single bulk query. This improves sync performance
significantly when a user is in thousands of rooms.
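
Conceptually, the change collects the room IDs from the state deltas up front and makes one bulk `RecentEvents` call whose result is keyed by room ID, so the per-delta work becomes a map lookup rather than a database round-trip. Below is a minimal, self-contained sketch of that access pattern; the `recentEvents` stub is a hypothetical stand-in for the real storage snapshot, not Dendrite's API:

```go
package main

import "fmt"

// recentEvents is a hypothetical stand-in for the storage snapshot's bulk
// query: it accepts many room IDs and returns recent events keyed by room ID.
func recentEvents(roomIDs []string) map[string][]string {
	out := make(map[string][]string, len(roomIDs))
	for _, id := range roomIDs {
		out[id] = []string{"latest event in " + id}
	}
	return out
}

func main() {
	rooms := []string{"!a:example.org", "!b:example.org", "!c:example.org"}

	// Before: one query per room, i.e. N database round-trips.
	for _, id := range rooms {
		fmt.Println(id, recentEvents([]string{id})[id])
	}

	// After: one bulk query for all rooms, then cheap per-room lookups.
	byRoom := recentEvents(rooms)
	for _, id := range rooms {
		fmt.Println(id, byRoom[id])
	}
}
```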

### Pull Request Checklist

<!-- Please read
https://matrix-org.github.io/dendrite/development/contributing before
submitting your pull request -->

* [x] I have added Go unit tests or [Complement integration
tests](https://github.com/matrix-org/complement) for this PR _or_ I have
justified why this PR doesn't need tests
* [x] Pull request includes a [sign off below using a legally
identifiable
name](https://matrix-org.github.io/dendrite/development/contributing#sign-off)
_or_ I have already signed off privately

Signed-off-by: `Joakim Recht <joakim@beyondwork.ai>`
recht authored Jan 25, 2024
1 parent d58daf9 commit 00217a6
Showing 1 changed file with 70 additions and 24 deletions.
94 changes: 70 additions & 24 deletions syncapi/streams/stream_pdu.go
@@ -203,6 +203,12 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}

dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot)
if err != nil {
req.Log.WithError(err).Error("unable to get recent events")
return r.From
}

newPos = from
for _, delta := range stateDeltas {
newRange := r
@@ -218,7 +224,7 @@
}
}
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
return newPos
@@ -240,6 +246,66 @@
return newPos
}

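// getRecentEvents bulk-fetches recent timeline events for all rooms in the
// given state deltas, keyed by room ID: one query covers already-joined rooms
// and a second, backwards query with a possibly larger limit covers rooms
// newly joined in this sync.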
func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) {
var roomIDs []string
var newlyJoinedRoomIDs []string
for _, delta := range stateDeltas {
if delta.NewlyJoined {
newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID)
} else {
roomIDs = append(roomIDs, delta.RoomID)
}
}
dbEvents := make(map[string]types.RecentEvents)
if len(roomIDs) > 0 {
events, err := snapshot.RecentEvents(
ctx, roomIDs, r,
&eventFilter, true, true,
)
if err != nil {
if err != sql.ErrNoRows {
return nil, err
}
}
for k, v := range events {
dbEvents[k] = v
}
}
if len(newlyJoinedRoomIDs) > 0 {
// For rooms that were joined in this sync, try to fetch
// as many timeline events as the filter allows.

filter := eventFilter
// Because we page backwards here, fetch at least X events; this is mostly to satisfy Sytest
if eventFilter.Limit < recentEventBackwardsLimit {
filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
diff := r.From - r.To
if diff > 0 && diff < recentEventBackwardsLimit {
filter.Limit = int(diff)
}
}

events, err := snapshot.RecentEvents(
ctx, newlyJoinedRoomIDs, types.Range{
From: r.To,
To: 0,
Backwards: true,
},
&filter, true, true,
)
if err != nil {
if err != sql.ErrNoRows {
return nil, err
}
}
for k, v := range events {
dbEvents[k] = v
}
}

return dbEvents, nil
}

// Limit the recent events to X when going backwards
const recentEventBackwardsLimit = 100

@@ -253,29 +319,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
eventFilter *synctypes.RoomEventFilter,
stateFilter *synctypes.StateFilter,
req *types.SyncRequest,
dbEvents map[string]types.RecentEvents,
) (types.StreamPosition, error) {
var err error
originalLimit := eventFilter.Limit
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
if r.Backwards && originalLimit < recentEventBackwardsLimit {
eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
diff := r.From - r.To
if diff > 0 && diff < recentEventBackwardsLimit {
eventFilter.Limit = int(diff)
}
}

dbEvents, err := snapshot.RecentEvents(
ctx, []string{delta.RoomID}, r,
eventFilter, true, true,
)
if err != nil {
if err == sql.ErrNoRows {
return r.To, nil
}
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
}

recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited

@@ -337,9 +383,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}

if r.Backwards && len(events) > originalLimit {
if r.Backwards && len(events) > eventFilter.Limit {
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
events = events[len(events)-originalLimit:]
events = events[len(events)-eventFilter.Limit:]
limited = true
}

