Skip to content

Commit

Permalink
feat(ingest): delta-lake: adding support for delta lake (#5259)
Browse files Browse the repository at this point in the history
Co-authored-by: Shirshanka Das <shirshanka@apache.org>
  • Loading branch information
MugdhaHardikar-GSLab and shirshanka authored Jun 27, 2022
1 parent f9aca8a commit 5455122
Show file tree
Hide file tree
Showing 51 changed files with 3,946 additions and 185 deletions.
Binary file added datahub-web-react/src/images/deltalakelogo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
142 changes: 142 additions & 0 deletions metadata-ingestion/docs/sources/delta-lake/delta-lake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
## Usage Guide

If you are new to [Delta Lake](https://delta.io/) and want to test out a simple integration with Delta Lake and DataHub, you can follow this guide.

### Delta Table on Local File System

#### Step 1
Create a delta table using the sample PySpark code below if you don't have a delta table you can point to.

```python
import uuid
import random
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

def generate_data():
return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random()*10000)
for d in range(1, 29)
for m in range(1, 13)
for y in range(2000, 2021)]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
.appName("quickstart") \
.master("local[*]") \
.config("spark.jars.packages", ",".join(jar_packages)) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

table_path = "quickstart/my-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()

```

#### Step 2
Create a datahub ingestion yaml file (delta.dhub.yaml) to ingest metadata from the delta table you just created.

```yaml
source:
type: "delta-lake"
config:
base_path: "quickstart/my-table"

sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
```
Note: Make sure you run the Spark code as well as recipe from same folder otherwise use absolute paths.
#### Step 3
Execute the ingestion recipe:
```shell
datahub ingest -c delta.dhub.yaml
```

### Delta Table on S3

#### Step 1
Set up your AWS credentials by creating an AWS credentials config file; typically in '$HOME/.aws/credentials'.
```
[my-creds]
aws_access_key_id: ######
aws_secret_access_key: ######
```
Step 2: Create a Delta Table using the PySpark sample code below unless you already have Delta Tables on your S3.
```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from configparser import ConfigParser
import uuid
import random
def generate_data():
return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random()*10000)
for d in range(1, 29)
for m in range(1, 13)
for y in range(2000, 2021)]

jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
.appName("quickstart") \
.master("local[*]") \
.config("spark.jars.packages", ",".join(jar_packages)) \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()


config_object = ConfigParser()
config_object.read("$HOME/.aws/credentials")
profile_info = config_object["my-creds"]
access_id = profile_info["aws_access_key_id"]
access_key = profile_info["aws_secret_access_key"]

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)

table_path = "s3a://my-bucket/my-folder/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)
df = spark.read.format("delta").load(table_path)
df.show()

```

#### Step 3
Create a datahub ingestion yaml file (delta.s3.dhub.yaml) to ingest metadata from the delta table you just created.

```yml
source:
type: "delta-lake"
config:
base_path: "s3://my-bucket/my-folder/sales-table"
s3:
aws_config:
aws_access_key_id: <<Access key>>
aws_secret_access_key: <<secret key>>

sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
```
#### Step 4
Execute the ingestion recipe:
```shell
datahub ingest -c delta.s3.dhub.yaml
```

### Note

The above recipes are minimal recipes. Please refer to [Config Details](#config-details) section for the full configuration.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
source:
type: delta-lake
config:
env: "PROD"
platform_instance: "my-delta-lake"
base_path: "/path/to/data/folder"

sink:
# sink configs
10 changes: 9 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ def get_long_description():
"wcmatch",
}

delta_lake = {
*s3_base,
"deltalake",
}

usage_common = {
"sqlparse",
}
Expand Down Expand Up @@ -196,6 +201,7 @@ def get_long_description():
"datahub-business-glossary": set(),
"data-lake": {*data_lake_base, *data_lake_profiling},
"s3": {*s3_base, *data_lake_profiling},
"delta-lake": {*data_lake_profiling, *delta_lake},
"dbt": {"requests"} | aws_common,
"druid": sql_common | {"pydruid>=0.6.2"},
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
Expand Down Expand Up @@ -350,6 +356,7 @@ def get_long_description():
"redshift",
"redshift-usage",
"data-lake",
"delta-lake",
"s3",
"tableau",
"trino",
Expand Down Expand Up @@ -447,7 +454,7 @@ def get_long_description():
entry_points = {
"console_scripts": ["datahub = datahub.entrypoints:main"],
"datahub.ingestion.source.plugins": [
"csv-enricher = datahub.ingestion.source.csv_enricher:CSVEnricherSource",
"csv-enricher = datahub.ingestion.source.csv_enricher:CSVEnricherSource",
"file = datahub.ingestion.source.file:GenericFileSource",
"sqlalchemy = datahub.ingestion.source.sql.sql_generic:SQLAlchemyGenericSource",
"athena = datahub.ingestion.source.sql.athena:AthenaSource",
Expand All @@ -457,6 +464,7 @@ def get_long_description():
"clickhouse = datahub.ingestion.source.sql.clickhouse:ClickHouseSource",
"clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource",
"data-lake = datahub.ingestion.source.data_lake:DataLakeSource",
"delta-lake = datahub.ingestion.source.delta_lake:DeltaLakeSource",
"s3 = datahub.ingestion.source.s3:S3Source",
"dbt = datahub.ingestion.source.dbt:DBTSource",
"druid = datahub.ingestion.source.sql.druid:DruidSource",
Expand Down
97 changes: 96 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,103 @@
import logging
import os
from typing import Iterable, Optional

from datahub.emitter.mce_builder import make_tag_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

S3_PREFIXES = ["s3://", "s3n://", "s3a://"]

logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)


def get_s3_tags(
bucket_name: str,
key_name: Optional[str],
dataset_urn: str,
aws_config: Optional[AwsSourceConfig],
ctx: PipelineContext,
use_s3_bucket_tags: Optional[bool] = False,
use_s3_object_tags: Optional[bool] = False,
) -> Optional[GlobalTagsClass]:
if aws_config is None:
raise ValueError("aws_config not set. Cannot browse s3")
new_tags = GlobalTagsClass(tags=[])
tags_to_add = []
if use_s3_bucket_tags:
s3 = aws_config.get_s3_resource()
bucket = s3.Bucket(bucket_name)
try:
tags_to_add.extend(
[
make_tag_urn(f"""{tag["Key"]}:{tag["Value"]}""")
for tag in bucket.Tagging().tag_set
]
)
except s3.meta.client.exceptions.ClientError:
logger.warn(f"No tags found for bucket={bucket_name}")

if use_s3_object_tags and key_name is not None:
s3_client = aws_config.get_s3_client()
object_tagging = s3_client.get_object_tagging(Bucket=bucket_name, Key=key_name)
tag_set = object_tagging["TagSet"]
if tag_set:
tags_to_add.extend(
[make_tag_urn(f"""{tag["Key"]}:{tag["Value"]}""") for tag in tag_set]
)
else:
# Unlike bucket tags, if an object does not have tags, it will just return an empty array
# as opposed to an exception.
logger.warn(f"No tags found for bucket={bucket_name} key={key_name}")
if len(tags_to_add) == 0:
return None
if ctx.graph is not None:
logger.debug("Connected to DatahubApi, grabbing current tags to maintain.")
current_tags: Optional[GlobalTagsClass] = ctx.graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="globalTags",
aspect_type=GlobalTagsClass,
)
if current_tags:
tags_to_add.extend([current_tag.tag for current_tag in current_tags.tags])
else:
logger.warn("Could not connect to DatahubApi. No current tags to maintain")
# Remove duplicate tags
tags_to_add = list(set(tags_to_add))
new_tags = GlobalTagsClass(
tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
)
return new_tags


def list_folders_path(
s3_uri: str, aws_config: Optional[AwsSourceConfig]
) -> Iterable[str]:
if not is_s3_uri(s3_uri):
raise ValueError("Not a s3 URI: " + s3_uri)
if aws_config is None:
raise ValueError("aws_config not set. Cannot browse s3")
bucket_name = get_bucket_name(s3_uri)
prefix = get_bucket_relative_path(s3_uri)
yield from list_folders(bucket_name, prefix, aws_config)


def list_folders(
bucket_name: str, prefix: str, aws_config: Optional[AwsSourceConfig]
) -> Iterable[str]:
if aws_config is None:
raise ValueError("aws_config not set. Cannot browse s3")
s3_client = aws_config.get_s3_client()
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter="/"):
for o in page.get("CommonPrefixes", []):
folder: str = str(o.get("Prefix"))
if folder.endswith("/"):
folder = folder[:-1]
yield f"{folder}"


def is_s3_uri(uri: str) -> bool:
return any(uri.startswith(prefix) for prefix in S3_PREFIXES)
Expand All @@ -24,7 +120,6 @@ def get_bucket_relative_path(s3_uri: str) -> str:


def make_s3_urn(s3_uri: str, env: str) -> str:

s3_name = strip_s3_prefix(s3_uri)

if s3_name.endswith("/"):
Expand Down
Loading

0 comments on commit 5455122

Please sign in to comment.