Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for GCS #21

Merged
merged 10 commits into from
Nov 11, 2023
Merged
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dbt-loom currently supports obtaining model definitions from:

- Local manifest files
- dbt Cloud
- GCS
- S3-compatible object storage services [TODO]

:warning: **dbt Core's plugin functionality is still in beta. Please note that this may break as dbt Labs solidifies the dbt plugin API in future versions.**
Expand Down Expand Up @@ -72,7 +73,7 @@ manifests:
config:
account_id: <YOUR DBT CLOUD ACCOUNT ID>

# Job ID pertains to the job that you'd like to fetch artifacts from
# Job ID pertains to the job that you'd like to fetch artifacts from.
job_id: <REFERENCE JOB ID>

api_endpoint: <DBT CLOUD ENDPOINT>
Expand All @@ -84,6 +85,46 @@ manifests:
# which to fetch artifacts. Defaults to the last step.
```

### Using GCS as an artifact source

You can use dbt-loom to fetch manifest files from Google Cloud Storage by setting up a `gcs` manifest in your `dbt-loom` config.

```yaml
manifests:
- name: project_name
type: gcs
config:
nawfel-bacha marked this conversation as resolved.
Show resolved Hide resolved
project_id: <YOUR GCP PROJECT ID>
# The alphanumeric ID of the GCP project that contains your target bucket.

bucket_name: <YOUR GCS BUCKET NAME>
# The name of the bucket where your manifest is stored.

object_name: <YOUR OBJECT NAME>
# The object name of your manifest file.

credentials: <PATH TO YOUR SERVICE ACCOUNT JSON CREDENTIALS>
# Path to a service account JSON credentials file. If not passed, falls back to the default credentials inferred from the environment.
```

### Using environment variables

You can reference environment variables in the config file, which lets configuration values change based on the environment. Variables that are not set are replaced with an empty string. To specify an environment variable in the `dbt-loom` config file, use one of the following formats:

`${ENV_VAR}` or `$ENV_VAR`

#### Example:

```yaml
manifests:
- name: revenue
type: gcs
config:
project_id: ${GCP_PROJECT}
bucket_name: ${GCP_BUCKET}
object_name: ${MANIFEST_PATH}
```

## How does it work?

As of dbt-core 1.6.0-b8, there now exists a `dbtPlugin` class which defines functions that can
Expand Down
52 changes: 48 additions & 4 deletions dbt_loom/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import json
import os
import re
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Union


import yaml
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.plugins.manager import dbt_hook, dbtPlugin
Expand All @@ -13,13 +13,15 @@
from pydantic import BaseModel

from .clients.dbt_cloud import DbtCloud
from .clients.gcs import GCSClient


class ManifestReferenceType(str, Enum):
    """Enumerates the kinds of sources a manifest reference can point to."""

    # Each member's value is the string written as `type` in the dbt-loom config.
    file = "file"
    dbt_cloud = "dbt_cloud"
    gcs = "gcs"


class FileReferenceConfig(BaseModel):
Expand All @@ -37,12 +39,21 @@ class DbtCloudReferenceConfig(BaseModel):
step: Optional[int] = None


class GCSReferenceConfig(BaseModel):
    """Settings that locate a manifest object stored in Google Cloud Storage."""

    # ID of the GCP project handed to the storage client.
    project_id: str
    # Name of the bucket that holds the manifest object.
    bucket_name: str
    # Name of the manifest object inside the bucket.
    object_name: str
    # Optional path to a service account JSON file; defaults to None (not provided).
    credentials: Optional[Path] = None


class ManifestReference(BaseModel):
    """Reference information for a manifest to be loaded into dbt-loom."""

    # Name given to this manifest entry in the dbt-loom config.
    name: str
    # Kind of source the manifest comes from (file, dbt Cloud, or GCS).
    type: ManifestReferenceType
    # Source-specific settings. Only one `config` annotation is kept: an earlier
    # declaration without GCSReferenceConfig was overridden by this one anyway.
    config: Union[FileReferenceConfig, DbtCloudReferenceConfig, GCSReferenceConfig]


class dbtLoomConfig(BaseModel):
Expand All @@ -60,6 +71,7 @@ def __init__(self):
self.loading_functions = {
ManifestReferenceType.file: self.load_from_local_filesystem,
ManifestReferenceType.dbt_cloud: self.load_from_dbt_cloud,
ManifestReferenceType.gcs: self.load_from_gcs,
}

@staticmethod
Expand All @@ -79,6 +91,18 @@ def load_from_dbt_cloud(config: DbtCloudReferenceConfig) -> Dict:

return client.get_models(config.job_id, step=config.step)

@staticmethod
def load_from_gcs(config: GCSReferenceConfig) -> Dict:
    """Fetch the manifest dictionary described by a GCS reference config."""
    # Forward every config field to the client and return what it loads.
    return GCSClient(
        project_id=config.project_id,
        bucket_name=config.bucket_name,
        object_name=config.object_name,
        credentials=config.credentials,
    ).load_manifest()

def load(self, manifest_reference: ManifestReference) -> Dict:
"""Load a manifest dictionary based on a ManifestReference input."""

Expand Down Expand Up @@ -129,7 +153,10 @@ def convert_model_nodes_to_model_node_args(
unique_id: ModelNodeArgs(
name=node.get("name"),
package_name=node.get("package_name"),
identifier=node.get("relation_name").split(".")[-1].replace('"', ""),
identifier=node.get("relation_name")
.split(".")[-1]
.replace('"', "")
.replace('`', ""),
nawfel-bacha marked this conversation as resolved.
Show resolved Hide resolved
schema=node.get("schema"),
database=node.get("database"),
relation_name=node.get("relation_name"),
Expand Down Expand Up @@ -169,7 +196,24 @@ def read_config(self, path: Path) -> Optional[dbtLoomConfig]:
if not path.exists():
return None

