# Databricks notebook source
# MAGIC %run ./incremental-streaming-aggregates
# COMMAND ----------
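# The Bronze and Gold classes used by the suite below are defined in the
# ./incremental-streaming-aggregates notebook pulled in via %run above. The
# suite only assumes that each exposes a process() method returning a running
# StreamingQuery (so the queries can be stopped at the end of the test).
# As a rough, hypothetical sketch (kept as a comment so it does not shadow the
# real classes; names, schema, and options are assumptions, not the actual
# implementation), the Bronze ingestion might look like:
#
#   class Bronze():
#       def __init__(self):
#           self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"
#       def process(self):
#           invoices_df = (spark.readStream
#                               .format("json")
#                               .schema(invoice_schema)  # schema of the invoice JSON files
#                               .load(f"{self.base_data_dir}/data/invoices"))
#           return (invoices_df.writeStream
#                       .option("checkpointLocation", f"{self.base_data_dir}/checkpoint/invoices_bz")
#                       .outputMode("append")
#                       .toTable("invoices_bz"))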
class AggregationTestSuite():
    def __init__(self):
        self.base_data_dir = "/FileStore/data_spark_streaming_scholarnest"

    def cleanTests(self):
        print("Starting Cleanup...")
        # Drop the bronze and gold tables and remove their underlying storage
        spark.sql("drop table if exists invoices_bz")
        spark.sql("drop table if exists customer_rewards")
        dbutils.fs.rm("/user/hive/warehouse/invoices_bz", True)
        dbutils.fs.rm("/user/hive/warehouse/customer_rewards", True)
        spark.sql("CREATE TABLE customer_rewards(CustomerCardNo STRING, TotalAmount DOUBLE, TotalPoints DOUBLE)")
        # Clear checkpoints and reset the landing directory for incoming invoice files
        dbutils.fs.rm(f"{self.base_data_dir}/checkpoint/invoices_bz", True)
        dbutils.fs.rm(f"{self.base_data_dir}/checkpoint/customer_rewards", True)
        dbutils.fs.rm(f"{self.base_data_dir}/data/invoices", True)
        dbutils.fs.mkdirs(f"{self.base_data_dir}/data/invoices")
        print("Done")

    def ingestData(self, itr):
        # Copy the next batch of invoice files into the streaming source directory
        print("Starting Ingestion...")
        dbutils.fs.cp(f"{self.base_data_dir}/datasets/invoices/invoices_{itr}.json",
                      f"{self.base_data_dir}/data/invoices")
        print("Done")

    def assertBronze(self, expected_value):
        # Validate the total row count in the bronze table
        print("Starting Bronze Validation...")
        actual_value = spark.sql("select count(*) from invoices_bz").collect()[0][0]
        assert expected_value == actual_value, f"Test failed! actual value is {actual_value}"
        print("Done")

    def assertGold(self, expected_value):
        # Validate the aggregated amount for one known customer card
        print("Starting Gold Validation...")
        actual_value = spark.sql(
            "select TotalAmount from customer_rewards where CustomerCardNo='2262471989'"
        ).collect()[0][0]
        assert expected_value == actual_value, f"Test failed! actual value is {actual_value}"
        print("Done")

    def waitForMicroBatch(self, sleep=100):
        # Give the streaming queries time to pick up and process the new files
        import time
        print(f"Waiting for {sleep} seconds...")
        time.sleep(sleep)
        print("Done")

    def runTest(self):
        self.cleanTests()
        # Use the RocksDB state store for the streaming aggregation state
        spark.conf.set("spark.sql.streaming.stateStore.providerClass",
                       "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

        bzstream = Bronze()
        bzQuery = bzstream.process()
        gdstream = Gold()
        gdQuery = gdstream.process()

        print("Testing first iteration of invoice stream...")
        self.ingestData(1)
        self.waitForMicroBatch()
        self.assertBronze(501)
        self.assertGold(36859)
        print("Validation passed...")

        print("Testing second iteration of invoice stream...")
        self.ingestData(2)
        self.waitForMicroBatch()
        self.assertBronze(501 + 500)
        self.assertGold(36859 + 20740)
        print("Validation passed...")

        bzQuery.stop()
        gdQuery.stop()
# COMMAND ----------
aztest = AggregationTestSuite()
aztest.runTest()
# COMMAND ----------
df = (spark.read
          .format("json")
          .option("inferSchema", True)
          .load("dbfs:/FileStore/data_spark_streaming_scholarnest/datasets/invoices/invoices_1.json"))
df.show()
# COMMAND ----------
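# A hypothetical follow-up check (not part of the original suite): derive the
# expected Gold figure for the asserted card number directly from the raw file,
# assuming the invoice JSON exposes top-level CustomerCardNo and TotalAmount
# fields that the Gold aggregation sums per customer.
from pyspark.sql import functions as F

(df.filter(F.col("CustomerCardNo") == "2262471989")
   .agg(F.sum("TotalAmount").alias("ExpectedTotalAmount"))
   .show())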