Skip to content

Commit

Permalink
Merge pull request #10 from MinaFoundation/local-filesystem-storage
Browse files Browse the repository at this point in the history
Local filesystem storage
  • Loading branch information
piotr-iohk authored Oct 31, 2023
2 parents 5795571 + 3724934 commit f7a5575
Show file tree
Hide file tree
Showing 14 changed files with 319 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ result/
# Dependency directories (remove the comment below to include it)
# vendor/

app_config.json
app_config*.json
aws_creds.json
minasheets.json
2 changes: 1 addition & 1 deletion scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ case "$1" in
;;
integration-test)
cd src/integration_tests
$GO test -v
$GO test -v --timeout 30m
;;
docker)
if [[ "$TAG" == "" ]]; then
Expand Down
4 changes: 3 additions & 1 deletion src/cmd/delegation_backend/main_bpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func main() {
}
} else if appCfg.LocalFileSystem != nil {
log.Infof("storage backend: Local File System")
// future implementation of local file system storage
app.Save = func(objs ObjectsToSave) {
LocalFileSystemSave(objs, appCfg.LocalFileSystem.Path, log)
}
} else if appCfg.Database != nil {
log.Infof("storage backend: Database")
// future implementation of database storage
Expand Down
25 changes: 25 additions & 0 deletions src/delegation_backend/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"

Expand Down Expand Up @@ -54,6 +56,29 @@ func (ctx *AwsContext) S3Save(objs ObjectsToSave) {
}
}

func LocalFileSystemSave(objs ObjectsToSave, directory string, log logging.StandardLogger) {
for path, bs := range objs {
fullPath := filepath.Join(directory, path)

// Check if file exists
if _, err := os.Stat(fullPath); !os.IsNotExist(err) {
log.Warnf("file already exists: %s", fullPath)
continue // skip to the next object
}

err := os.MkdirAll(filepath.Dir(fullPath), os.ModePerm)
if err != nil {
log.Warnf("Error creating directories for %s: %v", fullPath, err)
continue // skip to the next object
}

err = os.WriteFile(fullPath, bs, 0644)
if err != nil {
log.Warnf("Error writing to file %s: %v", fullPath, err)
}
}
}

type ObjectsToSave map[string][]byte

