Skip to content

Commit

Permalink
Merge pull request #1 from segmentio/oscb/concurrencyFix
Browse files Browse the repository at this point in the history
fix: change the concurrency queue locking mechanism, example readme
  • Loading branch information
MichaelGHSeg authored Jun 28, 2023
2 parents 1e4d686 + 21519da commit 505ac1e
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 13 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Some destination plugins might not support all platform functionality. Refer to
- [Custom Logging](#custom-logging)
- [Handling errors](#handling-errors)
- [Reporting errors from plugins](#reporting-errors-from-plugins)
- [Example App](#example-app)
- [Contributing](#contributing)
- [Code of Conduct](#code-of-conduct)
- [License](#license)
Expand Down Expand Up @@ -569,6 +570,10 @@ try {
}
```

## Example App

See the [example app](./example/README.md) to check a full test app of how to integrate Analytics-Flutter into your own Flutter app.

## Contributing

See the [contributing guide](CONTRIBUTING.md) to learn how to contribute to the repository and the development workflow.
Expand Down
16 changes: 16 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@ A few resources to get you started if this is your first Flutter project:
For help getting started with Flutter development, view the
[online documentation](https://docs.flutter.dev/), which offers tutorials,
samples, guidance on mobile development, and a full API reference.

## Running the application

1. Make sure you have [Flutter SDK](https://docs.flutter.dev/get-started/install) installed.

2. The example app showcases the Firebase integration, so you will need to get your own Firebase Project. Create an account and a new project [here](https://firebase.google.com/).

3. Follow the instructions to add Firebase to your Flutter app.
1. It will require you to install [Firebase CLI](https://firebase.google.com/docs/cli?hl=en&authuser=1#install_the_firebase_cli).
2. At the root of the example app, run the `flutterfire configure --project={your-project-id}` command.
3. You can skip the step for "Initialize Firebase and add plugins"

4. On your Segment Workspace create your own [Flutter source](https://app.segment.com/{workspace-name}/sources/setup/flutter)
5. Set your Segment `WriteKey` in [`config.dart`](https://github.com/segmentio/analytics_flutter/blob/7a9c1f92d59b3520b9d1029045be6d80eaf1bad5/example/lib/config.dart#L1)
6. Run `flutter run` on the example

17 changes: 4 additions & 13 deletions packages/core/lib/timeline.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import 'package:analytics/errors.dart';
import 'package:analytics/event.dart';
import 'package:analytics/plugin.dart';
import 'package:analytics/logger.dart';
import 'package:analytics/utils/queue.dart';

typedef TimelinePlugins = Map<PluginType, List<Plugin>>;
typedef PluginClosure = void Function(Plugin);

class Timeline {
final TimelinePlugins _plugins = {};
Future<List<RawEvent?>>? _beforeFuture;
List<Future<RawEvent?>> _beforeQueue = [];
final _queue = ConcurrencyQueue<RawEvent?>();

List<Plugin> getPlugins(PluginType? ofType) {
if (ofType != null) {
Expand Down Expand Up @@ -69,19 +69,10 @@ class Timeline {
// apply .before first, ensuring all .before phases for all events triggered
// in a synchronous block are finished before moving onto the enrichment phase

final index = _beforeQueue.length;
_beforeQueue.add(applyPlugins(PluginType.before, incomingEvent));

_beforeFuture ??= Future.delayed(const Duration(microseconds: 1), () async {
final thisBeforeFutures = _beforeQueue;
_beforeQueue = [];
_beforeFuture = null;
return await Future.wait(thisBeforeFutures);
final beforeResult = await _queue.enqueue(() async {
return await applyPlugins(PluginType.before, incomingEvent);
});

final beforeResults = await _beforeFuture!;
final beforeResult = beforeResults[index];

if (beforeResult == null) {
return null;
}
Expand Down
55 changes: 55 additions & 0 deletions packages/core/lib/utils/queue.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import 'dart:async';
import 'dart:collection';

import 'package:flutter/foundation.dart';
import 'package:uuid/uuid.dart';

class Reducer<T> {
final Future<T> Function() operation;
final Completer<T> completer = Completer<T>();

Reducer({required this.operation});
}

class ConcurrencyQueue<T> {
final List<Reducer<T>> _queue = [];
bool _lock = false;

bool get _isLocked => (_lock == true);

bool _tryUnlock(Reducer<T> op) {
// We process and lock right away if the file is unlocked
if (!_isLocked) {
_lock = true;
_process(op);
return true;
}
return false;
}

Future<T> _process(Reducer<T> op) async {
final result = await op.operation();
// The actual operation future is resolved here so the caller doesn't await for the full queue
op.completer.complete(result);
_lock = false;

// When the process completes we trigger the next one right away if there's already a queue
if (_queue.isNotEmpty) {
final nextOp = _queue.first;
if (_tryUnlock(nextOp)) {
_queue.removeAt(0);
}
}
return result;
}

Future<T> enqueue(Future<T> Function() fun) async {
final op = Reducer<T>(operation: fun);

if (!_tryUnlock(op)) {
_queue.add(op);
}

return op.completer.future;
}
}
35 changes: 35 additions & 0 deletions packages/core/test/utils/queue_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import 'package:analytics/utils/queue.dart';
import 'package:flutter_test/flutter_test.dart';

class SimpleState {
String one;
String two;

SimpleState({required this.one, required this.two});
}

void main() {
test('handles multiple operations in a sync queue', () async {
final queue = ConcurrencyQueue<int>();
var counter = 0;

final future1 =
queue.enqueue(() => Future.delayed(const Duration(seconds: 2), () {
counter++;
return counter;
}));

final future2 =
queue.enqueue(() => Future.delayed(const Duration(seconds: 1), () {
counter++;
return counter;
}));

// Future2 will execute after future1 regardless if we await for it first
final result2 = await future2;
expect(result2, 2);

final result1 = await future1;
expect(result1, 1);
});
}

0 comments on commit 505ac1e

Please sign in to comment.