Skip to content

Commit

Permalink
Merge pull request #16
Browse files Browse the repository at this point in the history
* Add destination authentication and object comparison logic.

* Add CLI command for cloning HIP objects between tenants

* Update dependencies in poetry.lock file

* Add support for cloning HIP objects in documentation.
  • Loading branch information
cdot65 authored Dec 19, 2024
1 parent a6c43f0 commit 615a488
Show file tree
Hide file tree
Showing 25 changed files with 1,928 additions and 1,236 deletions.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ $ scm-clone --help
│ decryption-profiles Clone decryption profiles. │
│ dns-security-profiles Clone DNS Security profiles. │
│ edls Clone external dynamic lists. │
│ hip-objects Clone HIP objects. │
│ security-rules Clone security rules. │
│ service-groups Clone service groupss. │
│ services Clone services. │
Expand Down
1 change: 1 addition & 0 deletions docs/user-guide/python/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ available commands and their primary purposes.
| application-filters | Clone application filters |
| application-groups | Clone application groups |
| edls | Clone external dynamic lists |
| hip-objects | Clone HIP objects |
| services | Clone services |
| service-groups | Clone service groups |
| tags | Clone tag objects |
Expand Down
251 changes: 115 additions & 136 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
[tool.poetry]
name = "scm-config-clone"
version = "0.2.2"
version = "0.2.4"
description = "A command-line tool to clone configuration objects between Palo Alto Networks Strata Cloud Manager (SCM) tenants."
authors = ["Calvin Remsburg <calvin@cdot.io>"]
license = "Apache 2.0"
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.10"
dynaconf = "^3.2.6"
typer = "^0.12.5"
typer = "^0.15.1"
setuptools = "^75.1.0"
pan-scm-sdk = "^0.3.4"
pan-scm-sdk = "^0.3.7"
tabulate = "^0.9.0"
pandas = "^2.2.3"
pyyaml = "^6.0.2"
Expand Down
1 change: 1 addition & 0 deletions scm_config_clone/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .commands.objects.application_filters import application_filters
from .commands.objects.application_group import application_groups
from .commands.objects.external_dynamic_lists import external_dynamic_lists
from .commands.objects.hip_objects import hip_objects
from .commands.objects.service import services
from .commands.objects.service_group import service_groups
from .commands.objects.tag import tags
Expand Down
200 changes: 89 additions & 111 deletions scm_config_clone/commands/objects/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from scm.models.objects.address import AddressCreateModel, AddressResponseModel
from tabulate import tabulate

from scm_config_clone.utilities import load_settings, parse_csv_option
from scm_config_clone.utilities import (
compare_object_lists,
load_settings,
parse_csv_option,
)


