Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change the concurrency queue locking mechanism, example readme #1

Merged
merged 2 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Some destination plugins might not support all platform functionality. Refer to
- [Custom Logging](#custom-logging)
- [Handling errors](#handling-errors)
- [Reporting errors from plugins](#reporting-errors-from-plugins)
- [Example App](#example-app)
- [Contributing](#contributing)
- [Code of Conduct](#code-of-conduct)
- [License](#license)
Expand Down Expand Up @@ -569,6 +570,10 @@ try {
}
```

## Example App

See the [example app](./example/README.md) to check a full test app of how to integrate Analytics-Flutter into your own Flutter app.

## Contributing

See the [contributing guide](CONTRIBUTING.md) to learn how to contribute to the repository and the development workflow.
Expand Down
16 changes: 16 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@ A few resources to get you started if this is your first Flutter project:
For help getting started with Flutter development, view the
[online documentation](https://docs.flutter.dev/), which offers tutorials,
samples, guidance on mobile development, and a full API reference.

## Running the application

1. Make sure you have [Flutter SDK](https://docs.flutter.dev/get-started/install) installed.

2. The example app showcases the Firebase integration, so you will need to get your own Firebase Project. Create an account and a new project [here](https://firebase.google.com/).

3. Follow the instructions to add Firebase to your Flutter app.
1. It will require you to install [Firebase CLI](https://firebase.google.com/docs/cli?hl=en&authuser=1#install_the_firebase_cli).
2. At the root of the example app, run the `flutterfire configure --project={your-project-id}` command.
3. You can skip the step for "Initialize Firebase and add plugins"

4. On your Segment Workspace create your own [Flutter source](https://app.segment.com/{workspace-name}/sources/setup/flutter)
5. Set your Segment `WriteKey` in [`config.dart`](https://github.com/segmentio/analytics_flutter/blob/7a9c1f92d59b3520b9d1029045be6d80eaf1bad5/example/lib/config.dart#L1)
6. Run `flutter run` on the example

17 changes: 4 additions & 13 deletions packages/core/lib/timeline.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import 'package:analytics/errors.dart';
import 'package:analytics/event.dart';
import 'package:analytics/plugin.dart';
import 'package:analytics/logger.dart';
import 'package:analytics/utils/queue.dart';

typedef TimelinePlugins = Map<PluginType, List<Plugin>>;
typedef PluginClosure = void Function(Plugin);

class Timeline {
final TimelinePlugins _plugins = {};
Future<List<RawEvent?>>? _beforeFuture;
List<Future<RawEvent?>> _beforeQueue = [];
final _queue = ConcurrencyQueue<RawEvent?>();

List<Plugin> getPlugins(PluginType? ofType) {
if (ofType != null) {
Expand Down Expand Up @@ -69,19 +69,10 @@ class Timeline {
// apply .before first, ensuring all .before phases for all events triggered
// in a synchronous block are finished before moving onto the enrichment phase

final index = _beforeQueue.length;
_beforeQueue.add(applyPlugins(PluginType.before, incomingEvent));

_beforeFuture ??= Future.delayed(const Duration(microseconds: 1), () async {
final thisBeforeFutures = _beforeQueue;
_beforeQueue = [];
_beforeFuture = null;
return await Future.wait(thisBeforeFutures);
final beforeResult = await _queue.enqueue(() async {
return await applyPlugins(PluginType.before, incomingEvent);
});

final beforeResults = await _beforeFuture!;
final beforeResult = beforeResults[index];

if (beforeResult == null) {
return null;
}
Expand Down
55 changes: 55 additions & 0 deletions packages/core/lib/utils/queue.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import 'dart:async';
import 'dart:collection';

import 'package:flutter/foundation.dart';
import 'package:uuid/uuid.dart';

class Reducer<T> {
final Future<T> Function() operation;
final Completer<T> completer = Completer<T>();

Reducer({required this.operation});
}

class ConcurrencyQueue<T> {
final List<Reducer<T>> _queue = [];
bool _lock = false;

bool get _isLocked => (_lock == true);

bool _tryUnlock(Reducer<T> op) {
// We process and lock right away if the file is unlocked
if (!_isLocked) {
_lock = true;
_process(op);
return true;
}
return false;
}

Future<T> _process(Reducer<T> op) async {
final result = await op.operation();
// The actual operation future is resolved here so the caller doesn't await for the full queue
op.completer.complete(result);
_lock = false;

// When the process completes we trigger the next one right away if there's already a queue
if (_queue.isNotEmpty) {
final nextOp = _queue.first;
if (_tryUnlock(nextOp)) {
_queue.removeAt(0);
}
}
return result;
}

Future<T> enqueue(Future<T> Function() fun) async {
final op = Reducer<T>(operation: fun);

if (!_tryUnlock(op)) {
_queue.add(op);
}

return op.completer.future;
}
}
35 changes: 35 additions & 0 deletions packages/core/test/utils/queue_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import 'package:analytics/utils/queue.dart';
import 'package:flutter_test/flutter_test.dart';

class SimpleState {
String one;
String two;

SimpleState({required this.one, required this.two});
}

void main() {
test('handles multiple operations in a sync queue', () async {
final queue = ConcurrencyQueue<int>();
var counter = 0;

final future1 =
queue.enqueue(() => Future.delayed(const Duration(seconds: 2), () {
counter++;
return counter;
}));

final future2 =
queue.enqueue(() => Future.delayed(const Duration(seconds: 1), () {
counter++;
return counter;
}));

// Future2 will execute after future1 regardless if we await for it first
final result2 = await future2;
expect(result2, 2);

final result1 = await future1;
expect(result1, 1);
});
}