Skip to content

Commit

Permalink
Refactor: dataset - use id on all endpoints and cli (#65)
Browse files Browse the repository at this point in the history
* feat: build out replication profiles model and route

* feat: wire up replication profile to dealmaking

* feat: update docs

* feat: update cli

* feat: update readmes

* feat: cleanup

* refactor: update models

* refactor: update api and cli to use ds id

* refactor: update docs, wallet assoc cmd and cli

---------

Co-authored-by: Jason Cihelka <jcihelka@isotechnics.com>
  • Loading branch information
jcace and Jason Cihelka authored May 31, 2023
1 parent a561b72 commit eac0132
Show file tree
Hide file tree
Showing 23 changed files with 631 additions and 218 deletions.
35 changes: 27 additions & 8 deletions api/contents.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package api

import (
"errors"
"fmt"
"io/ioutil"
"strconv"

"github.com/application-research/delta-dm/core"
"github.com/jszwec/csvutil"
"github.com/labstack/echo/v4"
"gorm.io/gorm"
)

func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -21,12 +24,21 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
d := c.Param("dataset")

if d == "" {
return fmt.Errorf("dataset must be specified")
return fmt.Errorf("dataset id must be specified")
}
did, err := strconv.ParseUint(d, 10, 64)
if err != nil {
return fmt.Errorf("dataset id must be numeric %s", err)
}

dldm.DB.Where("name = ?", d).First(&dataset)
if tx := dldm.DB.First(&dataset, did); tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return fmt.Errorf("dataset not found")
}
return fmt.Errorf("failed to get dataset: %s", tx.Error)
}

err := dldm.DB.Model(&dataset).Association("Contents").Find(&content)
err = dldm.DB.Model(&dataset).Association("Contents").Find(&content)
if err != nil {
return err
}
Expand All @@ -46,14 +58,21 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
}

d := c.Param("dataset")

if d == "" {
return fmt.Errorf("dataset must be specified")
return fmt.Errorf("dataset id must be specified")
}
did, err := strconv.ParseUint(d, 10, 64)
if err != nil {
return fmt.Errorf("dataset id must be numeric %s", err)
}

// Check if dataset exists
res := dldm.DB.Where("name = ?", d).First(&dataset)
if res.Error != nil {
return fmt.Errorf("could not find dataset %s : %s", d, res.Error)
if tx := dldm.DB.First(&dataset, did); tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return fmt.Errorf("dataset not found")
}
return fmt.Errorf("failed to get dataset: %s", tx.Error)
}

it := c.QueryParam("import_type")
Expand Down Expand Up @@ -90,7 +109,7 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
continue
}

cnt.DatasetName = dataset.Name
cnt.DatasetID = dataset.ID

err := dldm.DB.Create(&cnt).Error
if err != nil {
Expand Down
13 changes: 5 additions & 8 deletions api/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/application-research/delta-dm/core"
"github.com/application-research/delta-dm/util"
"github.com/labstack/echo/v4"
"gorm.io/gorm"
)

