/*##################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################*/
/*
YouTube:
- https://youtu.be/bMS4p2XHMpE
Use Cases:
- General overview of BigQuery recent features
Running this demo:
- Start the Spanner Airflow DAG: sample-bigquery-start-spanner (takes about 10 minutes to deploy and shuts down after 4 hours)
- Start the Datastream Airflow DAG: sample-datastream-private-ip-deploy (takes about 10 minutes to deploy)
- Start the Change Data Capture (generate data) DAG: sample-datastream-private-ip-generate-data (starts immediately)
- Start the Dataflow Airflow DAG: sample-dataflow-start-streaming-job (takes about 10 minutes to deploy and shuts down after 4 hours)
- Upload the notebook from the code bucket to colab enterprise
- You should run the notebook: BigQuery-Create-TensorFlow-Model.ipynb
- Run the Iceberg table DAG: sample-iceberg-create-tables-update-data (takes about 15 minutes to deploy)
- Start the Data Quality DAG: sample-rideshare-run-data-quality (takes about 10 minutes to complete)
- Log into https://atomfashion.io/analytics/salesoverview and switch to Premium Account
- Run the below "DO THIS IN ADVANCE"
- Run a manual Data Profile of table: ${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip
- https://console.cloud.google.com/dataplex/govern/profile?project=${project_id}
Clean up / Reset script:
-- This table was put in the taxi dataset since we need to grant Pub/Sub permissions to access it
DROP TABLE IF EXISTS `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`;
DROP SCHEMA IF EXISTS `${project_id}.technical_demo`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.load_data_taxi_trips_csv`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.biglake_taxi_trips_parquet`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.biglake_green_trips_parquet`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.omni_aws_taxi_rate_code`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigquery_rideshare_zone`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigquery_time_travel`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_NOT_INDEXED`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`;
DROP MODEL IF EXISTS `${project_id}.technical_demo.model_churn`;
DROP MODEL IF EXISTS `${project_id}.technical_demo.model_tf_linear_regression_fare`;
DROP EXTERNAL TABLE IF EXISTS `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
DROP EXTERNAL TABLE IF EXISTS `${project_id}.technical_demo.biglake_rideshare_trip_iceberg`;
DROP SEARCH INDEX IF EXISTS idx_all_bigsearch_log_5b_5t_json_hourly ON `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`;
*/
-- ***************************************************************************************************
-- BEGIN: DO THIS IN ADVANCE (some items need some time to fully initialize)
-- ***************************************************************************************************
-- Create temporary dataset
CREATE SCHEMA `${project_id}.technical_demo`
OPTIONS (
location = "${bigquery_region}"
);
-- Copy the data from the shared project
CREATE TABLE `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_NOT_INDEXED`
COPY `${shared_demo_project_id}.data_analytics_shared_data.bigsearch_log_5b_5t_json_hourly`;
CREATE TABLE `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`
COPY `${shared_demo_project_id}.data_analytics_shared_data.bigsearch_log_5b_5t_json_hourly`;
-- This can take some time to create the index
CREATE SEARCH INDEX idx_all_bigsearch_log_5b_5t_json_hourly
ON `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED` (ALL COLUMNS)
OPTIONS (analyzer = 'LOG_ANALYZER');
-- It can take a few minutes for data lineage to be created
CREATE OR REPLACE VIEW `${project_id}.technical_demo.taxi_lineage` AS
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_json` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id
UNION ALL
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_parquet` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id ;
-- ***************************************************************************************************
-- END: DO THIS IN ADVANCE
-- ***************************************************************************************************
--------------------------------------------------------------------------------------------------------
-- PART 1: See all the data - Any cloud, any speed
--------------------------------------------------------------------------------------------------------
/*
- Load data w/variety of file formats
- Create external tables over different file formats
- Parquet
- Parquet w/Hive partitioning
- Iceberg
- Query and Transfer AWS data
- Dataflow Streaming Data
- BQ Subscriptions
- DataStream
- Spanner
*/
-- * Load data w/variety of file formats *
-- Load data into BigQuery (AVRO, ORC, CSV, JSON, Parquet, Iceberg)
LOAD DATA OVERWRITE `${project_id}.technical_demo.load_data_taxi_trips_csv`
FROM FILES (
format = 'CSV',
skip_leading_rows = 1,
field_delimiter=',',
null_marker='',
uris = ['gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/csv/year=2021/month=12/*.csv']);
SELECT * FROM `${project_id}.technical_demo.load_data_taxi_trips_csv`;
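-- A hedged add-on (not part of the original script): LOAD DATA works the same way for the other
-- self-describing formats listed above (AVRO, ORC, Parquet). The Parquet path below is an
-- assumption based on the bucket layout used elsewhere in this demo; adjust it to an existing prefix.
LOAD DATA OVERWRITE `${project_id}.technical_demo.load_data_taxi_trips_parquet`
FROM FILES (
  format = 'PARQUET',
  uris = ['gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/parquet/year=2021/month=12/*.parquet']);
SELECT * FROM `${project_id}.technical_demo.load_data_taxi_trips_parquet` LIMIT 1000;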
-- * Create external tables over different file formats *
-- Query External Parquet data
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_taxi_trips_parquet`
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
format = "PARQUET",
uris = ['gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_trip/parquet/*.parquet']
);
SELECT * FROM `${project_id}.technical_demo.biglake_taxi_trips_parquet` LIMIT 1000;
-- Query External Parquet data w/Hive Partitioning
-- https://console.cloud.google.com/storage/browser/${processed_bucket_name};tab=objects?forceOnBucketsSortingFiltering=false&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_green_trips_parquet`
WITH PARTITION COLUMNS (
year INTEGER,
month INTEGER
)
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
format = "PARQUET",
hive_partition_uri_prefix = "gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/parquet/",
uris = ['gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/parquet/*.parquet']
);
SELECT * FROM `${project_id}.technical_demo.biglake_green_trips_parquet` LIMIT 1000;
-- Query External Iceberg data
-- OPEN: https://console.cloud.google.com/storage/browser/rideshare-lakehouse-enriched-${random_extension}/rideshare_iceberg_catalog/rideshare_lakehouse_enriched.db/biglake_rideshare_payment_type_iceberg/metadata?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_rideshare_trip_iceberg`
OPTIONS (
format = 'ICEBERG',
uris = ["gs://${gcs_rideshare_lakehouse_enriched_bucket}/rideshare_iceberg_catalog/rideshare_lakehouse_enriched.db/biglake_rideshare_payment_type_iceberg/metadata/REPLACE-ME.metadata.json"]
);
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_trip_iceberg`;
-- * Query and Transfer AWS data *
-- https://cloud.google.com/bigquery/docs/load-data-using-cross-cloud-transfer
-- Create the AWS tables
CALL `${project_id}.${aws_omni_biglake_dataset_name}.sp_demo_aws_omni_create_tables`();
-- Load Data from AWS (or Azure) - Query data in a remote cloud and create a local BigQuery table
CREATE OR REPLACE TABLE `${project_id}.technical_demo.omni_aws_taxi_rate_code` AS
SELECT * FROM `${project_id}.${aws_omni_biglake_dataset_name}.taxi_s3_rate_code`;
SELECT * FROM `${project_id}.technical_demo.omni_aws_taxi_rate_code`;
-- * Dataflow Streaming Data *
-- Ingest streaming data (Open Dataflow, we can use custom code and/or templates)
-- https://console.cloud.google.com/dataflow/jobs?project=${project_id}
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` LIMIT 1000;
SELECT COUNT(*) AS RecordCount
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming`
WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 MINUTE);
-- * BQ Subscriptions *
CREATE OR REPLACE TABLE `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`
(
data STRING
)
PARTITION BY TIMESTAMP_TRUNC(_PARTITIONTIME, HOUR);
-- Open: https://console.cloud.google.com/cloudpubsub/subscription/list?project=${project_id}
-- projects/pubsub-public-data/topics/taxirides-realtime
-- grant service account: service-31541164526@gcp-sa-pubsub.iam.gserviceaccount.com
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub` LIMIT 1000;
SELECT COUNT(*) AS Cnt FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`;
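-- A hedged sketch (not part of the original demo): the BigQuery subscription writes each raw
-- Pub/Sub message into the single STRING column "data", so it can be parsed with JSON functions.
-- The field names (ride_id, ride_status, meter_reading) are assumptions about the public
-- taxirides-realtime payload; adjust them to the actual message schema.
SELECT JSON_VALUE(data, '$.ride_id')     AS ride_id,
       JSON_VALUE(data, '$.ride_status') AS ride_status,
       SAFE_CAST(JSON_VALUE(data, '$.meter_reading') AS FLOAT64) AS meter_reading
  FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`
 LIMIT 100;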
-- REMEMBER TO DELETE YOUR Pub/Sub TO BIGQUERY JOB
-- * Datastream *
-- https://console.cloud.google.com/datastream/streams?project=${project_id}
SELECT * FROM `${project_id}.datastream_private_ip_public.driver`;
SELECT * FROM `${project_id}.datastream_private_ip_public.review`;
SELECT * FROM `${project_id}.datastream_private_ip_public.payment`;
-- * Spanner Federated Query *
-- https://console.cloud.google.com/spanner/instances/spanner-ral9vzfqhi/databases/weather/details/tables?project=${project_id}
-- Spanner (data)
EXECUTE IMMEDIATE """
SELECT *
FROM EXTERNAL_QUERY(
'projects/${project_id}/locations/${spanner_region}/connections/bq_spanner_connection',
"SELECT * FROM weather WHERE station_id='USW00094728'");
""";
--------------------------------------------------------------------------------------------------------
-- PART 2: Trust it all - Data lineage, dataplex, data quality
--------------------------------------------------------------------------------------------------------
/*
- Lineage via Console and Views
- Managing with Dataplex
- Securing with Dataplex
- Registration with Dataplex
- Data Profiling metrics
- Data Quality metrics
- Security (RLS/CLS/DM to both internal/BigLake tables)
*/
-- * Lineage via Console and Views *
-- Create a view and show the lineage
-- Show: ${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_model_training_data
-- Shows Analytics Hub data
-- https://console.cloud.google.com/dataplex/search?project=${project_id}&q=${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_model_training_data
-- Lineage from storage through tables and views
-- This was done in advance
/*
CREATE VIEW `${project_id}.technical_demo.taxi_lineage` AS
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_json` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id
UNION ALL
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_parquet` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id ;
*/
SELECT * FROM `${project_id}.technical_demo.taxi_lineage` LIMIT 100;
-- * Managing with Dataplex *
-- https://console.cloud.google.com/dataplex/lakes?project=${project_id}
-- Show Dataplex Lakes (Managed View)
-- * Security with Dataplex *
-- https://console.cloud.google.com/dataplex/secure?project=${project_id}
-- Show Dataplex Security (Securing Assets within BigQuery / Storage as one)
-- * Registration with Dataplex *
-- Show tables in storage
-- Show the tables within BigQuery
-- https://console.cloud.google.com/storage/browser/rideshare-lakehouse-raw-${random_extension};tab=objects?forceOnBucketsSortingFiltering=false&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
SELECT * FROM `${project_id}.rideshare_raw_zone_${random_extension}.rideshare_payment_type` LIMIT 1000;
SELECT * FROM `${project_id}.rideshare_raw_zone_${random_extension}.rideshare_trip_json` LIMIT 1000;
SELECT * FROM `${project_id}.rideshare_raw_zone_${random_extension}.rideshare_trip_parquet` LIMIT 1000;
-- * Data Profiling metrics *
-- Show data scan of table: bigquery_rideshare_trip
-- https://console.cloud.google.com/dataplex/govern/profile?project=${project_id}
-- * Data Quality metrics *
-- https://console.cloud.google.com/dataplex/govern/quality?project=${project_id}
-- See results in data catalog
-- https://console.cloud.google.com/dataplex/search?project=${project_id}&q=${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip
-- * Security (RLS/CLS/DM to both internal/BigLake tables) *
-- Create a BigLake table over the Zone data
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_rideshare_zone_csv`
(
location_id INTEGER,
borough STRING,
zone STRING,
service_zone STRING
)
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
format = "CSV",
field_delimiter = '|',
skip_leading_rows = 1,
uris = ['gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_zone/*.csv']
);
-- Show CSV table (all data)
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv` LIMIT 1000;
-- Query: Create an access policy so the admin (you) can only see Manhattan data
CREATE OR REPLACE ROW ACCESS POLICY rls_biglake_rideshare_zone_csv
ON `${project_id}.technical_demo.biglake_rideshare_zone_csv`
GRANT TO ("user:${gcp_account_name}")
FILTER USING (borough = 'Manhattan');
-- See just the data you are allowed to see
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
-- Edit the table and show Data Masking / Column level security on the service zone
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
DROP ALL ROW ACCESS POLICIES ON `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
-- Create an Internal table and apply a policy (same exact code)
CREATE OR REPLACE TABLE `${project_id}.technical_demo.bigquery_rideshare_zone` AS
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
CREATE OR REPLACE ROW ACCESS POLICY rls_bigquery_rideshare_zone
ON `${project_id}.technical_demo.bigquery_rideshare_zone`
GRANT TO ("user:${gcp_account_name}")
FILTER USING (service_zone = 'Yellow Zone');
SELECT * FROM `${project_id}.technical_demo.bigquery_rideshare_zone`;
DROP ALL ROW ACCESS POLICIES ON `${project_id}.technical_demo.bigquery_rideshare_zone`;
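-- A hedged variation (not part of the original walkthrough): row access policies can also be
-- granted to groups or service accounts rather than a single user. The group address below is
-- purely illustrative; replace it with a real principal before running.
/*
CREATE OR REPLACE ROW ACCESS POLICY rls_rideshare_zone_analysts
ON `${project_id}.technical_demo.bigquery_rideshare_zone`
GRANT TO ("group:data-analysts@example.com")
FILTER USING (borough = 'Brooklyn');
*/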
--------------------------------------------------------------------------------------------------------
-- PART 3: Activate it all - BQML, universal semantic layer, connect layer as dashboard, turn on bi engine (1 click), vertex (deploy 1 click and drift detection), etc
--------------------------------------------------------------------------------------------------------
/*
- BQML
- Create Model
- Explainable AI
- Integration with Vertex AI
- Model Registry
- REST API to expose to consuming apps
- Build model in Vertex (ML Ops and ingest into BQ - Import)
- Show time travel
- ML Ops pipeline (TO DO)
- JSON Analytics / data type
- BigSearch
- Query Acceleration
- Unstructured data analysis
- BigSpark
*/
-- * BQML - Create a Model * (takes 30 seconds)
-- Save the model in Vertex AI model registry
-- https://cloud.google.com/bigquery/docs/bqml-introduction#supported_models
-- https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-create#create_model_syntax
CREATE OR REPLACE MODEL `${project_id}.technical_demo.model_churn`
OPTIONS(
MODEL_TYPE="LOGISTIC_REG", -- or BOOSTED_TREE_CLASSIFIER, DNN_CLASSIFIER, AUTOML_CLASSIFIER
INPUT_LABEL_COLS=["churned"],
MODEL_REGISTRY = "vertex_ai"
) AS
SELECT * EXCEPT(user_first_engagement, user_pseudo_id)
FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.training_data`;
-- * BQML - Explainable AI *
-- Use Explainable AI for the prediction
EXECUTE IMMEDIATE """
SELECT *
FROM ML.EXPLAIN_PREDICT(MODEL `${project_id}.technical_demo.model_churn`,
(SELECT * FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.training_data` LIMIT 10));
""";
-- * Integration with Vertex AI - Model Registry *
-- REST API to expose to consuming apps
-- https://console.cloud.google.com/vertex-ai/models?project=${project_id}
-- * Build model in Vertex (ML Ops and ingest into BQ - Import) *
-- Import a model (Tensorflow)
-- Show notebook / model
CREATE OR REPLACE MODEL `${project_id}.technical_demo.model_tf_linear_regression_fare`
OPTIONS (MODEL_TYPE='TENSORFLOW',
MODEL_PATH='gs://${processed_bucket_name}/tensorflow/taxi_fare_model/linear_regression/*');
-- Run a prediction (note you can run the same prediction in your notebook and get the same result)
EXECUTE IMMEDIATE """
SELECT *
FROM ML.PREDICT(MODEL `${project_id}.technical_demo.model_tf_linear_regression_fare`,
(
SELECT [10.0,20.0] AS normalization_input
));
""";
-- * Show time travel (delete data, then query it as of an earlier timestamp) - useful for easy recovery of data *
CREATE OR REPLACE TABLE `${project_id}.technical_demo.bigquery_time_travel`
(
Vendor_Id INTEGER,
Pickup_DateTime TIMESTAMP,
Dropoff_DateTime TIMESTAMP,
PULocationID INTEGER,
DOLocationID INTEGER,
Trip_Distance FLOAT64,
Total_Amount FLOAT64,
PartitionDate DATE
)
PARTITION BY PartitionDate
AS SELECT
Vendor_Id,
Pickup_DateTime,
Dropoff_DateTime,
PULocationID,
DOLocationID,
Trip_Distance,
Total_Amount,
DATE(year, month, 1) as PartitionDate
FROM `${project_id}.${bigquery_taxi_dataset}.ext_green_trips_parquet`
WHERE DATE(year, month, 1) = '2021-01-01';
-- Query: 76516
SELECT COUNT(*) AS RecordCount FROM `${project_id}.technical_demo.bigquery_time_travel`;
-- Make a delete mistake
DELETE FROM `${project_id}.technical_demo.bigquery_time_travel` WHERE PULocationID <= 47;
-- Query: 59114
SELECT COUNT(*) AS RecordCount FROM `${project_id}.technical_demo.bigquery_time_travel`;
-- Query: 76516 - See the data as it was before the DELETE
-- NOTE: You might need to increase the INTERVAL 15 SECOND if you spend time talking
SELECT COUNT(*) AS RecordCount
FROM `${project_id}.technical_demo.bigquery_time_travel`
FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 SECOND);
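-- A hedged recovery sketch (not part of the original script): one way to undo the accidental
-- DELETE is to re-insert the missing rows from a time travel snapshot taken before the mistake.
-- The 2 MINUTE offset is an assumption; use any timestamp prior to the DELETE (within the
-- time travel window).
INSERT INTO `${project_id}.technical_demo.bigquery_time_travel`
SELECT *
  FROM `${project_id}.technical_demo.bigquery_time_travel`
       FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 MINUTE)
 WHERE PULocationID <= 47;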
-- * JSON Analytics / data type *
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json`
(
taxi_json JSON
)
WITH PARTITION COLUMNS (
-- column order must match the external path
year INTEGER,
month INTEGER
)
OPTIONS (
format = "CSV",
field_delimiter = '\u00fe',
skip_leading_rows = 0,
hive_partition_uri_prefix = "gs://${processed_bucket_name}/processed/taxi-data/yellow/trips_table/json/",
uris = ['gs://${processed_bucket_name}/processed/taxi-data/yellow/trips_table/json/*.json']
);
-- JSON Analytics with JSON data
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json` LIMIT 100;
SELECT taxi_json.Vendor_Id,
taxi_json.Rate_Code_Id,
taxi_json.Fare_Amount
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json`
LIMIT 100;
-- for each day find the highest passenger count and total amount per payment type
WITH TaxiData AS
(
SELECT CAST(JSON_VALUE(taxi_trips.taxi_json.Pickup_DateTime) AS TIMESTAMP) AS Pickup_DateTime,
SAFE.INT64(taxi_trips.taxi_json.Payment_Type_Id) AS Payment_Type_Id,
SAFE.INT64(taxi_trips.taxi_json.Passenger_Count) AS Passenger_Count,
SAFE.FLOAT64(taxi_trips.taxi_json.Total_Amount) AS Total_Amount,
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json` AS taxi_trips
WHERE CAST(JSON_VALUE(taxi_trips.taxi_json.Pickup_DateTime) AS TIMESTAMP) BETWEEN '2020-01-01' AND '2020-06-01'
AND SAFE.INT64(taxi_trips.taxi_json.Payment_Type_Id) IN (1,2)
)
, TaxiDataRanking AS
(
SELECT CAST(Pickup_DateTime AS DATE) AS Pickup_Date,
taxi_trips.Payment_Type_Id,
taxi_trips.Passenger_Count,
taxi_trips.Total_Amount,
RANK() OVER (PARTITION BY CAST(Pickup_DateTime AS DATE),
taxi_trips.Payment_Type_Id
ORDER BY taxi_trips.Passenger_Count DESC,
taxi_trips.Total_Amount DESC) AS Ranking
FROM TaxiData AS taxi_trips
)
SELECT Pickup_Date,
Payment_Type_Description,
Passenger_Count,
Total_Amount
FROM TaxiDataRanking
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.payment_type` AS payment_type
ON TaxiDataRanking.Payment_Type_Id = payment_type.Payment_Type_Id
WHERE Ranking = 1
ORDER BY Pickup_Date, Payment_Type_Description;
-- * BigSearch *
-- How many records are we searching
SELECT COUNT(*) AS Cnt FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip` ;
-- Find a credit card number (this is generated data) and substitute it for REPLACE_CREDIT_CARD below
SELECT credit_card_number
FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip`
WHERE credit_Card_number IS NOT NULL
LIMIT 1;
-- The table is not partitioned or clustered on this data
-- Click on Job Information and show "Index Usage Mode: FULLY_USED"
SELECT *
FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip`
WHERE SEARCH(credit_card_number,'REPLACE_CREDIT_CARD', analyzer=>'NO_OP_ANALYZER');
-- Search even if we do not know the field name
SELECT *
FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip` AS bigquery_rideshare_trip
WHERE SEARCH(bigquery_rideshare_trip,'REPLACE_CREDIT_CARD', analyzer=>'NO_OP_ANALYZER');
-- NOT_INDEXED
-- Bytes processed: 5.83 TB
-- Index Usage Mode: UNUSED
SELECT *
FROM `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_NOT_INDEXED`
WHERE SEARCH(insertId, '40ms4zeh78tg6o6jx');
-- Make sure index is complete: coverage_percentage should be 100%
SELECT table_name, index_name, coverage_percentage
FROM `${project_id}.technical_demo.INFORMATION_SCHEMA.SEARCH_INDEXES`
WHERE index_status = 'ACTIVE';
-- Search the field "insertId" for a value (just one column)
-- Bytes processed: 106.57 MB
-- Index Usage Mode: FULLY_USED
SELECT *
FROM `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`
WHERE SEARCH(insertId, '40ms4zeh78tg6o6jx');
-- 50 Billion Rows (requires access to the shared project)
SELECT COUNT(*) AS Cnt
FROM `${shared_demo_project_id}.bigquery_features.bigsearch_log_50b_60t_json_hourly` AS LogsIndexed
WHERE SEARCH((internalClusterId, labels), 'service281')
AND TIMESTAMP_TRUNC(timestamp, HOUR) BETWEEN TIMESTAMP("2021-02-01 00:00:00")
AND TIMESTAMP("2021-02-06 00:00:00") ;
-- * Query Acceleration *
/*
Data Size:
- File Count = 5,068,912
- Directory Count = 1,588,355
- The taxi trip table was exported to cloud storage and partitioned by Year, Month, Day, Hour and Minute
- About 75 GB of data was exported and created many small files
*/
-- DO NOT RUN - TOO SLOW (the point of the demo)
-- NO ACCELERATION
-- Duration: >3 min
SELECT *
FROM `${shared_demo_project_id}.bigquery_preview_features.biglake_no_acceleration`
WHERE year = 2021
AND month = 1
AND day = 1
AND hour = 0
AND minute = 0;
-- WITH ACCELERATION
SELECT *
FROM `${shared_demo_project_id}.bigquery_preview_features.biglake_query_acceleration`
WHERE year = 2021
AND month = 1
AND day = 1
AND hour = 0
AND minute = 0;
-- * Unstructured Data Analytics *
-- https://console.cloud.google.com/storage/browser/${gcs_rideshare_lakehouse_raw_bucket}/rideshare_images?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_unstructured_data`
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
object_metadata="DIRECTORY",
uris = ['gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_images/*.jpg',
'gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_images/*.jpeg'],
max_staleness=INTERVAL 30 MINUTE,
metadata_cache_mode="MANUAL");
CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('${project_id}.technical_demo.biglake_unstructured_data');
-- See the data
SELECT * FROM `${project_id}.technical_demo.biglake_unstructured_data` ;
-- Process the images via our object table and show the results
WITH UnstructuredData AS
(
-- get the image from the object table
SELECT *
FROM `${project_id}.technical_demo.biglake_unstructured_data`
WHERE uri = (SELECT uri FROM `${project_id}.technical_demo.biglake_unstructured_data` LIMIT 1)
)
, ScoreAI AS
(
-- call a remote function
SELECT `${project_id}.rideshare_lakehouse_enriched.ext_udf_ai_localize_objects`(UnstructuredData.uri) AS json_result
FROM UnstructuredData
)
SELECT item.name,
item.score,
ScoreAI.json_result
FROM ScoreAI, UNNEST(JSON_QUERY_ARRAY(ScoreAI.json_result.localized_object_annotations)) AS item;
-- Show all the images processed (object detection, labels, landmarks, logos)
SELECT * FROM `${project_id}.rideshare_lakehouse_enriched.bigquery_rideshare_images_ml_score` LIMIT 100;
-- * BigSpark *
-- BigLake Metastore Service
-- CALL `${project_id}.rideshare_lakehouse_enriched.sp_process_data`();
--------------------------------------------------------------------------------------------------------
-- PART 4: Intelligent infrastructure (autoscaling to the minute, best price/performance, capacity management, etc.)
--------------------------------------------------------------------------------------------------------
/*
- Autoscaling / Workload management
- Monitoring - Show these views (slide 20): WIP - Landing Launches 27 Mar 2023 - DA Field Council - go/da-fc (TO DO)
- See how much you are using
- Quotas (even for on-demand)
- Query Visualization (optimizer/queues)
- Slot recommender / Info Schema (see pricing / slots)
- Data Modeling (Looker: https://atomfashion.io/analytics/salesoverview | Select Map | Add Age)
- SQL Translate
- Sheets
- BI Engine
*/
-- * Autoscaling / Workload management - 10 second autoscaling *
-- https://console.cloud.google.com/bigquery/admin/reservations;region=${bigquery_region}/create?project=${project_id}&region=${bigquery_region}
-- * Monitoring *
-- https://console.cloud.google.com/bigquery/admin/monitoring/resource-utilization?project=${project_id}&region=${bigquery_region}
-- * See how much you are using (Informational Schema) *
-- Compute the cost per Job, Average slots per Job and Max slots per Job (at the job stage level)
-- This shows the cost of each query, the maximum number of slots used, and whether any additional slots were requested (to help size reservations)
SELECT project_id,
job_id,
reservation_id,
EXTRACT(DATE FROM creation_time) AS creation_date,
TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS job_duration_seconds,
job_type,
user_email,
total_bytes_billed,
-- 6.25 / 1,099,511,627,776 = 0.00000000000568434188608080 ($6.25 per TB so cost per byte is 0.00000000000568434188608080)
CASE WHEN reservation_id IS NULL
THEN CAST(total_bytes_billed AS BIGDECIMAL) * CAST(0.00000000000568434188608080 AS BIGDECIMAL)
ELSE 0
END
as est_cost,
-- Average slot utilization per job is calculated by dividing
-- total_slot_ms by the millisecond duration of the job
SAFE_DIVIDE(job.total_slot_ms,(TIMESTAMP_DIFF(job.end_time, job.start_time, MILLISECOND))) AS job_avg_slots,
query,
-- Determine the max number of slots used at ANY stage in the query. The average slots might be 55
-- but a single stage might spike to 2,000 slots. This is important to know when estimating how many slots to purchase.
MAX(SAFE_DIVIDE(unnest_job_stages.slot_ms,unnest_job_stages.end_ms - unnest_job_stages.start_ms)) AS jobstage_max_slots,
-- Is the job requesting more units of work (slots)? If so, you need more slots.
-- estimatedRunnableUnits = Units of work that can be scheduled immediately.
-- Providing additional slots for these units of work will accelerate the query, if no other query in the reservation needs additional slots.
MAX(unnest_timeline.estimated_runnable_units) AS estimated_runnable_units
FROM `region-${bigquery_region}`.INFORMATION_SCHEMA.JOBS AS job
CROSS JOIN UNNEST(job_stages) as unnest_job_stages
CROSS JOIN UNNEST(timeline) AS unnest_timeline
WHERE project_id = '${project_id}'
AND DATE(creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) AND CURRENT_DATE()
GROUP BY 1,2,3,4,5,6,7,8,9,10,11
ORDER BY creation_date DESC ;
-- * Quotas (even for on-demand) *
-- https://console.cloud.google.com/iam-admin/quotas?referrer=search&project=${project_id}&pageState=(%22allQuotasTable%22:(%22f%22:%22%255B%257B_22k_22_3A_22Metric_22_2C_22t_22_3A10_2C_22v_22_3A_22_5C_22bigquery.googleapis.com%252Fquota%252Fquery%252Fusage_5C_22_22_2C_22s_22_3Atrue_2C_22i_22_3A_22metricName_22%257D%255D%22))
-- * Query Visualization (optimizer/queues) *
SELECT FORMAT_DATE("%w", Pickup_DateTime) AS WeekdayNumber,
FORMAT_DATE("%A", Pickup_DateTime) AS WeekdayName,
vendor.Vendor_Description,
payment_type.Payment_Type_Description,
SUM(taxi_trips.Total_Amount) AS high_value_trips
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips` AS taxi_trips
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.vendor` AS vendor
ON taxi_trips.Vendor_Id = vendor.Vendor_Id
AND taxi_trips.Pickup_DateTime BETWEEN '2020-01-01' AND '2020-06-01'
LEFT JOIN `${project_id}.${bigquery_taxi_dataset}.payment_type` AS payment_type
ON taxi_trips.Payment_Type_Id = payment_type.Payment_Type_Id
GROUP BY 1, 2, 3, 4
HAVING SUM(taxi_trips.Total_Amount) > 50
ORDER BY WeekdayNumber, 3, 4;
-- * Slot recommender *
-- https://console.cloud.google.com/bigquery/admin/reservations;region=${bigquery_region}/slot-estimator?project=${project_id}&region=${bigquery_region}
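-- A hedged sizing query (not in the original demo): approximate average slot usage per hour over
-- the last 7 days from INFORMATION_SCHEMA.JOBS_TIMELINE, which can help sanity-check the slot
-- estimator's recommendation.
SELECT TIMESTAMP_TRUNC(period_start, HOUR) AS usage_hour,
       SUM(period_slot_ms) / (1000 * 60 * 60) AS approx_avg_slots
  FROM `region-${bigquery_region}`.INFORMATION_SCHEMA.JOBS_TIMELINE
 WHERE job_creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
 GROUP BY usage_hour
 ORDER BY usage_hour DESC;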
-- * Data Modeling (Looker: https://atomfashion.io/analytics/salesoverview | Select Map | Add Age) *
-- * SQL Translate *
/*
CREATE VOLATILE TABLE exampleTable (age INT, gender VARCHAR(10));
INS INTO exampleTable (10, 'F');
INS INTO exampleTable (20, 'M');
SEL *
FROM exampleTable
WHERE gender EQ 'F'
AND age LT 15;
*/
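-- A hedged sketch of what the interactive SQL translator would roughly produce for the Teradata
-- snippet above (GoogleSQL has no VOLATILE tables, so a TEMP table is the usual mapping); the
-- exact output of the translation service may differ.
/*
CREATE TEMP TABLE exampleTable (age INT64, gender STRING);
INSERT INTO exampleTable VALUES (10, 'F');
INSERT INTO exampleTable VALUES (20, 'M');
SELECT *
  FROM exampleTable
 WHERE gender = 'F'
   AND age < 15;
*/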
-- * Sheets *
-- Create sheet w/Taxi Data
-- Create pivot table (values of tip/total amt)
-- * BI Engine *
-- Open BI Engine tab and add BI Engine
-- https://console.cloud.google.com/bigquery/admin/bi-engine?project=${project_id}
-- REMEMBER TO DELETE YOUR Pub/Sub TO BIGQUERY JOB
SELECT 1;