
Data Lakehouse Local Stack

Data Lakehouse local stack with PySpark, Trino, and Minio

This repository introduces the Data Lakehouse pattern as a flexible solution for companies transitioning from small startups to established enterprises, showing how to implement a local data lakehouse with open-source tools that are compatible with production-grade cloud services.

It also includes an example that processes Raygun error data and counts IP address occurrences.

Introduction

In the world of ML and AI, data is the crown jewel, but it often ends up lost in data swamps due to bad Data Lake practices when companies try to productionize their data.

Data Warehouses are a costly solution to this problem, and they add complexity on top of simple Data Lakes.

This is where Data Lakehouses come into play: a hybrid approach that combines the best of both worlds (source).

Data Lakehouses aim to combine elements of data warehousing with core elements of data lakes. Put simply, they are designed to provide the lower costs of cloud storage even for large amounts of raw data alongside support for certain analytics concepts – such as SQL access to curated and structured data tables stored in relational databases, or support for large scale processing of Big Data analytics or Machine Learning workloads (source).

Common Data Lakehouse technologies
(Image source)

The Medallion Architecture

A medallion architecture is a data design pattern used to logically organize data in a lakehouse, with the goal of incrementally and progressively improving the structure and quality of data as it flows through each layer of the architecture (from Bronze ⇒ Silver ⇒ Gold layer tables). Medallion architectures are sometimes also referred to as "multi-hop" architectures.

The Medallion Architecture
(Image source)

  • Bronze: ingestion tables (raw data, originals).

  • Silver: refined/cleaned tables.

  • Gold: feature/aggregated data store.

  • Platinum (optional): a copy of selected Gold data in a faster store such as a high-speed DBMS, because Gold data kept in cloud bucket storage (like AWS S3) is too slow for use cases such as real-time dashboards. A minimal sketch of the Bronze ⇒ Silver ⇒ Gold flow is shown below.
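As an illustration, here is a minimal PySpark sketch of data flowing through the medallion layers. The bucket, paths, column names, and transformations are hypothetical and only meant to show the Bronze ⇒ Silver ⇒ Gold progression (S3/Minio connection settings are omitted).

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("MedallionSketch").getOrCreate()

# Bronze: land the raw JSON files as-is (hypothetical bucket and paths).
bronze_df = spark.read.json("s3a://data-lakehouse/Raw/events/")
bronze_df.write.mode("append").parquet("s3a://data-lakehouse/Bronze/events/")

# Silver: clean and deduplicate the raw records.
silver_df = (
    spark.read.parquet("s3a://data-lakehouse/Bronze/events/")
    .filter(F.col("event_id").isNotNull())
    .dropDuplicates(["event_id"])
)
silver_df.write.mode("overwrite").parquet("s3a://data-lakehouse/Silver/events/")

# Gold: aggregate into a consumption-ready table.
gold_df = silver_df.groupBy("ip_address").agg(F.count("*").alias("event_count"))
gold_df.write.mode("overwrite").parquet("s3a://data-lakehouse/Gold/ip_event_counts/")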

Readings:

Data Lakehouse Layers

Data Lakehouse components
(Image source)

Storing structured and unstructured data in a data lakehouse presents many benefits to a data organization, namely making it easier and more seamless to support both business intelligence and data science workloads. This starts at the data source.

We describe the five layers in this section, but let’s first talk about the sources that feed the Lake House Architecture.

Data sources

The Lake House Architecture enables you to ingest and analyze data from a variety of sources. Many of these sources such as line of business (LOB) applications, ERP applications, and CRM applications generate highly structured batches of data at fixed intervals. In addition to internal structured sources, you can receive data from modern sources such as web applications, mobile devices, sensors, video streams, and social media. These modern sources typically generate semi-structured and unstructured data, often as continuous streams.

Data ingestion layer

The ingestion layer is responsible for ingesting data into the Lake House storage layer. It provides the ability to connect to internal and external data sources over a variety of protocols. It can ingest and deliver batch as well as real-time streaming data into a data warehouse as well as data lake components of the Lake House storage layer.

Data storage layer

The data storage layer is responsible for providing durable, scalable, and cost-effective components to store and manage vast quantities of data. In a Lake House Architecture, the data warehouse and data lake natively integrate to provide an integrated cost-effective storage layer that supports unstructured as well as highly structured and modeled data. The storage layer can store data in different states of consumption readiness, including raw, trusted-conformed, enriched, and modeled.

Catalog layer

The catalog layer is responsible for storing business and technical metadata about datasets hosted in the Lake House storage layer. In a Lake House Architecture, the catalog is shared by both the data lake and data warehouse, and enables writing queries that incorporate data stored in the data lake as well as the data warehouse in the same SQL. It allows you to track versioned schemas and granular partitioning information of datasets. As the number of datasets grows, this layer makes datasets in the Lake House discoverable by providing search capabilities.

Processing layer

Components in the processing layer are responsible for transforming data into a consumable state through data validation, cleanup, normalization, transformation, and enrichment. The processing layer provides purpose-built components to perform a variety of transformations, including data warehouse style SQL, big data processing, and near-real-time ETL.

Data consumption layer

The data consumption layer is responsible for providing scalable and performant components that use unified Lake House interfaces to access all the data stored in Lake House storage and all the metadata stored in the Lake House catalog. It democratizes analytics to enable all personas across an organization by providing purpose-built components that enable analysis methods, including interactive SQL queries, warehouse style analytics, BI dashboards, and ML.

Sources:

Data Lakehouse Components by Cloud Providers

Data Lakehouse Components by Cloud
(Image source)

Local Data Lakehouse Components

Common Data Lakehouse technologies

Other Components

  • SQL Alchemy
    https://docs.sqlalchemy.org

  • Pandas
    https://pandas.pydata.org

  • JupyterLab
    https://docs.jupyter.org

  • Delta Lake
    Open table format.
    https://delta.io
    Open source framework developed by Databricks. Like other modern table formats, it employs file-level listings, improving the speed of queries considerably compared to the directory-level listing of Hive. Offers enhanced CRUD operations, including the ability to update and delete records in a data lake which would previously have been immutable.
    (Click here for more information about Open Table Formats).
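As a reference for the Delta Lake component, here is a minimal hedged sketch of writing and reading a Delta table with PySpark, assuming the delta-spark package is installed and using a hypothetical local path; it is illustrative only and not part of the Raygun example.

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip  # assumes: pip install delta-spark

builder = (
    SparkSession.builder.appName("DeltaSketch")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

df = spark.createDataFrame([(1, "alpha"), (2, "beta")], ["id", "value"])

# Write a Delta table to a hypothetical local path.
df.write.format("delta").mode("overwrite").save("/tmp/delta/example_table")

# Read it back; Delta tables also support UPDATE, DELETE, and MERGE.
spark.read.format("delta").load("/tmp/delta/example_table").show()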

Requirements

Usage

Clone the repository:

git clone https://github.com/tomkat-cr/data_lakehouse_local_stack.git
cd data_lakehouse_local_stack

Download the required packages:

make install

IMPORTANT: this process will take a long time, depending on your Internet connection speed.

Start the local stack (or spark stack):

make run

Run the local Jupyter engine:

make open_local_jupiter

Run the local Minio explorer:

make open_local_minio

Raygun Data Processing

To process a single Raygun error group, you need to download the Raygun data (a series of JSON files, one for each occurrence of the error, containing all the data shown in Raygun), upload the data to S3 (managed locally by Minio), ingest the data with Spark, build the Hive metastore, and finally run the SQL queries to get the data analytics.

Raygun Data Preparation

  1. Go to Raygun (https://app.raygun.com), and select the corresponding Application.

  2. Put the checkmark in the error you want to analyze.

  3. Click on the Export selected groups button.

  4. Click on the Add to export list button.

  5. A message like this will be shown:

Great work! Those error groups have been added to the export list.
View your exports by clicking here, or by using the "Export" link under "Crash Reporting" in the sidebar.
  6. Under Crash Reporting in the sidebar, click on the Export link.

  7. Click on the Start export button.

Confirm export

Export all errors between XXXX and YYYY.

Exports can take some time depending on the volume of errors being exported. You will be notified when your export is ready to be downloaded. Once an export is started, another cannot begin until the first has completed.

Exports are generated as a single compressed file. Learn how to extract them: https://raygun.com/documentation/product-guides/crash-reporting/exporting-data/

Recipients:
example@address.com
  8. Click on the Start export button.

  9. Wait until the compressed file arrives in your email inbox.

  10. A message like this arrives in your inbox:

Subject: Your Raygun Crash Reporting export is complete for "XXXX"

Your error export has been generated
We have completed the error group export for your application "XXXX". You can now download it below.
Download export - XXX MB
Learn how to extract 7z files
  11. Click on the Download export - XXX MB link.

  12. Put the compressed file in a local directory like ${HOME}/Downloads/raygun-data

  13. Uncompress the file.

  14. A new directory will be created with the JSON files (one per error occurrence, each with its date/time) in: ${HOME}/Downloads/raygun-data/assets

  15. To check the total input file size:

Set an environment variable with the path:

ORIGINAL_JSON_FILES_PATH="${HOME}/Downloads/raygun-data/assets"

Get the total size for all files downloaded:

du -sh ${ORIGINAL_JSON_FILES_PATH}

Count the number of files in that directory:

ls -l ${ORIGINAL_JSON_FILES_PATH} | wc -l
  16. Move the files to the data/raygun directory in the Project, or follow the Large number of input data files procedure (see the next section) to link ${HOME}/Downloads/raygun-data/assets to the data/raygun directory.

Large number of input data files

If you have more than 1000 raw data input files, you can use the following procedure to mount the input files directory in the local stack data directory:

  1. Edit the docker-compose configuration file in the project's root:
vi ./docker-compose.yml
  2. Add the input files directory as data/any_directory_name in the volumes section, replacing any_directory_name with your own directory name, e.g. raygun:

File: ./docker-compose.yml

version: "3.9"
services:
  spark:
      .
      .
    volumes:
      - /path/to/input/directory/:/home/LocalLakeHouse/Project/data/any_directory_name

This way, your large set of input files will be available under the data/any_directory_name directory.

  3. Alternatively, you can create a symbolic link:
ln -s /path/to/input/directory/ data/any_directory_name
  4. Once you finish the ingestion process (see the Raygun Data Ingestion section), the link can be removed:

Exit the spark docker container by pressing Ctrl-D (or running the exit command), and run:

unlink data/any_directory_name

Configuration

  1. Change the current directory to the Raygun processing path:
cd /home/LocalLakeHouse/Project/batch_processing/raygun_ip_processing
  2. Copy the configuration template:
cp processing.example.env processing.env
  3. Edit the configuration file:
vi processing.env
  4. Assign the Spark App Name:
SPARK_APPNAME=RaygunErrorTraceAnalysis
  5. Assign the data input sub-directory (under BASE_PATH or /home/LocalLakeHouse/Project):
# Local directory path containing JSON files
INPUT_LOCAL_DIRECTORY="data/raygun"
  6. Assign the Local S3 (Minio) raw data input sub-directory:
# S3 prefix (directory path in the bucket) to store raw data read from
# the local directory
S3_PREFIX="Raw/raygun"
  7. Assign the cluster storage bucket name, used to save the processed Dataframes so the execution can resume from the last one if an error aborts the process:
# Dataframe cluster storage bucket name
DF_CLUSTER_STORAGE_BUCKET_PREFIX="ClusterData/RaygunIpSummary"
  8. Assign the attribute name and alias for the IP Address in the Raygun JSON files:
# Desired attribute and alias to filter one column
DESIRED_ATTRIBUTE="Request.IpAddress"
DESIRED_ALIAS=RequestIpAddress
  9. Assign the results sub-directory (under /home/LocalLakeHouse/Project/Outputs):
# Final output result sub-directory
RESULTS_SUB_DIRECTORY=raygun_ip_addresses_summary
  10. Assign the input file reading page size, to prevent errors when building the file list from a large number of files:
# S3 pagination page size: chunks of 1000 files
S3_PAGE_SIZE=1000
  11. Define the batch size used to read the JSON files in batches during the Spark Dataframe creation. Adjust it based on your memory capacity and data size; 5000 works well on a MacBook with 16 GB of RAM.
# Spark Dataframe creation batch size: 5000 files per batch.
DF_READ_BATCH_SIZE=5000
  12. Assign the Spark driver memory. For reference, the Raygun JSON event files are about 16 KB each (small files).
# Spark driver memory
# For small files, 2-3g is usually enough
# For bigger files with more data, use a larger value (e.g. 4-5g or more)
SPARK_DRIVER_MEMORY=3g
  13. Assign the number of Spark partitions, to optimize parallel processing and memory usage:
# Repartition the DataFrame to optimize parallel processing and memory usage
# (Adjust the number of partitions based on your environment and data size,
#  workload and cluster setup)
DF_NUM_PARTITIONS=200
  14. Assign the number of batches used to save data into the Apache Hive metastore, to prevent errors when saving a large number of files:
# Number of batches to Save Data into Apache Hive
HIVE_BATCHES=10
  15. For other parameters, check the Raygun IP processing main Python code, function get_config(). A simplified sketch of how these settings could drive the processing is shown below.
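As a rough illustration of how these parameters could drive the processing (the actual logic lives in the raygun_ip_processing code; the snippet below is a simplified, hypothetical sketch, and the S3 endpoint/credential settings for Minio are omitted):

import os
from dotenv import load_dotenv  # assumes python-dotenv is available
from pyspark.sql import SparkSession, functions as F

# Load the processing.env values described above.
load_dotenv("processing.env")

spark = (
    SparkSession.builder.appName(os.environ["SPARK_APPNAME"])
    .config("spark.driver.memory", os.environ.get("SPARK_DRIVER_MEMORY", "3g"))
    .getOrCreate()
)

# Read the raw JSON files uploaded to S3/Minio and extract the desired nested
# attribute, e.g. Request.IpAddress aliased as RequestIpAddress.
raw_df = spark.read.json(f"s3a://data-lakehouse/{os.environ['S3_PREFIX']}/*.json")
ip_df = raw_df.select(
    F.col(os.environ["DESIRED_ATTRIBUTE"]).alias(os.environ["DESIRED_ALIAS"])
).repartition(int(os.environ.get("DF_NUM_PARTITIONS", "200")))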

Raygun Data Copy to S3 / Minio

  1. In a terminal window, restart the spark stack:

If the local stack is already running, press Ctrl-C to stop it.

Run this command:

make run

The spark stack Docker container has a /home/LocalLakeHouse/Project directory that maps to the Project's root directory on your local computer.

  2. Open a second terminal window and enter the spark Docker container:
docker exec -ti spark bash
  3. Then run the load script:
cd /home/LocalLakeHouse/Project
sh ./Scripts/1.init_minio.sh data/raygun

Raygun Data Ingestion

  1. Run the ingest process:

Open a terminal window and enter the spark Docker container:

docker exec -ti spark bash

Run the ingestion process from scratch:

cd Project
MODE=ingest make raygun_ip_processing

When the ingestion process ends, the Build the Hive Metastore and Raygun Data Reporting steps will run as well.

  2. If the process stops, copy the counter XXXX shown after the last Persisting... message (a simplified sketch of this batch-and-resume logic is shown below):

For example:

Persisting DataFrame to disk (round X)...
3) From: XXXX | To: YYYY

Then run:

MODE=ingest FROM=XXXX make raygun_ip_processing
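The batch-and-resume behaviour can be pictured with the following simplified, hypothetical sketch (the real implementation lives in the raygun_ip_processing module); passing FROM=XXXX corresponds to the start_from argument here.

# Simplified sketch of the batched ingestion with resume support (hypothetical).
def ingest_in_batches(spark, file_paths, batch_size=5000, start_from=0):
    """Read the JSON files in batches and append each batch to the cluster storage."""
    for start in range(start_from, len(file_paths), batch_size):
        batch = file_paths[start:start + batch_size]
        print(f"Persisting DataFrame to disk... From: {start} | To: {start + len(batch)}")
        batch_df = spark.read.json(batch)
        # Append so an interrupted run can resume from the last finished batch.
        batch_df.write.mode("append").parquet(
            "s3a://data-lakehouse/ClusterData/RaygunIpSummary"
        )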

Build the Hive Metastore

If the ingestion process stops and you don't want to resume or finish it, you can still build the Hive metastore so you can run the SQL queries and build the reports:

MODE=hive_process make raygun_ip_processing
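Under the hood, this step registers the processed data as a table in the Hive metastore so it can be queried by name. A minimal hedged sketch of the idea, using the cluster storage path and table name from this example (Hive support and the S3/Minio connection settings are assumed to be configured by the local stack):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HiveSketch").enableHiveSupport().getOrCreate()

# Load the Parquet data produced by the ingestion step and register it
# as the table used by the SQL reports.
df = spark.read.parquet("s3a://data-lakehouse/ClusterData/RaygunIpSummary")
df.write.mode("overwrite").saveAsTable("raygun_error_traces")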

Raygun Data Reporting

  1. To run the default SQL query using Spark:
MODE=spark_sql make raygun_ip_processing

This process generates a report in the Project's Outputs/raygun_ip_addresses_summary directory, where you will find a part-00000-<some-hexadecimal-code>.csv file with the results.

The default SQL query that generates the results file is:

SELECT RequestIpAddress, COUNT(*) AS IpCount
  FROM raygun_error_traces
  GROUP BY RequestIpAddress
  ORDER BY IpCount DESC

The result is the list of IP addresses and the number of times each one appears across all the evaluated requests (see the sketch after this list).

  2. To run a custom SQL query using Spark:
SQL='SELECT RequestIpAddress FROM raygun_error_traces GROUP BY RequestIpAddress' MODE=spark_sql make raygun_ip_processing
  3. To run the default SQL query using Trino:
MODE=trino_sql make raygun_ip_processing
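Conceptually, the Spark SQL report amounts to something like the sketch below. It assumes an active SparkSession named spark with the Hive metastore already built; coalescing to a single partition (to get one part-00000-*.csv file) and the exact output options are assumptions, not the project's verbatim code.

# Hedged sketch: run the default report query and write the CSV output.
result_df = spark.sql("""
    SELECT RequestIpAddress, COUNT(*) AS IpCount
      FROM raygun_error_traces
     GROUP BY RequestIpAddress
     ORDER BY IpCount DESC
""")
(
    result_df.coalesce(1)  # produce a single CSV part file
    .write.mode("overwrite")
    .option("header", True)
    .csv("Outputs/raygun_ip_addresses_summary")
)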

Ingest process stats

  1. To check the Dataframe cluster S3 (Minio) files:

This lets you follow up on the ingestion process: during the Dataframe build step, files are processed in batches (groups of 5,000 files), and each time a batch finishes, the result is written to the S3 cluster directory. If the ingestion process stops, it can be resumed from the last executed batch, and the newly processed files will be appended to the S3 cluster directory.

Set an environment variable with the path:

CLUSTER_DIRECTORY="/home/LocalLakeHouse/Project/Storage/minio/data-lakehouse/ClusterData/RaygunIpSummary"

The Parquet files resulting from the Dataframe build step of the ingestion process are in the path assigned to CLUSTER_DIRECTORY.

Get the total size for all files currently processed in the Dataframe:

du -sh ${CLUSTER_DIRECTORY}

Count the number of files in that directory:

ls -l ${CLUSTER_DIRECTORY} | wc -l
  2. To check the raw input JSON files uploaded to S3 (Minio):

Set an environment variable with the path:

S3_JSON_FILES_PATH="/home/LocalLakeHouse/Project/Storage/minio/data-lakehouse/Raw/raygun"

Get the total size for all files:

du -sh ${S3_JSON_FILES_PATH}

Count the number of files in that directory:

ls -l ${S3_JSON_FILES_PATH} | wc -l

Minio UI

  1. Run the local Minio explorer:
make open_local_minio
  2. This URL will automatically open in a browser: http://127.0.0.1:9001

  3. The credentials for the login are in the minio.env file:
    Username (MINIO_ACCESS_KEY): minio_ak
    Password (MINIO_SECRET_KEY): minio_sk
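If you prefer to access the local Minio bucket programmatically instead of through the UI, here is a hedged sketch using boto3; the S3 API endpoint on port 9000 and the data-lakehouse bucket name are assumptions based on the paths used elsewhere in this setup.

import boto3  # assumes boto3 is installed

# Credentials from minio.env; the Minio S3 API typically listens on port 9000
# (the console opened above runs on 9001).
s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:9000",
    aws_access_key_id="minio_ak",
    aws_secret_access_key="minio_sk",
)

# List the raw Raygun JSON files uploaded to the bucket.
response = s3.list_objects_v2(Bucket="data-lakehouse", Prefix="Raw/raygun/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])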

Monitor Spark processes

To access the pyspark web UI:

  1. Run the following command in a terminal window:
make open_pyspark_ui

It runs the following command:

docker exec -ti spark pyspark
  2. Go to this URL in a browser: http://127.0.0.1:4040

Run the local Jupyter Notebooks

  1. Run the local Jupyter engine:
make open_local_jupiter
  2. This URL will automatically open in a browser: http://127.0.0.1:8888/lab

  3. A screen will appear asking for the Password or token to authenticate.
    It can be found in the docker attach screen (the one that stays running when you execute make run to start the spark stack).

  4. Search for a message like this:
    http://127.0.0.1:8888/lab?token=xxxx

  5. The xxxx is the password or token to authenticate.

Connect to the Jupyter Server in VSC

To connect to the Jupyter Server in VSC (Visual Studio Code):

  1. In the docker attach screen, look for a message like this:
    http://127.0.0.1:8888/lab?token=xxxx

  2. The xxxx is the password to be used when the Jupyter Kernel Connection asks for it.

  3. Then select the Existing Jupyter Server option.

  4. Specify the URL: http://127.0.0.1:8888

  5. Specify the password copied in the second step: xxxx

  6. Select the desired Kernel from the list.

VSC will now be connected to the Jupyter Server.

Pokemon Data preparation

To prepare the data for the Pokemon Data Ingestion:

  1. Download the compressed files from: https://github.com/alejogm0520/lakehouses-101-pycon2024/tree/main/data

  2. Copy those files to the data directory.

  3. Decompress the example data files:

cd data
unzip moves.zip
unzip pokemon.zip
unzip types.zip

Jupyter notebooks

License

This is open-source software licensed under the MIT license.

Credits

This project is maintained by Carlos J. Ramirez.

It was forked from the Data Lakehouse 101 repository made by Alejandro Gómez Montoya.

For more information or to contribute to the project, visit Data Lakehouse Local Stack on GitHub.

Happy Coding!