return dbtLoomConfig(**yaml.load(open(path), yaml.SafeLoader))
with open(path) as file:
config_content = file.read()

config_content = self.replace_env_variables(config_content)

return dbtLoomConfig(**yaml.load(config_content, yaml.SafeLoader))

@staticmethod
def replace_env_variables(config_str: str) -> str:
"""Replace environment variable placeholders in the configuration string."""
pattern = r'\$(\w+)|\$\{([^}]+)\}'
return re.sub(
pattern,
lambda match: os.environ.get(
match.group(1) if match.group(1) is not None else match.group(2), ''
),
config_str,
)
nawfel-bacha marked this conversation as resolved.
Show resolved Hide resolved

def initialize(self) -> None:
"""Initialize the plugin"""
Expand Down
44 changes: 44 additions & 0 deletions dbt_loom/clients/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
from pathlib import Path
from typing import Dict, Optional

from google.cloud import storage


class GCSClient:
    """Client for GCS. Fetches a manifest object from a given bucket."""

    def __init__(
        self,
        project_id: str,
        bucket_name: str,
        object_name: str,
        credentials: Optional[Path] = None,
    ) -> None:
        """Record where the manifest lives and which credentials to use.

        Args:
            project_id: GCP project passed to the storage client.
            bucket_name: Bucket that contains the manifest object.
            object_name: Name of the manifest object in the bucket.
            credentials: Optional path to a service account JSON file. When
                falsy, the storage client is built without explicit
                credentials.
        """
        self.project_id = project_id
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.credentials = credentials

    def load_manifest(self) -> Dict:
        """Load a manifest json from a GCS bucket.

        Returns:
            The parsed JSON content of the configured object.

        Raises:
            Exception: If the object does not exist in the bucket, or if its
                content is not valid JSON (the JSON error is chained as the
                cause).
        """
        if self.credentials:
            client = storage.Client.from_service_account_json(
                self.credentials, project=self.project_id
            )
        else:
            client = storage.Client(project=self.project_id)

        bucket = client.get_bucket(self.bucket_name)
        blob = bucket.get_blob(self.object_name)
        if not blob:
            raise Exception(
                f"The object `{self.object_name}` does not exist in bucket `{self.bucket_name}`."
            )

        manifest_json = blob.download_as_text()

        try:
            return json.loads(manifest_json)
        except json.JSONDecodeError as err:
            # Catch only parse failures so KeyboardInterrupt/SystemExit still
            # propagate, and keep the decoder's error as the cause.
            raise Exception(
                f"The object `{self.object_name}` is not a valid JSON."
            ) from err
Loading
Loading