Skip to content

Commit

Permalink
Support optional merge function during copy (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
dazuma authored and theacodes committed Jul 31, 2018
1 parent b5690aa commit d80526c
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions synthtool/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from pathlib import Path
import shutil
from typing import Iterable, Union
from typing import Callable, Iterable, Union
import os
import re
import sys
Expand Down Expand Up @@ -65,8 +65,36 @@ def _filter_files(paths: Iterable[Path]) -> Iterable[Path]:
return (path for path in paths if path.is_file())


def _merge_file(
source_path: Path, dest_path: Path, merge: Callable[[str, str, Path], str]
):
"""
Writes to the destination the result of merging the source with the
existing destination contents, using the given merge function.
The merge function must take three arguments: the source contents, the
old destination contents, and a Path to the file to be written.
"""

with source_path.open("r") as source_file:
source_text = source_file.read()

with dest_path.open("r+") as dest_file:
dest_text = dest_file.read()

final_text = merge(source_text, dest_text, dest_path)

if final_text != dest_text:
dest_file.seek(0)
dest_file.write(final_text)
dest_file.truncate()


def _copy_dir_to_existing_dir(
source: Path, destination: Path, excludes: ListOfPathsOrStrs = None
source: Path,
destination: Path,
excludes: ListOfPathsOrStrs = None,
merge: Callable[[str, str, Path], str] = None,
) -> bool:
"""
copies files over existing files to an existing directory
Expand All @@ -81,19 +109,23 @@ def _copy_dir_to_existing_dir(
for root, _, files in os.walk(source):
for name in files:
rel_path = str(Path(root).relative_to(source))
dest_dir = os.path.join(str(destination), rel_path)
dest_path = os.path.join(dest_dir, name)
dest_dir = destination / rel_path
dest_path = dest_dir / name
exclude = [
e
for e in excludes
if (
Path(e).relative_to(".") == Path(dest_path)
or Path(e).relative_to(".") == Path(dest_dir)
Path(e).relative_to(".") == dest_path
or Path(e).relative_to(".") == dest_dir
)
]
if not exclude:
os.makedirs(dest_dir, exist_ok=True)
shutil.copyfile(os.path.join(root, name), dest_path)
os.makedirs(str(dest_dir), exist_ok=True)
source_path = Path(os.path.join(root, name))
if merge is not None and dest_path.is_file():
_merge_file(source_path, dest_path, merge)
else:
shutil.copyfile(str(source_path), str(dest_path))
copied = True

return copied
Expand All @@ -103,6 +135,7 @@ def move(
sources: ListOfPathsOrStrs,
destination: PathOrStr = None,
excludes: ListOfPathsOrStrs = None,
merge: Callable[[str, str, Path], str] = None,
) -> bool:
"""
copy file(s) at source to current directory.
Expand All @@ -125,11 +158,14 @@ def move(
excludes = []
if source.is_dir():
copied = copied or _copy_dir_to_existing_dir(
source, canonical_destination, excludes=excludes
source, canonical_destination, excludes=excludes, merge=merge
)
elif source not in excludes:
# copy individual file
shutil.copy2(source, canonical_destination)
if merge is not None and canonical_destination.is_file():
_merge_file(source, canonical_destination, merge)
else:
shutil.copy2(source, canonical_destination)
copied = True

if not copied:
Expand Down

0 comments on commit d80526c

Please sign in to comment.