Skip to content

Commit

Permalink
refactored to functional style
Browse files Browse the repository at this point in the history
  • Loading branch information
andgineer committed Apr 27, 2024
1 parent 14905fa commit 904b7be
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
53 changes: 37 additions & 16 deletions src/bitwarden_import_msecure/msecure_to_bitwarden.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import csv
import json
from functools import reduce
from pathlib import Path
from typing import Dict
from typing import Dict, Optional, List

from rich.console import Console
from rich.panel import Panel
Expand Down Expand Up @@ -37,6 +38,39 @@ def patch(input_path: Path, output_path: Path) -> None:
- import bitwarden_new.json to Bitwarden as Bitwarden json file
- clean up mSecure export file, `bitwarden_new.json` and it's backup
"""

def get_row_dict(csv_row: List[str]) -> Optional[Dict[str, str]]:
"""Get dict from mSecure CSV line."""
if csv_row and not csv_row[0].startswith("mSecure"):
row = import_msecure_row(csv_row, False)
if row and row["type"] == "login":
return {"type": "login", "login_uri": row["login_uri"], "name": row["name"]}
return None

def filter_logins_with_url(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Filter logins with non-empty URL."""
return list(
filter(lambda x: x is not None and x["type"] == "login" and x["login_uri"], rows)
)

def reduce_to_uri_dict(rows: List[Dict[str, str]]) -> Dict[str, str]:
"""Reduce list of logins' dicts to dict with login name as key and login URL as value."""

def reduce_reporting_name_collisions(
acc: Dict[str, str], row: Dict[str, str]
) -> Dict[str, str]:
if row["name"] in acc and acc[row["name"]] != row["login_uri"]:
print(
f"Name collision: item `{row['name']}`, has different URLs: "
f"`{acc[row['name']]}` and `{row['login_uri']}`.\n"
f"Using first one."
)
else:
acc[row["name"]] = row["login_uri"]
return acc

return reduce(reduce_reporting_name_collisions, rows, {})

if not output_path.exists():
click.echo(f"Output file `{output_path}` does not exist.")
raise click.Abort()
Expand All @@ -46,23 +80,10 @@ def patch(input_path: Path, output_path: Path) -> None:
with output_path.open("r+") as file:
output_data = json.load(file)

uri_dict: Dict[str, str] = {}
with input_path.open(newline="", encoding="utf-8") as infile:
reader = csv.reader(infile, delimiter=",")
for row in reader:
if row and not row[0].startswith("mSecure"):
data = import_msecure_row(row, False)
if data["type"] == "login":
uri = data["login_uri"]
if uri:
if data["name"] in uri_dict:
print(
f"Name collision: item `{data['name']}`, "
f"has different URLs: `{uri_dict[data['name']]}` "
f"and `{uri}`. Using first one."
)
else:
uri_dict[data["name"]] = uri
rows = [row for row in map(get_row_dict, reader) if row is not None]
uri_dict = reduce_to_uri_dict(filter_logins_with_url(rows))

replaced = 0
for item in output_data.get("items", []):
Expand Down
4 changes: 1 addition & 3 deletions tests/test_bitwarden_import_msecure.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,5 @@ def test_overwrite_without_force(runner, tmp_path):
'--format', 'json' # specifying format explicitly if necessary
])

# Checking the output and exit code
expected_message = "already exists. Use --force to overwrite."
assert result.exit_code == 1, f"Expected exit code 1, got {result.exit_code}"
assert expected_message in result.output.replace("\n", ""), f"Expected error message not found in output: {result.output}"
assert "--force" in result.output.replace("\n", ""), f"Expected error message not found in output: {result.output}"
2 changes: 2 additions & 0 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ def test_bitwarden_patch(runner, tmpdir, msecure_export, bitwarden_broken_file,
output_file.write(bitwarden_broken_file.read_text(encoding="utf8"))

result = runner.invoke(bitwarden_import_msecure, [str(input_file), str(output_file), "--patch"])
if result.exit_code != 0:
print(result.stdout)
assert result.exit_code == 0

if UPDATE_EXPECTED_OUTPUT:
Expand Down

0 comments on commit 904b7be

Please sign in to comment.