Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dispersing blobs to eigenda #19

Merged
merged 5 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/blob-server/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

// Constants
const (
EigenDaRpcUrl = "disperser-goerli.eigenda.xyz:443"
EigenDaRpcUrl = "disperser-holesky.eigenda.xyz:443"
celestiaRpcUrl = "https://celestia-mocha-rpc.publicnode.com:443"
availLightClientRpcUrl = "http://localhost:26658"
)
Expand Down
8 changes: 7 additions & 1 deletion cmd/blob-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/joho/godotenv"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -47,13 +48,14 @@ func main() {
router := gin.Default()
ctx := context.Background()

envFile, err := godotenv.Read("../../.env") // read from root
envFile, err := godotenv.Read(".env")
if err != nil {
fmt.Println("Error reading .env file")

return
}
authToken := envFile["CELESTIA_AUTH_TOKEN"]
eigenPrivateKey := envFile["EIGENDA_AUTH_PRIVATE_KEY"]

server := NewBlobServer()
// Initialise all DA clients
Expand All @@ -64,6 +66,10 @@ func main() {
authToken,
availLightClientRpcUrl,
celestiaRpcUrl,
clients.EigenDAClientConfig{
RPC: EigenDaRpcUrl,
SignerPrivateKeyHex: eigenPrivateKey,
},
)
if err != nil {
fmt.Printf("failed to build DA clients: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion da/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Client interface {
// Submit submits the Blobs to Data Availability layer.
//
// This method is synchronous. Upon successful submission to Data Availability layer, it returns ID identifying blob
// in DA and Proof of inclusion.
// in DA.
// If options is nil, default options are used.
Submit(ctx context.Context, blob Blob, gasPrice float64) (ID, error)

Expand Down
33 changes: 27 additions & 6 deletions daash.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/cenkalti/backoff"
"github.com/stackrlabs/go-daash/avail"
"github.com/stackrlabs/go-daash/celestia"
Expand Down Expand Up @@ -52,6 +52,7 @@ func (d *ClientBuilder) InitClients(
celestiaAuthToken string,
celestiaLightClientUrl string,
celestiaNodeUrl string,
eigenConfig clients.EigenDAClientConfig,
) (*ClientBuilder, error) {
if len(layers) == 0 {
return nil, fmt.Errorf("no da layers provided")
Expand Down Expand Up @@ -88,7 +89,7 @@ func (d *ClientBuilder) InitClients(
d.Clients[Celestia] = celestia

case Eigen:
eigen, err := eigen.NewClient("disperser-goerli.eigenda.xyz:443", time.Second*90, time.Second*5)
eigen, err := eigen.NewClient(eigenConfig)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +116,7 @@ func GetHumanReadableID(id da.ID, daLayer DALayer) any {
}
return availID
case Celestia:
id, ok := id.(celestia.ID)
celestiaID, ok := id.(celestia.ID)
if !ok {
return ""
}
Expand All @@ -124,9 +125,23 @@ func GetHumanReadableID(id da.ID, daLayer DALayer) any {
TxHash string `json:"txHash"`
Commitment da.Commitment `json:"commitment"`
}{
BlockHeight: id.Height,
TxHash: id.TxHash,
Commitment: id.ShareCommitment,
BlockHeight: celestiaID.Height,
TxHash: celestiaID.TxHash,
Commitment: celestiaID.ShareCommitment,
}
case Eigen:
eigenID, ok := id.(eigen.ID)
if !ok {
return ""
}
return struct {
BatchHeaderHash []byte
BlobIndex uint32
RequestID string
}{
BatchHeaderHash: eigenID.BlobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash,
BlobIndex: eigenID.BlobInfo.BlobVerificationProof.BlobIndex,
RequestID: eigenID.RequestID,
}
default:
return ""
Expand Down Expand Up @@ -154,6 +169,12 @@ func GetExplorerLink(client da.Client, id da.ID) (string, error) {
extString := strings.Trim(string(extBytes), "\"")
fmt.Println(extString)
return fmt.Sprintf("https://goldberg.avail.tools/#/extrinsics/decode/%s", extString), nil
case *eigen.Client:
eigenID, ok := id.(eigen.ID)
if !ok {
return "", fmt.Errorf("invalid ID")
}
return fmt.Sprintf("https://blobs-holesky.eigenda.xyz/blobs/%s", eigenID.RequestID), nil
default:
return "", nil
}
Expand Down
207 changes: 108 additions & 99 deletions eigen/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,85 +5,62 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
"log"
"log/slog"
"os"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum/log"
"github.com/stackrlabs/go-daash/da"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type Client struct {
// DaRpc is the HTTP provider URL for the Data Availability node.
DARpc string

// DisperserClient is the gRPC client for the Disperser service.
disperserClient DisperserClient

// Quorum IDs and SecurityParams to use when dispersing and retrieving blobs
DADisperserSecurityParams []*SecurityParams

// The total amount of time that the batcher will spend waiting for EigenDA to confirm a blob
DAStatusQueryTimeout time.Duration

// The amount of time to wait between status queries of a newly dispersed blob
DAStatusQueryRetryInterval time.Duration
// internalClient is used to interact with the EigenDA API
internalClient clients.EigenDAClient
}

// NewClient returns a new instance of the EigenDA client.
func NewClient(daRpc string, daStatusQueryTimeout time.Duration, daStatusQueryRetryInterval time.Duration) (*Client, error) {
conn, err := grpc.Dial(daRpc, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
func NewClient(config clients.EigenDAClientConfig) (*Client, error) {
logger := log.NewLogger(slog.NewTextHandler(os.Stdout, nil))
client, err := clients.NewEigenDAClient(logger, config)
if err != nil {
fmt.Println("Unable to connect to EigenDA, aborting", "err", err)
return nil, err
return nil, fmt.Errorf("failed to create EigenDA client: %v", err)
}
daClient := NewDisperserClient(conn)

disperserSecurityParams := []*SecurityParams{}
disperserSecurityParams = append(disperserSecurityParams, &SecurityParams{
QuorumId: 0,
AdversaryThreshold: 25,
QuorumThreshold: 50,
})
log.Println("🟢 EigenDA client initalised")

return &Client{
DARpc: daRpc,
disperserClient: daClient,
DADisperserSecurityParams: disperserSecurityParams,
DAStatusQueryTimeout: daStatusQueryTimeout,
DAStatusQueryRetryInterval: daStatusQueryRetryInterval,
internalClient: *client,
}, nil
}

func (e *Client) MaxBlobSize(ctx context.Context) (uint64, error) {
return 512 * 1024, nil // Currently set at 512KB
return 2 * 1024 * 1024, nil // Currently set at 2MB
aashutoshrathi marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *Client) Submit(ctx context.Context, daBlob da.Blob, gasPrice float64) (da.ID, error) {
blobInfo, err := e.disperseBlob(ctx, daBlob)
func (c *Client) Submit(ctx context.Context, daBlob da.Blob, gasPrice float64) (da.ID, error) {
start := time.Now()
blobID, err := c.PutBlob(ctx, daBlob)
if err != nil {
return nil, fmt.Errorf("failed to disperse blob: %v", err)
}
blobID := ID{
BlobIndex: blobInfo.BlobVerificationProof.BlobIndex,
BatchHeaderHash: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash,
}
end := time.Now()
fmt.Println("Time taken to disperse blob:", end.Sub(start))

return blobID, nil
}

func (e *Client) Get(ctx context.Context, id da.ID) (da.Blob, error) {
blobID, ok := id.(ID)
func (c *Client) Get(ctx context.Context, id da.ID) (da.Blob, error) {
blobID, ok := id.(*ID)
if !ok {
return nil, fmt.Errorf("invalid ID type")
}
resp, err := e.disperserClient.RetrieveBlob(ctx, &RetrieveBlobRequest{
BlobIndex: blobID.BlobIndex,
BatchHeaderHash: blobID.BatchHeaderHash,
})
blob, err := c.internalClient.GetBlob(ctx, blobID.BlobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash, blobID.BlobInfo.BlobVerificationProof.BlobIndex)
if err != nil {
return nil, fmt.Errorf("failed to retrieve blob: %v", err)
}
return resp.Data, nil

return blob, nil
}

func (e *Client) Commit(ctx context.Context, daBlob da.Blob) (da.Commitment, error) {
Expand All @@ -98,66 +75,98 @@ func (e *Client) GetProof(ctx context.Context, id da.ID) (da.Proof, error) {
return nil, nil
}

type ID struct {
BlobIndex uint32
BatchHeaderHash []byte
// PutBlob encodes and writes a blob to EigenDA, waiting for it to be confirmed
// before returning. This function is resiliant to transient failures and
// timeouts.
func (c *Client) PutBlob(ctx context.Context, data []byte) (ID, error) {
resultChan, errorChan := c.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
case result := <-resultChan:
return result, nil
case err := <-errorChan:
return ID{}, err
}
}

func (e *Client) disperseBlob(ctx context.Context, txData []byte) (*BlobInfo, error) {
fmt.Println("Attempting to disperse blob to EigenDA")
func (c *Client) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan ID, errChan chan error) {
resultChan = make(chan ID, 1)
errChan = make(chan error, 1)
go c.putBlob(ctx, data, resultChan, errChan)
return
}

disperseReq := &DisperseBlobRequest{
Data: txData,
SecurityParams: e.DADisperserSecurityParams,
func (c *Client) putBlob(ctx context.Context, rawData []byte, resultChan chan ID, errChan chan error) {
// encode blob
if c.internalClient.Codec == nil {
errChan <- fmt.Errorf("codec cannot be nil")
return
}
daClient := e.disperserClient
disperseRes, err := daClient.DisperseBlob(ctx, disperseReq)
fmt.Println("DisperseBlob response", "disperseRes", disperseRes, "err", err)

data, err := c.internalClient.Codec.EncodeBlob(rawData)
if err != nil {
fmt.Printf("Unable to disperse blob to EigenDA, aborting", "err", err)
return nil, err
errChan <- fmt.Errorf("error encoding blob: %w", err)
return
}

if disperseRes.Result == BlobStatus_UNKNOWN ||
disperseRes.Result == BlobStatus_FAILED {
fmt.Printf("Unable to disperse blob to EigenDA, aborting", "err", err)
return nil, fmt.Errorf("reply status is %d", disperseRes.Result)
customQuorumNumbers := make([]uint8, len(c.internalClient.Config.CustomQuorumIDs))
0xRampey marked this conversation as resolved.
Show resolved Hide resolved
for i, id := range c.internalClient.Config.CustomQuorumIDs {
customQuorumNumbers[i] = uint8(id)
}
// disperse blob
blobStatus, requestID, err := c.internalClient.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
return
}

base64RequestID := base64.StdEncoding.EncodeToString(disperseRes.RequestId)

fmt.Println("Blob disepersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)

var statusRes *BlobStatusReply
timeoutTime := time.Now().Add(e.DAStatusQueryTimeout)
// Wait before first status check
time.Sleep(e.DAStatusQueryRetryInterval)
for time.Now().Before(timeoutTime) {
statusRes, err = daClient.GetBlobStatus(ctx, &BlobStatusRequest{
RequestId: disperseRes.RequestId,
})
if err != nil {
fmt.Printf("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
} else if statusRes.Status == BlobStatus_CONFIRMED {
// TODO(eigenlayer): As long as fault proofs are disabled, we can move on once a blob is confirmed
// but not yet finalized, without further logic. Once fault proofs are enabled, we will need to update
// the proposer to wait until the blob associated with an L2 block has been finalized, i.e. the EigenDA
// contracts on Ethereum have confirmed the full availability of the blob on EigenDA.
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
fmt.Println("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
return statusRes.Info, nil
} else if statusRes.Status == BlobStatus_UNKNOWN ||
statusRes.Status == BlobStatus_FAILED {
fmt.Println("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
return nil, fmt.Errorf("eigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
} else {
fmt.Println("Still waiting for confirmation from EigenDA", "requestID", base64RequestID)
}

// Wait before first status check
time.Sleep(e.DAStatusQueryRetryInterval)
// process response
if *blobStatus == disperser.Failed {
errChan <- fmt.Errorf("reply status is %d", blobStatus)
return
}

return nil, fmt.Errorf("timed out getting EigenDA status for dispersed blob key: %s", base64RequestID)
base64RequestID := base64.StdEncoding.EncodeToString(requestID)
fmt.Println("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)

ticker := time.NewTicker(c.internalClient.Config.StatusQueryRetryInterval)
defer ticker.Stop()

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.internalClient.Config.StatusQueryTimeout)
defer cancel()

for {
select {
case <-ctx.Done():
errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err())
return
case <-ticker.C:
statusRes, err := c.internalClient.Client.GetBlobStatus(ctx, requestID)
if err != nil {
c.internalClient.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

switch statusRes.Status {
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
fmt.Println("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
case grpcdisperser.BlobStatus_FAILED:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_CONFIRMED:
fmt.Println("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
resultChan <- ID{BlobInfo: statusRes.Info, RequestID: string(requestID)}
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
fmt.Println("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
return
default:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
return
}
}
}
}
Loading