Skip to content

Commit

Permalink
Selectively configure flushing in initial sync.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Nov 5, 2024
1 parent 84b9708 commit d5b48d2
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 37 deletions.
61 changes: 26 additions & 35 deletions packages/powersync/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ class BucketStorage {
return rows.first['client_id'] as String;
}

Future<void> streamOp(String op) async {
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
['stream', op]);
});
}

Future<void> saveSyncData(SyncDataBatch batch) async {
var count = 0;

Expand All @@ -65,7 +57,10 @@ class BucketStorage {
'buckets': [b]
}));
}
});
// No need to flush - the data is not directly visible to the user either way.
// We get major initial sync performance improvements with IndexedDB by
// not flushing here.
}, flush: false);
_compactCounter += count;
}

Expand All @@ -85,7 +80,8 @@ class BucketStorage {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
['delete_bucket', bucket]);
});
// No need to flush - not directly visible to the user
}, flush: false);

_pendingBucketDeletes = true;
}
Expand Down Expand Up @@ -125,7 +121,8 @@ class BucketStorage {
"UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'",
[checkpoint.writeCheckpoint]);
}
});
// Not flushing here - the flush will happen in the next step
}, flush: false);

final valid = await updateObjectsFromBuckets(checkpoint);
if (!valid) {
Expand All @@ -150,7 +147,10 @@ class BucketStorage {
// can_update_local(db) == false
return false;
}
});
// Important to flush here.
// After this step, the synced data will be visible to the user,
// and we don't want that to be reverted.
}, flush: true);
}

Future<SyncLocalDatabaseResult> validateChecksums(
Expand All @@ -176,39 +176,25 @@ class BucketStorage {
}

Future<void> autoCompact() async {
// This is a no-op since powersync-sqlite-core v0.3.0

// 1. Delete buckets
await _deletePendingBuckets();

// 2. Clear REMOVE operations, only keeping PUT ones
await _clearRemoveOps();

// await _compactWal();
}

// ignore: unused_element
Future<void> _compactWal() async {
try {
await writeTransaction((tx) async {
await tx.execute('PRAGMA wal_checkpoint(TRUNCATE)');
});
} on SqliteException catch (e) {
// Ignore SQLITE_BUSY
if (e.resultCode == 5) {
// Ignore
} else if (e.resultCode == 6) {
// Ignore
}
}
}

Future<void> _deletePendingBuckets() async {
// This is a no-op since powersync-sqlite-core v0.3.0
if (_pendingBucketDeletes) {
// Executed once after start-up, and again when there are pending deletes.
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
['delete_pending_buckets', '']);
});
// No need to flush - not directly visible to the user
}, flush: false);
_pendingBucketDeletes = false;
}
}
Expand All @@ -218,11 +204,13 @@ class BucketStorage {
return;
}

// This is a no-op since powersync-sqlite-core v0.3.0
await writeTransaction((tx) async {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES (?, ?)',
['clear_remove_ops', '']);
});
// No need to flush - not directly visible to the user
}, flush: false);
_compactCounter = 0;
}

Expand Down Expand Up @@ -267,7 +255,8 @@ class BucketStorage {
[opId]);

return true;
});
// Flush here - don't want to lose the write checkpoint updates.
}, flush: true);
}

Future<CrudEntry?> nextCrudItem() async {
Expand Down Expand Up @@ -313,7 +302,8 @@ class BucketStorage {
await tx.execute(
'UPDATE ps_buckets SET target_op = $maxOpId WHERE name=\'\$local\'');
}
});
// Flush here - don't want to lose the write checkpoint updates.
}, flush: true);
});
}

Expand All @@ -323,7 +313,8 @@ class BucketStorage {
/// concurrently.
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout}) async {
{Duration? lockTimeout,
required bool flush}) async {
return _internalDb.writeTransaction(callback, lockTimeout: lockTimeout);
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/powersync/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import 'package:powersync/src/streaming_sync.dart';
import 'package:sqlite_async/web.dart';
import 'package:web/web.dart' hide RequestMode;

import '../bucket_storage.dart';
import 'sync_worker_protocol.dart';
import 'web_bucket_storage.dart';

final _logger = autoLogger;

Expand Down Expand Up @@ -258,7 +258,7 @@ class _SyncRunner {
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;

sync = StreamingSyncImplementation(
adapter: BucketStorage(database),
adapter: WebBucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
Expand Down
20 changes: 20 additions & 0 deletions packages/powersync/lib/src/web/web_bucket_storage.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import 'package:powersync/sqlite_async.dart';
import 'package:powersync/src/bucket_storage.dart';
import 'package:sqlite_async/web.dart';

class WebBucketStorage extends BucketStorage {
final WebSqliteConnection _webDb;

WebBucketStorage(this._webDb) : super(_webDb);

@override

/// Override to implement the flush parameter for web.
Future<T> writeTransaction<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout,
required bool flush}) async {
return _webDb.writeTransaction(callback,
lockTimeout: lockTimeout, flush: flush);
}
}

0 comments on commit d5b48d2

Please sign in to comment.