Skip to content

Commit

Permalink
Add crudThrottleTime to create a throttled update notification stream…
Browse files Browse the repository at this point in the history
… for sync worker
  • Loading branch information
mugikhan committed Oct 29, 2024
1 parent 6e61c38 commit acd3960
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ jobs:
tag="${{ github.ref_name }}"
body="Release $tag"
gh release create --draft "$tag" --title "$tag" --notes "$body" --generate-notes
gh release upload "${{ github.ref_name }}" packages/powersync/assets/powersync_db.worker.js
gh release upload "${{ github.ref_name }}" packages/powersync/assets/powersync_db.worker.js packages/powersync/assets/powersync_sync.worker.js
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ assets/*
powersync_db.worker.js
powersync_db.worker.js*
sqlite3.wasm
powersync_sync.worker.js
powersync_sync.worker.js*

#Core binaries
*.dylib
Expand Down
6 changes: 6 additions & 0 deletions packages/powersync/bin/setup_web.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void main(List<String> arguments) async {
final wasmPath = '${root.toFilePath()}$outputDir/sqlite3.wasm';

final workerPath = '${root.toFilePath()}$outputDir/powersync_db.worker.js';
final syncWorkerPath =
'${root.toFilePath()}$outputDir/powersync_sync.worker.js';

final packageConfigFile = File.fromUri(
root.resolve('.dart_tool/package_config.json'),
Expand Down Expand Up @@ -57,7 +59,11 @@ void main(List<String> arguments) async {
final workerUrl =
'https://github.com/powersync-ja/powersync.dart/releases/download/powersync-v$powersyncVersion/powersync_db.worker.js';

final syncWorkerUrl =
'https://github.com/powersync-ja/powersync.dart/releases/download/powersync-v$powersyncVersion/powersync_sync.worker.js';

await downloadFile(httpClient, workerUrl, workerPath);
await downloadFile(httpClient, syncWorkerUrl, syncWorkerPath);
}

final sqlitePackageName = 'sqlite3';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class PowerSyncDatabaseImpl
sync = await SyncWorkerHandle.start(
this,
connector,
crudThrottleTime.inMilliseconds,
Uri.base.resolve('/powersync_sync.worker.js'),
);
} catch (e) {
Expand Down
18 changes: 12 additions & 6 deletions packages/powersync/lib/src/web/sync_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import 'sync_worker_protocol.dart';
class SyncWorkerHandle implements StreamingSync {
final PowerSyncDatabaseImpl _database;
final PowerSyncBackendConnector _connector;
final int _crudThrottleTimeMs;

late final WorkerCommunicationChannel _channel;

final StreamController<SyncStatus> _status = StreamController.broadcast();

SyncWorkerHandle._(this._database, this._connector, MessagePort sendToWorker,
SharedWorker worker) {
SyncWorkerHandle._(this._database, this._connector, this._crudThrottleTimeMs,
MessagePort sendToWorker, SharedWorker worker) {
_channel = WorkerCommunicationChannel(
port: sendToWorker,
errors: EventStreamProviders.errorEvent.forTarget(worker),
Expand Down Expand Up @@ -69,10 +70,14 @@ class SyncWorkerHandle implements StreamingSync {
});
}

static Future<SyncWorkerHandle> start(PowerSyncDatabaseImpl database,
PowerSyncBackendConnector connector, Uri workerUri) async {
static Future<SyncWorkerHandle> start(
PowerSyncDatabaseImpl database,
PowerSyncBackendConnector connector,
int crudThrottleTimeMs,
Uri workerUri) async {
final worker = SharedWorker(workerUri.toString().toJS);
final handle = SyncWorkerHandle._(database, connector, worker.port, worker);
final handle = SyncWorkerHandle._(
database, connector, crudThrottleTimeMs, worker.port, worker);

// Make sure that the worker is working, or throw immediately.
await handle._channel.ping();
Expand All @@ -95,6 +100,7 @@ class SyncWorkerHandle implements StreamingSync {

@override
Future<void> streamingSync() async {
await _channel.startSynchronization(_database.openFactory.path);
await _channel.startSynchronization(
_database.openFactory.path, _crudThrottleTimeMs);
}
}
34 changes: 25 additions & 9 deletions packages/powersync/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// This file needs to be compiled to JavaScript with the command
/// dart compile js -O4 packages/powersync/lib/src/web/sync_worker.worker.dart -o assets/db_worker.js
/// dart compile js -O4 packages/powersync/lib/src/web/sync_worker.dart -o assets/powersync_sync.worker.js
/// The output should then be included in each project's `web` directory
library;

Expand All @@ -9,12 +9,13 @@ import 'dart:js_interop';
import 'package:async/async.dart';
import 'package:fetch_client/fetch_client.dart';
import 'package:powersync/powersync.dart';
import 'package:powersync/sqlite_async.dart';
import 'package:powersync/src/database/powersync_db_mixin.dart';
import 'package:powersync/src/streaming_sync.dart';
import 'package:sqlite_async/web.dart';
import 'package:web/web.dart' hide RequestMode;

import '../bucket_storage.dart';
import '../database/powersync_db_mixin.dart';
import 'sync_worker_protocol.dart';

final _logger = autoLogger;
Expand All @@ -40,10 +41,10 @@ class _SyncWorker {
});
}

_SyncRunner referenceSyncTask(
String databaseIdentifier, _ConnectedClient client) {
_SyncRunner referenceSyncTask(String databaseIdentifier,
int crudThrottleTimeMs, _ConnectedClient client) {
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
return _SyncRunner(databaseIdentifier);
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs);
})
..registerClient(client);
}
Expand All @@ -63,7 +64,8 @@ class _ConnectedClient {
switch (type) {
case SyncWorkerMessageType.startSynchronization:
final request = payload as StartSynchronization;
_runner = _worker.referenceSyncTask(request.databaseName, this);
_runner = _worker.referenceSyncTask(
request.databaseName, request.crudThrottleTimeMs, this);
return (JSObject(), null);
case SyncWorkerMessageType.abortSynchronization:
_runner?.unregisterClient(this);
Expand Down Expand Up @@ -103,6 +105,7 @@ class _ConnectedClient {

class _SyncRunner {
final String identifier;
final int crudThrottleTimeMs;

final StreamGroup<_RunnerEvent> _group = StreamGroup();
final StreamController<_RunnerEvent> _mainEvents = StreamController();
Expand All @@ -111,7 +114,7 @@ class _SyncRunner {
_ConnectedClient? databaseHost;
final connections = <_ConnectedClient>[];

_SyncRunner(this.identifier) {
_SyncRunner(this.identifier, this.crudThrottleTimeMs) {
_group.add(_mainEvents.stream);

Future(() async {
Expand Down Expand Up @@ -209,13 +212,26 @@ class _SyncRunner {
}
});

final tables = ['ps_crud'];
final crudThrottleTime = Duration(milliseconds: crudThrottleTimeMs);
Stream<UpdateNotification> crudStream =
powerSyncUpdateNotifications(Stream.empty());
if (database.updates != null) {
final filteredStream = database.updates!
.transform(UpdateNotification.filterTablesTransformer(tables));
crudStream = UpdateNotification.throttleStream(
filteredStream,
crudThrottleTime,
addOne: UpdateNotification.empty(),
);
}

sync = StreamingSyncImplementation(
adapter: BucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
updateStream: powerSyncUpdateNotifications(
database.updates ?? const Stream.empty()),
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
client: FetchClient(mode: RequestMode.cors),
identifier: identifier,
Expand Down
10 changes: 8 additions & 2 deletions packages/powersync/lib/src/web/sync_worker_protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject {
extension type StartSynchronization._(JSObject _) implements JSObject {
external factory StartSynchronization({
required String databaseName,
required int crudThrottleTimeMs,
required int requestId,
});

external String get databaseName;
external int get requestId;
external int get crudThrottleTimeMs;
}

@anonymous
Expand Down Expand Up @@ -313,11 +315,15 @@ final class WorkerCommunicationChannel {
await _numericRequest(SyncWorkerMessageType.ping);
}

Future<void> startSynchronization(String databaseName) async {
Future<void> startSynchronization(
String databaseName, int crudThrottleTimeMs) async {
final (id, completion) = _newRequest();
port.postMessage(SyncWorkerMessage(
type: SyncWorkerMessageType.startSynchronization.name,
payload: StartSynchronization(databaseName: databaseName, requestId: id),
payload: StartSynchronization(
databaseName: databaseName,
crudThrottleTimeMs: crudThrottleTimeMs,
requestId: id),
));
await completion;
}
Expand Down
38 changes: 32 additions & 6 deletions scripts/compile_webworker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,52 @@ Future<void> main() async {

/// The monorepo root assets directory
final workerFilename = 'powersync_db.worker.js';
final outputPath =
final dbWorkerOutputPath =
path.join(repoRoot, 'packages/powersync/assets/$workerFilename');

final workerSourcePath = path.join(
repoRoot, './packages/powersync/lib/src/web/powersync_db.worker.dart');

// And compile worker code
final process = await Process.run(
final dbWorkerProcess = await Process.run(
Platform.executable,
[
'compile',
'js',
'-o',
outputPath,
dbWorkerOutputPath,
'-O4',
workerSourcePath,
],
workingDirectory: cwd);

if (process.exitCode != 0) {
throw Exception('Could not compile worker: ${process.stdout.toString()}');
if (dbWorkerProcess.exitCode != 0) {
throw Exception(
'Could not compile db worker: ${dbWorkerProcess.stdout.toString()}');
}

final syncWorkerFilename = 'powersync_sync.worker.js';
final syncWorkerOutputPath =
path.join(repoRoot, 'packages/powersync/assets/$syncWorkerFilename');

final syncWorkerSourcePath =
path.join(repoRoot, './packages/powersync/lib/src/web/sync_worker.dart');

final syncWorkerProcess = await Process.run(
Platform.executable,
[
'compile',
'js',
'-o',
syncWorkerOutputPath,
'-O4',
syncWorkerSourcePath,
],
workingDirectory: cwd);

if (syncWorkerProcess.exitCode != 0) {
throw Exception(
'Could not compile sync worker: ${dbWorkerProcess.stdout.toString()}');
}

// Copy this to all demo apps web folders
Expand All @@ -44,6 +69,7 @@ Future<void> main() async {
continue;
}
final demoOutputPath = path.join(demoWebDir, workerFilename);
File(outputPath).copySync(demoOutputPath);
File(dbWorkerOutputPath).copySync(demoOutputPath);
File(syncWorkerOutputPath).copySync(demoOutputPath);
}
}

0 comments on commit acd3960

Please sign in to comment.