diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
new file mode 100644
index 0000000..316865b
--- /dev/null
+++ b/.github/pull_request_template.md
@@ -0,0 +1,37 @@
+# 👮 Pull Request Checklist
+
+## 📝 Description
+
+- This PR contains the code to create a PR checklist that will help the team identify a few common points that need to be checked while raising a PR.
+
+## 🎫 Associated JIRA tickets
+
+- [SCD-18](https://antstack-datamesh.atlassian.net/browse/SCD-18)
+  - A screenshot followed by a description of the ticket. This ticket deals with creating a common PR checklist template, which will be improved continuously to help developers.
+  - Any additional information for testing the ticket.
+
+## ⚔️ Mandatory Checks
+
+- [ ] Is the functionality working as expected?
+- [ ] Are the comments clear and useful, and do they mostly explain why instead of what?
+- [ ] Does the code match the required coding standards?
+- [ ] Is the performance up to the mark?
+
+## 🛠️ Regular Checks
+
+- [ ] Spell checks.
+
+## 🗡️ Optional Checks
+
+- [ ] Schema changes
+
+## 🤳 PR Self-Review
+
+- **DRY**: Don't repeat yourself.
+- **YAGNI**: You aren't gonna need it. Make sure you're not over-complicating something just to try & make it more future-proof 🙅‍♂️
+  - P.S. According to Fowler, "Yagni only applies to capabilities built into the software to support a presumptive feature, it does not apply to effort to make the software easier to modify."
+- A link to some of the [best practices](https://github.com/andela/bestpractices/wiki/Git-naming-conventions-and-best-practices) for creating a PR
+
+
+> ## ⚠️ Wait for PR status checks to complete before approving PR
diff --git a/pipeline/01_conference_data_source_to_raw.py b/pipeline/01_conference_data_source_to_raw.py
index e290bd8..c20dc9f 100644
--- a/pipeline/01_conference_data_source_to_raw.py
+++ b/pipeline/01_conference_data_source_to_raw.py
@@ -193,4 +193,4 @@
 
 # COMMAND ----------
 
-# MAGIC %sql
+
diff --git a/pipeline/02_conference_data_raw_to_standardized.py b/pipeline/02_conference_data_raw_to_standardized.py
new file mode 100644
index 0000000..7af5dc1
--- /dev/null
+++ b/pipeline/02_conference_data_raw_to_standardized.py
@@ -0,0 +1,74 @@
+# Databricks notebook source
+from src.main import convert_date_object
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import (
+    lit,
+    current_timestamp,
+    input_file_name,
+    from_utc_timestamp,
+    to_date,
+    when,
+    current_date,
+    col,
+)
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ### Pull Raw layer data
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC #### Event Table
+
+# COMMAND ----------
+
+def read_data_from_raw(schema: str, table_name: str) -> DataFrame:
+    # Read only the rows that have not yet been processed into the standardized layer
+    df = spark.read.table(f"{schema}.{table_name}")
+    df = df.filter(df.is_processed == 'false')
+    df.printSchema()
+    return df
+
+# COMMAND ----------
+
+event_df = read_data_from_raw('conference_raw', 'event')
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC #### Session
+
+# COMMAND ----------
+
+session_df = read_data_from_raw('conference_raw', 'session')
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC #### In person attendee
+
+# COMMAND ----------
+
+inperson_attendee_df = read_data_from_raw('conference_raw', 'in_person_attendee')
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC #### Virtual attendee
+
+# COMMAND ----------
+
+virtual_attendee_df = read_data_from_raw('conference_raw', 'virtual_attendee')
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC #### Poll question
+
+# COMMAND ----------
+
+polling_questions_df = read_data_from_raw('conference_raw', 'polling_questions')
+
+# COMMAND ----------
\ No newline at end of file
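Editor's note: the notebook above only pulls unprocessed rows from each raw table; the actual standardization cells are not part of this diff. As a rough illustration of how the imported helpers might be applied to one of these DataFrames, here is a minimal sketch. The column names (`event_start_date`), the audit fields, and the target table `conference_standardized.event` are assumptions for illustration, not something defined in this PR.

```python
# Hypothetical standardization cell -- column names, audit fields and the target
# table are assumptions, not taken from this PR.
from pyspark.sql.functions import current_timestamp, input_file_name, lit, to_date

standardized_event_df = (
    event_df
    # normalise a date column that is assumed to arrive as a string
    .withColumn("event_start_date", to_date("event_start_date", "yyyy-MM-dd"))
    # basic audit columns for the standardized layer
    .withColumn("create_date", current_timestamp())
    .withColumn("source_file", input_file_name())
    .withColumn("is_processed", lit(False))
)

standardized_event_df.write.mode("append").saveAsTable("conference_standardized.event")
```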
"expectation_type": "expect_column_values_to_not_be_null", + "kwargs": { + "column": 'state', + }, + "meta":{} + }) +expectation_suite.add_expectation(expectation_configuration) + + +# COMMAND ---------- + +expectation_configuration = gx.core.ExpectationConfiguration(**{ + "expectation_type": "expect_column_values_to_not_be_null", + "kwargs": { + "column": 'email_address', + }, + "meta":{} + }) +expectation_suite.add_expectation(expectation_configuration) + + +context.save_expectation_suite(expectation_suite) + +# COMMAND ---------- + +checkpoint_name = "conference_attendee_dim_ex_checkpoint" +config_version = 1 +class_name = "Checkpoint" + +# First, fetch the ExpectationSuite we will use to define a Validation +expectation_suite = context.get_expectation_suite(expectation_suite_name="conference_attendee_dim_ex_suite") + +validations = [{ + "expectation_suite_name": expectation_suite.name, + "expectation_suite_ge_cloud_id": expectation_suite.ge_cloud_id, + "batch_request": { + "datasource_name": "conference_refined_attendee_dim", + "data_connector_name": "default_runtime_data_connector", + "data_asset_name": "conference_attende_dim_data_asset", + }, +}] + +# NOTE: To update an existing Checkpoint, you must include the Checkpoint's ge_cloud_id as a named argument. +checkpoint = context.add_or_update_checkpoint( + name=checkpoint_name, + config_version=config_version, + class_name=class_name, + validations=validations, +) + +# Confirm the Checkpoint has been saved: +checkpoint_id = str(checkpoint.ge_cloud_id) +checkpoint = context.get_checkpoint(ge_cloud_id=checkpoint_id) +print(checkpoint) + +# COMMAND ---------- + +batch_request = { + "runtime_parameters": { + "batch_data": attendee_dim_df + }, + "batch_identifiers": { + "attendee_dim_batch_idf": "initial_data" + } +} +context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request) + diff --git a/pipeline/03_conference_data_snowflake.py b/pipeline/03_conference_data_snowflake.py deleted file mode 100644 index cdad790..0000000 --- a/pipeline/03_conference_data_snowflake.py +++ /dev/null @@ -1,2 +0,0 @@ -# Databricks notebook source - diff --git a/pipeline/03_conference_date_run_checkpoint.py b/pipeline/03_conference_date_run_checkpoint.py new file mode 100644 index 0000000..1ebc7db --- /dev/null +++ b/pipeline/03_conference_date_run_checkpoint.py @@ -0,0 +1,36 @@ +# Databricks notebook source +import os +import great_expectations as gx +from pyspark.sql.functions import current_date + +os.environ["GX_CLOUD_BASE_URL"] = "https://api.greatexpectations.io" + +# COMMAND ---------- + +context = gx.get_context() + +# COMMAND ---------- + +attendee_dim_df = spark.read.table('conference_refined.event_registrant_dim') +attendee_dim_df = attendee_dim_df.filter(attendee_dim_df.create_date == current_date()) + +# COMMAND ---------- + +checkpoint = context.get_checkpoint(name='conference_attendee_dim_ex_checkpoint') +print(checkpoint) + +# COMMAND ---------- + +batch_request = { + "runtime_parameters": { + "batch_data": attendee_dim_df + }, + "batch_identifiers": { + "attendee_dim_batch_idf": "initial_data" + } +} +context.run_checkpoint(ge_cloud_id=checkpoint.ge_cloud_id, batch_request=batch_request) + +# COMMAND ---------- + + diff --git a/src/schema.py b/src/schema.py index 18790bc..a0257e2 100644 --- a/src/schema.py +++ b/src/schema.py @@ -75,8 +75,8 @@ StructField("job_role", StringType(), True), StructField("state", StringType(), True), StructField("session_title", StringType(), True), - StructField("login_time", 
diff --git a/src/schema.py b/src/schema.py
index 18790bc..a0257e2 100644
--- a/src/schema.py
+++ b/src/schema.py
@@ -75,8 +75,8 @@
         StructField("job_role", StringType(), True),
         StructField("state", StringType(), True),
         StructField("session_title", StringType(), True),
-        StructField("login_time", FloatType(), True),
-        StructField("logout_time", FloatType(), True),
+        StructField("login_time", StringType(), True),
+        StructField("logout_time", StringType(), True),
     ]
 )
 poll_question_schema = StructType(
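Editor's note: with `login_time` and `logout_time` now ingested as strings, any duration math has to happen after an explicit parse downstream. A minimal sketch, assuming the values arrive in a `yyyy-MM-dd HH:mm:ss` style format and using the virtual attendee DataFrame from the standardized notebook (both the format and the derived column name are assumptions, not part of this PR):

```python
# Hypothetical downstream conversion -- the timestamp format is an assumption.
from pyspark.sql.functions import col, to_timestamp, unix_timestamp

virtual_attendee_df = (
    virtual_attendee_df
    .withColumn("login_ts", to_timestamp(col("login_time"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("logout_ts", to_timestamp(col("logout_time"), "yyyy-MM-dd HH:mm:ss"))
    # session duration in minutes, derived from the parsed timestamps
    .withColumn(
        "session_duration_min",
        (unix_timestamp("logout_ts") - unix_timestamp("login_ts")) / 60,
    )
)
```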