Skip to content

Commit

Permalink
REFACTOR(rx-document) use CustomIdleQueue for atomic updates to enable
Browse files Browse the repository at this point in the history
  • Loading branch information
pubkey committed Feb 9, 2018
1 parent cf3e53c commit f6b9a67
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 55 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

### 7.X.X (coming soon)

Features:
- Use `CustomIdleQueue` for atomic updates to enable [#494](https://github.com/pubkey/rxdb/issues/494)

### 7.3.3 (February 4, 2018)

Other:
Expand Down
17 changes: 1 addition & 16 deletions src/plugins/attachments.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ import RxChangeEvent from './../rx-change-event';
import * as util from './../util';
import RxError from '../rx-error';


/**
* to not have update-conflicts,
* we use atomic inserts (per document) on putAttachment()
* @type {WeakMap}
*/
const ATTACHMENT_ATOMIC_QUEUES = new WeakMap();


function ensureSchemaSupportsAttachments(doc) {
const schemaJson = doc.collection.schema.jsonID;
if (!schemaJson.attachments) {
Expand Down Expand Up @@ -158,12 +149,6 @@ RxAttachment.fromPouchDocument = (id, pouchDocAttachment, rxDocument) => {
});
};

export function getAtomicQueueOfDocument(doc) {
if (!ATTACHMENT_ATOMIC_QUEUES.has(doc))
ATTACHMENT_ATOMIC_QUEUES.set(doc, new IdleQueue());
return ATTACHMENT_ATOMIC_QUEUES.get(doc);
};

function shouldEncrypt(doc) {
return !!doc.collection.schema.jsonID.attachments.encrypted;
}
Expand All @@ -174,7 +159,7 @@ export async function putAttachment({
type = 'text/plain'
}) {
ensureSchemaSupportsAttachments(this);
const queue = getAtomicQueueOfDocument(this);
const queue = this.atomicQueue;

if (shouldEncrypt(this))
data = this.collection._crypter._encryptValue(data);
Expand Down
57 changes: 18 additions & 39 deletions src/rx-document.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import IdleQueue from 'custom-idle-queue';
import objectPath from 'object-path';
import deepEqual from 'deep-equal';

Expand Down Expand Up @@ -31,12 +32,6 @@ export class RxDocument {
// current doc-data, changes when setting values etc
this._data = util.clone(jsonData);

// atomic-update-functions that have not run yes
this._atomicUpdates = [];

// resolve-functions to resolve the promises of atomicUpdate
this._atomicUpdatesResolveFunctions = new WeakMap();

// false when _data !== _dataSync
this._synced$ = new BehaviorSubject(true);
this._deleted$ = new BehaviorSubject(false);
Expand All @@ -60,6 +55,11 @@ export class RxDocument {
get deleted() {
return this._deleted$.getValue();
}
get atomicQueue() {
if (!this._atomicQueue)
this._atomicQueue = new IdleQueue();
return this._atomicQueue;
}
get synced$() {
return this._synced$
.pipe(
Expand Down Expand Up @@ -340,41 +340,20 @@ export class RxDocument {
}

/**
* [atomicUpdate description]
* @param {[type]} fun [description]
* @return {Promise<RxDocument>} [description]
* runs an atomic update over the document
* @param {function(RxDocument)} fun
* @return {Promise<RxDocument>}
*/
async atomicUpdate(fun) {
this._atomicUpdates.push(fun);
const retPromise = new Promise((resolve, reject) => {
this._atomicUpdatesResolveFunctions.set(fun, {
resolve,
reject
});
});
this._runAtomicUpdates();
return retPromise;
}

async _runAtomicUpdates() {
if (this.__runAtomicUpdates_running) return;
else this.__runAtomicUpdates_running = true;

if (this._atomicUpdates.length === 0) {
this.__runAtomicUpdates_running = false;
return;
};
const fun = this._atomicUpdates.shift();

try {
await fun(this); // run atomic
await this.save();
} catch (err) {
this._atomicUpdatesResolveFunctions.get(fun).reject(err);
}
this._atomicUpdatesResolveFunctions.get(fun).resolve(this); // resolve promise
this.__runAtomicUpdates_running = false;
this._runAtomicUpdates();
const queue = this.atomicQueue;
await queue.requestIdlePromise();
const ret = await queue.wrapCall(
async () => {
await fun(this);
await this.save();
}
);
return this;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/typings/rx-document.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Observable } from 'rxjs';
import IdleQueue from 'custom-idle-queue';

import {
RxCollection
Expand All @@ -20,6 +21,7 @@ export declare class RxDocumentBase<RxDocumentType> {

readonly $: Observable<any>;
readonly deleted$: Observable<boolean>;
readonly atomicQueue: IdleQueue;
readonly synced$: Observable<boolean>;
resync(): void;

Expand Down

0 comments on commit f6b9a67

Please sign in to comment.