Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored Jun 23, 2020
1 parent 0382850 commit 0f682d9
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions sdk/metric/integrator/basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,25 @@ type (
// being maintained, taken from the process start time.
stateful bool

// TODO: as seen in lengthy comments below, both the
// `current` and `delta` fields have multiple uses
// depending on the specific configuration of
// instrument, exporter, and accumulator. It is
// possible to simplify this situation by declaring
// explicit fields that are not used with a dual
// purpose. Improve this situation?
//
// 1. "delta" is used to combine deltas from multiple
// accumulators, and it is also used to store the
// output of subtraction when computing deltas of
// PrecomputedSum instruments.
//
// 2. "current" either refers to the Aggregator passed
// to Process() by a single accumulator (when either
// there is just one Accumulator, or the instrument is
// Asynchronous), or it refers to "delta", depending
// on configuration.

current export.Aggregator // refers to single-accumulator checkpoint or delta.
delta export.Aggregator // owned if multi accumulator else nil.
cumulative export.Aggregator // owned if stateful else nil.
Expand Down Expand Up @@ -151,9 +170,20 @@ func (b *Integrator) Process(accum export.Accumulation) error {
sameCollection := b.state.finishedCollection == value.updated
value.updated = b.state.finishedCollection

// An existing value will be found for some stateKey when:
// (a) stateful aggregation is being used
// (b) multiple accumulators are being used.
// At this point in the code, we have located an existing
// value for some stateKey. This can be because:
//
// (a) stateful aggregation is being used, the entry was
// entered during a prior collection, and this is the first
// time processing an accumulation for this stateKey in the
// current collection. Since this is the first time
// processing an accumulation for this stateKey during this
// collection, we don't know yet whether there are multiple
// accumulators at work. If there are multiple accumulators,
// they'll hit case (b) the second time through.
//
// (b) multiple accumulators are being used, whether stateful
// or not.
//
// Case (a) occurs when the instrument and the exporter
// require memory to work correctly, either because the
Expand Down Expand Up @@ -273,9 +303,10 @@ func (b *Integrator) FinishCollection() error {
// delta or a cumulative aggregation.
var err error
if mkind.PrecomputedSum() {
// delta_value = current_cumulative_value - previous_cumulative_value
if subt, ok := value.current.(export.Subtractor); ok {
err = subt.Subtract(value.cumulative, value.delta, key.descriptor)
if currentSubtractor, ok := value.current.(export.Subtractor); ok {
// This line is equivalent to:
// value.delta = currentSubtractor - value.cumulative
err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor)

if err == nil {
err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
Expand All @@ -284,7 +315,8 @@ func (b *Integrator) FinishCollection() error {
err = aggregation.ErrNoSubtraction
}
} else {
// cumulative_value = previous_cumulative_value + current_delta_value
// This line is equivalent to:
// value.cumulative = value.cumulative + value.delta
err = value.cumulative.Merge(value.current, key.descriptor)
}
if err != nil {
Expand Down

0 comments on commit 0f682d9

Please sign in to comment.