def build_create_params(src_obj: AddressResponseModel, folder: str) -> Dict[str, Any]:
Expand Down Expand Up @@ -95,15 +99,13 @@ def addresses(
help="If set, commit the changes on the destination tenant after object creation.",
is_flag=True,
),
# Existing flag that already was present
auto_approve: bool = typer.Option(
None,
"--auto-approve",
"-A",
help="If set, skip the confirmation prompt and automatically proceed with creation.",
is_flag=True,
),
# New flags introduced
create_report: bool = typer.Option(
None,
"--create-report",
Expand Down Expand Up @@ -176,7 +178,6 @@ def addresses(
settings = load_settings(settings_file)

# Apply fallback logic: if a flag wasn't provided at runtime, use settings.yaml values
# If a flag is provided (not None), use the provided value; otherwise, use settings default.
auto_approve = settings["auto_approve"] if auto_approve is None else auto_approve
create_report = (
settings["create_report"] if create_report is None else create_report
Expand All @@ -197,7 +198,7 @@ def addresses(
exclude_snippets_list = parse_csv_option(exclude_snippets)
exclude_devices_list = parse_csv_option(exclude_devices)

# Authenticate and retrieve from source
# Authenticate with source
try:
source_creds = settings["source_scm"]
source_client = Scm(
Expand All @@ -214,115 +215,120 @@ def addresses(
logger.error(f"Unexpected error with source authentication: {e}")
raise typer.Exit(code=1)

# Retrieve address objects from the source
# Authenticate with destination
try:
destination_creds = settings["destination_scm"]
destination_client = Scm(
client_id=destination_creds["client_id"],
client_secret=destination_creds["client_secret"],
tsg_id=destination_creds["tenant"],
log_level=logging_level,
)
logger.info(
f"Authenticated with destination SCM tenant: {destination_creds['tenant']}"
)
except (AuthenticationError, KeyError) as e:
logger.error(f"Error authenticating with destination tenant: {e}")
raise typer.Exit(code=1)
except Exception as e:
logger.error(f"Unexpected error with destination authentication: {e}")
raise typer.Exit(code=1)

# Retrieve address objects from source
try:
source_addresses = Address(source_client, max_limit=5000)
address_objects = source_addresses.list(
source_objects = source_addresses.list(
folder=folder,
exact_match=True,
exclude_folders=exclude_folders_list,
exclude_snippets=exclude_snippets_list,
exclude_devices=exclude_devices_list,
)
logger.info(
f"Retrieved {len(address_objects)} address objects from source tenant folder '{folder}'."
f"Retrieved {len(source_objects)} address objects from source tenant folder '{folder}'."
)
except Exception as e:
logger.error(f"Error retrieving address objects from source: {e}")
raise typer.Exit(code=1)

# If not quiet_mode, display retrieved objects
if address_objects and not quiet_mode:
# Retrieve address objects from destination
try:
destination_addresses = Address(destination_client, max_limit=5000)
destination_objects = destination_addresses.list(
folder=folder,
exact_match=True,
exclude_folders=exclude_folders_list,
exclude_snippets=exclude_snippets_list,
exclude_devices=exclude_devices_list,
)
logger.info(
f"Retrieved {len(destination_objects)} address objects from destination tenant folder '{folder}'."
)
except Exception as e:
logger.error(f"Error retrieving address objects from destination: {e}")
raise typer.Exit(code=1)

# Compare and get the status information
comparison_results = compare_object_lists(
source_objects,
destination_objects,
)

if source_objects and not quiet_mode:
addr_table = []
for addr in address_objects:
if addr.ip_netmask:
addr_value = addr.ip_netmask
elif addr.fqdn:
addr_value = addr.fqdn
elif addr.ip_range:
addr_value = addr.ip_range
elif addr.ip_wildcard:
addr_value = addr.ip_wildcard
else:
addr_value = "Unknown Type"

addr_table.append(
[
addr.name,
addr.folder,
addr_value,
addr.description or "",
]
)
for result in comparison_results:
# 'x' if already configured else ''
status = "x" if result["already_configured"] else ""
addr_table.append([result["name"], status])

typer.echo(
tabulate(
addr_table,
headers=[
"Name",
"Folder",
"Value",
"Description",
],
headers=["Name", "Destination Status"],
tablefmt="fancy_grid",
)
)
elif not address_objects:
typer.echo("No address objects found in the source folder.")

# Prompt for confirmation if not auto-approved and objects found
if address_objects and not auto_approve:
# Prompt if not auto-approved and objects exist
if source_objects and not auto_approve:
proceed = typer.confirm(
"Do you want to proceed with creating these objects in the destination tenant?"
)
if not proceed:
typer.echo("Aborting cloning operation.")
raise typer.Exit(code=0)

# Authenticate with destination tenant
try:
dest_creds = settings["destination_scm"]
destination_client = Scm(
client_id=dest_creds["client_id"],
client_secret=dest_creds["client_secret"],
tsg_id=dest_creds["tenant"],
log_level=logging_level,
)
logger.info(
f"Authenticated with destination SCM tenant: {dest_creds['tenant']}"
)
except (AuthenticationError, KeyError) as e:
logger.error(f"Error authenticating with destination tenant: {e}")
raise typer.Exit(code=1)
except Exception as e:
logger.error(f"Unexpected error with destination authentication: {e}")
raise typer.Exit(code=1)
# Determine which objects need to be created (those not already configured)
already_configured_names = {
res["name"] for res in comparison_results if res["already_configured"]
}

objects_to_create = [
obj for obj in source_objects if obj.name not in already_configured_names
]

# Create address objects in destination
destination_addresses = Address(
destination_client,
max_limit=5000,
)
destination_addresses = Address(destination_client, max_limit=5000)
created_objs: List[AddressResponseModel] = []
error_objects: List[List[str]] = []

for src_obj in address_objects:
try:
create_params = build_create_params(
src_obj,
folder,
for src_obj in objects_to_create:
if dry_run:
logger.info(
f"Skipping creation of address object in destination (dry run): {src_obj.name}"
)
continue

if create_report:
with open("result.csv", "a") as f:
f.write(f"Address,{src_obj.name},{src_obj.folder}\n")

try:
create_params = build_create_params(src_obj, folder)
except ValueError as ve:
error_objects.append(
[
src_obj.name,
str(ve),
]
)
error_objects.append([src_obj.name, str(ve)])
continue

# If dry_run is True, we might skip actual creation in the future.
# For now, just proceed as normal until logic is implemented.
try:
new_obj = destination_addresses.create(create_params)
created_objs.append(new_obj)
Expand All @@ -333,46 +339,24 @@ def addresses(
NameNotUniqueError,
ObjectNotPresentError,
) as e:
error_objects.append([src_obj.name, str(e)])
error_type = type(e).__name__
error_objects.append([src_obj.name, error_type])
continue
except Exception as e:
error_objects.append([src_obj.name, str(e)])
except Exception: # noqa
error_objects.append([src_obj.name, "unknown error"])
continue

# If not quiet_mode, display results
# Display results if not quiet_mode
if created_objs and not quiet_mode:
typer.echo("\nSuccessfully created the following address objects:")
created_table = []
for obj in created_objs:
if obj.ip_netmask:
value = obj.ip_netmask
elif obj.fqdn:
value = obj.fqdn
elif obj.ip_range:
value = obj.ip_range
elif obj.ip_wildcard:
value = obj.ip_wildcard
else:
value = "Unknown Type"

created_table.append(
[
obj.name,
obj.folder,
value,
obj.description or "",
]
)
created_table.append([obj.name])

typer.echo(
tabulate(
created_table,
headers=[
"Name",
"Folder",
"Value",
"Description",
],
headers=["Name"],
tablefmt="fancy_grid",
)
)
Expand All @@ -382,10 +366,7 @@ def addresses(
typer.echo(
tabulate(
error_objects,
headers=[
"Object Name",
"Error",
],
headers=["Object Name", "Error"],
tablefmt="fancy_grid",
)
)
Expand Down Expand Up @@ -414,7 +395,4 @@ def addresses(
else:
logger.info("No new address objects were created, skipping commit.")

# If create_report is True, in the future we will append results to 'result.csv'
# For now, logic can be implemented later.

typer.echo("🎉 Address objects cloning completed successfully! 🎉")
Loading

0 comments on commit 615a488

Please sign in to comment.