Skip to content

Commit 7ab71d4

Browse files
committed
Make sure historical events aren't sent as AS transactions
Regression tests for matrix-org/synapse#11241 Synapse changes: matrix-org/synapse#11265
1 parent 865848d commit 7ab71d4

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

internal/b/hs_with_application_service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var BlueprintHSWithApplicationService = MustValidate(Blueprint{
1515
ApplicationServices: []ApplicationService{
1616
{
1717
ID: "my_as_id",
18-
URL: "http://localhost:9000",
18+
URL: "http://host.docker.internal:9111",
1919
SenderLocalpart: "the-bridge-user",
2020
RateLimited: false,
2121
},

tests/msc2716_test.go

+98
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ package tests
88

99
import (
1010
"bytes"
11+
"context"
1112
"fmt"
1213
"io/ioutil"
1314
"net/http"
1415
"net/url"
1516
"testing"
1617
"time"
1718

19+
"github.com/gorilla/mux"
1820
"github.com/tidwall/gjson"
1921

2022
"github.com/matrix-org/complement/internal/b"
@@ -307,6 +309,102 @@ func TestImportHistoricalMessages(t *testing.T) {
307309
})
308310
})
309311

312+
t.Run("Historical events from batch_send do not get pushed out as application service transactions", func(t *testing.T) {
313+
t.Parallel()
314+
315+
// Create a listener and handler to stub an application service listening
316+
// for transactions from a homeserver.
317+
handler := mux.NewRouter()
318+
// Application Service API: /_matrix/app/v1/transactions/{txnId}
319+
waiter := NewWaiter()
320+
var eventIDsWeSawOverTransactions []string
321+
var eventIDAfterHistoricalImport string
322+
handler.HandleFunc("/transactions/{txnId}", func(w http.ResponseWriter, req *http.Request) {
323+
must.MatchRequest(t, req, match.HTTPRequest{
324+
JSON: []match.JSON{
325+
match.JSONArrayEach("events", func(r gjson.Result) error {
326+
// Add to our running list of events
327+
eventIDsWeSawOverTransactions = append(eventIDsWeSawOverTransactions, r.Get("event_id").Str)
328+
329+
// If we found the event that occurs after our batch send. we can
330+
// probably safely assume the historical events won't come later.
331+
if r.Get("event_id").Str != "" && r.Get("event_id").Str == eventIDAfterHistoricalImport {
332+
defer waiter.Finish()
333+
}
334+
335+
return nil
336+
}),
337+
},
338+
})
339+
340+
// Acknowledge that we've seen the transaction
341+
w.WriteHeader(200)
342+
w.Write([]byte("{}"))
343+
}).Methods("PUT")
344+
345+
srv := &http.Server{
346+
Addr: ":9111",
347+
Handler: handler,
348+
}
349+
go func() {
350+
srv.ListenAndServe()
351+
}()
352+
defer srv.Shutdown(context.Background())
353+
// ----------------------------------------------------------
354+
355+
// Create the room all of the action is going to happen in
356+
roomID := as.CreateRoom(t, createPublicRoomOpts)
357+
alice.JoinRoom(t, roomID, nil)
358+
359+
// Create the "live" event we are going to insert our historical events next to
360+
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
361+
eventIdBefore := eventIDsBefore[0]
362+
timeAfterEventBefore := time.Now()
363+
364+
// Import a historical event
365+
batchSendRes := batchSendHistoricalMessages(
366+
t,
367+
as,
368+
roomID,
369+
eventIdBefore,
370+
"",
371+
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
372+
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 1),
373+
// Status
374+
200,
375+
)
376+
batchSendResBody := client.ParseJSON(t, batchSendRes)
377+
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
378+
historicalStateEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")
379+
380+
// This is just a dummy event we search for after the historicalEventIDs/historicalStateEventIDs
381+
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
382+
eventIDAfterHistoricalImport = eventIDsAfterHistoricalImport[0]
383+
384+
// Check if eventIDAfterHistoricalImport already came over `/transactions`.
385+
if !includes(eventIDAfterHistoricalImport, eventIDsWeSawOverTransactions) {
386+
// If not, wait 5 seconds for to see if it happens. The waiter will only
387+
// resolve if we see eventIDAfterHistoricalImport, otherwise timeout
388+
waiter.Wait(t, 5*time.Second)
389+
}
390+
391+
// Now, that we know eventIDAfterHistoricalImport came over /transactions,
392+
// we can probably safely assume the historical events won't come later.
393+
394+
// Check to make sure the historical events didn't come over /transactions
395+
for _, historicalEventID := range historicalEventIDs {
396+
if includes(historicalEventID, eventIDsWeSawOverTransactions) {
397+
t.Fatalf("We should not see the %s historical event come over /transactions but it did", historicalEventID)
398+
}
399+
}
400+
// Check to make sure the historical state events didn't come over /transactions
401+
for _, historicalStateEventID := range historicalStateEventIDs {
402+
if includes(historicalStateEventID, eventIDsWeSawOverTransactions) {
403+
t.Fatalf("We should not see the %s historical state event come over /transactions but it did", historicalStateEventID)
404+
}
405+
}
406+
})
407+
310408
t.Run("Batch send endpoint only returns state events that we passed in via state_events_at_start", func(t *testing.T) {
311409
t.Parallel()
312410

0 commit comments

Comments
 (0)