mattermost-community-enterp.../platform/services/sharedchannel/sync_send_remote.go
Claude ec1f89217a Merge: Complete Mattermost Server with Community Enterprise
Full Mattermost server source with integrated Community Enterprise features.
Includes vendor directory for offline/air-gapped builds.

Structure:
- enterprise-impl/: Enterprise feature implementations
- enterprise-community/: Init files that register implementations
- enterprise/: Bridge imports (community_imports.go)
- vendor/: All dependencies for offline builds

Build (online):
  go build ./cmd/mattermost

Build (offline/air-gapped):
  go build -mod=vendor ./cmd/mattermost

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 23:59:07 +09:00

1121 lines
37 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sharedchannel
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/wiggin77/merror"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
)
type sendSyncMsgResultFunc func(syncResp model.SyncResponse, err error)
type attachment struct {
fi *model.FileInfo
post *model.Post
}
type syncData struct {
task syncTask
rc *model.RemoteCluster
scr *model.SharedChannelRemote
users map[string]*model.User
profileImages map[string]*model.User
posts []*model.Post
reactions []*model.Reaction
acknowledgements []*model.PostAcknowledgement
statuses []*model.Status
attachments []attachment
mentionTransforms map[string]string
resultRepeat bool
resultNextCursor model.GetPostsSinceForSyncCursor
GlobalUserSyncLastTimestamp int64
}
func newSyncData(task syncTask, rc *model.RemoteCluster, scr *model.SharedChannelRemote) *syncData {
return &syncData{
task: task,
rc: rc,
scr: scr,
users: make(map[string]*model.User),
profileImages: make(map[string]*model.User),
mentionTransforms: make(map[string]string),
resultNextCursor: model.GetPostsSinceForSyncCursor{
LastPostUpdateAt: scr.LastPostUpdateAt, LastPostUpdateID: scr.LastPostUpdateID,
LastPostCreateAt: scr.LastPostCreateAt, LastPostCreateID: scr.LastPostCreateID,
},
}
}
func (sd *syncData) isEmpty() bool {
return len(sd.users) == 0 && len(sd.profileImages) == 0 && len(sd.posts) == 0 && len(sd.reactions) == 0 && len(sd.acknowledgements) == 0 && len(sd.attachments) == 0
}
func (sd *syncData) isCursorChanged() bool {
if sd.resultNextCursor.IsEmpty() {
return false
}
return sd.scr.LastPostCreateAt != sd.resultNextCursor.LastPostCreateAt || sd.scr.LastPostCreateID != sd.resultNextCursor.LastPostCreateID ||
sd.scr.LastPostUpdateAt != sd.resultNextCursor.LastPostUpdateAt || sd.scr.LastPostUpdateID != sd.resultNextCursor.LastPostUpdateID
}
func (sd *syncData) setDataFromMsg(msg *model.SyncMsg) {
sd.users = msg.Users
sd.posts = msg.Posts
sd.reactions = msg.Reactions
sd.acknowledgements = msg.Acknowledgements
sd.statuses = msg.Statuses
}
// syncForRemote updates a remote cluster with any new posts/reactions for a specific
// channel. If many changes are found, only the oldest X changes are sent and the channel
// is re-added to the task map. This ensures no channels are starved for updates even if some
// channels are very active.
// Returning an error forces a retry on the task.
func (scs *Service) syncForRemote(task syncTask, rc *model.RemoteCluster) error {
// Empty channelID indicates a global user sync task
// Normal syncTasks always include a valid channelID
if task.channelID == "" {
return scs.syncAllUsers(rc)
}
rcs := scs.server.GetRemoteClusterService()
if rcs == nil {
return fmt.Errorf("cannot update remote cluster %s for channel id %s; Remote Cluster Service not enabled", rc.Name, task.channelID)
}
metrics := scs.server.GetMetrics()
start := time.Now()
var metricsRecorded bool
defer func() {
if !metricsRecorded && metrics != nil {
metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId)
metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds())
metricsRecorded = true
}
}()
scr, err := scs.server.GetStore().SharedChannel().GetRemoteByIds(task.channelID, rc.RemoteId)
if isNotFoundError(err) && rc.IsOptionFlagSet(model.BitflagOptionAutoInvited) {
// if SharedChannelRemote not found and remote has autoinvite flag, create a scr for it, thus inviting the remote.
scr = &model.SharedChannelRemote{
Id: model.NewId(),
ChannelId: task.channelID,
CreatorId: rc.CreatorId,
IsInviteAccepted: true,
IsInviteConfirmed: true,
RemoteId: rc.RemoteId,
LastPostCreateAt: model.GetMillis(),
LastPostUpdateAt: model.GetMillis(),
LastMembersSyncAt: 0,
}
if scr, err = scs.server.GetStore().SharedChannel().SaveRemote(scr); err != nil {
return fmt.Errorf("cannot auto-create shared channel remote (channel_id=%s, remote_id=%s): %w", task.channelID, rc.RemoteId, err)
}
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Auto-invited remote to channel (BitflagOptionAutoInvited)",
mlog.String("remote", rc.DisplayName),
mlog.String("channel_id", task.channelID),
)
} else if err == nil && scr.DeleteAt != 0 {
// if SharedChannelRemote is deleted, regardless of the autoinvite flag, do nothing
return nil
} else if err != nil {
return err
}
sd := newSyncData(task, rc, scr)
// if this is retrying a failed msg, just send it again.
if task.retryMsg != nil {
sd.setDataFromMsg(task.retryMsg)
return scs.sendSyncData(sd)
}
// if this has an already existing msg, just send it right away
if task.existingMsg != nil {
sd.setDataFromMsg(task.existingMsg)
return scs.sendSyncData(sd)
}
// if we don't have a channelID at this point, we cannot fetch new
// data from the database
if task.channelID == "" {
return fmt.Errorf("task doesn't have prefetched data nor a channel ID set")
}
// schedule another sync if the repeat flag is set at some point.
defer func(rpt *bool) {
if *rpt {
scs.addTask(newSyncTask(task.channelID, task.userID, task.remoteID, nil, nil))
}
}(&sd.resultRepeat)
// fetch new posts or retry post.
if err := scs.fetchPostsForSync(sd); err != nil {
return fmt.Errorf("cannot fetch posts for sync %v: %w", sd, err)
}
if !rc.IsOnline() {
if len(sd.posts) != 0 {
scs.notifyRemoteOffline(sd.posts, rc)
}
sd.resultRepeat = false
return nil
}
// fetch users that have updated their user profile or image.
if err := scs.fetchUsersForSync(sd); err != nil {
return fmt.Errorf("cannot fetch users for sync %v: %w", sd, err)
}
// fetch reactions for posts
if err := scs.fetchReactionsForSync(sd); err != nil {
return fmt.Errorf("cannot fetch reactions for sync %v: %w", sd, err)
}
// fetch acknowledgements for posts
if err := scs.fetchAcknowledgementsForSync(sd); err != nil {
return fmt.Errorf("cannot fetch acknowledgements for sync %v: %w", sd, err)
}
// fetch users associated with posts & reactions
if err := scs.fetchPostUsersForSync(sd); err != nil {
return fmt.Errorf("cannot fetch post users for sync %v: %w", sd, err)
}
// filter out any posts that don't need to be sent.
scs.filterPostsForSync(sd)
// fetch attachments for posts
if err := scs.fetchPostAttachmentsForSync(sd); err != nil {
return fmt.Errorf("cannot fetch post attachments for sync %v: %w", sd, err)
}
if sd.isEmpty() {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Not sending sync data; everything filtered out",
mlog.String("remote", rc.DisplayName),
mlog.String("channel_id", task.channelID),
mlog.Bool("repeat", sd.resultRepeat),
)
if sd.isCursorChanged() {
scs.updateCursorForRemote(sd.scr.Id, sd.rc, sd.resultNextCursor)
}
return nil
}
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "Sending sync data",
mlog.String("remote", rc.DisplayName),
mlog.String("channel_id", task.channelID),
mlog.Bool("repeat", sd.resultRepeat),
mlog.Int("users", len(sd.users)),
mlog.Int("images", len(sd.profileImages)),
mlog.Int("posts", len(sd.posts)),
mlog.Int("reactions", len(sd.reactions)),
mlog.Int("acknowledgements", len(sd.acknowledgements)),
mlog.Int("attachments", len(sd.attachments)),
)
if !metricsRecorded && metrics != nil {
metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId)
metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds())
metricsRecorded = true
}
return scs.sendSyncData(sd)
}
// fetchUsersForSync populates the sync data with any channel users who updated their user profile
// since the last sync.
func (scs *Service) fetchUsersForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Users", time.Since(start).Seconds())
}
}()
filter := model.GetUsersForSyncFilter{
ChannelID: sd.task.channelID,
Limit: MaxUsersPerSync,
}
users, err := scs.server.GetStore().SharedChannel().GetUsersForSync(filter)
if err != nil {
return err
}
// Don't sync users back to the remote cluster they originated from
for _, u := range users {
if u.GetRemoteID() != sd.rc.RemoteId {
sd.users[u.Id] = u
}
}
filter.CheckProfileImage = true
usersImage, err := scs.server.GetStore().SharedChannel().GetUsersForSync(filter)
if err != nil {
return err
}
for _, u := range usersImage {
if u.GetRemoteID() != sd.rc.RemoteId {
sd.profileImages[u.Id] = u
}
}
return nil
}
// fetchPostsForSync populates the sync data with any new or edited posts since the last sync.
func (scs *Service) fetchPostsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Posts", time.Since(start).Seconds())
}
}()
options := model.GetPostsSinceForSyncOptions{
ChannelId: sd.task.channelID,
IncludeDeleted: true,
SinceCreateAt: true,
ExcludeChannelMetadataSystemPosts: true,
}
cursor := model.GetPostsSinceForSyncCursor{
LastPostUpdateAt: sd.scr.LastPostUpdateAt,
LastPostUpdateID: sd.scr.LastPostUpdateID,
LastPostCreateAt: sd.scr.LastPostCreateAt,
LastPostCreateID: sd.scr.LastPostCreateID,
}
maxPostsPerSync := *scs.server.Config().ConnectedWorkspacesSettings.MaxPostsPerSync
// Fetch all newly created posts first. This is to ensure that post order is preserved for sync targets
// that cannot set the CreateAt timestamp for incoming posts (e.g. MS Teams). If we simply used UpdateAt
// then posts could get out of order. For example: p1 created, p2 created, p1 updated... sync'ing on UpdateAt
// would order the posts p2, p1.
posts, nextCursor, err := scs.server.GetStore().Post().GetPostsSinceForSync(options, cursor, maxPostsPerSync)
if err != nil {
return fmt.Errorf("could not fetch new posts for sync: %w", err)
}
count := len(posts)
sd.posts = appendPosts(sd.posts, posts, scs.server.GetStore().Post(), cursor.LastPostCreateAt, scs.server.Log())
cache := postsSliceToMap(posts)
// Fill remaining batch capacity with updated posts.
if len(posts) < maxPostsPerSync {
options.SinceCreateAt = false
// use 'nextcursor' as it has the correct xxxUpdateAt values, and the updsted xxxCreateAt values.
posts, nextCursor, err = scs.server.GetStore().Post().GetPostsSinceForSync(options, nextCursor, maxPostsPerSync-len(posts))
if err != nil {
return fmt.Errorf("could not fetch modified posts for sync: %w", err)
}
posts = reducePostsSliceInCache(posts, cache)
count += len(posts)
sd.posts = appendPosts(sd.posts, posts, scs.server.GetStore().Post(), cursor.LastPostUpdateAt, scs.server.Log())
}
// Populate metadata for all posts before syncing
for i, post := range sd.posts {
if post != nil {
sd.posts[i] = scs.app.PreparePostForClient(request.EmptyContext(scs.server.Log()), post, &model.PreparePostForClientOpts{IncludePriority: true})
}
}
sd.resultNextCursor = nextCursor
sd.resultRepeat = count >= maxPostsPerSync
return nil
}
func appendPosts(dest []*model.Post, posts []*model.Post, postStore store.PostStore, timestamp int64, logger mlog.LoggerIFace) []*model.Post {
// Append the posts individually, checking for root posts that might appear later in the list.
// This is due to the UpdateAt collision handling algorithm where the order of posts is not based
// on UpdateAt or CreateAt when the posts have the same UpdateAt value. Here we are guarding
// against a root post with the same UpdateAt (and probably the same CreateAt) appearing later
// in the list and must be sync'd before the child post. This is an edge case that likely only
// happens during load testing or bulk imports.
for _, p := range posts {
if p.RootId != "" {
root, err := postStore.GetSingle(request.EmptyContext(logger), p.RootId, true)
if err == nil {
if (root.CreateAt >= timestamp || root.UpdateAt >= timestamp) && !containsPost(dest, root) {
dest = append(dest, root)
}
}
}
dest = append(dest, p)
}
return dest
}
func containsPost(posts []*model.Post, post *model.Post) bool {
for _, p := range posts {
if p.Id == post.Id {
return true
}
}
return false
}
// fetchReactionsForSync populates the sync data with any new reactions since the last sync.
func (scs *Service) fetchReactionsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Reactions", time.Since(start).Seconds())
}
}()
merr := merror.New()
for _, post := range sd.posts {
// any reactions originating from the remote cluster are filtered out
reactions, err := scs.server.GetStore().Reaction().GetForPostSince(post.Id, sd.scr.LastPostUpdateAt, sd.rc.RemoteId, true)
if err != nil {
merr.Append(fmt.Errorf("could not get reactions for post %s: %w", post.Id, err))
continue
}
sd.reactions = append(sd.reactions, reactions...)
}
return merr.ErrorOrNil()
}
// fetchAcknowledgementsForSync populates the sync data with any new acknowledgements since the last sync.
func (scs *Service) fetchAcknowledgementsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Acknowledgements", time.Since(start).Seconds())
}
}()
merr := merror.New()
for _, post := range sd.posts {
// any acknowledgements originating from the remote cluster are filtered out
acknowledgements, err := scs.server.GetStore().PostAcknowledgement().GetForPostSince(post.Id, sd.scr.LastPostUpdateAt, sd.rc.RemoteId, true)
if err != nil {
merr.Append(fmt.Errorf("could not get acknowledgements for post %s: %w", post.Id, err))
continue
}
sd.acknowledgements = append(sd.acknowledgements, acknowledgements...)
}
return merr.ErrorOrNil()
}
// fetchPostUsersForSync populates the sync data with all users associated with posts.
func (scs *Service) fetchPostUsersForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "PostUsers", time.Since(start).Seconds())
}
}()
sc, err := scs.server.GetStore().SharedChannel().Get(sd.task.channelID)
if err != nil {
return fmt.Errorf("cannot determine teamID: %w", err)
}
type p2mm struct {
post *model.Post
mentionMap model.UserMentionMap
}
userIDs := make(map[string]p2mm)
for _, reaction := range sd.reactions {
userIDs[reaction.UserId] = p2mm{}
}
for _, acknowledgement := range sd.acknowledgements {
userIDs[acknowledgement.UserId] = p2mm{}
}
for _, post := range sd.posts {
// get mentions and users for each mention
mentionMap := scs.app.MentionsToTeamMembers(request.EmptyContext(scs.server.Log()), post.Message, sc.TeamId)
// Skip notifications for remote users unless mentioned with @username:remote format
for mention, userID := range mentionMap {
user, err := scs.server.GetStore().User().Get(context.Background(), userID)
if err != nil {
continue
}
// Skip remote users unless mention contains a colon (@username:remote)
if user.IsRemote() && !strings.Contains(mention, ":") {
continue
}
}
// add author with post and mentionMap so transformations can be applied
userIDs[post.UserId] = p2mm{
post: post,
mentionMap: mentionMap,
}
// Add all mentioned users
for _, userID := range mentionMap {
userIDs[userID] = p2mm{
post: post,
mentionMap: mentionMap,
}
}
}
merr := merror.New()
for userID, v := range userIDs {
user, err := scs.server.GetStore().User().Get(context.Background(), userID)
if err != nil {
merr.Append(fmt.Errorf("could not get user %s: %w", userID, err))
continue
}
sync, syncImage, err2 := scs.shouldUserSync(user, sd.task.channelID, sd.rc)
if err2 != nil {
merr.Append(fmt.Errorf("could not check should sync user %s: %w", userID, err2))
continue
}
if sync {
sd.users[user.Id] = user
}
if syncImage {
sd.profileImages[user.Id] = user
}
// Collect mention transforms for all mentioned users
if v.mentionMap != nil {
for mention, mentionUserID := range v.mentionMap {
if mentionUserID == userID {
// Always add the mention transform - let receiver decide how to display
// The sender should NOT modify the message, only provide the mapping
sd.mentionTransforms[mention] = userID
}
}
}
}
return merr.ErrorOrNil()
}
// fetchPostAttachmentsForSync populates the sync data with any file attachments for new posts.
func (scs *Service) fetchPostAttachmentsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Attachments", time.Since(start).Seconds())
}
}()
merr := merror.New()
for _, post := range sd.posts {
fis, err := scs.server.GetStore().FileInfo().GetForPost(post.Id, false, true, true)
if err != nil {
merr.Append(fmt.Errorf("could not get file attachment info for post %s: %w", post.Id, err))
continue
}
for _, fi := range fis {
if scs.shouldSyncAttachment(fi, sd.rc) {
sd.attachments = append(sd.attachments, attachment{fi: fi, post: post})
}
}
}
return merr.ErrorOrNil()
}
// filterPostsForSync removes any posts that do not need to sync.
func (scs *Service) filterPostsForSync(sd *syncData) {
filtered := make([]*model.Post, 0, len(sd.posts))
for _, p := range sd.posts {
// Don't resend an existing post where only the reactions changed.
// Posts we must send:
// - new posts (EditAt == 0)
// - edited posts (EditAt >= LastPostUpdateAt)
// - deleted posts (DeleteAt > 0)
// - posts with metadata changes (acknowledgements/priority)
hasMetadataChanges := p.Metadata != nil && (p.Metadata.Acknowledgements != nil || p.Metadata.Priority != nil)
if p.EditAt > 0 && p.EditAt < sd.scr.LastPostUpdateAt && p.DeleteAt == 0 && !hasMetadataChanges {
continue
}
// Don't send a deleted post if it is just the original copy from an edit.
if p.DeleteAt > 0 && p.OriginalId != "" {
continue
}
// don't sync a post back to the remote cluster it came from.
if p.GetRemoteID() == sd.rc.RemoteId {
continue
}
// parse out all permalinks in the message.
p.Message = scs.processPermalinkToRemote(p)
filtered = append(filtered, p)
}
sd.posts = filtered
}
// sendSyncData sends all the collected users, posts, reactions, images, and attachments to the
// remote cluster.
// The order of items sent is important: users -> attachments -> posts -> reactions -> profile images
func (scs *Service) sendSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendDuration(sd.rc.RemoteId, time.Since(start).Seconds())
}
}()
merr := merror.New()
sanitizeSyncData(sd)
// send users
if len(sd.users) != 0 {
if err := scs.sendUserSyncData(sd); err != nil {
merr.Append(fmt.Errorf("cannot send user sync data: %w", err))
}
}
// send attachments
if len(sd.attachments) != 0 {
scs.sendAttachmentSyncData(sd)
}
// send posts
if len(sd.posts) != 0 {
if err := scs.sendPostSyncData(sd); err != nil {
merr.Append(fmt.Errorf("cannot send post sync data: %w", err))
}
} else if sd.isCursorChanged() {
scs.updateCursorForRemote(sd.scr.Id, sd.rc, sd.resultNextCursor)
}
// send acknowledgements
if len(sd.acknowledgements) != 0 {
if err := scs.sendAcknowledgementSyncData(sd); err != nil {
merr.Append(fmt.Errorf("cannot send acknowledgement sync data: %w", err))
}
}
// send reactions
if len(sd.reactions) != 0 {
if err := scs.sendReactionSyncData(sd); err != nil {
merr.Append(fmt.Errorf("cannot send reaction sync data: %w", err))
}
}
// send statuses
if len(sd.statuses) != 0 {
if err := scs.sendStatusSyncData(sd); err != nil {
merr.Append(fmt.Errorf("cannot send status sync data: %w", err))
}
}
// send user profile images
if len(sd.profileImages) != 0 {
scs.sendProfileImageSyncData(sd)
}
return merr.ErrorOrNil()
}
// sendUserSyncData sends the collected user updates to the remote cluster.
func (scs *Service) sendUserSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Users", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Users = sd.users
err := scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
// Only update cursor on successful sync
if errResp == nil && sd.GlobalUserSyncLastTimestamp > 0 {
scs.updateGlobalSyncCursor(sd.rc, sd.GlobalUserSyncLastTimestamp)
}
for _, userID := range syncResp.UsersSyncd {
if err := scs.server.GetStore().SharedChannel().UpdateUserLastSyncAt(userID, sd.task.channelID, sd.rc.RemoteId); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Cannot update shared channel user LastSyncAt",
mlog.String("user_id", userID),
mlog.String("channel_id", sd.task.channelID),
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Err(err),
)
}
}
if len(syncResp.UserErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for user(s) sync",
mlog.String("channel_id", sd.task.channelID),
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Array("users", syncResp.UserErrors),
)
}
})
return err
}
// sendAttachmentSyncData sends the collected post updates to the remote cluster.
func (scs *Service) sendAttachmentSyncData(sd *syncData) {
for _, a := range sd.attachments {
if err := scs.sendAttachmentForRemote(a.fi, a.post, sd.rc); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Cannot sync post attachment",
mlog.String("post_id", a.post.Id),
mlog.String("channel_id", sd.task.channelID),
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Err(err),
)
}
// updating SharedChannelAttachments with LastSyncAt is already done.
}
}
// sendPostSyncData sends the collected post updates to the remote cluster.
func (scs *Service) sendPostSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Posts", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Posts = sd.posts
msg.MentionTransforms = sd.mentionTransforms
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
if len(syncResp.PostErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for post(s) sync",
mlog.String("channel_id", sd.task.channelID),
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Array("posts", syncResp.PostErrors),
)
for _, postID := range syncResp.PostErrors {
scs.handlePostError(postID, sd.task, sd.rc)
}
}
scs.updateCursorForRemote(sd.scr.Id, sd.rc, sd.resultNextCursor)
})
}
// sendReactionSyncData sends the collected reaction updates to the remote cluster.
func (scs *Service) sendReactionSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Reactions", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Reactions = sd.reactions
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
if len(syncResp.ReactionErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for reactions(s) sync",
mlog.String("channel_id", sd.task.channelID),
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Array("reaction_posts", syncResp.ReactionErrors),
)
}
})
}
// sendAcknowledgementSyncData sends the collected acknowledgement updates to the remote cluster.
func (scs *Service) sendAcknowledgementSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Acknowledgements", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Acknowledgements = sd.acknowledgements
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
if len(syncResp.AcknowledgementErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error for acknowledgement(s) sync",
mlog.String("channel_id", sd.task.channelID),
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Array("acknowledgement_posts", syncResp.AcknowledgementErrors),
)
}
})
}
// sendStatusSyncData sends the collected status updates to the remote cluster.
func (scs *Service) sendStatusSyncData(sd *syncData) error {
msg := model.NewSyncMsg(sd.task.channelID)
msg.Statuses = sd.statuses
return scs.sendSyncMsgToRemote(msg, sd.rc, func(syncResp model.SyncResponse, errResp error) {
if len(syncResp.StatusErrors) != 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Response indicates error from status(es) sync",
mlog.String("remote_id", sd.rc.RemoteId),
mlog.Array("user_ids", syncResp.StatusErrors),
)
for _, userID := range syncResp.StatusErrors {
scs.handleStatusError(userID, sd.task, sd.rc)
}
}
})
}
// sendProfileImageSyncData sends the collected user profile image updates to the remote cluster.
func (scs *Service) sendProfileImageSyncData(sd *syncData) {
for _, user := range sd.profileImages {
scs.syncProfileImage(user, sd.task.channelID, sd.rc)
}
}
// shouldUserSyncGlobal determines if a user needs to be synchronized globally.
// Compares user's update timestamp with the remote cluster's LastGlobalUserSyncAt.
func (scs *Service) shouldUserSyncGlobal(user *model.User, rc *model.RemoteCluster) (bool, error) {
// Don't sync users back to the remote cluster they originated from
if user.IsRemote() && user.GetRemoteID() == rc.RemoteId {
return false, nil
}
// Calculate latest update time for this user (profile or picture)
latestUserUpdateTime := max(user.LastPictureUpdate, user.UpdateAt)
// For initial sync (LastGlobalUserSyncAt=0), sync all users
// For incremental sync, only sync users updated after the last sync
if rc.LastGlobalUserSyncAt == 0 {
return true, nil
}
return latestUserUpdateTime > rc.LastGlobalUserSyncAt, nil
}
// syncAllUsers synchronizes all local users to a remote cluster.
// This is called when a connection with a remote cluster is established or when handling a global user sync task.
// Uses cursor-based approach with LastGlobalUserSyncAt to resume after interruptions.
func (scs *Service) syncAllUsers(rc *model.RemoteCluster) error {
// Check if feature is enabled
if !scs.server.Config().FeatureFlags.EnableSyncAllUsersForRemoteCluster {
return nil
}
if !rc.IsOnline() {
return fmt.Errorf("remote cluster %s is not online", rc.RemoteId)
}
// Start metrics tracking
metrics := scs.server.GetMetrics()
start := time.Now()
defer func() {
if metrics != nil {
metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId)
metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds())
}
}()
batchSize := scs.getGlobalUserSyncBatchSize()
// Create sync data with collected users
sd := &syncData{
task: syncTask{remoteID: rc.RemoteId},
rc: rc,
scr: &model.SharedChannelRemote{RemoteId: rc.RemoteId},
users: make(map[string]*model.User),
}
// Collect users to sync
users, latestTimestamp, _, hasMore, err := scs.collectUsersForGlobalSync(rc, batchSize)
if err != nil {
return err
}
// Exit early if no users to sync
if len(users) == 0 {
scs.server.Log().Log(mlog.LvlSharedChannelServiceDebug, "No users to sync for remote cluster",
mlog.String("remote_id", rc.RemoteId))
return nil
}
// Add users to sync data
sd.users = users
sd.GlobalUserSyncLastTimestamp = latestTimestamp
// Send the collected users to remote
if err := scs.sendUserSyncData(sd); err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Error sending user batch during sync",
mlog.String("remote_id", rc.RemoteId),
mlog.Err(err),
)
return fmt.Errorf("error sending user batch during sync: %w", err)
}
// Schedule next batch if needed
if hasMore {
scs.scheduleNextUserSyncBatch(rc, latestTimestamp, batchSize, len(users))
}
return nil
}
// getGlobalUserSyncBatchSize returns the configured batch size for user syncing
func (scs *Service) getGlobalUserSyncBatchSize() int {
batchSize := MaxUsersPerSync
if scs.server.Config().ConnectedWorkspacesSettings.GlobalUserSyncBatchSize != nil {
configValue := *scs.server.Config().ConnectedWorkspacesSettings.GlobalUserSyncBatchSize
if configValue > 0 && configValue <= 200 {
batchSize = configValue
}
}
return batchSize
}
// collectUsersForGlobalSync fetches users that need to be synced to the remote
func (scs *Service) collectUsersForGlobalSync(rc *model.RemoteCluster, batchSize int) (map[string]*model.User, int64, int, bool, error) {
options := &model.UserGetOptions{
Page: 0,
PerPage: 100, // Database fetch batch size
Active: true,
Sort: "update_at_asc", // Order by UpdateAt ASC to ensure cursor consistency
}
// Only use UpdatedAfter for incremental syncs, not the initial sync
// This ensures users with UpdateAt=0 are included in the first sync
if rc.LastGlobalUserSyncAt > 0 {
options.UpdatedAfter = rc.LastGlobalUserSyncAt
}
users := make(map[string]*model.User)
latestTimestamp := rc.LastGlobalUserSyncAt
totalCount := 0
// Page through database results
for {
batch, err := scs.server.GetStore().User().GetAllProfiles(options)
if err != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Error fetching users for global sync",
mlog.String("remote_id", rc.RemoteId),
mlog.Err(err),
)
return nil, 0, 0, false, err
}
if len(batch) == 0 {
break // No more users to process
}
totalCount += len(batch)
// Process each user in this database page
for _, user := range batch {
// Stop if we've reached batch limit
if len(users) >= batchSize {
return users, latestTimestamp, totalCount, true, nil
}
// Skip users from remotes
if user.IsRemote() {
continue
}
// Check if user needs syncing
needsSync, _ := scs.shouldUserSyncGlobal(user, rc)
if !needsSync {
continue
}
// Add user and update cursor timestamp
users[user.Id] = user
userUpdateTime := max(user.UpdateAt, user.LastPictureUpdate)
if userUpdateTime > latestTimestamp {
latestTimestamp = userUpdateTime
}
}
// Check if we've reached the end of results
if len(batch) < options.PerPage {
break
}
// Move to next page
options.Page++
}
return users, latestTimestamp, totalCount, false, nil
}
// updateGlobalSyncCursor updates the LastGlobalUserSyncAt value for the remote cluster
func (scs *Service) updateGlobalSyncCursor(rc *model.RemoteCluster, newTimestamp int64) {
if err := scs.server.GetStore().RemoteCluster().UpdateLastGlobalUserSyncAt(rc.RemoteId, newTimestamp); err == nil {
rc.LastGlobalUserSyncAt = newTimestamp
} else {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Failed to update global user sync cursor",
mlog.String("remote_id", rc.RemoteId),
mlog.Err(err),
)
}
}
// scheduleNextUserSyncBatch creates a new task for the next batch of user sync
func (scs *Service) scheduleNextUserSyncBatch(rc *model.RemoteCluster, timestamp int64, batchSize, processedCount int) {
// Use timestamp as userID to make each batch task unique
// This prevents task ID collisions between different batches for the same remote
timestampStr := fmt.Sprintf("%d", timestamp)
task := newSyncTask("", timestampStr, rc.RemoteId, nil, nil)
task.schedule = time.Now().Add(NotifyMinimumDelay)
scs.addTask(task)
}
// sendSyncMsgToRemote synchronously sends the sync message to the remote cluster (or plugin).
func (scs *Service) sendSyncMsgToRemote(msg *model.SyncMsg, rc *model.RemoteCluster, f sendSyncMsgResultFunc) error {
rcs := scs.server.GetRemoteClusterService()
if rcs == nil {
return fmt.Errorf("cannot update remote cluster %s for channel id %s; Remote Cluster Service not enabled", rc.Name, msg.ChannelId)
}
if rc.IsPlugin() {
return scs.sendSyncMsgToPlugin(msg, rc, f)
}
b, err := json.Marshal(msg)
if err != nil {
return err
}
// Use appropriate topic based on message type
topic := TopicSync
if msg.ChannelId == "" && len(msg.Users) > 0 &&
len(msg.Posts) == 0 && len(msg.Reactions) == 0 &&
len(msg.Statuses) == 0 {
topic = TopicGlobalUserSync
}
rcMsg := model.NewRemoteClusterMsg(topic, b)
ctx, cancel := context.WithTimeout(context.Background(), remotecluster.SendTimeout)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
err = rcs.SendMsg(ctx, rcMsg, rc, func(rcMsg model.RemoteClusterMsg, rc *model.RemoteCluster, rcResp *remotecluster.Response, errResp error) {
defer wg.Done()
// Check for ErrChannelNotShared in the application response
if rcResp != nil && !rcResp.IsSuccess() && strings.Contains(rcResp.Err, ErrChannelNotShared.Error()) {
scs.handleChannelNotSharedError(msg, rc)
return
}
var syncResp model.SyncResponse
if errResp == nil {
if rcResp != nil && len(rcResp.Payload) > 0 {
if err2 := json.Unmarshal(rcResp.Payload, &syncResp); err2 != nil {
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Invalid sync msg response from remote cluster",
mlog.String("remote", rc.Name),
mlog.String("channel_id", msg.ChannelId),
mlog.Err(err2),
)
return
}
if f != nil {
f(syncResp, errResp)
}
} else {
// No error but response is nil or empty
scs.server.Log().Log(mlog.LvlSharedChannelServiceError, "Empty or nil response payload from remote cluster",
mlog.String("remote", rc.Name),
mlog.String("channel_id", msg.ChannelId),
)
}
}
})
wg.Wait()
return err
}
// sendSyncMsgToRemote synchronously sends the sync message to a plugin.
func (scs *Service) sendSyncMsgToPlugin(msg *model.SyncMsg, rc *model.RemoteCluster, f sendSyncMsgResultFunc) error {
syncResp, errResp := scs.app.OnSharedChannelsSyncMsg(msg, rc)
if f != nil {
f(syncResp, errResp)
}
return errResp
}
func sanitizeSyncData(sd *syncData) {
for id, user := range sd.users {
sd.users[id] = sanitizeUserForSync(user)
}
for id, user := range sd.profileImages {
sd.profileImages[id] = sanitizeUserForSync(user)
}
}
// handleChannelNotSharedError processes the case when a remote indicates a channel
// is no longer shared. It removes the remote from the shared channel locally and,
// if it was the last remote, completely unshares the channel.
func (scs *Service) handleChannelNotSharedError(msg *model.SyncMsg, rc *model.RemoteCluster) {
logger := scs.server.Log()
logger.Log(mlog.LvlSharedChannelServiceDebug, "Remote indicated channel is no longer shared; unsharing locally",
mlog.String("remote", rc.Name),
mlog.String("channel_id", msg.ChannelId),
)
// Get the SharedChannelRemote record for this channel and remote
scr, getErr := scs.server.GetStore().SharedChannel().GetRemoteByIds(msg.ChannelId, rc.RemoteId)
if getErr != nil {
logger.Log(mlog.LvlSharedChannelServiceError, "Failed to get shared channel remote",
mlog.String("remote", rc.Name),
mlog.String("channel_id", msg.ChannelId),
mlog.Err(getErr),
)
return
}
// Get channel details for posting the system message
channel, channelErr := scs.server.GetStore().Channel().Get(msg.ChannelId, true)
if channelErr != nil {
logger.Log(mlog.LvlSharedChannelServiceError, "Failed to get channel details",
mlog.String("remote", rc.Name),
mlog.String("channel_id", msg.ChannelId),
mlog.Err(channelErr),
)
return
}
// Post a system message to notify users that the channel is no longer shared with this remote
scs.postUnshareNotification(msg.ChannelId, scr.CreatorId, channel, rc)
if err := scs.UninviteRemoteFromChannel(msg.ChannelId, rc.RemoteId); err != nil {
logger.Log(mlog.LvlSharedChannelServiceError, "Failed to uninvite remote from shared channel", mlog.Err(err))
return
}
}