Skip to content

Commit

Permalink
Refactor functions to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
rdgfuentes committed Sep 18, 2024
1 parent 35be033 commit a93cffe
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 27 deletions.
4 changes: 4 additions & 0 deletions packages/libs/utils/utils/convert.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
def list_to_dict(labels):
    """Convert an iterable of ``"key=value"`` strings into a dict.

    Each item is split on its first ``"="`` only, so a value may itself
    contain ``"="`` (``"k=v=w"`` maps ``"k"`` to ``"v=w"``). When the same
    key appears more than once, the last occurrence wins.

    Args:
        labels: Iterable of strings of the form ``"key=value"``.

    Returns:
        A dict mapping each key to its value, both as strings.

    Raises:
        IndexError: If an item contains no ``"="``.
    """
    # Split each label once; maxsplit=1 keeps any further "=" in the value.
    pairs = (label.split("=", 1) for label in labels)
    # Indexing (rather than unpacking) keeps IndexError for malformed items.
    return {parts[0]: parts[1] for parts in pairs}


def to_float(value):
    """Return ``float(value)``, passing ``None`` through unchanged."""
    if value is None:
        return None
    return float(value)

Expand Down
6 changes: 5 additions & 1 deletion packages/libs/utils/utils/dates.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone


def prev_month_from_YYYYMMDD(dt_str):
Expand All @@ -12,3 +12,7 @@ def prev_month_from_YYYYMMDD(dt_str):
dt = datetime.strptime(dt_str, "%Y%m%d")
dt = dt - timedelta(days=1)
return datetime.strftime(dt.replace(day=1), "%Y%m%d")


def parse_yyyy_mm_dd_param(value, tzinfo=timezone.utc):
    """Parse a ``YYYY-MM-DD`` string into a datetime carrying ``tzinfo``.

    The timezone is attached with ``replace`` (no conversion is applied).

    Args:
        value: Date string in ``%Y-%m-%d`` format.
        tzinfo: Timezone to set on the result; defaults to UTC.

    Returns:
        The parsed ``datetime`` with its ``tzinfo`` set to ``tzinfo``.
    """
    parsed = datetime.strptime(value, "%Y-%m-%d")
    return parsed.replace(tzinfo=tzinfo)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime as dt

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from bigquery.table import clear_records, ensure_table_exists
from common.transforms.pick_output_fields import PickOutputFields
from utils.convert import list_to_dict
from utils.dates import parse_yyyy_mm_dd_param
from vms_ingestion.ingestion.excel_to_bq.feed_ingestion_factory import (
FeedIngestionFactory,
)
Expand All @@ -22,14 +22,6 @@
)


def parse_yyyy_mm_dd_param(value):
    """Parse a ``YYYY-MM-DD`` string into a naive ``datetime``."""
    date_format = "%Y-%m-%d"
    return dt.datetime.strptime(value, date_format)


def list_to_dict(labels):
    """Convert an iterable of ``"key=value"`` strings into a dict.

    Each item is split on its first ``"="`` only, so a value may itself
    contain ``"="`` (``"k=v=w"`` maps ``"k"`` to ``"v=w"``). When the same
    key appears more than once, the last occurrence wins.

    Args:
        labels: Iterable of strings of the form ``"key=value"``.

    Returns:
        A dict mapping each key to its value, both as strings.

    Raises:
        IndexError: If an item contains no ``"="``.
    """
    # Split each label once; maxsplit=1 keeps any further "=" in the value.
    pairs = (label.split("=", 1) for label in labels)
    # Indexing (rather than unpacking) keeps IndexError for malformed items.
    return {parts[0]: parts[1] for parts in pairs}


class IngestionExcelToBQPipeline:
def __init__(self, options):
self.pipeline = beam.Pipeline(options=options)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime as dt

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from bigquery.table import clear_records, ensure_table_exists
from common.transforms.pick_output_fields import PickOutputFields
from utils.convert import list_to_dict
from utils.dates import parse_yyyy_mm_dd_param
from vms_ingestion.normalization.feed_normalization_factory import (
FeedNormalizationFactory,
)
Expand All @@ -22,14 +22,6 @@
from vms_ingestion.options import CommonPipelineOptions


def parse_yyyy_mm_dd_param(value):
    """Parse a ``YYYY-MM-DD`` string into a naive ``datetime``."""
    date_format = "%Y-%m-%d"
    return dt.datetime.strptime(value, date_format)


def list_to_dict(labels):
    """Convert an iterable of ``"key=value"`` strings into a dict.

    Each item is split on its first ``"="`` only, so a value may itself
    contain ``"="`` (``"k=v=w"`` maps ``"k"`` to ``"v=w"``). When the same
    key appears more than once, the last occurrence wins.

    Args:
        labels: Iterable of strings of the form ``"key=value"``.

    Returns:
        A dict mapping each key to its value, both as strings.

    Raises:
        IndexError: If an item contains no ``"="``.
    """
    # Split each label once; maxsplit=1 keeps any further "=" in the value.
    pairs = (label.split("=", 1) for label in labels)
    # Indexing (rather than unpacking) keeps IndexError for malformed items.
    return {parts[0]: parts[1] for parts in pairs}


class NormalizationPipeline:
def __init__(self, options):
self.pipeline = beam.Pipeline(options=options)
Expand All @@ -41,12 +33,8 @@ def __init__(self, options):
self.source = params.source
self.source_timestamp_field = params.source_timestamp_field
self.destination = params.destination
self.start_date = parse_yyyy_mm_dd_param(params.start_date).replace(
tzinfo=dt.timezone.utc
)
self.end_date = parse_yyyy_mm_dd_param(params.end_date).replace(
tzinfo=dt.timezone.utc
)
self.start_date = parse_yyyy_mm_dd_param(params.start_date)
self.end_date = parse_yyyy_mm_dd_param(params.end_date)
self.labels = list_to_dict(gCloudParams.labels)

self.table_schema = table_schema()
Expand Down

0 comments on commit a93cffe

Please sign in to comment.