diff --git a/analytics/Makefile b/analytics/Makefile
index 1926bba4f..b3dcf9447 100644
--- a/analytics/Makefile
+++ b/analytics/Makefile
@@ -177,38 +177,3 @@ gh-data-export:
 		--output-file $(ISSUE_FILE) \
 		--temp-dir $(OUTPUT_DIR)
 
-sprint-burndown:
-	@echo "=> Running sprint burndown report for HHS/13"
-	@echo "====================================================="
-	$(POETRY) analytics calculate sprint_burndown \
-		--issue-file $(ISSUE_FILE) \
-		--output-dir $(OUTPUT_DIR) \
-		--sprint "$(SPRINT)" \
-		--project 13 \
-		--unit $(UNIT) \
-		--$(ACTION)
-	@echo "====================================================="
-	@echo "=> Running sprint burndown report for HHS/17"
-	@echo "====================================================="
-	$(POETRY) analytics calculate sprint_burndown \
-		--issue-file $(ISSUE_FILE) \
-		--output-dir $(OUTPUT_DIR) \
-		--sprint "$(SPRINT)" \
-		--project 17 \
-		--unit $(UNIT) \
-		--$(ACTION)
-
-percent-complete:
-	@echo "=> Running percent complete deliverable"
-	@echo "====================================================="
-	$(POETRY) analytics calculate deliverable_percent_complete \
-		--issue-file $(ISSUE_FILE) \
-		--output-dir $(OUTPUT_DIR) \
-		--include-status "In Progress" \
-		--include-status "Planning" \
-		--unit $(UNIT) \
-		--$(ACTION)
-
-sprint-reports: sprint-burndown percent-complete
-
-sprint-reports-with-latest-data: gh-data-export sprint-reports
diff --git a/analytics/src/analytics/cli.py b/analytics/src/analytics/cli.py
index d40bc5a9f..b2db94e30 100644
--- a/analytics/src/analytics/cli.py
+++ b/analytics/src/analytics/cli.py
@@ -9,14 +9,13 @@
 from typing import Annotated
 
 import typer
-from slack_sdk import WebClient
 from sqlalchemy import text
 
 from analytics.datasets.etl_dataset import EtlDataset
 from analytics.datasets.issues import GitHubIssues
 from analytics.etl.github import GitHubProjectConfig, GitHubProjectETL
 from analytics.etl.utils import load_config
-from analytics.integrations import etldb, slack
+from analytics.integrations import etldb
 from analytics.integrations.db import PostgresDbClient
 from analytics.integrations.extracts.load_opportunity_data import (
     extract_copy_opportunity_data,
@@ -24,10 +23,6 @@
 from analytics.logs import init as init_logging
 from analytics.logs.app_logger import init_app
 from analytics.logs.ecs_background_task import ecs_background_task
-from analytics.metrics.base import BaseMetric, Unit
-from analytics.metrics.burndown import SprintBurndown
-from analytics.metrics.burnup import SprintBurnup
-from analytics.metrics.percent_complete import DeliverablePercentComplete
 
 logger = logging.getLogger(__name__)
 
@@ -38,15 +33,8 @@
 OUTPUT_FILE_ARG = typer.Option(help="Path to file where exported data will be saved")
 OUTPUT_DIR_ARG = typer.Option(help="Path to directory where output files will be saved")
 TMP_DIR_ARG = typer.Option(help="Path to directory where intermediate files will be saved")
-SPRINT_ARG = typer.Option(help="Name of the sprint for which we're calculating burndown")
-UNIT_ARG = typer.Option(help="Whether to calculate completion by 'points' or 'tickets'")
 OWNER_ARG = typer.Option(help="Name of the GitHub project owner, e.g. HHS")
 PROJECT_ARG = typer.Option(help="Number of the GitHub project, e.g. 13")
-SHOW_RESULTS_ARG = typer.Option(help="Display a chart of the results in a browser")
-POST_RESULTS_ARG = typer.Option(help="Post the results to slack")
-STATUS_ARG = typer.Option(
-    help="Deliverable status to include in report, can be passed multiple times",
-)
 EFFECTIVE_DATE_ARG = typer.Option(help="YYYY-MM-DD effective date to apply to each imported row")
 # fmt: on
 
@@ -54,12 +42,10 @@ app = typer.Typer()
 
 # instantiate sub-commands for exporting data and calculating metrics
 export_app = typer.Typer()
-metrics_app = typer.Typer()
 import_app = typer.Typer()
 etl_app = typer.Typer()
 
 # add sub-commands to main entrypoint
 app.add_typer(export_app, name="export", help="Export data needed to calculate metrics")
-app.add_typer(metrics_app, name="calculate", help="Calculate key project metrics")
 app.add_typer(import_app, name="import", help="Import data into the database")
 app.add_typer(etl_app, name="etl", help="Transform and load local file")
@@ -101,121 +87,6 @@ def export_github_data(
     GitHubProjectETL(config).run()
 
 
-# ===========================================================
-# Calculate commands
-# ===========================================================
-
-
-@metrics_app.command(name="sprint_burndown")
-def calculate_sprint_burndown(
-    issue_file: Annotated[str, ISSUE_FILE_ARG],
-    sprint: Annotated[str, SPRINT_ARG],
-    unit: Annotated[Unit, UNIT_ARG] = Unit.points.value,  # type: ignore[assignment]
-    *,  # makes the following args keyword only
-    show_results: Annotated[bool, SHOW_RESULTS_ARG] = False,
-    post_results: Annotated[bool, POST_RESULTS_ARG] = False,
-    output_dir: Annotated[str, OUTPUT_DIR_ARG] = "data",
-    owner: Annotated[str, OWNER_ARG] = "HHS",
-    project: Annotated[int, PROJECT_ARG] = 13,
-) -> None:
-    """Calculate the burndown for a particular sprint."""
-    # load the input data
-    sprint_data = GitHubIssues.from_json(issue_file)
-    # calculate burndown
-    burndown = SprintBurndown(
-        sprint_data,
-        sprint=sprint,
-        unit=unit,
-        project=project,
-        owner=owner,
-    )
-    show_and_or_post_results(
-        metric=burndown,
-        show_results=show_results,
-        post_results=post_results,
-        output_dir=output_dir,
-    )
-
-
-@metrics_app.command(name="sprint_burnup")
-def calculate_sprint_burnup(
-    issue_file: Annotated[str, ISSUE_FILE_ARG],
-    sprint: Annotated[str, SPRINT_ARG],
-    unit: Annotated[Unit, UNIT_ARG] = Unit.points.value,  # type: ignore[assignment]
-    *,  # makes the following args keyword only
-    show_results: Annotated[bool, SHOW_RESULTS_ARG] = False,
-    post_results: Annotated[bool, POST_RESULTS_ARG] = False,
-    output_dir: Annotated[str, OUTPUT_DIR_ARG] = "data",
-) -> None:
-    """Calculate the burnup of a particular sprint."""
-    # load the input data
-    sprint_data = GitHubIssues.from_json(issue_file)
-    # calculate burnup
-    burnup = SprintBurnup(sprint_data, sprint=sprint, unit=unit)
-    show_and_or_post_results(
-        metric=burnup,
-        show_results=show_results,
-        post_results=post_results,
-        output_dir=output_dir,
-    )
-
-
-@metrics_app.command(name="deliverable_percent_complete")
-def calculate_deliverable_percent_complete(
-    issue_file: Annotated[str, ISSUE_FILE_ARG],
-    # Typer uses the Unit enum to validate user inputs from the CLI
-    # but the default arg must be a string or the CLI will throw an error
-    unit: Annotated[Unit, UNIT_ARG] = Unit.points.value,  # type: ignore[assignment]
-    *,  # makes the following args keyword only
-    show_results: Annotated[bool, SHOW_RESULTS_ARG] = False,
-    post_results: Annotated[bool, POST_RESULTS_ARG] = False,
-    output_dir: Annotated[str, OUTPUT_DIR_ARG] = "data",
-    include_status: Annotated[list[str] | None, STATUS_ARG] = None,
-) -> None:
-    """Calculate percentage completion by deliverable."""
-    task_data = GitHubIssues.from_json(issue_file)
-    # calculate percent complete
-    metric = DeliverablePercentComplete(
-        dataset=task_data,
-        unit=unit,
-        statuses_to_include=include_status,
-    )
-    show_and_or_post_results(
-        metric=metric,
-        show_results=show_results,
-        post_results=post_results,
-        output_dir=output_dir,
-    )
-
-
-def show_and_or_post_results(
-    metric: BaseMetric,
-    *,  # makes the following args keyword only
-    show_results: bool,
-    post_results: bool,
-    output_dir: str,
-) -> None:
-    """Optionally show the results of a metric and/or post them to slack."""
-    # defer load of settings until this command is called
-    # this prevents an error if ANALYTICS_SLACK_BOT_TOKEN env var is unset
-    from config import get_db_settings
-
-    settings = get_db_settings()
-
-    # optionally display the burndown chart in the browser
-    if show_results:
-        metric.show_chart()
-        print("Slack message:\n")
-        print(metric.format_slack_message())
-    if post_results:
-        slackbot = slack.SlackBot(client=WebClient(token=settings.slack_bot_token))
-        metric.post_results_to_slack(
-            slackbot=slackbot,
-            channel_id=settings.reporting_channel_id,
-            output_dir=Path(output_dir),
-        )
-
-
 # ===========================================================
 # Import commands
 # ===========================================================
diff --git a/analytics/src/analytics/metrics/__init__.py b/analytics/src/analytics/metrics/__init__.py
deleted file mode 100644
index e558f81e2..000000000
--- a/analytics/src/analytics/metrics/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Calculate a set of metrics that are important to the project."""
diff --git a/analytics/src/analytics/metrics/base.py b/analytics/src/analytics/metrics/base.py
deleted file mode 100644
index b60436ac9..000000000
--- a/analytics/src/analytics/metrics/base.py
+++ /dev/null
@@ -1,139 +0,0 @@
-"""Base class for all metrics."""
-
-from dataclasses import dataclass
-from enum import Enum
-from pathlib import Path
-from typing import Any, Generic, TypeVar
-
-import pandas as pd
-from plotly.graph_objects import Figure
-
-from analytics.datasets.base import BaseDataset
-from analytics.integrations.slack import FileMapping, SlackBot
-
-Dataset = TypeVar("Dataset", bound=BaseDataset)
-
-
-class Unit(Enum):
-    """List the units in which metrics can be calculated."""
-
-    issues = "issues"  # pylint: disable=C0103
-    points = "points"  # pylint: disable=C0103
-
-
-@dataclass
-class Statistic:
-    """Store a single value that represents a summary statistic about a dataset."""
-
-    value: Any
-    suffix: str = ""
-
-
-class BaseMetric(Generic[Dataset]):
-    """Base class for all metrics."""
-
-    CHART_PNG = "chart-static.png"
-    CHART_HTML = "chart-interactive.html"
-    RESULTS_CSV = "results.csv"
-    DATASET_CSV = "source-data.csv"
-
-    def __init__(self, dataset: Dataset) -> None:
-        """Initialize and calculate the metric from the input dataset."""
-        self.dataset = dataset
-        self.results = self.calculate()
-        self.stats = self.get_stats()
-        self._chart: Figure | None = None
-
-    def calculate(self) -> pd.DataFrame:
-        """Calculate the metric and return the resulting dataset."""
-        raise NotImplementedError
-
-    def get_stats(self) -> dict[str, Statistic]:
-        """Get the list of stats associated with this metric to include in reporting."""
-        raise NotImplementedError
-
-    @property
-    def chart(self) -> Figure:
-        """
-        Return a chart visualizing the results.
-
-        Note:
-        ----
-        By deferring the self.plot_results() method invocation until the chart is
-        needed, we decrease the amount of time required to instantiate the class
-
-        """
-        if not self._chart:
-            self._chart = self.plot_results()
-        return self._chart
-
-    def plot_results(self) -> Figure:
-        """Create a plotly chart that visually represents the results."""
-        raise NotImplementedError
-
-    def export_results(self, output_dir: Path = Path("data")) -> Path:
-        """Export the self.results dataframe to a csv file."""
-        # make sure the parent directory exists
-        output_dir.mkdir(exist_ok=True, parents=True)
-        output_path = output_dir / self.RESULTS_CSV
-        # export results dataframe to a csv
-        self.results.to_csv(output_path)
-        return output_path
-
-    def export_dataset(self, output_dir: Path = Path("data")) -> Path:
-        """Export self.dataset to a csv file."""
-        # make sure the parent directory exists
-        output_dir.mkdir(exist_ok=True, parents=True)
-        output_path = output_dir / self.DATASET_CSV
-        # export results dataframe to a csv
-        self.dataset.to_csv(output_path)
-        return output_path
-
-    def export_chart_to_html(self, output_dir: Path = Path("data")) -> Path:
-        """Export the plotly chart in self.chart to a png file."""
-        # make sure the parent directory exists
-        output_dir.mkdir(exist_ok=True, parents=True)
-        output_path = output_dir / self.CHART_HTML
-        # export chart to a png
-        self.chart.write_html(output_path)
-        return output_path
-
-    def export_chart_to_png(self, output_dir: Path = Path("data")) -> Path:
-        """Export the plotly chart in self.chart to a png file."""
-        # make sure the parent directory exists
-        output_dir.mkdir(exist_ok=True, parents=True)
-        output_path = output_dir / self.CHART_PNG
-        # export chart to a png
-        self.chart.write_image(output_path, width=900)
-        return output_path
-
-    def show_chart(self) -> None:
-        """Display self.chart in a browser."""
-        self.chart.show()
-
-    def format_slack_message(self) -> str:
-        """Format the message that will be included with the charts posted to slack."""
-        raise NotImplementedError
-
-    def post_results_to_slack(
-        self,
-        slackbot: SlackBot,
-        channel_id: str,
-        output_dir: Path = Path("data"),
-    ) -> None:
-        """Upload copies of the results and chart to a slack channel."""
-        results_csv = self.export_results(output_dir)
-        dataset_csv = self.export_dataset(output_dir)
-        chart_png = self.export_chart_to_png(output_dir)
-        chart_html = self.export_chart_to_html(output_dir)
-        files = [
-            FileMapping(path=str(results_csv), name=results_csv.name),
-            FileMapping(path=str(dataset_csv), name=dataset_csv.name),
-            FileMapping(path=str(chart_png), name=chart_png.name),
-            FileMapping(path=str(chart_html), name=chart_html.name),
-        ]
-        slackbot.upload_files_to_slack_channel(
-            files=files,
-            channel_id=channel_id,
-            message=self.format_slack_message(),
-        )
diff --git a/analytics/src/analytics/metrics/burndown.py b/analytics/src/analytics/metrics/burndown.py
deleted file mode 100644
index 041f79559..000000000
--- a/analytics/src/analytics/metrics/burndown.py
+++ /dev/null
@@ -1,163 +0,0 @@
-"""
-Calculates burndown for sprints.
-
-This is a subclass of the BaseMetric class that calculates the running total of
-open issues for each day in a sprint
-"""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-import pandas as pd
-import plotly.express as px
-
-from analytics.datasets.issues import GitHubIssues
-from analytics.metrics.base import BaseMetric, Statistic, Unit
-from analytics.metrics.utils import Columns, sum_tix_by_day
-
-if TYPE_CHECKING:
-    from plotly.graph_objects import Figure
-
-
-class SprintBurndown(BaseMetric[GitHubIssues]):
-    """Calculates the running total of open issues per day in the sprint."""
-
-    def __init__(
-        self,
-        dataset: GitHubIssues,
-        sprint: str,
-        unit: Unit,
-        project: int,
-        owner: str = "HHS",
-    ) -> None:
-        """Initialize the SprintBurndown metric."""
-        self.dataset = dataset
-        self.project = project
-        self.owner = owner
-        self.sprint = self._get_and_validate_sprint_name(sprint)
-        self.sprint_data = self._isolate_data_for_this_sprint()
-        self.date_col = "date"
-        self.columns = Columns(
-            opened_at_col=dataset.opened_col,
-            closed_at_col=dataset.closed_col,
-            unit_col=dataset.points_col if unit == Unit.points else unit.value,
-            date_col=self.date_col,
-        )
-        self.unit = unit
-        # Set the value of the unit column based on
-        # whether we're summing issues or story points
-        self.unit_col = dataset.points_col if unit == Unit.points else unit.value
-        super().__init__(dataset)
-
-    def calculate(self) -> pd.DataFrame:
-        """Calculate the sprint burnup."""
-        # make a copy of columns and rows we need to calculate burndown for this sprint
-        burnup_cols = [
-            self.dataset.opened_col,
-            self.dataset.closed_col,
-            self.dataset.points_col,
-        ]
-        df_sprint = self.sprint_data[burnup_cols].copy()
-        # Count the number of tickets opened, closed, and remaining by day
-        return sum_tix_by_day(
-            df=df_sprint,
-            cols=self.columns,
-            unit=self.unit,
-            sprint_end=self.dataset.sprint_end(self.sprint),
-        )
-
-    def plot_results(self) -> Figure:
-        """Plot the sprint burndown using a plotly line chart."""
-        # Limit the data in the line chart to dates within the sprint
-        # or through today, if the sprint hasn't yet ended
-        # NOTE: This will *not* affect the running totals on those days
-        sprint_start = self.dataset.sprint_start(self.sprint)
-        sprint_end = self.dataset.sprint_end(self.sprint)
-        date_mask = self.results[self.date_col].between(
-            sprint_start,
-            min(sprint_end, pd.Timestamp.today()),
-        )
-        df = self.results[date_mask]
-        # create a line chart from the data in self.results
-        title = (
-            f"{self.owner}/{self.project} {self.sprint} burndown "
-            f"in project by {self.unit.value}"
-        )
-        chart = px.line(
-            data_frame=df,
-            x=self.date_col,
-            y="total_open",
-            title=title,
-            labels={"total_open": f"total {self.unit.value} open"},
-        )
-        # set the scale of the y axis to start at 0
-        chart.update_yaxes(range=[0, df["total_open"].max() + 2])
-        chart.update_xaxes(range=[sprint_start, sprint_end])
-        return chart
-
-    def get_stats(self) -> dict[str, Statistic]:
-        """
-        Calculate summary statistics for this metric.
-
-        Notes
-        -----
-        TODO(@widal001): 2023-12-04 - Should stats be calculated in separate private methods?
- - """ - df = self.results - # get sprint start and end dates - sprint_start = self.dataset.sprint_start(self.sprint).strftime("%Y-%m-%d") - sprint_end = self.dataset.sprint_end(self.sprint).strftime("%Y-%m-%d") - # get open and closed counts and percentages - total_opened = int(df["opened"].sum()) - total_closed = int(df["closed"].sum()) - pct_closed = round(total_closed / total_opened * 100, 2) - # get the percentage of tickets that were ticketed - is_pointed = self.sprint_data[self.dataset.points_col] >= 1 - issues_pointed = len(self.sprint_data[is_pointed]) - issues_total = len(self.sprint_data) - pct_pointed = round(issues_pointed / issues_total * 100, 2) - # format and return stats - return { - "Sprint start date": Statistic(value=sprint_start), - "Sprint end date": Statistic(value=sprint_end), - "Total opened": Statistic(total_opened, suffix=f" {self.unit.value}"), - "Total closed": Statistic(total_closed, suffix=f" {self.unit.value}"), - "Percent closed": Statistic(value=pct_closed, suffix="%"), - "Percent pointed": Statistic( - value=pct_pointed, - suffix=f"% of {Unit.issues.value}", - ), - } - - def format_slack_message(self) -> str: - """Format the message that will be included with the charts posted to slack.""" - message = ( - f"*:github: Burndown summary for {self.sprint} " - f"in project {self.owner}/{self.project} by {self.unit.value}*\n" - ) - for label, stat in self.stats.items(): - message += f"• *{label}:* {stat.value}{stat.suffix}\n" - return message - - def _get_and_validate_sprint_name(self, sprint: str | None) -> str: - """Get the name of the sprint we're using to calculate burndown or raise an error.""" - # save dataset to local variable for brevity - dataset = self.dataset - # update sprint name if calculating burndown for the current sprint - if sprint == "@current": - sprint = dataset.current_sprint - # check that the sprint name matches one of the sprints in the dataset - valid_sprint = sprint in list(dataset.sprints[dataset.sprint_col]) - if not sprint or not valid_sprint: # needs `not sprint` for mypy checking - msg = "Sprint value doesn't match one of the available sprints" - raise ValueError(msg) - # return the sprint name if it's valid - return sprint - - def _isolate_data_for_this_sprint(self) -> pd.DataFrame: - """Filter out issues that are not assigned to the current sprint or project.""" - sprint_filter = self.dataset.df[self.dataset.sprint_col] == self.sprint - project_filter = self.dataset.df[self.dataset.project_col] == self.project - return self.dataset.df[((sprint_filter) & (project_filter))] diff --git a/analytics/src/analytics/metrics/burnup.py b/analytics/src/analytics/metrics/burnup.py deleted file mode 100644 index b93b78dda..000000000 --- a/analytics/src/analytics/metrics/burnup.py +++ /dev/null @@ -1,157 +0,0 @@ -""" -Calculates burnup for sprints. 
-
-This is a subclass of the BaseMetric class that calculates the running total of
-open issues for each day in a sprint
-"""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-import pandas as pd
-import plotly.express as px
-
-from analytics.datasets.issues import GitHubIssues
-from analytics.metrics.base import BaseMetric, Statistic, Unit
-from analytics.metrics.utils import Columns, sum_tix_by_day
-
-if TYPE_CHECKING:
-    from plotly.graph_objects import Figure
-
-
-class SprintBurnup(BaseMetric[GitHubIssues]):
-    """Calculates the running total of open issues per day in the sprint."""
-
-    def __init__(
-        self,
-        dataset: GitHubIssues,
-        sprint: str,
-        unit: Unit,
-    ) -> None:
-        """Initialize the SprintBurnup metric."""
-        self.dataset = dataset
-        self.sprint = self._get_and_validate_sprint_name(sprint)
-        self.sprint_data = self._isolate_data_for_this_sprint()
-        self.date_col = "date"
-        self.columns = Columns(
-            opened_at_col=dataset.opened_col,
-            closed_at_col=dataset.closed_col,
-            unit_col=dataset.points_col if unit == Unit.points else unit.value,
-            date_col=self.date_col,
-        )
-        self.unit = unit
-        super().__init__(dataset)
-
-    def calculate(self) -> pd.DataFrame:
-        """Calculate the sprint burnup."""
-        # make a copy of columns and rows we need to calculate burndown for this sprint
-        burnup_cols = [
-            self.dataset.opened_col,
-            self.dataset.closed_col,
-            self.dataset.points_col,
-        ]
-        df_sprint = self.sprint_data[burnup_cols].copy()
-        # Count the number of tickets opened, closed, and remaining by day
-        return sum_tix_by_day(
-            df=df_sprint,
-            cols=self.columns,
-            unit=self.unit,
-            sprint_end=self.dataset.sprint_end(self.sprint),
-        )
-
-    def plot_results(self) -> Figure:
-        """Plot the sprint burnup using a plotly area chart."""
-        # Limit the data in the area chart to dates within the sprint
-        # or through today, if the sprint hasn't yet ended
-        # NOTE: This will *not* affect the running totals on those days
-        sprint_start = self.dataset.sprint_start(self.sprint)
-        sprint_end = self.dataset.sprint_end(self.sprint)
-        date_mask = self.results[self.date_col].between(
-            sprint_start,
-            min(sprint_end, pd.Timestamp.today()),
-        )
-        df = self.results[date_mask].melt(
-            id_vars=self.date_col,
-            value_vars=["total_closed", "total_open"],
-            var_name="cols",
-        )
-
-        # create a area chart from the data in self.results
-        chart = px.area(
-            data_frame=df,
-            x=self.date_col,
-            y="value",
-            color="cols",
-            color_discrete_sequence=["#EFE0FC", "#2DA34D"],
-            markers=True,
-            title=f"{self.sprint} Burnup by {self.unit.value}",
-            template="none",
-        )
-        # set the scale of the y axis to start at 0
-        chart.update_yaxes(range=[0, df["value"].max() + 10])
-        chart.update_xaxes(range=[sprint_start, sprint_end])
-        chart.update_layout(
-            xaxis_title="Date",
-            yaxis_title=f"Total {self.unit.value.capitalize()}",
-            legend_title=f"{self.unit.value.capitalize()}",
-        )
-        return chart
-
-    def get_stats(self) -> dict[str, Statistic]:
-        """Calculate summary statistics for this metric."""
-        df = self.results
-        # get sprint start and end dates
-        sprint_start = self.dataset.sprint_start(self.sprint).strftime("%Y-%m-%d")
-        sprint_end = self.dataset.sprint_end(self.sprint).strftime("%Y-%m-%d")
-        # get open and closed counts and percentages
-        total_opened = int(df["opened"].sum())
-        total_closed = int(df["closed"].sum())
-        pct_closed = round(total_closed / total_opened * 100, 2)
-        # For burnup, we want to know at a glance the pct_remaining
-        pct_remaining = round(100 - pct_closed, 2)
-        # get the percentage of tickets that were ticketed
-        is_pointed = self.sprint_data[self.dataset.points_col] >= 1
-        issues_pointed = len(self.sprint_data[is_pointed])
-        issues_total = len(self.sprint_data)
-        pct_pointed = round(issues_pointed / issues_total * 100, 2)
-        # format and return stats
-        return {
-            "Sprint start date": Statistic(value=sprint_start),
-            "Sprint end date": Statistic(value=sprint_end),
-            "Total opened": Statistic(total_opened, suffix=f" {self.unit.value}"),
-            "Total closed": Statistic(total_closed, suffix=f" {self.unit.value}"),
-            "Percent closed": Statistic(value=pct_closed, suffix="%"),
-            "Percent remaining": Statistic(value=pct_remaining, suffix="%"),
-            "Percent pointed": Statistic(
-                value=pct_pointed,
-                suffix=f"% of {Unit.issues.value}",
-            ),
-        }
-
-    def format_slack_message(self) -> str:
-        """Format the message that will be included with the charts posted to slack."""
-        message = f"*:github: Burnup summary for {self.sprint} by {self.unit.value}*\n"
-        for label, stat in self.stats.items():
-            message += f"• *{label}:* {stat.value}{stat.suffix}\n"
-        return message
-
-    def _get_and_validate_sprint_name(self, sprint: str | None) -> str:
-        """Get the name of the sprint we're using to calculate burndown or raise an error."""
-        # save dataset to local variable for brevity
-        dataset = self.dataset
-        # update sprint name if calculating burndown for the current sprint
-        if sprint == "@current":
-            sprint = dataset.current_sprint
-        # check that the sprint name matches one of the sprints in the dataset
-        valid_sprint = sprint in list(dataset.sprints[dataset.sprint_col])
-        if not sprint or not valid_sprint:  # needs `not sprint` for mypy checking
-            msg = "Sprint value doesn't match one of the available sprints"
-            raise ValueError(msg)
-        # return the sprint name if it's valid
-        return sprint
-
-    def _isolate_data_for_this_sprint(self) -> pd.DataFrame:
-        """Filter out issues that are not assigned to the current sprint."""
-        sprint_filter = self.dataset.df[self.dataset.sprint_col] == self.sprint
-        return self.dataset.df[sprint_filter]
diff --git a/analytics/src/analytics/metrics/percent_complete.py b/analytics/src/analytics/metrics/percent_complete.py
deleted file mode 100644
index a17087aaa..000000000
--- a/analytics/src/analytics/metrics/percent_complete.py
+++ /dev/null
@@ -1,170 +0,0 @@
-"""Calculate and visualizes percent completion by deliverable."""
-
-import datetime as dt
-
-import pandas as pd
-import plotly.express as px
-from plotly.graph_objects import Figure
-
-from analytics.datasets.issues import GitHubIssues
-from analytics.metrics.base import BaseMetric, Statistic, Unit
-
-
-class DeliverablePercentComplete(BaseMetric[GitHubIssues]):
-    """Calculate the percentage of issues or points completed per deliverable."""
-
-    def __init__(
-        self,
-        dataset: GitHubIssues,
-        unit: Unit,
-        statuses_to_include: list[str] | None = None,
-    ) -> None:
-        """Initialize the DeliverablePercentComplete metric."""
-        self.dataset = dataset
-        self.deliverable_col = "deliverable_title"
-        self.status_col = "issue_state"
-        self.deliverable_status_col = "deliverable_status"
-        self.unit = unit
-        self.unit_col = dataset.points_col if unit == Unit.points else unit.value
-        self.statuses_to_include = statuses_to_include
-        self.deliverable_data = self._isolate_deliverables_by_status()
-        super().__init__(dataset)
-
-    def calculate(self) -> pd.DataFrame:
-        """
-        Calculate the percent complete per deliverable.
-
-        Notes
-        -----
-        Percent completion is calculated using the following steps:
-        1. Count the number of all issues (or points) per deliverable
-        2. Count the number of closed issues (or points) per deliverable
-        3. Left join all issues/points with open issues/points on deliverable
-           so that we have a row per deliverable with a total count column
-           and a closed count column
-        4. Subtract closed count from total count to get open count
-        5. Divide closed count by total count to get percent complete
-
-        """
-        # get total and closed counts per deliverable
-        df_total = self._get_count_by_deliverable(status="all")
-        df_closed = self._get_count_by_deliverable(status="closed")
-        # join total and closed counts on deliverable
-        # and calculate remaining columns
-        df_all = df_total.merge(df_closed, on=self.deliverable_col, how="left")
-        df_all = df_all.fillna(0)
-        df_all["open"] = df_all["total"] - df_all["closed"]
-        df_all["percent_complete"] = df_all["closed"] / df_all["total"]
-        df_all["percent_complete"] = df_all["percent_complete"].fillna(0)
-        return df_all
-
-    def plot_results(self) -> Figure:
-        """Create a bar chart of percent completion from the data in self.results."""
-        # get the current date in YYYY-MM-DD format
-        today = dt.datetime.now(tz=dt.timezone.utc).strftime("%Y-%m-%d")
-        # reshape the dataframe in self.results for plotly
-        df = self._prepare_result_dataframe_for_plotly()
-        # create a stacked bar chart from the data
-        return px.bar(
-            df,
-            x=self.unit.value,
-            y=self.deliverable_col,
-            color=self.status_col,
-            text="percent_of_total",
-            labels={self.deliverable_col: "deliverable"},
-            color_discrete_map={"open": "#aacde3", "closed": "#06508f"},
-            orientation="h",
-            title=f"Percent of {self.unit.value} complete by deliverable as of {today}",
-            height=800,
-        )
-
-    def get_stats(self) -> dict[str, Statistic]:
-        """Calculate stats for this metric."""
-        df_src = self.deliverable_data
-        # get the total number of issues and the number of issues with points per deliverable
-        is_pointed = df_src[self.dataset.points_col] >= 1
-        issues_total = df_src.value_counts(self.deliverable_col).to_frame()
-        issues_pointed = (
-            df_src[is_pointed].value_counts(self.deliverable_col).to_frame()
-        )
-        # join the count of all issues to the count of pointed issues and
-        # calculate the percentage of all issues that have points per deliverable
-        df_tgt = issues_total.join(issues_pointed, lsuffix="_total", rsuffix="_pointed")
-        df_tgt["pct_pointed"] = df_tgt["count_pointed"] / df_tgt["count_total"] * 100
-        df_tgt["pct_pointed"] = round(df_tgt["pct_pointed"], 2).fillna(0)
-        # export to a dictionary of stats)
-        stats = {}
-        for row in df_tgt.reset_index().to_dict("records"):
-            deliverable = row[self.deliverable_col]
-            stats[deliverable] = Statistic(
-                value=row["pct_pointed"],
-                suffix=f"% of {Unit.issues.value} pointed",
-            )
-        return stats
-
-    def format_slack_message(self) -> str:
-        """Format the message that will be included with the charts posted to slack."""
-        message = f"*:github: Percent of {self.unit.value} completed by deliverable*\n"
-        if self.statuses_to_include:
-            statuses = ", ".join(self.statuses_to_include)
-            message += f"Limited to deliverables with these statuses: {statuses}\n\n"
-        for label, stat in self.stats.items():
-            message += f"• *{label}:* {stat.value}{stat.suffix}\n"
-        return message
-
-    def _isolate_deliverables_by_status(self) -> pd.DataFrame:
-        """Isolate the deliverables to include in the report based on their status."""
-        df = self.dataset.df
-        # if statuses_to_include is provided, use it to filter the dataset
-        statuses_provided = self.statuses_to_include
-        if statuses_provided:
-            status_filter = df[self.deliverable_status_col].isin(statuses_provided)
-            df = df[status_filter]
-        return df
-
-    def _get_count_by_deliverable(
-        self,
-        status: str,
-    ) -> pd.DataFrame:
-        """Get the count of issues (or points) by deliverable and status."""
-        # create local copies of the dataset and key column names
-        df = self.deliverable_data.copy()
-        unit_col = self.unit_col
-        key_cols = [self.deliverable_col, unit_col]
-        # create a dummy column to sum per row if the unit is issues
-        if self.unit == Unit.issues:
-            df[unit_col] = 1
-        # isolate issues with the status we want
-        if status != "all":
-            status_filter = df[self.status_col] == status
-            df = df.loc[status_filter, key_cols]
-        else:
-            status = "total"  # rename status var to use as column name
-            df = df[key_cols]
-        # group by deliverable and sum the values in the unit field
-        # then rename the sum column to the value of the status var
-        # to prevent duplicate col names when open and closed counts are joined
-        df_agg = df.groupby(self.deliverable_col, as_index=False).agg({unit_col: "sum"})
-        return df_agg.rename(columns={unit_col: status})
-
-    def _prepare_result_dataframe_for_plotly(self) -> pd.DataFrame:
-        """Stack the open and closed counts self.results for plotly charts."""
-        # unpivot open and closed counts so that each deliverable has both
-        # an open and a closed row with just one column for count
-        unit_col: str = self.unit.value
-        df = self.results.melt(
-            id_vars=[self.deliverable_col],
-            value_vars=["open", "closed"],
-            value_name=unit_col,
-            var_name=self.status_col,
-        )
-        # calculate the percentage of open and closed per deliverable
-        # so that we can use this value as label in the chart
-        df["total"] = df.groupby(self.deliverable_col)[unit_col].transform("sum")
-        df["percent_of_total"] = (df[unit_col] / df["total"] * 100).round(0)
-        df["percent_of_total"] = (
-            df["percent_of_total"].astype("Int64").astype("str") + "%"
-        )
-        # sort the dataframe by count and status so that the resulting chart
-        # has deliverables with more issues/points at the top
-        return df.sort_values(["total", self.status_col], ascending=True)
diff --git a/analytics/src/analytics/metrics/utils.py b/analytics/src/analytics/metrics/utils.py
deleted file mode 100644
index 4db39b850..000000000
--- a/analytics/src/analytics/metrics/utils.py
+++ /dev/null
@@ -1,129 +0,0 @@
-"""Stores utility functions for Metrics classes."""
-
-from __future__ import annotations
-
-from dataclasses import dataclass
-from enum import StrEnum
-
-import pandas as pd
-
-from analytics.metrics.base import Unit
-
-
-@dataclass
-class Columns:
-    """List of columns names to use when calculating burnup/down."""
-
-    opened_at_col: str
-    closed_at_col: str
-    unit_col: str
-    date_col: str = "date"
-    opened_count_col: str = "opened"
-    closed_count_col: str = "closed"
-    delta_col: str = "delta"
-
-
-class IssueState(StrEnum):
-    """Whether the issue is open or closed."""
-
-    OPEN = "opened"
-    CLOSED = "closed"
-
-
-def sum_tix_by_day(
-    df: pd.DataFrame,
-    cols: Columns,
-    unit: Unit,
-    sprint_end: pd.Timestamp,
-) -> pd.DataFrame:
-    """Count the total number of tix opened, closed, and remaining by day."""
-    # Get the date range for burndown/burnup
-    df_tix_range = get_tix_date_range(df, cols, sprint_end)
-    # Get the number of tix opened and closed by day
-    df_opened = get_daily_tix_counts_by_status(df, cols, IssueState.OPEN, unit)
-    df_closed = get_daily_tix_counts_by_status(df, cols, IssueState.CLOSED, unit)
-    # combine the daily opened and closed counts to get total open and closed per day
-    return get_cum_sum_of_tix(cols, df_tix_range, df_opened, df_closed)
-
-
-def get_daily_tix_counts_by_status(
-    df: pd.DataFrame,
-    cols: Columns,
-    state: IssueState,
-    unit: Unit,
-) -> pd.DataFrame:
-    """
-    Count the number of issues or points opened or closed by date.
-
-    Notes
-    -----
-    It does this by:
-    - Grouping on the created_date or opened_date column, depending on state
-    - Counting the total number of rows per group
-
-    """
-    agg_col = cols.opened_at_col if state == IssueState.OPEN else cols.closed_at_col
-    unit_col = cols.unit_col
-    key_cols = [agg_col, unit_col]
-    if unit == Unit.issues:
-        df[unit_col] = 1
-    df_agg = df[key_cols].groupby(agg_col, as_index=False).agg({unit_col: "sum"})
-    return df_agg.rename(columns={agg_col: "date", unit_col: state.value})
-
-
-def get_tix_date_range(
-    df: pd.DataFrame,
-    cols: Columns,
-    sprint_end: pd.Timestamp,
-) -> pd.DataFrame:
-    """
-    Get the data range over which issues were created and closed.
-
-    Notes
-    -----
-    It does this by:
-    - Finding the date when the sprint ends
-    - Finding the earliest date a issue was created
-    - Finding the latest date a issue was closed
-    - Creating a row for each day between the earliest date a ticket was opened
-      and either the sprint end _or_ the latest date an issue was closed,
-      whichever is the later date.
-
-    """
-    opened_min = df[cols.opened_at_col].min()
-    closed_max = df[cols.closed_at_col].max()
-    closed_max = sprint_end if pd.isna(closed_max) else max(sprint_end, closed_max)
-    return pd.DataFrame(
-        pd.date_range(opened_min, closed_max),
-        columns=["date"],
-    )
-
-
-def get_cum_sum_of_tix(
-    cols: Columns,
-    dates: pd.DataFrame,
-    opened: pd.DataFrame,
-    closed: pd.DataFrame,
-) -> pd.DataFrame:
-    """
-    Create results data frame.
-
-    Notes
-    -----
-    It does this by:
-    - Left joining the full date range to the daily open and closed counts
-      so that we have a row for each day of the range with a column for tix
-      opened, a column for tix closed for the day,
-    - Cumulatively summing the deltas to get the running total of open tix
-    - Cumulative summing the closed column to get the running total of closed tix
-
-    """
-    df = (
-        dates.merge(opened, on=cols.date_col, how="left")
-        .merge(closed, on=cols.date_col, how="left")
-        .fillna(0)
-    )
-    df[cols.delta_col] = df[cols.opened_count_col] - df[cols.closed_count_col]
-    df["total_open"] = df[cols.delta_col].cumsum()
-    df["total_closed"] = df[cols.closed_count_col].cumsum()
-    return df
diff --git a/analytics/tests/metrics/__init__.py b/analytics/tests/metrics/__init__.py
deleted file mode 100644
index 8e691a3df..000000000
--- a/analytics/tests/metrics/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""Test modules in analytics.metrics package."""
diff --git a/analytics/tests/metrics/test_base.py b/analytics/tests/metrics/test_base.py
deleted file mode 100644
index 3362c7532..000000000
--- a/analytics/tests/metrics/test_base.py
+++ /dev/null
@@ -1,69 +0,0 @@
-"""Test the BaseMetric class."""
-
-# pylint: disable=abstract-method
-import pandas as pd  # noqa: I001
-import pytest
-
-from analytics.datasets.base import BaseDataset
-from analytics.metrics.base import BaseMetric, Statistic
-
-
-class MetricWithoutStats(BaseMetric):
-    """Create a mock metric for testing without get_stats() method."""
-
-    def calculate(self) -> pd.DataFrame:
-        """Implement calculate method."""
-        return pd.DataFrame()
-
-
-class MetricWithoutPlotResults(BaseMetric):
-    """Create a mock metric for testing without get_stats() method."""
-
-    def calculate(self) -> pd.DataFrame:
-        """Implement calculate method."""
-        return pd.DataFrame()
-
-    def get_stats(self) -> dict[str, Statistic]:
-        """Implement get_stats method."""
-        return {}
-
-
-@pytest.fixture(scope="module", name="dataset")
-def mock_dataset() -> BaseDataset:
-    """Create a mock BaseDataset instance for tests."""
-    return BaseDataset(df=pd.DataFrame())
-
-
-class TestRequiredImplementations:
-    """Check that NotImplementedError is raised for abstract methods."""
-
-    def test_raise_not_implemented_on_init_due_to_calculate(
-        self,
-        dataset: BaseDataset,
-    ):
-        """Error should be raised for __init__() method without calculate()."""
-        with pytest.raises(NotImplementedError):
-            BaseMetric(dataset)
-
-    def test_raise_not_implemented_on_init_due_to_get_stats(
-        self,
-        dataset: BaseDataset,
-    ):
-        """Error should be raised for __init__() method without get_stats()."""
-        with pytest.raises(NotImplementedError):
-            MetricWithoutStats(dataset)
-
-    def test_raise_not_implemented_for_plot_results(self, dataset: BaseDataset):
-        """NotImplementedError should be raised for plot_results()."""
-        mock_metric = MetricWithoutPlotResults(dataset)
-        with pytest.raises(NotImplementedError):
-            mock_metric.plot_results()
-
-    def test_raise_not_implemented_for_format_slack_message(
-        self,
-        dataset: BaseDataset,
-    ):
-        """NotImplementedError should be raised for format_slack_message()."""
-        mock_metric = MetricWithoutPlotResults(dataset)
-        with pytest.raises(NotImplementedError):
-            mock_metric.format_slack_message()
diff --git a/analytics/tests/metrics/test_burndown.py b/analytics/tests/metrics/test_burndown.py
deleted file mode 100644
index 6da265b8b..000000000
--- a/analytics/tests/metrics/test_burndown.py
+++ /dev/null
@@ -1,673 +0,0 @@
-"""Test the analytics.metrics.burndown module."""
-
-from pathlib import Path  # noqa: I001
-
-import pandas as pd
-import pytest
-
-from analytics.datasets.issues import GitHubIssues
-from analytics.metrics.burndown import SprintBurndown, Unit
-
-from tests.conftest import (
-    DAY_0,
-    DAY_1,
-    DAY_2,
-    DAY_3,
-    DAY_4,
-    MockSlackbot,
-    issue,
-)
-
-
-def result_row(
-    day: str,
-    opened: int,
-    closed: int,
-    delta: int,
-    total: int,
-    closed_total: int,
-) -> dict:
-    """Create a sample result row."""
-    return {
-        "date": pd.Timestamp(day),
-        "opened": opened,
-        "closed": closed,
-        "delta": delta,
-        "total_open": total,
-        "total_closed": closed_total,
-    }
-
-
-@pytest.fixture(name="sample_burndown", scope="module")
-def sample_burndown_by_points_fixture() -> SprintBurndown:
-    """Create a sample burndown to simplify test setup."""
-    # setup - create test data
-    sprint_data = [
-        issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-        issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-    ]
-    sprint_data = [i.__dict__ for i in sprint_data]
-    test_data = GitHubIssues.from_dict(sprint_data)
-    # return sprint burndown by points
-    return SprintBurndown(test_data, sprint="Sprint 1", unit=Unit.points, project=1)
-
-
-class TestSprintBurndownByTasks:
-    """Test the SprintBurndown class with unit='tasks'."""
-
-    def test_exclude_tix_assigned_to_other_sprints(self):
-        """The burndown should exclude tickets that are assigned to other sprints."""
-        # setup - create test data
-        sprint_data = [
-            # fmt: off
-            # include this row - assigned to sprint 1
-            issue(issue=1, sprint=1, sprint_start=DAY_1, created=DAY_1, closed=DAY_3),
-            # exclude this row - assigned to sprint 2
-            issue(issue=1, sprint=2, sprint_start=DAY_4, created=DAY_0, closed=DAY_4),
-            # fmt: on
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        df = output.results
-        # validation - check min and max dates
-        assert df[output.date_col].min() == pd.Timestamp(DAY_1)
-        assert df[output.date_col].max() == pd.Timestamp(DAY_3)
-        # validation - check burndown output
-        # fmt: off
-        expected = [
-            result_row(day=DAY_1, opened=1, closed=0, delta=1, total=1, closed_total=0),
-            result_row(day=DAY_2, opened=0, closed=0, delta=0, total=1, closed_total=0),
-            result_row(day=DAY_3, opened=0, closed=1, delta=-1, total=0, closed_total=1),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-    def test_count_tix_created_before_sprint_start(self):
-        """Burndown should include tix opened before the sprint but closed during it."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        df = output.results
-        # validation - check min and max dates
-        assert df[output.date_col].min() == pd.Timestamp(DAY_0)
-        assert df[output.date_col].max() == pd.Timestamp(DAY_3)
-        # validation - check burndown output
-        # fmt: off
-        expected = [
-            result_row(day=DAY_0, opened=2, closed=0, delta=2, total=2, closed_total=0),
-            result_row(day=DAY_1, opened=0, closed=0, delta=0, total=2, closed_total=0),
-            result_row(day=DAY_2, opened=0, closed=1, delta=-1, total=1, closed_total=1),
-            result_row(day=DAY_3, opened=0, closed=1, delta=-1, total=0, closed_total=2),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-    def test_count_tix_closed_after_sprint_start(self):
-        """Burndown should include tix closed after the sprint ended."""
-        # setup - create test data
-        sprint_data = [
-            issue(  # closed before sprint end
-                issue=1,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_1,
-                closed=DAY_2,
-            ),
-            issue(  # closed after sprint end
-                issue=1,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_1,
-                closed=DAY_4,
-            ),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        df = output.results
-        # validation - check min and max dates
-        assert df[output.date_col].min() == pd.Timestamp(DAY_1)
-        assert df[output.date_col].max() == pd.Timestamp(DAY_4)
-        # validation - check burndown output
-        # fmt: off
-        expected = [
-            result_row(day=DAY_1, opened=2, closed=0, delta=2, total=2, closed_total=0),
-            result_row(day=DAY_2, opened=0, closed=1, delta=-1, total=1, closed_total=1),
-            result_row(day=DAY_3, opened=0, closed=0, delta=0, total=1, closed_total=1),
-            result_row(day=DAY_4, opened=0, closed=1, delta=-1, total=0, closed_total=2),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-    def test_count_tix_created_after_sprint_start(self):
-        """Burndown should include tix opened and closed during the sprint."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, closed=DAY_3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        df = output.results
-        # validation - check burndown output
-        # fmt: off
-        expected = [
-            result_row(day=DAY_0, opened=1, closed=0, delta=1, total=1, closed_total=0),
-            result_row(day=DAY_1, opened=0, closed=0, delta=0, total=1, closed_total=0),
-            result_row(day=DAY_2, opened=1, closed=1, delta=0, total=1, closed_total=1),
-            result_row(day=DAY_3, opened=0, closed=1, delta=-1, total=0, closed_total=2),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-    def test_include_all_sprint_days_if_tix_closed_early(self):
-        """All days of the sprint should be included even if all tix were closed early."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        df = output.results
-        # validation - check max date is end of sprint not last closed date
-        assert df[output.date_col].max() == pd.Timestamp(DAY_3)
-
-    def test_raise_value_error_if_sprint_arg_not_in_dataset(self):
-        """A ValueError should be raised if the sprint argument isn't valid."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # validation
-        with pytest.raises(
-            ValueError,
-            match="Sprint value doesn't match one of the available sprints",
-        ):
-            SprintBurndown(test_data, sprint="Fake sprint", unit=Unit.issues, project=1)
-
-    def test_calculate_burndown_for_current_sprint(self):
-        """Use the current sprint if the date falls in the middle of a sprint."""
-        # setup - create test data
-        today = pd.Timestamp.today().floor("d")
-        day_1 = (today + pd.Timedelta(days=-1)).strftime("%Y-%m-%d")
-        day_2 = today.strftime("%Y-%m-%d")
-        day_3 = (today + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
-        sprint_data = [  # note sprint duration is 2 days by default
-            issue(issue=1, sprint_start=day_1, created=day_1, closed=day_2),
-            issue(issue=1, sprint_start=day_1, created=day_1),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="@current",
-            unit=Unit.issues,
-            project=1,
-        )
-        df = output.results
-        # validation - check burndown output
-        # fmt: off
-        expected = [
-            result_row(day=day_1, opened=2, closed=0, delta=2, total=2, closed_total=0),
-            result_row(day=day_2, opened=0, closed=1, delta=-1, total=1, closed_total=1),
-            result_row(day=day_3, opened=0, closed=0, delta=0, total=1, closed_total=1),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-
-class TestSprintBurndownByPoints:
-    """Test the SprintBurndown class with unit='points'."""
-
-    def test_burndown_works_with_points(self):
-        """Burndown should be calculated correctly with points."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        df = output.results
-        # validation
-        # fmt: off
-        expected = [
-            result_row(day=DAY_0, opened=2, closed=0, delta=2, total=2, closed_total=0),
-            result_row(day=DAY_1, opened=0, closed=0, delta=0, total=2, closed_total=0),
-            result_row(day=DAY_2, opened=3, closed=0, delta=3, total=5, closed_total=0),
-            result_row(day=DAY_3, opened=0, closed=0, delta=0, total=5, closed_total=0),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-    def test_burndown_excludes_tix_without_points(self):
-        """Burndown should exclude tickets that are not pointed."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_1, points=2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=0),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=None),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        df = output.results
-        # validation
-        # fmt: off
-        expected = [
-            result_row(day=DAY_1, opened=2, closed=0, delta=2, total=2, closed_total=0),
-            result_row(day=DAY_2, opened=0, closed=0, delta=0, total=2, closed_total=0),
-            result_row(day=DAY_3, opened=0, closed=0, delta=0, total=2, closed_total=0),
-        ]
-        # fmt: on
-        assert df.to_dict("records") == expected
-
-
-class TestGetStats:
-    """Test the SprintBurndown.get_stats() method."""
-
-    SPRINT_START = "Sprint start date"
-    SPRINT_END = "Sprint end date"
-    TOTAL_OPENED = "Total opened"
-    TOTAL_CLOSED = "Total closed"
-    PCT_CLOSED = "Percent closed"
-    PCT_POINTED = "Percent pointed"
-
-    def test_sprint_start_and_sprint_end_not_affected_by_unit(self):
-        """Test that sprint start and end are the same regardless of unit."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2),
-            issue(issue=2, sprint_start=DAY_1, created=DAY_2, closed=DAY_4),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        points = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        issues = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        # validation - check they're calculated correctly
-        assert points.stats[self.SPRINT_START].value == DAY_1
-        assert points.stats[self.SPRINT_END].value == DAY_3
-        # validation - check that they are the same
-        # fmt: off
-        assert points.stats.get(self.SPRINT_START) == issues.stats.get(self.SPRINT_START)
-        assert points.stats.get(self.SPRINT_END) == issues.stats.get(self.SPRINT_END)
-        # fmt: on
-
-    def test_get_total_closed_and_opened_when_unit_is_issues(self):
-        """Test that total_closed is calculated correctly when unit is issues."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint=1, created=DAY_0, closed=DAY_2),
-            issue(issue=2, sprint=1, created=DAY_0, closed=DAY_3),
-            issue(issue=3, sprint=1, created=DAY_2),  # not closed
-            issue(issue=4, sprint=1, created=DAY_2),  # not closed
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        print(output.results)
-        # validation - check that stats were calculated correctly
-        assert output.stats[self.TOTAL_CLOSED].value == 2
-        assert output.stats[self.TOTAL_OPENED].value == 4
-        assert output.stats[self.PCT_CLOSED].value == 50.0
-        # validation - check that message contains string value of Unit.issues
-        assert Unit.issues.value in output.stats[self.TOTAL_CLOSED].suffix
-        assert Unit.issues.value in output.stats[self.TOTAL_OPENED].suffix
-        assert "%" in output.stats[self.PCT_CLOSED].suffix
-
-    def test_get_total_closed_and_opened_when_unit_is_points(self):
-        """Test that total_closed is calculated correctly when unit is issues."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2),
-            issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4),
-            issue(issue=3, sprint=1, created=DAY_2, points=2),  # not closed
-            issue(issue=4, sprint=1, created=DAY_2, points=4),  # not closed
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        # validation
-        assert output.stats[self.TOTAL_CLOSED].value == 3
-        assert output.stats[self.TOTAL_OPENED].value == 9
-        assert output.stats[self.PCT_CLOSED].value == 33.33  # rounded to 2 places
-        # validation - check that message contains string value of Unit.points
-        assert Unit.points.value in output.stats[self.TOTAL_CLOSED].suffix
-        assert Unit.points.value in output.stats[self.TOTAL_OPENED].suffix
-        assert "%" in output.stats[self.PCT_CLOSED].suffix
-
-    def test_include_issues_closed_after_sprint_end(self):
-        """Issues that are closed after sprint ended should be included in closed count."""
-        # setup - create test data
-        sprint_data = [
-            issue(  # closed during sprint
-                issue=1,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_1,
-                closed=DAY_2,
-            ),
-            issue(  # closed after sprint
-                issue=2,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_2,
-                closed=DAY_4,
-            ),
-            issue(  # not closed
-                issue=3,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_2,
-            ),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        # validation
-        assert output.stats[self.TOTAL_CLOSED].value == 2
-        assert output.stats[self.TOTAL_OPENED].value == 3
-        assert output.stats[self.PCT_CLOSED].value == 66.67  # rounded to 2 places
-
-    def test_get_percent_pointed(self):
-        """Test that percent pointed is calculated correctly."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2),
-            issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4),
-            issue(issue=3, sprint=1, created=DAY_2, points=None),  # not pointed
-            issue(issue=4, sprint=1, created=DAY_2, points=0),  # not closed
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        # validation
-        assert output.stats[self.TOTAL_CLOSED].value == 3
-        assert output.stats[self.TOTAL_OPENED].value == 3
-        assert output.stats[self.PCT_CLOSED].value == 100
-        assert output.stats[self.PCT_POINTED].value == 50
-        # validation - check that stat contains '%' suffix
-        assert f"% of {Unit.issues.value}" in output.stats[self.PCT_POINTED].suffix
-
-    def test_exclude_other_sprints_in_percent_pointed(self):
-        """Only include issues in this sprint when calculating percent pointed."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2),
-            issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4),
-            issue(issue=3, sprint=1, created=DAY_2, points=None),  # not pointed
-            issue(issue=4, sprint=2, created=DAY_2, points=None),  # other sprint
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        # validation
-        assert output.stats[self.TOTAL_CLOSED].value == 2
-        assert output.stats[self.TOTAL_OPENED].value == 3
-        assert output.stats[self.PCT_POINTED].value == 66.67  # exclude final row
-
-
-class TestFormatSlackMessage:
-    """Test the DeliverablePercentComplete.format_slack_message()."""
-
-    def test_slack_message_contains_right_number_of_lines(self):
-        """Message should contain one line for the title and one for each stat."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        lines = output.format_slack_message().splitlines()
-        for line in lines:
-            print(line)
-        # validation
-        assert len(lines) == len(list(output.stats)) + 1
-
-    def test_title_includes_issues_when_unit_is_issue(self):
-        """Test that the title is formatted correctly when unit is issues."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.issues,
-            project=1,
-        )
-        title = output.format_slack_message().splitlines()[0]
-        # validation
-        assert Unit.issues.value in title
-
-    def test_title_includes_points_when_unit_is_points(self):
-        """Test that the title is formatted correctly when unit is points."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        title = output.format_slack_message().splitlines()[0]
-        # validation
-        assert Unit.points.value in title
-
-
-class TestPlotResults:
-    """Test the SprintBurndown.show_results() method."""
-
-    def test_plot_results_output_stored_in_chart_property(self):
-        """SprintBurndown.chart should contain the output of plot_results()."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurndown(
-            test_data,
-            sprint="Sprint 1",
-            unit=Unit.points,
-            project=1,
-        )
-        # validation - check that the chart attribute matches output of plot_results()
-        assert output.chart == output.plot_results()
-
-
-class TestExportMethods:
-    """Test the export methods method for SprintBurndown."""
-
-    @pytest.mark.parametrize(
-        ("method", "file_name"),
-        [
-            ("export_results", "RESULTS_CSV"),
-            ("export_dataset", "DATASET_CSV"),
-            ("export_chart_to_html", "CHART_HTML"),
-            ("export_chart_to_png", "CHART_PNG"),
-        ],
-    )
-    def test_export_results_to_correct_file_path(
-        self,
-        method: str,
-        file_name: str,
-        tmp_path: Path,
-        sample_burndown: SprintBurndown,
-    ):
-        """The file should be exported to the correct location."""
-        # setup - check that file doesn't exist at output location
-        file_name = getattr(sample_burndown, file_name)
-        expected_path = tmp_path / file_name
-        assert expected_path.parent.exists() is True
-        assert expected_path.exists() is False
-        # execution
-        func = getattr(sample_burndown, method)
-        output = func(output_dir=expected_path.parent)
-        # validation - check that output path matches expected and file exists
-        assert output == expected_path
-        assert expected_path.exists()
-
-    @pytest.mark.parametrize(
-        ("method", "file_name"),
-        [
-            ("export_results", "RESULTS_CSV"),
-            ("export_dataset", "DATASET_CSV"),
-            ("export_chart_to_html", "CHART_HTML"),
-            ("export_chart_to_png", "CHART_PNG"),
-        ],
-    )
-    def test_create_parent_dir_if_it_does_not_exists(
-        self,
-        method: str,
-        file_name: str,
-        tmp_path: Path,
-        sample_burndown: SprintBurndown,
-    ):
-        """The parent directory should be created if it doesn't already exist."""
-        # setup - check that file and parent directory don't exist
-        file_name = getattr(sample_burndown, file_name)
-        expected_path = tmp_path / "new_folder" / file_name
-        assert expected_path.parent.exists() is False  # doesn't yet exist
-        assert expected_path.exists() is False
-        # execution
-        func = getattr(sample_burndown, method)
-        output = func(output_dir=expected_path.parent)
-        # validation - check that output path matches expected and file exists
-        assert output == expected_path
-        assert expected_path.exists()
-
-
-def test_post_to_slack(
-    mock_slackbot: MockSlackbot,
-    tmp_path: Path,
-    sample_burndown: SprintBurndown,
-):
-    """Test the steps required to post the results to slack, without actually posting."""
-    # execution
-    sample_burndown.post_results_to_slack(
-        mock_slackbot,  # type: ignore[assignment]
-        channel_id="test_channel",
-        output_dir=tmp_path,
-    )
-    # validation - check that output files exist
-    for output in ["RESULTS_CSV", "DATASET_CSV", "CHART_PNG", "CHART_HTML"]:
-        output_path = tmp_path / getattr(sample_burndown, output)
-        assert output_path.exists() is True
diff --git a/analytics/tests/metrics/test_burnup.py b/analytics/tests/metrics/test_burnup.py
deleted file mode 100644
index df5f653f6..000000000
--- a/analytics/tests/metrics/test_burnup.py
+++ /dev/null
@@ -1,738 +0,0 @@
-"""Test the analytics.metrics.burnup module."""
-
-from pathlib import Path
-
-import pandas as pd
-import pytest
-from analytics.datasets.issues import GitHubIssues
-from analytics.metrics.burnup import SprintBurnup, Unit
-
-from tests.conftest import (
-    DAY_0,
-    DAY_1,
-    DAY_2,
-    DAY_3,
-    DAY_4,
-    MockSlackbot,
-    issue,
-)
-
-
-def result_row(
-    day: str,
-    opened: int,
-    closed: int,
-    delta: int,
-    total_open: int,
-    total_closed: int,
-) -> dict:
-    """Create a sample result row."""
-    return {
-        "date": pd.Timestamp(day),
-        "opened": opened,
-        "closed": closed,
-        "delta": delta,
-        "total_open": total_open,
-        "total_closed": total_closed,
-    }
-
-
-@pytest.fixture(name="sample_burnup", scope="module")
-def sample_burnup_by_points_fixture() -> SprintBurnup:
-    """Create a sample burnup to simplify test setup."""
-    # setup - create test data
-    sprint_data = [
-        issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2),
-        issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3),
-    ]
-    sprint_data = [i.__dict__ for i in sprint_data]
-    test_data = GitHubIssues.from_dict(sprint_data)
-    # return sprint burnup by points
-    return SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points)
-
-
-class TestSprintBurnupByTasks:
-    """Test the SprintBurnup class with unit='tasks'."""
-
-    def test_exclude_tix_assigned_to_other_sprints(self):
-        """The burnup should exclude tickets that are assigned to other sprints."""
-        # setup - create test data
-        sprint_data = [
-            # fmt: off
-            # include this row - assigned to sprint 1
-            issue(issue=1, sprint=1, sprint_start=DAY_1, created=DAY_1, closed=DAY_3),
-            # exclude this row - assigned to sprint 2
-            issue(issue=1, sprint=2, sprint_start=DAY_4, created=DAY_0, closed=DAY_4),
-            # fmt: on
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues)
-        df = output.results
-        # validation - check min and max dates
-        assert df[output.date_col].min() == pd.Timestamp(DAY_1)
-        assert df[output.date_col].max() == pd.Timestamp(DAY_3)
-        # validation - check burnup output
-        expected = [
-            result_row(
-                day=DAY_1,
-                opened=1,
-                closed=0,
-                delta=1,
-                total_open=1,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_2,
-                opened=0,
-                closed=0,
-                delta=0,
-                total_open=1,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_3,
-                opened=0,
-                closed=1,
-                delta=-1,
-                total_open=0,
-                total_closed=1,
-            ),
-        ]
-        assert df.to_dict("records") == expected
-
-    def test_count_tix_created_before_sprint_start(self):
-        """Burnup should include tix opened before the sprint but closed during it."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues)
-        df = output.results
-        # validation - check min and max dates
-        assert df[output.date_col].min() == pd.Timestamp(DAY_0)
-        assert df[output.date_col].max() == pd.Timestamp(DAY_3)
-        # validation - check burnup output
-        expected = [
-            result_row(
-                day=DAY_0,
-                opened=2,
-                closed=0,
-                delta=2,
-                total_open=2,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_1,
-                opened=0,
-                closed=0,
-                delta=0,
-                total_open=2,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_2,
-                opened=0,
-                closed=1,
-                delta=-1,
-                total_open=1,
-                total_closed=1,
-            ),
-            result_row(
-                day=DAY_3,
-                opened=0,
-                closed=1,
-                delta=-1,
-                total_open=0,
-                total_closed=2,
-            ),
-        ]
-        assert df.to_dict("records") == expected
-
-    def test_count_tix_closed_after_sprint_start(self):
-        """Burnup should include tix closed after the sprint ended."""
-        # setup - create test data
-        sprint_data = [
-            issue(  # closed before sprint end
-                issue=1,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_1,
-                closed=DAY_2,
-            ),
-            issue(  # closed after sprint end
-                issue=1,
-                sprint_start=DAY_1,
-                sprint_length=2,
-                created=DAY_1,
-                closed=DAY_4,
-            ),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues)
-        df = output.results
-        # validation - check min and max dates
-        assert df[output.date_col].min() == pd.Timestamp(DAY_1)
-        assert df[output.date_col].max() == pd.Timestamp(DAY_4)
-        # validation - check burnup output
-        expected = [
-            result_row(
-                day=DAY_1,
-                opened=2,
-                closed=0,
-                delta=2,
-                total_open=2,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_2,
-                opened=0,
-                closed=1,
-                delta=-1,
-                total_open=1,
-                total_closed=1,
-            ),
-            result_row(
-                day=DAY_3,
-                opened=0,
-                closed=0,
-                delta=0,
-                total_open=1,
-                total_closed=1,
-            ),
-            result_row(
-                day=DAY_4,
-                opened=0,
-                closed=1,
-                delta=-1,
-                total_open=0,
-                total_closed=2,
-            ),
-        ]
-        assert df.to_dict("records") == expected
-
-    def test_count_tix_created_after_sprint_start(self):
-        """Burnup should include tix opened and closed during the sprint."""
-        # setup - create test data
-        sprint_data = [
-            issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2),
-            issue(issue=1, sprint_start=DAY_1, created=DAY_2, closed=DAY_3),
-        ]
-        sprint_data = [i.__dict__ for i in sprint_data]
-        test_data = GitHubIssues.from_dict(sprint_data)
-        # execution
-        output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues)
-        df = output.results
-        # validation - check burnup output
-        expected = [
-            result_row(
-                day=DAY_0,
-                opened=1,
-                closed=0,
-                delta=1,
-                total_open=1,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_1,
-                opened=0,
-                closed=0,
-                delta=0,
-                total_open=1,
-                total_closed=0,
-            ),
-            result_row(
-                day=DAY_2,
-                opened=1,
-                closed=1,
-                delta=0,
-                total_open=1,
-                total_closed=1,
-            ),
-            result_row(
-                day=DAY_3,
-                opened=0,
-                closed=1,
-                delta=-1,
-                total_open=0,
-                total_closed=2,
-            ),
-        ]
-        assert df.to_dict("records") == expected
-
-    def 
test_include_all_sprint_days_if_tix_closed_early(self): - """All days of the sprint should be included even if all tix were closed early.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - df = output.results - # validation - check max date is end of sprint not last closed date - assert df[output.date_col].max() == pd.Timestamp(DAY_3) - - def test_raise_value_error_if_sprint_arg_not_in_dataset(self): - """A ValueError should be raised if the sprint argument isn't valid.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_1), - issue(issue=1, sprint_start=DAY_1, created=DAY_0), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # validation - with pytest.raises( - ValueError, - match="Sprint value doesn't match one of the available sprints", - ): - SprintBurnup(test_data, sprint="Fake sprint", unit=Unit.issues) - - def test_calculate_burnup_for_current_sprint(self): - """Use the current sprint if the date falls in the middle of a sprint.""" - # setup - create test data - today = pd.Timestamp.today().floor("d") - day_1 = (today + pd.Timedelta(days=-1)).strftime("%Y-%m-%d") - day_2 = today.strftime("%Y-%m-%d") - day_3 = (today + pd.Timedelta(days=1)).strftime("%Y-%m-%d") - sprint_data = [ # note sprint duration is 2 days by default - issue(issue=1, sprint_start=day_1, created=day_1, closed=day_2), - issue(issue=1, sprint_start=day_1, created=day_1), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="@current", unit=Unit.issues) - df = output.results - # validation - check burnup output - expected = [ - result_row( - day=day_1, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=day_2, - opened=0, - closed=1, - delta=-1, - total_open=1, - total_closed=1, - ), - result_row( - day=day_3, - opened=0, - closed=0, - delta=0, - total_open=1, - total_closed=1, - ), - ] - assert df.to_dict("records") == expected - - -class TestSprintBurnupByPoints: - """Test the SprintBurnup class with unit='points'.""" - - def test_burnup_works_with_points(self): - """Burnup should be calculated correctly with points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - df = output.results - # validation - expected = [ - result_row( - day=DAY_0, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_1, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=3, - closed=0, - delta=3, - total_open=5, - total_closed=0, - ), - result_row( - day=DAY_3, - opened=0, - closed=0, - delta=0, - total_open=5, - total_closed=0, - ), - ] - assert df.to_dict("records") == expected - - def test_burnup_excludes_tix_without_points(self): - """Burnup 
should exclude tickets that are not pointed.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_1, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=0), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=None), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - df = output.results - # validation - expected = [ - result_row( - day=DAY_1, - opened=2, - closed=0, - delta=2, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_2, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - result_row( - day=DAY_3, - opened=0, - closed=0, - delta=0, - total_open=2, - total_closed=0, - ), - ] - assert df.to_dict("records") == expected - - -class TestGetStats: - """Test the SprintBurnup.get_stats() method.""" - - SPRINT_START = "Sprint start date" - SPRINT_END = "Sprint end date" - TOTAL_OPENED = "Total opened" - TOTAL_CLOSED = "Total closed" - PCT_CLOSED = "Percent closed" - PCT_POINTED = "Percent pointed" - - def test_sprint_start_and_sprint_end_not_affected_by_unit(self): - """Test that sprint start and end are the same regardless of unit.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, closed=DAY_2), - issue(issue=2, sprint_start=DAY_1, created=DAY_2, closed=DAY_4), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - points = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - issues = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - # validation - check they're calculated correctly - assert points.stats[self.SPRINT_START].value == DAY_1 - assert points.stats[self.SPRINT_END].value == DAY_3 - # validation - check that they are the same - # fmt: off - assert points.stats.get(self.SPRINT_START) == issues.stats.get(self.SPRINT_START) - assert points.stats.get(self.SPRINT_END) == issues.stats.get(self.SPRINT_END) - # fmt: on - - def test_get_total_closed_and_opened_when_unit_is_issues(self): - """Test that total_closed is calculated correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_0, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_0, closed=DAY_3), - issue(issue=3, sprint=1, created=DAY_2), # not closed - issue(issue=4, sprint=1, created=DAY_2), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - print(output.results) - # validation - check that stats were calculated correctly - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 4 - assert output.stats[self.PCT_CLOSED].value == 50.0 - # validation - check that message contains string value of Unit.issues - assert Unit.issues.value in output.stats[self.TOTAL_CLOSED].suffix - assert Unit.issues.value in output.stats[self.TOTAL_OPENED].suffix - assert "%" in output.stats[self.PCT_CLOSED].suffix - - def test_get_total_closed_and_opened_when_unit_is_points(self): - """Test that total_closed is calculated correctly when unit is points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, 
points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=2), # not closed - issue(issue=4, sprint=1, created=DAY_2, points=4), # not closed - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 3 - assert output.stats[self.TOTAL_OPENED].value == 9 - assert output.stats[self.PCT_CLOSED].value == 33.33 # rounded to 2 places - # validation - check that message contains string value of Unit.points - assert Unit.points.value in output.stats[self.TOTAL_CLOSED].suffix - assert Unit.points.value in output.stats[self.TOTAL_OPENED].suffix - assert "%" in output.stats[self.PCT_CLOSED].suffix - - def test_include_issues_closed_after_sprint_end(self): - """Issues that are closed after sprint ended should be included in closed count.""" - # setup - create test data - sprint_data = [ - issue( # closed during sprint - issue=1, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_1, - closed=DAY_2, - ), - issue( # closed after sprint - issue=2, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_2, - closed=DAY_4, - ), - issue( # not closed - issue=3, - sprint_start=DAY_1, - sprint_length=2, - created=DAY_2, - ), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_CLOSED].value == 66.67 # rounded to 2 places - - def test_get_percent_pointed(self): - """Test that percent pointed is calculated correctly.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=None), # not pointed - issue(issue=4, sprint=1, created=DAY_2, points=0), # not pointed (0 points) - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 3 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_CLOSED].value == 100 - assert output.stats[self.PCT_POINTED].value == 50 - # validation - check that stat contains '%' suffix - assert f"% of {Unit.issues.value}" in output.stats[self.PCT_POINTED].suffix - - def test_exclude_other_sprints_in_percent_pointed(self): - """Only include issues in this sprint when calculating percent pointed.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint=1, created=DAY_1, points=2, closed=DAY_2), - issue(issue=2, sprint=1, created=DAY_2, points=1, closed=DAY_4), - issue(issue=3, sprint=1, created=DAY_2, points=None), # not pointed - issue(issue=4, sprint=2, created=DAY_2, points=None), # other sprint - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - # validation - assert output.stats[self.TOTAL_CLOSED].value == 2 - assert output.stats[self.TOTAL_OPENED].value == 3 - assert output.stats[self.PCT_POINTED].value == 66.67 # exclude final row - - -class 
TestFormatSlackMessage: - """Test the SprintBurnup.format_slack_message().""" - - def test_slack_message_contains_right_number_of_lines(self): - """Message should contain one line for the title and one for each stat.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - lines = output.format_slack_message().splitlines() - for line in lines: - print(line) - # validation - assert len(lines) == len(list(output.stats)) + 1 - - def test_title_includes_issues_when_unit_is_issue(self): - """Test that the title is formatted correctly when unit is issues.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.issues) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.issues.value in title - - def test_title_includes_points_when_unit_is_points(self): - """Test that the title is formatted correctly when unit is points.""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.points.value in title - - -class TestPlotResults: - """Test the SprintBurnup.plot_results() method.""" - - def test_plot_results_output_stored_in_chart_property(self): - """SprintBurnup.chart should contain the output of plot_results().""" - # setup - create test data - sprint_data = [ - issue(issue=1, sprint_start=DAY_1, created=DAY_0, points=2), - issue(issue=1, sprint_start=DAY_1, created=DAY_2, points=3), - ] - sprint_data = [i.__dict__ for i in sprint_data] - test_data = GitHubIssues.from_dict(sprint_data) - # execution - output = SprintBurnup(test_data, sprint="Sprint 1", unit=Unit.points) - # validation - check that the chart attribute matches output of plot_results() - assert output.chart == output.plot_results() - - -class TestExportMethods: - """Test the export methods for SprintBurnup.""" - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_dataset", "DATASET_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_export_results_to_correct_file_path( - self, - method: str, - file_name: str, - tmp_path: Path, - sample_burnup: SprintBurnup, - ): - """The file should be exported to the correct location.""" - # setup - check that file doesn't exist at output location - file_name = getattr(sample_burnup, file_name) - expected_path = tmp_path / file_name - assert expected_path.parent.exists() is True - assert expected_path.exists() is False - # execution - func = getattr(sample_burnup, method) - output = func(output_dir=expected_path.parent) - # validation - check that 
output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_dataset", "DATASET_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_create_parent_dir_if_it_does_not_exists( - self, - method: str, - file_name: str, - tmp_path: Path, - sample_burnup: SprintBurnup, - ): - """The parent directory should be created if it doesn't already exist.""" - # setup - check that file and parent directory don't exist - file_name = getattr(sample_burnup, file_name) - expected_path = tmp_path / "new_folder" / file_name - assert expected_path.parent.exists() is False # doesn't yet exist - assert expected_path.exists() is False - # execution - func = getattr(sample_burnup, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - -def test_post_to_slack( - mock_slackbot: MockSlackbot, - tmp_path: Path, - sample_burnup: SprintBurnup, -): - """Test the steps required to post the results to slack, without actually posting.""" - # execution - sample_burnup.post_results_to_slack( - mock_slackbot, # type: ignore[assignment] - channel_id="test_channel", - output_dir=tmp_path, - ) - # validation - check that output files exist - for output in ["RESULTS_CSV", "DATASET_CSV", "CHART_PNG", "CHART_HTML"]: - output_path = tmp_path / getattr(sample_burnup, output) - assert output_path.exists() is True diff --git a/analytics/tests/metrics/test_percent_complete.py b/analytics/tests/metrics/test_percent_complete.py deleted file mode 100644 index 5a819706e..000000000 --- a/analytics/tests/metrics/test_percent_complete.py +++ /dev/null @@ -1,434 +0,0 @@ -"""Tests for analytics/metrics/percent_complete.py.""" - -from pathlib import Path # noqa: I001 - -import pytest - -from analytics.datasets.issues import GitHubIssues, IssueMetadata, IssueType -from analytics.metrics.percent_complete import DeliverablePercentComplete, Unit -from tests.conftest import MockSlackbot, DAY_0, DAY_1 - - -def task_row( - deliverable: int, - task: int | None, - deliverable_status: str | None = "In Progress", - points: int | None = 1, - status: str | None = "open", -) -> dict: - """Create a sample row of the DeliverableTasks dataset.""" - issue = IssueMetadata( - project_owner="HHS", - project_number=1, - issue_title=f"Task {task}", - issue_url=f"task{task}", - issue_type=IssueType.TASK.value, - issue_parent=None, - issue_points=points, - issue_is_closed=status == "closed", - issue_opened_at=DAY_0, - issue_closed_at=DAY_1 if status == "closed" else None, - deliverable_title=f"Deliverable {deliverable}", - deliverable_status=deliverable_status, - ) - return issue.model_dump() - - -@pytest.fixture(name="percent_complete", scope="module") -def sample_percent_complete() -> DeliverablePercentComplete: - """Create a sample percent-complete metric to simplify test setup.""" - # setup - create test data - test_rows = [ - task_row(deliverable=1, task=1, status="open"), - task_row(deliverable=1, task=2, status="closed"), - task_row(deliverable=2, task=3, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # return deliverable percent complete by points - return DeliverablePercentComplete(test_data, unit=Unit.points) - - -class TestDeliverablePercentComplete: - """Test the DeliverablePercentComplete metric.""" - - def 
test_percent_complete_based_on_task_count(self): - """Check that percent completion is correct when tasks are the unit.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, status="open"), - task_row(deliverable=1, task=2, status="closed"), - task_row(deliverable=2, task=3, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - df = DeliverablePercentComplete(test_data, unit=Unit.issues).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 2 - assert df.loc["Deliverable 2", "total"] == 1 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 1 - assert df.loc["Deliverable 2", "open"] == 1 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 1 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 0.5 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - def test_percent_complete_based_on_points(self): - """Check that percent completion is correct when points are the unit.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=1, status="open"), - task_row(deliverable=1, task=2, points=3, status="closed"), - task_row(deliverable=2, task=3, points=5, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - df = DeliverablePercentComplete(test_data, unit=Unit.points).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 4 - assert df.loc["Deliverable 2", "total"] == 5 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 1 - assert df.loc["Deliverable 2", "open"] == 5 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 3 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 0.75 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - def test_show_0_pct_for_deliverables_without_tasks(self): - """Deliverables without tasks should show 0% complete instead of throwing an error.""" - # setup - create test dataset where deliverable 2 has no tasks - test_rows = [ - task_row(deliverable=1, task=2, status="closed"), - task_row(deliverable=2, task=None, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - use tasks as the unit - df = DeliverablePercentComplete(test_data, unit=Unit.issues).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 1 - assert df.loc["Deliverable 2", "total"] == 1 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 0 - assert df.loc["Deliverable 2", "open"] == 1 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 1 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 1.0 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - def test_show_0_pct_for_deliverables_without_points(self): - """Deliverables without points should show 0% complete instead of throwing an error.""" - # setup - create test 
dataset where deliverable 2 has no points - test_rows = [ - task_row(deliverable=1, task=2, points=2, status="closed"), - task_row(deliverable=2, task=None, points=None, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - use points as the unit - df = DeliverablePercentComplete(test_data, unit=Unit.points).results - df = df.set_index("deliverable_title") - # validation - check number of rows returned - assert len(df) == 2 - # validation - check totals - assert df.loc["Deliverable 1", "total"] == 2 - assert df.loc["Deliverable 2", "total"] == 0 - # validation - check open - assert df.loc["Deliverable 1", "open"] == 0 - assert df.loc["Deliverable 2", "open"] == 0 - # validation - check closed - assert df.loc["Deliverable 1", "closed"] == 2 - assert df.loc["Deliverable 2", "closed"] == 0 - # validation - check percent complete - assert df.loc["Deliverable 1", "percent_complete"] == 1.0 - assert df.loc["Deliverable 2", "percent_complete"] == 0.0 - - -class TestFilteringReportByDeliverableStatus: - """Test the metric when we limit the set of deliverable statuses to include.""" - - TEST_ROWS = [ - task_row(deliverable=1, task=1, status="closed", deliverable_status="Done"), - task_row(deliverable=2, task=2, status="closed", deliverable_status="Open"), - task_row(deliverable=2, task=3, status="open", deliverable_status="Open"), - ] - - def test_filter_out_deliverables_with_excluded_status(self): - """The results should exclude deliverables with a status that wasn't passed.""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - df = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Open"], - ).results - df = df.set_index("deliverable_title") - # validation - assert len(df) == 1 - assert "Deliverable 1" not in df.index # confirm deliverable 1 was dropped - assert df.loc["Deliverable 2", "percent_complete"] == 0.5 - - def test_invert_statuses_selected(self): - """We should filter out the other deliverable if the selected statuses are inverted.""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - df = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Done"], # changed - ).results - df = df.set_index("deliverable_title") - # validation - assert len(df) == 1 - assert "Deliverable 2" not in df.index # confirm deliverable 2 was dropped - assert df.loc["Deliverable 1", "percent_complete"] == 1 - - def test_list_selected_statuses_in_slack_message(self): - """If we filter on status, those statuses should be listed in the slack message.""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - metric = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Open"], - ) - output = metric.format_slack_message() - # validation - expected = "Limited to deliverables with these statuses: Open" - assert expected in output - - def test_stats_also_filter_out_deliverables_with_excluded_status(self): - """Filtered deliverables should also be excluded from get_stats().""" - # setup - create test dataset - test_data = GitHubIssues.from_dict(self.TEST_ROWS) - # execution - metric = DeliverablePercentComplete( - test_data, - unit=Unit.issues, - statuses_to_include=["Open"], # exclude deliverable 1 - ) - output = metric.get_stats() - # validation - assert len(output) == 1 - assert output.get("Deliverable 1") is None - - -class TestGetStats: - """Test the 
DeliverablePercentComplete.get_stats() method.""" - - def test_all_issues_are_pointed(self): - """Test that stats show 100% of issues are pointed if all have points.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=1, status="closed"), - task_row(deliverable=2, task=3, points=3, status="open"), - task_row(deliverable=2, task=3, points=1, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - assert len(output.stats) == 2 - for deliverable in ["Deliverable 1", "Deliverable 2"]: - stat = output.stats.get(deliverable) - assert stat is not None - assert stat.value == 100 - assert stat.suffix == f"% of {Unit.issues.value} pointed" - - def test_some_issues_are_not_pointed(self): - """Test that stats are calculated correctly if not all issues are pointed.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=0, status="closed"), - task_row(deliverable=2, task=3, points=3, status="open"), - task_row(deliverable=2, task=3, points=None, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - assert len(output.stats) == 2 - for deliverable in ["Deliverable 1", "Deliverable 2"]: - stat = output.stats.get(deliverable) - assert stat is not None - assert stat.value == 50 - assert stat.suffix == f"% of {Unit.issues.value} pointed" - - def test_deliverables_without_tasks_have_0_pct_pointed(self): - """Deliverables without tasks should have 0% pointed in stats.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=1, status="closed"), - task_row(deliverable=2, task=None, points=None, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - assert len(output.stats) == 2 - assert output.stats["Deliverable 1"].value == 100 - assert output.stats["Deliverable 2"].value == 0 - - -class TestFormatSlackMessage: - """Test the DeliverablePercentComplete.format_slack_message().""" - - def test_slack_message_contains_right_number_of_lines(self): - """Message should contain one line for the title and one for each deliverable.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=2, task=2, points=1, status="closed"), - task_row(deliverable=3, task=3, points=3, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - lines = output.format_slack_message().splitlines() - # validation - assert len(lines) == 4 - - def test_title_includes_issues_when_unit_is_issue(self): - """Test that the title is formatted correctly when unit is issues.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=2, task=2, points=1, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.issues.value in title 
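For readers tracing what the deleted tests above pin down: the percent-complete arithmetic is a grouped aggregation over the issue rows. The following is a minimal sketch, not the project's actual implementation, using only the deliverable_title, issue_points, and issue_is_closed fields that task_row() builds via IssueMetadata; the groupby approach is an assumption made for illustration.

import pandas as pd

# hypothetical rows mirroring the fields built by task_row() above
rows = [
    {"deliverable_title": "Deliverable 1", "issue_points": 1, "issue_is_closed": False},
    {"deliverable_title": "Deliverable 1", "issue_points": 3, "issue_is_closed": True},
    {"deliverable_title": "Deliverable 2", "issue_points": 5, "issue_is_closed": False},
]
df = pd.DataFrame(rows)
# total points per deliverable
out = df.groupby("deliverable_title")["issue_points"].sum().to_frame("total")
# closed points per deliverable; deliverables with nothing closed get 0
closed = df[df["issue_is_closed"]].groupby("deliverable_title")["issue_points"].sum()
out["closed"] = closed.reindex(out.index, fill_value=0)
out["open"] = out["total"] - out["closed"]
out["percent_complete"] = out["closed"] / out["total"]
print(out)  # Deliverable 1 -> 0.75, Deliverable 2 -> 0.0, matching the expected values above
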
- - def test_title_includes_points_when_unit_is_points(self): - """Test that the title is formatted correctly when unit is points.""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=2, task=2, points=1, status=None), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.points) - title = output.format_slack_message().splitlines()[0] - # validation - assert Unit.points.value in title - - -class TestPlotResults: - """Test the DeliverablePercentComplete.plot_results() method.""" - - def test_plot_results_output_stored_in_chart_property(self): - """DeliverablePercentComplete.chart should contain the output of plot_results().""" - # setup - create test dataset - test_rows = [ - task_row(deliverable=1, task=1, points=2, status="open"), - task_row(deliverable=1, task=2, points=0, status="closed"), - task_row(deliverable=2, task=3, points=3, status="open"), - task_row(deliverable=2, task=3, points=None, status="open"), - ] - test_data = GitHubIssues.from_dict(test_rows) - # execution - output = DeliverablePercentComplete(test_data, unit=Unit.issues) - # validation - check that the chart attribute matches output of plot_results() - assert output.chart == output.plot_results() - - -class TestExportMethods: - """Test the export methods for DeliverablePercentComplete.""" - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_export_results_to_correct_file_path( - self, - method: str, - file_name: str, - tmp_path: Path, - percent_complete: DeliverablePercentComplete, - ): - """The file should be exported to the correct location.""" - # setup - check that file doesn't exist at output location - file_name = getattr(percent_complete, file_name) - expected_path = tmp_path / file_name - assert expected_path.parent.exists() is True - assert expected_path.exists() is False - # execution - func = getattr(percent_complete, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - @pytest.mark.parametrize( - ("method", "file_name"), - [ - ("export_results", "RESULTS_CSV"), - ("export_chart_to_html", "CHART_HTML"), - ("export_chart_to_png", "CHART_PNG"), - ], - ) - def test_create_parent_dir_if_it_does_not_exists( - self, - method: str, - file_name: str, - tmp_path: Path, - percent_complete: DeliverablePercentComplete, - ): - """The parent directory should be created if it doesn't already exist.""" - # setup - check that file and parent directory don't exist - file_name = getattr(percent_complete, file_name) - expected_path = tmp_path / "new_folder" / file_name - assert expected_path.parent.exists() is False # doesn't yet exist - assert expected_path.exists() is False - # execution - func = getattr(percent_complete, method) - output = func(output_dir=expected_path.parent) - # validation - check that output path matches expected and file exists - assert output == expected_path - assert expected_path.exists() - - -def test_post_to_slack( - mock_slackbot: MockSlackbot, - tmp_path: Path, - percent_complete: DeliverablePercentComplete, -): - """Test the steps required to post the results to slack, without actually posting.""" - # execution - percent_complete.post_results_to_slack( - mock_slackbot, # 
type: ignore # noqa: PGH003 - channel_id="test_channel", - output_dir=tmp_path, - ) - # validation - check that output files exist - for output in ["RESULTS_CSV", "CHART_PNG", "CHART_HTML"]: - output_path = tmp_path / getattr(percent_complete, output) - assert output_path.exists() is True diff --git a/analytics/tests/test_cli.py b/analytics/tests/test_cli.py index 28f299bf8..863739a69 100644 --- a/analytics/tests/test_cli.py +++ b/analytics/tests/test_cli.py @@ -58,222 +58,6 @@ def test_file_fixtures(tmp_path: Path) -> MockFiles: ) - -class TestCalculateSprintBurndown: - """Test the calculate_sprint_burndown entry point with mock data.""" - - def test_without_showing_or_posting_results(self, mock_files: MockFiles): - """Entrypoint should run successfully but not print slack message to stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burndown", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--project", - "1", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - assert "Slack message" not in result.stdout - - def test_stdout_message_includes_points_if_no_unit_is_set( - self, - mock_files: MockFiles, - ): - """CLI should use 'points' as the default unit and include it in the stdout message.""" - # setup - create command - command = [ - "calculate", - "sprint_burndown", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--project", - "1", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "points" in result.stdout - - def test_stdout_message_includes_issues_if_unit_set_to_issues( - self, - mock_files: MockFiles, - ): - """CLI should use issues if set explicitly and include it in stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burndown", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--project", - "1", - "--unit", - "issues", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'issues' - assert "Slack message" in result.stdout - assert "issues" in result.stdout - - -class TestCalculateSprintBurnup: - """Test the calculate_sprint_burnup entry point with mock data.""" - - def test_without_showing_or_posting_results(self, mock_files: MockFiles): - """Entrypoint should run successfully but not print slack message to stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burnup", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - assert "Slack message" not in result.stdout - - def test_stdout_message_includes_points_if_no_unit_is_set( - self, - mock_files: MockFiles, - ): - """CLI should use 'points' as the default unit and include it in the stdout message.""" - # setup - create command - command = [ - "calculate", - "sprint_burnup", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - 
"--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "points" in result.stdout - - def test_stdout_message_includes_issues_if_unit_set_to_issues( - self, - mock_files: MockFiles, - ): - """CLI should use issues if set explicitly and include it in stdout.""" - # setup - create command - command = [ - "calculate", - "sprint_burnup", - "--issue-file", - str(mock_files.delivery_file), - "--sprint", - "Sprint 1", - "--unit", - "issues", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'issues' - assert "Slack message" in result.stdout - assert "issues" in result.stdout - - -class TestCalculateDeliverablePercentComplete: - """Test the calculate_deliverable_percent_complete entry point with mock data.""" - - def test_calculate_deliverable_percent_complete(self, mock_files: MockFiles): - """Entrypoint should run successfully but not print slack message to stdout.""" - # setup - create command - command = [ - "calculate", - "deliverable_percent_complete", - "--issue-file", - str(mock_files.delivery_file), - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - assert "Slack message" not in result.stdout - - def test_stdout_message_includes_points_if_no_unit_is_set( - self, - mock_files: MockFiles, - ): - """CLI should use 'points' as the default unit and include it in the stdout message.""" - # setup - create command - command = [ - "calculate", - "deliverable_percent_complete", - "--issue-file", - str(mock_files.delivery_file), - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'points' - assert "Slack message" in result.stdout - assert "points" in result.stdout - - def test_stdout_message_includes_issues_if_unit_set_to_issues( - self, - mock_files: MockFiles, - ): - """CLI should use issues if set explicitly and include it in stdout.""" - # setup - create command - command = [ - "calculate", - "deliverable_percent_complete", - "--issue-file", - str(mock_files.delivery_file), - "--unit", - "issues", - "--show-results", - ] - # execution - result = runner.invoke(app, command) - print(result.stdout) - # validation - check there wasn't an error - assert result.exit_code == 0 - # validation - check that slack message is printed and includes 'issues' - assert "Slack message" in result.stdout - assert "issues" in result.stdout - - class TestEtlEntryPoint: """Test the etl entry point."""