Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: dataset - use id on all endpoints and cli #65

Merged
merged 9 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions api/contents.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package api

import (
"errors"
"fmt"
"io/ioutil"
"strconv"

"github.com/application-research/delta-dm/core"
"github.com/jszwec/csvutil"
"github.com/labstack/echo/v4"
"gorm.io/gorm"
)

func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -21,12 +24,21 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
d := c.Param("dataset")

if d == "" {
return fmt.Errorf("dataset must be specified")
return fmt.Errorf("dataset id must be specified")
}
did, err := strconv.ParseUint(d, 10, 64)
if err != nil {
return fmt.Errorf("dataset id must be numeric %s", err)
}

dldm.DB.Where("name = ?", d).First(&dataset)
if tx := dldm.DB.First(&dataset, did); tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return fmt.Errorf("dataset not found")
}
return fmt.Errorf("failed to get dataset: %s", tx.Error)
}

err := dldm.DB.Model(&dataset).Association("Contents").Find(&content)
err = dldm.DB.Model(&dataset).Association("Contents").Find(&content)
if err != nil {
return err
}
Expand All @@ -46,14 +58,21 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
}

d := c.Param("dataset")

if d == "" {
return fmt.Errorf("dataset must be specified")
return fmt.Errorf("dataset id must be specified")
}
did, err := strconv.ParseUint(d, 10, 64)
if err != nil {
return fmt.Errorf("dataset id must be numeric %s", err)
}

// Check if dataset exists
res := dldm.DB.Where("name = ?", d).First(&dataset)
if res.Error != nil {
return fmt.Errorf("could not find dataset %s : %s", d, res.Error)
if tx := dldm.DB.First(&dataset, did); tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return fmt.Errorf("dataset not found")
}
return fmt.Errorf("failed to get dataset: %s", tx.Error)
}

it := c.QueryParam("import_type")
Expand Down Expand Up @@ -90,7 +109,7 @@ func ConfigureContentsRouter(e *echo.Group, dldm *core.DeltaDM) {
continue
}

cnt.DatasetName = dataset.Name
cnt.DatasetID = dataset.ID

err := dldm.DB.Create(&cnt).Error
if err != nil {
Expand Down
13 changes: 5 additions & 8 deletions api/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/application-research/delta-dm/core"
"github.com/application-research/delta-dm/util"
"github.com/labstack/echo/v4"
"gorm.io/gorm"
)

