From 646b4c1bf559df6f17a4f9ea55be0179ee86135e Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Mon, 22 Jan 2024 13:19:32 +0100 Subject: [PATCH 1/5] Only fetch events once for all rooms --- syncapi/streams/stream_pdu.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 3abb0b3c6d..303148ba22 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,6 +203,21 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } + roomIDs := make([]string, len(stateDeltas)) + for i := range stateDeltas { + roomIDs[i] = stateDeltas[i].RoomID + } + dbEvents, err := snapshot.RecentEvents( + ctx, roomIDs, r, + &eventFilter, true, true, + ) + if err != nil { + if err == sql.ErrNoRows { + return r.To + } + return r.From + } + newPos = from for _, delta := range stateDeltas { newRange := r @@ -218,7 +233,7 @@ func (p *PDUStreamProvider) IncrementalSync( } } var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req, dbEvents); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") if err == context.DeadlineExceeded || err == context.Canceled || err == sql.ErrTxDone { return newPos @@ -253,6 +268,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( eventFilter *synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, + dbEvents map[string]types.RecentEvents, ) (types.StreamPosition, error) { var err error originalLimit := eventFilter.Limit @@ -265,17 +281,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } - dbEvents, err := snapshot.RecentEvents( - ctx, []string{delta.RoomID}, r, - eventFilter, true, true, - ) - if err != nil { - if err == sql.ErrNoRows { - return r.To, nil - } - return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err) - } - recentStreamEvents := dbEvents[delta.RoomID].Events limited := dbEvents[delta.RoomID].Limited From da54821c9f7a072d052e4b9046d1b81e569d953a Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Tue, 23 Jan 2024 10:51:20 +0100 Subject: [PATCH 2/5] handle newly joined rooms better by using a different range/filter for those --- syncapi/streams/stream_pdu.go | 90 ++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 23 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 303148ba22..a1d9a4ea94 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,20 +203,14 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } - roomIDs := make([]string, len(stateDeltas)) - for i := range stateDeltas { - roomIDs[i] = stateDeltas[i].RoomID - } - dbEvents, err := snapshot.RecentEvents( - ctx, roomIDs, r, - &eventFilter, true, true, - ) + dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, &stateFilter, req, snapshot) if err != nil { - if err == sql.ErrNoRows { - return r.To - } + req.Log.WithError(err).Error("unable to get recent events") return r.From } + if len(dbEvents) == 0 { + return r.To + } newPos = from for _, delta := range stateDeltas { @@ -255,6 +249,66 @@ func (p *PDUStreamProvider) IncrementalSync( return newPos } +func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { + var roomIDs []string + var newlyJoinedRoomIDs []string + for _, delta := range stateDeltas { + if delta.NewlyJoined { + newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID) + } else { + roomIDs = append(roomIDs, delta.RoomID) + } + } + dbEvents := make(map[string]types.RecentEvents) + if len(roomIDs) > 0 { + events, err := snapshot.RecentEvents( + ctx, roomIDs, r, + &eventFilter, true, true, + ) + if err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + for k, v := range events { + dbEvents[k] = v + } + } + if len(newlyJoinedRoomIDs) > 0 { + // For rooms that were joined in this sync, try to fetch + // as much timeline events as allowed by the filter. + + filter := eventFilter + // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest + if eventFilter.Limit < recentEventBackwardsLimit { + filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way + diff := r.From - r.To + if diff > 0 && diff < recentEventBackwardsLimit { + filter.Limit = int(diff) + } + } + + events, err := snapshot.RecentEvents( + ctx, newlyJoinedRoomIDs, types.Range{ + From: r.To, + To: 0, + Backwards: true, + }, + &filter, true, true, + ) + if err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + for k, v := range events { + dbEvents[k] = v + } + } + + return dbEvents, nil +} + // Limit the recent events to X when going backwards const recentEventBackwardsLimit = 100 @@ -271,16 +325,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( dbEvents map[string]types.RecentEvents, ) (types.StreamPosition, error) { var err error - originalLimit := eventFilter.Limit - // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest - if r.Backwards && originalLimit < recentEventBackwardsLimit { - eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way - diff := r.From - r.To - if diff > 0 && diff < recentEventBackwardsLimit { - eventFilter.Limit = int(diff) - } - } - recentStreamEvents := dbEvents[delta.RoomID].Events limited := dbEvents[delta.RoomID].Limited @@ -342,9 +386,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( logrus.WithError(err).Error("unable to apply history visibility filter") } - if r.Backwards && len(events) > originalLimit { + if r.Backwards && len(events) > eventFilter.Limit { // We're going backwards and the events are ordered chronologically, so take the last `limit` events - events = events[len(events)-originalLimit:] + events = events[len(events)-eventFilter.Limit:] limited = true } From 432583ed472b0c742133302e0cb5f0495292efa0 Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Tue, 23 Jan 2024 12:53:11 +0100 Subject: [PATCH 3/5] remove unused argument --- syncapi/streams/stream_pdu.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index a1d9a4ea94..697b91d5ef 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,7 +203,7 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } - dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, &stateFilter, req, snapshot) + dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, req, snapshot) if err != nil { req.Log.WithError(err).Error("unable to get recent events") return r.From @@ -249,7 +249,7 @@ func (p *PDUStreamProvider) IncrementalSync( return newPos } -func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { +func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, req *types.SyncRequest, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { var roomIDs []string var newlyJoinedRoomIDs []string for _, delta := range stateDeltas { From 5bb2f334b00061946a38be1861c3dc6bf10a153f Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Tue, 23 Jan 2024 14:24:51 +0100 Subject: [PATCH 4/5] remove unused param --- syncapi/streams/stream_pdu.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 697b91d5ef..8cee290f46 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,7 +203,7 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } - dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, req, snapshot) + dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, snapshot) if err != nil { req.Log.WithError(err).Error("unable to get recent events") return r.From @@ -249,7 +249,7 @@ func (p *PDUStreamProvider) IncrementalSync( return newPos } -func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, req *types.SyncRequest, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { +func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { var roomIDs []string var newlyJoinedRoomIDs []string for _, delta := range stateDeltas { From c48a7aad583121c8b7d5cee0b4d09817e5ee86f1 Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Tue, 23 Jan 2024 16:41:45 +0100 Subject: [PATCH 5/5] Don't return early if there are no events --- syncapi/streams/stream_pdu.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 8cee290f46..790f5bd1b1 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -208,9 +208,6 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to get recent events") return r.From } - if len(dbEvents) == 0 { - return r.To - } newPos = from for _, delta := range stateDeltas {