Skip to content

Commit

Permalink
re-implement import-sql using Python
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Oct 2, 2021
1 parent 7655ca2 commit 28bb5af
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 33 deletions.
49 changes: 16 additions & 33 deletions bin/import-sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash
set -o errexit
set -o errexit # set -e
set -o pipefail
set -o nounset
set -o nounset # set -u
shopt -s nullglob

# For backward compatibility, allow both PG* and POSTGRES_* forms,
Expand All @@ -13,69 +13,52 @@ export PGUSER="${POSTGRES_USER:-${PGUSER?}}"
export PGPASSWORD="${POSTGRES_PASSWORD:-${PGPASSWORD?}}"
export PGPORT="${POSTGRES_PORT:-${PGPORT:-5432}}"

PSQL="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/run-psql"

#######################################
# Run a single SQL file through psql with timing enabled and
# ON_ERROR_STOP, logging the file's md5 and line count first.
# Globals:   PSQL_OPTIONS (read) - optional extra psql flags, eval-expanded
#            so quoted values like -c 'select ...' survive word-splitting
# Arguments: $1 - path to the .sql file to execute
# Returns:   psql's exit status (non-zero aborts the script via errexit)
#######################################
function exec_psql_file() {
  local file_name="$1"
  # Allows additional parameters to be passed to psql
  # For example, PSQL_OPTIONS='-a -A' would echo everything and disable output alignment
  # Using eval allows complex cases with quotes, like PSQL_OPTIONS=" -a -c 'select ...' "
  eval "local psql_opts=(${PSQL_OPTIONS:-})"

  echo "Importing $file_name (md5 $(md5sum < "$file_name") $(wc -l < "$file_name") lines) into Postgres..."

  # shellcheck disable=SC2154
  psql \
    -v ON_ERROR_STOP="1" \
    -c '\timing on' \
    -f "$file_name" \
    "${psql_opts[@]}"
}

