Skip to content

Latest commit

 

History

History
424 lines (284 loc) · 12.6 KB

README.md

File metadata and controls

424 lines (284 loc) · 12.6 KB

streams

import "github.com/smartcontractkit/data-streams-sdk/go"

Package streams implements a client library for the Data Streams API. It provides a domain-oriented abstraction for both report Streams and point-in-time report retrieval, with fault-tolerant capabilities.

auth.go client.go config.go doc.go endpoints.go stream.go

var (
    ErrStreamClosed = fmt.Errorf("client: use of closed Stream")
)
func LogPrintf(format string, a ...any)

LogPrintf implements a logger function (see Config.Logger) using fmt.Printf.

type Client interface {
    // GetFeeds lists all feeds available to this client.
    GetFeeds(ctx context.Context) (r []*feed.Feed, err error)

    // GetLatestReport fetches the latest report available for the given feedID.
    GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)

    // GetReports fetches the reports for the given feedIDs and timestamp.
    GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)

    // GetReportPage paginates the reports for the given feedID and start timestamp.
    GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)

    // Stream creates a realtime report stream for the given feedIDs.
    Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)
}

Client is the data streams client interface.

Example Client:
cfg := streams.Config{
    ApiKey:    "mykey",
    ApiSecret: "mysecret",
    RestURL:   "https://streams.url",
    WsURL:     "https://streams.url",
    Logger:    streams.LogPrintf,
}

streamsClient, err := streams.New(cfg)
if err != nil {
    streams.LogPrintf("error creating client: %s", err)
    os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
availableFeeds, err := streamsClient.GetFeeds(ctx)
cancel()
if err != nil {
    streams.LogPrintf("error fetching feeds: %s", err)
    os.Exit(1)
}

f := availableFeeds[0]
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
report, err := streamsClient.GetLatestReport(ctx, f.FeedID)
cancel()
if err != nil {
    streams.LogPrintf("error fetching latest report: %s", err)
    os.Exit(1)
}

streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
    report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
reports, err := streamsClient.GetReports(
    ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID}, uint64(time.Now().Unix())-3)
cancel()
if err != nil {
    streams.LogPrintf("error fetching reports: %s", err)
    os.Exit(1)
}

for _, report = range reports {
    decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
    if err != nil {
        streams.LogPrintf("error decoding decoded: %s", err)
        os.Exit(1)
    }

    streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
        report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)
    streams.LogPrintf(
        "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
        decoded.Data.FeedID.String(),
        decoded.Data.FeedID.Version(),
        decoded.Data.Bid.String(),
        decoded.Data.Ask.String(),
        decoded.Data.BenchmarkPrice.String(),
        decoded.Data.LinkFee.String(),
        decoded.Data.NativeFee.String(),
        decoded.Data.ValidFromTimestamp,
        decoded.Data.ExpiresAt,
    )
}
func New(cfg Config) (c Client, err error)

New creates a new Client with the given config. New does not initialize any connection to the Data Streams service.

type Config struct {
    ApiKey    string // Client Api key
    ApiSecret string // Client Api secret
    RestURL   string // Rest Api url

    WsURL string // Websocket Api url

    WsHA               bool                          // Use concurrent connections to multiple Streams servers
    WsMaxReconnect     int                           // Maximum number of reconnection attempts for Stream underlying connections
    LogDebug           bool                          // Log debug information
    InsecureSkipVerify bool                          // Skip server certificate chain and host name verification
    Logger             func(format string, a ...any) // Logger function

    // InspectHttpResponse intercepts HTTP responses for REST requests.
    // The response object must not be modified.
    InspectHttpResponse func(*http.Response)
    // contains filtered or unexported fields
}

Config specifies the client configuration and dependencies. If specified, the Logger function will be used to log informational client activity.

type CtxKey string

CtxKey is the type used for context value keys.

