Skip to content

Commit

Permalink
Refactored to use betterdict (D)
Browse files Browse the repository at this point in the history
  • Loading branch information
caseneuve committed Oct 27, 2023
1 parent e715130 commit 8aaeea6
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 135 deletions.
110 changes: 59 additions & 51 deletions dzira/dzira.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import sys
import time
from datetime import datetime, timedelta
from functools import partial, wraps
from functools import wraps
from itertools import cycle
from operator import itemgetter
from pathlib import Path
from typing import Any, Callable
from typing import Any, Iterable

import click
from dotenv import dotenv_values
Expand Down Expand Up @@ -45,18 +44,39 @@ class Result:
stdout: str = ""


class D(dict):
def __call__(self, *keys) -> Iterable:
if keys:
return [self.get(k) for k in keys]
else:
return self.values()

def update(self, k, v) -> D:
self[k] = v
return self

def __repr__(self):
return f"betterdict({dict(self)})"


def spin_it(msg="", done="✓", fail="✗"):
spinner = cycle("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏")
separator = " "
connector = ":\t"

def decorator(func):
func.is_decorated_with_spin_it = True

@wraps(func)
def wrapper(*args, **kwargs):
if not use_spinner:
return func(*args, **kwargs)
result = func(*args, **kwargs)
if out:=result.stdout:
print(c("^green", done, separator, "^reset", out), flush=True)
return result

try:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(func, *args, **kwargs)

while future.running():
Expand All @@ -68,7 +88,7 @@ def wrapper(*args, **kwargs):

r: Result = future.result()
print(
c("\r", "^green", done, separator, msg, "^reset", r.stdout),
c("\r", "^green", done, separator, msg, "^reset", connector, r.stdout),
flush=True
)
return r
Expand All @@ -81,6 +101,7 @@ def wrapper(*args, **kwargs):
c("\r", "^red", fail, separator, msg),
end=f": {error_msg}\n", flush=True,
)
print(dir(exc))
return wrapper
return decorator

Expand Down Expand Up @@ -127,7 +148,7 @@ def get_jira(config: dict) -> Result:
server=f"https://{config['JIRA_SERVER']}",
basic_auth=(config["JIRA_EMAIL"], config["JIRA_TOKEN"]),
)
msg = f": connecting to {config['JIRA_SERVER']}"
msg = f"connecting to {config['JIRA_SERVER']}"
return Result(stdout=msg, result=jira)


Expand All @@ -146,7 +167,7 @@ def get_board_by_name(jira: JIRA, name: str) -> Result:
board = _get_board_by_name(jira, name)
return Result(
result=board,
stdout=f': {board.raw["location"]["displayName"]}'
stdout=f'{board.raw["location"]["displayName"]}'
)


Expand All @@ -162,7 +183,7 @@ def get_current_sprint(jira: JIRA, board: Board) -> Result:
fmt = lambda d: datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%a, %b %d")
return Result(
result=sprint,
stdout=f": {sprint.name} [{fmt(sprint.startDate)} -> {fmt(sprint.endDate)}]",
stdout=f"{sprint.name} [{fmt(sprint.startDate)} -> {fmt(sprint.endDate)}]",
)


Expand All @@ -175,37 +196,30 @@ def _get_sprint_issues(jira: JIRA, sprint: Sprint) -> list:
@spin_it("Getting issues")
def get_sprint_issues(jira: JIRA, sprint: Sprint) -> Result:
issues = _get_sprint_issues(jira, sprint)
return Result(result=issues, stdout=":")
return Result(result=issues)


# TODO: catch errors and properly process them in Result (stderr?)
@spin_it("Adding worklog")
def add_worklog(jira: JIRA, **payload) -> Result:
issue, time, seconds, comment = itemgetter("issue", "time", "seconds", "comment")(
payload
)

def add_worklog(jira: JIRA, issue, time=None, comment=None, seconds=None, **_) -> Result:
work_log = jira.add_worklog(
issue=issue, timeSpent=time, timeSpentSeconds=seconds, comment=comment
)
return Result(
stdout=(
f": spent {work_log.raw['timeSpent']} in {issue} "
f"spent {work_log.raw['timeSpent']} in {issue} "
f"[worklog {work_log.id}] at {datetime.now():%H:%M:%S}"
)
)


def get_worklog(jira: JIRA, **payload) -> Worklog:
if work_log := jira.worklog(issue=payload["issue"], id=str(payload["worklog_id"])):
def get_worklog(jira: JIRA, issue: str, worklog_id: str, **_) -> Worklog:
if work_log := jira.worklog(issue=issue, id=str(worklog_id)):
return work_log
raise Exception(
f"could not find worklog {payload['worklog_id']} for issue {payload['issue']!r}"
)
raise Exception(f"could not find worklog {worklog_id} for issue {issue!r}")


