Skip to content

Commit

Permalink
stream backups directly to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
LobbyLobster committed Jun 25, 2024
1 parent b60f3f8 commit 6ff7e6b
Show file tree
Hide file tree
Showing 17 changed files with 635 additions and 700 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
.idea/
monodb-backup
bin/
295 changes: 120 additions & 175 deletions backup/backup.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,68 @@
package backup

import (
"io"
"monodb-backup/notify"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)

func getDBList() (dbList []string) {
if params.Database == "" || params.Database == "postgresql" {
switch params.Database {
case "postgresql":
dbList = getPSQLList()
} else if params.Database == "mysql" {
case "mysql":
dbList = getMySQLList()
case "mssql":
dbList = getMSSQLList()
case "oracle":
return getOracleList()
default:
dbList = getPSQLList()
}
return
}

func dumpDB(db string, dst string) (dumpPath string, name string, err error) {
if params.Database == "" || params.Database == "postgresql" {
dumpPath, name, err = dumpPSQLDb(db, dst)
} else if params.Database == "mysql" {
dumpPath, name, err = dumpMySQLDb(db, dst)
func dumpAndUpload(db string, pipeWriters []*io.PipeWriter) error {
switch params.Database {
case "postgresql":
return dumpAndUploadPSQL(db, pipeWriters)
case "mysql":
return dumpAndUploadMySQL(db, pipeWriters)
default:
return dumpAndUploadPSQL(db, pipeWriters)
}
}

func dumpDB(db string, dst string) (string, string, error) {
switch params.Database {
case "postgresql":
return dumpPSQLDb(db, dst)
case "mysql":
return dumpMySQLDb(db, dst)
case "mssql":
return dumpMSSQLDB(db, dst)
//case "oracle":
// return dumpOracleDB(db, dst)
default:
return dumpPSQLDb(db, dst)
}
return
}

func Backup() {
logger.Info("monodb-backup job started.")
notify.SendAlarm("Database backup started.", false)
streamable := (params.Database == "" || params.Database == "postgresql" || (params.Database == "mysql" && !params.BackupAsTables)) && params.ArchivePass == ""

dateNow = rightNow{
day: time.Now().Format("Mon"),
hour: time.Now().Format("Mon-15"),
minute: time.Now().Format("Mon-15_04"),
now: time.Now().Format("2006-01-02-150405"),
}

if len(params.Databases) == 0 {
logger.Info("Getting database list...")
databases := getDBList()
Expand All @@ -57,190 +84,108 @@ func Backup() {
params.Databases = tmpDatabases
}

if params.Minio.S3FS.ShouldMount {
mountMinIO(params.Minio)
defer umountMinIO(params.Minio)

dst := params.Minio.S3FS.MountPath
if params.Minio.Path != "" {
dst = dst + "/" + params.Minio.Path
}
err := os.MkdirAll(dst+"/"+minioPath(), os.FileMode(0750))
if err != nil {
notify.SendAlarm("Couldn't create folder in MinIO at path: "+dst+" - Error: "+err.Error(), true)
logger.Fatal("Couldn't create folder in MinIO at path: " + dst + " - Error: " + err.Error())
return
}
if params.Rotation.Enabled {
dst = dst + "/" + minioPath()
}
if streamable {
for _, db := range params.Databases {
if !params.BackupAsTables {
filePath, fileName, err := dumpDB(db, dst)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
err = os.Remove(filePath)
if err != nil {
logger.Error("Couldn't delete faulty dump file at " + filePath + " - Error: " + err.Error())
} else {
logger.Info("Faulty dump file at " + filePath + " successfully deleted.")
}
} else {
logger.Info("Successfully backed up database:" + db + " at " + filePath)
notify.SendAlarm("Successfully backed up "+db+" at "+filePath, false)

if params.Rotation.Enabled {
shouldRotate, name := rotate(db)
if shouldRotate {
var newDst string
if params.Minio.Path != "" {
newDst = params.Minio.Path
}
newDst = params.Minio.S3FS.MountPath + "/" + newDst
newDst = strings.TrimSuffix(newDst, "/")
err := os.MkdirAll(strings.TrimSuffix(newDst, "/")+"/"+rotatePath(), os.FileMode(0750))
if err != nil {
notify.SendAlarm("Couldn't create folder in MinIO at path: "+dst+" - Error: "+err.Error(), true)
logger.Fatal("Couldn't create folder in MinIO at path: " + dst + " - Error: " + err.Error())
return
}
extension := getFileExtension(fileName)
name = newDst + "/" + name + extension
_, err = copyFile(filePath, name)
if err != nil {
logger.Error("Couldn't create a copy of " + filePath + " for rotation\npath: " + name + "\n Error: " + err.Error())
notify.SendAlarm("Couldn't create a copy of "+filePath+" for rotation\npath: "+name+"\n Error: "+err.Error(), true)
} else {
logger.Info("Successfully created a copy of " + filePath + " for rotation\npath: " + name)
notify.SendAlarm("Successfully created a copy of "+filePath+" for rotation\npath: "+name, false)
}
}
}
}
uploadWhileDumping(db)
}
return
}

for _, db := range params.Databases {
dst := strings.TrimSuffix(params.BackupDestination, "/") + "/" + db
if params.BackupAsTables && db != "mysql" {
dumpPaths, names, err := dumpDBWithTables(db, dst)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
} else {
dumpPaths, names, err := dumpDBWithTables(db, dst)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
for _, filePath := range dumpPaths {
err = os.Remove(filePath)
if err != nil {
logger.Error("Couldn't delete dump file at " + filePath + " - Error: " + err.Error())
} else {
logger.Info("Dump file at " + filePath + " successfully deleted.")
}
}
} else {
for i, filePath := range dumpPaths {
fileName := names[i]
logger.Info("Successfully backed up database:" + db + " at " + filePath)
notify.SendAlarm("Successfully backed up "+db+" at "+filePath, false)

if params.Rotation.Enabled {
name := filepath.Base(fileName)
extension := getFileExtension(fileName)
shouldRotate, name := rotate(strings.TrimSuffix(name, extension))
newPath := filepath.Dir(name)
baseName := filepath.Base(name)
name = newPath + "/" + db + "/" + baseName
if shouldRotate {
var newDst string
if params.Minio.Path != "" {
newDst = params.Minio.Path
}
newDst = params.Minio.S3FS.MountPath + "/" + newDst
newDst = strings.TrimSuffix(newDst, "/")
err := os.MkdirAll(strings.TrimSuffix(newDst, "/")+"/"+rotatePath()+"/"+db, os.FileMode(0750))
if err != nil {
notify.SendAlarm("Couldn't create folder in MinIO at path: "+dst+" - Error: "+err.Error(), true)
logger.Fatal("Couldn't create folder in MinIO at path: " + dst + " - Error: " + err.Error())
return
}
name = newDst + "/" + name + extension
_, err = copyFile(filePath, name)
if err != nil {
logger.Error("Couldn't create a copy of " + filePath + " for rotation\npath: " + name + "\n Error: " + err.Error())
notify.SendAlarm("Couldn't create a copy of "+filePath+" for rotation\npath: "+name+"\n Error: "+err.Error(), true)
} else {
logger.Info("Successfully created a copy of " + filePath + " for rotation\npath: " + name)
notify.SendAlarm("Successfully created a copy of "+filePath+" for rotation\npath: "+name, false)
}
}
}
}
logger.Info("Successfully backed up database:" + db + " with its tables separately, at " + params.BackupDestination + "/" + db)
notify.SendAlarm("Successfully backed up "+db+" at "+params.BackupDestination+"/"+db, false)
for i, filePath := range dumpPaths {
name := names[i]
upload(name, db, filePath)
}
}
}
} else {
for _, db := range params.Databases {
if params.BackupAsTables && db != "mysql" {
dumpPaths, names, err := dumpDBWithTables(db, params.BackupDestination)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
} else {
logger.Info("Successfully backed up database:" + db + " with its tables separately, at " + params.BackupDestination + "/" + db)
notify.SendAlarm("Successfully backed up "+db+" at "+params.BackupDestination+"/"+db, false)
for i, filePath := range dumpPaths {
name := names[i]
upload(name, db, filePath)
}
}
if params.RemoveLocal {
err = os.RemoveAll(params.BackupDestination + "/" + db)
if err != nil {
logger.Error("Couldn't delete dump file at " + params.BackupDestination + "/" + db + " - Error: " + err.Error())
} else {
logger.Info("Dump file at " + params.BackupDestination + "/" + db + " successfully deleted.")
}
}

} else {
filePath, name, err := dumpDB(db, dst)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
} else {
filePath, name, err := dumpDB(db, params.BackupDestination)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
} else {
logger.Info("Successfully backed up database:" + db + " at " + filePath)
notify.SendAlarm("Successfully backed up "+db+" at "+filePath, false)
logger.Info("Successfully backed up database:" + db + " at " + filePath)
notify.SendAlarm("Successfully backed up "+db+" at "+filePath, false)

upload(name, db, filePath)
}
if params.RemoveLocal {
err = os.Remove(filePath)
if err != nil {
logger.Error("Couldn't delete dump file at " + filePath + " - Error: " + err.Error())
} else {
logger.Info("Dump file at " + filePath + " successfully deleted.")
}
}
upload(name, db, filePath)
}
}
if params.RemoveLocal {
err := os.RemoveAll(dst)
if err != nil {
logger.Error("Couldn't delete dump file at " + params.BackupDestination + "/" + db + " - Error: " + err.Error())
} else {
logger.Info("Dump file at " + params.BackupDestination + "/" + db + " successfully deleted.")
}
}
}
logger.Info("monodb-backup job finished.")
notify.SendAlarm("monodb-backup job finished.", false)
}

func upload(name, db, filePath string) {
if params.S3.Enabled {
target := nameWithPath(name)
if params.S3.Path != "" {
target = params.S3.Path + "/" + target
}
uploadFileToS3(filePath, target, db)
func uploadWhileDumping(db string) {
var name string
if db == "mysql" {
name = nameWithPath(dumpName(db+"_users", params.Rotation, ""))
} else {
name = nameWithPath(dumpName(db, params.Rotation, ""))
}
switch params.Database {
case "postgresql":
name = name + ".dump"
case "mysql":
name = name + ".sql.gz"
default:
name = name + ".dump"
}
var pipeWriters []*io.PipeWriter
var uploadDone []chan error
for i, instance := range uploaders {
dumpPath := instance.instance.Path + "/" + name
pipeReader, pipeWriter := io.Pipe()
pipeWriters = append(pipeWriters, pipeWriter)
uploadDone = append(uploadDone, make(chan error))
go func(i int, instance uploaderStruct) {
err := uploadFileToS3("", dumpPath, db, pipeReader, &instance)
uploadDone[i] <- err
close(uploadDone[i])
}(i, instance)
}

if params.Minio.Enabled {
target := nameWithPath(name)
if params.Minio.Path != "" {
target = params.Minio.Path + "/" + target
err := dumpAndUpload(db, pipeWriters)
if err != nil {
notify.SendAlarm("Problem during backing up "+db+" - Error: "+err.Error(), true)
}
for _, writer := range pipeWriters {
writer.Close()
}
for i, channel := range uploadDone {
uploadErr := <-channel
if uploadErr != nil {
logger.Error(strconv.Itoa(i+1) + ") " + db + " - " + "Couldn't upload to S3: " + uploaders[i].instance.Endpoint + " - Error: " + err.Error())
notify.SendAlarm(strconv.Itoa(i+1)+") "+db+" - "+"Couldn't upload to S3: "+uploaders[i].instance.Endpoint+" - Error: "+err.Error(), true)
} else {
logger.Info(strconv.Itoa(i+1) + ") " + db + " - " + "Successfully uploaded to S3: " + uploaders[i].instance.Endpoint)
notify.SendAlarm(strconv.Itoa(i+1)+") "+db+" - "+"Successfully uploaded to S3: "+uploaders[i].instance.Endpoint, false)
}
uploadFileToMinio(filePath, target, db)
}
}

if params.SFTP.Enabled {
for _, target := range params.SFTP.Targets {
func upload(name, db, filePath string) {
switch params.BackupType.Type {
case "s3", "minio":
uploadToS3(filePath, name, db)
case "sftp":
for _, target := range params.BackupType.Info[0].Targets {
SendSFTP(filePath, name, db, target)
}
}
if params.Rsync.Enabled {
for _, target := range params.Rsync.Targets {
case "rsync":
for _, target := range params.BackupType.Info[0].Targets {
SendRsync(filePath, name, target)
}
}
Expand Down
Loading

0 comments on commit 6ff7e6b

Please sign in to comment.