function import_all_sql_files() {
local dir="$1"
local sql_file

if [[ -d "$dir/parallel" ]]; then
# Assume this dir may contain run_first.sql, parallel/*.sql, and run_last.sql
# use parallel execution
if [[ -f "$dir/run_first.sql" ]]; then
exec_psql_file "$dir/run_first.sql"
"${PSQL}" "$dir/run_first.sql"
else
echo "File $dir/run_first.sql not found, skipping"
fi

# Run import-sql script in parallel, up to MAX_PARALLEL_PSQL processes at the same time
: "${MAX_PARALLEL_PSQL:=5}"
echo "Importing $(find "$dir/parallel" -name "*.sql" | wc -l) sql files from $dir/parallel/, up to $MAX_PARALLEL_PSQL files at the same time"
find "$dir/parallel" -name "*.sql" -print0 | xargs -0 -I{} -P "$MAX_PARALLEL_PSQL" sh -c "\"$0\" \"{}\" || exit 255"
echo "Finished importing sql files matching '$dir/parallel/*.sql'"
"${PSQL}" --parallel "${MAX_PARALLEL_PSQL:=5}" "$dir/parallel"

if [[ -f "$dir/run_last.sql" ]]; then
exec_psql_file "$dir/run_last.sql"
"${PSQL}" "$dir/run_last.sql"
else
echo "File $dir/run_last.sql not found, skipping"
fi
else
for sql_file in "$dir"/*.sql; do
exec_psql_file "$sql_file"
done
"${PSQL}" "$dir"
fi
}

# If there are no arguments, imports everything,
# otherwise the first argument is the name of a file to be imported.
if [[ $# -eq 0 ]]; then
if [[ "${SQL_TOOLS_DIR:-}" == "" ]]; then
echo "ENV variable SQL_TOOLS_DIR is not set. It should contain directory with .sql files, e.g. postgis-vt-util.sql and language.sql"
if [[ "${SQL_TOOLS_DIR:-}" == "" ]] || [[ ! -d "${SQL_TOOLS_DIR}" ]]; then
echo "ENV variable SQL_TOOLS_DIR is not set or there are no files. It should contain directory with .sql files, e.g. postgis-vt-util.sql and language.sql"
exit 1
else
echo "SQL_TOOLS_DIR=$SQL_TOOLS_DIR"
fi
if [[ "${SQL_DIR:-}" == "" ]]; then
echo "ENV variable SQL_DIR is not set. It should contain directory with .sql files, e.g. tileset.sql, or run_first.sql, run_last.sql, and parallel/*.sql"
if [[ "${SQL_DIR:-}" == "" ]] || [[ ! -d "${SQL_DIR}" ]]; then
echo "ENV variable SQL_DIR is not set or there are no files. It should contain directory with .sql files, e.g. tileset.sql, or run_first.sql, run_last.sql, and parallel/*.sql"
exit 1
else
echo "SQL_DIR=$SQL_DIR"
fi

import_all_sql_files "$SQL_TOOLS_DIR" # import postgis-vt-util.sql and all other tools SQL files from /sql
import_all_sql_files "$SQL_DIR" # import compiled tileset.sql
else
exec_psql_file "$1"
"${PSQL}" "$1"
fi
189 changes: 189 additions & 0 deletions bin/run-psql
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#!/usr/bin/env python
"""
Usage:
run-psql <path>...
[--parallel=<num> [--on-err-stop | --on-err-interrupt]]
[--pghost=<host>] [--pgport=<port>] [--dbname=<db>]
[--user=<user>] [--password=<password>] [--verbose]
[--print-err=<num>] [-- <psql-args>...]
run-psql --help
run-psql --version
Options:
<path> One or more SQL files or directories to run using psql.
<psql-args> Override default psql parameters.
-p --parallel=<num> Run all files/dirs specified with <path>... in parallel,
up to <num> at the same time. [default: 1]
-e --print-err=<num> If psql exits with an error, print last <num> output lines
after all other psql imports finish. Disabled if 0. [default: 10]
--on-err-stop If psql fails during parallel run, wait for other files to finish,
but do not start any files that have not been started yet.
--on-err-interrupt Same as --on-err-stop, but also sends a SIGINT to all running psql.
-v --verbose Print additional debugging information
--help Show this screen.
--version Show version.
PostgreSQL Options:
-h --pghost=<host> Postgres hostname. By default uses PGHOST env or "localhost" if not set.
-P --pgport=<port> Postgres port. By default uses PGPORT env or "5432" if not set.
-d --dbname=<db> Postgres db name. By default uses PGDATABASE env or "openmaptiles" if not set.
-U --user=<user> Postgres user. By default uses PGUSER env or "openmaptiles" if not set.
--password=<password> Postgres password. By default uses PGPASSWORD env or "openmaptiles" if not set.
These legacy environment variables should not be used, but they are still supported:
POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD
"""
import collections
import os
from dataclasses import dataclass
from hashlib import md5
from multiprocessing import Pool, Value
from pathlib import Path
from select import select
from subprocess import Popen, PIPE, list2cmdline
from sys import stdout, stderr
from typing import List, Optional

# noinspection PyProtectedMember
from docopt import docopt, DocoptExit

import openmaptiles
from openmaptiles.pgutils import parse_pg_args


@dataclass
class Params:
    """Per-invocation psql settings, passed to each Pool worker alongside a file."""
    # Number of trailing stderr lines to include in the failure report (0 disables).
    print_on_err: int
    # Extra psql CLI arguments that replace the defaults; None means use
    # the built-in defaults (-v ON_ERROR_STOP=1 -c '\timing on').
    psql_args: Optional[List[str]]
    pghost: str
    pgport: str
    dbname: str
    user: str
    password: str
    # When True, print the full psql command line and env before running.
    verbose: bool


stop_value: Optional[Value] = None


def last_lines(output: List[bytes], count: int, name: str) -> str:
    """Format the trailing `count` lines of captured output for an error report.

    Returns '' when there is nothing to show: no captured chunks, output
    that is blank after stripping, or reporting disabled (count <= 0).
    """
    if not output or count <= 0:
        return ''
    text = b''.join(output).decode('utf-8').strip()
    if not text:
        return ''
    tail = text.split('\n')[-count:]
    joined = '\n'.join(tail)
    return f'\n###### Last {len(tail)} lines of {name}:\n{joined}'


def run_psql(file: Path, params: Params):
    """Execute one SQL file with psql, streaming its output live.

    Returns None on success.  On failure, sets the shared stop flag (so
    sibling workers skip files that have not started yet) and returns an
    error-summary string including the last stderr lines.
    """
    # NOTE(review): stop_value is a module global created in main(); worker
    # processes inherit it only under the 'fork' start method — confirm on
    # platforms where 'spawn' is the default.
    if stop_value.value != 0:
        print(f'Skipping {file}', file=stderr)
        return None

    # stdbuf forces line-buffered stdout / unbuffered stderr so psql output
    # streams in real time instead of arriving in large buffered chunks.
    cmd = ['stdbuf', '-oL', '-e0', 'psql', '-f', str(file)]
    if params.psql_args is None:
        cmd.extend(['-v', 'ON_ERROR_STOP=1', '-c', r'\timing on'])
    else:
        cmd.extend(params.psql_args)

    pg_env = {
        'PGHOST': params.pghost,
        'PGDATABASE': params.dbname,
        'PGUSER': params.user,
        'PGPASSWORD': params.password,
        'PGPORT': params.pgport,
    }
    # FIX: merge into the current environment instead of replacing it.  A
    # bare 5-variable env dropped PATH (breaking lookup of stdbuf/psql via
    # the shell search path) and any other settings the caller exported
    # (e.g. PGSSLMODE, PGOPTIONS).
    env = {**os.environ, **pg_env}

    sql_content = file.read_text()
    # FIX: hashlib.md5() requires bytes; hashing the str returned by
    # read_text() raised "TypeError: Unicode-objects must be encoded".
    md5sum = md5(sql_content.encode('utf-8')).hexdigest()
    lines = sql_content.count('\n')
    print(f'Importing {file} (md5 {md5sum} {lines} lines) into Postgres...')
    if params.verbose:
        print(f'  {list2cmdline(cmd)}')
        print('Environment Variables:')
        # Show only the PG overrides, not the whole inherited environment.
        for k, v in sorted(pg_env.items()):
            print(f'  {k}={v}')

    stderr_buf = []
    with Popen(cmd, stdout=PIPE, stderr=PIPE, env=env) as proc:
        # Multiplex the child's stdout/stderr with select() so both streams
        # are forwarded as they arrive; stderr is also buffered for the
        # post-mortem report when enabled.
        readable = {
            proc.stdout.fileno(): stdout.buffer,
            proc.stderr.fileno(): stderr.buffer,
        }
        while readable:
            for fd in select(readable, [], [])[0]:
                data = os.read(fd, 1024)  # read available
                if not data:
                    # EOF on this stream; stop watching it.
                    del readable[fd]
                else:
                    if params.print_on_err > 0 and fd == proc.stderr.fileno():
                        stderr_buf.append(data)
                    readable[fd].write(data)
                    readable[fd].flush()
        # there should be no more output, should be safe to wait
        exit_code = proc.wait()
    if exit_code == 0:
        print(f'Finished importing {file}...')
        return None

    # Signal other workers to stop picking up new files.
    stop_value.value = 1
    result = f'File {file} failed with error {exit_code}'
    print(result)
    return result + last_lines(stderr_buf, params.print_on_err, 'STDERR')


def main(args):
    """Entry point: expand <path> arguments into .sql files and run them
    through run_psql, optionally in parallel via a process Pool.

    args: the dict produced by docopt from the module docstring.
    Raises DocoptExit on duplicate or missing paths; exits 1 on any
    psql failure, 0 otherwise.
    """
    pghost, pgport, dbname, user, password = parse_pg_args(args)
    paths = args['<path>']
    # Everything after a literal '--' is forwarded verbatim to psql,
    # replacing the default flags; None means "use defaults".
    if '--' in paths:
        pos = paths.index('--')
        psql_args = paths[pos + 1:]
        paths = paths[:pos]
    else:
        psql_args = None
    params = Params(int(args['--print-err']), psql_args, pghost, pgport, dbname, user, password, args['--verbose'])

    # Resolve paths first so duplicates are detected even when spelled
    # differently (relative vs absolute, symlinks, ...).
    paths = [Path(v).resolve() for v in paths]
    dups = [vv for vv, cnt in collections.Counter((str(v) for v in paths)).items() if cnt > 1]
    if dups:
        raise DocoptExit(f'ERROR: Duplicate paths: [{"], [".join(dups)}]')
    missing = []
    files = []
    for path in paths:
        if not path.exists():
            missing.append(path)
        elif path.is_dir():
            # A directory contributes its immediate *.sql files (non-recursive).
            for file in path.glob('*.sql'):
                if file.is_file():
                    files.append(file)
        else:
            files.append(path)
    if missing:
        raise DocoptExit(f'ERROR: Path does not exist: [{"], [".join(str(v) for v in missing)}]')
    if not files:
        # For compatibility, allow empty dir import
        print('No .sql files were found')
        exit(0)

    # Shared failure flag, inherited by Pool workers (fork start method).
    global stop_value
    stop_value = Value('b', 0)

    parallel = int(args['--parallel'])
    with Pool(processes=parallel) as pool:
        print(f'Importing {len(files)} files...')
        # Sorted for deterministic execution order; chunksize=1 hands out
        # one file at a time so large files don't serialize behind others.
        tasks = [(file, params) for file in sorted(files)]
        async_result = pool.starmap_async(run_psql, tasks, chunksize=1)
        results = async_result.get()

    if stop_value.value != 0:
        # In parallel mode, failure details may be interleaved in the live
        # output; repeat the collected per-file summaries at the end.
        if parallel > 1:
            print('-------------------------- ERROR SUMMARY ---------------------------------')
            print('\n'.join(v for v in results if v))
        exit(1)
    else:
        print('All SQL files successfully executed')


if __name__ == '__main__':
    # Parse CLI args from the module docstring (docopt grammar) and run.
    main(docopt(__doc__, version=openmaptiles.__version__))
1 change: 1 addition & 0 deletions tests/sql/test-sql.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ done
sleep 1

# Import all pre-required SQL code
mkdir -p "$SQL_DIR"
source import-sql

echo "++++++++++++++++++++++++"
Expand Down

0 comments on commit 28bb5af

Please sign in to comment.