Add support for starting from existing BQ schema #40

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions .gitignore
@@ -21,3 +21,4 @@ wheels/
.installed.cfg
*.egg
MANIFEST
.tox/
125 changes: 102 additions & 23 deletions bigquery_schema_generator/generate_schema.py
@@ -80,13 +80,15 @@ def __init__(self,
quoted_values_are_strings=False,
debugging_interval=1000,
debugging_map=False,
sanitize_names=False):
sanitize_names=False,
type_mismatch_callback=None):
self.input_format = input_format
self.infer_mode = infer_mode
self.keep_nulls = keep_nulls
self.quoted_values_are_strings = quoted_values_are_strings
self.debugging_interval = debugging_interval
self.debugging_map = debugging_map
self.type_mismatch_callback = type_mismatch_callback

# 'infer_mode' is supported for only input_format = 'csv' because
# the header line gives us the complete list of fields to be expected in
@@ -123,7 +125,7 @@ def log_error(self, msg):

# TODO: BigQuery is case-insensitive with regards to the 'name' of the
# field. Verify that the 'name' is unique regardless of the case.
def deduce_schema(self, file):
def deduce_schema(self, file, *, schema_map=None):
"""Loop through each newlined-delimited line of 'file' and
deduce the BigQuery schema. The schema is returned as a recursive map
that contains both the database schema and some additional metadata
@@ -178,7 +180,9 @@ def deduce_schema(self, file):
else:
raise Exception("Unknown input_format '%s'" % self.input_format)

schema_map = OrderedDict()
if schema_map is None:
schema_map = OrderedDict()

try:
for json_object in reader:
self.line_number += 1
@@ -204,10 +208,19 @@ def deduce_schema_for_line(self, json_object, schema_map):
then they must be compatible.
"""
for key, value in json_object.items():
schema_entry = schema_map.get(key)
sanitized_key = self.sanitize_name(key)
schema_entry = schema_map.get(sanitized_key)
new_schema_entry = self.get_schema_entry(key, value)
schema_map[key] = self.merge_schema_entry(schema_entry,
new_schema_entry)
schema_map[sanitized_key] = self.merge_schema_entry(schema_entry,
new_schema_entry)

def sanitize_name(self, value):
if self.sanitize_names:
new_value = re.sub('[^a-zA-Z0-9_]', '_', value[:127])
else:
new_value = value
return new_value.lower()


def merge_schema_entry(self, old_schema_entry, new_schema_entry):
"""Merges the 'new_schema_entry' into the 'old_schema_entry' and return
@@ -261,9 +274,13 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry):

# Defensive check, names should always be the same.
if old_name != new_name:
raise Exception(
'old_name (%s) != new_name(%s), should never happen' %
(old_name, new_name))
if old_name.lower() != new_name.lower():
raise Exception(
'old_name (%s) != new_name(%s), should never happen' %
(old_name, new_name))
else:
# preserve old name if case is different
new_info['name'] = old_info['name']

# Recursively merge in the subfields of a RECORD, allowing
# NULLABLE to become REPEATED (because 'bq load' allows it).
@@ -296,14 +313,23 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry):
# upgraded to a REPEATED {primitive_type}, but currently 'bq load' does
# not support that so we must also follow that rule.
if old_mode != new_mode:
self.log_error(
f'Ignoring non-RECORD field with mismatched mode: '
f'old=({old_status},{old_name},{old_mode},{old_type}); '
f'new=({new_status},{new_name},{new_mode},{new_type})')
return None
# Primitive types are conservatively deduced as NULLABLE. If we know
# a priori that a field is REQUIRED, we accept that.
new_might_be_required = new_mode == 'NULLABLE' and new_schema_entry['filled']
if self.infer_mode and old_mode == 'REQUIRED' and new_might_be_required:
new_info['mode'] = old_mode
else:
self.log_error(
f'Ignoring non-RECORD field with mismatched mode: '
f'old=({old_status},{old_name},{old_mode},{old_type}); '
f'new=({new_status},{new_name},{new_mode},{new_type})')
return None

# Check that the converted types are compatible.
candidate_type = convert_type(old_type, new_type)
if not candidate_type and self.type_mismatch_callback:
# inconvertible -> check if the caller has additional insight
candidate_type = self.type_mismatch_callback(old_type, new_type)
if not candidate_type:
self.log_error(
f'Ignoring field with mismatched type: '
@@ -506,17 +532,16 @@ def flatten_schema(self, schema_map):
schema_map=schema_map,
keep_nulls=self.keep_nulls,
sorted_schema=self.sorted_schema,
infer_mode=self.infer_mode,
sanitize_names=self.sanitize_names)
infer_mode=self.infer_mode)

def run(self, input_file=sys.stdin, output_file=sys.stdout):
def run(self, input_file=sys.stdin, output_file=sys.stdout, schema_map=None):
"""Read the data records from the input_file and print out the BigQuery
schema on the output_file. The error logs are printed on the sys.stderr.
Args:
input_file: a file-like object (default: sys.stdin)
output_file: a file-like object (default: sys.stdout)
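schema_map: an optional schema map to start from, e.g. the result of bq_schema_to_map() or a previous deduce_schema() call (default: None)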
"""
schema_map, error_logs = self.deduce_schema(input_file)
schema_map, error_logs = self.deduce_schema(input_file, schema_map=schema_map)

for error in error_logs:
logging.info("Problem on line %s: %s", error['line'], error['msg'])
@@ -621,8 +646,7 @@ def is_string_type(thetype):
def flatten_schema_map(schema_map,
keep_nulls=False,
sorted_schema=True,
infer_mode=False,
sanitize_names=False):
infer_mode=False):
"""Converts the 'schema_map' into a more flatten version which is
compatible with BigQuery schema.

@@ -679,22 +703,77 @@
else:
# Recursively flatten the sub-fields of a RECORD entry.
new_value = flatten_schema_map(
value, keep_nulls, sorted_schema, sanitize_names)
value, keep_nulls, sorted_schema)
elif key == 'type' and value in ['QINTEGER', 'QFLOAT', 'QBOOLEAN']:
new_value = value[1:]
elif key == 'mode':
if infer_mode and value == 'NULLABLE' and filled:
new_value = 'REQUIRED'
else:
new_value = value
elif key == 'name' and sanitize_names:
new_value = re.sub('[^a-zA-Z0-9_]', '_', value)[0:127]
else:
new_value = value
new_info[key] = new_value
schema.append(new_info)
return schema

def bq_schema_to_map(schema):
""" convert BQ JSON table schema representation to SchemaGenerator schema_map representaton """
if isinstance(schema, dict):
schema = schema['fields']
return OrderedDict((f['name'].lower(), bq_schema_field_to_entry(f))
for f in schema)


BQ_TYPES = frozenset(
'''STRING
BYTES
INTEGER
FLOAT
BOOLEAN
TIMESTAMP
DATE
TIME
DATETIME
RECORD'''.split())

BQ_TYPE_ALIASES = {
'INT64': 'INTEGER',
'FLOAT64': 'FLOAT',
'BOOL': 'BOOLEAN',
'STRUCT': 'RECORD',
}


def bq_type_to_entry_type(type):
if type in BQ_TYPES:
return type
if type in BQ_TYPE_ALIASES:
return BQ_TYPE_ALIASES[type]
raise TypeError(f'Unknown BQ type "{type}"')


def bq_schema_field_to_entry(field):
type = bq_type_to_entry_type(field['type'])
# maintain order of info fields
if type == 'RECORD':
info = OrderedDict([
('fields', bq_schema_to_map(field['fields'])),
('mode', field['mode']),
('name', field['name']),
('type', type),
])
else:
info = OrderedDict([
('mode', field['mode']),
('name', field['name']),
('type', type),
])
return OrderedDict([
('status', 'hard'),
('filled', field['mode'] != 'NULLABLE'),
('info', info),
])

def main():
# Configure command line flags.
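Taken together, these changes let a caller seed schema deduction with an existing BigQuery table schema and supply a policy for otherwise-incompatible types. A minimal usage sketch, not part of the diff itself; the file name and the STRING fallback callback are illustrative assumptions:

import json
import sys

from bigquery_schema_generator.generate_schema import (
    SchemaGenerator,
    bq_schema_to_map,
)


def prefer_string(old_type, new_type):
    # Illustrative callback: when two deduced types cannot be converted into
    # one another, fall back to STRING instead of dropping the field.
    return 'STRING'


# 'existing_schema.json' is an assumed dump of the table's schema in the
# BigQuery JSON schema format (e.g. as printed by the bq command-line tool).
with open('existing_schema.json') as f:
    existing_map = bq_schema_to_map(json.load(f))

generator = SchemaGenerator(
    input_format='json',
    type_mismatch_callback=prefer_string,
)
generator.run(input_file=sys.stdin, output_file=sys.stdout,
              schema_map=existing_map)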
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -0,0 +1,5 @@
[build-system]
# These are the assumed default build requirements from pip:
# https://pip.pypa.io/en/stable/reference/pip/#pep-517-and-518-support
requires = ["setuptools>=40.8.0", "wheel"]
build-backend = "setuptools.build_meta"
14 changes: 13 additions & 1 deletion setup.py
@@ -27,5 +27,17 @@
'console_scripts': [
'generate-schema = bigquery_schema_generator.generate_schema:main'
]
}
},
extras_require={
'dev': [
'flake8',
'pytest',
'tox',
'wheel',
'setuptools',
],
'test': [
'coverage',
],
},
)
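With these extras declared, a development environment can presumably be set up with something like pip install -e '.[dev,test]' before running tox or pytest.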
46 changes: 33 additions & 13 deletions tests/data_reader.py
@@ -46,6 +46,9 @@ class DataReader:
ERRORS
line: msg
...
ERRORS INFORMED
line: msg
...
SCHEMA
bigquery_schema
END
@@ -57,6 +60,8 @@ class DataReader:

* a DATA section containing the newline-separated JSON data records
* an optional ERRORS section containing the expected error messages
* an optional ERRORS INFORMED section containing the expected error
messages when the schema is known to the schema decoder in advance
* a SCHEMA section containing the expected BigQuery schema
* comment lines start with a '#' character.

@@ -103,6 +108,8 @@ class DataReader:
def __init__(self, testdata_file):
self.testdata_file = testdata_file
self.next_line = None
self.lineno = 0
self.chunk_count = 0

def read_chunk(self):
"""Returns a dict with the next test chunk from the data file,
@@ -115,19 +122,30 @@ def read_chunk(self):
}
Returns None if there are no more test chunks.
"""
data_flags, records = self.read_data_section()
data_flags, records, line = self.read_data_section()
if data_flags is None:
return None
errors = self.read_errors_section()
error_map = self.process_errors(errors)
error_flags, errors = self.read_errors_section()
if errors and error_flags:
raise Exception("Unexpected error flags in the first ERRORS section")
informed_error_flags, informed_errors = self.read_errors_section()
if informed_errors and "INFORMED" not in informed_error_flags:
raise Exception("Expected INFORMED flag in the second ERRORS section")
error_map = self.process_errors(errors or [])
informed_error_map = self.process_errors(informed_errors or [])
schema = self.read_schema_section()
self.read_end_marker()
self.chunk_count += 1

return {
'chunk_count': self.chunk_count,
'line': line,
'data_flags': data_flags,
'records': records,
'errors': errors,
'errors': errors or [],
'error_map': error_map,
'informed_errors': informed_errors,
'informed_error_map': informed_error_map,
'schema': schema
}

@@ -138,8 +156,9 @@ def read_data_section(self):

# First tag must be 'DATA [flags]'
tag_line = self.read_line()
lineno = self.lineno
if tag_line is None:
return (None, None)
return (None, None, lineno)
(tag, data_flags) = self.parse_tag_line(tag_line)
if tag != 'DATA':
raise Exception(
@@ -160,7 +179,7 @@ def read_data_section(self):
break
records.append(line)

return (data_flags, records)
return (data_flags, records, lineno)

def read_errors_section(self):
"""Return a dictionary of errors which are expected from the parsing of
@@ -174,11 +193,11 @@ def read_errors_section(self):
# The 'ERRORS' section is optional.
tag_line = self.read_line()
if tag_line is None:
return []
(tag, _) = self.parse_tag_line(tag_line)
return None, None
(tag, error_flags) = self.parse_tag_line(tag_line)
if tag != 'ERRORS':
self.push_back(tag_line)
return []
return None, None

# Read the ERRORS records until the next TAG_TOKEN.
errors = []
@@ -188,12 +207,12 @@ def read_errors_section(self):
raise Exception("Unexpected EOF, should be SCHEMA tag")
(tag, _) = self.parse_tag_line(line)
if tag in self.TAG_TOKENS:
if tag == 'ERRORS':
if tag == 'DATA':
raise Exception("Unexpected ERRORS tag")
self.push_back(line)
break
errors.append(line)
return errors
return error_flags, errors

def read_schema_section(self):
"""Returns the JSON string of the schema section.
Expand All @@ -216,8 +235,8 @@ def read_schema_section(self):
break
(tag, _) = self.parse_tag_line(line)
if tag in self.TAG_TOKENS:
if tag == 'SCHEMA':
raise Exception("Unexpected SCHEMA tag")
if tag in ('DATA', 'ERRORS', 'SCHEMA'):
raise Exception("Unexpected {} tag".format(tag))
self.push_back(line)
break
schema_lines.append(line)
@@ -259,6 +278,7 @@ def read_line(self):

while True:
line = self.testdata_file.readline()
self.lineno += 1
# EOF
if line == '':
return None
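For orientation, here is a minimal sketch of how a test might consume the new chunk fields; the module import path and data file name are assumptions, not taken from this PR:

from data_reader import DataReader

with open('testdata.txt') as testdata_file:
    reader = DataReader(testdata_file)
    while True:
        chunk = reader.read_chunk()
        if chunk is None:
            break
        # 'error_map' holds the errors expected from a cold run, while
        # 'informed_error_map' holds those expected when the schema is
        # supplied to the generator up front.
        print(chunk['chunk_count'], chunk['line'],
              len(chunk['records']), len(chunk['informed_error_map']))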