
GO-3046: Files: New uploading / removing queue #1007


Merged
merged 35 commits into main from go-3046-file-sync-use-batcher-for-uploading-queue on Apr 17, 2024

Conversation

deff7 (Member) commented Mar 15, 2024

Summary by CodeRabbit

  • New Features

    • Introduced new methods and processes for file synchronization, including handling file deletion and upload limits.
    • Added functionality for file object migration, including services and testing for migration processes.
    • Implemented a persistent queue system for managing file synchronization tasks.
    • Updated file status handling to use a new custom status type for improved tracking.
    • Updated counter values and added new reindexing processes for file objects.
  • Bug Fixes

    • Enhanced error handling across various modules including file storage and identity management.
    • Fixed issues related to file ID validation and synchronization logic.
  • Refactor

    • Major refactoring of file synchronization logic to streamline operations and improve performance.
    • Adjusted the order of component registration during system bootstrap for optimized initialization.
  • Documentation

    • Updated test setups and assertions to align with new functionalities and error handling mechanisms.
  • Tests

    • Expanded test coverage for new functionalities in file handling and synchronization services.
    • Added tests for persistent queue operations and file migration scenarios.
  • Chores

    • Cleaned up imports and unused variables across several modules to maintain code cleanliness and efficiency.


github-actions bot commented Mar 15, 2024

New Coverage 43.4% of statements
Patch Coverage 70.7% of changed statements (383/542)

Coverage provided by https://github.com/seriousben/go-patch-cover-action

Key() string
}

type ItemWithOrder interface {
Contributor

OrderedItem?

Member Author

fixed
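
For context, a rough sketch of how the item interfaces might look after that rename; the Item name and the Order method signature are assumptions for illustration, not necessarily the merged code:

type Item interface {
	Key() string
}

// OrderedItem is the name suggested above in place of ItemWithOrder.
type OrderedItem interface {
	Item
	Order() int64
}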

return nil
}

func (s *service) Run(_ context.Context) error {
go func() {
Member Author

Don't block starting of application
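
As a rough illustration of that point, Run can return immediately while the restore work happens in a goroutine; restoreItems is an assumed helper name, not the exact merged code:

func (s *service) Run(_ context.Context) error {
	go func() {
		// Loading persisted items may touch disk, so it must not delay application startup.
		if err := s.restoreItems(); err != nil {
			log.Errorf("restore queue items: %v", err)
		}
	}()
	return nil
}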

if err != nil {
return err
}
return q.store(item)
Member

I would split all the Badger-specific parts into other components. Maybe just provide an abstract interface inside the queue that stores and removes the items.
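
A minimal sketch of that abstraction, with Badger as just one backend; the interface and method names here are assumptions (the later hunks suggest the merged code exposes something similar as persistentqueue.NewBadgerStorage):

// Storage is all the queue needs; it knows nothing about Badger.
type Storage interface {
	Put(item Item) error
	Delete(key string) error
	List() ([]Item, error)
	Close() error
}

// badgerStorage keeps the Badger-specific details in one place.
type badgerStorage struct {
	db     *badger.DB
	prefix []byte
}

Tests could then swap in an in-memory Storage implementation without touching the queue logic.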

deff7 (Member Author) commented Apr 12, 2024

@coderabbitai review


coderabbitai bot commented Apr 12, 2024

Walkthrough

The recent changes involve a significant overhaul and enhancement of file synchronization and management functionalities system-wide. These updates introduce new methods for file operations, refine error handling, and implement a persistent queue system to boost operational efficiency and reliability within the system.

Changes

  • core/filestorage/filesync/filesync.go, core/filestorage/filesync/mock_filesync/mock_FileSync.go: Modified FileSync interface and fileSync struct. Updated method signatures, added DeleteFile methods, and introduced persistent queues.
  • core/filestorage/filesync/stats.go: Renamed receiver variable in fileSync struct, updated method calls and variable names.
  • core/syncstatus/filestatus.go: Refactored file status handling logic, replaced constants with filesyncstatus.Status. Updated functions to use the new status type.
  • core/identity/identity_test.go: Switched to using dataStoreProvider initialized with NewInMemory(), updated references throughout the test fixture setup.
  • core/indexer/reindex.go: Updated counter values for reindexing file objects, introduced new reindexing process based on specific conditions.
  • util/testMock/file_service_mock.go: Removed Exif method, adjusted method signatures for GetFileForWidth and GetOriginalFile in MockImage struct.

🐰✨📁✨🐰
A hop through code, a leap through files,
Syncing data across many miles.
With every push and every pull,
Our rabbit's work is never dull.
Celebrate the changes, for they refine,
Our digital burrow's grand design. 🌟🥕


Recent Review Details

Configuration used: CodeRabbit UI

Commits: files that changed from the base of the PR and between 0eb81a4 and 84efe8c.
Files selected for processing (1)
  • core/filestorage/filesync/filesync.go (5 hunks)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger a review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

coderabbitai bot left a comment

Actionable comments posted: 14

Comment on lines +301 to +304
s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot())
// Details in spaceview comes from Workspace object, so we don't need to migrate them
if s.Type() != smartblock.SmartBlockTypeSpaceView {
s.fileObjectMigrator.MigrateDetails(st, s.space, s.GetFileKeysSnapshot())
s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space)

Consider handling the return values of MigrateFiles and MigrateFileIdsInDetails to manage potential errors.

- s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot())
+ if err := s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot()); err != nil {
+     log.With(zap.Error(err)).Error("Failed to migrate files")
+     return nil, err
+ }

- s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space)
+ if err := s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space); err != nil {
+     log.With(zap.Error(err)).Error("Failed to migrate file IDs in details")
+     return nil, err
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

if err := s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot()); err != nil {
	log.With(zap.Error(err)).Error("Failed to migrate files")
	return nil, err
}
// Details in spaceview comes from Workspace object, so we don't need to migrate them
if s.Type() != smartblock.SmartBlockTypeSpaceView {
	s.fileObjectMigrator.MigrateDetails(st, s.space, s.GetFileKeysSnapshot())
	if err := s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space); err != nil {
		log.With(zap.Error(err)).Error("Failed to migrate file IDs in details")
		return nil, err
	}

Comment on lines +125 to +133
go func() {
err := s.ensureNotSyncedFilesAddedToQueue()
if err != nil {
log.Errorf("ensure not synced files added to queue: %v", err)
}
}()
s.indexer.run()
s.migrationQueue.Run()
return nil

Avoid blocking the main thread by ensuring that the migration queue and file sync operations are handled asynchronously.

- go func() {
-     err := s.ensureNotSyncedFilesAddedToQueue()
-     if err != nil {
-         log.Errorf("ensure not synced files added to queue: %v", err)
-     }
- }()
+ go s.ensureNotSyncedFilesAddedToQueue()
+ s.indexer.run()
+ s.migrationQueue.Run()

Refactor ensureNotSyncedFilesAddedToQueue to handle errors internally instead of logging them in the goroutine.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

go s.ensureNotSyncedFilesAddedToQueue()
s.indexer.run()
s.migrationQueue.Run()
return nil
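
For illustration, the suggested refactor might move the logging into the function itself so callers can simply start it in a goroutine; listNotSyncedItems is an assumed helper, not the merged code:

func (s *fileSync) ensureNotSyncedFilesAddedToQueue() {
	items, err := s.listNotSyncedItems() // assumed helper returning pending queue items
	if err != nil {
		log.Errorf("ensure not synced files added to queue: %v", err)
		return
	}
	for _, it := range items {
		if err := s.uploadingQueue.Add(it); err != nil {
			log.Errorf("add %s to uploading queue: %v", it.Key(), err)
		}
	}
}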

Comment on lines +36 to +40
func newInMemoryBadger(t *testing.T) *badger.DB {
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR))
require.NoError(t, err)
return db
}

Consider handling potential errors from badger.Open directly within newInMemoryBadger.

- db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR))
- require.NoError(t, err)
+ db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR))
+ if err != nil {
+   t.Fatalf("Failed to open in-memory Badger DB: %v", err)
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

func newInMemoryBadger(t *testing.T) *badger.DB {
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR))
	if err != nil {
		t.Fatalf("Failed to open in-memory Badger DB: %v", err)
	}
	return db
}

Comment on lines +77 to +78
err := q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"})
require.Error(t, err)

Ensure error messages are informative for better debugging.

- require.Error(t, err)
+ require.Error(t, err, "Expected error when adding to a closed queue, but got none")

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

err := q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"})
require.Error(t, err, "Expected error when adding to a closed queue, but got none")

Comment on lines +169 to +170
t.Fatal("handler not called")
}

Increase the timeout to allow more time for the handler to be called in slow environments.

- case <-time.After(50 * time.Millisecond):
+ case <-time.After(100 * time.Millisecond):

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

case <-time.After(100 * time.Millisecond):
	t.Fatal("handler not called")
}

Comment on lines +12 to +47
func TestDeleteFile(t *testing.T) {
t.Run("if all is ok, delete file", func(t *testing.T) {
fx := newFixture(t, 1024*1024*1024)
defer fx.Finish(t)
spaceId := "spaceId"

fileId, _ := fx.givenFileAddedToDAG(t)
fx.givenFileUploaded(t, spaceId, fileId)

err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId})
require.NoError(t, err)

fx.waitEmptyQueue(t, fx.deletionQueue, time.Second*1)

_, err = fx.rpcStore.FilesInfo(ctx, spaceId, fileId)
require.Error(t, err)
})

t.Run("with error while deleting, add to retry queue", func(t *testing.T) {
fx := newFixture(t, 1024*1024*1024)
defer fx.Finish(t)
spaceId := "spaceId"

testFileId := domain.FileId("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")

// Just try to delete missing file, in-memory RPC store will return error
err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: testFileId})
require.NoError(t, err)

fx.waitEmptyQueue(t, fx.deletionQueue, time.Second*1)

fx.waitCondition(t, 100*time.Millisecond, func() bool {
return fx.retryDeletionQueue.Len() == 1 || fx.retryDeletionQueue.NumProcessedItems() > 0
})
})
}
coderabbitai bot commented Apr 12, 2024

Added tests for file deletion scenarios. Ensure that these tests cover all relevant cases and consider adding more scenarios if needed.

Would you like me to help expand these tests?

Member Author

try it

coderabbitai bot

@deff7, based on your request to expand the test scenarios for file deletion, I've drafted additional tests that cover a variety of cases which might occur in real-world scenarios. Here are the additional tests:

func TestDeleteFileWithInsufficientPermissions(t *testing.T) {
	t.Run("error due to insufficient permissions", func(t *testing.T) {
		fx := newFixture(t, 1024*1024*1024)
		defer fx.Finish(t)
		spaceId := "spaceId"

		fileId, _ := fx.givenFileAddedToDAG(t)
		fx.givenFileUploaded(t, spaceId, fileId)

		// Simulate insufficient permissions
		fx.mockPermissions(t, spaceId, fileId, false)

		err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId})
		require.Error(t, err)

		fx.assertFileNotDeleted(t, spaceId, fileId)
	})
}

func TestDeleteFileDuringNetworkFailure(t *testing.T) {
	t.Run("network failure during deletion", func(t *testing.T) {
		fx := newFixture(t, 1024*1024*1024)
		defer fx.Finish(t)
		spaceId := "spaceId"

		fileId, _ := fx.givenFileAddedToDAG(t)
		fx.givenFileUploaded(t, spaceId, fileId)

		// Simulate network failure
		fx.simulateNetworkFailure()

		err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId})
		require.Error(t, err)

		// Ensure the file is still present
		fx.assertFileExists(t, spaceId, fileId)
	})
}

func TestConcurrentFileDeletion(t *testing.T) {
	t.Run("concurrent deletion requests", func(t *testing.T) {
		fx := newFixture(t, 1024*1024*1024)
		defer fx.Finish(t)
		spaceId := "spaceId"

		fileId, _ := fx.givenFileAddedToDAG(t)
		fx.givenFileUploaded(t, spaceId, fileId)

		// Launch concurrent deletion requests
		errs := fx.launchConcurrentDeletions("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, 5)
		require.Contains(t, errs, nil) // Expect at least one success

		fx.assertFileNotExists(t, spaceId, fileId)
	})
}

These tests simulate more complex scenarios such as insufficient permissions, network failures, and concurrent deletions. Please review these additions and let me know if there are any other scenarios you would like to cover!

coderabbitai bot left a comment

Actionable comments posted: 3

s.uploadPingCh = make(chan struct{})
db, err := s.dbProvider.LocalStorage()
if err != nil {
return
Member

If err is returned here, fileSync.Close will panic when closing the queues. Maybe check for nil in Close.

Member Author

thank you, fixed
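
For reference, a nil-guarded Close along the lines discussed above might look roughly like this (queue field names follow the Init hunk quoted later in this thread; this is a sketch, not the exact change):

func (s *fileSync) Close(ctx context.Context) error {
	// Init can fail before the queues are created, so guard each one.
	if s.uploadingQueue != nil {
		s.uploadingQueue.Close()
	}
	if s.retryUploadingQueue != nil {
		s.retryUploadingQueue.Close()
	}
	if s.deletionQueue != nil {
		s.deletionQueue.Close()
	}
	if s.retryDeletionQueue != nil {
		s.retryDeletionQueue.Close()
	}
	return nil
}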

coderabbitai bot left a comment

Actionable comments posted: 4

Comment on lines +35 to +41
AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error)
UploadSynchronously(spaceId string, fileId domain.FileId) error
OnUploadStarted(func(fileId domain.FileId) error)
OnUploaded(func(fileId domain.FileId) error)
OnLimited(func(fileId domain.FileId) error)
RemoveFile(spaceId string, fileId domain.FileId) (err error)
RemoveSynchronously(spaceId string, fileId domain.FileId) (err error)
OnUploadStarted(StatusCallback)
OnUploaded(StatusCallback)
OnLimited(StatusCallback)
DeleteFile(objectId string, fileId domain.FullFileId) (err error)
DeleteFileSynchronously(fileId domain.FullFileId) (err error)

Ensure the implementation of DeleteFileSynchronously as it is critical for synchronous file operations.

+ Implement the `DeleteFileSynchronously` method to handle synchronous file deletions.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error)
UploadSynchronously(spaceId string, fileId domain.FileId) error
OnUploadStarted(StatusCallback)
OnUploaded(StatusCallback)
OnLimited(StatusCallback)
DeleteFile(objectId string, fileId domain.FullFileId) (err error)
DeleteFileSynchronously(fileId domain.FullFileId) (err error)
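
A rough sketch of how the synchronous variant might bypass the queues and run the deletion inline; deleteFile is an assumed internal helper, not confirmed by this diff:

func (s *fileSync) DeleteFileSynchronously(fileId domain.FullFileId) error {
	// Same work the deletion queue handler performs, executed immediately.
	return s.deleteFile(context.Background(), fileId)
}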

Comment on lines +55 to +58
UploadingQueue []string
RetryUploadingQueue []string
DeletionQueue []string
RetryDeletionQueue []string

The new queue structures introduced in QueueInfo should be accompanied by comprehensive unit tests to verify their behavior under various scenarios.

Would you like me to help by writing some unit tests for these new queue structures?
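
By way of illustration, such a test could add a file and assert its object id shows up in the corresponding QueueInfo field; the fixture setup and the getQueueInfo accessor below are assumptions:

func TestQueueInfoReportsPendingItems(t *testing.T) {
	fx := newFixture(t, 1024*1024*1024)
	defer fx.Finish(t)

	// Assumed: adding a file places its object id into the uploading queue.
	err := fx.AddFile("objectId1", domain.FullFileId{SpaceId: "spaceId", FileId: "fileId1"}, true, false)
	require.NoError(t, err)

	info := fx.getQueueInfo() // assumed accessor snapshotting the queues
	assert.Contains(t, info.UploadingQueue, "objectId1")
	assert.Empty(t, info.DeletionQueue)
}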

Comment on lines +93 to +108
func (s *fileSync) Init(a *app.App) (err error) {
s.dbProvider = app.MustComponent[datastore.Datastore](a)
s.rpcStore = a.MustComponent(rpcstore.CName).(rpcstore.Service).NewStore()
s.dagService = a.MustComponent(fileservice.CName).(fileservice.FileService).DAGService()
s.fileStore = app.MustComponent[filestore.FileStore](a)
s.eventSender = app.MustComponent[event.Sender](a)
s.removePingCh = make(chan struct{})
s.uploadPingCh = make(chan struct{})
db, err := s.dbProvider.LocalStorage()
if err != nil {
return
}
s.uploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, uploadingKeyPrefix, makeQueueItem), log.Logger, s.uploadingHandler)
s.retryUploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryUploadingKeyPrefix, makeQueueItem), log.Logger, s.retryingHandler, persistentqueue.WithHandlerTickPeriod(loopTimeout))
s.deletionQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, deletionKeyPrefix, makeQueueItem), log.Logger, s.deletionHandler)
s.retryDeletionQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryDeletionKeyPrefix, makeQueueItem), log.Logger, s.retryDeletionHandler, persistentqueue.WithHandlerTickPeriod(loopTimeout))

The Init method setup is complex and involves multiple components. Consider refactoring to improve error handling and robustness.

Consider introducing a helper function that consolidates error checks and handling in the Init method to make the code cleaner and more maintainable.
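
For instance, such a helper could gather the queue construction shown above in one place so Init stays focused on wiring; this sketch reuses the calls from the quoted hunk but is not the merged code:

func (s *fileSync) initQueues(db *badger.DB) {
	s.uploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, uploadingKeyPrefix, makeQueueItem), log.Logger, s.uploadingHandler)
	s.retryUploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryUploadingKeyPrefix, makeQueueItem), log.Logger, s.retryingHandler, persistentqueue.WithHandlerTickPeriod(loopTimeout))
	s.deletionQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, deletionKeyPrefix, makeQueueItem), log.Logger, s.deletionHandler)
	s.retryDeletionQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryDeletionKeyPrefix, makeQueueItem), log.Logger, s.retryDeletionHandler, persistentqueue.WithHandlerTickPeriod(loopTimeout))
}

Init would then reduce to resolving dependencies, fetching LocalStorage, and calling s.initQueues(db) after the error check.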

s.uploadPingCh = make(chan struct{})
db, err := s.dbProvider.LocalStorage()
if err != nil {
return

In the Init method, ensure that fileSync.Close checks for nil on queues before attempting to close them to prevent potential panics.

+ Add nil checks in the `Close` method before closing queues.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change:

return

fat-fellow merged commit cdc2dc6 into main Apr 17, 2024
fat-fellow deleted the go-3046-file-sync-use-batcher-for-uploading-queue branch April 17, 2024 13:04
github-actions bot locked and limited conversation to collaborators Apr 17, 2024

4 participants