Skip to content

Commit

Permalink
[Feature] Add warning if crud transactions are not completed (#172)
Browse files Browse the repository at this point in the history
* Show warning if crud transactions are not completed

* Test on native for upload tests

* Fix crudMutex and uploading status

* Add back retry delay for crudMutex

* chore(release): publish packages
 - powersync@1.8.4
 - powersync_attachments_helper@0.6.8
  • Loading branch information
mugikhan authored Oct 2, 2024
1 parent 70b487e commit f17ea6f
Show file tree
Hide file tree
Showing 18 changed files with 200 additions and 49 deletions.
28 changes: 27 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,32 @@
All notable changes to this project will be documented in this file.
See [Conventional Commits](https://conventionalcommits.org) for commit guidelines.

## 2024-10-01

### Changes

---

Packages with breaking changes:

- There are no breaking changes in this release.

Packages with other changes:

- [`powersync` - `v1.8.4`](#powersync---v184)
- [`powersync_attachments_helper` - `v0.6.8`](#powersync_attachments_helper---v068)

---

#### `powersync` - `v1.8.4`

- **FEAT**: Added a warning if connector `uploadData` functions don't process CRUD items completely.

#### `powersync_attachments_helper` - `v0.6.8`

- Update a dependency to the latest release.


## 2024-09-30

### Changes
Expand All @@ -22,7 +48,7 @@ Packages with other changes:

#### `powersync` - `v1.8.3`

- **FIX**: Pass maxReaders parameter to `PowerSyncDatabase.withFactory()`
- **FIX**: Pass maxReaders parameter to `PowerSyncDatabase.withFactory()`.

#### `powersync_attachments_helper` - `v0.6.7`

Expand Down
2 changes: 1 addition & 1 deletion demos/django-todolist/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync: ^1.8.3
powersync: ^1.8.4
path_provider: ^2.1.1
path: ^1.8.3
logging: ^1.2.0
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-anonymous-auth/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies:
flutter:
sdk: flutter

powersync: ^1.8.3
powersync: ^1.8.4
path_provider: ^2.1.1
supabase_flutter: ^2.0.2
path: ^1.8.3
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-edge-function-auth/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies:
flutter:
sdk: flutter

powersync: ^1.8.3
powersync: ^1.8.4
path_provider: ^2.1.1
supabase_flutter: ^2.0.2
path: ^1.8.3
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-simple-chat/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies:

supabase_flutter: ^2.0.2
timeago: ^3.6.0
powersync: ^1.8.3
powersync: ^1.8.4
path_provider: ^2.1.1
path: ^1.8.3
logging: ^1.2.0
Expand Down
4 changes: 2 additions & 2 deletions demos/supabase-todolist-drift/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync_attachments_helper: ^0.6.7
powersync: ^1.8.3
powersync_attachments_helper: ^0.6.8
powersync: ^1.8.4
path_provider: ^2.1.1
supabase_flutter: ^2.0.1
path: ^1.8.3
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-todolist-optional-sync/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync: ^1.8.3
powersync: ^1.8.4
path_provider: ^2.1.1
supabase_flutter: ^2.0.1
path: ^1.8.3
Expand Down
4 changes: 2 additions & 2 deletions demos/supabase-todolist/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync_attachments_helper: ^0.6.7
powersync: ^1.8.3
powersync_attachments_helper: ^0.6.8
powersync: ^1.8.4
path_provider: ^2.1.1
supabase_flutter: ^2.0.1
path: ^1.8.3
Expand Down
6 changes: 5 additions & 1 deletion packages/powersync/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## 1.8.4

- **FEAT**: Added a warning if connector `uploadData` functions don't process CRUD items completely.

## 1.8.3

- **FIX**: Pass maxReaders parameter to `PowerSyncDatabase.withFactory()`
- **FIX**: Pass maxReaders parameter to `PowerSyncDatabase.withFactory()`.

## 1.8.2

Expand Down
6 changes: 6 additions & 0 deletions packages/powersync/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ class BucketStorage {
});
}

Future<CrudEntry?> nextCrudItem() async {
var next = await _internalDb
.getOptional('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
return next == null ? null : CrudEntry.fromRow(next);
}

Future<bool> hasCrud() async {
final anyData = await select('SELECT 1 FROM ps_crud LIMIT 1');
return anyData.isNotEmpty;
Expand Down
75 changes: 46 additions & 29 deletions packages/powersync/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import 'package:sqlite_async/mutex.dart';

import 'bucket_storage.dart';
import 'connector.dart';
import 'crud.dart';
import 'stream_utils.dart';
import 'sync_status.dart';
import 'sync_types.dart';
Expand Down Expand Up @@ -102,6 +103,10 @@ class StreamingSyncImplementation {
return _abort?.aborted ?? false;
}

bool get isConnected {
return lastStatus.connected;
}

Future<void> streamingSync() async {
try {
_abort = AbortController();
Expand Down Expand Up @@ -159,38 +164,50 @@ class StreamingSyncImplementation {
}

Future<void> uploadAllCrud() async {
while (true) {
try {
bool done = await uploadCrudBatch();
_updateStatus(uploadError: _noError);
if (done) {
break;
}
} catch (e, stacktrace) {
isolateLogger.warning('Data upload error', e, stacktrace);
_updateStatus(uploading: false, uploadError: e);
await Future.delayed(retryDelay);
}
}
_updateStatus(uploading: false);
}

Future<bool> uploadCrudBatch() async {
return crudMutex.lock(() async {
if ((await adapter.hasCrud())) {
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
CrudEntry? checkedCrudItem;

while (true) {
_updateStatus(uploading: true);
await uploadCrud();
return false;
} else {
// This isolate is the only one triggering
final updated = await adapter.updateLocalTarget(() async {
return getWriteCheckpoint();
});
if (updated) {
_localPingController.add(null);
try {
// This is the first item in the FIFO CRUD queue.
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
if (nextCrudItem != null) {
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
// This will force a higher log level than exceptions which are caught here.
isolateLogger.warning(
"""Potentially previously uploaded CRUD entries are still present in the upload queue.
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
The next upload iteration will be delayed.""");
throw Exception(
'Delaying due to previously encountered CRUD item.');
}

checkedCrudItem = nextCrudItem;
await uploadCrud();
_updateStatus(uploadError: _noError);
} else {
// Uploading is completed
await adapter.updateLocalTarget(() => getWriteCheckpoint());
break;
}
} catch (e, stacktrace) {
checkedCrudItem = null;
isolateLogger.warning('Data upload error', e, stacktrace);
_updateStatus(uploading: false, uploadError: e);
await Future.delayed(retryDelay);
if (!isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
}
isolateLogger.warning(
"Caught exception when uploading. Upload will retry after a delay",
e,
stacktrace);
} finally {
_updateStatus(uploading: false);
}

return true;
}
}, timeout: retryDelay);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync/lib/src/version.dart
Original file line number Diff line number Diff line change
@@ -1 +1 @@
const String libraryVersion = '1.8.3';
const String libraryVersion = '1.8.4';
2 changes: 1 addition & 1 deletion packages/powersync/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: powersync
version: 1.8.3
version: 1.8.4
homepage: https://powersync.com
repository: https://github.com/powersync-ja/powersync.dart
description: PowerSync Flutter SDK - sync engine for building local-first apps.
Expand Down
91 changes: 91 additions & 0 deletions packages/powersync/test/upload_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
@TestOn('!browser')

import 'package:powersync/powersync.dart';
import 'package:test/test.dart';

import 'test_server.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();
const testId = "2290de4f-0488-4e50-abed-f8e8eb1d0b42";
const testId2 = "2290de4f-0488-4e50-abed-f8e8eb1d0b43";
const partialWarning =
'Potentially previously uploaded CRUD entries are still present';

class TestConnector extends PowerSyncBackendConnector {
final Function _fetchCredentials;
final Future<void> Function(PowerSyncDatabase database) _uploadData;

TestConnector(this._fetchCredentials, this._uploadData);

@override
Future<PowerSyncCredentials?> fetchCredentials() {
return _fetchCredentials();
}

@override
Future<void> uploadData(PowerSyncDatabase database) async {
return _uploadData(database);
}
}

void main() {
group('CRUD Tests', () {
late PowerSyncDatabase powersync;
late String path;

setUp(() async {
path = testUtils.dbPath();
await testUtils.cleanDb(path: path);
});

tearDown(() async {
// await powersync.disconnectAndClear();
await powersync.close();
});

test('should warn for missing upload operations in uploadData', () async {
var server = await createServer();

credentialsCallback() async {
return PowerSyncCredentials(
endpoint: server.endpoint,
token: 'token',
userId: 'userId',
);
}

uploadData(PowerSyncDatabase db) async {
// Do nothing
}

final records = <String>[];
final sub =
testWarningLogger.onRecord.listen((log) => records.add(log.message));

powersync =
await testUtils.setupPowerSync(path: path, logger: testWarningLogger);
powersync.retryDelay = Duration(milliseconds: 0);
var connector = TestConnector(credentialsCallback, uploadData);
powersync.connect(connector: connector);

// Create something with CRUD in it.
await powersync.execute(
'INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);

// Wait for the uploadData to be called.
await Future.delayed(Duration(milliseconds: 100));

// Create something else with CRUD in it.
await powersync.execute(
'INSERT INTO assets(id, description) VALUES(?, ?)',
[testId2, 'test2']);

sub.cancel();

expect(records, hasLength(2));
expect(records, anyElement(contains(partialWarning)));
});
});
}
10 changes: 6 additions & 4 deletions packages/powersync/test/utils/abstract_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ const defaultSchema = schema;

final testLogger = _makeTestLogger();

Logger _makeTestLogger() {
final testWarningLogger = _makeTestLogger(level: Level.WARNING);

Logger _makeTestLogger({Level level = Level.ALL}) {
final logger = Logger.detached('PowerSync Tests');
logger.level = Level.ALL;
logger.level = level;
logger.onRecord.listen((record) {
print(
'[${record.loggerName}] ${record.level.name}: ${record.time}: ${record.message}');
Expand Down Expand Up @@ -70,9 +72,9 @@ abstract class AbstractTestUtils {

/// Creates a SqliteDatabaseConnection
Future<PowerSyncDatabase> setupPowerSync(
{String? path, Schema? schema}) async {
{String? path, Schema? schema, Logger? logger}) async {
final db = PowerSyncDatabase.withFactory(await testFactory(path: path),
schema: schema ?? defaultSchema, logger: testLogger);
schema: schema ?? defaultSchema, logger: logger ?? testLogger);
await db.initialize();
return db;
}
Expand Down
3 changes: 2 additions & 1 deletion packages/powersync/test/utils/web_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:html';

import 'package:js/js.dart';
import 'package:logging/logging.dart';
import 'package:powersync/powersync.dart';
import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/sqlite_async.dart';
Expand Down Expand Up @@ -51,7 +52,7 @@ class TestUtils extends AbstractTestUtils {

@override
Future<PowerSyncDatabase> setupPowerSync(
{String? path, Schema? schema}) async {
{String? path, Schema? schema, Logger? logger}) async {
await _isInitialized;
return super.setupPowerSync(path: path, schema: schema);
}
Expand Down
4 changes: 4 additions & 0 deletions packages/powersync_attachments_helper/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.6.8

- Update a dependency to the latest release.

## 0.6.7

- Update a dependency to the latest release.
Expand Down
Loading

0 comments on commit f17ea6f

Please sign in to comment.