diff --git a/api/contents.go b/api/contents.go
index 8983161..bc5b689 100644
--- a/api/contents.go
+++ b/api/contents.go
@@ -1,12 +1,15 @@
 package api
 
 import (
+	"errors"
 	"fmt"
 	"io/ioutil"
+	"strconv"
 
 	"github.com/application-research/delta-dm/core"
 	"github.com/jszwec/csvutil"
 	"github.com/labstack/echo/v4"
+	"gorm.io/gorm"
 )
 
 func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
@@ -21,12 +24,21 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
 		d := c.Param("dataset")
 
 		if d == "" {
-			return fmt.Errorf("dataset must be specified")
+			return fmt.Errorf("dataset id must be specified")
+		}
+
+		did, err := strconv.ParseUint(d, 10, 64)
+		if err != nil {
+			return fmt.Errorf("dataset id must be numeric: %s", err)
 		}
 
-		dldm.DB.Where("name = ?", d).First(&dataset)
+		if tx := dldm.DB.First(&dataset, did); tx.Error != nil {
+			if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
+				return fmt.Errorf("dataset not found")
+			}
+			return fmt.Errorf("failed to get dataset: %s", tx.Error)
+		}
 
-		err := dldm.DB.Model(&dataset).Association("Contents").Find(&content)
+		err = dldm.DB.Model(&dataset).Association("Contents").Find(&content)
 		if err != nil {
 			return err
 		}
@@ -46,14 +58,21 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
 		}
 
 		d := c.Param("dataset")
+
 		if d == "" {
-			return fmt.Errorf("dataset must be specified")
+			return fmt.Errorf("dataset id must be specified")
+		}
+
+		did, err := strconv.ParseUint(d, 10, 64)
+		if err != nil {
+			return fmt.Errorf("dataset id must be numeric: %s", err)
 		}
 
 		// Check if dataset exists
-		res := dldm.DB.Where("name = ?", d).First(&dataset)
-		if res.Error != nil {
-			return fmt.Errorf("could not find dataset %s : %s", d, res.Error)
+		if tx := dldm.DB.First(&dataset, did); tx.Error != nil {
+			if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
+				return fmt.Errorf("dataset not found")
+			}
+			return fmt.Errorf("failed to get dataset: %s", tx.Error)
 		}
 
 		it := c.QueryParam("import_type")
@@ -90,7 +109,7 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
 				continue
 			}
 
-			cnt.DatasetName = dataset.Name
+			cnt.DatasetID = dataset.ID
 
 			err := dldm.DB.Create(&cnt).Error
 			if err != nil {
diff --git a/api/datasets.go b/api/datasets.go
index cd16d15..da3d12a 100644
--- a/api/datasets.go
+++ b/api/datasets.go
@@ -6,7 +6,6 @@ import (
 	"github.com/application-research/delta-dm/core"
 	"github.com/application-research/delta-dm/util"
 	"github.com/labstack/echo/v4"
-	"gorm.io/gorm"
 )
 
 func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
@@ -17,25 +16,23 @@ func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
 	datasets.GET("", func(c echo.Context) error {
 		var ds []core.Dataset
 
-		dldm.DB.Preload("Wallets").Preload("AllowedProviders", func(db *gorm.DB) *gorm.DB {
-			return db.Select("actor_id")
-		}).Find(&ds)
+		dldm.DB.Preload("Wallets").Preload("ReplicationProfiles").Find(&ds)
 
 		// Find # of bytes total and replicated for each dataset
 		for i, d := range ds {
 			var rb [2]uint64
-			dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_name = ?", d.Name).Row().Scan(&rb[0], &rb[1])
+			dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_id = ?", d.ID).Row().Scan(&rb[0], &rb[1])
 
 			var tb [2]uint64
-			dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents where dataset_name = ?", d.Name).Row().Scan(&tb[0], &tb[1])
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents where dataset_id = ?", d.ID).Row().Scan(&tb[0], &tb[1]) ds[i].BytesReplicated = core.ByteSizes{Raw: rb[0], Padded: rb[1]} ds[i].BytesTotal = core.ByteSizes{Raw: tb[0], Padded: tb[1]} var countReplicated uint64 = 0 var countTotal uint64 = 0 - dldm.DB.Raw("select count(*) cr FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_name = ?", d.Name).Row().Scan(&countReplicated) - dldm.DB.Raw("select count(*) cr FROM contents c where dataset_name = ?", d.Name).Row().Scan(&countTotal) + dldm.DB.Raw("select count(*) cr FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_id = ?", d.ID).Row().Scan(&countReplicated) + dldm.DB.Raw("select count(*) cr FROM contents c where dataset_id = ?", d.ID).Row().Scan(&countTotal) ds[i].CountReplicated = countReplicated ds[i].CountTotal = countTotal diff --git a/api/providers.go b/api/providers.go index c1a993c..587346c 100644 --- a/api/providers.go +++ b/api/providers.go @@ -11,9 +11,8 @@ import ( ) type ProviderPutBody struct { - ActorName string `json:"actor_name"` - AllowSelfService string `json:"allow_self_service"` - AllowedDatasets []string `json:"allowed_datasets"` + ActorName string `json:"actor_name"` + AllowSelfService string `json:"allow_self_service"` } func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) { @@ -24,7 +23,7 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) { providers.GET("", func(c echo.Context) error { var p []core.Provider - dldm.DB.Preload("AllowedDatasets").Find(&p) + dldm.DB.Preload("ReplicationProfiles").Find(&p) for i, sp := range p { var rb [2]uint64 @@ -93,22 +92,6 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) { existing.AllowSelfService = false } - // If array of allowed datasets is empty, don't modify the association - if len(p.AllowedDatasets) > 0 { - var newAllowedDatasets []core.Dataset - for _, ds_name := range p.AllowedDatasets { - var ds core.Dataset - res := dldm.DB.Model(&core.Dataset{}).Where("name = ?", ds_name).First(&ds) - if res.Error != nil { - return fmt.Errorf("error fetching dataset %s : %s", ds_name, res.Error) - } else { - newAllowedDatasets = append(newAllowedDatasets, ds) - } - } - - dldm.DB.Model(&existing).Association("AllowedDatasets").Replace(newAllowedDatasets) - } - res = dldm.DB.Save(&existing) if res.Error != nil { return fmt.Errorf("error saving provider %s", res.Error) diff --git a/api/replication-profiles.go b/api/replication-profiles.go new file mode 100644 index 0000000..b62a092 --- /dev/null +++ b/api/replication-profiles.go @@ -0,0 +1,122 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/application-research/delta-dm/core" + "github.com/labstack/echo/v4" +) + +type ReplicationProfile struct { + DatasetName string `json:"dataset_name"` + Unsealed bool `json:"unsealed"` + Indexed bool `json:"indexed"` +} + +func ConfigureReplicationProfilesRouter(e *echo.Group, dldm *core.DeltaDM) { + replicationProfiles := e.Group("/replication-profiles") + + replicationProfiles.Use(dldm.AS.AuthMiddleware) + + replicationProfiles.GET("", func(c echo.Context) error { + var p []core.ReplicationProfile + + res := dldm.DB.Model(&core.ReplicationProfile{}).Find(&p) + if res.Error != nil { + return fmt.Errorf("error finding replication profiles: %s", res.Error) + } + + return c.JSON(http.StatusOK, p) + }) + + replicationProfiles.POST("", func(c echo.Context) error { 
+		var p core.ReplicationProfile
+
+		if err := c.Bind(&p); err != nil {
+			return fmt.Errorf("failed to parse request body: %s", err.Error())
+		}
+
+		// Check if the dataset and provider exist
+		var ds core.Dataset
+		var provider core.Provider
+
+		dsRes := dldm.DB.Where("id = ?", p.DatasetID).First(&ds)
+		providerRes := dldm.DB.Where("actor_id = ?", p.ProviderActorID).First(&provider)
+
+		if dsRes.Error != nil || ds.ID == 0 {
+			return fmt.Errorf("invalid dataset: %s", dsRes.Error)
+		}
+
+		if providerRes.Error != nil || provider.ActorID == "" {
+			return fmt.Errorf("invalid provider: %s", providerRes.Error)
+		}
+
+		// Save the replication profile
+		res := dldm.DB.Create(&p)
+		if res.Error != nil {
+			if res.Error.Error() == "UNIQUE constraint failed: replication_profiles.provider_actor_id, replication_profiles.dataset_id" {
+				return fmt.Errorf("replication profile for provider %s and datasetID %d already exists", p.ProviderActorID, p.DatasetID)
+			}
+			return fmt.Errorf("failed to save replication profile: %s", res.Error.Error())
+		}
+
+		return c.JSON(http.StatusOK, p)
+	})
+
+	replicationProfiles.DELETE("", func(c echo.Context) error {
+		var p core.ReplicationProfile
+		if err := c.Bind(&p); err != nil {
+			return fmt.Errorf("failed to parse request body: %s", err.Error())
+		}
+
+		if p.DatasetID == 0 || p.ProviderActorID == "" {
+			return fmt.Errorf("invalid replication profile ID")
+		}
+
+		var existingProfile core.ReplicationProfile
+		res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", p.ProviderActorID, p.DatasetID).First(&existingProfile)
+
+		if res.Error != nil {
+			return fmt.Errorf("replication profile not found: %s", res.Error)
+		}
+
+		deleteRes := dldm.DB.Delete(&existingProfile)
+		if deleteRes.Error != nil {
+			return fmt.Errorf("failed to delete replication profile: %s", deleteRes.Error.Error())
+		}
+
+		return c.JSON(http.StatusOK, fmt.Sprintf("replication profile with ProviderActorID %s and DatasetID %d deleted successfully", p.ProviderActorID, p.DatasetID))
+	})
+
+	replicationProfiles.PUT("", func(c echo.Context) error {
+		var updatedProfile core.ReplicationProfile
+		if err := c.Bind(&updatedProfile); err != nil {
+			return fmt.Errorf("failed to parse request body: %s", err.Error())
+		}
+
+		if updatedProfile.DatasetID == 0 || updatedProfile.ProviderActorID == "" {
+			return fmt.Errorf("invalid replication profile ID")
+		}
+
+		var existingProfile core.ReplicationProfile
AND dataset_id = ?", updatedProfile.ProviderActorID, updatedProfile.DatasetID).First(&existingProfile) + + if res.Error != nil { + return fmt.Errorf("replication profile not found: %s", res.Error) + } + + updateData := map[string]interface{}{ + "unsealed": updatedProfile.Unsealed, + "indexed": updatedProfile.Indexed, + } + + updateRes := dldm.DB.Model(&existingProfile).Updates(updateData) + if updateRes.Error != nil { + return fmt.Errorf("failed to update replication profile: %s", updateRes.Error.Error()) + } + + return c.JSON(http.StatusOK, updatedProfile) + }) + +} diff --git a/api/replications.go b/api/replications.go index 1ede4aa..34312fb 100644 --- a/api/replications.go +++ b/api/replications.go @@ -16,7 +16,7 @@ const DEFAULT_DELAY_DAYS = 3 type PostReplicationBody struct { Provider string `json:"provider"` - Dataset *string `json:"dataset,omitempty"` + DatasetID *uint `json:"dataset_id,omitempty"` NumDeals *uint `json:"num_deals,omitempty"` DelayStartDays *uint64 `json:"delay_start_days,omitempty"` // NumTib *int `json:"num_tib,omitempty"` @@ -229,24 +229,24 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error { delayStartEpoch = *d.DelayStartDays } - if d.Dataset != nil && *d.Dataset != "" { + if d.DatasetID != nil { var datasetExists bool err = dldm.DB.Model(core.Dataset{}). Select("count(*) > 0"). - Where("name = ?", d.Dataset). + Where("id = ?", d.DatasetID). Find(&datasetExists). Error if err != nil { - return fmt.Errorf("could not check if dataset %s exists: %s", *d.Dataset, err) + return fmt.Errorf("could not check if dataset with id %d exists: %s", *d.DatasetID, err) } if !datasetExists { - return fmt.Errorf("dataset %s does not exist in ddm. please add it first", *d.Dataset) + return fmt.Errorf("dataset id %d does not exist in ddm.", *d.DatasetID) } } // TODO: Support num_tib to allow specifying the amount of data to replicate - toReplicate, err := findUnreplicatedContentForProvider(dldm.DB, d.Provider, d.Dataset, d.NumDeals) + toReplicate, err := findUnreplicatedContentForProvider(dldm.DB, d.Provider, d.DatasetID, d.NumDeals) if err != nil { return err } @@ -259,7 +259,7 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error { log.Debugf("calling DELTA api for %+v deals\n\n", len(toReplicate)) for _, c := range toReplicate { - wallet, err := walletSelection(dldm.DB, &c.DatasetName) + wallet, err := walletSelection(dldm.DB, d.DatasetID) if err != nil || wallet.Addr == "" { return fmt.Errorf("dataset '%s' does not have a wallet. no deals were made. please add a wallet for this dataset and try again. 
 			return fmt.Errorf("dataset '%s' does not have a wallet. no deals were made. please add a wallet for this dataset and try again. alternatively, explicitly specify a dataset in the request to force replication of one with an existing wallet", c.Dataset.Name)
@@ -273,8 +273,8 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error {
 			ConnectionMode:     "import",
 			Miner:              d.Provider,
 			Size:               c.Size,
-			SkipIpniAnnounce:   !c.Indexed,
-			RemoveUnsealedCopy: !c.Unsealed,
+			SkipIpniAnnounce:   !c.ReplicationProfile.Indexed,
+			RemoveUnsealedCopy: !c.ReplicationProfile.Unsealed,
 			DurationInDays:     c.DealDuration,
 			StartEpochInDays:   delayStartEpoch,
 			PieceCommitment: core.PieceCommitment{
@@ -295,19 +295,21 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error {
 
 type replicatedContentQueryResponse struct {
 	core.Content
 	core.Dataset
+	core.ReplicationProfile
 }
 
 // Query the database for all content that does not have replications to this actor yet
 // Arguments: providerID - the actor ID of the provider
 //
-// datasetName (optional) - the name of the dataset to replicate
+// datasetID (optional) - the ID of the dataset to replicate
 // numDeals (optional) - the number of replications (deals) to return. If nil, return all
-func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetName *string, numDeals *uint) ([]replicatedContentQueryResponse, error) {
+func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetId *uint, numDeals *uint) ([]replicatedContentQueryResponse, error) {
 	rawQuery := `
 		SELECT * FROM datasets d
-		INNER JOIN contents c ON d.name = c.dataset_name
+		INNER JOIN contents c ON d.id = c.dataset_id
+		INNER JOIN replication_profiles rp ON rp.dataset_id = d.id
 		-- Only select content that does not have a non-failed replication to this provider
 		WHERE c.comm_p NOT IN (
 			SELECT r.content_comm_p
@@ -319,19 +321,15 @@ func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetN
 				WHERE p.actor_id <> ?
 			)
 		)
-		-- Only select content from datasets that this provider is allowed to replicate
-		AND d.id IN (
-			SELECT dataset_id
-			FROM provider_allowed_datasets
-			WHERE provider_actor_id = ?
-		)
+		-- Only select content from datasets that this provider is allowed to replicate
+		AND rp.provider_actor_id = ?
 		AND c.num_replications < d.replication_quota
 	`
 	var rawValues = []interface{}{providerID, providerID}
 
-	if datasetName != nil && *datasetName != "" {
-		rawQuery += " AND d.name = ?"
-		rawValues = append(rawValues, datasetName)
+	if datasetId != nil && *datasetId != 0 {
+		rawQuery += " AND d.id = ?"
+		rawValues = append(rawValues, datasetId)
 	}
 
 	if numDeals != nil {
@@ -345,17 +343,17 @@ func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetN
 }
 
 // Find which wallet to use when making deals for a given dataset
-func walletSelection(db *gorm.DB, datasetName *string) (*core.Wallet, error) {
+func walletSelection(db *gorm.DB, datasetId *uint) (*core.Wallet, error) {
 	var w []core.Wallet
 
-	res := db.Raw("select * from wallets w inner join wallet_datasets wd on w.addr = wd.wallet_addr inner join datasets d on wd.dataset_id = d.id where d.name = ?", datasetName).Scan(&w)
+	res := db.Raw("select * from wallets w inner join wallet_datasets wd on w.addr = wd.wallet_addr inner join datasets d on wd.dataset_id = d.id where d.id = ?", datasetId).Scan(&w)
 	if res.Error != nil {
 		return nil, res.Error
 	}
 
 	if len(w) == 0 {
-		return nil, fmt.Errorf("no wallet found for dataset '%s'", *datasetName)
+		return nil, fmt.Errorf("no wallet found for dataset '%d'", *datasetId)
 	}
diff --git a/api/router.go b/api/router.go
index 62d2721..1697840 100644
--- a/api/router.go
+++ b/api/router.go
@@ -64,6 +64,7 @@ func InitializeEchoRouterConfig(dldm *core.DeltaDM, port uint) {
 	ConfigureWalletsRouter(apiGroup, dldm)
 	ConfigureHealthRouter(apiGroup, dldm)
 	ConfigureSelfServiceRouter(apiGroup, dldm)
+	ConfigureReplicationProfilesRouter(apiGroup, dldm)
 
 	// Start server
 	e.Logger.Fatal(e.Start(fmt.Sprintf("0.0.0.0:%d", (port)))) // configuration
 }
diff --git a/api/selfservice.go b/api/selfservice.go
index ede3652..ddb0db0 100644
--- a/api/selfservice.go
+++ b/api/selfservice.go
@@ -37,7 +37,7 @@ func selfServiceTokenMiddleware(dldm *core.DeltaDM) echo.MiddlewareFunc {
 				return c.String(401, "missing provider self-service token")
 			}
 			var p core.Provider
-			res := dldm.DB.Model(&core.Provider{}).Preload("AllowedDatasets").Where("key = ?", providerToken).Find(&p)
+			res := dldm.DB.Model(&core.Provider{}).Preload("ReplicationProfiles").Where("key = ?", providerToken).Find(&p)
 
 			if res.Error != nil {
 				log.Errorf("error finding provider: %s", res.Error)
@@ -91,15 +91,17 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error {
 	}
 
 	var ds core.Dataset
-	res = dldm.DB.Model(&core.Dataset{}).Where("name = ?", cnt.DatasetName).Find(&ds)
+	res = dldm.DB.Model(&core.Dataset{}).Where("id = ?", cnt.DatasetID).Find(&ds)
 	if res.Error != nil {
-		return fmt.Errorf("unable to find associated dataset %s", cnt.DatasetName)
+		return fmt.Errorf("unable to find dataset %d associated with requested CID", cnt.DatasetID)
 	}
 
+	var rp core.ReplicationProfile
 	isAllowed := false
-	for _, allowedDs := range p.AllowedDatasets {
-		if allowedDs.Name == ds.Name {
+	for _, thisRp := range p.ReplicationProfiles {
+		if thisRp.DatasetID == ds.ID {
 			isAllowed = true
+			rp = thisRp
 			break
 		}
 	}
@@ -122,10 +124,10 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error {
 	var dealsToMake core.OfflineDealRequest
 	log.Debugf("calling DELTA api for deal\n\n")
 
-	wallet, err := walletSelection(dldm.DB, &cnt.DatasetName)
+	wallet, err := walletSelection(dldm.DB, &cnt.DatasetID)
 
 	if err != nil || wallet.Addr == "" {
-		return fmt.Errorf("dataset '%s' does not have a wallet. no deals were made. please contact administrator", cnt.DatasetName)
please contact administrator", ds.Name) } dealsToMake = append(dealsToMake, core.Deal{ @@ -136,8 +138,8 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error { ConnectionMode: "import", Miner: p.ActorID, Size: cnt.Size, - SkipIpniAnnounce: !ds.Indexed, - RemoveUnsealedCopy: !ds.Unsealed, + SkipIpniAnnounce: !rp.Indexed, + RemoveUnsealedCopy: !rp.Unsealed, DurationInDays: ds.DealDuration, StartEpochInDays: delayDays, PieceCommitment: core.PieceCommitment{ @@ -162,6 +164,12 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { return fmt.Errorf("must provide a dataset name") } + var ds core.Dataset + dsRes := dldm.DB.Where("name = ?", dataset).First(&ds) + if dsRes.Error != nil || ds.ID == 0 { + return fmt.Errorf("invalid dataset: %s", dsRes.Error) + } + var delayDays uint64 = 3 if startEpochDelay != "" { var err error @@ -178,8 +186,8 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { p := c.Get(PROVIDER).(core.Provider) isAllowed := false - for _, ds := range p.AllowedDatasets { - if ds.Name == dataset { + for _, rp := range p.ReplicationProfiles { + if rp.DatasetID == ds.ID { isAllowed = true break } @@ -191,7 +199,7 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { // give one deal at a time numDeals := uint(1) - cnt, err := findUnreplicatedContentForProvider(dldm.DB, p.ActorID, &dataset, &numDeals) + cnt, err := findUnreplicatedContentForProvider(dldm.DB, p.ActorID, &ds.ID, &numDeals) if err != nil { return fmt.Errorf("unable to find content for dataset: %s", err) } @@ -202,10 +210,10 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { deal := cnt[0] - wallet, err := walletSelection(dldm.DB, &deal.DatasetName) + wallet, err := walletSelection(dldm.DB, &ds.ID) if err != nil || wallet.Addr == "" { - return fmt.Errorf("dataset '%s' does not have a wallet associated. no deals were made. please contact administrator", deal.DatasetName) + return fmt.Errorf("dataset '%s' does not have a wallet associated. no deals were made. please contact administrator", ds.Name) } var dealsToMake []core.Deal @@ -218,8 +226,8 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error { ConnectionMode: "import", Miner: p.ActorID, Size: deal.Size, - SkipIpniAnnounce: !deal.Indexed, - RemoveUnsealedCopy: !deal.Unsealed, + SkipIpniAnnounce: !deal.ReplicationProfile.Indexed, + RemoveUnsealedCopy: !deal.ReplicationProfile.Unsealed, DurationInDays: deal.DealDuration - delayDays, StartEpochInDays: delayDays, PieceCommitment: core.PieceCommitment{ diff --git a/api/wallet.go b/api/wallet.go index 4dd4476..bde7d30 100644 --- a/api/wallet.go +++ b/api/wallet.go @@ -138,8 +138,8 @@ func handleAddWallet(c echo.Context, dldm *core.DeltaDM) error { } type AssociateWalletBody struct { - Address string `json:"address"` - Datasets []string `json:"datasets"` + Address string `json:"address"` + Datasets []uint `json:"datasets"` } // POST /api/wallet/associate @@ -166,20 +166,20 @@ func handleAssociateWallet(c echo.Context, dldm *core.DeltaDM) error { } var newDatasets []core.Dataset - for _, datasetName := range awb.Datasets { + for _, datasetID := range awb.Datasets { var dataset core.Dataset err := dldm.DB.Model(core.Dataset{}). - Where("name = ?", datasetName). + Where("id = ?", datasetID). Find(&dataset). 
 			Error
 		if err != nil {
-			return fmt.Errorf("could not check for dataset %s : %s", datasetName, err)
+			return fmt.Errorf("could not check for dataset %d : %s", datasetID, err)
 		}
 
 		if dataset.ID == 0 {
-			return fmt.Errorf("dataset %s does not exist", datasetName)
+			return fmt.Errorf("dataset %d does not exist", datasetID)
 		}
 
 		newDatasets = append(newDatasets, dataset)
diff --git a/cmd/content.go b/cmd/content.go
index 574a7be..40accb5 100644
--- a/cmd/content.go
+++ b/cmd/content.go
@@ -8,7 +8,7 @@ import (
 )
 
 func ContentCmd() []*cli.Command {
-	var datasetName string
+	var datasetID uint
 
 	// add a command to run API node
 	var contentCmds []*cli.Command
@@ -20,11 +20,11 @@ func ContentCmd() []*cli.Command {
 			Name:  "import",
 			Usage: "import content to a dataset",
 			Flags: []cli.Flag{
-				&cli.StringFlag{
+				&cli.UintFlag{
 					Name:        "dataset",
 					Aliases:     []string{"d"},
-					Usage:       "dataset name (slug)",
-					Destination: &datasetName,
+					Usage:       "dataset id (numeric)",
+					Destination: &datasetID,
 					Required:    true,
 				},
 				&cli.StringFlag{
@@ -57,7 +57,7 @@ func ContentCmd() []*cli.Command {
 				}
 
 				var body []byte
-				url := "/api/v1/contents/" + datasetName
+				url := "/api/v1/contents/" + fmt.Sprint(datasetID)
 
 				if jsonFilename != "" {
 					jsonFile, err := ioutil.ReadFile(jsonFilename)
@@ -98,11 +98,11 @@ func ContentCmd() []*cli.Command {
 			Name:  "list",
 			Usage: "list content in a dataset",
 			Flags: []cli.Flag{
-				&cli.StringFlag{
+				&cli.UintFlag{
 					Name:        "dataset",
 					Aliases:     []string{"d"},
-					Usage:       "dataset name (slug)",
-					Destination: &datasetName,
+					Usage:       "dataset id (numeric)",
+					Destination: &datasetID,
 					Required:    true,
 				},
 			},
@@ -112,7 +112,7 @@ func ContentCmd() []*cli.Command {
 					return err
 				}
 
-				url := "/api/v1/contents/" + datasetName
+				url := "/api/v1/contents/" + fmt.Sprint(datasetID)
 
 				res, closer, err := cmd.MakeRequest("GET", url, nil)
diff --git a/cmd/daemon.go b/cmd/daemon.go
index 60d7fe2..f074cfa 100644
--- a/cmd/daemon.go
+++ b/cmd/daemon.go
@@ -19,7 +19,6 @@ func DaemonCmd(di core.DeploymentInfo) []*cli.Command {
 	var authServer string
 	var port uint
 
-	// add a command to run API node
 	var daemonCommands []*cli.Command
 	daemonCmd := &cli.Command{
 		Name: "daemon",
diff --git a/cmd/dataset.go b/cmd/dataset.go
index 47d8d7c..942b61f 100644
--- a/cmd/dataset.go
+++ b/cmd/dataset.go
@@ -14,10 +14,7 @@ func DatasetCmd() []*cli.Command {
 	var datasetName string
 	var replicationQuota uint64
 	var dealDuration uint64
-	var keepUnsealedCopy bool
-	var announceToIndexer bool
 
-	// add a command to run API node
 	var datasetCmds []*cli.Command
 	datasetCmd := &cli.Command{
 		Name: "dataset",
@@ -50,22 +47,6 @@ func DatasetCmd() []*cli.Command {
 					Value:       540,
 					Destination: &dealDuration,
 				},
-				&cli.BoolFlag{
-					Name:        "unsealed",
-					Aliases:     []string{"u"},
-					Usage:       "flag to keep unsealed copy when deals are made",
-					DefaultText: "true",
-					Value:       true,
-					Destination: &keepUnsealedCopy,
-				},
-				&cli.BoolFlag{
-					Name:        "indexed",
-					Aliases:     []string{"i"},
-					Usage:       "flag to announce to IPNI (Interplanetary Network Indexer)",
-					DefaultText: "true",
-					Value:       true,
-					Destination: &announceToIndexer,
-				},
 			},
 			Action: func(c *cli.Context) error {
 				cmd, err := NewCmdProcessor(c)
@@ -81,8 +62,6 @@ func DatasetCmd() []*cli.Command {
 					Name:             datasetName,
 					ReplicationQuota: replicationQuota,
 					DealDuration:     dealDuration,
-					Indexed:          announceToIndexer,
-					Unsealed:         keepUnsealedCopy,
 				}
 
 				b, err := json.Marshal(body)
diff --git a/cmd/provider.go b/cmd/provider.go
index 4ed69d6..88ff74d 100644
--- a/cmd/provider.go
+++ b/cmd/provider.go
@@ -14,9 +14,7 @@ func ProviderCmd() []*cli.Command {
 	var spId string
 	var spName string
 	var allowSelfService string
-	var allowedDatasets cli.StringSlice
 
-	// add a command to run API node
 	var providerCmds []*cli.Command
 	providerCmd := &cli.Command{
 		Name: "provider",
@@ -90,11 +88,6 @@ func ProviderCmd() []*cli.Command {
 					Usage:       "enable self-service for provider (on|off)",
 					Destination: &allowSelfService,
 				},
-				&cli.StringSliceFlag{
-					Name:        "allowed-datasets",
-					Usage:       "datasets the provider is permitted to replicate (comma separated list)",
-					Destination: &allowedDatasets,
-				},
 			},
 			Action: func(c *cli.Context) error {
 				cmd, err := NewCmdProcessor(c)
@@ -111,7 +104,6 @@ func ProviderCmd() []*cli.Command {
 				body := api.ProviderPutBody{
 					ActorName:        spName,
 					AllowSelfService: allowSelfService,
-					AllowedDatasets:  allowedDatasets.Value(),
 				}
 
 				b, err := json.Marshal(body)
diff --git a/cmd/replication-profiles.go b/cmd/replication-profiles.go
new file mode 100644
index 0000000..5a2ff17
--- /dev/null
+++ b/cmd/replication-profiles.go
@@ -0,0 +1,209 @@
+package cmd
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/application-research/delta-dm/core"
+	"github.com/urfave/cli/v2"
+)
+
+func ReplicationProfilesCmd() []*cli.Command {
+	var spId string
+	var datasetId uint
+	var unsealed bool
+	var indexed bool
+
+	var providerCmds []*cli.Command
+	providerCmd := &cli.Command{
+		Name:    "replication-profile",
+		Aliases: []string{"rp"},
+		Usage:   "Replication Profile Commands",
+		Subcommands: []*cli.Command{
+			{
+				Name:  "add",
+				Usage: "add replication profile",
+				Flags: []cli.Flag{
+					&cli.StringFlag{
+						Name:        "spid",
+						Usage:       "storage provider id",
+						Destination: &spId,
+						Required:    true,
+					},
+					&cli.UintFlag{
+						Name:        "dataset",
+						Usage:       "dataset id",
+						Aliases:     []string{"d"},
+						Destination: &datasetId,
+						Required:    true,
+					},
+					&cli.BoolFlag{
+						Name:        "indexed",
+						Usage:       "announce deals to indexer",
+						Destination: &indexed,
+					},
+					&cli.BoolFlag{
+						Name:        "unsealed",
+						Usage:       "keep unsealed copy",
+						Destination: &unsealed,
+					},
+				},
+				Action: func(c *cli.Context) error {
+					cmd, err := NewCmdProcessor(c)
+					if err != nil {
+						return err
+					}
+
+					body := core.ReplicationProfile{
+						ProviderActorID: spId,
+						DatasetID:       datasetId,
+						Unsealed:        unsealed,
+						Indexed:         indexed,
+					}
+
+					b, err := json.Marshal(body)
+					if err != nil {
+						return fmt.Errorf("unable to construct request body %s", err)
+					}
+
+					res, closer, err := cmd.MakeRequest(http.MethodPost, "/api/v1/replication-profiles/", b)
+					if err != nil {
+						return fmt.Errorf("unable to make request %s", err)
+					}
+					defer closer()
+
+					fmt.Printf("%s", string(res))
+
+					return nil
+				},
+			},
+			{
+				Name:  "modify",
+				Usage: "modify replication profile",
+				Flags: []cli.Flag{
+					&cli.StringFlag{
+						Name:        "spid",
+						Usage:       "storage provider id",
+						Destination: &spId,
+						Required:    true,
+					},
+					&cli.UintFlag{
+						Name:        "dataset",
+						Usage:       "dataset id",
+						Aliases:     []string{"d"},
+						Destination: &datasetId,
+						Required:    true,
+					},
+					&cli.BoolFlag{
+						Name:        "indexed",
+						Usage:       "announce deals to indexer",
+						Destination: &indexed,
+					},
+					&cli.BoolFlag{
+						Name:        "unsealed",
+						Usage:       "keep unsealed copy",
+						Destination: &unsealed,
+					},
+				},
+				Action: func(c *cli.Context) error {
+					cmd, err := NewCmdProcessor(c)
+					if err != nil {
+						return err
+					}
+
+					body := core.ReplicationProfile{
+						ProviderActorID: spId,
+						DatasetID:       datasetId,
+						Unsealed:        unsealed,
+						Indexed:         indexed,
+					}
+
+					b, err := json.Marshal(body)
+					if err != nil {
+						return fmt.Errorf("unable to construct request body %s", err)
+					}
+
"/api/v1/replication-profiles/", b) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + { + Name: "delete", + Usage: "delete replication profile", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "spid", + Usage: "storage provider id", + Destination: &spId, + Required: true, + }, + &cli.UintFlag{ + Name: "dataset", + Usage: "dataset id", + Aliases: []string{"id"}, + Destination: &datasetId, + Required: true, + }, + }, + Action: func(c *cli.Context) error { + cmd, err := NewCmdProcessor(c) + if err != nil { + return err + } + + body := core.ReplicationProfile{ + ProviderActorID: spId, + DatasetID: datasetId, + } + + b, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("unable to construct request body %s", err) + } + + res, closer, err := cmd.MakeRequest(http.MethodDelete, "/api/v1/replication-profiles/", b) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + { + Name: "list", + Usage: "list replication profiles", + Action: func(c *cli.Context) error { + cmd, err := NewCmdProcessor(c) + if err != nil { + return err + } + + res, closer, err := cmd.MakeRequest(http.MethodGet, "/api/v1/replication-profiles/", nil) + if err != nil { + return fmt.Errorf("unable to make request %s", err) + } + defer closer() + + fmt.Printf("%s", string(res)) + + return nil + }, + }, + }, + } + + providerCmds = append(providerCmds, providerCmd) + + return providerCmds +} diff --git a/cmd/replication.go b/cmd/replication.go index e678550..722b5de 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -12,10 +12,9 @@ import ( func ReplicationCmd() []*cli.Command { var num uint var provider string - var dataset string + var datasetID uint var delayStartDays uint64 - // add a command to run API node var replicationCmds []*cli.Command replicationCmd := &cli.Command{ Name: "replication", @@ -39,10 +38,11 @@ func ReplicationCmd() []*cli.Command { Destination: &provider, Required: true, }, - &cli.StringFlag{ + &cli.UintFlag{ Name: "dataset", - Usage: "dataset to replicate", - Destination: &dataset, + Aliases: []string{"d"}, + Usage: "dataset id to replicate", + Destination: &datasetID, }, &cli.Uint64Flag{ Name: "delay-start", @@ -61,8 +61,8 @@ func ReplicationCmd() []*cli.Command { Provider: provider, } - if dataset != "" { - body.Dataset = &dataset + if datasetID != 0 { + body.DatasetID = &datasetID } if delayStartDays != 0 { diff --git a/cmd/wallet.go b/cmd/wallet.go index 52d6220..9249d76 100644 --- a/cmd/wallet.go +++ b/cmd/wallet.go @@ -5,17 +5,17 @@ import ( "fmt" "io/ioutil" "net/http" - "strings" + "strconv" "github.com/application-research/delta-dm/api" "github.com/urfave/cli/v2" ) func WalletCmd() []*cli.Command { - var dataset string + var datasetIDs cli.UintSlice + var datasetID uint var walletAddress string - // add a command to run API node var walletCmds []*cli.Command walletCmd := &cli.Command{ Name: "wallet", @@ -129,10 +129,10 @@ func WalletCmd() []*cli.Command { Usage: "associate wallet with dataset", UsageText: "delta-dm wallet associate [wallet address]", Flags: []cli.Flag{ - &cli.StringFlag{ + &cli.UintSliceFlag{ Name: "datasets", - Usage: "dataset names to associate with wallet (comma separated)", - Destination: &dataset, + Usage: "dataset ids to associate with wallet (comma separated)", + Destination: &datasetIDs, Required: true, }, &cli.StringFlag{ @@ -148,15 +148,13 @@ func 
@@ -148,15 +148,13 @@ func WalletCmd() []*cli.Command {
 					return fmt.Errorf("failed to connect to ddm node: %s", err)
 				}
 
-				datasets := strings.Split(dataset, ",")
-
-				if len(datasets) < 1 {
-					return fmt.Errorf("please provide at least one dataset name")
+				if len(datasetIDs.Value()) < 1 {
+					return fmt.Errorf("please provide at least one dataset id")
 				}
 
 				awb := api.AssociateWalletBody{
 					Address:  walletAddress,
-					Datasets: datasets,
+					Datasets: datasetIDs.Value(),
 				}
 
 				b, err := json.Marshal(awb)
@@ -178,10 +176,10 @@ func WalletCmd() []*cli.Command {
 				Name:  "list",
 				Usage: "list wallets",
 				Flags: []cli.Flag{
-					&cli.StringFlag{
+					&cli.UintFlag{
 						Name:        "dataset",
-						Usage:       "filter wallets by dataset",
-						Destination: &dataset,
+						Usage:       "filter wallets by dataset id",
+						Destination: &datasetID,
 					},
 				},
 				Action: func(c *cli.Context) error {
@@ -192,8 +190,8 @@ func WalletCmd() []*cli.Command {
 
 					url := "/api/v1/wallets"
 
-					if dataset != "" {
-						url += "?dataset=" + dataset
+					if datasetID != 0 {
+						url += "?dataset=" + strconv.FormatUint(uint64(datasetID), 10)
 					}
 
 					res, closer, err := cmd.MakeRequest(http.MethodGet, url, nil)
diff --git a/core/database.go b/core/database.go
index 68cab33..8e795ca 100644
--- a/core/database.go
+++ b/core/database.go
@@ -39,6 +39,10 @@ func OpenDatabase(dbDsn string, debug bool) (*gorm.DB, error) {
 	// generate new models.
 	ConfigureModels(DB) // create models.
 
+	if debug {
+		log.Debugf("connected to db at: %s", dbDsn)
+	}
+
 	if err != nil {
 		return nil, err
 	}
@@ -46,7 +50,7 @@ func OpenDatabase(dbDsn string, debug bool) (*gorm.DB, error) {
 }
 
 func ConfigureModels(db *gorm.DB) {
-	err := db.AutoMigrate(&Provider{}, &Dataset{}, &Content{}, &Wallet{}, &ProviderAllowedDatasets{}, &WalletDatasets{}, &Replication{})
+	err := db.AutoMigrate(&Provider{}, &Dataset{}, &Content{}, &Wallet{}, &ReplicationProfile{}, &WalletDatasets{}, &Replication{})
 
 	if err != nil {
 		log.Fatalf("error migrating database: %s", err)
@@ -77,21 +81,21 @@ type Replication struct {
 
 // A client is a Storage Provider that is being replicated to
 type Provider struct {
-	Key              uuid.UUID     `json:"key,omitempty" gorm:"type:uuid"`
-	ActorID          string        `json:"actor_id" gorm:"primaryKey"`
-	ActorName        string        `json:"actor_name,omitempty"`
-	AllowSelfService bool          `json:"allow_self_service,omitempty" gorm:"notnull,default:true"`
-	BytesReplicated  ByteSizes     `json:"bytes_replicated,omitempty" gorm:"-"`
-	CountReplicated  uint64        `json:"count_replicated,omitempty" gorm:"-"`
-	Replications     []Replication `json:"replications,omitempty" gorm:"foreignKey:ProviderActorID"`
-	AllowedDatasets  []Dataset     `json:"allowed_datasets" gorm:"many2many:provider_allowed_datasets;"`
+	Key                 uuid.UUID            `json:"key,omitempty" gorm:"type:uuid"`
+	ActorID             string               `json:"actor_id" gorm:"primaryKey"`
+	ActorName           string               `json:"actor_name,omitempty"`
+	AllowSelfService    bool                 `json:"allow_self_service,omitempty" gorm:"notnull,default:true"`
+	BytesReplicated     ByteSizes            `json:"bytes_replicated,omitempty" gorm:"-"`
+	CountReplicated     uint64               `json:"count_replicated,omitempty" gorm:"-"`
+	Replications        []Replication        `json:"replications,omitempty" gorm:"foreignKey:ProviderActorID"`
+	ReplicationProfiles []ReplicationProfile `json:"replication_profiles" gorm:"foreignKey:ProviderActorID"`
 }
 
-type ProviderAllowedDatasets struct {
-	ProviderActorID string `gorm:"primaryKey" json:"provider_actor_id"`
-	DatasetID       uint   `gorm:"primaryKey" json:"dataset_id"`
-	CreatedAt       time.Time
-	DeletedAt       gorm.DeletedAt
+type ReplicationProfile struct {
json:"provider_actor_id"` + DatasetID uint `gorm:"primaryKey;uniqueIndex:idx_provider_dataset" json:"dataset_id"` + Unsealed bool `json:"unsealed"` + Indexed bool `json:"indexed"` } type ByteSizes struct { @@ -102,18 +106,16 @@ type ByteSizes struct { // A Dataset is a collection of CAR files, and is identified by a name/slug type Dataset struct { gorm.Model - Name string `json:"name" gorm:"unique; not null"` - ReplicationQuota uint64 `json:"replication_quota"` - DealDuration uint64 `json:"deal_duration"` - Wallets []Wallet `json:"wallets,omitempty" gorm:"many2many:wallet_datasets;"` - Unsealed bool `json:"unsealed"` - Indexed bool `json:"indexed"` - Contents []Content `json:"contents" gorm:"foreignKey:DatasetName;references:Name"` - BytesReplicated ByteSizes `json:"bytes_replicated,omitempty" gorm:"-"` - BytesTotal ByteSizes `json:"bytes_total,omitempty" gorm:"-"` - CountReplicated uint64 `json:"count_replicated" gorm:"-"` - CountTotal uint64 `json:"count_total" gorm:"-"` - AllowedProviders []Provider `json:"allowed_providers" gorm:"many2many:provider_allowed_datasets;"` + Name string `json:"name" gorm:"unique; not null"` + ReplicationQuota uint64 `json:"replication_quota"` + DealDuration uint64 `json:"deal_duration"` + Wallets []Wallet `json:"wallets,omitempty" gorm:"many2many:wallet_datasets;"` + Contents []Content `json:"contents" gorm:"foreignKey:DatasetID;references:ID"` + BytesReplicated ByteSizes `json:"bytes_replicated,omitempty" gorm:"-"` + BytesTotal ByteSizes `json:"bytes_total,omitempty" gorm:"-"` + CountReplicated uint64 `json:"count_replicated" gorm:"-"` + CountTotal uint64 `json:"count_total" gorm:"-"` + ReplicationProfiles []ReplicationProfile `json:"replication_profiles" gorm:"foreignKey:dataset_id"` } type Content struct { @@ -121,7 +123,7 @@ type Content struct { PayloadCID string `json:"payload_cid" csv:"payloadCid"` Size uint64 `json:"size" csv:"size"` PaddedSize uint64 `json:"padded_size" csv:"paddedSize"` - DatasetName string `json:"dataset_name"` + DatasetID uint `json:"dataset_id"` Replications []Replication `json:"replications,omitempty" gorm:"foreignKey:ContentCommP"` NumReplications uint64 `json:"num_replications"` } diff --git a/docs/api.md b/docs/api.md index 5c67901..d14bf40 100644 --- a/docs/api.md +++ b/docs/api.md @@ -52,8 +52,6 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "name": "delta-test", "replication_quota": 6, "deal_duration": 540, - "unsealed": false, - "indexed": true } ``` @@ -87,8 +85,6 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "type": "secp256k1" } ], - "unsealed": false, - "indexed": true, "contents": null, "count_replicated": 21, // # of successful replications/storage deals "count_total": 210, // total # of contents for this dataset @@ -99,6 +95,14 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "bytes_total": [ 1801001922192, // Raw bytes (the content itself) 3435973836800 // Padded bytes (i.e, filecoin piece) + ], + "replication_profiles": [ + { + "provider_actor_id": "f012345", + "dataset_id": 1, + "unsealed": false, + "indexed": false + } ] }, { @@ -112,8 +116,6 @@ All endpoints (with the exception of `/self-service`) require the `Authorization "dataset_name": "delta-test-2", "type": "secp256k1" }, - "unsealed": false, - "indexed": true, "contents": null, "count_replicated": 14, // # of successful replications/storage deals "count_total": 440, // total # of contents for this dataset @@ -125,6 +127,14 @@ All endpoints 
@@ -125,6 +127,14 @@ All endpoints (with the exception of `/self-service`) require the `Authorization
     1801001922192, // Raw bytes (the content itself)
     3435973836800 // Padded bytes (i.e., filecoin piece)
-  ]
+  ],
+  "replication_profiles": [
+    {
+      "provider_actor_id": "f012345",
+      "dataset_id": 2,
+      "unsealed": false,
+      "indexed": false
+    }
+  ]
 }
]
@@ -135,9 +145,11 @@ All endpoints (with the exception of `/self-service`) require the `Authorization
 ### POST /contents/:dataset
 - Add content (CAR files) to the dataset
 - Accepts three types of input - standard (delta-dm) format, singularity format, or CSV - as defined below
+- The :dataset parameter is the ID (uint) of the dataset to add the content to
 
 #### Request Params
 ```jsonc
+/dataset // ID of dataset to add content to
 ?import_type= // singularity or csv. omit for standard format.
 ```
@@ -206,7 +218,7 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq
 
 #### Request Params
 ```jsonc
-:dataset // dataset name to get contents for
+/dataset // dataset ID to get contents for
 ```
@@ -225,7 +237,7 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq
     "payload_cid": "bafybeifyaefzfalorttcqfcvago2rbide3mnm72geau6xxdl6iewc5leki",
     "size": 26619574156,
     "padded_size": 34359738368,
-    "dataset_name": "delta-test",
+    "dataset_id": 1,
     "num_replications": 0
   },
   {
@@ -233,7 +245,7 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq
     "payload_cid": "bafybeiaupshs7vgsgs5e4y6n7tqkz4ghuyt3teqmqqad6ee5drlbg6dcfq",
     "size": 24389555373,
     "padded_size": 34359738368,
-    "dataset_name": "delta-test",
+    "dataset_id": 2,
     "num_replications": 0
   }
 ]
@@ -271,7 +283,6 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq
 {
   actor_name: "Friendly name" // optional - friendly sp name
   allow_self_service: "on" // allow self-service replications ("on" or "off")
-  allowed_datasets: ["delta-test", "delta-test-2"] // list of datasets allowed to be replicated by this SP
 }
@@ -292,34 +303,25 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq
 {
   "key": "b3cc8a99-155a-4fff-8974-999ec313e5cc",
   "actor_id": "f0123456",
-  "actor_name": "jason",
+  "actor_name": "friendly sp",
   "allow_self_service": false,
   "bytes_replicated": {
     "raw": 234130249877,
     "padded": 446676598784
   },
   "count_replicated": 12,
-  "allowed_datasets": [
+  "replication_profiles": [
     {
-      "ID": 1,
-      "CreatedAt": "2023-02-28T13:50:15.591038-08:00",
-      "UpdatedAt": "2023-02-28T13:50:23.928193-08:00",
-      "DeletedAt": null,
-      "name": "delta-test",
-      "replication_quota": 6,
-      "deal_duration": 540,
+      "provider_actor_id": "f0123456",
+      "dataset_id": 1,
+      "unsealed": false,
+      "indexed": false
+    },
+    {
+      "provider_actor_id": "f0123456",
+      "dataset_id": 2,
       "unsealed": false,
-      "indexed": true,
-      "contents": null,
-      "bytes_replicated": {
-        "raw": 0,
-        "padded": 0
-      },
-      "bytes_total": {
-        "raw": 0,
-        "padded": 0
-      },
-      "allowed_providers": null
+      "indexed": false
     }
   ]
 },
@@ -328,7 +330,7 @@ baga6ea4seaqhf2ymr6ahkxe3i2txmnqbmltzyf65nwcdvq2hvwmcx4eu4wzl4fi,bafybeif2bu5bdq
   "actor_id": "f0998272",
   "actor_name": "test sp",
   "allow_self_service": true,
-  "allowed_datasets": [],
+  "replication_profiles": [],
   "bytes_replicated": {
     "raw": 0,
     "padded": 0
@@ -408,7 +410,7 @@ Note the response contains two properties. `totalCount` is the total number of r
     "payload_cid": "bafybeifoxwwx5newgdwnvojyotleh3x3sy7ckndwa2ysioe4corv4ixgti",
     "size": 18010019221,
     "padded_size": 34359738368,
-    "dataset_name": "delta-test",
+    "dataset_id": 1,
     "num_replications": 1
   },
   "deal_time": "2023-03-01T19:16:14.956401-08:00",
@@ -428,7 +430,7 @@ Note the response contains two properties. `totalCount` is the total number of r
     "payload_cid": "bafybeiezpv62emncxbe4adoyipxhzcdy2eqzxx3rde6rdzuqxs57gdsp2q",
     "size": 18010019221,
     "padded_size": 34359738368,
-    "dataset_name": "delta-test",
+    "dataset_id": 1,
     "num_replications": 1
   },
   "deal_time": "2023-03-01T19:16:14.962877-08:00",
@@ -448,7 +450,7 @@ Note the response contains two properties. `totalCount` is the total number of r
     "payload_cid": "bafybeiakf666idv6zs4uksckfkjr76jmvrcuu4neidldxlfngo2vh6jvfe",
     "size": 18010019222,
     "padded_size": 34359738368,
-    "dataset_name": "delta-test",
+    "dataset_id": 1,
     "num_replications": 3
   },
   "deal_time": "2023-03-06T11:09:42.185318-08:00",
@@ -469,7 +471,7 @@ Note the response contains two properties. `totalCount` is the total number of r
     "payload_cid": "bafybeiakf666idv6zs4uksckfkjr76jmvrcuu4neidldxlfngo2vh6jvfe",
     "size": 18010019222,
     "padded_size": 34359738368,
-    "dataset_name": "delta-test",
+    "dataset_id": 1,
     "num_replications": 3
   },
   "deal_time": "2023-03-06T11:11:02.723922-08:00",
@@ -629,4 +631,84 @@ For more details, see the [Self-Service API](/docs/self-service.md) documentatio
 > 200: Success
 ```sh
 "successfully made deal with f0123456"
-```
\ No newline at end of file
+```
+
+## /replication-profiles
+
+### GET /replication-profiles
+- Get all replication profiles
+
+#### Params
+
+
+#### Body
+
+
+#### Response
+```json
+[
+  {
+    "provider_actor_id": "f012345",
+    "dataset_id": 1,
+    "unsealed": false,
+    "indexed": false
+  },
+  {
+    "provider_actor_id": "f012345",
+    "dataset_id": 2,
+    "unsealed": true,
+    "indexed": false
+  }
+]
+```
+
+### PUT /replication-profiles
+- Update a replication profile
+
+#### Params
+
+
+#### Body
+```json
+{
+  "provider_actor_id": "f012345",
+  "dataset_id": 1,
+  "unsealed": true,
+  "indexed": true
+}
+```
+
+- Note: `provider_actor_id` and `dataset_id` cannot be changed with the PUT request - they are used to identify the profile to update.
+
+#### Response
+> 200: Success
+
+```json
+{
+  "provider_actor_id": "f012345",
+  "dataset_id": 1,
+  "unsealed": true,
+  "indexed": true
+}
+```
+
+### DELETE /replication-profiles
+- Delete a replication profile
+
+#### Params
+
+
+#### Body
+```json
+{
+  "provider_actor_id": "f012345",
+  "dataset_id": 2
+}
+```
+
+#### Response
+> 200: Success
+
+```json
+"replication profile with ProviderActorID f012345 and DatasetID 2 deleted successfully"
+```
diff --git a/docs/cmd.md b/docs/cmd.md
index 46582ec..82a70d6 100644
--- a/docs/cmd.md
+++ b/docs/cmd.md
@@ -83,16 +83,16 @@ Example:
 
 ## replication
 ### Create a replication
-`> ./delta-dm replication create --provider <provider_id> -num <num_deals> [--dataset <dataset_name>] [--delay-start <days>]`
+`> ./delta-dm replication create --provider <provider_id> -num <num_deals> [--dataset <dataset_id>] [--delay-start <days>]`
 
 Example:
 ```bash
-./delta-dm replication create --provider f01000 --num 3 --dataset delta-test --delay-start 3
+./delta-dm replication create --provider f01000 --num 3 --dataset 1 --delay-start 3
 ```
 
 ## content
 ### Import content to a dataset
-`> ./delta-dm content import --dataset <dataset_name> [--json <json_file>] [--csv <csv_file>] [--singularity <singularity_file>]`
+`> ./delta-dm content import --dataset <dataset_id> [--json <json_file>] [--csv <csv_file>] [--singularity <singularity_file>]`
 
 One of `--json`, `--csv`, or `--singularity` must be provided.
@@ -100,8 +100,28 @@ For the expected file format, see the [api docs](api.md##/contents)
 
 Example:
 ```bash
-./delta-dm content import --dataset delta-test --json ./content.json
+./delta-dm content import --dataset 1 --json ./content.json
 ```
 
 ### List content in a dataset
-`> ./delta-dm content list --dataset <dataset_name>`
+`> ./delta-dm content list --dataset <dataset_id>`
+
+
+## replication profiles
+- Note: `replication-profile`/`rp` commands take a `dataset id`; run `dataset list` to look up the id for a dataset.
+### Add a replication profile
+`> ./delta-dm rp add --spid <provider_id> --dataset <dataset_id> [--unsealed] [--indexed]`
+
+Example:
+```bash
+./delta-dm rp add --spid f01000 --dataset 1 --unsealed --indexed
+```
+
+### Modify a replication profile
+`> ./delta-dm rp modify --spid <provider_id> --dataset <dataset_id> [--unsealed] [--indexed]`
+
+### Delete a replication profile
+`> ./delta-dm rp delete --spid <provider_id> --dataset <dataset_id>`
+
+### List replication profiles
+`> ./delta-dm rp list`
\ No newline at end of file
diff --git a/docs/howto.md b/docs/howto.md
deleted file mode 100644
index e69de29..0000000
diff --git a/docs/singularity-import.md b/docs/singularity-import.md
index eeb557d..bc90560 100644
--- a/docs/singularity-import.md
+++ b/docs/singularity-import.md
@@ -16,15 +16,15 @@ To do this, ensure your Singularity database is running and you have the `DELTA_
 
 **Script Usage**
 ```bash
-./singularity-import.sh <singularity datasetName> <delta datasetName> <deltaToken>
+./singularity-import.sh <singularity datasetName> <delta datasetID> <deltaToken>
 ```
 
 Where
 - SINGULARITY datasetName - the name of the dataset in Singularity (aka the slug)
-- DELTA datasetName - the name of the dataset in Delta
+- DELTA datasetID - the ID of the dataset in Delta (you can obtain this with `delta-dm dataset list`)
 - deltaToken - your delta auth token
 
 **Example**
 ```bash
-./singularity-import.sh common-crawl common-crawl EST-XXX-ARY
+./singularity-import.sh common-crawl 1 EST-XXX-ARY
 ```
\ No newline at end of file
diff --git a/main.go b/main.go
index dfebc8e..ce55b61 100644
--- a/main.go
+++ b/main.go
@@ -29,6 +29,7 @@ func main() {
 	commands = append(commands, cmd.DaemonCmd(di)...)
 	commands = append(commands, cmd.WalletCmd()...)
 	commands = append(commands, cmd.ReplicationCmd()...)
+	commands = append(commands, cmd.ReplicationProfilesCmd()...)
 	commands = append(commands, cmd.ProviderCmd()...)
 	commands = append(commands, cmd.DatasetCmd()...)
 	commands = append(commands, cmd.ContentCmd()...)
diff --git a/readme.md b/readme.md
index ca3dc7f..bdc837a 100644
--- a/readme.md
+++ b/readme.md
@@ -10,7 +10,7 @@
 ## What is this?
 A tool to manage deal replication tracking for onboarding datasets to the Filecoin network via **import storage deals**. This provides a solution to quickly make deals for massive amounts of data, where the transfer is better handled out-of-band.
 
-## Data Flow
+## Core Concepts
 
 ### Dataset
 The top-level logical grouping of data in DDM is the **dataset**. Datasets are identified by a name (aka "slug"), along with a replication quota, deal length, and a wallet to make the deals from.
@@ -22,9 +22,12 @@ Once a dataset has been created, content may be added to it. A content represent
 ### Providers
 DDM tracks deals to Storage Providers in the network. Add a list of storage providers to DDM before making deals to begin tracking them.
 
+### Replication Profiles
+A **Replication Profile** is what ties a **Dataset** together with a **Provider**. It defines the parameters for any deals made to that provider for that dataset. Currently, it allows specifying whether to keep an `unsealed copy` and whether to announce to the `IPNI` indexer. This allows for flexibility in how deals are made to different providers, such as designating a single SP to host the unsealed copies for retrieval while the others maintain a cold copy for backup.
+
 ### Replication
 
-Once a **Dataset**, **Content**, and **Providers** have been specified, the DDM `replication` API can be called to issue a number of import deals out to the providers.
+Once a **Dataset**, **Content**, **Providers**, and a **Replication Profile** have been specified, DDM can replicate the content to the providers.
 
 A **Replication** is a single deal made to a single provider for a single piece of content. Replications are tracked by DDM, and can be queried for status and deal information.
 
 # Instructions
diff --git a/scripts/singularity-import.sh b/scripts/singularity-import.sh
index 782044d..dac7b7d 100755
--- a/scripts/singularity-import.sh
+++ b/scripts/singularity-import.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-# Usage: ./singularity-import.sh <singularity datasetName> <delta datasetName> <deltaToken>
+# Usage: ./singularity-import.sh <singularity datasetName> <delta datasetID> <deltaToken>
 
 query="{\"datasetName\": \"$1\", \"pieceSize\": { \"\$gt\": 0 }}"
 DELTA_TOKEN="Authorization: Bearer $3"
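---

For reviewers, a quick end-to-end sketch of the workflow after this change, stitched together from the commands documented in `docs/cmd.md` above. The provider ID `f01000`, dataset ID `1`, and `./content.json` are placeholder values; this assumes the dataset and provider are already registered in DDM and a wallet is associated with the dataset.

```bash
# Look up the numeric dataset ID - contents, wallets, and replication
# profiles are now keyed on it instead of the dataset name (slug).
./delta-dm dataset list

# Permit provider f01000 to replicate dataset 1, keeping an unsealed
# copy and announcing the deals to the IPNI indexer.
./delta-dm rp add --spid f01000 --dataset 1 --unsealed --indexed

# Import content into the dataset by ID rather than by name.
./delta-dm content import --dataset 1 --json ./content.json

# Issue three deals to the provider; the unsealed/indexed deal flags now
# come from the replication profile rather than from the dataset.
./delta-dm replication create --provider f01000 --num 3 --dataset 1
```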