From 9e2e7cfd05c38a26fecd17222d32c6dd0c158911 Mon Sep 17 00:00:00 2001 From: Oscar Bazaldua <511911+oscb@users.noreply.github.com> Date: Tue, 9 May 2023 10:44:32 -0700 Subject: [PATCH 1/2] fix: change the concurrency queue locking mechanism, example readme --- README.md | 5 +++ example/README.md | 16 +++++++ packages/core/lib/timeline.dart | 17 ++------ packages/core/lib/utils/queue.dart | 53 ++++++++++++++++++++++++ packages/core/test/utils/queue_test.dart | 35 ++++++++++++++++ 5 files changed, 113 insertions(+), 13 deletions(-) create mode 100644 packages/core/lib/utils/queue.dart create mode 100644 packages/core/test/utils/queue_test.dart diff --git a/README.md b/README.md index 82d8aac..963590a 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ Some destination plugins might not support all platform functionality. Refer to - [Custom Logging](#custom-logging) - [Handling errors](#handling-errors) - [Reporting errors from plugins](#reporting-errors-from-plugins) + - [Example App](#example-app) - [Contributing](#contributing) - [Code of Conduct](#code-of-conduct) - [License](#license) @@ -569,6 +570,10 @@ try { } ``` +## Example App + +See the [example app](./example/README.md) to check a full test app of how to integrate Analytics-Flutter into your own Flutter app. + ## Contributing See the [contributing guide](CONTRIBUTING.md) to learn how to contribute to the repository and the development workflow. diff --git a/example/README.md b/example/README.md index b9f00cf..49e23b0 100644 --- a/example/README.md +++ b/example/README.md @@ -14,3 +14,19 @@ A few resources to get you started if this is your first Flutter project: For help getting started with Flutter development, view the [online documentation](https://docs.flutter.dev/), which offers tutorials, samples, guidance on mobile development, and a full API reference. + +## Running the application + +1. Make sure you have [Flutter SDK](https://docs.flutter.dev/get-started/install) installed. + +2. The example app showcases the Firebase integration, so you will need to get your own Firebase Project. Create an account and a new project [here](https://firebase.google.com/). + +3. Follow the instructions to add Firebase to your Flutter app. + 1. It will require you to install [Firebase CLI](https://firebase.google.com/docs/cli?hl=en&authuser=1#install_the_firebase_cli). + 2. At the root of the example app, run the `flutterfire configure --project={your-project-id}` command. + 3. You can skip the step for "Initialize Firebase and add plugins" + +4. On your Segment Workspace create your own [Flutter source](https://app.segment.com/{workspace-name}/sources/setup/flutter) +5. Set your Segment `WriteKey` in [`config.dart`](https://github.com/segmentio/analytics_flutter/blob/7a9c1f92d59b3520b9d1029045be6d80eaf1bad5/example/lib/config.dart#L1) +6. Run `flutter run` on the example + diff --git a/packages/core/lib/timeline.dart b/packages/core/lib/timeline.dart index f4ecbd5..59895a6 100644 --- a/packages/core/lib/timeline.dart +++ b/packages/core/lib/timeline.dart @@ -4,14 +4,14 @@ import 'package:analytics/errors.dart'; import 'package:analytics/event.dart'; import 'package:analytics/plugin.dart'; import 'package:analytics/logger.dart'; +import 'package:analytics/utils/queue.dart'; typedef TimelinePlugins = Map>; typedef PluginClosure = void Function(Plugin); class Timeline { final TimelinePlugins _plugins = {}; - Future>? _beforeFuture; - List> _beforeQueue = []; + final _queue = ConcurrencyQueue(); List getPlugins(PluginType? ofType) { if (ofType != null) { @@ -69,19 +69,10 @@ class Timeline { // apply .before first, ensuring all .before phases for all events triggered // in a synchronous block are finished before moving onto the enrichment phase - final index = _beforeQueue.length; - _beforeQueue.add(applyPlugins(PluginType.before, incomingEvent)); - - _beforeFuture ??= Future.delayed(const Duration(microseconds: 1), () async { - final thisBeforeFutures = _beforeQueue; - _beforeQueue = []; - _beforeFuture = null; - return await Future.wait(thisBeforeFutures); + final beforeResult = await _queue.enqueue(() async { + return await applyPlugins(PluginType.before, incomingEvent); }); - final beforeResults = await _beforeFuture!; - final beforeResult = beforeResults[index]; - if (beforeResult == null) { return null; } diff --git a/packages/core/lib/utils/queue.dart b/packages/core/lib/utils/queue.dart new file mode 100644 index 0000000..7cefbbb --- /dev/null +++ b/packages/core/lib/utils/queue.dart @@ -0,0 +1,53 @@ +import 'dart:async'; +import 'dart:collection'; + +import 'package:flutter/foundation.dart'; +import 'package:uuid/uuid.dart'; + +class Reducer { + final Future Function() operation; + final Completer completer = Completer(); + + Reducer({required this.operation}); +} + +class ConcurrencyQueue { + final List> _queue = []; + bool _lock = false; + + bool get _isLocked => (_lock == true); + + bool _tryUnlock(Reducer op) { + // If the file is unlo + if (!_isLocked) { + _lock = true; + _process(op); + return true; + } + return false; + } + + Future _process(Reducer op) async { + final result = await op.operation(); + op.completer.complete(result); + _lock = false; + + if (_queue.isNotEmpty) { + final nextOp = _queue.first; + if (_tryUnlock(nextOp)) { + _queue.removeAt(0); + } + } + return result; + } + + Future enqueue(Future Function() fun) async { + final op = Reducer(operation: fun); + + if (!_tryUnlock(op)) { + _queue.add(op); + } + + return op.completer.future; + } +} diff --git a/packages/core/test/utils/queue_test.dart b/packages/core/test/utils/queue_test.dart new file mode 100644 index 0000000..c0aff36 --- /dev/null +++ b/packages/core/test/utils/queue_test.dart @@ -0,0 +1,35 @@ +import 'package:analytics/utils/queue.dart'; +import 'package:flutter_test/flutter_test.dart'; + +class SimpleState { + String one; + String two; + + SimpleState({required this.one, required this.two}); +} + +void main() { + test('handles multiple operations in a sync queue', () async { + final queue = ConcurrencyQueue(); + var counter = 0; + + final future1 = + queue.enqueue(() => Future.delayed(const Duration(seconds: 2), () { + counter++; + return counter; + })); + + final future2 = + queue.enqueue(() => Future.delayed(const Duration(seconds: 1), () { + counter++; + return counter; + })); + + // Future2 will execute after future1 regardless if we await for it first + final result2 = await future2; + expect(result2, 2); + + final result1 = await future1; + expect(result1, 1); + }); +} From 21519da8b1ee9c8e1ad102f447efe7dd4184bc20 Mon Sep 17 00:00:00 2001 From: Oscar Bazaldua <511911+oscb@users.noreply.github.com> Date: Tue, 9 May 2023 10:57:01 -0700 Subject: [PATCH 2/2] chore: add comments --- packages/core/lib/utils/queue.dart | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/core/lib/utils/queue.dart b/packages/core/lib/utils/queue.dart index 7cefbbb..b51f362 100644 --- a/packages/core/lib/utils/queue.dart +++ b/packages/core/lib/utils/queue.dart @@ -18,7 +18,7 @@ class ConcurrencyQueue { bool get _isLocked => (_lock == true); bool _tryUnlock(Reducer op) { - // If the file is unlo + // We process and lock right away if the file is unlocked if (!_isLocked) { _lock = true; _process(op); @@ -29,9 +29,11 @@ class ConcurrencyQueue { Future _process(Reducer op) async { final result = await op.operation(); + // The actual operation future is resolved here so the caller doesn't await for the full queue op.completer.complete(result); _lock = false; + // When the process completes we trigger the next one right away if there's already a queue if (_queue.isNotEmpty) { final nextOp = _queue.first; if (_tryUnlock(nextOp)) {