Streams Direct SDK (Go)

This documentation provides a detailed reference for the Data Streams SDK for Go. It implements a client library that offers a domain-oriented abstraction for interacting with the Data Streams API and enables both point-in-time data retrieval and real-time data streaming with built-in fault tolerance capabilities.



import streams ""


Client interface

Interface for interacting with the Data Streams API. Use the New function to create a new instance of the client.

Interface Methods
  • GetFeeds: Lists all streams available to you.

    GetFeeds(ctx context.Context) (r []*Feed, err error)
  • GetLatestReport: Fetches the latest report available for the specified FeedID.

    GetLatestReport(ctx context.Context, id FeedID) (r *Report, err error)
  • GetReports: Fetches reports for the specified stream IDs and a given timestamp.

    GetReports(ctx context.Context, ids []FeedID, timestamp uint64) ([]*Report, error)
  • GetReportPage: Paginates the reports for a specified FeedID starting from a given timestamp.

    GetReportPage(ctx context.Context, id FeedID, startTS uint64) (*ReportPage, error)
  • Stream: Creates a real-time report stream for specified stream IDs.

    Stream(ctx context.Context, feedIDs []FeedID) (Stream, error)

Config struct

Configuration struct for the client. Config specifies the client configuration and dependencies. If you specify the Logger function, informational client activity is logged.

type Config struct {
	ApiKey             string // Client API key
	ApiSecret          string // Client API secret
	RestURL            string // Rest API URL

	WsURL              string // Websocket API URL

	WsHA               bool // Use concurrent connections to multiple Streams servers
	WsMaxReconnect     int // Maximum number of reconnection attempts for Stream underlying connections
	LogDebug           bool // Log debug information
	InsecureSkipVerify bool // Skip server certificate chain and host name verification
	Logger             func(format string, a ...any) // Logger function

	// InspectHttp intercepts http responses for rest requests.
	// The response object must not be modified.
	InspectHttpResponse func(*http.Response)

CtxKey string

Custom context key type used for passing additional headers.

type CtxKey string
  • Constants:

    • CustomHeadersCtxKey: Key for passing custom HTTP headers.
    const (
      // CustomHeadersCtxKey is used as key in the context.Context object
      // to pass in a custom http headers in a http.Header to be used by the client.
      // Custom header values will overwrite client headers if they have the same key.
      CustomHeadersCtxKey CtxKey = "CustomHeaders"

ReportPage struct

Represents a paginated response of reports.

type ReportPage struct {
	Reports    []*Report // Slice of Report, representing individual report entries.
	NextPageTS uint64 // Timestamp for the next page, used for fetching subsequent pages.

ReportResponse struct

Implements the report envelope that contains the full report payload, its stream ID and timestamps. Use the Decode function to decode the report payload.

type ReportResponse struct {
    FeedID                feed.ID `json:"feedID"`
    FullReport            []byte  `json:"fullReport"`
    ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
    ObservationsTimestamp uint64  `json:"observationsTimestamp"`
  • MarshalJSON: Serializes the ReportResponse into JSON.

    func (r *ReportResponse) MarshalJSON() ([]byte, error)
  • String: Returns the string representation of the ReportResponse.

    func (r *ReportResponse) String() (s string)
  • UnmarshalJSON: Deserializes the ReportResponse from JSON.

    func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)

Stats struct

Statistics related to the Stream's operational metrics.

type Stats struct {
	Accepted              uint64 // Total number of accepted reports
	Deduplicated          uint64 // Total number of deduplicated reports when in HA
	TotalReceived         uint64 // Total number of received reports
	PartialReconnects     uint64 // Total number of partial reconnects when in HA
	FullReconnects        uint64 // Total number of full reconnects
	ConfiguredConnections uint64 // Number of configured connections if in HA
	ActiveConnections     uint64 // Current number of active connections
  • String: Returns a string representation of the Stats.

    func (s Stats) String() (st string)

Stream interface

Interface for managing a real-time data stream.

Interface Methods
  • Read: Reads the next available report from the stream. This method will pause (block) the execution until one of the following occurs:

    • A report is successfully received.
    • The provided context is canceled, typically due to a timeout or a cancel signal.
    • An error state is encountered in any of the underlying connections, such as a network failure.
    Read(context.Context) (*Report, error)
  • Stats: Returns statistics about the stream operations as Stats.

    Stats() Stats
  • Close: Closes the stream.

    Close() error



Creates a new client instance with the specified configuration.

func New(cfg Config) (c Client, err error)

Note: New does not initialize any connections to the Data Streams service.


Utility function for logging.

func LogPrintf(format string, a ...any)



import feed ""


Feed struct

Identifies the report stream ID.

type Feed struct {
    FeedID ID `json:"feedID"`

Where ID is the unique identifier for the stream.

FeedVersion uint16

Represents the stream report schema version.

type FeedVersion uint16

ID [32]byte

Represents a unique identifier for a stream.

type ID [32]byte
  • FromString: Converts a string into an ID.

    func (f *ID) FromString(s string) (err error)
  • MarshalJSON: Serializes the ID into JSON.

    func (f *ID) MarshalJSON() (b []byte, err error)
  • String: Returns the string representation of the ID.

    func (f *ID) String() (id string)
  • UnmarshalJSON: Deserializes the ID from JSON.

    func (f *ID) UnmarshalJSON(b []byte) (err error)
  • Version: Returns the version of the stream.

    func (f *ID) Version() FeedVersion



import report ""


Data interface

Interface that represents the actual report data and attributes.

type Data interface {
    v1.Data | v2.Data | v3.Data | v4.Data
    Schema() abi.Arguments

Report struct

Represents the report envelope that contains the full report payload, its Feed ID, and timestamps.

type Report[T Data] struct {
    Data          T
    ReportContext [3][32]byte
    ReportBlob    []byte
    RawRs         [][32]byte
    RawSs         [][32]byte
    RawVs         [32]byte



Decodes the report serialized bytes and its data.

func Decode[T Data](fullReport []byte) (r *Report[T], err error)


payload, _ := hex.DecodeString(

report, err := report.Decode[v3.Data](payload)
if err != nil {
    streams.LogPrintf("error decoding report: %s", err)

    "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",

What's next

