Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move partial parsing to end of parsing. Switch to using msgpack for saved manifest. #3364

Merged
merged 1 commit into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 116 additions & 37 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import hashlib
import os
from dataclasses import dataclass, field
from mashumaro.types import SerializableType
from typing import List, Optional, Union, Dict, Any

from dbt.dataclass_schema import dbtClassMixin, StrEnum

from dbt.exceptions import InternalException

from .util import MacroKey, SourceKey
from .util import SourceKey


MAXIMUM_SEED_SIZE = 1 * 1024 * 1024
Expand All @@ -23,7 +22,20 @@ class ParseFileType(StrEnum):
Seed = 'seed'
Documentation = 'docs'
Schema = 'schema'
Hook = 'hook'
Hook = 'hook' # not a real filetype, from dbt_project.yml


parse_file_type_to_parser = {
ParseFileType.Macro: 'MacroParser',
ParseFileType.Model: 'ModelParser',
ParseFileType.Snapshot: 'SnapshotParser',
ParseFileType.Analysis: 'AnalysisParser',
ParseFileType.Test: 'DataTestParser',
ParseFileType.Seed: 'SeedParser',
ParseFileType.Documentation: 'DocumentationParser',
ParseFileType.Schema: 'SchemaParser',
ParseFileType.Hook: 'HookParser',
}


@dataclass
Expand Down Expand Up @@ -122,7 +134,7 @@ def original_file_path(self):


@dataclass
class SourceFile(dbtClassMixin):
class BaseSourceFile(dbtClassMixin, SerializableType):
"""Define a source file in dbt"""
path: Union[FilePath, RemoteFile] # the path information
checksum: FileHash
Expand All @@ -131,45 +143,57 @@ class SourceFile(dbtClassMixin):
# Parse file type: i.e. which parser will process this file
parse_file_type: Optional[ParseFileType] = None
# we don't want to serialize this
_contents: Optional[str] = None
contents: Optional[str] = None
# the unique IDs contained in this file
nodes: List[str] = field(default_factory=list)
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
# any node patches in this file. The entries are names, not unique ids!
patches: List[str] = field(default_factory=list)
# any macro patches in this file. The entries are package, name pairs.
macro_patches: List[MacroKey] = field(default_factory=list)
# any source patches in this file. The entries are package, name pairs
source_patches: List[SourceKey] = field(default_factory=list)
# temporary - for schema source files only
dict_from_yaml: Optional[Dict[str, Any]] = None

@property
def search_key(self) -> Optional[str]:
def file_id(self):
if isinstance(self.path, RemoteFile):
return None
if self.checksum.name == 'none':
return None
return self.path.search_key
return f'{self.project_name}://{self.path.original_file_path}'

@property
def contents(self) -> str:
if self._contents is None:
raise InternalException('SourceFile has no contents!')
return self._contents

@contents.setter
def contents(self, value):
self._contents = value
def _serialize(self):
dct = self.to_dict()
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
return dct

@classmethod
def empty(cls, path: FilePath) -> 'SourceFile':
self = cls(path=path, checksum=FileHash.empty())
self.contents = ''
return self
def _deserialize(cls, dct: Dict[str, int]):
if dct['parse_file_type'] == 'schema':
# TODO: why are these keys even here
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
sf = SchemaSourceFile.from_dict(dct)
else:
sf = SourceFile.from_dict(dct)
return sf

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
# remove empty lists to save space
dct_keys = list(dct.keys())
for key in dct_keys:
if isinstance(dct[key], list) and not dct[key]:
del dct[key]
# remove contents. Schema files will still have 'dict_from_yaml'
# from the contents
if 'contents' in dct:
del dct['contents']
return dct


@dataclass
class SourceFile(BaseSourceFile):
nodes: List[str] = field(default_factory=list)
docs: List[str] = field(default_factory=list)
macros: List[str] = field(default_factory=list)

@classmethod
def big_seed(cls, path: FilePath) -> 'SourceFile':
Expand All @@ -178,8 +202,63 @@ def big_seed(cls, path: FilePath) -> 'SourceFile':
self.contents = ''
return self

# TODO: do this a different way. This remote file kludge isn't going
# to work long term
@classmethod
def remote(cls, contents: str) -> 'SourceFile':
self = cls(path=RemoteFile(), checksum=FileHash.empty())
self.contents = contents
def remote(cls, contents: str, project_name: str) -> 'SourceFile':
self = cls(
path=RemoteFile(),
checksum=FileHash.from_contents(contents),
project_name=project_name,
contents=contents,
)
return self


@dataclass
class SchemaSourceFile(BaseSourceFile):
dfy: Dict[str, Any] = field(default_factory=dict)
# these are in the manifest.nodes dictionary
tests: List[str] = field(default_factory=list)
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
# node patches contain models, seeds, snapshots, analyses
ndp: List[str] = field(default_factory=list)
# any macro patches in this file by macro unique_id.
mcp: List[str] = field(default_factory=list)
# any source patches in this file. The entries are package, name pairs
# Patches are only against external sources. Sources can be
# created too, but those are in 'sources'
sop: List[SourceKey] = field(default_factory=list)
pp_dict: Optional[Dict[str, Any]] = None
pp_test_index: Optional[Dict[str, Any]] = None

@property
def dict_from_yaml(self):
return self.dfy

@property
def node_patches(self):
return self.ndp

@property
def macro_patches(self):
return self.mcp

@property
def source_patches(self):
return self.sop

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if 'pp_files' in dct:
del dct['pp_files']
if 'pp_test_index' in dct:
del dct['pp_test_index']
return dct

def append_patch(self, yaml_key, unique_id):
self.node_patches.append(unique_id)


AnySourceFile = Union[SchemaSourceFile, SourceFile]
Loading