type AwsContext struct {
Expand Down
22 changes: 13 additions & 9 deletions src/integration_tests/aws_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func getS3Service(config delegation_backend.AppConfig) *s3.S3 {
return s3.New(sess)
}

func emptyIntegrationTestFolder(config delegation_backend.AppConfig) error {
func emptyS3IntegrationTestFolder(config delegation_backend.AppConfig) error {
log.Printf("Emptying AWS S3 integration_test folder")

bucketName := getAWSBucketName(config)
Expand Down Expand Up @@ -123,19 +123,23 @@ func waitUntilS3BucketHasBlocksAndSubmissions(config delegation_backend.AppConfi
hasBlocks = false
hasSubmissionsForToday = false

// Check the objects
for _, object := range objects.Contents {
key := *object.Key
if strings.HasPrefix(key, folderPrefix+"blocks/") && strings.HasSuffix(key, ".dat") {
hasBlocks = true
}
if strings.HasPrefix(key, folderPrefix+"submissions/"+currentDate+"/") && strings.HasSuffix(key, ".json") {
hasSubmissionsForToday = true
if len(objects.Contents) > 1 {
log.Printf("Found objects in the S3 bucket. Checking for blocks and submissions...")
// Check the objects
for _, object := range objects.Contents {
key := *object.Key
if strings.HasPrefix(key, folderPrefix+"blocks/") && strings.HasSuffix(key, ".dat") {
hasBlocks = true
}
if strings.HasPrefix(key, folderPrefix+"submissions/"+currentDate+"/") && strings.HasSuffix(key, ".json") {
hasSubmissionsForToday = true
}
}
}

// If both blocks and submissions for today are found, return
if hasBlocks && hasSubmissionsForToday {
log.Printf("Found blocks and submissions for today")
return nil
}
}
Expand Down
81 changes: 61 additions & 20 deletions src/integration_tests/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package integration_tests

import (
"fmt"
"log"
"os"
"os/exec"
"strings"
)

const (
Expand All @@ -12,17 +14,48 @@ const (
GENESIS_FILE = TEST_DATA_FOLDER + "/topology/genesis_ledger.json"
TOPOLOGY_FILE = TEST_DATA_FOLDER + "/topology/topology.json"

APP_CONFIG_FILE = TEST_DATA_FOLDER + "/topology/uptime_service_config/app_config.json"
AWS_CREDS_FILE = TEST_DATA_FOLDER + "/topology/uptime_service_config/aws_creds.json"
MINASHEETS_FILE = TEST_DATA_FOLDER + "/topology/uptime_service_config/minasheets.json"
UPTIME_SERVICE_CONFIG_DIR = TEST_DATA_FOLDER + "/topology/uptime_service_config"
APP_CONFIG_FILE = UPTIME_SERVICE_CONFIG_DIR + "/app_config.json"

TIMEOUT_IN_S = 600
)

var uptime_service_config_files = []string{
APP_CONFIG_FILE,
AWS_CREDS_FILE,
MINASHEETS_FILE,
func getDirFiles(dir string, suffix string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}

var filteredFiles []string
for _, f := range files {
if strings.HasSuffix(f.Name(), suffix) {
absolutePath := dir + "/" + f.Name()
filteredFiles = append(filteredFiles, absolutePath)
}
}

return filteredFiles, nil
}

func getGpgFiles(dir string) ([]string, error) {
return getDirFiles(dir, ".gpg")
}

func getJsonFiles(dir string) ([]string, error) {
return getDirFiles(dir, ".json")
}

// copy app_config_*.json to app_config.json
func setAppConfig(config_type string) error {
conf_file := "/app_config_" + config_type + ".json"
log.Printf("Setting %s as %s...\n", conf_file, APP_CONFIG_FILE)

err := copyFile(UPTIME_SERVICE_CONFIG_DIR+conf_file, APP_CONFIG_FILE)
if err != nil {
return fmt.Errorf("Error copying %s: %s\n", APP_CONFIG_FILE, err)
}

return nil
}

func encodeUptimeServiceConf() error {
Expand All @@ -31,26 +64,30 @@ func encodeUptimeServiceConf() error {
return fmt.Errorf("Error: UPTIME_SERVICE_SECRET environment variable not set")
}

for _, file := range uptime_service_config_files {
fmt.Printf(">> Encoding %s...\n", file)
jsonFiles, err := getJsonFiles(UPTIME_SERVICE_CONFIG_DIR)
if err != nil {
return err
}
for _, json_file := range jsonFiles {
log.Printf(">> Encoding %s...\n", json_file)

// Construct the gpg command
cmd := exec.Command(
"gpg",
"--pinentry-mode", "loopback",
"--passphrase", fixturesSecret,
"--symmetric",
"--output", fmt.Sprintf("%s.gpg", file),
file,
"--output", fmt.Sprintf("%s.gpg", json_file),
json_file,
)

// Execute and get output
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("Error encoding %s: %s\n", file, err)
return fmt.Errorf("Error encoding %s: %s\n", json_file, err)
}

fmt.Println(string(out))
log.Println(string(out))
}

return nil
Expand All @@ -62,24 +99,28 @@ func decodeUptimeServiceConf() error {
return fmt.Errorf("Error: UPTIME_SERVICE_SECRET environment variable not set")
}

for _, file := range uptime_service_config_files {
gpg_file := fmt.Sprintf("%s.gpg", file)
gpgFiles, err := getGpgFiles(UPTIME_SERVICE_CONFIG_DIR)
if err != nil {
return err
}

for _, gpg_file := range gpgFiles {
json_file := strings.TrimSuffix(gpg_file, ".gpg")
// skip if file exists
if _, err := os.Stat(file); err == nil {
fmt.Printf(">> Skipping decoding %s... JSON file already exists.\n", gpg_file)
if _, err := os.Stat(json_file); err == nil {
log.Printf(">> Skipping decoding %s... JSON file already exists.\n", gpg_file)
continue
}

fmt.Printf(">> Decoding %s...\n", gpg_file)
log.Printf(">> Decoding %s...\n", gpg_file)

// Construct the gpg command
cmd := exec.Command(
"gpg",
"--pinentry-mode", "loopback",
"--yes",
"--passphrase", fixturesSecret,
"--output", file,
"--output", json_file,
"--decrypt", gpg_file,
)

Expand All @@ -89,7 +130,7 @@ func decodeUptimeServiceConf() error {
return fmt.Errorf("Error decoding %s: %s\n", gpg_file, err)
}

fmt.Println(string(out))
log.Println(string(out))
}

return nil
Expand Down
118 changes: 118 additions & 0 deletions src/integration_tests/filesystem_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package integration_tests

import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"time"
)

func copyFile(src, dst string) error {
srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()

dstFile, err := os.Create(dst)
if err != nil {
return err
}
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
if err != nil {
return err
}

return dstFile.Sync()
}

func emptyLocalFilesystemStorage(folderPath string) error {
log.Printf("Emptying local filesystem folder: %s", folderPath)

if _, err := os.Stat(folderPath); os.IsNotExist(err) {
// Folder does not exist; nothing to do
return nil
}

items, err := os.ReadDir(folderPath)
if err != nil {
return err
}

for _, item := range items {
fullPath := folderPath + "/" + item.Name()
if item.IsDir() {
err = os.RemoveAll(fullPath)
} else {
err = os.Remove(fullPath)
}
if err != nil {
return err
}
}

return nil
}

func waitUntilLocalStorageHasBlocksAndSubmissions(directory string) error {
log.Printf("Waiting for blocks and submissions to appear in the local directory: %s", directory)

hasBlocks := false
hasSubmissionsForToday := false
currentDate := time.Now().Format("2006-01-02") // YYYY-MM-DD format

timeout := time.After(TIMEOUT_IN_S * time.Second)
tick := time.Tick(5 * time.Second)

for {
select {
case <-timeout:
return fmt.Errorf("Timeout reached while waiting for local storage contents")
case <-tick:
items, err := os.ReadDir(directory)
if err != nil {
return err
}
// Reset the checks
hasBlocks = false
hasSubmissionsForToday = false

if len(items) > 0 {
log.Print("Found files in the local storage directory. Checking for blocks and submissions...")

// Check for blocks
blocksPath := filepath.Join(directory, "blocks")
if items, err := os.ReadDir(blocksPath); err == nil {
for _, item := range items {
if strings.HasSuffix(item.Name(), ".dat") {
hasBlocks = true
break
}
}
}

// Check for submissions
submissionsPathForToday := filepath.Join(directory, "submissions", currentDate)
if items, err := os.ReadDir(submissionsPathForToday); err == nil {
for _, item := range items {
if strings.HasSuffix(item.Name(), ".json") && strings.Contains(item.Name(), currentDate) {
hasSubmissionsForToday = true
break
}
}
}
}

// If both blocks and submissions for today are found, return
if hasBlocks && hasSubmissionsForToday {
log.Printf("Found blocks and submissions for today")
return nil
}
}
}
}
Loading

0 comments on commit f7a5575

Please sign in to comment.