const (

    // CustomHeadersCtxKey is used as the key in a context.Context
    // to pass custom HTTP headers, as an http.Header, to be used by the client.
    // Custom header values will overwrite client headers if they have the same key.
    CustomHeadersCtxKey CtxKey = "CustomHeaders"
)
type ReportPage struct {
    Reports    []*ReportResponse
    NextPageTS uint64
}

ReportPage implements the server pagination response. NextPageTS is the timestamp to be used when requesting the next page.

type ReportResponse struct {
    FeedID                feed.ID `json:"feedID"`
    FullReport            []byte  `json:"fullReport"`
    ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
    ObservationsTimestamp uint64  `json:"observationsTimestamp"`
}

ReportResponse implements the report envelope that contains the full report payload, its FeedID and timestamps. For decoding the Report Payload use report.Decode().

func (*ReportResponse) MarshalJSON

func (r *ReportResponse) MarshalJSON() ([]byte, error)

func (*ReportResponse) String

func (r *ReportResponse) String() (s string)

func (*ReportResponse) UnmarshalJSON

func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats struct {
    Accepted              uint64 // Total number of accepted reports
    Deduplicated          uint64 // Total number of deduplicated reports when in HA
    TotalReceived         uint64 // Total number of received reports
    PartialReconnects     uint64 // Total number of partial reconnects when in HA
    FullReconnects        uint64 // Total number of full reconnects
    ConfiguredConnections uint64 // Number of configured connections if in HA
    ActiveConnections     uint64 // Current number of active connections
}

Stats holds the statistics for the Stream.

func (Stats) String

func (s Stats) String() (st string)
type Stream interface {
    // Read the next available report on the Stream.
    // Read blocks until a report is received, the context is canceled or
    // all underlying connections are in an error state.
    Read(context.Context) (*ReportResponse, error)

    // Stats returns basic stats about the Stream.
    Stats() Stats

    // Close closes the Stream. It is the caller's responsibility to call Close
    // when the Stream is no longer needed.
    Close() error
}

Stream represents a realtime report stream. Safe for concurrent usage.

The Stream will maintain at least 2 concurrent connections to different instances to ensure high availability and fault tolerance, and to minimize the risk of report gaps.

Example Stream:
cfg := streams.Config{
    ApiKey:    "mykey",
    ApiSecret: "mysecret",
    RestURL:   "https://streams.url",
    WsURL:     "https://streams.url",
    Logger:    streams.LogPrintf,
}

client, err := streams.New(cfg)
if err != nil {
    streams.LogPrintf("error creating client: %s", err)
    os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
availableFeeds, err := client.GetFeeds(ctx)
cancel()
if err != nil {
    streams.LogPrintf("error fetching feeds: %s", err)
    os.Exit(1)
}

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
stream, err := client.Stream(
    ctx, []feed.ID{availableFeeds[0].FeedID, availableFeeds[1].FeedID})
cancel()
if err != nil {
    streams.LogPrintf("error subscribing: %s", err)
    os.Exit(1)
}

defer stream.Close()
for stream.Stats().Accepted < 100 {
    report, err := stream.Read(context.Background())
    if err != nil {
        streams.LogPrintf("error reading from stream: %s", err)
        os.Exit(1)
    }

    streams.LogPrintf("report feedID: %s, observations_timestamp: %d, valid_from_timestamp: %d",
        report.FeedID, report.ObservationsTimestamp, report.ValidFromTimestamp)

    decoded, err := streamsReport.Decode[v3.Data](report.FullReport)
    if err != nil {
        streams.LogPrintf("error decoding decoded: %s", err)
        os.Exit(1)
    }

    streams.LogPrintf(
        "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
        decoded.Data.FeedID.String(),
        decoded.Data.FeedID.Version(),
        decoded.Data.Bid.String(),
        decoded.Data.Ask.String(),
        decoded.Data.BenchmarkPrice.String(),
        decoded.Data.LinkFee.String(),
        decoded.Data.NativeFee.String(),
        decoded.Data.ValidFromTimestamp,
        decoded.Data.ExpiresAt,
    )

    streams.LogPrintf("stream stats: %s", stream.Stats().String())
}

Generated by godoc2md