def _update_worklog(worklog: Worklog, payload: dict) -> None:
time, comment = itemgetter("time", "comment")(payload)
def _update_worklog(worklog: Worklog, time: str | None, comment: str | None) -> None:
if not (time or comment):
raise Exception(
"at least one of <time> or <comment> fields needed to perform the update!"
Expand All @@ -215,9 +229,9 @@ def _update_worklog(worklog: Worklog, payload: dict) -> None:


@spin_it("Updating worklog")
def update_worklog(worklog: Worklog, **payload) -> Result:
_update_worklog(worklog, payload)
return Result(stdout=f": {worklog.id} updated!")
def update_worklog(worklog: Worklog, time: str, comment: str, **_) -> Result:
_update_worklog(worklog, time, comment)
return Result(stdout=f"{worklog.id} updated!")


##################################################
Expand Down Expand Up @@ -274,7 +288,6 @@ def show_issues(issues: list) -> None:
fmt = lambda t: timedelta(seconds=t) if t else None
state_clr = {"To Do": "^magenta", "In Progress": "^yellow", "Done": "^green"}
clr = lambda s: c(state_clr.get(s, "^reset"), "^bold", s)
# breakpoint()

def estimate(i):
try:
Expand Down Expand Up @@ -354,13 +367,11 @@ def validate_hour(ctx, param, value):
)


def check_params(**args) -> None:
time, start, worklog_id, comment = itemgetter(
"time", "start", "worklog_id", "comment"
)(args)
def sanitize_params(args: D) -> D:
time, start, worklog_id, comment = args("time", "start", "worklog_id", "comment")

if (time or start) or (worklog_id and comment):
return
return calculate_seconds(args)

if worklog_id and (comment is None):
msg = "to update a worklog, either time spent or a comment is needed"
Expand All @@ -374,8 +385,8 @@ def check_params(**args) -> None:

### Payload

def calculate_seconds(**payload) -> dict:
start, end = itemgetter("start", "end")(payload)
def calculate_seconds(payload: D) -> D:
start, end = payload("start", "end")

if start is None:
return payload
Expand All @@ -391,15 +402,15 @@ def calculate_seconds(**payload) -> dict:
raise click.BadParameter("start time cannot be later than end time")
else:
delta_seconds = (t2 - t1).total_seconds()
payload["seconds"] = str(int(delta_seconds))
return payload
return payload.update("seconds", str(int(delta_seconds)))


def establish_issue(jira: JIRA, config: dict, issue: str) -> str:
def establish_issue(jira: JIRA, config: dict, payload: D) -> D:
board = config.get("JIRA_BOARD")
issue = payload.get("issue", "")

if issue.isdigit():
return f"{board}-{issue}"
return payload.update("issue", f"{board}-{issue}")

jira_board = get_board_by_name(jira, board).result
sprint = get_current_sprint(jira, jira_board)
Expand All @@ -412,14 +423,15 @@ def establish_issue(jira: JIRA, config: dict, issue: str) -> str:
msg = "\n".join(f" * {i.key}: {i.fields.summary}" for i in candidates)
raise Exception("found more than one matching issue:\n" + msg)

return candidates[0].key
return payload.update("issue", candidates[0].key)


def establish_action(jira: JIRA, **payload) -> Callable:
if payload["worklog_id"]:
def perform_log_action(jira: JIRA, payload: D) -> None:
if payload["worklog_id"] is not None:
worklog = get_worklog(jira, **payload)
return partial(update_worklog, worklog)
return partial(add_worklog, jira)
update_worklog(worklog, **payload)
else:
add_worklog(jira, **payload)


@cli.command()
Expand Down Expand Up @@ -454,7 +466,7 @@ def establish_action(jira: JIRA, **payload) -> Callable:
)
@click.option("--spin/--no-spin", default=True)
@click.help_option("-h", "--help")
def log(ctx, issue, **rest):
def log(ctx, **_):
"""
Log time spent on ISSUE number or ISSUE with description containing
matching string.
Expand All @@ -469,15 +481,11 @@ def log(ctx, issue, **rest):
When WORKLOG id is present, it will update an existing log,
rather then create a new one.
"""
check_params(**rest)

payload = calculate_seconds(**ctx.params)
payload = sanitize_params(D(ctx.params))
config = get_config(config=ctx.obj)
jira = get_jira(config).result
payload["issue"] = establish_issue(jira, config, issue)
action = establish_action(jira, **payload)

action(**payload)
payload = establish_issue(jira, config, payload)
perform_log_action(jira, payload)


def main():
Expand Down
Loading

0 comments on commit 8aaeea6

Please sign in to comment.