Skip to content

Commit

Permalink
Reimplemented CBLReplication.pendingDocumentIDs and -isDocumentPending:
Browse files Browse the repository at this point in the history
Now tracking replicator's lastSequence and caching it in an ivar.
-isDocumentPending: just compares the doc's sequence with that.
pendingDocumentIDs does basically what it used to do, but it doesn't
need to do it on a background thread.

Also, the bug-fix portion of this is that pendingDocumentIDs remembers
the database's lastSequenceNumber when it's cached; if that changes,
the cached value is invalidated.

Fixes #1132
  • Loading branch information
snej committed Feb 22, 2016
1 parent c2d69de commit 083779d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 39 deletions.
128 changes: 90 additions & 38 deletions Source/API/CBLReplication.m
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ @interface CBLReplication ()

@implementation CBLReplication
{
NSSet* _pendingDocIDs;
bool _started;
bool _started; // Has replicator been started?
SequenceNumber _lastSequencePushed; // The latest sequence pushed by the replicator
NSSet* _pendingDocIDs; // Cached set of docIDs awaiting push
SequenceNumber _pendingDocIDsSequence; // DB lastSequenceNumber when _pendingDocIDs was set

// ONLY used on the server thread:
id<CBL_Replicator> _bg_replicator;
Expand Down Expand Up @@ -280,7 +282,7 @@ - (void) start {

// Initialize the status to something other than kCBLReplicationStopped:
[self updateStatus: kCBLReplicationOffline error: nil processed: 0 ofTotal: 0
serverCert: NULL];
lastSeqPushed: 0 serverCert: NULL];

[_database addReplication: self];
}
Expand Down Expand Up @@ -350,6 +352,7 @@ - (void) updateStatus: (CBLReplicationStatus)status
error: (NSError*)error
processed: (NSUInteger)changesProcessed
ofTotal: (NSUInteger)changesTotal
lastSeqPushed: (SequenceNumber)lastSeqPushed
serverCert: (SecCertificateRef)serverCert
{
if (!_started)
Expand All @@ -359,7 +362,10 @@ - (void) updateStatus: (CBLReplicationStatus)status
[_database forgetReplication: self];
}

_pendingDocIDs = nil; // forget cached IDs
if (lastSeqPushed >= 0 && lastSeqPushed != _lastSequencePushed) {
_lastSequencePushed = lastSeqPushed;
_pendingDocIDs = nil;
}

