Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Complete aggregated event work #36

Merged
merged 1 commit into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions eventrecorder/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,28 @@ func (e EventBatch) Validate() error {
return nil
}

type RetrievalAttempt struct {
Error string `json:"error,omitempty"`
TimeToFirstByte string `json:"timeToFirstByte,omitempty"`
}

type AggregateEvent struct {
RetrievalID string `json:"retrievalId"` // The unique ID of the retrieval
InstanceID string `json:"instanceId"` // The ID of the Lassie instance generating the event
RetrievalID string `json:"retrievalId"` // The unique ID of the retrieval
StorageProviderID string `json:"storageProviderId,omitempty"` // The ID of the storage provider that served the retrieval content
TimeToFirstByte int64 `json:"timeToFirstByte,omitempty"` // The time it took to receive the first byte in milliseconds
TimeToFirstByte string `json:"timeToFirstByte,omitempty"` // The time it took to receive the first byte in milliseconds
Bandwidth uint64 `json:"bandwidth,omitempty"` // The bandwidth of the retrieval in bytes per second
BytesTransferred uint64 `json:"bytesTransferred,omitempty"` // The total transmitted deal size
Success bool `json:"success"` // Wether or not the retreival ended with a success event
StartTime time.Time `json:"startTime"` // The time the retrieval started
EndTime time.Time `json:"endTime"` // The time the retrieval ended

TimeToFirstIndexerResult string `json:"timeToFirstIndexerResult,omitempty"` // time it took to receive our first "CandidateFound" event
IndexerCandidatesReceived int `json:"indexerCandidatesReceived"` // The number of candidates received from the indexer
IndexerCandidatesFiltered int `json:"indexerCandidatesFiltered"` // The number of candidates that made it through the filtering stage
ProtocolsAllowed []string `json:"protocolsAllowed,omitempty"` // The available protocols that could be used for this retrieval
ProtocolsAttempted []string `json:"protocolsAttempted,omitempty"` // The protocols that were used to attempt this retrieval
RetrievalAttempts map[string]*RetrievalAttempt `json:"retrievalAttempts,omitempty"` // All of the retrieval attempts, indexed by their SP ID
}

func (e AggregateEvent) Validate() error {
Expand All @@ -144,6 +157,29 @@ func (e AggregateEvent) Validate() error {
case e.EndTime.Before(e.StartTime):
return errors.New("property endTime cannot be before startTime")
default:
if e.TimeToFirstByte != "" {
_, err := time.ParseDuration(e.TimeToFirstByte)
if err != nil {
return err
}
}
if e.TimeToFirstIndexerResult != "" {
_, err := time.ParseDuration(e.TimeToFirstIndexerResult)
if err != nil {
return err
}
}
for _, retrievalAttempt := range e.RetrievalAttempts {
if retrievalAttempt == nil {
return errors.New("all retrieval attempts should have values")
}
if retrievalAttempt.TimeToFirstByte != "" {
_, err := time.ParseDuration(retrievalAttempt.TimeToFirstByte)
if err != nil {
return err
}
}
}
return nil
}
}
Expand Down
90 changes: 82 additions & 8 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventrecorder
import (
"context"
"fmt"
"time"

"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -105,47 +106,120 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
totalLogger := logger.With("total", len(events))

var batchQuery pgx.Batch
var batchRetrievalAttempts pgx.Batch
for _, event := range events {
var timeToFirstByte time.Duration
if event.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(event.TimeToFirstByte)
}
var timeToFirstIndexerResult time.Duration
if event.TimeToFirstIndexerResult != "" {
timeToFirstIndexerResult, _ = time.ParseDuration(event.TimeToFirstIndexerResult)
}
query := `
INSERT INTO aggregate_retrieval_events(
instance_id,
retrieval_id,
storage_provider_id,
time_to_first_byte_ms,
time_to_first_byte,
bandwidth_bytes_sec,
bytes_transferred,
success,
start_time,
end_time
end_time,
time_to_first_indexer_result,
indexer_candidates_received,
indexer_candidates_filtered,
protocols_allowed,
protocols_attempted
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
`
batchQuery.Queue(query,
event.InstanceID,
event.RetrievalID,
event.StorageProviderID,
event.TimeToFirstByte,
timeToFirstByte,
event.Bandwidth,
event.BytesTransferred,
event.Success,
event.StartTime,
event.EndTime,
timeToFirstIndexerResult,
event.IndexerCandidatesReceived,
event.IndexerCandidatesFiltered,
event.ProtocolsAllowed,
event.ProtocolsAttempted,
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
totalLogger.Warnw("Retrieval event insertion did not affect any rows", "event", event, "rowsAffected", rowsAffected)
totalLogger.Warnw("Aggregated event insertion did not affect any rows", "event", event, "rowsAffected", rowsAffected)
default:
totalLogger.Debugw("Inserted event successfully", "event", event, "rowsAffected", rowsAffected)
totalLogger.Debugw("Inserted aggregated event successfully", "event", event, "rowsAffected", rowsAffected)
}
return nil
})
attempts := make(map[string]string, len(event.RetrievalAttempts))
for storageProviderID, retrievalAttempt := range event.RetrievalAttempts {
attempts[storageProviderID] = retrievalAttempt.Error
var timeToFirstByte time.Duration
if retrievalAttempt.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
}
query := `
INSERT INTO retrieval_attempts(
retrieval_id,
storage_provider_id,
time_to_first_byte,
error
)
VALUES ($1, $2, $3, $4)
`
batchRetrievalAttempts.Queue(query,
event.RetrievalID,
storageProviderID,
timeToFirstByte,
retrievalAttempt.Error,
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
default:
totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
}
return nil
})
}
if r.cfg.metrics != nil {
r.cfg.metrics.HandleAggregatedEvent(ctx,
timeToFirstIndexerResult,
timeToFirstByte,
event.Success,
event.StorageProviderID,
event.StartTime,
event.EndTime,
int64(event.Bandwidth),
int64(event.BytesTransferred),
int64(event.IndexerCandidatesReceived),
int64(event.IndexerCandidatesFiltered),
attempts,
)
}
}
batchResult := r.db.SendBatch(ctx, &batchQuery)
err := batchResult.Close()
if err != nil {
totalLogger.Errorw("At least one retrieval event insertion failed", "err", err)
totalLogger.Errorw("At least one aggregated event insertion failed", "err", err)
return err
}
batchResult = r.db.SendBatch(ctx, &batchRetrievalAttempts)
err = batchResult.Close()
if err != nil {
totalLogger.Errorw("At least one retrieval attempt insertion failed", "err", err)
return err
}

