Merge Dev to SCD-38 #23

Merged
merged 12 commits into from
Mar 21, 2023
37 changes: 37 additions & 0 deletions .github/pull_request_template.md
@@ -0,0 +1,37 @@
# 👮 Pull Request Checklist

## 📝 Description

- This PR adds a PR checklist template that will help the team identify a few common points that need to be checked while raising a PR.

## 🎫 Associated JIRA tickets

- [SCD-18](https://antstack-datamesh.atlassian.net/browse/SCD-18)
  - A screenshot followed by the description of the ticket. This ticket deals with creating a common PR checklist template that will be improved continuously to help developers.
  - Any additional information for testing the ticket.

## ⚔️ Mandatory Checks

- [ ] Is the functionality working as expected?
- [ ] Are the comments clear and useful, and do they mostly explain why instead of what?
- [ ] Does it match the required coding standards?
- [ ] Is the performance up to the mark?

## 🛠️ Regular Checks

- [ ] Spell Checks.

## 🗡️ Optional Checks

- [ ] Schema Changes

## 🤳 PR Self-Review

- **DRY**: Don't repeat yourself
- **YAGNI**: You aren't gonna need it. Make sure you're not over-complicating something just to try & make it more future-proof 🙅‍♂️
- P.S. According to Fowler, "Yagni only applies to capabilities built into the software to support a presumptive feature, it does not apply to effort to make the software easier to modify."
- A link to some of the [best practices](https://github.com/andela/bestpractices/wiki/Git-naming-conventions-and-best-practices) for creating a PR

<br>

> ## ⚠️ Wait for PR status checks to complete before approving PR
2 changes: 1 addition & 1 deletion pipeline/01_conference_data_source_to_raw.py
@@ -193,4 +193,4 @@

# COMMAND ----------

# MAGIC %sql

74 changes: 74 additions & 0 deletions pipeline/02_conference_data_raw_to_standardized.py
@@ -0,0 +1,74 @@
# Databricks notebook source
from src.main import convert_date_object
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    lit,
    current_timestamp,
    input_file_name,
    from_utc_timestamp,
    to_date,
    when,
    current_date,
    col,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Pull Raw layer data

# COMMAND ----------

# MAGIC %md
# MAGIC #### Event Table

# COMMAND ----------

def read_data_from_raw(schema: str, table_name: str) -> DataFrame:
    # Read a raw-layer table and keep only the rows that have not been processed yet.
    df = spark.read.table(f"{schema}.{table_name}")
    df = df.filter(df.is_processed == 'false')
    df.printSchema()
    return df

# COMMAND ----------

event_df = read_data_from_raw('conference_raw', 'event')

# COMMAND ----------

# MAGIC %md
# MAGIC #### Session

# COMMAND ----------

session_df = read_data_from_raw('conference_raw', 'session')

# COMMAND ----------

# MAGIC %md
# MAGIC #### In person attendee

# COMMAND ----------

inperson_attendee_df = read_data_from_raw('conference_raw', 'in_person_attendee')

# COMMAND ----------

# MAGIC %md
# MAGIC #### Virtual attendee

# COMMAND ----------

virtual_attendee_df = read_data_from_raw('conference_raw', 'virtual_attendee')

# COMMAND ----------

# MAGIC %md
# MAGIC #### Poll question

# COMMAND ----------

polling_questions_df = read_data_from_raw('conference_raw', 'polling_questions')

# COMMAND ----------
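# MAGIC %md
# MAGIC #### Standardization sketch (illustrative)
# MAGIC A minimal, hypothetical example of how the raw dataframes above could be standardized
# MAGIC with the helpers imported at the top of this notebook. The column names used here
# MAGIC (e.g. `event_start_date`) are assumptions for illustration, not part of the original change.

# COMMAND ----------

# Hypothetical sketch: add audit columns and a normalized event date to the raw event dataframe.
event_standardized_df = (
    event_df
    .withColumn("source_file_name", input_file_name())
    .withColumn("load_timestamp", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn("event_date", to_date(col("event_start_date")))  # assumed source column
)
event_standardized_df.printSchema()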
117 changes: 117 additions & 0 deletions pipeline/03_conference_data_gx_cloud.py
@@ -0,0 +1,117 @@
# Databricks notebook source
import os
import great_expectations as gx

os.environ["GX_CLOUD_BASE_URL"] = "https://api.greatexpectations.io"

# COMMAND ----------

context = gx.get_context()

# COMMAND ----------

# Pull the refined attendee dimension and keep the batch created on 2023-03-17
attendee_dim_df = spark.read.table('conference_refined.event_registrant_dim')
attendee_dim_df = attendee_dim_df.filter(attendee_dim_df.create_date == '2023-03-17')

# COMMAND ----------

datasource_yaml = """
name: conference_refined_attendee_dim
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
data_connectors:
  default_runtime_data_connector:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - attendee_dim_batch_idf
"""

# Test your configuration (Optional):
datasource = context.test_yaml_config(datasource_yaml)

# Save your datasource:
datasource = context.save_datasource(datasource)

# Confirm the datasource has been saved (Optional):
existing_datasource = context.get_datasource(datasource_name=datasource.name)
print(existing_datasource.config)


# COMMAND ----------

expectation_suite = context.create_expectation_suite(
expectation_suite_name="conference_attendee_dim_ex_suite"
)


# COMMAND ----------

expectation_configuration = gx.core.ExpectationConfiguration(**{
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
        "column": 'state',
    },
    "meta": {},
})
expectation_suite.add_expectation(expectation_configuration)


# COMMAND ----------

expectation_configuration = gx.core.ExpectationConfiguration(**{
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
        "column": 'email_address',
    },
    "meta": {},
})
expectation_suite.add_expectation(expectation_configuration)


context.save_expectation_suite(expectation_suite)
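
# COMMAND ----------

# DRY-er sketch (not part of the original notebook): the two not-null expectation cells above
# could equivalently be built in a single loop over the column names before saving the suite.
for column_name in ["state", "email_address"]:
    expectation_suite.add_expectation(
        gx.core.ExpectationConfiguration(
            expectation_type="expect_column_values_to_not_be_null",
            kwargs={"column": column_name},
            meta={},
        )
    )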

# COMMAND ----------

checkpoint_name = "conference_attendee_dim_ex_checkpoint"
config_version = 1
class_name = "Checkpoint"

# First, fetch the ExpectationSuite we will use to define a Validation
expectation_suite = context.get_expectation_suite(expectation_suite_name="conference_attendee_dim_ex_suite")

validations = [{
    "expectation_suite_name": expectation_suite.name,
    "expectation_suite_ge_cloud_id": expectation_suite.ge_cloud_id,
    "batch_request": {
        "datasource_name": "conference_refined_attendee_dim",
        "data_connector_name": "default_runtime_data_connector",
        "data_asset_name": "conference_attende_dim_data_asset",
    },
}]

# NOTE: To update an existing Checkpoint, you must include the Checkpoint's ge_cloud_id as a named argument.
checkpoint = context.add_or_update_checkpoint(
    name=checkpoint_name,
    config_version=config_version,
    class_name=class_name,
    validations=validations,
)

# Confirm the Checkpoint has been saved:
checkpoint_id = str(checkpoint.ge_cloud_id)
checkpoint = context.get_checkpoint(ge_cloud_id=checkpoint_id)
print(checkpoint)

# COMMAND ----------

batch_request = {
    "runtime_parameters": {
        "batch_data": attendee_dim_df
    },
    "batch_identifiers": {
        "attendee_dim_batch_idf": "initial_data"
    }
}
context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request)
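
# COMMAND ----------

# Optional sketch (an assumption, not in the original notebook): the checkpoint run can be
# captured in a variable so the validation outcome is available programmatically.
checkpoint_result = context.run_checkpoint(
    ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request
)
print(f"Validation passed: {checkpoint_result.success}")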

2 changes: 0 additions & 2 deletions pipeline/03_conference_data_snowflake.py

This file was deleted.

36 changes: 36 additions & 0 deletions pipeline/03_conference_date_run_checkpoint.py
@@ -0,0 +1,36 @@
# Databricks notebook source
import os
import great_expectations as gx
from pyspark.sql.functions import current_date

os.environ["GX_CLOUD_BASE_URL"] = "https://api.greatexpectations.io"

# COMMAND ----------

context = gx.get_context()

# COMMAND ----------

attendee_dim_df = spark.read.table('conference_refined.event_registrant_dim')
attendee_dim_df = attendee_dim_df.filter(attendee_dim_df.create_date == current_date())

# COMMAND ----------

checkpoint = context.get_checkpoint(name='conference_attendee_dim_ex_checkpoint')
print(checkpoint)

# COMMAND ----------

batch_request = {
    "runtime_parameters": {
        "batch_data": attendee_dim_df
    },
    "batch_identifiers": {
        "attendee_dim_batch_idf": "initial_data"
    }
}
context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request)
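
# COMMAND ----------

# Hedged sketch (not in the original notebook): in a scheduled run it can be useful to fail
# the notebook when validation fails, so downstream tasks do not consume bad data.
checkpoint_result = context.run_checkpoint(
    ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request
)
if not checkpoint_result.success:
    raise Exception("Great Expectations validation failed for the attendee dimension batch")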

# COMMAND ----------


4 changes: 2 additions & 2 deletions src/schema.py
@@ -75,8 +75,8 @@
StructField("job_role", StringType(), True),
StructField("state", StringType(), True),
StructField("session_title", StringType(), True),
StructField("login_time", FloatType(), True),
StructField("logout_time", FloatType(), True),
StructField("login_time", StringType(), True),
StructField("logout_time", StringType(), True),
]
)
poll_question_schema = StructType(