Skip to content

Commit

Permalink
Add replicate functionality for Postgres and minor fixes
Browse files Browse the repository at this point in the history
The main logic of `replicate` is achieved by using `fork`
to create two new processes, one for `pg_dump` and one for
`pg_restore`. The commit also contains utilization of `log.h`
library and some fixes for better error handling.

Signed-off-by: Panagiotis Foliadis <pfoliadis@hotmail.com>
  • Loading branch information
panosfol committed Dec 13, 2023
1 parent ca2251b commit 9b9c161
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 14 deletions.
2 changes: 1 addition & 1 deletion include/db/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ typedef struct init_db_func_ptr_s {
struct db_t {
postgres_t *pg_conf;

void *host_conn;
void *origin_conn;
void *target_conn;
};

Expand Down
1 change: 1 addition & 0 deletions include/db/postgres.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "db/db.h"

void construct_pg_password(char *, const char *);
int connect_pg(struct db_t *);
void close_pg(struct db_t *pg_db_t);
int replicate(struct db_t *pg_db_t, struct options *pg_options);
Expand Down
2 changes: 2 additions & 0 deletions src/db.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "db/db.h"
#include "db/postgres.h"

#include <stddef.h>
#include <stdlib.h>

Expand Down Expand Up @@ -28,6 +29,7 @@ int execute_db_operations(void)
ret = available_dbs[i]->connect(available_db);
if (ret != 0) {
available_dbs[i]->close(available_db);
return ret;
}
available_dbs[i]->replicate(available_db, NULL);
available_dbs[i]->close(available_db);
Expand Down
232 changes: 219 additions & 13 deletions src/postgres.c
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/wait.h>

#include "libpq-fe.h"

#include "config.h"
#include "db/db.h"
#include "db/postgres.h"
#include "log.h"

extern config_t *yaml_config;

Expand All @@ -33,36 +37,238 @@ int construct_pg()
return 0;
}

/*
* connect_pg
*
* 0 Success
* -1 Failure to connect
*/

