Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added 2 new HystrixEventTypes: EMIT and FALLBACK_EMIT #604

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 82 additions & 21 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -551,27 +551,22 @@ public void call(Notification<? super R> n) {
}


}).lift(new HystrixObservableTimeoutOperator<>(_self)).map(new Func1<R, R>() {

boolean once = false;

}).lift(new HystrixObservableTimeoutOperator<>(_self)).doOnNext(new Action1<R>() {
@Override
public R call(R t1) {
if (!once) {
// report success
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
once = true;
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEmission(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, getCommandKey());
}
return t1;
}

}).doOnCompleted(new Action0() {

@Override
public void call() {
long duration = System.currentTimeMillis() - invocationStartTime;
metrics.addCommandExecutionTime(duration);
metrics.markSuccess(duration);
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
}
Expand Down Expand Up @@ -781,7 +776,15 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy
executionResult = executionResult.addEvents(eventType);
final AbstractCommand<R> _cmd = this;

return getFallbackWithProtection().doOnCompleted(new Action0() {
return getFallbackWithProtection().doOnNext(new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEmission(HystrixEventType.FALLBACK_EMIT);
eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, getCommandKey());
}
}
}).doOnCompleted(new Action0() {

@Override
public void call() {
Expand Down Expand Up @@ -886,6 +889,16 @@ protected void handleThreadEnd() {
}
}

/**
*
* @return if onNext events should be reported on
* This affects {@link HystrixRequestLog}, and {@link HystrixEventNotifier} currently.
* Metrics/hooks will be affected once they are in place
*/
protected boolean shouldOutputOnNextEvents() {
return false;
}

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

final AbstractCommand<R> originalCommand;
Expand Down Expand Up @@ -1402,32 +1415,36 @@ protected static class ExecutionResult {
private final int executionTime;
private final Exception exception;
private final long commandRunStartTimeInNanos;
private final int numEmissions;
private final int numFallbackEmissions;

private ExecutionResult(HystrixEventType... events) {
this(Arrays.asList(events), -1, null);
this(Arrays.asList(events), -1, null, 0, 0);
}

public ExecutionResult setExecutionTime(int executionTime) {
return new ExecutionResult(events, executionTime, exception);
return new ExecutionResult(events, executionTime, exception, numEmissions, numFallbackEmissions);
}

public ExecutionResult setException(Exception e) {
return new ExecutionResult(events, executionTime, e);
return new ExecutionResult(events, executionTime, e, numEmissions, numFallbackEmissions);
}

private ExecutionResult(List<HystrixEventType> events, int executionTime, Exception e) {
private ExecutionResult(List<HystrixEventType> events, int executionTime, Exception e, int numEmissions, int numFallbackEmissions) {
// we are safe assigning the List reference instead of deep-copying
// because we control the original list in 'newEvent'
this.events = events;
this.executionTime = executionTime;
if (executionTime >= 0 ) {
this.commandRunStartTimeInNanos = System.nanoTime() - this.executionTime*1000*1000; // 1000*1000 will conver the milliseconds to nanoseconds
this.commandRunStartTimeInNanos = System.nanoTime() - this.executionTime*1000*1000; // 1000*1000 will convert the milliseconds to nanoseconds
}
else {
this.commandRunStartTimeInNanos = -1;
}
this.exception = e;

this.numEmissions = numEmissions;
this.numFallbackEmissions = numFallbackEmissions;
}

// we can return a static version since it's immutable
Expand All @@ -1440,10 +1457,14 @@ private ExecutionResult(List<HystrixEventType> events, int executionTime, Except
* @return
*/
public ExecutionResult addEvents(HystrixEventType... events) {
ArrayList<HystrixEventType> newEvents = new ArrayList<>();
newEvents.addAll(this.events);
Collections.addAll(newEvents, events);
return new ExecutionResult(Collections.unmodifiableList(newEvents), executionTime, exception);
return new ExecutionResult(getUpdatedList(this.events, events), executionTime, exception, numEmissions, numFallbackEmissions);
}

private static List<HystrixEventType> getUpdatedList(List<HystrixEventType> currentList, HystrixEventType... newEvents) {
ArrayList<HystrixEventType> updatedEvents = new ArrayList<>();
updatedEvents.addAll(currentList);
Collections.addAll(updatedEvents, newEvents);
return Collections.unmodifiableList(updatedEvents);
}

public int getExecutionTime() {
Expand All @@ -1456,6 +1477,28 @@ public int getExecutionTime() {
public Exception getException() {
return exception;
}

/**
* This method may be called many times for {@code HystrixEventType.EMIT} and {@link HystrixEventType.FALLBACK_EMIT}.
* To save on storage, on the first time we see that event type, it gets added to the event list, and the count gets incremented.
* @param eventType emission event
* @return "updated" {@link ExecutionResult}
*/
public ExecutionResult addEmission(HystrixEventType eventType) {
switch (eventType) {
case EMIT: if (events.contains(HystrixEventType.EMIT)) {
return new ExecutionResult(events, executionTime, exception, numEmissions + 1, numFallbackEmissions);
} else {
return new ExecutionResult(getUpdatedList(this.events, HystrixEventType.EMIT), executionTime, exception, numEmissions +1, numFallbackEmissions);
}
case FALLBACK_EMIT: if (events.contains(HystrixEventType.FALLBACK_EMIT)) {
return new ExecutionResult(events, executionTime, exception, numEmissions, numFallbackEmissions + 1);
} else {
return new ExecutionResult(getUpdatedList(this.events, HystrixEventType.FALLBACK_EMIT), executionTime, exception, numEmissions, numFallbackEmissions + 1);
}
default: return this;
}
}
}

/* ******************************************************************************** */
Expand Down Expand Up @@ -1610,6 +1653,24 @@ public List<HystrixEventType> getExecutionEvents() {
return executionResult.events;
}

/**
* Number of emissions of the execution of a command. Only interesting in the streaming case.
* @return number of <code>OnNext</code> emissions by a streaming command
*/
@Override
public int getNumberEmissions() {
return executionResult.numEmissions;
}

/**
* Number of emissions of the execution of a fallback. Only interesting in the streaming case.
* @return number of <code>OnNext</code> emissions by a streaming fallback
*/
@Override
public int getNumberFallbackEmissions() {
return executionResult.numFallbackEmissions;
}

/**
* The execution time of this command instance in milliseconds, or -1 if not executed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
* These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
*/
public enum HystrixEventType {
SUCCESS, FAILURE, TIMEOUT, SHORT_CIRCUITED, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECTION, EXCEPTION_THROWN, RESPONSE_FROM_CACHE, COLLAPSED, BAD_REQUEST
EMIT, SUCCESS, FAILURE, TIMEOUT, SHORT_CIRCUITED, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED, FALLBACK_EMIT, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECTION, EXCEPTION_THROWN, RESPONSE_FROM_CACHE, COLLAPSED, BAD_REQUEST
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public interface HystrixInvokableInfo<R> {

public List<HystrixEventType> getExecutionEvents();

public int getNumberEmissions();

public int getNumberFallbackEmissions();

public int getExecutionTimeInMilliseconds();

public long getCommandRunStartTimeInNanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;

/**
* Used to wrap code that will execute potentially risky functionality (typically meaning a service call over the network)
Expand Down Expand Up @@ -49,6 +50,18 @@ protected HystrixObservableCommand(HystrixCommandGroupKey group) {
this(new Setter(group));
}

/**
*
* Overridden to true so that all onNext emissions are captured
*
* @return if onNext events should be reported on
* This affects {@link HystrixRequestLog}, and {@link HystrixEventNotifier} currently. Metrics/Hooks later
*/
@Override
protected boolean shouldOutputOnNextEvents() {
return true;
}

/**
* Construct a {@link HystrixObservableCommand} with defined {@link Setter} that allows injecting property and strategy overrides and other optional arguments.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,24 @@ public Collection<HystrixInvokableInfo<?>> getAllExecutedCommands() {
* <li>TestCommand[FAILURE][1ms]</li>
* <li>TestCommand[THREAD_POOL_REJECTED][1ms]</li>
* <li>TestCommand[THREAD_POOL_REJECTED, FALLBACK_SUCCESS][1ms]</li>
* <li>TestCommand[EMIT, SUCCESS][1ms]</li>
* <li>TestCommand[EMITx5, SUCCESS][1ms]</li>
* <li>TestCommand[EMITx5, FAILURE, FALLBACK_EMITx6, FALLBACK_FAILURE][100ms]</li>
* <li>TestCommand[FAILURE, FALLBACK_SUCCESS][1ms], TestCommand[FAILURE, FALLBACK_SUCCESS, RESPONSE_FROM_CACHE][1ms]x4</li>
* <li>GetData[SUCCESS][1ms], PutData[SUCCESS][1ms], GetValues[SUCCESS][1ms], GetValues[SUCCESS, RESPONSE_FROM_CACHE][1ms], TestCommand[FAILURE, FALLBACK_FAILURE][1ms], TestCommand[FAILURE,
* FALLBACK_FAILURE, RESPONSE_FROM_CACHE][1ms]</li>
* </ul>
* <p>
* If a command has a multiplier such as <code>x4</code> that means this command was executed 4 times with the same events. The time in milliseconds is the sum of the 4 executions.
* If a command has a multiplier such as <code>x4</code>, that means this command was executed 4 times with the same events. The time in milliseconds is the sum of the 4 executions.
* <p>
* For example, <code>TestCommand[SUCCESS][15ms]x4</code> represents TestCommand being executed 4 times and the sum of those 4 executions was 15ms. These 4 each executed the run() method since
* <code>RESPONSE_FROM_CACHE</code> was not present as an event.
*
*
* If an EMIT or FALLBACK_EMIT has a multiplier such as <code>x5</code>, that means a <code>HystrixObservableCommand</code> was used and it emitted that number of <code>OnNext</code>s.
* <p>
* For example, <code>TestCommand[EMITx5, FAILURE, FALLBACK_EMITx6, FALLBACK_FAILURE][100ms]</code> represents TestCommand executing observably, emitted 5 <code>OnNext</code>s, then an <code>OnError</code>.
* This command also has an Observable fallback, and it emits 6 <code>OnNext</code>s, then an <code>OnCompleted</code>.
*
* @return String request log or "Unknown" if unable to instead of throwing an exception.
*/
public String getExecutedCommandsAsString() {
Expand All @@ -179,7 +187,26 @@ public String getExecutedCommandsAsString() {
//replicate functionality of Arrays.toString(events.toArray()) to append directly to existing StringBuilder
builder.append("[");
for (HystrixEventType event : events) {
builder.append(event).append(", ");
switch (event) {
case EMIT:
int numEmissions = command.getNumberEmissions();
if (numEmissions > 1) {
builder.append(event).append("x").append(numEmissions).append(", ");
} else {
builder.append(event).append(", ");
}
break;
case FALLBACK_EMIT:
int numFallbackEmissions = command.getNumberFallbackEmissions();
if (numFallbackEmissions > 1) {
builder.append(event).append("x").append(numFallbackEmissions).append(", ");
} else {
builder.append(event).append(", ");
}
break;
default:
builder.append(event).append(", ");
}
}
builder.setCharAt(builder.length() - 2, ']');
builder.setLength(builder.length() - 1);
Expand Down
Loading