Skip to content

Commit

Permalink
{chat,user}info: add resync queue
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Sep 26, 2024
1 parent c4a466b commit d07fcc8
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 28 deletions.
8 changes: 7 additions & 1 deletion cmd/mautrix-whatsapp/legacymigrate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ SELECT
END, -- room_type
CASE WHEN expiration_time>0 THEN 'after_read' END, -- disappear_type
CASE WHEN expiration_time > 0 THEN expiration_time * 1000000000 END, -- disappear_timer TODO check multiplier
'{}' -- metadata
-- only: postgres
jsonb_build_object
-- only: sqlite (line commented)
-- json_object
(
'last_sync', last_sync,
) -- metadata
FROM portal_old;

INSERT INTO user_portal (bridge_id, user_mxid, login_id, portal_id, portal_receiver, in_space, preferred, last_read)
Expand Down
8 changes: 1 addition & 7 deletions pkg/connector/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ var _ bridgev2.BackfillingNetworkAPI = (*WhatsAppClient)(nil)

const historySyncDispatchWait = 30 * time.Second

func (wa *WhatsAppClient) historySyncLoop() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldStop := wa.stopHistorySyncLoop.Swap(&cancel)
if oldStop != nil {
(*oldStop)()
}
func (wa *WhatsAppClient) historySyncLoop(ctx context.Context) {
dispatchTimer := time.NewTimer(historySyncDispatchWait)
dispatchTimer.Stop()
wa.UserLogin.Log.Debug().Msg("Starting history sync loop")
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

var WhatsAppGeneralCaps = &bridgev2.NetworkGeneralCapabilities{
DisappearingMessages: true,
AggressiveUpdateInfo: false,
AggressiveUpdateInfo: true,
}

func (wa *WhatsAppConnector) GetCapabilities() *bridgev2.NetworkGeneralCapabilities {
Expand Down
37 changes: 30 additions & 7 deletions pkg/connector/chatinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package connector

import (
"context"
"errors"
"fmt"
"time"

"github.com/rs/zerolog"
"go.mau.fi/util/jsontime"
"go.mau.fi/util/ptr"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/types"
Expand Down Expand Up @@ -43,6 +45,7 @@ func (wa *WhatsAppClient) getChatInfo(ctx context.Context, portalJID types.JID,
return nil, err
}
wrapped = wa.wrapGroupInfo(info)
wrapped.ExtraUpdates = bridgev2.MergeExtraUpdaters(wrapped.ExtraUpdates, updatePortalLastSyncAt)
case types.NewsletterServer:
info, err := wa.Client.GetNewsletterInfo(portalJID)
if err != nil {
Expand All @@ -65,6 +68,13 @@ func (wa *WhatsAppClient) getChatInfo(ctx context.Context, portalJID types.JID,
return wrapped, nil
}

func updatePortalLastSyncAt(_ context.Context, portal *bridgev2.Portal) bool {
meta := portal.Metadata.(*waid.PortalMetadata)
forceSave := time.Since(meta.LastSync.Time) > 24*time.Hour
meta.LastSync = jsontime.UnixNow()
return forceSave
}

func updateDisappearingTimerSetAt(ts int64) bridgev2.ExtraUpdater[*bridgev2.Portal] {
return func(_ context.Context, portal *bridgev2.Portal) bool {
meta := portal.Metadata.(*waid.PortalMetadata)
Expand Down Expand Up @@ -331,28 +341,41 @@ func (wa *WhatsAppClient) makePortalAvatarFetcher(avatarID string, sender types.
if existingID == "remove" || existingID == "unauthorized" {
existingID = ""
}
var wrappedAvatar *bridgev2.Avatar
avatar, err := wa.Client.GetProfilePictureInfo(jid, &whatsmeow.GetProfilePictureParams{
ExistingID: existingID,
IsCommunity: portal.RoomType == database.RoomTypeSpace,
})
if err != nil {
if errors.Is(err, whatsmeow.ErrProfilePictureNotSet) {
wrappedAvatar = &bridgev2.Avatar{
ID: "remove",
Remove: true,
}
} else if errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) {
wrappedAvatar = &bridgev2.Avatar{
ID: "unauthorized",
Remove: true,
}
} else if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get avatar info")
return false
} else if avatar == nil {
return false
} else {
wrappedAvatar = &bridgev2.Avatar{
ID: networkid.AvatarID(avatar.ID),
Get: func(ctx context.Context) ([]byte, error) {
return wa.Client.DownloadMediaWithPath(avatar.DirectPath, nil, nil, nil, 0, "", "")
},
}
}
var evtSender bridgev2.EventSender
if !sender.IsEmpty() {
evtSender = wa.makeEventSender(sender)
}
senderIntent := portal.GetIntentFor(ctx, evtSender, wa.UserLogin, bridgev2.RemoteEventChatInfoChange)
//lint:ignore SA1019 TODO invent a cleaner way to fetch avatar metadata before updating?
return portal.Internal().UpdateAvatar(ctx, &bridgev2.Avatar{
ID: networkid.AvatarID(avatar.ID),
Get: func(ctx context.Context) ([]byte, error) {
return wa.Client.DownloadMediaWithPath(avatar.DirectPath, nil, nil, nil, 0, "", "")
},
}, senderIntent, ts)
return portal.Internal().UpdateAvatar(ctx, wrappedAvatar, senderIntent, ts)
}
}

Expand Down
29 changes: 25 additions & 4 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package connector
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog"
"go.mau.fi/whatsmeow"
Expand All @@ -24,6 +26,7 @@ func (wa *WhatsAppConnector) LoadUserLogin(_ context.Context, login *bridgev2.Us
UserLogin: login,

historySyncs: make(chan *waHistorySync.HistorySync, 64),
resyncQueue: make(map[types.JID]resyncQueueItem),
}
login.Client = w

Expand Down Expand Up @@ -52,15 +55,23 @@ func (wa *WhatsAppConnector) LoadUserLogin(_ context.Context, login *bridgev2.Us
return nil
}

type resyncQueueItem struct {
portal *bridgev2.Portal
ghost *bridgev2.Ghost
}

type WhatsAppClient struct {
Main *WhatsAppConnector
UserLogin *bridgev2.UserLogin
Client *whatsmeow.Client
Device *store.Device
JID types.JID

historySyncs chan *waHistorySync.HistorySync
stopHistorySyncLoop atomic.Pointer[context.CancelFunc]
historySyncs chan *waHistorySync.HistorySync
stopLoops atomic.Pointer[context.CancelFunc]
resyncQueue map[types.JID]resyncQueueItem
resyncQueueLock sync.Mutex
nextResync time.Time
}

var _ bridgev2.NetworkAPI = (*WhatsAppClient)(nil)
Expand Down Expand Up @@ -103,12 +114,22 @@ func (wa *WhatsAppClient) Connect(ctx context.Context) error {
if err := wa.Main.updateProxy(ctx, wa.Client, false); err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to update proxy")
}
go wa.historySyncLoop()
wa.startLoops()
return wa.Client.Connect()
}

func (wa *WhatsAppClient) startLoops() {
ctx, cancel := context.WithCancel(context.Background())
oldStop := wa.stopLoops.Swap(&cancel)
if oldStop != nil {
(*oldStop)()
}
go wa.historySyncLoop(ctx)
go wa.ghostResyncLoop(ctx)
}

func (wa *WhatsAppClient) Disconnect() {
if stopHistorySyncLoop := wa.stopHistorySyncLoop.Swap(nil); stopHistorySyncLoop != nil {
if stopHistorySyncLoop := wa.stopLoops.Swap(nil); stopHistorySyncLoop != nil {
(*stopHistorySyncLoop)()
}
if cli := wa.Client; cli != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/connector/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (evt *WAMessageEvent) HandleExisting(ctx context.Context, portal *bridgev2.
}

func (evt *WAMessageEvent) ConvertMessage(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI) (*bridgev2.ConvertedMessage, error) {
evt.wa.EnqueuePortalResync(portal)
converted := evt.wa.Main.MsgConv.ToMatrix(ctx, portal, evt.wa.Client, intent, evt.Message, &evt.Info)
return converted, nil
}
Expand Down
Loading

0 comments on commit d07fcc8

Please sign in to comment.