Skip to content
This repository has been archived by the owner on Jan 9, 2025. It is now read-only.

Fixing storage stalls in case of errors during the housekeeping phase #303

Merged
merged 1 commit into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ private void work() throws InterruptedException
{
this.eventLogger.logDisruption(this, t);
this.operationController.setChannelProcessingEnabled(false);
this.operationController.registerDisruption(t);
this.eventLogger.logChannelProcessingDisabled(this);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ public abstract class AbstractCompletingTask<R>
// constructors //
/////////////////

public AbstractCompletingTask(final long timestamp, final int channelCount)
public AbstractCompletingTask(
final long timestamp ,
final int channelCount,
final StorageOperationController controller
)
{
super(timestamp, channelCount);
super(timestamp, channelCount, controller);
}



///////////////////////////////////////////////////////////////////////////
// declared methods //
/////////////////////
Expand Down Expand Up @@ -140,9 +143,9 @@ protected final void complete(final StorageChannel channel, final R result) thro
public static final class Dummy extends AbstractCompletingTask<Void> implements StorageRequestTask
{

public Dummy(final int channelCount)
public Dummy(final int channelCount, StorageOperationController controller)
{
super(0, channelCount);
super(0, channelCount, controller);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,31 @@ public abstract class Abstract<R>
// instance fields //
////////////////////

private int remainingForCompletion;
private int remainingForProcessing;
private int remainingForCompletion;
private int remainingForProcessing;

private AtomicBoolean hasProblems = new AtomicBoolean();
private final Throwable[] problems ; // unshared instance conveniently abused as a second lock
private AtomicBoolean hasProblems = new AtomicBoolean();
private final Throwable[] problems ; // unshared instance conveniently abused as a second lock
protected final StorageOperationController controller;



///////////////////////////////////////////////////////////////////////////
// constructors //
/////////////////

public Abstract(final long timestamp, final int channelCount)
public Abstract(
final long timestamp,
final int channelCount,
final StorageOperationController controller)
{
super(timestamp);

// (20.11.2019 TM)NOTE: inlined assignments caused an "Unsafe" error on an ARM machine. No kidding.
this.remainingForProcessing = channelCount;
this.remainingForCompletion = channelCount;
this.problems = new Throwable[channelCount];
this.controller = controller;
}


Expand All @@ -72,10 +77,15 @@ public Abstract(final long timestamp, final int channelCount)

private void checkForProblems()
{
if(controller.hasDisruptions()) {
throw new StorageException("Aborting after: ", controller.disruptions().first());
}

if(!this.hasProblems.get())
{
return;
}

// (30.05.2013 TM)FIXME: check why this is never reached when task fails?
// (15.06.2013 TM)NOTE: should be fixed by double check in waitOnCompletion()
// (09.12.2019 TM)NOTE: still needs to be investigated.
Expand Down Expand Up @@ -143,7 +153,7 @@ public final synchronized void waitOnCompletion() throws InterruptedException
while(this.remainingForCompletion > 0)
{
this.checkForProblems(); // check for problems already while waiting
this.wait();
this.wait(100);
}
this.checkForProblems(); // check for problems after every channel reported completion
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Default(
final StorageOperationController operationController
)
{
super(timestamp, channelCount);
super(timestamp, channelCount, operationController);
this.operationController = notNull(operationController) ;
this.result = new StorageInventory[channelCount];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Default(
final StorageOperationController operationController
)
{
super(timestamp, channelCount);
super(timestamp, channelCount, operationController);
this.operationController = notNull(operationController);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public final class Default
final long timestamp ,
final int channelCount ,
final long nanoTimeBudget ,
final StorageEntityCacheEvaluator entityEvaluator
final StorageEntityCacheEvaluator entityEvaluator,
final StorageOperationController controller
)
{
super(timestamp, channelCount);
super(timestamp, channelCount, controller);
this.entityEvaluator = entityEvaluator; // may be null
this.nanoTimeBudget = nanoTimeBudget;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ public final class Default
/////////////////

Default(
final long timestamp ,
final int channelCount
final long timestamp ,
final int channelCount,
final StorageOperationController controller
)
{
super(timestamp, channelCount);
super(timestamp, channelCount, controller);
this.channelResults = new StorageRawFileStatistics.ChannelStatistics[channelCount];
this.creationTime = XTime.now();
}
Expand Down
Loading