totalLogger.Info("Successfully submitted batch event insertion")
return nil
}
Expand Down
121 changes: 96 additions & 25 deletions metrics/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,7 @@ func (m *Metrics) HandleFailureEvent(ctx context.Context, id types.RetrievalID,
if storageProviderID != types.BitswapIndentifier {
m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
}
var errorMetricMatches = map[string]instrument.Int64Counter{
"response rejected": m.retrievalErrorRejectedCount,
"Too many retrieval deals received": m.retrievalErrorTooManyCount,
"Access Control": m.retrievalErrorACLCount,
"Under maintenance, retry later": m.retrievalErrorMaintenanceCount,
"miner is not accepting online retrieval deals": m.retrievalErrorNoOnlineCount,
"unconfirmed block transfer": m.retrievalErrorUnconfirmedCount,
"timeout after ": m.retrievalErrorTimeoutCount,
"there is no unsealed piece containing payload cid": m.retrievalErrorNoUnsealedCount,
"getting pieces for cid": m.retrievalErrorDAGStoreCount,
"graphsync request failed to complete: request failed - unknown reason": m.retrievalErrorGraphsyncCount,
"failed to dial": m.retrievalErrorFailedToDialCount,
}

var matched bool
for substr, metric := range errorMetricMatches {
if strings.Contains(msg, substr) {
metric.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
matched = true
break
}
}
if !matched {
m.retrievalErrorOtherCount.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
}
m.matchErrorMessage(ctx, msg, storageProviderID)
}
}

Expand Down Expand Up @@ -158,6 +134,101 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID,
m.failedRetrievalsPerRequestCount.Record(ctx, int64(finalDetails.FailedCount))
}

