Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analytics api #73

Merged
merged 17 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions gdrive/analytics_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""
Google Analytics Rest API
"""

from datetime import datetime
import logging

import fastapi
from pydantic import BaseModel
from fastapi import BackgroundTasks, responses
import pandas as pd

from gdrive import error, settings, analytics_client, sheets_client, drive_client

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Router on which the analytics endpoints in this file are registered.
router = fastapi.APIRouter()


class AnalyticsRequest(BaseModel):
    # Request body accepted by the POST /analytics/daterange handler.
    # Both values are parsed by that handler with the "%Y-%m-%d" format.

    # First day of the requested report range.
    startDate: str = None
    # Last day of the requested report range.
    endDate: str = None


@router.post("/analytics")
async def run_analytics(background_tasks: BackgroundTasks):
    """
    Queue an analytics export for the current day.

    Args:
        background_tasks (BackgroundTasks): task queue the export job is added to
    Returns:
        JSONResponse: 202 when the export was queued, 409 when
            settings.ANALYTICS is falsy
    """
    if settings.ANALYTICS:
        # Read the clock once so the date handed to the task and the date
        # reported to the caller cannot disagree across midnight.
        today = datetime.today()
        background_tasks.add_task(run_analytics_task, today, None)
        return responses.JSONResponse(
            status_code=202,
            content="Analytics request for %s is being processed." % (today.date()),
        )

    else:
        return responses.JSONResponse(
            status_code=409,
            content="Request is good, however the client has requested a resource that is unavailable at this time.",
        )


@router.post("/analytics/daterange")
async def run_analytics(background_tasks: BackgroundTasks, req: AnalyticsRequest):
    """
    Queue an analytics export for an explicit date range.

    NOTE(review): this handler shares its function name with the
    POST /analytics handler; the name is kept so existing references to it
    keep working.

    Args:
        background_tasks (BackgroundTasks): task queue the export job is added to
        req (AnalyticsRequest): body carrying startDate and endDate as
            "%Y-%m-%d" strings
    Returns:
        JSONResponse: 400 when either date is missing or malformed, 202 when
            the export was queued, 409 when settings.ANALYTICS is falsy
    """
    date_format = "%Y-%m-%d"
    try:
        start_date = datetime.strptime(req.startDate, date_format)
        end_date = datetime.strptime(req.endDate, date_format)
    except (ValueError, TypeError):
        # ValueError: malformed date string. TypeError: a date was left unset
        # (None). The message reads the model's real field names.
        return responses.JSONResponse(
            status_code=400,
            content="Failed (invalid date parameters): %s, %s"
            % (req.startDate, req.endDate),
        )

    if settings.ANALYTICS:
        background_tasks.add_task(run_analytics_task, start_date, end_date)
        return responses.JSONResponse(
            status_code=202,
            content="Analytics request for %s - %s is being processed."
            % (start_date.date(), end_date.date()),
        )
    else:
        return responses.JSONResponse(
            status_code=409,
            content="Request is good, however the client has requested a resource that is unavailable at this time.",
        )


@router.post("/analytics/list")
async def list_accounts(backgroud_tasks: BackgroundTasks):
    """
    Queue a job that logs the analytics accounts visible to the service.

    Args:
        backgroud_tasks (BackgroundTasks): task queue the listing job is added to
    Returns:
        JSONResponse: 202 when the job was queued, 409 when
            settings.ANALYTICS is falsy
    """
    # Guard clause: refuse early when analytics is switched off.
    if not settings.ANALYTICS:
        return responses.JSONResponse(
            status_code=409,
            content="Request is good, however the client has requested a resource that is unavailable at this time.",
        )

    backgroud_tasks.add_task(list_accounts_task)
    accepted = responses.JSONResponse(
        status_code=202, content="List request is being processed."
    )
    return accepted


async def run_analytics_task(start_date: datetime, end_date: datetime):
    """
    Download the analytics report, export it to Google Sheets, and add the
    pivot-table pages.

    Failures are logged rather than raised: any exception from the three
    steps is caught here and written to the log.

    Args:
        start_date (datetime): first day of the report
        end_date (datetime): last day of the report; the /analytics handler
            passes None here
    """
    try:
        analytics_df = analytics_client.download(
            settings.ANALYTICS_PROPERTY_ID, start_date, end_date
        )
        sheets_id = export(analytics_df, start_date, end_date)
        do_analytics_export_post_processing(analytics_df, sheets_id=sheets_id)
    except Exception as e:
        # Catch-all boundary for this task: log.exception keeps the traceback
        # that log.error(e) would drop.
        log.exception(e)


async def list_accounts_task():
    """
    Log the name and display name of every account returned by
    analytics_client.list().

    When the client returns None a warning is logged instead. Any exception
    is caught and logged with its traceback.
    """
    try:
        list_response = analytics_client.list()
        if list_response is not None:
            log.info("-------------------------------")
            for act in list_response.accounts:
                # Lazy logging args: formatting only happens if the record is emitted.
                log.info("Name:\t\t%s", act.name)
                log.info("Display name:\t%s", act.display_name)
                log.info("-------------------------------")
        else:
            # Logger.warn is a deprecated alias of Logger.warning.
            log.warning(
                "List response was none. Ensure credentials are set correctly"
                + " and you have access to the cloud property."
            )
    except Exception as e:
        # Catch-all boundary for this task: keep the traceback instead of
        # logging only the bare args tuple.
        log.exception(e)


def export(
    df: pd.DataFrame, date_of_report: datetime, end_date: datetime = None
) -> str:
    """
    Write the downloaded analytics data into a new Google Sheets object.

    The spreadsheet is first created empty through the drive client inside
    the "Google Analytics" folder, then filled through the sheets client;
    the two steps use different clients.

    Args:
        df (pandas.DataFrame): Tabular data to export to Google Sheets object
        date_of_report (datetime): Date the report was run
        end_date (datetime): Optional end of the report range, used only to
            build the spreadsheet name
    Returns:
        str: Google Sheets ID of the new Sheets object
    """
    folder_name = "Google Analytics"
    spreadsheet_name = get_filename(date_of_report, end_date)
    folder_id = drive_client.create_folder(
        folder_name, parent_id=settings.ANALYTICS_ROOT
    )

    # Two clients are needed: the Sheets API cannot create a file directly
    # inside a chosen directory, so the drive client creates it first.
    sheets_id = drive_client.create_empty_spreadsheet(spreadsheet_name, folder_id)
    log.info("Uploading to folder %s (%s)" % (folder_name, folder_id))

    upload_result = sheets_client.export_df_to_gdrive_speadsheet(df, sheets_id)
    created_id = upload_result.get("spreadsheetId")
    log.info("Successfully created %s (%s)" % (spreadsheet_name, created_id))
    return sheets_id


def do_analytics_export_post_processing(df: pd.DataFrame, sheets_id: str):
    """
    Add the extra pages to the spreadsheet and build the pivot tables on them.

    This is a simple, unoptimized implementation. If spreadsheets are ever
    produced more often than once a day it should be reworked to cut down
    the number of API transactions.

    Args:
        df (pandas.DataFrame): Tabular data in the spreadsheet
        sheets_id (str): Google Sheets object ID
    """
    page_names = (
        "Rekrewt Pivot Table - First Visit",
        "Rekrewt Pivot Table - Sessions",
        "GSA Use Pivot Table",
        "Completions",
    )

    # add_new_pages receives a list, do_create_pivot_tables a tuple.
    new_sheet_name_to_id = sheets_client.add_new_pages(list(page_names), sheets_id)
    added_count = len(new_sheet_name_to_id.keys())
    log.info("Added %s pages to %s" % (added_count, sheets_id))

    sheets_client.do_create_pivot_tables(
        df, page_names, new_sheet_name_to_id, sheets_id
    )


def get_filename(date: datetime, end_date: datetime = None):
    """
    Build the name the new spreadsheet is saved under.

    Args:
        date (datetime): start (or only) date of the report
        end_date (datetime): optional end date; appended after a dash when
            it is given and differs from date
    Return:
        str: "YYYYMMDD", or "YYYYMMDD-YYYYMMDD" for a range
    """
    stamp_format = "%Y%m%d"
    start_stamp = date.strftime(stamp_format)

    # A missing end date or a one-day range keeps the single-date name.
    if end_date is None or end_date == date:
        return start_stamp

    return start_stamp + "-%s" % (end_date.strftime(stamp_format))
109 changes: 109 additions & 0 deletions gdrive/analytics_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import datetime

from google.oauth2 import service_account
from google.analytics.admin import AnalyticsAdminServiceClient
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
DateRange,
Dimension,
Metric,
RunReportRequest,
)

import logging
import pandas as pd

from gdrive import settings

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Service-account credentials built once, at import time, from
# settings.CREDENTIALS; passed to each API client created in this module.
creds = service_account.Credentials.from_service_account_info(settings.CREDENTIALS)

"""
Client for the Google Analytics (GA4) API

