Merge pull request #23 from antstackio/dev
Merge Dev to SCD-38
sleepy0owl authored Mar 21, 2023
2 parents ef747a8 + ebbb3b7 commit c071163
Showing 7 changed files with 267 additions and 5 deletions.
37 changes: 37 additions & 0 deletions .github/pull_request_template.md
@@ -0,0 +1,37 @@
# 👮 Pull Request Checklist

## 📝 Description

- This PR adds a PR checklist template that helps the team identify a few common points that need to be checked when raising a PR.

## 🎫 Associated JIRA tickets

- [SCD-18](https://antstack-datamesh.atlassian.net/browse/SCD-18)
- A screenshot followed by a description of the ticket. This ticket covers creating a common PR checklist template that will be continuously improved to help developers.
- Any additional information for testing the ticket.

## ⚔️ Mandatory Checks

- [ ] Is the functionality working as expected?
- [ ] Are the comments clear and useful, and do they mostly explain why instead of what?
- [ ] Does it match the required coding standards?
- [ ] Is the performance up to the mark?

## 🛠️ Regular Checks

- [ ] Spell Checks.

## 🗡️ Optional Checks

- [ ] Schema Changes

## 🤳 PR Self-Review

- **DRY**: Don't repeat yourself (see the short sketch after this list)
- **YAGNI**: You aren't gonna need it. Make sure you're not over-complicating something just to try & make it more future-proof 🙅‍♂️
- P.S. According to Fowler, "Yagni only applies to capabilities built into the software to support a presumptive feature, it does not apply to effort to make the software easier to modify."
- A link to some of the [best practices](https://github.com/andela/bestpractices/wiki/Git-naming-conventions-and-best-practices) for creating a PR
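
A small, hypothetical sketch of these principles (illustrative only, not project code, and assuming a Databricks `spark` session): one parameterised helper replaces copy-pasted per-table reads, and nothing speculative is added beyond what the pipeline needs today.

```python
# Illustrative only: a single helper avoids repeating the same read/filter logic per table.
def read_raw_table(schema: str, table_name: str):
    return spark.read.table(f"{schema}.{table_name}").filter("is_processed = 'false'")

event_df = read_raw_table("conference_raw", "event")
session_df = read_raw_table("conference_raw", "session")
```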

<br>

> ## ⚠️ Wait for PR status checks to complete before approving PR
2 changes: 1 addition & 1 deletion pipeline/01_conference_data_source_to_raw.py
@@ -193,4 +193,4 @@

# COMMAND ----------

# MAGIC %sql

74 changes: 74 additions & 0 deletions pipeline/02_conference_data_raw_to_standardized.py
@@ -0,0 +1,74 @@
# Databricks notebook source
from src.main import convert_date_object
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    lit,
    current_timestamp,
    input_file_name,
    from_utc_timestamp,
    to_date,
    when,
    current_date,
    col,
)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Pull Raw layer data

# COMMAND ----------

# MAGIC %md
# MAGIC #### Event Table

# COMMAND ----------

def read_data_from_raw(schema: str, table_name: str) -> DataFrame:
    # Read a raw-layer table and keep only rows that have not yet been processed.
    df = spark.read.table(f"{schema}.{table_name}")
    df = df.filter(df.is_processed == 'false')
    df.printSchema()
    return df

# COMMAND ----------

event_df = read_data_from_raw('conference_raw', 'event')

# COMMAND ----------

# MAGIC %md
# MAGIC #### Session

# COMMAND ----------

session_df = read_data_from_raw('conference_raw', 'session')

# COMMAND ----------

# MAGIC %md
# MAGIC #### In person attendee

# COMMAND ----------

inperson_attendee_df = read_data_from_raw('conference_raw', 'in_person_attendee')

# COMMAND ----------

# MAGIC %md
# MAGIC #### Virtual attendee

# COMMAND ----------

virtual_attendee_df = read_data_from_raw('conference_raw', 'virtual_attendee')

# COMMAND ----------

# MAGIC %md
# MAGIC #### Poll question

# COMMAND ----------

polling_questions_df = read_data_from_raw('conference_raw', 'polling_questions')

# COMMAND ----------
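
# A hypothetical sketch (not part of this diff): how the helpers imported above might be
# applied when standardizing a raw DataFrame. The column names used here ("event_date",
# "load_timestamp", "source_file") are illustrative assumptions, not the project's schema.
def add_standard_columns(df: DataFrame) -> DataFrame:
    return (
        df.withColumn("load_timestamp", current_timestamp())
        .withColumn("source_file", input_file_name())
        .withColumn("event_date", to_date(col("event_date"), "yyyy-MM-dd"))
    )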
117 changes: 117 additions & 0 deletions pipeline/03_conference_data_gx_cloud.py
@@ -0,0 +1,117 @@
# Databricks notebook source
import os
import great_expectations as gx

os.environ["GX_CLOUD_BASE_URL"] = "https://api.greatexpectations.io"

# COMMAND ----------

context = gx.get_context()

# COMMAND ----------

attendee_dim_df = spark.read.table('conference_refined.event_registrant_dim')
attendee_dim_df = attendee_dim_df.filter(attendee_dim_df.create_date == '2023-03-17')

# COMMAND ----------

datasource_yaml = """
name: conference_refined_attendee_dim
class_name: Datasource
execution_engine:
  class_name: SparkDFExecutionEngine
data_connectors:
  default_runtime_data_connector:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - attendee_dim_batch_idf
"""

# Test your configuration (Optional):
datasource = context.test_yaml_config(datasource_yaml)

# Save your datasource:
datasource = context.save_datasource(datasource)

# Confirm the datasource has been saved (Optional):
existing_datasource = context.get_datasource(datasource_name=datasource.name)
print(existing_datasource.config)


# COMMAND ----------

expectation_suite = context.create_expectation_suite(
    expectation_suite_name="conference_attendee_dim_ex_suite"
)


# COMMAND ----------

expectation_configuration = gx.core.ExpectationConfiguration(**{
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
        "column": "state",
    },
    "meta": {},
})
expectation_suite.add_expectation(expectation_configuration)


# COMMAND ----------

expectation_configuration = gx.core.ExpectationConfiguration(**{
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
        "column": "email_address",
    },
    "meta": {},
})
expectation_suite.add_expectation(expectation_configuration)


context.save_expectation_suite(expectation_suite)

# COMMAND ----------

checkpoint_name = "conference_attendee_dim_ex_checkpoint"
config_version = 1
class_name = "Checkpoint"

# First, fetch the ExpectationSuite we will use to define a Validation
expectation_suite = context.get_expectation_suite(expectation_suite_name="conference_attendee_dim_ex_suite")

validations = [{
    "expectation_suite_name": expectation_suite.name,
    "expectation_suite_ge_cloud_id": expectation_suite.ge_cloud_id,
    "batch_request": {
        "datasource_name": "conference_refined_attendee_dim",
        "data_connector_name": "default_runtime_data_connector",
        "data_asset_name": "conference_attende_dim_data_asset",
    },
}]

# NOTE: To update an existing Checkpoint, you must include the Checkpoint's ge_cloud_id as a named argument.
checkpoint = context.add_or_update_checkpoint(
    name=checkpoint_name,
    config_version=config_version,
    class_name=class_name,
    validations=validations,
)

# Confirm the Checkpoint has been saved:
checkpoint_id = str(checkpoint.ge_cloud_id)
checkpoint = context.get_checkpoint(ge_cloud_id=checkpoint_id)
print(checkpoint)

# COMMAND ----------

batch_request = {
    "runtime_parameters": {
        "batch_data": attendee_dim_df
    },
    "batch_identifiers": {
        "attendee_dim_batch_idf": "initial_data"
    }
}
context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request)
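
# A minimal sketch (not part of this diff): capture the checkpoint run and fail fast when
# expectations are not met. Assumes the returned CheckpointResult exposes a boolean
# `success` flag, as in recent Great Expectations releases.
result = context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request)
if not result.success:
    raise ValueError("Validation failed for conference_refined.event_registrant_dim")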

2 changes: 0 additions & 2 deletions pipeline/03_conference_data_snowflake.py

This file was deleted.

36 changes: 36 additions & 0 deletions pipeline/03_conference_date_run_checkpoint.py
@@ -0,0 +1,36 @@
# Databricks notebook source
import os
import great_expectations as gx
from pyspark.sql.functions import current_date

os.environ["GX_CLOUD_BASE_URL"] = "https://api.greatexpectations.io"

# COMMAND ----------

context = gx.get_context()

# COMMAND ----------

attendee_dim_df = spark.read.table('conference_refined.event_registrant_dim')
attendee_dim_df = attendee_dim_df.filter(attendee_dim_df.create_date == current_date())

# COMMAND ----------

checkpoint = context.get_checkpoint(name='conference_attendee_dim_ex_checkpoint')
print(checkpoint)

# COMMAND ----------

batch_request = {
    "runtime_parameters": {
        "batch_data": attendee_dim_df
    },
    "batch_identifiers": {
        "attendee_dim_batch_idf": "initial_data"
    }
}
context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request)

# COMMAND ----------


4 changes: 2 additions & 2 deletions src/schema.py
@@ -75,8 +75,8 @@
StructField("job_role", StringType(), True),
StructField("state", StringType(), True),
StructField("session_title", StringType(), True),
StructField("login_time", FloatType(), True),
StructField("logout_time", FloatType(), True),
StructField("login_time", StringType(), True),
StructField("logout_time", StringType(), True),
]
)
poll_question_schema = StructType(