BOOL changed = NO;
if (!$equal(error, _lastError)) {
Expand Down Expand Up @@ -403,46 +409,84 @@ - (void) updateStatus: (CBLReplicationStatus)status

#pragma mark - PUSH REPLICATION ONLY:


- (CBL_ReplicatorSettings*) replicatorSettings {
CBLStatus status;
CBL_ReplicatorSettings* settings;
settings = [_database.manager replicatorSettingsWithProperties: self.properties
toDatabase: nil
status: &status];
if (!settings)
Warn(@"Error parsing replicator settings : %@", CBLStatusToNSError(status));
return settings;
}


- (SInt64) lastSequencePushed {
if (_pull)
return -1;
if (_lastSequencePushed <= 0) {
// If running replicator hasn't updated yet, fetch the checkpointed last sequence:
CBL_ReplicatorSettings* settings = self.replicatorSettings;
if (!settings)
return -1;
_lastSequencePushed = [[_database lastSequenceForReplicator: settings] longLongValue];
}
return _lastSequencePushed;
}


- (NSSet*) pendingDocumentIDs {
if (_pull || (_started && _pendingDocIDs))
return _pendingDocIDs;

_pendingDocIDs = [_database.manager.backgroundServer
waitForDatabaseNamed: _database.name to: ^id(CBLDatabase* bgDb)
{
CBLStatus status;
CBL_ReplicatorSettings* settings =
[bgDb.manager replicatorSettingsWithProperties: self.properties
toDatabase: nil
status: &status];
if (!settings) {
Warn(@"Error parsing replicator settings : %@", CBLStatusToNSError(status));
return nil;
}
if (_pull)
return nil;
if (_pendingDocIDs) {
if (_pendingDocIDsSequence == _database.lastSequenceNumber)
return _pendingDocIDs; // Still valid
_pendingDocIDs = nil;
}

NSString* lastSequence = _bg_replicator.lastSequence;
if (!lastSequence)
lastSequence = [bgDb lastSequenceForReplicator: settings];

NSError* error;
CBL_RevisionList* revs = [bgDb unpushedRevisionsSince: lastSequence
filter: settings.filterBlock
params: settings.filterParameters
error: &error];
if (revs)
return [NSSet setWithArray: revs.allDocIDs];
else
Warn(@"Error getting unpushed revisions : %@", error);
CBL_ReplicatorSettings* settings = self.replicatorSettings;
if (!settings)
return nil;
}];
SequenceNumber lastSequence = self.lastSequencePushed;
if (lastSequence < 0)
return nil;

_pendingDocIDsSequence = _database.lastSequenceNumber;
NSError* error;
CBL_RevisionList* revs = [_database unpushedRevisionsSince: $sprintf(@"%lld", lastSequence)
filter: settings.filterBlock
params: settings.filterParameters
error: &error];
if (!revs) {
Warn(@"Error getting unpushed revisions : %@", error);
return nil;
}
_pendingDocIDs = [NSSet setWithArray: revs.allDocIDs];
return _pendingDocIDs;
}


- (BOOL) isDocumentPending: (CBLDocument*)doc {
return doc && [self.pendingDocumentIDs containsObject: doc.documentID];
//OPT: It may be cheaper to do this by fetching the replicator's checkpoint sequence and
// comparing the doc's sequence to it.
SequenceNumber lastSeq = self.lastSequencePushed;
if (lastSeq < 0)
return NO; // error

CBLSavedRevision* rev = doc.currentRevision;
SequenceNumber seq = rev.sequence;
if (seq <= lastSeq)
return NO;

if (_filter) {
// Use _pendingDocIDs as a shortcut, if it's valid
if (_pendingDocIDs && _pendingDocIDsSequence == _database.lastSequenceNumber)
return [_pendingDocIDs containsObject: doc.documentID];
// Else run the filter on the doc:
CBL_ReplicatorSettings* settings = self.replicatorSettings;
if (!settings.filterBlock(rev, settings.filterParameters))
return NO;
}
return YES;
}


Expand Down Expand Up @@ -538,7 +582,9 @@ - (void) bg_startReplicator: (CBLManager*)server_dbmgr
CBLReplication *strongSelf = weakSelf;
[strongSelf updateStatus: kCBLReplicationStopped
error: CBLStatusToNSError(status)
processed: 0 ofTotal: 0 serverCert: NULL];
processed: 0 ofTotal: 0
lastSeqPushed: -1
serverCert: NULL];
}];
return;
}
Expand Down Expand Up @@ -605,6 +651,10 @@ - (void) bg_updateProgress {
SecCertificateRef serverCert = _bg_replicator.serverCert;
cfretain(serverCert);

SequenceNumber lastSeqPushed = -1;
if (!_pull)
lastSeqPushed = [_bg_replicator.lastSequence longLongValue];

if (status == kCBLReplicationStopped) {
[self bg_setReplicator: nil];
}
Expand All @@ -617,7 +667,9 @@ - (void) bg_updateProgress {
__weak CBLReplication *weakSelf = self;
[_database.manager doAsync:^{
CBLReplication *strongSelf = weakSelf;
[strongSelf updateStatus: status error: error processed: changes ofTotal: total
[strongSelf updateStatus: status error: error
processed: changes ofTotal: total
lastSeqPushed: lastSeqPushed
serverCert: serverCert];
cfrelease(serverCert);
}];
Expand Down
7 changes: 6 additions & 1 deletion Unit-Tests/Replication_Tests.m
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ - (void)test18_PendingDocumentIDs {
AssertEq(repl.pendingDocumentIDs.count, 0u);
Assert(![repl isDocumentPending: [db documentWithID: @"doc-1"]]);

// Add another set of documents and create a new replicator:
// Add another set of documents:
[db inTransaction: ^BOOL{
for (int i = 11; i <= 20; i++) {
@autoreleasepool {
Expand All @@ -1142,6 +1142,11 @@ - (void)test18_PendingDocumentIDs {
return YES;
}];

// Make sure newly-added documents are considered pending: (#1132)
Assert([repl isDocumentPending: [db documentWithID: @"doc-11"]]);
AssertEq(repl.pendingDocumentIDs.count, 10u);

// Create a new replicator:
repl = [db createPushReplication: remoteDbURL];

AssertEq(repl.pendingDocumentIDs.count, 10u);
Expand Down

0 comments on commit 083779d

Please sign in to comment.