Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Only fetch events once for all rooms #3311

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 70 additions & 24 deletions syncapi/streams/stream_pdu.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}

dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot)
if err != nil {
req.Log.WithError(err).Error("unable to get recent events")
return r.From
}

newPos = from
for _, delta := range stateDeltas {
newRange := r
Expand All @@ -218,7 +224,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
}
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil {
if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone {
return newPos
Expand All @@ -240,6 +246,66 @@ func (p *PDUStreamProvider) IncrementalSync(
return newPos
}

func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) {
var roomIDs []string
var newlyJoinedRoomIDs []string
for _, delta := range stateDeltas {
if delta.NewlyJoined {
newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID)
} else {
roomIDs = append(roomIDs, delta.RoomID)
}
}
dbEvents := make(map[string]types.RecentEvents)
if len(roomIDs) > 0 {
events, err := snapshot.RecentEvents(
ctx, roomIDs, r,
&eventFilter, true, true,
)
if err != nil {
if err != sql.ErrNoRows {
return nil, err
}
}
for k, v := range events {
dbEvents[k] = v
}
}
if len(newlyJoinedRoomIDs) > 0 {
// For rooms that were joined in this sync, try to fetch
// as much timeline events as allowed by the filter.

filter := eventFilter
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
if eventFilter.Limit < recentEventBackwardsLimit {
filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
diff := r.From - r.To
if diff > 0 && diff < recentEventBackwardsLimit {
filter.Limit = int(diff)
}
}

events, err := snapshot.RecentEvents(
ctx, newlyJoinedRoomIDs, types.Range{
From: r.To,
To: 0,
Backwards: true,
},
&filter, true, true,
)
if err != nil {
if err != sql.ErrNoRows {
return nil, err
}
}
for k, v := range events {
dbEvents[k] = v
}
}

return dbEvents, nil
}

// Limit the recent events to X when going backwards
const recentEventBackwardsLimit = 100

Expand All @@ -253,29 +319,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
eventFilter *synctypes.RoomEventFilter,
stateFilter *synctypes.StateFilter,
req *types.SyncRequest,
dbEvents map[string]types.RecentEvents,
) (types.StreamPosition, error) {
var err error
originalLimit := eventFilter.Limit
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
if r.Backwards && originalLimit < recentEventBackwardsLimit {
eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
diff := r.From - r.To
if diff > 0 && diff < recentEventBackwardsLimit {
eventFilter.Limit = int(diff)
}
}

dbEvents, err := snapshot.RecentEvents(
ctx, []string{delta.RoomID}, r,
eventFilter, true, true,
)
if err != nil {
if err == sql.ErrNoRows {
return r.To, nil
}
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
}

recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited

Expand Down Expand Up @@ -337,9 +383,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}

if r.Backwards && len(events) > originalLimit {
if r.Backwards && len(events) > eventFilter.Limit {
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
events = events[len(events)-originalLimit:]
events = events[len(events)-eventFilter.Limit:]
limited = true
}

Expand Down
Loading