Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replication profile #58

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions api/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/application-research/delta-dm/core"
"github.com/application-research/delta-dm/util"
"github.com/labstack/echo/v4"
"gorm.io/gorm"
)

func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -17,9 +16,7 @@ func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
datasets.GET("", func(c echo.Context) error {
var ds []core.Dataset

dldm.DB.Preload("Wallets").Preload("AllowedProviders", func(db *gorm.DB) *gorm.DB {
return db.Select("actor_id")
}).Find(&ds)
dldm.DB.Preload("Wallets").Preload("ReplicationProfiles").Find(&ds)

// Find # of bytes total and replicated for each dataset
for i, d := range ds {
Expand Down
23 changes: 3 additions & 20 deletions api/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
)

type ProviderPutBody struct {
ActorName string `json:"actor_name"`
AllowSelfService string `json:"allow_self_service"`
AllowedDatasets []string `json:"allowed_datasets"`
ActorName string `json:"actor_name"`
AllowSelfService string `json:"allow_self_service"`
}

func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -24,7 +23,7 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
providers.GET("", func(c echo.Context) error {
var p []core.Provider

dldm.DB.Preload("AllowedDatasets").Find(&p)
dldm.DB.Preload("ReplicationProfiles").Find(&p)

for i, sp := range p {
var rb [2]uint64
Expand Down Expand Up @@ -93,22 +92,6 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
existing.AllowSelfService = false
}

// If array of allowed datasets is empty, don't modify the association
if len(p.AllowedDatasets) > 0 {
var newAllowedDatasets []core.Dataset
for _, ds_name := range p.AllowedDatasets {
var ds core.Dataset
res := dldm.DB.Model(&core.Dataset{}).Where("name = ?", ds_name).First(&ds)
if res.Error != nil {
return fmt.Errorf("error fetching dataset %s : %s", ds_name, res.Error)
} else {
newAllowedDatasets = append(newAllowedDatasets, ds)
}
}

dldm.DB.Model(&existing).Association("AllowedDatasets").Replace(newAllowedDatasets)
}

res = dldm.DB.Save(&existing)
if res.Error != nil {
return fmt.Errorf("error saving provider %s", res.Error)
Expand Down
122 changes: 122 additions & 0 deletions api/replication-profiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package api

import (
"fmt"
"net/http"

"github.com/application-research/delta-dm/core"
"github.com/labstack/echo/v4"
)

type ReplicationProfile struct {
DatasetName string `json:"dataset_name"`
Unsealed bool `json:"unsealed"`
Indexed bool `json:"indexed"`
}

func ConfigureReplicationProfilesRouter(e *echo.Group, dldm *core.DeltaDM) {
replicationProfiles := e.Group("/replication-profiles")

replicationProfiles.Use(dldm.AS.AuthMiddleware)

replicationProfiles.GET("", func(c echo.Context) error {
var p []core.ReplicationProfile

res := dldm.DB.Model(&core.ReplicationProfile{}).Find(&p)
if res.Error != nil {
return fmt.Errorf("error finding replication profiles: %s", res.Error)
}

return c.JSON(http.StatusOK, p)
})

replicationProfiles.POST("", func(c echo.Context) error {
var p core.ReplicationProfile

if err := c.Bind(&p); err != nil {
return fmt.Errorf("failed to parse request body: %s", err.Error())
}

// Check if the dataset and provider exist
var ds core.Dataset
var provider core.Provider

dsRes := dldm.DB.Where("id = ?", p.DatasetID).First(&ds)
providerRes := dldm.DB.Where("actor_id = ?", p.ProviderActorID).First(&provider)

if dsRes.Error != nil || ds.ID == 0 {
return fmt.Errorf("invalid dataset: %s", dsRes.Error)
}

if providerRes.Error != nil || provider.ActorID == "" {
return fmt.Errorf("invalid provider: %s", providerRes.Error)
}

// Save the replication profile
res := dldm.DB.Create(&p)
if res.Error != nil {
if res.Error.Error() == "UNIQUE constraint failed: replication_profiles.provider_actor_id, replication_profiles.dataset_id" {
return fmt.Errorf("replication profile for provider %s and datasetID %d already exists", p.ProviderActorID, p.DatasetID)
}
return fmt.Errorf("failed to save replication profile: %s", res.Error.Error())
}

return c.JSON(http.StatusOK, p)
})

replicationProfiles.DELETE("", func(c echo.Context) error {
var p core.ReplicationProfile
if err := c.Bind(&p); err != nil {
return fmt.Errorf("failed to parse request body: %s", err.Error())
}

if p.DatasetID == 0 || p.ProviderActorID == "" {
return fmt.Errorf("invalid replication profile ID")
}

var existingProfile core.ReplicationProfile
res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", p.ProviderActorID, p.DatasetID).First(&existingProfile)

if res.Error != nil {
return fmt.Errorf("replication profile not found: %s", res.Error)
}

deleteRes := dldm.DB.Delete(&existingProfile)
if deleteRes.Error != nil {
return fmt.Errorf("failed to delete replication profile: %s", deleteRes.Error.Error())
}

return c.JSON(http.StatusOK, fmt.Sprintf("replication profile with ProviderActorID %s and DatasetID %d deleted successfully", p.ProviderActorID, p.DatasetID))
})

replicationProfiles.PUT("", func(c echo.Context) error {
var updatedProfile core.ReplicationProfile
if err := c.Bind(&updatedProfile); err != nil {
return fmt.Errorf("failed to parse request body: %s", err.Error())
}

if updatedProfile.DatasetID == 0 || updatedProfile.ProviderActorID == "" {
return fmt.Errorf("invalid replication profile ID")
}

var existingProfile core.ReplicationProfile
res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", updatedProfile.ProviderActorID, updatedProfile.DatasetID).First(&existingProfile)

if res.Error != nil {
return fmt.Errorf("replication profile not found: %s", res.Error)
}

updateData := map[string]interface{}{
"unsealed": updatedProfile.Unsealed,
"indexed": updatedProfile.Indexed,
}

updateRes := dldm.DB.Model(&existingProfile).Updates(updateData)
if updateRes.Error != nil {
return fmt.Errorf("failed to update replication profile: %s", updateRes.Error.Error())
}

return c.JSON(http.StatusOK, updatedProfile)
})

}
14 changes: 6 additions & 8 deletions api/replications.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error {
ConnectionMode: "import",
Miner: d.Provider,
Size: c.Size,
SkipIpniAnnounce: !c.Indexed,
RemoveUnsealedCopy: !c.Unsealed,
SkipIpniAnnounce: !c.ReplicationProfile.Indexed,
RemoveUnsealedCopy: !c.ReplicationProfile.Unsealed,
DurationInDays: c.DealDuration,
StartEpochInDays: delayStartEpoch,
PieceCommitment: core.PieceCommitment{
Expand All @@ -278,6 +278,7 @@ func handlePostReplications(c echo.Context, dldm *core.DeltaDM) error {
type replicatedContentQueryResponse struct {
core.Content
core.Dataset
core.ReplicationProfile
}

// Query the database for all contant that does not have replications to this actor yet
Expand All @@ -290,6 +291,7 @@ func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetN
SELECT *
FROM datasets d
INNER JOIN contents c ON d.name = c.dataset_name
INNER JOIN replication_profiles rp ON rp.dataset_id = d.id
-- Only select content that does not have a non-failed replication to this provider
WHERE c.comm_p NOT IN (
SELECT r.content_comm_p
Expand All @@ -301,12 +303,8 @@ func findUnreplicatedContentForProvider(db *gorm.DB, providerID string, datasetN
WHERE p.actor_id <> ?
)
)
-- Only select content from datasets that this provider is allowed to replicate
AND d.id IN (
SELECT dataset_id
FROM provider_allowed_datasets
WHERE provider_actor_id = ?
)
-- Only select content from datasets that this provider is allowed to replicate
AND rp.provider_actor_id = ?
AND c.num_replications < d.replication_quota
`
var rawValues = []interface{}{providerID, providerID}
Expand Down
1 change: 1 addition & 0 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func InitializeEchoRouterConfig(dldm *core.DeltaDM) {
ConfigureWalletsRouter(apiGroup, dldm)
ConfigureHealthRouter(apiGroup, dldm)
ConfigureSelfServiceRouter(apiGroup, dldm)
ConfigureReplicationProfilesRouter(apiGroup, dldm)
// Start server
e.Logger.Fatal(e.Start("0.0.0.0:1314")) // configuration
}
Expand Down
26 changes: 17 additions & 9 deletions api/selfservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func selfServiceTokenMiddleware(dldm *core.DeltaDM) echo.MiddlewareFunc {
return c.String(401, "missing provider self-service token")
}
var p core.Provider
res := dldm.DB.Model(&core.Provider{}).Preload("AllowedDatasets").Where("key = ?", providerToken).Find(&p)
res := dldm.DB.Model(&core.Provider{}).Preload("ReplicationProfiles").Where("key = ?", providerToken).Find(&p)

if res.Error != nil {
log.Errorf("error finding provider: %s", res.Error)
Expand Down Expand Up @@ -93,10 +93,12 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error {
return fmt.Errorf("unable to find associated dataset %s", cnt.DatasetName)
}

var rp core.ReplicationProfile
isAllowed := false
for _, allowedDs := range p.AllowedDatasets {
if allowedDs.Name == ds.Name {
for _, thisRp := range p.ReplicationProfiles {
if thisRp.DatasetID == ds.ID {
isAllowed = true
rp = thisRp
break
}
}
Expand Down Expand Up @@ -133,8 +135,8 @@ func handleSelfServiceByCid(c echo.Context, dldm *core.DeltaDM) error {
ConnectionMode: "import",
Miner: p.ActorID,
Size: cnt.Size,
SkipIpniAnnounce: !ds.Indexed,
RemoveUnsealedCopy: !ds.Unsealed,
SkipIpniAnnounce: !rp.Indexed,
RemoveUnsealedCopy: !rp.Unsealed,
DurationInDays: ds.DealDuration,
StartEpochInDays: delayDays,
PieceCommitment: core.PieceCommitment{
Expand All @@ -159,6 +161,12 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error {
return fmt.Errorf("must provide a dataset name")
}

var ds core.Dataset
dsRes := dldm.DB.Where("name = ?", dataset).First(&ds)
if dsRes.Error != nil || ds.ID == 0 {
return fmt.Errorf("invalid dataset: %s", dsRes.Error)
}

var delayDays uint64 = 3
if startEpochDelay != "" {
var err error
Expand All @@ -175,8 +183,8 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error {
p := c.Get(PROVIDER).(core.Provider)

isAllowed := false
for _, ds := range p.AllowedDatasets {
if ds.Name == dataset {
for _, rp := range p.ReplicationProfiles {
if rp.DatasetID == ds.ID {
isAllowed = true
break
}
Expand Down Expand Up @@ -215,8 +223,8 @@ func handleSelfServiceByDataset(c echo.Context, dldm *core.DeltaDM) error {
ConnectionMode: "import",
Miner: p.ActorID,
Size: deal.Size,
SkipIpniAnnounce: !deal.Indexed,
RemoveUnsealedCopy: !deal.Unsealed,
SkipIpniAnnounce: !deal.ReplicationProfile.Indexed,
RemoveUnsealedCopy: !deal.ReplicationProfile.Unsealed,
DurationInDays: deal.DealDuration - delayDays,
StartEpochInDays: delayDays,
PieceCommitment: core.PieceCommitment{
Expand Down
1 change: 0 additions & 1 deletion cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func DaemonCmd(di core.DeploymentInfo) []*cli.Command {
var deltaAuthToken string
var authServer string

// add a command to run API node
var daemonCommands []*cli.Command
daemonCmd := &cli.Command{
Name: "daemon",
Expand Down
21 changes: 0 additions & 21 deletions cmd/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ func DatasetCmd() []*cli.Command {
var datasetName string
var replicationQuota uint64
var dealDuration uint64
var keepUnsealedCopy bool
var announceToIndexer bool

// add a command to run API node
var datasetCmds []*cli.Command
datasetCmd := &cli.Command{
Name: "dataset",
Expand Down Expand Up @@ -50,22 +47,6 @@ func DatasetCmd() []*cli.Command {
Value: 540,
Destination: &dealDuration,
},
&cli.BoolFlag{
Name: "unsealed",
Aliases: []string{"u"},
Usage: "flag to keep unsealed copy when deals are made",
DefaultText: "true",
Value: true,
Destination: &keepUnsealedCopy,
},
&cli.BoolFlag{
Name: "indexed",
Aliases: []string{"i"},
Usage: "flag to announce to IPNI (Interplanetary Network Indexer)",
DefaultText: "true",
Value: true,
Destination: &announceToIndexer,
},
},
Action: func(c *cli.Context) error {
cmd, err := NewCmdProcessor(c)
Expand All @@ -81,8 +62,6 @@ func DatasetCmd() []*cli.Command {
Name: datasetName,
ReplicationQuota: replicationQuota,
DealDuration: dealDuration,
Indexed: announceToIndexer,
Unsealed: keepUnsealedCopy,
}

b, err := json.Marshal(body)
Expand Down
8 changes: 0 additions & 8 deletions cmd/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ func ProviderCmd() []*cli.Command {
var spId string
var spName string
var allowSelfService string
var allowedDatasets cli.StringSlice

// add a command to run API node
var providerCmds []*cli.Command
providerCmd := &cli.Command{
Name: "provider",
Expand Down Expand Up @@ -90,11 +88,6 @@ func ProviderCmd() []*cli.Command {
Usage: "enable self-service for provider (on|off)",
Destination: &allowSelfService,
},
&cli.StringSliceFlag{
Name: "allowed-datasets",
Usage: "datasets the provider is permitted to replicate (comma separated list)",
Destination: &allowedDatasets,
},
},
Action: func(c *cli.Context) error {
cmd, err := NewCmdProcessor(c)
Expand All @@ -111,7 +104,6 @@ func ProviderCmd() []*cli.Command {
body := api.ProviderPutBody{
ActorName: spName,
AllowSelfService: allowSelfService,
AllowedDatasets: allowedDatasets.Value(),
}

b, err := json.Marshal(body)
Expand Down
Loading