From f29151a10cdfcf45c79e06a31ed3f6e2d1b6e0c4 Mon Sep 17 00:00:00 2001
From: David Dudas
Date: Mon, 13 Jan 2025 11:04:53 -0800
Subject: [PATCH] [Issue 3493] Remove legacy sprint and delivery reports code from analytics CLI and Makefile (#3496)

## Summary

Fixes #3493

### Time to review: __2 mins__

## Changes proposed

> What was added, updated, or removed in this PR.

Sprint and Delivery reports are now generated in Metabase, so the legacy code that generated those reports and posted them to Slack is now deprecated. This PR removes the deprecated code.

## Context for reviewers

> Testing instructions, background context, more in-depth details of the implementation, and anything else you'd like to call out or ask reviewers. Explain how the changes were verified.

## Additional information

> Screenshots, GIF demos, code examples or output to help show the changes working as expected.

---
 analytics/Makefile                             |  35 -
 analytics/src/analytics/cli.py                 | 131 +---
 analytics/src/analytics/metrics/__init__.py    |   1 -
 analytics/src/analytics/metrics/base.py        | 139 ----
 analytics/src/analytics/metrics/burndown.py    | 163 ----
 analytics/src/analytics/metrics/burnup.py      | 157 ----
 .../src/analytics/metrics/percent_complete.py  | 170 ----
 analytics/src/analytics/metrics/utils.py       | 129 ---
 analytics/tests/metrics/__init__.py            |   1 -
 analytics/tests/metrics/test_base.py           |  69 --
 analytics/tests/metrics/test_burndown.py       | 673 ----------------
 analytics/tests/metrics/test_burnup.py         | 738 ------------------
 .../tests/metrics/test_percent_complete.py     | 434 ----------
 analytics/tests/test_cli.py                    | 216 -----
 14 files changed, 1 insertion(+), 3055 deletions(-)
 delete mode 100644 analytics/src/analytics/metrics/__init__.py
 delete mode 100644 analytics/src/analytics/metrics/base.py
 delete mode 100644 analytics/src/analytics/metrics/burndown.py
 delete mode 100644 analytics/src/analytics/metrics/burnup.py
 delete mode 100644 analytics/src/analytics/metrics/percent_complete.py
 delete mode 100644 analytics/src/analytics/metrics/utils.py
 delete mode 100644 analytics/tests/metrics/__init__.py
 delete mode 100644 analytics/tests/metrics/test_base.py
 delete mode 100644 analytics/tests/metrics/test_burndown.py
 delete mode 100644 analytics/tests/metrics/test_burnup.py
 delete mode 100644 analytics/tests/metrics/test_percent_complete.py

diff --git a/analytics/Makefile b/analytics/Makefile
index 1926bba4f..b3dcf9447 100644
--- a/analytics/Makefile
+++ b/analytics/Makefile
@@ -177,38 +177,3 @@ gh-data-export:
 --output-file $(ISSUE_FILE) \ --temp-dir $(OUTPUT_DIR) -sprint-burndown: - @echo "=> Running sprint burndown report for HHS/13" - @echo "=====================================================" - $(POETRY) analytics calculate sprint_burndown \ - --issue-file $(ISSUE_FILE) \ - --output-dir $(OUTPUT_DIR) \ - --sprint "$(SPRINT)" \ - --project 13 \ - --unit $(UNIT) \ - --$(ACTION) - @echo "=====================================================" - @echo "=> Running sprint burndown report for HHS/17" - @echo "=====================================================" - $(POETRY) analytics calculate sprint_burndown \ - --issue-file $(ISSUE_FILE) \ - --output-dir $(OUTPUT_DIR) \ - --sprint "$(SPRINT)" \ - --project 17 \ - --unit $(UNIT) \ - --$(ACTION) - -percent-complete: - @echo "=> Running percent complete deliverable" - @echo "=====================================================" - $(POETRY) analytics calculate deliverable_percent_complete \ - --issue-file $(ISSUE_FILE) \ - --output-dir $(OUTPUT_DIR) \ - --include-status "In Progress" \ -
--include-status "Planning" \ - --unit $(UNIT) \ - --$(ACTION) - -sprint-reports: sprint-burndown percent-complete - -sprint-reports-with-latest-data: gh-data-export sprint-reports diff --git a/analytics/src/analytics/cli.py b/analytics/src/analytics/cli.py index d40bc5a9f..b2db94e30 100644 --- a/analytics/src/analytics/cli.py +++ b/analytics/src/analytics/cli.py @@ -9,14 +9,13 @@ from typing import Annotated import typer -from slack_sdk import WebClient from sqlalchemy import text from analytics.datasets.etl_dataset import EtlDataset from analytics.datasets.issues import GitHubIssues from analytics.etl.github import GitHubProjectConfig, GitHubProjectETL from analytics.etl.utils import load_config -from analytics.integrations import etldb, slack +from analytics.integrations import etldb from analytics.integrations.db import PostgresDbClient from analytics.integrations.extracts.load_opportunity_data import ( extract_copy_opportunity_data, @@ -24,10 +23,6 @@ from analytics.logs import init as init_logging from analytics.logs.app_logger import init_app from analytics.logs.ecs_background_task import ecs_background_task -from analytics.metrics.base import BaseMetric, Unit -from analytics.metrics.burndown import SprintBurndown -from analytics.metrics.burnup import SprintBurnup -from analytics.metrics.percent_complete import DeliverablePercentComplete logger = logging.getLogger(__name__) @@ -38,15 +33,8 @@ OUTPUT_FILE_ARG = typer.Option(help="Path to file where exported data will be saved") OUTPUT_DIR_ARG = typer.Option(help="Path to directory where output files will be saved") TMP_DIR_ARG = typer.Option(help="Path to directory where intermediate files will be saved") -SPRINT_ARG = typer.Option(help="Name of the sprint for which we're calculating burndown") -UNIT_ARG = typer.Option(help="Whether to calculate completion by 'points' or 'tickets'") OWNER_ARG = typer.Option(help="Name of the GitHub project owner, e.g. HHS") PROJECT_ARG = typer.Option(help="Number of the GitHub project, e.g. 
13") -SHOW_RESULTS_ARG = typer.Option(help="Display a chart of the results in a browser") -POST_RESULTS_ARG = typer.Option(help="Post the results to slack") -STATUS_ARG = typer.Option( - help="Deliverable status to include in report, can be passed multiple times", -) EFFECTIVE_DATE_ARG = typer.Option(help="YYYY-MM-DD effective date to apply to each imported row") # fmt: on @@ -54,12 +42,10 @@ app = typer.Typer() # instantiate sub-commands for exporting data and calculating metrics export_app = typer.Typer() -metrics_app = typer.Typer() import_app = typer.Typer() etl_app = typer.Typer() # add sub-commands to main entrypoint app.add_typer(export_app, name="export", help="Export data needed to calculate metrics") -app.add_typer(metrics_app, name="calculate", help="Calculate key project metrics") app.add_typer(import_app, name="import", help="Import data into the database") app.add_typer(etl_app, name="etl", help="Transform and load local file") @@ -101,121 +87,6 @@ def export_github_data( GitHubProjectETL(config).run() -# =========================================================== -# Calculate commands -# =========================================================== - - -@metrics_app.command(name="sprint_burndown") -def calculate_sprint_burndown( - issue_file: Annotated[str, ISSUE_FILE_ARG], - sprint: Annotated[str, SPRINT_ARG], - unit: Annotated[Unit, UNIT_ARG] = Unit.points.value, # type: ignore[assignment] - *, # makes the following args keyword only - show_results: Annotated[bool, SHOW_RESULTS_ARG] = False, - post_results: Annotated[bool, POST_RESULTS_ARG] = False, - output_dir: Annotated[str, OUTPUT_DIR_ARG] = "data", - owner: Annotated[str, OWNER_ARG] = "HHS", - project: Annotated[int, PROJECT_ARG] = 13, -) -> None: - """Calculate the burndown for a particular sprint.""" - # load the input data - sprint_data = GitHubIssues.from_json(issue_file) - # calculate burndown - burndown = SprintBurndown( - sprint_data, - sprint=sprint, - unit=unit, - project=project, - owner=owner, - ) - show_and_or_post_results( - metric=burndown, - show_results=show_results, - post_results=post_results, - output_dir=output_dir, - ) - - -@metrics_app.command(name="sprint_burnup") -def calculate_sprint_burnup( - issue_file: Annotated[str, ISSUE_FILE_ARG], - sprint: Annotated[str, SPRINT_ARG], - unit: Annotated[Unit, UNIT_ARG] = Unit.points.value, # type: ignore[assignment] - *, # makes the following args keyword only - show_results: Annotated[bool, SHOW_RESULTS_ARG] = False, - post_results: Annotated[bool, POST_RESULTS_ARG] = False, - output_dir: Annotated[str, OUTPUT_DIR_ARG] = "data", -) -> None: - """Calculate the burnup of a particular sprint.""" - # load the input data - sprint_data = GitHubIssues.from_json(issue_file) - # calculate burnup - burnup = SprintBurnup(sprint_data, sprint=sprint, unit=unit) - show_and_or_post_results( - metric=burnup, - show_results=show_results, - post_results=post_results, - output_dir=output_dir, - ) - - -@metrics_app.command(name="deliverable_percent_complete") -def calculate_deliverable_percent_complete( - issue_file: Annotated[str, ISSUE_FILE_ARG], - # Typer uses the Unit enum to validate user inputs from the CLI - # but the default arg must be a string or the CLI will throw an error - unit: Annotated[Unit, UNIT_ARG] = Unit.points.value, # type: ignore[assignment] - *, # makes the following args keyword only - show_results: Annotated[bool, SHOW_RESULTS_ARG] = False, - post_results: Annotated[bool, POST_RESULTS_ARG] = False, - output_dir: Annotated[str, OUTPUT_DIR_ARG] = 
"data", - include_status: Annotated[list[str] | None, STATUS_ARG] = None, -) -> None: - """Calculate percentage completion by deliverable.""" - task_data = GitHubIssues.from_json(issue_file) - # calculate percent complete - metric = DeliverablePercentComplete( - dataset=task_data, - unit=unit, - statuses_to_include=include_status, - ) - show_and_or_post_results( - metric=metric, - show_results=show_results, - post_results=post_results, - output_dir=output_dir, - ) - - -def show_and_or_post_results( - metric: BaseMetric, - *, # makes the following args keyword only - show_results: bool, - post_results: bool, - output_dir: str, -) -> None: - """Optionally show the results of a metric and/or post them to slack.""" - # defer load of settings until this command is called - # this prevents an error if ANALYTICS_SLACK_BOT_TOKEN env var is unset - from config import get_db_settings - - settings = get_db_settings() - - # optionally display the burndown chart in the browser - if show_results: - metric.show_chart() - print("Slack message:\n") - print(metric.format_slack_message()) - if post_results: - slackbot = slack.SlackBot(client=WebClient(token=settings.slack_bot_token)) - metric.post_results_to_slack( - slackbot=slackbot, - channel_id=settings.reporting_channel_id, - output_dir=Path(output_dir), - ) - - # =========================================================== # Import commands # =========================================================== diff --git a/analytics/src/analytics/metrics/__init__.py b/analytics/src/analytics/metrics/__init__.py deleted file mode 100644 index e558f81e2..000000000 --- a/analytics/src/analytics/metrics/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Calculate a set of metrics that are important to the project.""" diff --git a/analytics/src/analytics/metrics/base.py b/analytics/src/analytics/metrics/base.py deleted file mode 100644 index b60436ac9..000000000 --- a/analytics/src/analytics/metrics/base.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Base class for all metrics.""" - -from dataclasses import dataclass -from enum import Enum -from pathlib import Path -from typing import Any, Generic, TypeVar - -import pandas as pd -from plotly.graph_objects import Figure - -from analytics.datasets.base import BaseDataset -from analytics.integrations.slack import FileMapping, SlackBot - -Dataset = TypeVar("Dataset", bound=BaseDataset) - - -class Unit(Enum): - """List the units in which metrics can be calculated.""" - - issues = "issues" # pylint: disable=C0103 - points = "points" # pylint: disable=C0103 - - -@dataclass -class Statistic: - """Store a single value that represents a summary statistic about a dataset.""" - - value: Any - suffix: str = "" - - -class BaseMetric(Generic[Dataset]): - """Base class for all metrics.""" - - CHART_PNG = "chart-static.png" - CHART_HTML = "chart-interactive.html" - RESULTS_CSV = "results.csv" - DATASET_CSV = "source-data.csv" - - def __init__(self, dataset: Dataset) -> None: - """Initialize and calculate the metric from the input dataset.""" - self.dataset = dataset - self.results = self.calculate() - self.stats = self.get_stats() - self._chart: Figure | None = None - - def calculate(self) -> pd.DataFrame: - """Calculate the metric and return the resulting dataset.""" - raise NotImplementedError - - def get_stats(self) -> dict[str, Statistic]: - """Get the list of stats associated with this metric to include in reporting.""" - raise NotImplementedError - - @property - def chart(self) -> Figure: - """ - Return a chart visualizing the results. 
- - Note: - ---- - By deferring the self.plot_results() method invocation until the chart is - needed, we decrease the amount of time required to instantiate the class - - """ - if not self._chart: - self._chart = self.plot_results() - return self._chart - - def plot_results(self) -> Figure: - """Create a plotly chart that visually represents the results.""" - raise NotImplementedError - - def export_results(self, output_dir: Path = Path("data")) -> Path: - """Export the self.results dataframe to a csv file.""" - # make sure the parent directory exists - output_dir.mkdir(exist_ok=True, parents=True) - output_path = output_dir / self.RESULTS_CSV - # export results dataframe to a csv - self.results.to_csv(output_path) - return output_path - - def export_dataset(self, output_dir: Path = Path("data")) -> Path: - """Export self.dataset to a csv file.""" - # make sure the parent directory exists - output_dir.mkdir(exist_ok=True, parents=True) - output_path = output_dir / self.DATASET_CSV - # export results dataframe to a csv - self.dataset.to_csv(output_path) - return output_path - - def export_chart_to_html(self, output_dir: Path = Path("data")) -> Path: - """Export the plotly chart in self.chart to a png file.""" - # make sure the parent directory exists - output_dir.mkdir(exist_ok=True, parents=True) - output_path = output_dir / self.CHART_HTML - # export chart to a png - self.chart.write_html(output_path) - return output_path - - def export_chart_to_png(self, output_dir: Path = Path("data")) -> Path: - """Export the plotly chart in self.chart to a png file.""" - # make sure the parent directory exists - output_dir.mkdir(exist_ok=True, parents=True) - output_path = output_dir / self.CHART_PNG - # export chart to a png - self.chart.write_image(output_path, width=900) - return output_path - - def show_chart(self) -> None: - """Display self.chart in a browser.""" - self.chart.show() - - def format_slack_message(self) -> str: - """Format the message that will be included with the charts posted to slack.""" - raise NotImplementedError - - def post_results_to_slack( - self, - slackbot: SlackBot, - channel_id: str, - output_dir: Path = Path("data"), - ) -> None: - """Upload copies of the results and chart to a slack channel.""" - results_csv = self.export_results(output_dir) - dataset_csv = self.export_dataset(output_dir) - chart_png = self.export_chart_to_png(output_dir) - chart_html = self.export_chart_to_html(output_dir) - files = [ - FileMapping(path=str(results_csv), name=results_csv.name), - FileMapping(path=str(dataset_csv), name=dataset_csv.name), - FileMapping(path=str(chart_png), name=chart_png.name), - FileMapping(path=str(chart_html), name=chart_html.name), - ] - slackbot.upload_files_to_slack_channel( - files=files, - channel_id=channel_id, - message=self.format_slack_message(), - ) diff --git a/analytics/src/analytics/metrics/burndown.py b/analytics/src/analytics/metrics/burndown.py deleted file mode 100644 index 041f79559..000000000 --- a/analytics/src/analytics/metrics/burndown.py +++ /dev/null @@ -1,163 +0,0 @@ -""" -Calculates burndown for sprints. 
- -This is a subclass of the BaseMetric class that calculates the running total of -open issues for each day in a sprint -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -import pandas as pd -import plotly.express as px - -from analytics.datasets.issues import GitHubIssues -from analytics.metrics.base import BaseMetric, Statistic, Unit -from analytics.metrics.utils import Columns, sum_tix_by_day - -if TYPE_CHECKING: - from plotly.graph_objects import Figure - - -class SprintBurndown(BaseMetric[GitHubIssues]): - """Calculates the running total of open issues per day in the sprint.""" - - def __init__( - self, - dataset: GitHubIssues, - sprint: str, - unit: Unit, - project: int, - owner: str = "HHS", - ) -> None: - """Initialize the SprintBurndown metric.""" - self.dataset = dataset - self.project = project - self.owner = owner - self.sprint = self._get_and_validate_sprint_name(sprint) - self.sprint_data = self._isolate_data_for_this_sprint() - self.date_col = "date" - self.columns = Columns( - opened_at_col=dataset.opened_col, - closed_at_col=dataset.closed_col, - unit_col=dataset.points_col if unit == Unit.points else unit.value, - date_col=self.date_col, - ) - self.unit = unit - # Set the value of the unit column based on - # whether we're summing issues or story points - self.unit_col = dataset.points_col if unit == Unit.points else unit.value - super().__init__(dataset) - - def calculate(self) -> pd.DataFrame: - """Calculate the sprint burnup.""" - # make a copy of columns and rows we need to calculate burndown for this sprint - burnup_cols = [ - self.dataset.opened_col, - self.dataset.closed_col, - self.dataset.points_col, - ] - df_sprint = self.sprint_data[burnup_cols].copy() - # Count the number of tickets opened, closed, and remaining by day - return sum_tix_by_day( - df=df_sprint, - cols=self.columns, - unit=self.unit, - sprint_end=self.dataset.sprint_end(self.sprint), - ) - - def plot_results(self) -> Figure: - """Plot the sprint burndown using a plotly line chart.""" - # Limit the data in the line chart to dates within the sprint - # or through today, if the sprint hasn't yet ended - # NOTE: This will *not* affect the running totals on those days - sprint_start = self.dataset.sprint_start(self.sprint) - sprint_end = self.dataset.sprint_end(self.sprint) - date_mask = self.results[self.date_col].between( - sprint_start, - min(sprint_end, pd.Timestamp.today()), - ) - df = self.results[date_mask] - # create a line chart from the data in self.results - title = ( - f"{self.owner}/{self.project} {self.sprint} burndown " - f"in project by {self.unit.value}" - ) - chart = px.line( - data_frame=df, - x=self.date_col, - y="total_open", - title=title, - labels={"total_open": f"total {self.unit.value} open"}, - ) - # set the scale of the y axis to start at 0 - chart.update_yaxes(range=[0, df["total_open"].max() + 2]) - chart.update_xaxes(range=[sprint_start, sprint_end]) - return chart - - def get_stats(self) -> dict[str, Statistic]: - """ - Calculate summary statistics for this metric. - - Notes - ----- - TODO(@widal001): 2023-12-04 - Should stats be calculated in separate private methods? 
- - """ - df = self.results - # get sprint start and end dates - sprint_start = self.dataset.sprint_start(self.sprint).strftime("%Y-%m-%d") - sprint_end = self.dataset.sprint_end(self.sprint).strftime("%Y-%m-%d") - # get open and closed counts and percentages - total_opened = int(df["opened"].sum()) - total_closed = int(df["closed"].sum()) - pct_closed = round(total_closed / total_opened * 100, 2) - # get the percentage of tickets that were ticketed - is_pointed = self.sprint_data[self.dataset.points_col] >= 1 - issues_pointed = len(self.sprint_data[is_pointed]) - issues_total = len(self.sprint_data) - pct_pointed = round(issues_pointed / issues_total * 100, 2) - # format and return stats - return { - "Sprint start date": Statistic(value=sprint_start), - "Sprint end date": Statistic(value=sprint_end), - "Total opened": Statistic(total_opened, suffix=f" {self.unit.value}"), - "Total closed": Statistic(total_closed, suffix=f" {self.unit.value}"), - "Percent closed": Statistic(value=pct_closed, suffix="%"), - "Percent pointed": Statistic( - value=pct_pointed, - suffix=f"% of {Unit.issues.value}", - ), - } - - def format_slack_message(self) -> str: - """Format the message that will be included with the charts posted to slack.""" - message = ( - f"*:github: Burndown summary for {self.sprint} " - f"in project {self.owner}/{self.project} by {self.unit.value}*\n" - ) - for label, stat in self.stats.items(): - message += f"• *{label}:* {stat.value}{stat.suffix}\n" - return message - - def _get_and_validate_sprint_name(self, sprint: str | None) -> str: - """Get the name of the sprint we're using to calculate burndown or raise an error.""" - # save dataset to local variable for brevity - dataset = self.dataset - # update sprint name if calculating burndown for the current sprint - if sprint == "@current": - sprint = dataset.current_sprint - # check that the sprint name matches one of the sprints in the dataset - valid_sprint = sprint in list(dataset.sprints[dataset.sprint_col]) - if not sprint or not valid_sprint: # needs `not sprint` for mypy checking - msg = "Sprint value doesn't match one of the available sprints" - raise ValueError(msg) - # return the sprint name if it's valid - return sprint - - def _isolate_data_for_this_sprint(self) -> pd.DataFrame: - """Filter out issues that are not assigned to the current sprint or project.""" - sprint_filter = self.dataset.df[self.dataset.sprint_col] == self.sprint - project_filter = self.dataset.df[self.dataset.project_col] == self.project - return self.dataset.df[((sprint_filter) & (project_filter))] diff --git a/analytics/src/analytics/metrics/burnup.py b/analytics/src/analytics/metrics/burnup.py deleted file mode 100644 index b93b78dda..000000000 --- a/analytics/src/analytics/metrics/burnup.py +++ /dev/null @@ -1,157 +0,0 @@ -""" -Calculates burnup for sprints. 
- -This is a subclass of the BaseMetric class that calculates the running total of -open issues for each day in a sprint -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -import pandas as pd -import plotly.express as px - -from analytics.datasets.issues import GitHubIssues -from analytics.metrics.base import BaseMetric, Statistic, Unit -from analytics.metrics.utils import Columns, sum_tix_by_day - -if TYPE_CHECKING: - from plotly.graph_objects import Figure - - -class SprintBurnup(BaseMetric[GitHubIssues]): - """Calculates the running total of open issues per day in the sprint.""" - - def __init__( - self, - dataset: GitHubIssues, - sprint: str, - unit: Unit, - ) -> None: - """Initialize the SprintBurnup metric.""" - self.dataset = dataset - self.sprint = self._get_and_validate_sprint_name(sprint) - self.sprint_data = self._isolate_data_for_this_sprint() - self.date_col = "date" - self.columns = Columns( - opened_at_col=dataset.opened_col, - closed_at_col=dataset.closed_col, - unit_col=dataset.points_col if unit == Unit.points else unit.value, - date_col=self.date_col, - ) - self.unit = unit - super().__init__(dataset) - - def calculate(self) -> pd.DataFrame: - """Calculate the sprint burnup.""" - # make a copy of columns and rows we need to calculate burndown for this sprint - burnup_cols = [ - self.dataset.opened_col, - self.dataset.closed_col, - self.dataset.points_col, - ] - df_sprint = self.sprint_data[burnup_cols].copy() - # Count the number of tickets opened, closed, and remaining by day - return sum_tix_by_day( - df=df_sprint, - cols=self.columns, - unit=self.unit, - sprint_end=self.dataset.sprint_end(self.sprint), - ) - - def plot_results(self) -> Figure: - """Plot the sprint burnup using a plotly area chart.""" - # Limit the data in the area chart to dates within the sprint - # or through today, if the sprint hasn't yet ended - # NOTE: This will *not* affect the running totals on those days - sprint_start = self.dataset.sprint_start(self.sprint) - sprint_end = self.dataset.sprint_end(self.sprint) - date_mask = self.results[self.date_col].between( - sprint_start, - min(sprint_end, pd.Timestamp.today()), - ) - df = self.results[date_mask].melt( - id_vars=self.date_col, - value_vars=["total_closed", "total_open"], - var_name="cols", - ) - - # create a area chart from the data in self.results - chart = px.area( - data_frame=df, - x=self.date_col, - y="value", - color="cols", - color_discrete_sequence=["#EFE0FC", "#2DA34D"], - markers=True, - title=f"{self.sprint} Burnup by {self.unit.value}", - template="none", - ) - # set the scale of the y axis to start at 0 - chart.update_yaxes(range=[0, df["value"].max() + 10]) - chart.update_xaxes(range=[sprint_start, sprint_end]) - chart.update_layout( - xaxis_title="Date", - yaxis_title=f"Total {self.unit.value.capitalize()}", - legend_title=f"{self.unit.value.capitalize()}", - ) - return chart - - def get_stats(self) -> dict[str, Statistic]: - """Calculate summary statistics for this metric.""" - df = self.results - # get sprint start and end dates - sprint_start = self.dataset.sprint_start(self.sprint).strftime("%Y-%m-%d") - sprint_end = self.dataset.sprint_end(self.sprint).strftime("%Y-%m-%d") - # get open and closed counts and percentages - total_opened = int(df["opened"].sum()) - total_closed = int(df["closed"].sum()) - pct_closed = round(total_closed / total_opened * 100, 2) - # For burnup, we want to know at a glance the pct_remaining - pct_remaining = round(100 - pct_closed, 2) - # get the percentage of 
tickets that were ticketed - is_pointed = self.sprint_data[self.dataset.points_col] >= 1 - issues_pointed = len(self.sprint_data[is_pointed]) - issues_total = len(self.sprint_data) - pct_pointed = round(issues_pointed / issues_total * 100, 2) - # format and return stats - return { - "Sprint start date": Statistic(value=sprint_start), - "Sprint end date": Statistic(value=sprint_end), - "Total opened": Statistic(total_opened, suffix=f" {self.unit.value}"), - "Total closed": Statistic(total_closed, suffix=f" {self.unit.value}"), - "Percent closed": Statistic(value=pct_closed, suffix="%"), - "Percent remaining": Statistic(value=pct_remaining, suffix="%"), - "Percent pointed": Statistic( - value=pct_pointed, - suffix=f"% of {Unit.issues.value}", - ), - } - - def format_slack_message(self) -> str: - """Format the message that will be included with the charts posted to slack.""" - message = f"*:github: Burnup summary for {self.sprint} by {self.unit.value}*\n" - for label, stat in self.stats.items(): - message += f"• *{label}:* {stat.value}{stat.suffix}\n" - return message - - def _get_and_validate_sprint_name(self, sprint: str | None) -> str: - """Get the name of the sprint we're using to calculate burndown or raise an error.""" - # save dataset to local variable for brevity - dataset = self.dataset - # update sprint name if calculating burndown for the current sprint - if sprint == "@current": - sprint = dataset.current_sprint - # check that the sprint name matches one of the sprints in the dataset - valid_sprint = sprint in list(dataset.sprints[dataset.sprint_col]) - if not sprint or not valid_sprint: # needs `not sprint` for mypy checking - msg = "Sprint value doesn't match one of the available sprints" - raise ValueError(msg) - # return the sprint name if it's valid - return sprint - - def _isolate_data_for_this_sprint(self) -> pd.DataFrame: - """Filter out issues that are not assigned to the current sprint.""" - sprint_filter = self.dataset.df[self.dataset.sprint_col] == self.sprint - return self.dataset.df[sprint_filter] diff --git a/analytics/src/analytics/metrics/percent_complete.py b/analytics/src/analytics/metrics/percent_complete.py deleted file mode 100644 index a17087aaa..000000000 --- a/analytics/src/analytics/metrics/percent_complete.py +++ /dev/null @@ -1,170 +0,0 @@ -"""Calculate and visualizes percent completion by deliverable.""" - -import datetime as dt - -import pandas as pd -import plotly.express as px -from plotly.graph_objects import Figure - -from analytics.datasets.issues import GitHubIssues -from analytics.metrics.base import BaseMetric, Statistic, Unit - - -class DeliverablePercentComplete(BaseMetric[GitHubIssues]): - """Calculate the percentage of issues or points completed per deliverable.""" - - def __init__( - self, - dataset: GitHubIssues, - unit: Unit, - statuses_to_include: list[str] | None = None, - ) -> None: - """Initialize the DeliverablePercentComplete metric.""" - self.dataset = dataset - self.deliverable_col = "deliverable_title" - self.status_col = "issue_state" - self.deliverable_status_col = "deliverable_status" - self.unit = unit - self.unit_col = dataset.points_col if unit == Unit.points else unit.value - self.statuses_to_include = statuses_to_include - self.deliverable_data = self._isolate_deliverables_by_status() - super().__init__(dataset) - - def calculate(self) -> pd.DataFrame: - """ - Calculate the percent complete per deliverable. - - Notes - ----- - Percent completion is calculated using the following steps: - 1. 
Count the number of all issues (or points) per deliverable - 2. Count the number of closed issues (or points) per deliverable - 3. Left join all issues/points with open issues/points on deliverable - so that we have a row per deliverable with a total count column - and a closed count column - 4. Subtract closed count from total count to get open count - 5. Divide closed count by total count to get percent complete - - """ - # get total and closed counts per deliverable - df_total = self._get_count_by_deliverable(status="all") - df_closed = self._get_count_by_deliverable(status="closed") - # join total and closed counts on deliverable - # and calculate remaining columns - df_all = df_total.merge(df_closed, on=self.deliverable_col, how="left") - df_all = df_all.fillna(0) - df_all["open"] = df_all["total"] - df_all["closed"] - df_all["percent_complete"] = df_all["closed"] / df_all["total"] - df_all["percent_complete"] = df_all["percent_complete"].fillna(0) - return df_all - - def plot_results(self) -> Figure: - """Create a bar chart of percent completion from the data in self.results.""" - # get the current date in YYYY-MM-DD format - today = dt.datetime.now(tz=dt.timezone.utc).strftime("%Y-%m-%d") - # reshape the dataframe in self.results for plotly - df = self._prepare_result_dataframe_for_plotly() - # create a stacked bar chart from the data - return px.bar( - df, - x=self.unit.value, - y=self.deliverable_col, - color=self.status_col, - text="percent_of_total", - labels={self.deliverable_col: "deliverable"}, - color_discrete_map={"open": "#aacde3", "closed": "#06508f"}, - orientation="h", - title=f"Percent of {self.unit.value} complete by deliverable as of {today}", - height=800, - ) - - def get_stats(self) -> dict[str, Statistic]: - """Calculate stats for this metric.""" - df_src = self.deliverable_data - # get the total number of issues and the number of issues with points per deliverable - is_pointed = df_src[self.dataset.points_col] >= 1 - issues_total = df_src.value_counts(self.deliverable_col).to_frame() - issues_pointed = ( - df_src[is_pointed].value_counts(self.deliverable_col).to_frame() - ) - # join the count of all issues to the count of pointed issues and - # calculate the percentage of all issues that have points per deliverable - df_tgt = issues_total.join(issues_pointed, lsuffix="_total", rsuffix="_pointed") - df_tgt["pct_pointed"] = df_tgt["count_pointed"] / df_tgt["count_total"] * 100 - df_tgt["pct_pointed"] = round(df_tgt["pct_pointed"], 2).fillna(0) - # export to a dictionary of stats) - stats = {} - for row in df_tgt.reset_index().to_dict("records"): - deliverable = row[self.deliverable_col] - stats[deliverable] = Statistic( - value=row["pct_pointed"], - suffix=f"% of {Unit.issues.value} pointed", - ) - return stats - - def format_slack_message(self) -> str: - """Format the message that will be included with the charts posted to slack.""" - message = f"*:github: Percent of {self.unit.value} completed by deliverable*\n" - if self.statuses_to_include: - statuses = ", ".join(self.statuses_to_include) - message += f"Limited to deliverables with these statuses: {statuses}\n\n" - for label, stat in self.stats.items(): - message += f"• *{label}:* {stat.value}{stat.suffix}\n" - return message - - def _isolate_deliverables_by_status(self) -> pd.DataFrame: - """Isolate the deliverables to include in the report based on their status.""" - df = self.dataset.df - # if statuses_to_include is provided, use it to filter the dataset - statuses_provided = self.statuses_to_include - if 
statuses_provided: - status_filter = df[self.deliverable_status_col].isin(statuses_provided) - df = df[status_filter] - return df - - def _get_count_by_deliverable( - self, - status: str, - ) -> pd.DataFrame: - """Get the count of issues (or points) by deliverable and status.""" - # create local copies of the dataset and key column names - df = self.deliverable_data.copy() - unit_col = self.unit_col - key_cols = [self.deliverable_col, unit_col] - # create a dummy column to sum per row if the unit is issues - if self.unit == Unit.issues: - df[unit_col] = 1 - # isolate issues with the status we want - if status != "all": - status_filter = df[self.status_col] == status - df = df.loc[status_filter, key_cols] - else: - status = "total" # rename status var to use as column name - df = df[key_cols] - # group by deliverable and sum the values in the unit field - # then rename the sum column to the value of the status var - # to prevent duplicate col names when open and closed counts are joined - df_agg = df.groupby(self.deliverable_col, as_index=False).agg({unit_col: "sum"}) - return df_agg.rename(columns={unit_col: status}) - - def _prepare_result_dataframe_for_plotly(self) -> pd.DataFrame: - """Stack the open and closed counts self.results for plotly charts.""" - # unpivot open and closed counts so that each deliverable has both - # an open and a closed row with just one column for count - unit_col: str = self.unit.value - df = self.results.melt( - id_vars=[self.deliverable_col], - value_vars=["open", "closed"], - value_name=unit_col, - var_name=self.status_col, - ) - # calculate the percentage of open and closed per deliverable - # so that we can use this value as label in the chart - df["total"] = df.groupby(self.deliverable_col)[unit_col].transform("sum") - df["percent_of_total"] = (df[unit_col] / df["total"] * 100).round(0) - df["percent_of_total"] = ( - df["percent_of_total"].astype("Int64").astype("str") + "%" - ) - # sort the dataframe by count and status so that the resulting chart - # has deliverables with more issues/points at the top - return df.sort_values(["total", self.status_col], ascending=True) diff --git a/analytics/src/analytics/metrics/utils.py b/analytics/src/analytics/metrics/utils.py deleted file mode 100644 index 4db39b850..000000000 --- a/analytics/src/analytics/metrics/utils.py +++ /dev/null @@ -1,129 +0,0 @@ -"""Stores utility functions for Metrics classes.""" - -from __future__ import annotations - -from dataclasses import dataclass -from enum import StrEnum - -import pandas as pd - -from analytics.metrics.base import Unit - - -@dataclass -class Columns: - """List of columns names to use when calculating burnup/down.""" - - opened_at_col: str - closed_at_col: str - unit_col: str - date_col: str = "date" - opened_count_col: str = "opened" - closed_count_col: str = "closed" - delta_col: str = "delta" - - -class IssueState(StrEnum): - """Whether the issue is open or closed.""" - - OPEN = "opened" - CLOSED = "closed" - - -def sum_tix_by_day( - df: pd.DataFrame, - cols: Columns, - unit: Unit, - sprint_end: pd.Timestamp, -) -> pd.DataFrame: - """Count the total number of tix opened, closed, and remaining by day.""" - # Get the date range for burndown/burnup - df_tix_range = get_tix_date_range(df, cols, sprint_end) - # Get the number of tix opened and closed by day - df_opened = get_daily_tix_counts_by_status(df, cols, IssueState.OPEN, unit) - df_closed = get_daily_tix_counts_by_status(df, cols, IssueState.CLOSED, unit) - # combine the daily opened and closed counts to get 
total open and closed per day - return get_cum_sum_of_tix(cols, df_tix_range, df_opened, df_closed) - - -def get_daily_tix_counts_by_status( - df: pd.DataFrame, - cols: Columns, - state: IssueState, - unit: Unit, -) -> pd.DataFrame: - """ - Count the number of issues or points opened or closed by date. - - Notes - ----- - It does this by: - - Grouping on the created_date or opened_date column, depending on state - - Counting the total number of rows per group - - """ - agg_col = cols.opened_at_col if state == IssueState.OPEN else cols.closed_at_col - unit_col = cols.unit_col - key_cols = [agg_col, unit_col] - if unit == Unit.issues: - df[unit_col] = 1 - df_agg = df[key_cols].groupby(agg_col, as_index=False).agg({unit_col: "sum"}) - return df_agg.rename(columns={agg_col: "date", unit_col: state.value}) - - -def get_tix_date_range( - df: pd.DataFrame, - cols: Columns, - sprint_end: pd.Timestamp, -) -> pd.DataFrame: - """ - Get the data range over which issues were created and closed. - - Notes - ----- - It does this by: - - Finding the date when the sprint ends - - Finding the earliest date a issue was created - - Finding the latest date a issue was closed - - Creating a row for each day between the earliest date a ticket was opened - and either the sprint end _or_ the latest date an issue was closed, - whichever is the later date. - - """ - opened_min = df[cols.opened_at_col].min() - closed_max = df[cols.closed_at_col].max() - closed_max = sprint_end if pd.isna(closed_max) else max(sprint_end, closed_max) - return pd.DataFrame( - pd.date_range(opened_min, closed_max), - columns=["date"], - ) - - -def get_cum_sum_of_tix( - cols: Columns, - dates: pd.DataFrame, - opened: pd.DataFrame, - closed: pd.DataFrame, -) -> pd.DataFrame: - """ - Create results data frame. 
- - Notes - ----- - It does this by: - - Left joining the full date range to the daily open and closed counts - so that we have a row for each day of the range with a column for tix - opened, a column for tix closed for the day, - - Cumulatively summing the deltas to get the running total of open tix - - Cumulative summing the closed column to get the running total of closed tix - - """ - df = ( - dates.merge(opened, on=cols.date_col, how="left") - .merge(closed, on=cols.date_col, how="left") - .fillna(0) - ) - df[cols.delta_col] = df[cols.opened_count_col] - df[cols.closed_count_col] - df["total_open"] = df[cols.delta_col].cumsum() - df["total_closed"] = df[cols.closed_count_col].cumsum() - return df diff --git a/analytics/tests/metrics/__init__.py b/analytics/tests/metrics/__init__.py deleted file mode 100644 index 8e691a3df..000000000 --- a/analytics/tests/metrics/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Test modules in analytics.metrics package.""" diff --git a/analytics/tests/metrics/test_base.py b/analytics/tests/metrics/test_base.py deleted file mode 100644 index 3362c7532..000000000 --- a/analytics/tests/metrics/test_base.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Test the BaseMetric class.""" - -# pylint: disable=abstract-method -import pandas as pd # noqa: I001 -import pytest - -from analytics.datasets.base import BaseDataset -from analytics.metrics.base import BaseMetric, Statistic - - -class MetricWithoutStats(BaseMetric): - """Create a mock metric for testing without get_stats() method.""" - - def calculate(self) -> pd.DataFrame: - """Implement calculate method.""" - return pd.DataFrame() - - -class MetricWithoutPlotResults(BaseMetric): - """Create a mock metric for testing without get_stats() method.""" - - def calculate(self) -> pd.DataFrame: - """Implement calculate method.""" - return pd.DataFrame() - - def get_stats(self) -> dict[str, Statistic]: - """Implement get_stats method.""" - return {} - - -@pytest.fixture(scope="module", name="dataset") -def mock_dataset() -> BaseDataset: - """Create a mock BaseDataset instance for tests.""" - return BaseDataset(df=pd.DataFrame()) - - -class TestRequiredImplementations: - """Check that NotImplementedError is raised for abstract methods.""" - - def test_raise_not_implemented_on_init_due_to_calculate( - self, - dataset: BaseDataset, - ): - """Error should be raised for __init__() method without calculate().""" - with pytest.raises(NotImplementedError): - BaseMetric(dataset) - - def test_raise_not_implemented_on_init_due_to_get_stats( - self, - dataset: BaseDataset, - ): - """Error should be raised for __init__() method without get_stats().""" - with pytest.raises(NotImplementedError): - MetricWithoutStats(dataset) - - def test_raise_not_implemented_for_plot_results(self, dataset: BaseDataset): - """NotImplementedError should be raised for plot_results().""" - mock_metric = MetricWithoutPlotResults(dataset) - with pytest.raises(NotImplementedError): - mock_metric.plot_results() - - def test_raise_not_implemented_for_format_slack_message( - self, - dataset: BaseDataset, - ): - """NotImplementedError should be raised for format_slack_message().""" - mock_metric = MetricWithoutPlotResults(dataset) - with pytest.raises(NotImplementedError): - mock_metric.format_slack_message() diff --git a/analytics/tests/metrics/test_burndown.py b/analytics/tests/metrics/test_burndown.py deleted file mode 100644 index 6da265b8b..000000000 --- a/analytics/tests/metrics/test_burndown.py +++ /dev/null @@ -1,673 +0,0 @@ -"""Test the 
analytics.metrics.burndown module.""" - -from pathlib import Path # noqa: I001 - -import pandas as pd -import pytest - -from analytics.datasets.issues import GitHubIssues -from analytics.metrics.burndown import SprintBurndown, Unit - -from tests.conftest import ( - DAY_0, - DAY_1, - DAY_2, - DAY_3, - DAY_4, - MockSlackbot, - issue, -) - - -def result_row( - day: str, - opened: int, - closed: int, - delta: int, - total: int, - closed_total: int, -) -> dict: - """Create a sample result row.""" - return { - "date": pd.Timestamp(day), - "opened": opened, - "closed": closed, - "delta": delta, - "total_open": total, - "total_closed": closed_total, - } - - -@pytest.fixture(name="sample_burndown", scope="module") -def sample_burndown_by_points_fixture() -> SprintBurndown: - """Create a sample burndown to simplify test setup.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # return sprint burndown by points - return SprintBurndown(test_data, sprint="Sprint 1", unit=Unit.points, project=1) - - -class TestSprintBurndownByTasks: - """Test the SprintBurndown class with unit='tasks'.""" - - def test_exclude_tix_assigned_to_other_sprints(self): - """The burndown should exclude tickets that are assigned to other sprints.""" - # setup - create test data - sprint_data = [ - # fmt: off - # include this row - assigned to sprint 1 - issue(issue=1, sprint=1, sprint_start=DAY_1, created=DAY_1, closed=DAY_3), - # exclude this row - assigned to sprint 2 - issue(issue=1, sprint=2, sprint_start=DAY_4, created=DAY_0, closed=DAY_4), - # fmt: on - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - df = output.results - # validation - check min and max dates - assert df[output.date_col].min() == pd.Timestamp(DAY_1) - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - # validation - check burndown output - # fmt: off - expected = [ - result_row(day=DAY_1, opened=1, closed=0, delta=1, total=1, closed_total=0), - result_row(day=DAY_2, opened=0, closed=0, delta=0, total=1, closed_total=0), - result_row(day=DAY_3, opened=0, closed=1, delta=-1, total=0, closed_total=1), - ] - # fmt: on - assert df.to_dict("records") == expected - - def test_count_tix_created_before_sprint_start(self): - """Burndown should include tix opened before the sprint but closed during it.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - df = output.results - # validation - check min and max dates - assert df[output.date_col].min() == pd.Timestamp(DAY_0) - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - # validation - check burndown output - # fmt: off - expected = [ - result_row(day=DAY_0, opened=2, closed=0, delta=2, total=2, closed_total=0), - result_row(day=DAY_1, opened=0, closed=0, delta=0, total=2, closed_total=0), - result_row(day=DAY_2, opened=0, closed=1, delta=-1, total=1, 
closed_total=1), - result_row(day=DAY_3, opened=0, closed=1, delta=-1, total=0, closed_total=2), - ] - # fmt: on - assert df.to_dict("records") == expected - - def test_count_tix_closed_after_sprint_start(self): - """Burndown should include tix closed after the sprint ended.""" - # setup - create test data - sprint_data = [ - issue( # closed before sprint end - issue=1, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_2, - ), - issue( # closed after sprint end - issue=1, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_4, - ), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - df = output.results - # validation - check min and max dates - assert df[output.date_col].min() == pd.Timestamp(DAY_1) - assert df[output.date_col].max() == pd.Timestamp(DAY_4) - # validation - check burndown output - # fmt: off - expected = [ - result_row(day=DAY_1, opened=2, closed=0, delta=2, total=2, closed_total=0), - result_row(day=DAY_2, opened=0, closed=1, delta=-1, total=1, closed_total=1), - result_row(day=DAY_3, opened=0, closed=0, delta=0, total=1, closed_total=1), - result_row(day=DAY_4, opened=0, closed=1, delta=-1, total=0, closed_total=2), - ] - # fmt: on - assert df.to_dict("records") == expected - - def test_count_tix_created_after_sprint_start(self): - """Burndown should include tix opened and closed during the sprint.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, closed=DAY_3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - df = output.results - # validation - check burndown output - # fmt: off - expected = [ - result_row(day=DAY_0, opened=1, closed=0, delta=1, total=1, closed_total=0), - result_row(day=DAY_1, opened=0, closed=0, delta=0, total=1, closed_total=0), - result_row(day=DAY_2, opened=1, closed=1, delta=0, total=1, closed_total=1), - result_row(day=DAY_3, opened=0, closed=1, delta=-1, total=0, closed_total=2), - ] - # fmt: on - assert df.to_dict("records") == expected - - def test_include_all_sprint_days_if_tix_closed_early(self): - """All days of the sprint should be included even if all tix were closed early.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - df = output.results - # validation - check max date is end of sprint not last closed date - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - - def test_raise_value_error_if_sprint_arg_not_in_dataset(self): - """A ValueError should be raised if the sprint argument isn't valid.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - issue(issue=1, sprint_start=DAY_1, created=DAY_0), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # validation - with pytest.raises( - 
ValueError, - match="Sprint value doesn't match one of the available sprints", - ): - SprintBurndown(test_data, sprint="Fake sprint", unit=Unit.issues, project=1) - - def test_calculate_burndown_for_current_sprint(self): - """Use the current sprint if the date falls in the middle of a sprint.""" - # setup - create test data - today = pd.Timestamp.today().floor("d") - day_1 = (today + pd.Timedelta(days=-1)).strftime("%Y-%m-%d") - day_2 = today.strftime("%Y-%m-%d") - day_3 = (today + pd.Timedelta(days=1)).strftime("%Y-%m-%d") - sprint_data = [ # note sprint duration is 2 days by default - issue(issue=1, sprint_start=day_1, created=day_1, closed=day_2), - issue(issue=1, sprint_start=day_1, created=day_1), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="@current", - unit=Unit.issues, - project=1, - ) - df = output.results - # validation - check burndown output - # fmt: off - expected = [ - result_row(day=day_1, opened=2, closed=0, delta=2, total=2, closed_total=0), - result_row(day=day_2, opened=0, closed=1, delta=-1, total=1, closed_total=1), - result_row(day=day_3, opened=0, closed=0, delta=0, total=1, closed_total=1), - ] - # fmt: on - assert df.to_dict("records") == expected - - -class TestSprintBurndownByPoints: - """Test the SprintBurndown class with unit='points'.""" - - def test_burndown_works_with_points(self): - """Burndown should be calculated correctly with points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - df = output.results - # validation - # fmt: off - expected = [ - result_row(day=DAY_0, opened=2, closed=0, delta=2, total=2, closed_total=0), - result_row(day=DAY_1, opened=0, closed=0, delta=0, total=2, closed_total=0), - result_row(day=DAY_2, opened=3, closed=0, delta=3, total=5, closed_total=0), - result_row(day=DAY_3, opened=0, closed=0, delta=0, total=5, closed_total=0), - ] - # fmt: on - assert df.to_dict("records") == expected - - def test_burndown_excludes_tix_without_points(self): - """Burndown should exclude tickets that are not pointed.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_1, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=0), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=None), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - df = output.results - # validation - # fmt: off - expected = [ - result_row(day=DAY_1, opened=2, closed=0, delta=2, total=2, closed_total=0), - result_row(day=DAY_2, opened=0, closed=0, delta=0, total=2, closed_total=0), - result_row(day=DAY_3, opened=0, closed=0, delta=0, total=2, closed_total=0), - ] - # fmt: on - assert df.to_dict("records") == expected - - -class TestGetStats: - """Test the SprintBurndown.get_stats() method.""" - - SPRINT_START = "Sprint start date" - SPRINT_END = "Sprint end date" - TOTAL_OPENED = "Total opened" - TOTAL_CLOSED = "Total closed" - PCT_CLOSED = "Percent closed" - PCT_POINTED = "Percent 
pointed" - - def test_sprint_start_and_sprint_end_not_affected_by_unit(self): - """Test that sprint start and end are the same regardless of unit.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=2, sprint_start=DAY_1, created=DAY_2, closed=DAY_4), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - points = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - issues = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - # validation - check they're calculated correctly - assert points.stats[self.SPRINT_START].value == DAY_1 - assert points.stats[self.SPRINT_END].value == DAY_3 - # validation - check that they are the same - # fmt: off - assert points.stats.get(self.SPRINT_START) == issues.stats.get(self.SPRINT_START) - assert points.stats.get(self.SPRINT_END) == issues.stats.get(self.SPRINT_END) - # fmt: on - - def test_get_total_closed_and_opened_when_unit_is_issues(self): - """Test that total_closed is calculated correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_0, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_0, closed=DAY_3), - issue(issue=3, sprint=1, created=DAY_2), # not closed - issue(issue=4, sprint=1, created=DAY_2), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - print(output.results) - # validation - check that stats were calculated correctly - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 4 - assert output.stats[self.PCT_CLOSED].value == 50.0 - # validation - check that message contains string value of Unit.issues - assert Unit.issues.value in output.stats[self.TOTAL_CLOSED].suffix - assert Unit.issues.value in output.stats[self.TOTAL_OPENED].suffix - assert "%" in output.stats[self.PCT_CLOSED].suffix - - def test_get_total_closed_and_opened_when_unit_is_points(self): - """Test that total_closed is calculated correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=2), # not closed - issue(issue=4, sprint=1, created=DAY_2, points=4), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 3 - assert output.stats[self.TOTAL_OPENED].value == 9 - assert output.stats[self.PCT_CLOSED].value == 33.33 # rounded to 2 places - # validation - check that message contains string value of Unit.points - assert Unit.points.value in output.stats[self.TOTAL_CLOSED].suffix - assert Unit.points.value in output.stats[self.TOTAL_OPENED].suffix - assert "%" in output.stats[self.PCT_CLOSED].suffix - - def test_include_issues_closed_after_sprint_end(self): - """Issues that are closed after sprint ended should be included in closed count.""" - # setup - create test data - sprint_data = [ - issue( # closed during sprint - issue=1, - 
sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_2, - ), - issue( # closed after sprint - issue=2, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_2, - closed=DAY_4, - ), - issue( # not closed - issue=3, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_2, - ), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_CLOSED].value == 66.67 # rounded to 2 places - - def test_get_percent_pointed(self): - """Test that percent pointed is calculated correctly.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=None), # not pointed - issue(issue=4, sprint=1, created=DAY_2, points=0), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 3 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_CLOSED].value == 100 - assert output.stats[self.PCT_POINTED].value == 50 - # validation - check that stat contains '%' suffix - assert f"% of {Unit.issues.value}" in output.stats[self.PCT_POINTED].suffix - - def test_exclude_other_sprints_in_percent_pointed(self): - """Only include issues in this sprint when calculating percent pointed.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=None), # not pointed - issue(issue=4, sprint=2, created=DAY_2, points=None), # other sprint - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_POINTED].value == 66.67 # exclude final row - - -class TestFormatSlackMessage: - """Test the DeliverablePercentComplete.format_slack_message().""" - - def test_slack_message_contains_right_number_of_lines(self): - """Message should contain one line for the title and one for each stat.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - lines = output.format_slack_message().splitlines() - for line in lines: - print(line) - # validation - assert len(lines) == len(list(output.stats)) + 1 - - def test_title_includes_issues_when_unit_is_issue(self): - """Test that the title is formatted correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, 
sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.issues, - project=1, - ) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.issues.value in title - - def test_title_includes_points_when_unit_is_points(self): - """Test that the title is formatted correctly when unit is points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.points.value in title - - -class TestPlotResults: - """Test the SprintBurndown.show_results() method.""" - - def test_plot_results_output_stored_in_chart_property(self): - """SprintBurndown.chart should contain the output of plot_results().""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurndown( - test_data, - sprint="Sprint 1", - unit=Unit.points, - project=1, - ) - # validation - check that the chart attribute matches output of plot_results() - assert output.chart == output.plot_results() - - -class TestExportMethods: - """Test the export methods method for SprintBurndown.""" - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_dataset", "DATASET_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_export_results_to_correct_file_path( - self, - method: str, - file_name: str, - tmp_path: Path, - sample_burndown: SprintBurndown, - ): - """The file should be exported to the correct location.""" - # setup - check that file doesn't exist at output location - file_name = getattr(sample_burndown, file_name) - expected_path = tmp_path / file_name - assert expected_path.parent.exists() is True - assert expected_path.exists() is False - # execution - func = getattr(sample_burndown, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_dataset", "DATASET_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_create_parent_dir_if_it_does_not_exists( - self, - method: str, - file_name: str, - tmp_path: Path, - sample_burndown: SprintBurndown, - ): - """The parent directory should be created if it doesn't already exist.""" - # setup - check that file and parent directory don't exist - file_name = getattr(sample_burndown, file_name) - expected_path = tmp_path / "new_folder" / file_name - assert expected_path.parent.exists() is False # doesn't yet exist - assert expected_path.exists() is False - # execution - func = 
getattr(sample_burndown, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - -def test_post_to_slack( - mock_slackbot: MockSlackbot, - tmp_path: Path, - sample_burndown: SprintBurndown, -): - """Test the steps required to post the results to slack, without actually posting.""" - # execution - sample_burndown.post_results_to_slack( - mock_slackbot, # type: ignore[assignment] - channel_id="test_channel", - output_dir=tmp_path, - ) - # validation - check that output files exist - for output in ["RESULTS_CSV", "DATASET_CSV", "CHART_PNG", "CHART_HTML"]: - output_path = tmp_path / getattr(sample_burndown, output) - assert output_path.exists() is True diff --git a/analytics/tests/metrics/test_burnup.py b/analytics/tests/metrics/test_burnup.py deleted file mode 100644 index df5f653f6..000000000 --- a/analytics/tests/metrics/test_burnup.py +++ /dev/null @@ -1,738 +0,0 @@ -"""Test the analytics.metrics.burnup module.""" - -from pathlib import Path - -import pandas as pd -import pytest -from analytics.datasets.issues import GitHubIssues -from analytics.metrics.burnup import SprintBurnup, Unit - -from tests.conftest import ( - DAY_0, - DAY_1, - DAY_2, - DAY_3, - DAY_4, - MockSlackbot, - issue, -) - - -def result_row( - day: str, - opened: int, - closed: int, - delta: int, - total_open: int, - total_closed: int, -) -> dict: - """Create a sample result row.""" - return { - "date": pd.Timestamp(day), - "opened": opened, - "closed": closed, - "delta": delta, - "total_open": total_open, - "total_closed": total_closed, - } - - -@pytest.fixture(name="sample_burnup", scope="module") -def sample_burnup_by_points_fixture() -> SprintBurnup: - """Create a sample burnup to simplify test setup.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # return sprint burnup by points - return SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - - -class TestSprintBurnupByTasks: - """Test the SprintBurnup class with unit='tasks'.""" - - def test_exclude_tix_assigned_to_other_sprints(self): - """The burnup should exclude tickets that are assigned to other sprints.""" - # setup - create test data - sprint_data = [ - # fmt: off - # include this row - assigned to sprint 1 - issue(issue=1, sprint=1, sprint_start=DAY_1, created=DAY_1, closed=DAY_3), - # exclude this row - assigned to sprint 2 - issue(issue=1, sprint=2, sprint_start=DAY_4, created=DAY_0, closed=DAY_4), - # fmt: on - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - df = output.results - # validation - check min and max dates - assert df[output.date_col].min() == pd.Timestamp(DAY_1) - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - # validation - check burnup output - expected = [ - result_row( - day=DAY_1, - opened=1, - closed=0, - delta=1, - total_open=1, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=0, - closed=0, - delta=0, - total_open=1, - total_closed=0, - ), - result_row( - day=DAY_3, - opened=0, - closed=1, - delta=-1, - total_open=0, - total_closed=1, - ), - ] - assert df.to_dict("records") == expected - - def 
test_count_tix_created_before_sprint_start(self): - """Burnup should include tix opened before the sprint but closed during it.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - df = output.results - # validation - check min and max dates - assert df[output.date_col].min() == pd.Timestamp(DAY_0) - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - # validation - check burnup output - expected = [ - result_row( - day=DAY_0, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_1, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=0, - closed=1, - delta=-1, - total_open=1, - total_closed=1, - ), - result_row( - day=DAY_3, - opened=0, - closed=1, - delta=-1, - total_open=0, - total_closed=2, - ), - ] - assert df.to_dict("records") == expected - - def test_count_tix_closed_after_sprint_start(self): - """Burnup should include tix closed after the sprint ended.""" - # setup - create test data - sprint_data = [ - issue( # closed before sprint end - issue=1, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_2, - ), - issue( # closed after sprint end - issue=1, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_4, - ), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - df = output.results - # validation - check min and max dates - assert df[output.date_col].min() == pd.Timestamp(DAY_1) - assert df[output.date_col].max() == pd.Timestamp(DAY_4) - # validation - check burnup output - expected = [ - result_row( - day=DAY_1, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=0, - closed=1, - delta=-1, - total_open=1, - total_closed=1, - ), - result_row( - day=DAY_3, - opened=0, - closed=0, - delta=0, - total_open=1, - total_closed=1, - ), - result_row( - day=DAY_4, - opened=0, - closed=1, - delta=-1, - total_open=0, - total_closed=2, - ), - ] - assert df.to_dict("records") == expected - - def test_count_tix_created_after_sprint_start(self): - """Burnup should include tix opened and closed during the sprint.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, closed=DAY_3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - df = output.results - # validation - check burnup output - expected = [ - result_row( - day=DAY_0, - opened=1, - closed=0, - delta=1, - total_open=1, - total_closed=0, - ), - result_row( - day=DAY_1, - opened=0, - closed=0, - delta=0, - total_open=1, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=1, - closed=1, - delta=0, - total_open=1, - total_closed=1, - ), - result_row( - day=DAY_3, - opened=0, - closed=1, - delta=-1, - total_open=0, - total_closed=2, - ), - ] - assert df.to_dict("records") == expected - - def 
test_include_all_sprint_days_if_tix_closed_early(self): - """All days of the sprint should be included even if all tix were closed early.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - df = output.results - # validation - check max date is end of sprint not last closed date - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - - def test_raise_value_error_if_sprint_arg_not_in_dataset(self): - """A ValueError should be raised if the sprint argument isn't valid.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - issue(issue=1, sprint_start=DAY_1, created=DAY_0), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # validation - with pytest.raises( - ValueError, - match="Sprint value doesn't match one of the available sprints", - ): - SprintBurnup(test_data, sprint="Fake sprint", unit=Unit.issues) - - def test_calculate_burnup_for_current_sprint(self): - """Use the current sprint if the date falls in the middle of a sprint.""" - # setup - create test data - today = pd.Timestamp.today().floor("d") - day_1 = (today + pd.Timedelta(days=-1)).strftime("%Y-%m-%d") - day_2 = today.strftime("%Y-%m-%d") - day_3 = (today + pd.Timedelta(days=1)).strftime("%Y-%m-%d") - sprint_data = [ # note sprint duration is 2 days by default - issue(issue=1, sprint_start=day_1, created=day_1, closed=day_2), - issue(issue=1, sprint_start=day_1, created=day_1), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="@current", unit=Unit.issues) - df = output.results - # validation - check burnup output - expected = [ - result_row( - day=day_1, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=day_2, - opened=0, - closed=1, - delta=-1, - total_open=1, - total_closed=1, - ), - result_row( - day=day_3, - opened=0, - closed=0, - delta=0, - total_open=1, - total_closed=1, - ), - ] - assert df.to_dict("records") == expected - - -class TestSprintBurnupByPoints: - """Test the SprintBurnup class with unit='points'.""" - - def test_burnup_works_with_points(self): - """Burnup should be calculated correctly with points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - df = output.results - # validation - expected = [ - result_row( - day=DAY_0, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_1, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=3, - closed=0, - delta=3, - total_open=5, - total_closed=0, - ), - result_row( - day=DAY_3, - opened=0, - closed=0, - delta=0, - total_open=5, - total_closed=0, - ), - ] - assert df.to_dict("records") == expected - - def test_burnup_excludes_tix_without_points(self): - """Burnup 
should exclude tickets that are not pointed.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_1, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=0), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=None), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - df = output.results - # validation - expected = [ - result_row( - day=DAY_1, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_3, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - ] - assert df.to_dict("records") == expected - - -class TestGetStats: - """Test the SprintBurnup.get_stats() method.""" - - SPRINT_START = "Sprint start date" - SPRINT_END = "Sprint end date" - TOTAL_OPENED = "Total opened" - TOTAL_CLOSED = "Total closed" - PCT_CLOSED = "Percent closed" - PCT_POINTED = "Percent pointed" - - def test_sprint_start_and_sprint_end_not_affected_by_unit(self): - """Test that sprint start and end are the same regardless of unit.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=2, sprint_start=DAY_1, created=DAY_2, closed=DAY_4), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - points = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - issues = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - # validation - check they're calculated correctly - assert points.stats[self.SPRINT_START].value == DAY_1 - assert points.stats[self.SPRINT_END].value == DAY_3 - # validation - check that they are the same - # fmt: off - assert points.stats.get(self.SPRINT_START) == issues.stats.get(self.SPRINT_START) - assert points.stats.get(self.SPRINT_END) == issues.stats.get(self.SPRINT_END) - # fmt: on - - def test_get_total_closed_and_opened_when_unit_is_issues(self): - """Test that total_closed is calculated correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_0, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_0, closed=DAY_3), - issue(issue=3, sprint=1, created=DAY_2), # not closed - issue(issue=4, sprint=1, created=DAY_2), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - print(output.results) - # validation - check that stats were calculated correctly - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 4 - assert output.stats[self.PCT_CLOSED].value == 50.0 - # validation - check that message contains string value of Unit.issues - assert Unit.issues.value in output.stats[self.TOTAL_CLOSED].suffix - assert Unit.issues.value in output.stats[self.TOTAL_OPENED].suffix - assert "%" in output.stats[self.PCT_CLOSED].suffix - - def test_get_total_closed_and_opened_when_unit_is_points(self): - """Test that total_closed is calculated correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, 
points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=2), # not closed - issue(issue=4, sprint=1, created=DAY_2, points=4), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 3 - assert output.stats[self.TOTAL_OPENED].value == 9 - assert output.stats[self.PCT_CLOSED].value == 33.33 # rounded to 2 places - # validation - check that message contains string value of Unit.points - assert Unit.points.value in output.stats[self.TOTAL_CLOSED].suffix - assert Unit.points.value in output.stats[self.TOTAL_OPENED].suffix - assert "%" in output.stats[self.PCT_CLOSED].suffix - - def test_include_issues_closed_after_sprint_end(self): - """Issues that are closed after sprint ended should be included in closed count.""" - # setup - create test data - sprint_data = [ - issue( # closed during sprint - issue=1, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_2, - ), - issue( # closed after sprint - issue=2, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_2, - closed=DAY_4, - ), - issue( # not closed - issue=3, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_2, - ), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_CLOSED].value == 66.67 # rounded to 2 places - - def test_get_percent_pointed(self): - """Test that percent pointed is calculated correctly.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=None), # not pointed - issue(issue=4, sprint=1, created=DAY_2, points=0), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 3 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_CLOSED].value == 100 - assert output.stats[self.PCT_POINTED].value == 50 - # validation - check that stat contains '%' suffix - assert f"% of {Unit.issues.value}" in output.stats[self.PCT_POINTED].suffix - - def test_exclude_other_sprints_in_percent_pointed(self): - """Only include issues in this sprint when calculating percent pointed.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=None), # not pointed - issue(issue=4, sprint=2, created=DAY_2, points=None), # other sprint - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_POINTED].value == 66.67 # exclude final row - - -class 
TestFormatSlackMessage: - """Test the DeliverablePercentComplete.format_slack_message().""" - - def test_slack_message_contains_right_number_of_lines(self): - """Message should contain one line for the title and one for each stat.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - lines = output.format_slack_message().splitlines() - for line in lines: - print(line) - # validation - assert len(lines) == len(list(output.stats)) + 1 - - def test_title_includes_issues_when_unit_is_issue(self): - """Test that the title is formatted correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.issues.value in title - - def test_title_includes_points_when_unit_is_points(self): - """Test that the title is formatted correctly when unit is points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.points.value in title - - -class TestPlotResults: - """Test the SprintBurnup.show_results() method.""" - - def test_plot_results_output_stored_in_chart_property(self): - """SprintBurnup.chart should contain the output of plot_results().""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - # validation - check that the chart attribute matches output of plot_results() - assert output.chart == output.plot_results() - - -class TestExportMethods: - """Test the export methods method for SprintBurnup.""" - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_dataset", "DATASET_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_export_results_to_correct_file_path( - self, - method: str, - file_name: str, - tmp_path: Path, - sample_burnup: SprintBurnup, - ): - """The file should be exported to the correct location.""" - # setup - check that file doesn't exist at output location - file_name = getattr(sample_burnup, file_name) - expected_path = tmp_path / file_name - assert expected_path.parent.exists() is True - assert expected_path.exists() is False - # execution - func = getattr(sample_burnup, method) - output = func(output_dir=expected_path.parent) - # validation - check that 
output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_dataset", "DATASET_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_create_parent_dir_if_it_does_not_exists( - self, - method: str, - file_name: str, - tmp_path: Path, - sample_burnup: SprintBurnup, - ): - """The parent directory should be created if it doesn't already exist.""" - # setup - check that file and parent directory don't exist - file_name = getattr(sample_burnup, file_name) - expected_path = tmp_path / "new_folder" / file_name - assert expected_path.parent.exists() is False # doesn't yet exist - assert expected_path.exists() is False - # execution - func = getattr(sample_burnup, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - -def test_post_to_slack( - mock_slackbot: MockSlackbot, - tmp_path: Path, - sample_burnup: SprintBurnup, -): - """Test the steps required to post the results to slack, without actually posting.""" - # execution - sample_burnup.post_results_to_slack( - mock_slackbot, # type: ignore[assignment] - channel_id="test_channel", - output_dir=tmp_path, - ) - # validation - check that output files exist - for output in ["RESULTS_CSV", "DATASET_CSV", "CHART_PNG", "CHART_HTML"]: - output_path = tmp_path / getattr(sample_burnup, output) - assert output_path.exists() is True diff --git a/analytics/tests/metrics/test_percent_complete.py b/analytics/tests/metrics/test_percent_complete.py deleted file mode 100644 index 5a819706e..000000000 --- a/analytics/tests/metrics/test_percent_complete.py +++ /dev/null @@ -1,434 +0,0 @@ -"""Tests for analytics/datasets/percent_complete.py.""" - -from pathlib import Path # noqa: I001 - -import pytest - -from analytics.datasets.issues import GitHubIssues, IssueMetadata, IssueType -from analytics.metrics.percent_complete import DeliverablePercentComplete, Unit -from tests.conftest import MockSlackbot, DAY_0, DAY_1 - - -def task_row( - deliverable: int, - task: int | None, - deliverable_status: str | None = "In Progress", - points: int | None = 1, - status: str | None = "open", -) -> dict: - """Create a sample row of the DeliverableTasks dataset.""" - issue = IssueMetadata( - project_owner="HHS", - project_number=1, - issue_title=f"Task {task}", - issue_url=f"task{task}", - issue_type=IssueType.TASK.value, - issue_parent=None, - issue_points=points, - issue_is_closed=status == "closed", - issue_opened_at=DAY_0, - issue_closed_at=DAY_1 if status == "closed" else None, - deliverable_title=f"Deliverable {deliverable}", - deliverable_status=deliverable_status, - ) - return issue.model_dump() - - -@pytest.fixture(name="percent_complete", scope="module") -def sample_percent_complete() -> DeliverablePercentComplete: - """Create a sample burndown to simplify test setup.""" - # setup - create test data - test_rows = [ - task_row(deliverable=1, task=1, status="open"), - task_row(deliverable=1, task=2, status="closed"), - task_row(deliverable=2, task=3, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # return sprint burndown by points - return DeliverablePercentComplete(test_data, unit=Unit.points) - - -class TestDeliverablePercentComplete: - """Test the DeliverablePercentComplete metric.""" - - def 
test_percent_complete_based_on_task_count(self): - """Check that percent completion is correct when tasks are the unit.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, status="open"), - task_row(deliverable=1, task=2, status="closed"), - task_row(deliverable=2, task=3, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - df = DeliverablePercentComplete(test_data, unit=Unit.issues).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 2 - assert df.loc["Deliverable 2", "total"] == 1 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 1 - assert df.loc["Deliverable 2", "open"] == 1 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 1 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 0.5 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - def test_percent_complete_based_on_points(self): - """Check that percent completion is correct when points are the unit.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=1, status="open"), - task_row(deliverable=1, task=2, points=3, status="closed"), - task_row(deliverable=2, task=3, points=5, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - df = DeliverablePercentComplete(test_data, unit=Unit.points).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 4 - assert df.loc["Deliverable 2", "total"] == 5 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 1 - assert df.loc["Deliverable 2", "open"] == 5 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 3 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 0.75 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - def test_show_0_pct_for_deliverables_without_tasks(self): - """Deliverables without tasks should show 0% complete instead of throwing an error.""" - # setup - create test dataset where deliverable 2 has no tasks - test_rows = [ - task_row(deliverable=1, task=2, status="closed"), - task_row(deliverable=2, task=None, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - use tasks as the unit - df = DeliverablePercentComplete(test_data, unit=Unit.issues).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 1 - assert df.loc["Deliverable 2", "total"] == 1 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 0 - assert df.loc["Deliverable 2", "open"] == 1 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 1 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 1.0 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - def test_show_0_pct_for_deliverables_without_points(self): - """Deliverables without points should show 0% complete instead of throwing an error.""" - # setup - create test 
dataset where deliverable 2 has no points - test_rows = [ - task_row(deliverable=1, task=2, points=2, status="closed"), - task_row(deliverable=2, task=None, points=None, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - use points as the unit - df = DeliverablePercentComplete(test_data, unit=Unit.points).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 2 - assert df.loc["Deliverable 2", "total"] == 0 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 0 - assert df.loc["Deliverable 2", "open"] == 0 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 2 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 1.0 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - -class TestFilteringReportByDeliverableStatus: - """Test the metric when we limit the set of deliverable statuses to include.""" - - TEST_ROWS = [ - task_row(deliverable=1, task=1, status="closed", deliverable_status="Done"), - task_row(deliverable=2, task=2, status="closed", deliverable_status="Open"), - task_row(deliverable=2, task=3, status="open", deliverable_status="Open"), - ] - - def test_filter_out_deliverables_with_excluded_status(self): - """The results should exclude deliverables with a status that wasn't passed.""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - df = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Open"], - ).results - df = df.set_index("deliverable_title") - # validation - assert len(df) == 1 - assert "Deliverable 1" not in df.index # confirm deliverable 1 was dropped - assert df.loc["Deliverable 2", "percent_complete"] == 0.5 - - def test_invert_statuses_selected(self): - """We should filter out the other deliverable if invert statuses selected.""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - df = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Done"], # changed - ).results - df = df.set_index("deliverable_title") - # validation - assert len(df) == 1 - assert "Deliverable 2" not in df.index # confirm deliverable 2 was dropped - assert df.loc["Deliverable 1", "percent_complete"] == 1 - - def test_list_selected_statuses_in_slack_message(self): - """If we filter on status, those statuses should be listed in the slack message.""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - metric = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Open"], - ) - output = metric.format_slack_message() - # validation - expected = "Limited to deliverables with these statuses: Open" - assert expected in output - - def test_stats_also_filter_out_deliverables_with_excluded_status(self): - """Filtered deliverables should also be excluded from get_stats().""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - metric = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Open"], # exclude deliverable 1 - ) - output = metric.get_stats() - # validation - assert len(output) == 1 - assert output.get("Deliverable 1") is None - - -class TestGetStats: - """Test the 
DeliverablePercentComplete.get_stats() method.""" - - def test_all_issues_are_pointed(self): - """Test that stats show 100% of issues are pointed if all have points.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=1, status="closed"), - task_row(deliverable=2, task=3, points=3, status="open"), - task_row(deliverable=2, task=3, points=1, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - assert len(output.stats) == 2 - for deliverable in ["Deliverable 1", "Deliverable 2"]: - stat = output.stats.get(deliverable) - assert stat is not None - assert stat.value == 100 - assert stat.suffix == f"% of {Unit.issues.value} pointed" - - def test_some_issues_are_not_pointed(self): - """Test that stats are calculated correctly if not all issues are pointed.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=0, status="closed"), - task_row(deliverable=2, task=3, points=3, status="open"), - task_row(deliverable=2, task=3, points=None, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - assert len(output.stats) == 2 - for deliverable in ["Deliverable 1", "Deliverable 2"]: - stat = output.stats.get(deliverable) - assert stat is not None - assert stat.value == 50 - assert stat.suffix == f"% of {Unit.issues.value} pointed" - - def test_deliverables_without_tasks_have_0_pct_pointed(self): - """Deliverables without tasks should have 0% pointed in stats.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=1, status="closed"), - task_row(deliverable=2, task=None, points=None, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - assert len(output.stats) == 2 - assert output.stats["Deliverable 1"].value == 100 - assert output.stats["Deliverable 2"].value == 0 - - -class TestFormatSlackMessage: - """Test the DeliverablePercentComplete.format_slack_message().""" - - def test_slack_message_contains_right_number_of_lines(self): - """Message should contain one line for the title and one for each deliverable.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=2, task=2, points=1, status="closed"), - task_row(deliverable=3, task=3, points=3, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - lines = output.format_slack_message().splitlines() - # validation - assert len(lines) == 4 - - def test_title_includes_issues_when_unit_is_issue(self): - """Test that the title is formatted correctly when unit is issues.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=2, task=2, points=1, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.issues.value in title 
- - def test_title_includes_points_when_unit_is_points(self): - """Test that the title is formatted correctly when unit is points.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=2, task=2, points=1, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.points) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.points.value in title - - -class TestPlotResults: - """Test the DeliverablePercentComplete.plot_results() method.""" - - def test_plot_results_output_stored_in_chart_property(self): - """SprintBurndown.chart should contain the output of plot_results().""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=0, status="closed"), - task_row(deliverable=2, task=3, points=3, status="open"), - task_row(deliverable=2, task=3, points=None, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - check that the chart attribute matches output of plot_results() - assert output.chart == output.plot_results() - - -class TestExportMethods: - """Test the export methods method for SprintBurndown.""" - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_export_results_to_correct_file_path( - self, - method: str, - file_name: str, - tmp_path: Path, - percent_complete: DeliverablePercentComplete, - ): - """The file should be exported to the correct location.""" - # setup - check that file doesn't exist at output location - file_name = getattr(percent_complete, file_name) - expected_path = tmp_path / file_name - assert expected_path.parent.exists() is True - assert expected_path.exists() is False - # execution - func = getattr(percent_complete, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_create_parent_dir_if_it_does_not_exists( - self, - method: str, - file_name: str, - tmp_path: Path, - percent_complete: DeliverablePercentComplete, - ): - """The parent directory should be created if it doesn't already exist.""" - # setup - check that file and parent directory don't exist - file_name = getattr(percent_complete, file_name) - expected_path = tmp_path / "new_folder" / file_name - assert expected_path.parent.exists() is False # doesn't yet exist - assert expected_path.exists() is False - # execution - func = getattr(percent_complete, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - -def test_post_to_slack( - mock_slackbot: MockSlackbot, - tmp_path: Path, - percent_complete: DeliverablePercentComplete, -): - """Test the steps required to post the results to slack, without actually posting.""" - # execution - percent_complete.post_results_to_slack( - mock_slackbot, # 
type: ignore noqa: PGH003 - channel_id="test_channel", - output_dir=tmp_path, - ) - # validation - check that output files exist - for output in ["RESULTS_CSV", "CHART_PNG", "CHART_HTML"]: - output_path = tmp_path / getattr(percent_complete, output) - assert output_path.exists() is True diff --git a/analytics/tests/test_cli.py b/analytics/tests/test_cli.py index 28f299bf8..863739a69 100644 --- a/analytics/tests/test_cli.py +++ b/analytics/tests/test_cli.py @@ -58,222 +58,6 @@ def test_file_fixtures(tmp_path: Path) -> MockFiles: ) -class TestCalculateSprintBurndown: - """Test the calculate_sprint_burndown entry point with mock data.""" - - def test_without_showing_or_posting_results(self, mock_files: MockFiles): - """Entrypoint should run successfully but not print slack message to stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burndown", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--project", - "1", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - assert "Slack message" not in result.stdout - - def test_stdout_message_includes_points_if_no_unit_is_set( - self, - mock_files: MockFiles, - ): - """CLI should uses 'points' as default unit and include it in stdout message.""" - # setup - create command - command = [ - "calculate", - "sprint_burndown", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--project", - "1", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "points" in result.stdout - - def test_stdout_message_includes_issues_if_unit_set_to_issues( - self, - mock_files: MockFiles, - ): - """CLI should use issues if set explicitly and include it in stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burndown", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--project", - "1", - "--unit", - "issues", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "issues" in result.stdout - - -class TestCalculateSprintBurnup: - """Test the calculate_sprint_burnup entry point with mock data.""" - - def test_without_showing_or_posting_results(self, mock_files: MockFiles): - """Entrypoint should run successfully but not print slack message to stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burnup", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - assert "Slack message" not in result.stdout - - def test_stdout_message_includes_points_if_no_unit_is_set( - self, - mock_files: MockFiles, - ): - """CLI should uses 'points' as default unit and include it in stdout message.""" - # setup - create command - command = [ - "calculate", - "sprint_burnup", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - 
"--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "points" in result.stdout - - def test_stdout_message_includes_issues_if_unit_set_to_issues( - self, - mock_files: MockFiles, - ): - """CLI should use issues if set explicitly and include it in stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burnup", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--unit", - "issues", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "issues" in result.stdout - - -class TestCalculateDeliverablePercentComplete: - """Test the calculate_deliverable_percent_complete entry point with mock data.""" - - def test_calculate_deliverable_percent_complete(self, mock_files: MockFiles): - """Entrypoint should run successfully but not print slack message to stdout.""" - # setup - create command - command = [ - "calculate", - "deliverable_percent_complete", - "--issue-file", - str(mock_files.delivery_file), - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - assert "Slack message" not in result.stdout - - def test_stdout_message_includes_points_if_no_unit_is_set( - self, - mock_files: MockFiles, - ): - """CLI should uses 'points' as default unit and include it in stdout message.""" - # setup - create command - command = [ - "calculate", - "deliverable_percent_complete", - "--issue-file", - str(mock_files.delivery_file), - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "points" in result.stdout - - def test_stdout_message_includes_issues_if_unit_set_to_issues( - self, - mock_files: MockFiles, - ): - """CLI should use issues if set explicitly and include it in stdout.""" - # setup - create command - command = [ - "calculate", - "deliverable_percent_complete", - "--issue-file", - str(mock_files.delivery_file), - "--unit", - "issues", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "issues" in result.stdout - - class TestEtlEntryPoint: """Test the etl entry point."""