diff --git a/api/datasets.go b/api/datasets.go index cd16d15..cb8301e 100644 --- a/api/datasets.go +++ b/api/datasets.go @@ -6,7 +6,6 @@ import ( "github.com/application-research/delta-dm/core" "github.com/application-research/delta-dm/util" "github.com/labstack/echo/v4" - "gorm.io/gorm" ) func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) { @@ -17,9 +16,7 @@ func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) { datasets.GET("", func(c echo.Context) error { var ds []core.Dataset - dldm.DB.Preload("Wallets").Preload("AllowedProviders", func(db *gorm.DB) *gorm.DB { - return db.Select("actor_id") - }).Find(&ds) + dldm.DB.Preload("Wallets").Preload("ReplicationProfiles").Find(&ds) // Find # of bytes total and replicated for each dataset for i, d := range ds { diff --git a/api/providers.go b/api/providers.go index c1a993c..587346c 100644 --- a/api/providers.go +++ b/api/providers.go @@ -11,9 +11,8 @@ import ( ) type ProviderPutBody struct { - ActorName string `json:"actor_name"` - AllowSelfService string `json:"allow_self_service"` - AllowedDatasets []string `json:"allowed_datasets"` + ActorName string `json:"actor_name"` + AllowSelfService string `json:"allow_self_service"` } func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) { @@ -24,7 +23,7 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) { providers.GET("", func(c echo.Context) error { var p []core.Provider - dldm.DB.Preload("AllowedDatasets").Find(&p) + dldm.DB.Preload("ReplicationProfiles").Find(&p) for i, sp := range p { var rb [2]uint64 @@ -93,22 +92,6 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) { existing.AllowSelfService = false } - // If array of allowed datasets is empty, don't modify the association - if len(p.AllowedDatasets) > 0 { - var newAllowedDatasets []core.Dataset - for _, ds_name := range p.AllowedDatasets { - var ds core.Dataset - res := dldm.DB.Model(&core.Dataset{}).Where("name = ?", ds_name).First(&ds) - if res.Error != nil { - return fmt.Errorf("error fetching dataset %s : %s", ds_name, res.Error) - } else { - newAllowedDatasets = append(newAllowedDatasets, ds) - } - } - - dldm.DB.Model(&existing).Association("AllowedDatasets").Replace(newAllowedDatasets) - } - res = dldm.DB.Save(&existing) if res.Error != nil { return fmt.Errorf("error saving provider %s", res.Error) diff --git a/api/replication-profiles.go b/api/replication-profiles.go new file mode 100644 index 0000000..b62a092 --- /dev/null +++ b/api/replication-profiles.go @@ -0,0 +1,122 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/application-research/delta-dm/core" + "github.com/labstack/echo/v4" +) + +type ReplicationProfile struct { + DatasetName string `json:"dataset_name"` + Unsealed bool `json:"unsealed"` + Indexed bool `json:"indexed"` +} + +func ConfigureReplicationProfilesRouter(e *echo.Group, dldm *core.DeltaDM) { + replicationProfiles := e.Group("/replication-profiles") + + replicationProfiles.Use(dldm.AS.AuthMiddleware) + + replicationProfiles.GET("", func(c echo.Context) error { + var p []core.ReplicationProfile + + res := dldm.DB.Model(&core.ReplicationProfile{}).Find(&p) + if res.Error != nil { + return fmt.Errorf("error finding replication profiles: %s", res.Error) + } + + return c.JSON(http.StatusOK, p) + }) + + replicationProfiles.POST("", func(c echo.Context) error { + var p core.ReplicationProfile + + if err := c.Bind(&p); err != nil { + return fmt.Errorf("failed to parse request body: %s", err.Error()) + } + + // Check if the dataset and provider exist + var ds core.Dataset + var provider core.Provider + + dsRes := dldm.DB.Where("id = ?", p.DatasetID).First(&ds) + providerRes := dldm.DB.Where("actor_id = ?", p.ProviderActorID).First(&provider) + + if dsRes.Error != nil || ds.ID == 0 { + return fmt.Errorf("invalid dataset: %s", dsRes.Error) + } + + if providerRes.Error != nil || provider.ActorID == "" { + return fmt.Errorf("invalid provider: %s", providerRes.Error) + } + + // Save the replication profile + res := dldm.DB.Create(&p) + if res.Error != nil { + if res.Error.Error() == "UNIQUE constraint failed: replication_profiles.provider_actor_id, replication_profiles.dataset_id" { + return fmt.Errorf("replication profile for provider %s and datasetID %d already exists", p.ProviderActorID, p.DatasetID) + } + return fmt.Errorf("failed to save replication profile: %s", res.Error.Error()) + } + + return c.JSON(http.StatusOK, p) + }) + + replicationProfiles.DELETE("", func(c echo.Context) error { + var p core.ReplicationProfile + if err := c.Bind(&p); err != nil { + return fmt.Errorf("failed to parse request body: %s", err.Error()) + } + + if p.DatasetID == 0 || p.ProviderActorID == "" { + return fmt.Errorf("invalid replication profile ID") + } + + var existingProfile core.ReplicationProfile + res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", p.ProviderActorID, p.DatasetID).First(&existingProfile) + + if res.Error != nil { + return fmt.Errorf("replication profile not found: %s", res.Error) + } + + deleteRes := dldm.DB.Delete(&existingProfile) + if deleteRes.Error != nil { + return fmt.Errorf("failed to delete replication profile: %s", deleteRes.Error.Error()) + } + + return c.JSON(http.StatusOK, fmt.Sprintf("replication profile with ProviderActorID %s and DatasetID %d deleted successfully", p.ProviderActorID, p.DatasetID)) + }) + + replicationProfiles.PUT("", func(c echo.Context) error { + var updatedProfile core.ReplicationProfile + if err := c.Bind(&updatedProfile); err != nil { + return fmt.Errorf("failed to parse request body: %s", err.Error()) + } + + if updatedProfile.DatasetID == 0 || updatedProfile.ProviderActorID == "" { + return fmt.Errorf("invalid replication profile ID") + } + + var existingProfile core.ReplicationProfile + res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", updatedProfile.ProviderActorID, updatedProfile.DatasetID).First(&existingProfile) + + if res.Error != nil { + return fmt.Errorf("replication profile not found: %s", res.Error) + } + + updateData := map[string]interface{}{ + "unsealed": updatedProfile.Unsealed, + "indexed": updatedProfile.Indexed, + } + + updateRes := dldm.DB.Model(&existingProfile).Updates(updateData) + if updateRes.Error != nil { + return fmt.Errorf("failed to update replication profile: %s", updateRes.Error.Error()) + } + + return c.JSON(http.StatusOK, updatedProfile) + }) + +} diff --git a/api/replications.go b/api/replications.go index bbe2a27..e8d2b1f 100644 --- a/api/replications.go +++ b/api/replications.go @@ -256,8 +256,8 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error { ConnectionMode: "import", Miner: d.Provider, Size: c.Size, - SkipIpniAnnounce: !c.Indexed, - RemoveUnsealedCopy: !c.Unsealed, + SkipIpniAnnounce: !c.ReplicationProfile.Indexed, + RemoveUnsealedCopy: !c.ReplicationProfile.Unsealed, DurationInDays: c.DealDuration, StartEpochInDays: delayStartEpoch, PieceCommitment: core.PieceCommitment{ @@ -278,6 +278,7 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error { type replicatedContentQueryResponse struct { core.Content core.Dataset + core.ReplicationProfile } // Query the database for all contant that does not have replications to this actor yet @@ -290,6 +291,7 @@ func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetN SELECT * FROM datasets d INNER JOIN contents c ON d.name = c.dataset_name + INNER JOIN replication_profiles rp ON rp.dataset_id = d.id -- Only select content that does not have a non-failed replication to this provider WHERE c.comm_p NOT IN ( SELECT r.content_comm_p @@ -301,12 +303,8 @@ func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetN WHERE p.actor_id <> ? ) ) - -- Only select content from datasets that this provider is allowed to replicate - AND d.id IN ( - SELECT dataset_id - FROM provider_allowed_datasets - WHERE provider_actor_id = ? - ) + -- Only select content from datasets that this provider is allowed to replicate + AND rp.provider_actor_id = ? AND c.num_replications < d.replication_quota ` var rawValues = []interface{}{providerID, providerID} diff --git a/api/router.go b/api/router.go index 2a692aa..e2819ed 100644 --- a/api/router.go +++ b/api/router.go @@ -64,6 +64,7 @@ func InitializeEchoRouterConfig(dldm *core.DeltaDM) { ConfigureWalletsRouter(apiGroup, dldm) ConfigureHealthRouter(apiGroup, dldm) ConfigureSelfServiceRouter(apiGroup, dldm) + ConfigureReplicationProfilesRouter(apiGroup, dldm) // Start server e.Logger.Fatal(e.Start("0.0.0.0:1314")) // configuration } diff --git a/api/selfservice.go b/api/selfservice.go index 6e5c58b..84a8b55 100644 --- a/api/selfservice.go +++ b/api/selfservice.go @@ -37,7 +37,7 @@ func selfServiceTokenMiddleware(dldm *core.DeltaDM) echo.MiddlewareFunc { return c.String(401, "missing provider self-service token") } var p core.Provider - res := dldm.DB.Model(&core.Provider{}).Preload("AllowedDatasets").Where("key = ?", providerToken).Find(&p) + res := dldm.DB.Model(&core.Provider{}).Preload("ReplicationProfiles").Where("key = ?", providerToken).Find(&p) if res.Error != nil { log.Errorf("error finding provider: %s", res.Error) @@ -93,10 +93,12 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error { return fmt.Errorf("unable to find associated dataset %s", cnt.DatasetName) } + var rp core.ReplicationProfile isAllowed := false - for _, allowedDs := range p.AllowedDatasets { - if allowedDs.Name == ds.Name { + for _, thisRp := range p.ReplicationProfiles { + if thisRp.DatasetID == ds.ID { isAllowed = true + rp = thisRp break } } @@ -133,8 +135,8 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error { ConnectionMode: "import", Miner: p.ActorID, Size: cnt.Size, - SkipIpniAnnounce: !ds.Indexed, - RemoveUnsealedCopy: !ds.Unsealed, + SkipIpniAnnounce: !rp.Indexed, + RemoveUnsealedCopy: !rp.Unsealed, DurationInDays: ds.DealDuration, StartEpochInDays: delayDays, PieceCommitment: core.PieceCommitment{ @@ -159,6 +161,12 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { return fmt.Errorf("must provide a dataset name") } + var ds core.Dataset + dsRes := dldm.DB.Where("name = ?", dataset).First(&ds) + if dsRes.Error != nil || ds.ID == 0 { + return fmt.Errorf("invalid dataset: %s", dsRes.Error) + } + var delayDays uint64 = 3 if startEpochDelay != "" { var err error @@ -175,8 +183,8 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { p := c.Get(PROVIDER).(core.Provider) isAllowed := false - for _, ds := range p.AllowedDatasets { - if ds.Name == dataset { + for _, rp := range p.ReplicationProfiles { + if rp.DatasetID == ds.ID { isAllowed = true break } @@ -215,8 +223,8 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { ConnectionMode: "import", Miner: p.ActorID, Size: deal.Size, - SkipIpniAnnounce: !deal.Indexed, - RemoveUnsealedCopy: !deal.Unsealed, + SkipIpniAnnounce: !deal.ReplicationProfile.Indexed, + RemoveUnsealedCopy: !deal.ReplicationProfile.Unsealed, DurationInDays: deal.DealDuration - delayDays, StartEpochInDays: delayDays, PieceCommitment: core.PieceCommitment{ diff --git a/cmd/daemon.go b/cmd/daemon.go index 9a73cdf..e206e6d 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -18,7 +18,6 @@ func DaemonCmd(di core.DeploymentInfo) []*cli.Command { var deltaAuthToken string var authServer string - // add a command to run API node var daemonCommands []*cli.Command daemonCmd := &cli.Command{ Name: "daemon", diff --git a/cmd/dataset.go b/cmd/dataset.go index 47d8d7c..942b61f 100644 --- a/cmd/dataset.go +++ b/cmd/dataset.go @@ -14,10 +14,7 @@ func DatasetCmd() []*cli.Command { var datasetName string var replicationQuota uint64 var dealDuration uint64 - var keepUnsealedCopy bool - var announceToIndexer bool - // add a command to run API node var datasetCmds []*cli.Command datasetCmd := &cli.Command{ Name: "dataset", @@ -50,22 +47,6 @@ func DatasetCmd() []*cli.Command { Value: 540, Destination: &dealDuration, }, - &cli.BoolFlag{ - Name: "unsealed", - Aliases: []string{"u"}, - Usage: "flag to keep unsealed copy when deals are made", - DefaultText: "true", - Value: true, - Destination: &keepUnsealedCopy, - }, - &cli.BoolFlag{ - Name: "indexed", - Aliases: []string{"i"}, - Usage: "flag to announce to IPNI (Interplanetary Network Indexer)", - DefaultText: "true", - Value: true, - Destination: &announceToIndexer, - }, }, Action: func(c *cli.Context) error { cmd, err := NewCmdProcessor(c) @@ -81,8 +62,6 @@ func DatasetCmd() []*cli.Command { Name: datasetName, ReplicationQuota: replicationQuota, DealDuration: dealDuration, - Indexed: announceToIndexer, - Unsealed: keepUnsealedCopy, } b, err := json.Marshal(body) diff --git a/cmd/provider.go b/cmd/provider.go index 4ed69d6..88ff74d 100644 --- a/cmd/provider.go +++ b/cmd/provider.go @@ -14,9 +14,7 @@ func ProviderCmd() []*cli.Command { var spId string var spName string var allowSelfService string - var allowedDatasets cli.StringSlice - // add a command to run API node var providerCmds []*cli.Command providerCmd := &cli.Command{ Name: "provider", @@ -90,11 +88,6 @@ func ProviderCmd() []*cli.Command { Usage: "enable self-service for provider (on|off)", Destination: &allowSelfService, }, - &cli.StringSliceFlag{ - Name: "allowed-datasets", - Usage: "datasets the provider is permitted to replicate (comma separated list)", - Destination: &allowedDatasets, - }, }, Action: func(c *cli.Context) error { cmd, err := NewCmdProcessor(c) @@ -111,7 +104,6 @@ func ProviderCmd() []*cli.Command { body := api.ProviderPutBody{ ActorName: spName, AllowSelfService: allowSelfService, - AllowedDatasets: allowedDatasets.Value(), } b, err := json.Marshal(body) diff --git a/cmd/replication-profiles.go b/cmd/replication-profiles.go new file mode 100644 index 0000000..0c01aad --- /dev/null +++ b/cmd/replication-profiles.go @@ -0,0 +1,209 @@ +package cmd + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/application-research/delta-dm/core" + "github.com/urfave/cli/v2" +) + +func ReplicationProfilesCmd() []*cli.Command { + var spId string + var datasetId uint + var unsealed bool + var indexed bool + + var providerCmds []*cli.Command + providerCmd := &cli.Command{ + Name: "replication-profile", + Aliases: []string{"rp"}, + Usage: "Replication Profile Commands", + Subcommands: []*cli.Command{ + { + Name: "add", + Usage: "add replication profile", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "spid", + Usage: "storage provider id", + Destination: &spId, + Required: true, + }, + &cli.UintFlag{ + Name: "dataset", + Usage: "dataset id", + Aliases: []string{"dsid"}, + Destination: &datasetId, + Required: true, + }, + &cli.BoolFlag{ + Name: "indexed", + Usage: "announce deals to indexer", + Destination: &indexed, + }, + &cli.BoolFlag{ + Name: "unsealed", + Usage: "keep unsealed copy", + Destination: &unsealed, + }, + }, + Action: func(c *cli.Context) error { + cmd, err := NewCmdProcessor(c) + if err != nil { + return err + } + + body := core.ReplicationProfile{ + ProviderActorID: spId, + DatasetID: datasetId, + Unsealed: unsealed, + Indexed: indexed, + } + + b, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("unable to construct request body %s", err) + } + + res, closer, err := cmd.MakeRequest(http.MethodPost, "/api/v1/replication-profiles/", b) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + { + Name: "modify", + Usage: "modify replication profile", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "spid", + Usage: "storage provider id", + Destination: &spId, + Required: true, + }, + &cli.UintFlag{ + Name: "dataset", + Usage: "dataset id", + Aliases: []string{"dsid"}, + Destination: &datasetId, + Required: true, + }, + &cli.BoolFlag{ + Name: "indexed", + Usage: "announce deals to indexer", + Destination: &indexed, + }, + &cli.BoolFlag{ + Name: "unsealed", + Usage: "keep unsealed copy", + Destination: &unsealed, + }, + }, + Action: func(c *cli.Context) error { + cmd, err := NewCmdProcessor(c) + if err != nil { + return err + } + + body := core.ReplicationProfile{ + ProviderActorID: spId, + DatasetID: datasetId, + Unsealed: unsealed, + Indexed: indexed, + } + + b, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("unable to construct request body %s", err) + } + + res, closer, err := cmd.MakeRequest(http.MethodPut, "/api/v1/replication-profiles/", b) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + { + Name: "delete", + Usage: "delete replication profile", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "spid", + Usage: "storage provider id", + Destination: &spId, + Required: true, + }, + &cli.UintFlag{ + Name: "dataset", + Usage: "dataset id", + Aliases: []string{"dsid"}, + Destination: &datasetId, + Required: true, + }, + }, + Action: func(c *cli.Context) error { + cmd, err := NewCmdProcessor(c) + if err != nil { + return err + } + + body := core.ReplicationProfile{ + ProviderActorID: spId, + DatasetID: datasetId, + } + + b, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("unable to construct request body %s", err) + } + + res, closer, err := cmd.MakeRequest(http.MethodDelete, "/api/v1/replication-profiles/", b) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + { + Name: "list", + Usage: "list replication profiles", + Action: func(c *cli.Context) error { + cmd, err := NewCmdProcessor(c) + if err != nil { + return err + } + + res, closer, err := cmd.MakeRequest(http.MethodGet, "/api/v1/replication-profiles/", nil) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + }, + } + + providerCmds = append(providerCmds, providerCmd) + + return providerCmds +} diff --git a/cmd/replication.go b/cmd/replication.go index e678550..9de5de5 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -15,7 +15,6 @@ func ReplicationCmd() []*cli.Command { var dataset string var delayStartDays uint64 - // add a command to run API node var replicationCmds []*cli.Command replicationCmd := &cli.Command{ Name: "replication", diff --git a/cmd/wallet.go b/cmd/wallet.go index 52d6220..96bf0a6 100644 --- a/cmd/wallet.go +++ b/cmd/wallet.go @@ -15,7 +15,6 @@ func WalletCmd() []*cli.Command { var dataset string var walletAddress string - // add a command to run API node var walletCmds []*cli.Command walletCmd := &cli.Command{ Name: "wallet", diff --git a/core/database.go b/core/database.go index 96a9a59..0cac8da 100644 --- a/core/database.go +++ b/core/database.go @@ -39,6 +39,10 @@ func OpenDatabase(dbDsn string, debug bool) (*gorm.DB, error) { // generate new models. ConfigureModels(DB) // create models. + if debug { + log.Debugf("connected to db at: %s", dbDsn) + } + if err != nil { return nil, err } @@ -46,7 +50,7 @@ func OpenDatabase(dbDsn string, debug bool) (*gorm.DB, error) { } func ConfigureModels(db *gorm.DB) { - err := db.AutoMigrate(&Replication{}, &Provider{}, &Dataset{}, &Content{}, &Wallet{}, &ProviderAllowedDatasets{}, &WalletDatasets{}) + err := db.AutoMigrate(&Replication{}, &Provider{}, &Dataset{}, &Content{}, &Wallet{}, &ReplicationProfile{}, &WalletDatasets{}) if err != nil { log.Fatalf("error migrating database: %s", err) @@ -77,21 +81,21 @@ type Replication struct { // A client is a Storage Provider that is being replicated to type Provider struct { - Key uuid.UUID `json:"key,omitempty" gorm:"type:uuid"` - ActorID string `json:"actor_id" gorm:"primaryKey"` - ActorName string `json:"actor_name,omitempty"` - AllowSelfService bool `json:"allow_self_service,omitempty" gorm:"notnull,default:true"` - BytesReplicated ByteSizes `json:"bytes_replicated,omitempty" gorm:"-"` - CountReplicated uint64 `json:"count_replicated,omitempty" gorm:"-"` - Replications []Replication `json:"replications,omitempty" gorm:"foreignKey:ProviderActorID"` - AllowedDatasets []Dataset `json:"allowed_datasets" gorm:"many2many:provider_allowed_datasets;"` + Key uuid.UUID `json:"key,omitempty" gorm:"type:uuid"` + ActorID string `json:"actor_id" gorm:"primaryKey"` + ActorName string `json:"actor_name,omitempty"` + AllowSelfService bool `json:"allow_self_service,omitempty" gorm:"notnull,default:true"` + BytesReplicated ByteSizes `json:"bytes_replicated,omitempty" gorm:"-"` + CountReplicated uint64 `json:"count_replicated,omitempty" gorm:"-"` + Replications []Replication `json:"replications,omitempty" gorm:"foreignKey:ProviderActorID"` + ReplicationProfiles []ReplicationProfile `json:"replication_profiles" gorm:"foreignKey:ProviderActorID"` } -type ProviderAllowedDatasets struct { - ProviderActorID string `gorm:"primaryKey" json:"provider_actor_id"` - DatasetID uint `gorm:"primaryKey" json:"dataset_id"` - CreatedAt time.Time - DeletedAt gorm.DeletedAt +type ReplicationProfile struct { + ProviderActorID string `gorm:"primaryKey;uniqueIndex:idx_provider_dataset" json:"provider_actor_id"` + DatasetID uint `gorm:"primaryKey;uniqueIndex:idx_provider_dataset" json:"dataset_id"` + Unsealed bool `json:"unsealed"` + Indexed bool `json:"indexed"` } type ByteSizes struct { @@ -102,18 +106,16 @@ type ByteSizes struct { // A Dataset is a collection of CAR files, and is identified by a name/slug type Dataset struct { gorm.Model - Name string `json:"name" gorm:"unique; not null"` - ReplicationQuota uint64 `json:"replication_quota"` - DealDuration uint64 `json:"deal_duration"` - Wallets []Wallet `json:"wallets,omitempty" gorm:"many2many:wallet_datasets;"` - Unsealed bool `json:"unsealed"` - Indexed bool `json:"indexed"` - Contents []Content `json:"contents" gorm:"foreignKey:DatasetName;references:Name"` - BytesReplicated ByteSizes `json:"bytes_replicated,omitempty" gorm:"-"` - BytesTotal ByteSizes `json:"bytes_total,omitempty" gorm:"-"` - CountReplicated uint64 `json:"count_replicated" gorm:"-"` - CountTotal uint64 `json:"count_total" gorm:"-"` - AllowedProviders []Provider `json:"allowed_providers" gorm:"many2many:provider_allowed_datasets;"` + Name string `json:"name" gorm:"unique; not null"` + ReplicationQuota uint64 `json:"replication_quota"` + DealDuration uint64 `json:"deal_duration"` + Wallets []Wallet `json:"wallets,omitempty" gorm:"many2many:wallet_datasets;"` + Contents []Content `json:"contents" gorm:"foreignKey:DatasetName;references:Name"` + BytesReplicated ByteSizes `json:"bytes_replicated,omitempty" gorm:"-"` + BytesTotal ByteSizes `json:"bytes_total,omitempty" gorm:"-"` + CountReplicated uint64 `json:"count_replicated" gorm:"-"` + CountTotal uint64 `json:"count_total" gorm:"-"` + ReplicationProfiles []ReplicationProfile `json:"replication_profiles" gorm:"foreignKey:dataset_id"` } type Content struct { diff --git a/docs/api.md b/docs/api.md index ddcff1f..7e0debd 100644 --- a/docs/api.md +++ b/docs/api.md @@ -52,8 +52,6 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "name": "delta-test", "replication_quota": 6, "deal_duration": 540, - "unsealed": false, - "indexed": true } ``` @@ -87,8 +85,6 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "type": "secp256k1" } ], - "unsealed": false, - "indexed": true, "contents": null, "count_replicated": 21, // # of successful replications/storage deals "count_total": 210, // total # of contents for this dataset @@ -99,6 +95,14 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "bytes_total": [ 1801001922192, // Raw bytes (the content itself) 3435973836800 // Padded bytes (i.e, filecoin piece) + ], + "replication_profiles": [ + { + "provider_actor_id": "f012345", + "dataset_id": 1, + "unsealed": false, + "indexed": false + } ] }, { @@ -112,8 +116,6 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "dataset_name": "delta-test-2", "type": "secp256k1" }, - "unsealed": false, - "indexed": true, "contents": null, "count_replicated": 14, // # of successful replications/storage deals "count_total": 440, // total # of contents for this dataset @@ -125,6 +127,14 @@ All endpoints (with the exception of `/self-service`) require the `Authorization 1801001922192, // Raw bytes (the content itself) 3435973836800 // Padded bytes (i.e, filecoin piece) ] + "replication_profiles": [ + { + "provider_actor_id": "f012345", + "dataset_id": 2, + "unsealed": false, + "indexed": false + } + ] } ] @@ -271,7 +281,6 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq { actor_name: "Friendly name" // optional - friendly sp name allow_self_service: "on" // allow self-service replications ("on" or "off") - allowed_datasets: ["delta-test", "delta-test-2"] // list of datasets allowed to be replicated by this SP } ``` @@ -292,34 +301,25 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq { "key": "b3cc8a99-155a-4fff-8974-999ec313e5cc", "actor_id": "f0123456", - "actor_name": "jason", + "actor_name": "friendly sp", "allow_self_service": false, "bytes_replicated": { "raw": 234130249877, "padded": 446676598784 }, "count_replicated": 12, - "allowed_datasets": [ + "replication_profiles": [ { - "ID": 1, - "CreatedAt": "2023-02-28T13:50:15.591038-08:00", - "UpdatedAt": "2023-02-28T13:50:23.928193-08:00", - "DeletedAt": null, - "name": "delta-test", - "replication_quota": 6, - "deal_duration": 540, + "provider_actor_id": "f0123456", + "dataset_id": 1, "unsealed": false, - "indexed": true, - "contents": null, - "bytes_replicated": { - "raw": 0, - "padded": 0 - }, - "bytes_total": { - "raw": 0, - "padded": 0 - }, - "allowed_providers": null + "indexed": false + }, + { + "provider_actor_id": "f0123456", + "dataset_id": 2, + "unsealed": false, + "indexed": false } ] }, @@ -328,7 +328,7 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq "actor_id": "f0998272", "actor_name": "test sp", "allow_self_service": true, - "allowed_datasets": [], + "replication_profiles": [], "bytes_replicated": { "raw": 0, "padded": 0 @@ -624,4 +624,84 @@ For more details, see the [Self-Service API](/docs/self-service.md) documentatio > 200: Success ```sh "successfully made deal with f0123456" -``` \ No newline at end of file +``` + +## /replication-profiles + +### GET /replication-profiles +- Get all replication profiles + +#### Params + + +#### Body + + +#### Response +```json +[ + { + "provider_actor_id": "f012345", + "dataset_id": 1, + "unsealed": false, + "indexed": false + }, + { + "provider_actor_id": "f012345", + "dataset_id": 2, + "unsealed": true, + "indexed": false + } +] +``` + +### PUT /replication-profiles +- Update a replication profile + +#### Params + + +#### Body +```json +{ + "provider_actor_id": "f012345", + "dataset_id": 1, + "unsealed": true, + "indexed": true +} +``` + +- Note: `provider_actor_id` and `dataset_id` cannot be changed with the PUT request - they are used to identify the profile to update. + +#### Response +> 200: Success + +```json +{ + "provider_actor_id": "f012345", + "dataset_id": 1, + "unsealed": true, + "indexed": true +} +``` + +### DELETE /replication-profiles +- Delete a replication profile + +#### Params + + +#### Body +```json +{ + "provider_actor_id": "f012345", + "dataset_id": 2, +} +``` + +#### Response +> 200: Success + +```json +"replication profile with ProviderActorID f012345 and DatasetID 2 deleted successfully" +``` diff --git a/docs/cmd.md b/docs/cmd.md index 46582ec..c945f6d 100644 --- a/docs/cmd.md +++ b/docs/cmd.md @@ -105,3 +105,23 @@ Example: ### List content in a dataset `> ./delta-dm content list --dataset ` + + +## replication profiles +- Note: `replication-profile`/`rp` commands take a `dataset id`, not a `dataset name`- you can run `dataset list` to get the id for a dataset. +### Add a replication profile +`> ./delta-dm rp add --spid --dataset [--unsealed] [--indexed]` + +Example: +```bash +./delta-dm rp add --spid f01000 --dataset 1 --unsealed --indexed +``` + +### Modify a replication profile +`> ./delta-dm rp modify --spid --dataset [--unsealed] [--indexed]` + +### Delete a replication profile +`> ./delta-dm rp delete --spid --dataset ` + +### List replication profiles +`> ./delta-dm rp list` \ No newline at end of file diff --git a/main.go b/main.go index dfebc8e..ce55b61 100644 --- a/main.go +++ b/main.go @@ -29,6 +29,7 @@ func main() { commands = append(commands, cmd.DaemonCmd(di)...) commands = append(commands, cmd.WalletCmd()...) commands = append(commands, cmd.ReplicationCmd()...) + commands = append(commands, cmd.ReplicationProfilesCmd()...) commands = append(commands, cmd.ProviderCmd()...) commands = append(commands, cmd.DatasetCmd()...) commands = append(commands, cmd.ContentCmd()...) diff --git a/readme.md b/readme.md index d05da16..b9b50f3 100644 --- a/readme.md +++ b/readme.md @@ -4,7 +4,7 @@ A tool to manage deal replication tracking for onboarding datasets to the Filecoin network via **import storage deals**. This provides a solution to quickly make deals for massive amounts of data, where the transfer is better handled out-of-band. -## Data Flow +## Core Concepts ### Dataset The top-level logical grouping of data in DDM is the **dataset**. Datasets are identified by a name (aka "slug"), along with a replication quota, deal length, and a wallet to make the deals from. @@ -16,9 +16,12 @@ Once a dataset has been created, content may be added to it. A content represent ### Providers DDM tracks deals to Storage Providers in the network. Add a list of storage providers to DDM before making deals to begin tracking them. +### Replication Profiles +A **Replication Profile** is what ties a **Dataset** together with a **Provider**. It defines the parameters for any deals made to that provider for that dataset. Currently, it allows specifying whether to keep an `unsealed copy` and whether to announce to the `IPNI` indexer. This allows for flexibility in how deals are made to different providers, such as defining a single SP to host the unsealed copies for retrieval while the others maintain a cold copy for backup. + ### Replication -Once a **Dataset**, **Content**, and **Providers** have been specified, the DDM `replication` API can be called to issue a number of import deals out to the providers. +Once a **Dataset**, **Content**, **Providers**, and a **Replication Strategy** have been specified, DDM can make replications for the content to the providers. A **Replication** is a single deal made to a single provider for a single piece of content. Replications are tracked by DDM, and can be queried for status and deal information. # Instructions