func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -17,25 +16,23 @@ func ConfigureDatasetsRouter(e *echo.Group, dldm *core.DeltaDM) {
datasets.GET("", func(c echo.Context) error {
var ds []core.Dataset

dldm.DB.Preload("Wallets").Preload("AllowedProviders", func(db *gorm.DB) *gorm.DB {
return db.Select("actor_id")
}).Find(&ds)
dldm.DB.Preload("Wallets").Preload("ReplicationProfiles").Find(&ds)

// Find # of bytes total and replicated for each dataset
for i, d := range ds {
var rb [2]uint64
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_name = ?", d.Name).Row().Scan(&rb[0], &rb[1])
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_id = ?", d.ID).Row().Scan(&rb[0], &rb[1])

var tb [2]uint64
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents where dataset_name = ?", d.Name).Row().Scan(&tb[0], &tb[1])
dldm.DB.Raw("select SUM(size) s, SUM(padded_size) ps FROM contents where dataset_id = ?", d.ID).Row().Scan(&tb[0], &tb[1])

ds[i].BytesReplicated = core.ByteSizes{Raw: rb[0], Padded: rb[1]}
ds[i].BytesTotal = core.ByteSizes{Raw: tb[0], Padded: tb[1]}

var countReplicated uint64 = 0
var countTotal uint64 = 0
dldm.DB.Raw("select count(*) cr FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_name = ?", d.Name).Row().Scan(&countReplicated)
dldm.DB.Raw("select count(*) cr FROM contents c where dataset_name = ?", d.Name).Row().Scan(&countTotal)
dldm.DB.Raw("select count(*) cr FROM contents c inner join replications r on r.content_comm_p = c.comm_p where r.status = 'SUCCESS' AND dataset_id = ?", d.ID).Row().Scan(&countReplicated)
dldm.DB.Raw("select count(*) cr FROM contents c where dataset_id = ?", d.ID).Row().Scan(&countTotal)

ds[i].CountReplicated = countReplicated
ds[i].CountTotal = countTotal
Expand Down
23 changes: 3 additions & 20 deletions api/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
)

type ProviderPutBody struct {
ActorName string `json:"actor_name"`
AllowSelfService string `json:"allow_self_service"`
AllowedDatasets []string `json:"allowed_datasets"`
ActorName string `json:"actor_name"`
AllowSelfService string `json:"allow_self_service"`
}

func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
Expand All @@ -24,7 +23,7 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
providers.GET("", func(c echo.Context) error {
var p []core.Provider

dldm.DB.Preload("AllowedDatasets").Find(&p)
dldm.DB.Preload("ReplicationProfiles").Find(&p)

for i, sp := range p {
var rb [2]uint64
Expand Down Expand Up @@ -93,22 +92,6 @@ func ConfigureProvidersRouter(e *echo.Group, dldm *core.DeltaDM) {
existing.AllowSelfService = false
}

// If array of allowed datasets is empty, don't modify the association
if len(p.AllowedDatasets) > 0 {
var newAllowedDatasets []core.Dataset
for _, ds_name := range p.AllowedDatasets {
var ds core.Dataset
res := dldm.DB.Model(&core.Dataset{}).Where("name = ?", ds_name).First(&ds)
if res.Error != nil {
return fmt.Errorf("error fetching dataset %s : %s", ds_name, res.Error)
} else {
newAllowedDatasets = append(newAllowedDatasets, ds)
}
}

dldm.DB.Model(&existing).Association("AllowedDatasets").Replace(newAllowedDatasets)
}

res = dldm.DB.Save(&existing)
if res.Error != nil {
return fmt.Errorf("error saving provider %s", res.Error)
Expand Down
122 changes: 122 additions & 0 deletions api/replication-profiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package api

import (
"fmt"
"net/http"

"github.com/application-research/delta-dm/core"
"github.com/labstack/echo/v4"
)

type ReplicationProfile struct {
DatasetName string `json:"dataset_name"`
Unsealed bool `json:"unsealed"`
Indexed bool `json:"indexed"`
}

func ConfigureReplicationProfilesRouter(e *echo.Group, dldm *core.DeltaDM) {
replicationProfiles := e.Group("/replication-profiles")

replicationProfiles.Use(dldm.AS.AuthMiddleware)

replicationProfiles.GET("", func(c echo.Context) error {
var p []core.ReplicationProfile

res := dldm.DB.Model(&core.ReplicationProfile{}).Find(&p)
if res.Error != nil {
return fmt.Errorf("error finding replication profiles: %s", res.Error)
}

return c.JSON(http.StatusOK, p)
})

replicationProfiles.POST("", func(c echo.Context) error {
var p core.ReplicationProfile

if err := c.Bind(&p); err != nil {
return fmt.Errorf("failed to parse request body: %s", err.Error())
}

// Check if the dataset and provider exist
var ds core.Dataset
var provider core.Provider

dsRes := dldm.DB.Where("id = ?", p.DatasetID).First(&ds)
providerRes := dldm.DB.Where("actor_id = ?", p.ProviderActorID).First(&provider)

if dsRes.Error != nil || ds.ID == 0 {
return fmt.Errorf("invalid dataset: %s", dsRes.Error)
}

if providerRes.Error != nil || provider.ActorID == "" {
return fmt.Errorf("invalid provider: %s", providerRes.Error)
}

// Save the replication profile
res := dldm.DB.Create(&p)
if res.Error != nil {
if res.Error.Error() == "UNIQUE constraint failed: replication_profiles.provider_actor_id, replication_profiles.dataset_id" {
return fmt.Errorf("replication profile for provider %s and datasetID %d already exists", p.ProviderActorID, p.DatasetID)
}
return fmt.Errorf("failed to save replication profile: %s", res.Error.Error())
}

return c.JSON(http.StatusOK, p)
})

replicationProfiles.DELETE("", func(c echo.Context) error {
var p core.ReplicationProfile
if err := c.Bind(&p); err != nil {
return fmt.Errorf("failed to parse request body: %s", err.Error())
}

if p.DatasetID == 0 || p.ProviderActorID == "" {
return fmt.Errorf("invalid replication profile ID")
}

var existingProfile core.ReplicationProfile
res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", p.ProviderActorID, p.DatasetID).First(&existingProfile)

if res.Error != nil {
return fmt.Errorf("replication profile not found: %s", res.Error)
}

deleteRes := dldm.DB.Delete(&existingProfile)
if deleteRes.Error != nil {
return fmt.Errorf("failed to delete replication profile: %s", deleteRes.Error.Error())
}

return c.JSON(http.StatusOK, fmt.Sprintf("replication profile with ProviderActorID %s and DatasetID %d deleted successfully", p.ProviderActorID, p.DatasetID))
})

replicationProfiles.PUT("", func(c echo.Context) error {
var updatedProfile core.ReplicationProfile
if err := c.Bind(&updatedProfile); err != nil {
return fmt.Errorf("failed to parse request body: %s", err.Error())
}

if updatedProfile.DatasetID == 0 || updatedProfile.ProviderActorID == "" {
return fmt.Errorf("invalid replication profile ID")
}

var existingProfile core.ReplicationProfile
res := dldm.DB.Where("provider_actor_id = ? AND dataset_id = ?", updatedProfile.ProviderActorID, updatedProfile.DatasetID).First(&existingProfile)

if res.Error != nil {
return fmt.Errorf("replication profile not found: %s", res.Error)
}

updateData := map[string]interface{}{
"unsealed": updatedProfile.Unsealed,
"indexed": updatedProfile.Indexed,
}

updateRes := dldm.DB.Model(&existingProfile).Updates(updateData)
if updateRes.Error != nil {
return fmt.Errorf("failed to update replication profile: %s", updateRes.Error.Error())
}

return c.JSON(http.StatusOK, updatedProfile)
})

}
Loading