func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
timeToFirstIndexerResult time.Duration,
timeToFirstByte time.Duration,
success bool,
storageProviderID string,
startTime time.Time,
endTime time.Time,
bandwidth int64,
bytesTransferred int64,
indexerCandidates int64,
indexerFiltered int64,
attempts map[string]string) {
m.totalRequestCount.Add(ctx, 1)
failureCount := 0
var recordedGraphSync, recordedBitswap bool
for storageProviderID, msg := range attempts {
if storageProviderID == types.BitswapIndentifier {
if !recordedBitswap {
recordedBitswap = true
m.requestWithBitswapAttempt.Add(ctx, 1)
}
} else {
if !recordedGraphSync {
recordedGraphSync = true
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
}
}
if msg != "" {
if storageProviderID != types.BitswapIndentifier {
m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
}
m.matchErrorMessage(ctx, msg, storageProviderID)
failureCount += 0
}
}
if timeToFirstIndexerResult > 0 {
m.timeToFirstIndexerResult.Record(ctx, timeToFirstIndexerResult.Seconds())
}
if indexerCandidates > 0 {
m.requestWithIndexerCandidatesCount.Add(ctx, 1)
}
if indexerFiltered > 0 {
m.requestWithIndexerCandidatesFilteredCount.Add(ctx, 1)
}
if timeToFirstByte > 0 {
m.requestWithFirstByteReceivedCount.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
m.timeToFirstByte.Record(ctx, timeToFirstByte.Seconds(), attribute.String("protocol", protocol(storageProviderID)))
}
if success {
m.requestWithSuccessCount.Add(ctx, 1)
if storageProviderID == types.BitswapIndentifier {
m.requestWithBitswapSuccessCount.Add(ctx, 1)
} else {
m.requestWithGraphSyncSuccessCount.Add(ctx, 1, attribute.String("sp_id", storageProviderID))
}

m.retrievalDealDuration.Record(ctx, endTime.Sub(startTime).Seconds(), attribute.String("protocol", protocol(storageProviderID)))
m.retrievalDealSize.Record(ctx, bytesTransferred, attribute.String("protocol", protocol(storageProviderID)))
m.bandwidthBytesPerSecond.Record(ctx, bandwidth, attribute.String("protocol", protocol(storageProviderID)))

m.indexerCandidatesPerRequestCount.Record(ctx, indexerCandidates)
m.indexerCandidatesFilteredPerRequestCount.Record(ctx, indexerFiltered)
m.failedRetrievalsPerRequestCount.Record(ctx, int64(failureCount))
} else if len(attempts) == 0 {
m.requestWithIndexerFailures.Add(ctx, 1)
}
}

func (m *Metrics) matchErrorMessage(ctx context.Context, msg string, storageProviderID string) {
var errorMetricMatches = map[string]instrument.Int64Counter{
"response rejected": m.retrievalErrorRejectedCount,
"Too many retrieval deals received": m.retrievalErrorTooManyCount,
"Access Control": m.retrievalErrorACLCount,
"Under maintenance, retry later": m.retrievalErrorMaintenanceCount,
"miner is not accepting online retrieval deals": m.retrievalErrorNoOnlineCount,
"unconfirmed block transfer": m.retrievalErrorUnconfirmedCount,
"timeout after ": m.retrievalErrorTimeoutCount,
"there is no unsealed piece containing payload cid": m.retrievalErrorNoUnsealedCount,
"getting pieces for cid": m.retrievalErrorDAGStoreCount,
"graphsync request failed to complete: request failed - unknown reason": m.retrievalErrorGraphsyncCount,
"failed to dial": m.retrievalErrorFailedToDialCount,
}

var matched bool
for substr, metric := range errorMetricMatches {
if strings.Contains(msg, substr) {
metric.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
matched = true
break
}
}
if !matched {
m.retrievalErrorOtherCount.Add(ctx, 1, attribute.String("protocol", protocol(storageProviderID)))
}
}
func protocol(storageProviderId string) string {
if storageProviderId == types.BitswapIndentifier {
return "bitswap"
Expand Down
24 changes: 19 additions & 5 deletions schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,26 @@ create table if not exists retrieval_events(
);

create table if not exists aggregate_retrieval_events(
retrieval_id uuid not null,
retrieval_id uuid not null UNIQUE,
instance_id character varying(64) not null,
storage_provider_id character varying(256),
time_to_first_byte_ms integer,
bandwidth_bytes_sec integer,
time_to_first_byte int8,
bandwidth_bytes_sec int8,
bytes_transferred int8,
success boolean not null,
start_time timestamp with time zone not null,
end_time timestamp with time zone not null
);
end_time timestamp with time zone not null,
time_to_first_indexer_result int8,
indexer_candidates_received integer,
indexer_candidates_filtered integer,
protocols_allowed varchar[256][],
protocols_attempted varchar[256][]
);

create table if not exists retrieval_attempts(
retrieval_id uuid not null,
storage_provider_id character varying(256),
time_to_first_byte int8,
error character varying(256),
FOREIGN KEY (retrieval_id) REFERENCES aggregate_retrieval_events (retrieval_id)
);