func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -17,25 +16,23 @@ func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
datasets.GET("", func(c echo.Context) error {
var ds []core.Dataset

dldm.DB.Preload("Wallets").Preload("AllowedProviders", func(db *gorm.DB) *gorm.DB {
return db.Select("actor_id")
}).Find(&ds)
dldm.DB.Preload("Wallets").Preload("ReplicationProfiles").Find(&ds)

// Find # of bytes total and replicated for each dataset
for i, d := range ds {
var rb [2]uint64
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_name = ?", d.Name).Row().Scan(&rb[0], &rb[1])
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_id = ?", d.ID).Row().Scan(&rb[0], &rb[1])

var tb [2]uint64
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents where dataset_name = ?", d.Name).Row().Scan(&tb[0], &tb[1])
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents where dataset_id = ?", d.ID).Row().Scan(&tb[0], &tb[1])

ds[i].BytesReplicated = core.ByteSizes{Raw: rb[0], Padded: rb[1]}
ds[i].BytesTotal = core.ByteSizes{Raw: tb[0], Padded: tb[1]}

var countReplicated uint64 = 0
var countTotal uint64 = 0
dldm.DB.Raw("select count(*) cr FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_name = ?", d.Name).Row().Scan(&countReplicated)
dldm.DB.Raw("select count(*) cr FROM contents c where dataset_name = ?", d.Name).Row().Scan(&countTotal)
dldm.DB.Raw("select count(*) cr FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_id = ?", d.ID).Row().Scan(&countReplicated)
dldm.DB.Raw("select count(*) cr FROM contents c where dataset_id = ?", d.ID).Row().Scan(&countTotal)

ds[i].CountReplicated = countReplicated
ds[i].CountTotal = countTotal
Expand Down
23 changes: 3 additions & 20 deletions api/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
)

type ProviderPutBody struct {
ActorName string `json:"actor_name"`
AllowSelfService string `json:"allow_self_service"`
AllowedDatasets []string `json:"allowed_datasets"`
ActorName string `json:"actor_name"`
AllowSelfService string `json:"allow_self_service"`
}

func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -24,7 +23,7 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
providers.GET("", func(c echo.Context) error {
var p []core.Provider

dldm.DB.Preload("AllowedDatasets").Find(&p)
dldm.DB.Preload("ReplicationProfiles").Find(&p)

for i, sp := range p {
var rb [2]uint64
Expand Down Expand Up @@ -93,22 +92,6 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
existing.AllowSelfService = false
}

// If array of allowed datasets is empty, don't modify the association
if len(p.AllowedDatasets) > 0 {
var newAllowedDatasets []core.Dataset
for _, ds_name := range p.AllowedDatasets {
var ds core.Dataset
res := dldm.DB.Model(&core.Dataset{}).Where("name = ?", ds_name).First(&ds)
if res.Error != nil {
return fmt.Errorf("error fetching dataset %s : %s", ds_name, res.Error)
} else {
newAllowedDatasets = append(newAllowedDatasets, ds)
}
}

dldm.DB.Model(&existing).Association("AllowedDatasets").Replace(newAllowedDatasets)
}

res = dldm.DB.Save(&existing)
if res.Error != nil {
return fmt.Errorf("error saving provider %s", res.Error)
Expand Down
122 changes: 122 additions & 0 deletions api/replication-profiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package api

import (
"fmt"
"net/http"

"github.com/application-research/delta-dm/core"
"github.com/labstack/echo/v4"
)

// ReplicationProfile is a JSON payload shape carrying a dataset name together
// with the unsealed and indexed flags.
//
// NOTE(review): the handlers in ConfigureReplicationProfilesRouter bind
// core.ReplicationProfile rather than this type, and identify datasets by ID;
// confirm whether this name-based struct is still needed.
type ReplicationProfile struct {
// DatasetName is (de)serialized under the "dataset_name" key.
DatasetName string `json:"dataset_name"`
// Unsealed is (de)serialized under the "unsealed" key.
Unsealed bool `json:"unsealed"`
// Indexed is (de)serialized under the "indexed" key.
Indexed bool `json:"indexed"`
}

// ConfigureReplicationProfilesRouter registers the /replication-profiles
// routes on e. Every route in the group runs behind dldm.AS.AuthMiddleware.
//
// Routes (all on the group root):
//   - GET:    returns every stored replication profile.
//   - POST:   creates a profile after checking that the referenced dataset
//     and provider both exist.
//   - DELETE: deletes the profile identified by the provider actor ID and
//     dataset ID in the request body.
//   - PUT:    updates the unsealed and indexed flags of an existing profile.
//
// Handlers report failures by returning a wrapped error to Echo; underlying
// database errors stay reachable through errors.Is / errors.As.
func ConfigureReplicationProfilesRouter(e *echo.Group, dldm *core.DeltaDM) {
	// Exact message matched to recognise a duplicate (provider, dataset) pair.
	// NOTE(review): this looks like the SQLite driver's wording; other
	// database backends would fall through to the generic save error.
	const uniqueViolation = "UNIQUE constraint failed: replication_profiles.provider_actor_id, replication_profiles.dataset_id"

	replicationProfiles := e.Group("/replication-profiles")

	replicationProfiles.Use(dldm.AS.AuthMiddleware)

	replicationProfiles.GET("", func(c echo.Context) error {
		var p []core.ReplicationProfile

		if err := dldm.DB.Model(&core.ReplicationProfile{}).Find(&p).Error; err != nil {
			return fmt.Errorf("error finding replication profiles: %w", err)
		}

		return c.JSON(http.StatusOK, p)
	})

	replicationProfiles.POST("", func(c echo.Context) error {
		var p core.ReplicationProfile

		if err := c.Bind(&p); err != nil {
			return fmt.Errorf("failed to parse request body: %w", err)
		}

		// Validate the dataset first so an invalid dataset does not cost a
		// second (provider) query.
		var ds core.Dataset
		if err := dldm.DB.Where("id = ?", p.DatasetID).First(&ds).Error; err != nil {
			return fmt.Errorf("invalid dataset: %w", err)
		}
		if ds.ID == 0 {
			// Kept as a separate guard so a nil error is never formatted.
			return fmt.Errorf("invalid dataset: no dataset with id %d", p.DatasetID)
		}

		var provider core.Provider
		if err := dldm.DB.Where("actor_id = ?", p.ProviderActorID).First(&provider).Error; err != nil {
			return fmt.Errorf("invalid provider: %w", err)
		}
		if provider.ActorID == "" {
			return fmt.Errorf("invalid provider: no provider with actor id %s", p.ProviderActorID)
		}

		// Save the replication profile.
		if err := dldm.DB.Create(&p).Error; err != nil {
			if err.Error() == uniqueViolation {
				return fmt.Errorf("replication profile for provider %s and datasetID %d already exists", p.ProviderActorID, p.DatasetID)
			}
			return fmt.Errorf("failed to save replication profile: %w", err)
		}

		return c.JSON(http.StatusOK, p)
	})

	replicationProfiles.DELETE("", func(c echo.Context) error {
		var p core.ReplicationProfile
		if err := c.Bind(&p); err != nil {
			return fmt.Errorf("failed to parse request body: %w", err)
		}

		// Both halves of the (provider, dataset) key are required.
		if p.DatasetID == 0 || p.ProviderActorID == "" {
			return fmt.Errorf("invalid replication profile ID")
		}

		var existingProfile core.ReplicationProfile
		err := dldm.DB.
			Where("provider_actor_id = ? AND dataset_id = ?", p.ProviderActorID, p.DatasetID).
			First(&existingProfile).Error
		if err != nil {
			return fmt.Errorf("replication profile not found: %w", err)
		}

		if err := dldm.DB.Delete(&existingProfile).Error; err != nil {
			return fmt.Errorf("failed to delete replication profile: %w", err)
		}

		return c.JSON(http.StatusOK, fmt.Sprintf("replication profile with ProviderActorID %s and DatasetID %d deleted successfully", p.ProviderActorID, p.DatasetID))
	})

	replicationProfiles.PUT("", func(c echo.Context) error {
		var updatedProfile core.ReplicationProfile
		if err := c.Bind(&updatedProfile); err != nil {
			return fmt.Errorf("failed to parse request body: %w", err)
		}

		// Both halves of the (provider, dataset) key are required.
		if updatedProfile.DatasetID == 0 || updatedProfile.ProviderActorID == "" {
			return fmt.Errorf("invalid replication profile ID")
		}

		var existingProfile core.ReplicationProfile
		err := dldm.DB.
			Where("provider_actor_id = ? AND dataset_id = ?", updatedProfile.ProviderActorID, updatedProfile.DatasetID).
			First(&existingProfile).Error
		if err != nil {
			return fmt.Errorf("replication profile not found: %w", err)
		}

		// A map (rather than the struct) is passed to Updates so that false
		// values for the two flags are written instead of being skipped as
		// zero values.
		updateData := map[string]interface{}{
			"unsealed": updatedProfile.Unsealed,
			"indexed":  updatedProfile.Indexed,
		}

		if err := dldm.DB.Model(&existingProfile).Updates(updateData).Error; err != nil {
			return fmt.Errorf("failed to update replication profile: %w", err)
		}

		// The response echoes the request body as bound, matching the
		// existing API contract.
		return c.JSON(http.StatusOK, updatedProfile)
	})
}
Loading

0 comments on commit eac0132

Please sign in to comment.