-
Notifications
You must be signed in to change notification settings - Fork 75
GO-3046: Files: New uploading / removing queue #1007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GO-3046: Files: New uploading / removing queue #1007
Conversation
Coverage provided by https://github.com/seriousben/go-patch-cover-action |
It allows to simplify code and fix sync status troubles
core/queue/queue.go
Outdated
Key() string | ||
} | ||
|
||
type ItemWithOrder interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OrderedItem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
return nil | ||
} | ||
|
||
func (s *service) Run(_ context.Context) error { | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't block starting of application
core/queue/queue.go
Outdated
if err != nil { | ||
return err | ||
} | ||
return q.store(item) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would split all the badger parts into other components, maybe just provide an interface inside queue which would store the items, and remove them. And make this interface abstract
@coderabbitai review |
WalkthroughThe recent changes involve a significant overhaul and enhancement of file synchronization and management functionalities system-wide. These updates introduce new methods for file operations, refine error handling, and implement a persistent queue system to boost operational efficiency and reliability within the system. Changes
Recent Review DetailsConfiguration used: CodeRabbit UI Files selected for processing (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 14
s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot()) | ||
// Details in spaceview comes from Workspace object, so we don't need to migrate them | ||
if s.Type() != smartblock.SmartBlockTypeSpaceView { | ||
s.fileObjectMigrator.MigrateDetails(st, s.space, s.GetFileKeysSnapshot()) | ||
s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling the return values of MigrateFiles
and MigrateFileIdsInDetails
to manage potential errors.
- s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot())
+ if err := s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot()); err != nil {
+ log.With(zap.Error(err)).Error("Failed to migrate files")
+ return nil, err
+ }
- s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space)
+ if err := s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space); err != nil {
+ log.With(zap.Error(err)).Error("Failed to migrate file IDs in details")
+ return nil, err
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot()) | |
// Details in spaceview comes from Workspace object, so we don't need to migrate them | |
if s.Type() != smartblock.SmartBlockTypeSpaceView { | |
s.fileObjectMigrator.MigrateDetails(st, s.space, s.GetFileKeysSnapshot()) | |
s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space) | |
if err := s.fileObjectMigrator.MigrateFiles(st, s.space, s.GetFileKeysSnapshot()); err != nil { | |
log.With(zap.Error(err)).Error("Failed to migrate files") | |
return nil, err | |
} | |
// Details in spaceview comes from Workspace object, so we don't need to migrate them | |
if s.Type() != smartblock.SmartBlockTypeSpaceView { | |
if err := s.fileObjectMigrator.MigrateFileIdsInDetails(st, s.space); err != nil { | |
log.With(zap.Error(err)).Error("Failed to migrate file IDs in details") | |
return nil, err | |
} |
go func() { | ||
err := s.ensureNotSyncedFilesAddedToQueue() | ||
if err != nil { | ||
log.Errorf("ensure not synced files added to queue: %v", err) | ||
} | ||
}() | ||
s.indexer.run() | ||
s.migrationQueue.Run() | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid blocking the main thread by ensuring that the migration queue and file sync operations are handled asynchronously.
- go func() {
- err := s.ensureNotSyncedFilesAddedToQueue()
- if err != nil {
- log.Errorf("ensure not synced files added to queue: %v", err)
- }
- }()
+ go s.ensureNotSyncedFilesAddedToQueue()
+ s.indexer.run()
+ s.migrationQueue.Run()
Refactor ensureNotSyncedFilesAddedToQueue
to handle errors internally instead of logging them in the goroutine.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
go func() { | |
err := s.ensureNotSyncedFilesAddedToQueue() | |
if err != nil { | |
log.Errorf("ensure not synced files added to queue: %v", err) | |
} | |
}() | |
s.indexer.run() | |
s.migrationQueue.Run() | |
return nil | |
go s.ensureNotSyncedFilesAddedToQueue() | |
s.indexer.run() | |
s.migrationQueue.Run() | |
return nil |
func newInMemoryBadger(t *testing.T) *badger.DB { | ||
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR)) | ||
require.NoError(t, err) | ||
return db | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling potential errors from badger.Open
directly within newInMemoryBadger
.
- db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR))
- require.NoError(t, err)
+ db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR))
+ if err != nil {
+ t.Fatalf("Failed to open in-memory Badger DB: %v", err)
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func newInMemoryBadger(t *testing.T) *badger.DB { | |
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR)) | |
require.NoError(t, err) | |
return db | |
} | |
func newInMemoryBadger(t *testing.T) *badger.DB { | |
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLoggingLevel(badger.ERROR)) | |
if err != nil { | |
t.Fatalf("Failed to open in-memory Badger DB: %v", err) | |
} | |
return db | |
} |
err := q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"}) | ||
require.Error(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure error messages are informative for better debugging.
- require.Error(t, err)
+ require.Error(t, err, "Expected error when adding to a closed queue, but got none")
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
err := q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"}) | |
require.Error(t, err) | |
err := q.Add(&testItem{Id: "1", Timestamp: 1, Data: "data1"}) | |
require.Error(t, err, "Expected error when adding to a closed queue, but got none") |
t.Fatal("handler not called") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Increase the timeout to allow more time for the handler to be called in slow environments.
- case <-time.After(50 * time.Millisecond):
+ case <-time.After(100 * time.Millisecond):
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
t.Fatal("handler not called") | |
} | |
case <-time.After(100 * time.Millisecond): | |
t.Fatal("handler not called") | |
} |
func TestDeleteFile(t *testing.T) { | ||
t.Run("if all is ok, delete file", func(t *testing.T) { | ||
fx := newFixture(t, 1024*1024*1024) | ||
defer fx.Finish(t) | ||
spaceId := "spaceId" | ||
|
||
fileId, _ := fx.givenFileAddedToDAG(t) | ||
fx.givenFileUploaded(t, spaceId, fileId) | ||
|
||
err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId}) | ||
require.NoError(t, err) | ||
|
||
fx.waitEmptyQueue(t, fx.deletionQueue, time.Second*1) | ||
|
||
_, err = fx.rpcStore.FilesInfo(ctx, spaceId, fileId) | ||
require.Error(t, err) | ||
}) | ||
|
||
t.Run("with error while deleting, add to retry queue", func(t *testing.T) { | ||
fx := newFixture(t, 1024*1024*1024) | ||
defer fx.Finish(t) | ||
spaceId := "spaceId" | ||
|
||
testFileId := domain.FileId("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") | ||
|
||
// Just try to delete missing file, in-memory RPC store will return error | ||
err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: testFileId}) | ||
require.NoError(t, err) | ||
|
||
fx.waitEmptyQueue(t, fx.deletionQueue, time.Second*1) | ||
|
||
fx.waitCondition(t, 100*time.Millisecond, func() bool { | ||
return fx.retryDeletionQueue.Len() == 1 || fx.retryDeletionQueue.NumProcessedItems() > 0 | ||
}) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added tests for file deletion scenarios. Ensure that these tests cover all relevant cases and consider adding more scenarios if needed.
Would you like me to help expand these tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@deff7, based on your request to expand the test scenarios for file deletion, I've drafted additional tests that cover a variety of cases which might occur in real-world scenarios. Here are the additional tests:
func TestDeleteFileWithInsufficientPermissions(t *testing.T) {
t.Run("error due to insufficient permissions", func(t *testing.T) {
fx := newFixture(t, 1024*1024*1024)
defer fx.Finish(t)
spaceId := "spaceId"
fileId, _ := fx.givenFileAddedToDAG(t)
fx.givenFileUploaded(t, spaceId, fileId)
// Simulate insufficient permissions
fx.mockPermissions(t, spaceId, fileId, false)
err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId})
require.Error(t, err)
fx.assertFileNotDeleted(t, spaceId, fileId)
})
}
func TestDeleteFileDuringNetworkFailure(t *testing.T) {
t.Run("network failure during deletion", func(t *testing.T) {
fx := newFixture(t, 1024*1024*1024)
defer fx.Finish(t)
spaceId := "spaceId"
fileId, _ := fx.givenFileAddedToDAG(t)
fx.givenFileUploaded(t, spaceId, fileId)
// Simulate network failure
fx.simulateNetworkFailure()
err := fx.DeleteFile("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId})
require.Error(t, err)
// Ensure the file is still present
fx.assertFileExists(t, spaceId, fileId)
})
}
func TestConcurrentFileDeletion(t *testing.T) {
t.Run("concurrent deletion requests", func(t *testing.T) {
fx := newFixture(t, 1024*1024*1024)
defer fx.Finish(t)
spaceId := "spaceId"
fileId, _ := fx.givenFileAddedToDAG(t)
fx.givenFileUploaded(t, spaceId, fileId)
// Launch concurrent deletion requests
errs := fx.launchConcurrentDeletions("objectId", domain.FullFileId{SpaceId: spaceId, FileId: fileId}, 5)
require.Contains(t, errs, nil) // Expect at least one success
fx.assertFileNotExists(t, spaceId, fileId)
})
}
These tests simulate more complex scenarios such as insufficient permissions, network failures, and concurrent deletions. Please review these additions and let me know if there are any other scenarios you would like to cover!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
s.uploadPingCh = make(chan struct{}) | ||
db, err := s.dbProvider.LocalStorage() | ||
if err != nil { | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of err here fileSync.Close will panic on queues closing. Maybe check for nil in Close
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you, fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error) | ||
UploadSynchronously(spaceId string, fileId domain.FileId) error | ||
OnUploadStarted(func(fileId domain.FileId) error) | ||
OnUploaded(func(fileId domain.FileId) error) | ||
OnLimited(func(fileId domain.FileId) error) | ||
RemoveFile(spaceId string, fileId domain.FileId) (err error) | ||
RemoveSynchronously(spaceId string, fileId domain.FileId) (err error) | ||
OnUploadStarted(StatusCallback) | ||
OnUploaded(StatusCallback) | ||
OnLimited(StatusCallback) | ||
DeleteFile(objectId string, fileId domain.FullFileId) (err error) | ||
DeleteFileSynchronously(fileId domain.FullFileId) (err error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure the implementation of DeleteFileSynchronously
as it is critical for synchronous file operations.
+ Implement the `DeleteFileSynchronously` method to handle synchronous file deletions.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error) | |
UploadSynchronously(spaceId string, fileId domain.FileId) error | |
OnUploadStarted(func(fileId domain.FileId) error) | |
OnUploaded(func(fileId domain.FileId) error) | |
OnLimited(func(fileId domain.FileId) error) | |
RemoveFile(spaceId string, fileId domain.FileId) (err error) | |
RemoveSynchronously(spaceId string, fileId domain.FileId) (err error) | |
OnUploadStarted(StatusCallback) | |
OnUploaded(StatusCallback) | |
OnLimited(StatusCallback) | |
DeleteFile(objectId string, fileId domain.FullFileId) (err error) | |
DeleteFileSynchronously(fileId domain.FullFileId) (err error) | |
AddFile(fileObjectId string, fileId domain.FullFileId, uploadedByUser, imported bool) (err error) | |
UploadSynchronously(spaceId string, fileId domain.FileId) error | |
OnUploadStarted(StatusCallback) | |
OnUploaded(StatusCallback) | |
OnLimited(StatusCallback) | |
DeleteFile(objectId string, fileId domain.FullFileId) (err error) | |
DeleteFileSynchronously(fileId domain.FullFileId) (err error) | |
Implement the `DeleteFileSynchronously` method to handle synchronous file deletions. |
UploadingQueue []string | ||
RetryUploadingQueue []string | ||
DeletionQueue []string | ||
RetryDeletionQueue []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new queue structures introduced in QueueInfo
should be accompanied by comprehensive unit tests to verify their behavior under various scenarios.
Would you like me to help by writing some unit tests for these new queue structures?
func (s *fileSync) Init(a *app.App) (err error) { | ||
s.dbProvider = app.MustComponent[datastore.Datastore](a) | ||
s.rpcStore = a.MustComponent(rpcstore.CName).(rpcstore.Service).NewStore() | ||
s.dagService = a.MustComponent(fileservice.CName).(fileservice.FileService).DAGService() | ||
s.fileStore = app.MustComponent[filestore.FileStore](a) | ||
s.eventSender = app.MustComponent[event.Sender](a) | ||
s.removePingCh = make(chan struct{}) | ||
s.uploadPingCh = make(chan struct{}) | ||
db, err := s.dbProvider.LocalStorage() | ||
if err != nil { | ||
return | ||
} | ||
s.uploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, uploadingKeyPrefix, makeQueueItem), log.Logger, s.uploadingHandler) | ||
s.retryUploadingQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryUploadingKeyPrefix, makeQueueItem), log.Logger, s.retryingHandler, persistentqueue.WithHandlerTickPeriod(loopTimeout)) | ||
s.deletionQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, deletionKeyPrefix, makeQueueItem), log.Logger, s.deletionHandler) | ||
s.retryDeletionQueue = persistentqueue.New(persistentqueue.NewBadgerStorage(db, retryDeletionKeyPrefix, makeQueueItem), log.Logger, s.retryDeletionHandler, persistentqueue.WithHandlerTickPeriod(loopTimeout)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Init
method setup is complex and involves multiple components. Consider refactoring to improve error handling and robustness.
Consider introducing a helper function that consolidates error checks and handling in the Init
method to make the code cleaner and more maintainable.
s.uploadPingCh = make(chan struct{}) | ||
db, err := s.dbProvider.LocalStorage() | ||
if err != nil { | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the Init
method, ensure that fileSync.Close
checks for nil on queues before attempting to close them to prevent potential panics.
+ Add nil checks in the `Close` method before closing queues.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
return | |
return |
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Documentation
Tests
Chores