int connect_pg(struct db_t *pg_db_t)
{
int ret = 0;
// Initialize the fields that are to be used to connect to postgres
/*
* We should NULL the connection to avoid using `free` on an
* uninitialized connection.
*/
pg_db_t->origin_conn = NULL;
pg_db_t->target_conn = NULL;

// Initialize the fields that are to be used to connect to postgres.
const char *keywords[] = { "host", "user", "password",
"port", "dbname", 0 };
// Construct the array that will hold the values of the fields
const char *values[] = { pg_db_t->pg_conf->origin_host, pg_db_t->pg_conf->origin_user,
pg_db_t->pg_conf->origin_password,
pg_db_t->pg_conf->origin_port,
pg_db_t->pg_conf->origin_database };

pg_db_t->host_conn = PQconnectdbParams(keywords, values, 0);
if (PQstatus(pg_db_t->host_conn) != CONNECTION_OK) {
fprintf(stderr, "%s", PQerrorMessage(pg_db_t->host_conn));
// Construct the array that will hold the values of the fields.
const char *origin_values[] = { pg_db_t->pg_conf->origin_host,
pg_db_t->pg_conf->origin_user,
pg_db_t->pg_conf->origin_password,
pg_db_t->pg_conf->origin_port,
pg_db_t->pg_conf->origin_database };
const char *target_values[] = { pg_db_t->pg_conf->target_host,
pg_db_t->pg_conf->target_user,
pg_db_t->pg_conf->target_password,
pg_db_t->pg_conf->target_port,
pg_db_t->pg_conf->target_database };

// Connect to origin-database.
pg_db_t->origin_conn = PQconnectdbParams(keywords, origin_values, 0);
if (PQstatus(pg_db_t->origin_conn) != CONNECTION_OK) {
pr_info("Postgres origin-database connection: Failed!\n");
fprintf(stderr, "%s", PQerrorMessage(pg_db_t->origin_conn));
ret = -1;
return ret;
}
pr_info("Origin-database connection: Success!\n");

// Connect to target-database.
pg_db_t->target_conn = PQconnectdbParams(keywords, target_values, 0);
if (PQstatus(pg_db_t->target_conn) != CONNECTION_OK) {
pr_info("Target-database connection: Failed!\n");
fprintf(stderr, "%s", PQerrorMessage(pg_db_t->target_conn));
ret = -1;
return ret;
}
pr_info("Target-database connection: Success!\n");

return ret;
}

void close_pg(struct db_t *pg_db_t)
{
PQfinish(pg_db_t->host_conn);
PQfinish(pg_db_t->origin_conn);
PQfinish(pg_db_t->target_conn);
free(pg_db_t->pg_conf);
}

/*
* replicate
*
* 0 Success
* -1 Failure of`pg_dump` or `pg_restore`
* -2 Failure of creation of a new process by fork()
*/
int replicate(struct db_t *pg_db_t, struct options *pg_options)
{
printf("replicate\n");
int wstatus;
pid_t p_restore;
pid_t p_dump;

int dump_pass_size = strlen("PGPASSWORD=") +
strlen(pg_db_t->pg_conf->origin_password) + 1;
int restore_pass_size = strlen("PGPASSWORD=") +
strlen(pg_db_t->pg_conf->target_password) + 1;

/*
* These will be used as an environmental variable for `pg_dump` and `pg_restore`.
* The format of the passed variables is `PGPASSWORD=<password>`.
*/
char *dump_pass =
(char *)calloc(dump_pass_size, dump_pass_size * sizeof(char));
char *restore_pass = (char *)calloc(restore_pass_size,
restore_pass_size * sizeof(char));

/*
* The max length of the array is 261 because 255 is the max length of
* the acceptable filepath by Linux. Our string is of the format
* `PATH=/path/to/pg_command` therefore we also need 5 characters for
* `PATH=` + 1 for '\0'.
*/
char *pg_bin, prefixed_command_path[261] = "PATH=";

/*
* Set this as the default command for Debian-based distributions. The array
* should potentially hold the max length of the filepath because it might be
* overwritten if the user has specified the $PG_BIN environmental variable.
*/
char command_path[255] = "/usr/local/pgsql/bin/";
/*
* This will be used as the path for the output of `pg_dump` and the input
* of the `pg_restore`, and it will be deleted before we return from `replicate()`
*/
char *dump_path = "/tmp/backup.dump";

char *const dump_args[] = {
"pg_dump",
"-h",
(char *const)pg_db_t->pg_conf->origin_host,
"-F",
"custom",
"-p",
(char *const)pg_db_t->pg_conf->origin_port,
"-U",
(char *const)pg_db_t->pg_conf->origin_user,
"-d",
(char *const)pg_db_t->pg_conf->origin_database,
"-f",
dump_path,
0
};

char *const restore_args[] = {
"pg_restore",
"-h",
(char *const)pg_db_t->pg_conf->target_host,
"-p",
(char *const)pg_db_t->pg_conf->target_port,
"-U",
(char *const)pg_db_t->pg_conf->target_user,
"-d",
(char *const)pg_db_t->pg_conf->target_database,
dump_path,
0
};

// Construct the password by adding `PGPASSWORD=` prefix.
construct_pg_password(dump_pass,
(char *)pg_db_t->pg_conf->origin_password);
construct_pg_password(restore_pass,
(char *)pg_db_t->pg_conf->target_password);

pr_info("Checking if $PG_BIN environmental variable.\n");
pg_bin = getenv("PG_BIN");
if (pg_bin) {
pr_info("$PG_BIN=%s was found.\n", pg_bin);
strncat(prefixed_command_path, pg_bin, strlen(pg_bin));
strncpy(command_path, pg_bin, strlen(prefixed_command_path));
} else {
pr_info("$PG_BIN was not found, default path %s will be used.\n",
command_path);
strncat(prefixed_command_path, command_path,
strlen(command_path) + 1);
}

char *const dump_envp[] = { dump_pass, prefixed_command_path, 0 };
char *const restore_envp[] = { restore_pass, prefixed_command_path, 0 };

/*
* The replication process is achieved by using a fork() for `pg_dump`
* and another fork() for `pg_restore`.
*/
pr_info("Starting `pg_dump` process.\n");
p_dump = fork();
if (p_dump < 0) {
if (get_verbose()) {
pr_info("`pg_dump` process failed to start: %s. Replication process will be terminated.\n",
strerror(errno));
} else {
printf("%s\n", strerror(errno));
}
free(dump_pass);
free(restore_pass);
return -2;
} else if (p_dump == 0) {
execve(strncat(command_path, "pg_dump", strlen("pg_dump") + 1),
dump_args, dump_envp);
pr_info("`pg_dump` error: %s\n", strerror(errno));
exit(-1);
}
wait(&wstatus);
// Check the status of the forked process, anything other than 0 means failure.
if (wstatus != 0) {
pr_info("`pg_dump` was unsuccessful. Replication process will be terminated.\n");
free(dump_pass);
free(restore_pass);
return -1;
}

pr_info("Starting `pg_restore` process.\n");
p_restore = fork();
if (p_restore < 0) {
if (get_verbose()) {
pr_info("`pg_restore` process failed to start: %s. Replication process will be terminated.\n",
strerror(errno));
} else {
printf("%s\n", strerror(errno));
}
free(dump_pass);
free(restore_pass);
return -2;
} else if (p_restore == 0) {
execve(strncat(command_path, "pg_restore",
strlen("pg_restore") + 1),
restore_args, restore_envp);
pr_info("`pg_restore` error: %s\n", strerror(errno));
exit(-1);
}
wait(&wstatus);
// Check the status of the forked process, anything other than 0 means failure.
if (wstatus != 0) {
pr_info("`pg_restore` was unsuccessful. Replication process will be terminated.\n");
free(dump_pass);
free(restore_pass);
return -1;
}

free(dump_pass);
free(restore_pass);
pr_info("Database Replication was successful!\n");
// Remove the `.dump` file that was created for the replication
remove(dump_path);

return 0;
}
void construct_pg_password(char *env_pass, const char *config_pass)
{
strncpy(env_pass, "PGPASSWORD=", strlen("PGPASSWORD=") + 1);
strncat(env_pass, config_pass, strlen(config_pass));
}

ADD_FUNC(construct_pg);

0 comments on commit 9b9c161

Please sign in to comment.