This module contains functions for downloading analytics data
for the IDVA flow.
"""


def download(
    property_id, target_date: datetime, end_date: datetime = None
) -> pd.DataFrame:
    """
    Run a Google Analytics (GA4) report for the given property and date
    range and return it as a DataFrame.

    When end_date is None the report covers target_date only.
    """
    range_end = target_date if end_date is None else end_date

    # https://developers.google.com/analytics/devguides/reporting/data/v1/api-schema
    dimension_names = (
        "eventName",
        "firstUserCampaignName",
        "firstUserMedium",
        "firstUserSource",
        "isConversionEvent",
        "linkUrl",
    )
    metric_names = (
        "eventCount",
        "sessions",
        "totalUsers",
        "eventCountPerUser",
        "conversions",
    )

    report_request = RunReportRequest(
        property=f"properties/{property_id}",
        limit="250",
        dimensions=[Dimension(name=dim) for dim in dimension_names],
        metrics=[Metric(name=met) for met in metric_names],
        date_ranges=[
            DateRange(
                start_date=format_date_for_api(target_date),
                end_date=format_date_for_api(range_end),
            )
        ],
    )

    data_client = BetaAnalyticsDataClient(credentials=creds)
    report = data_client.run_report(report_request)
    return create_df_from_analytics_response(report)


def list():
    """
    Return the accounts the configured credentials can access. Useful for
    checking that the environment is set up correctly.
    """
    return AnalyticsAdminServiceClient(credentials=creds).list_accounts()


def format_date_for_api(date: datetime):
    """
    Render a datetime as the "YYYY-MM-DD" string passed to the Google
    Analytics (GA4) API date range.
    """
    api_date_format = "%Y-%m-%d"
    return date.strftime(api_date_format)


def create_df_from_analytics_response(response):
    """
    Flatten a Google Analytics API response into a pandas DataFrame so the
    data can be processed locally when GA4 itself does not support an
    operation.

    The first row of the frame holds the header names (dimension headers,
    then metric headers); each following row holds one response row's
    dimension values followed by its metric values.
    """
    header_row = [hdr.name for hdr in response.dimension_headers]
    header_row.extend(hdr.name for hdr in response.metric_headers)

    table = [header_row]
    for record in response.rows:
        cells = [cell.value for cell in record.dimension_values]
        cells.extend(cell.value for cell in record.metric_values)
        table.append(cells)

    return pd.DataFrame(table)
16 changes: 9 additions & 7 deletions gdrive/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from googleapiclient.http import HttpError
from starlette.requests import Request

from . import client, settings
from . import drive_client, settings

log = logging.getLogger(__name__)

router = fastapi.APIRouter()

client.init()
drive_client.init()


# Patch zip decodeExtra to ignore invalid extra data
Expand Down Expand Up @@ -50,16 +50,18 @@ async def upload_file(

stream = io.BytesIO(body)

parent = client.create_folder(id, settings.ROOT_DIRECTORY)
parent = drive_client.create_folder(id, settings.ROOT_DIRECTORY)

if zip:
with zipfile.ZipFile(stream) as archive:
files = archive.filelist
for file in files:
image = io.BytesIO(archive.read(file))
client.upload_basic(f"{filename}_{file.filename}", parent, image)
drive_client.upload_basic(
f"{filename}_{file.filename}", parent, image
)
else:
client.upload_basic(filename, parent, stream)
drive_client.upload_basic(filename, parent, stream)

except HttpError as error:
log.error(f"An error occurred: {error}")
Expand All @@ -73,10 +75,10 @@ async def delete_file(filename, response: Response):
"""

try:
files = client.get_files(filename)
files = drive_client.get_files(filename)
if files:
for file in files:
client.delete_file(file["id"])
drive_client.delete_file(file["id"])
else:
response.status_code = status.HTTP_404_NOT_FOUND

Expand Down
Loading
Loading