Skip to content

Commit

Permalink
remove @typed_data and move type checking support to datatype()
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmicexplorer committed Apr 26, 2018
1 parent ebde56a commit 9e8055f
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 428 deletions.
327 changes: 88 additions & 239 deletions src/python/pants/util/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import inspect
import re
import sys
from abc import abstractmethod
from collections import OrderedDict, namedtuple
Expand All @@ -17,11 +15,53 @@
from pants.util.meta import AbstractClass


def datatype(*args, **kwargs):
def datatype(name, field_decls, **kwargs):
"""A wrapper for `namedtuple` that accounts for the type of the object in equality."""
class DataType(namedtuple(*args, **kwargs)):
field_names = []
fields_with_constraints = OrderedDict()
invalid_decl_errs = []
for maybe_decl in field_decls:
# ('field_name', type)
if isinstance(maybe_decl, tuple):
field_name, field_type = maybe_decl
fields_with_constraints[field_name] = Exactly(field_type)
else:
# interpret it as a field name without a type to check
field_name = maybe_decl
# namedtuple() already checks field uniqueness
field_names.append(field_name)

namedtuple_cls = namedtuple(
name,
# '_anonymous_namedtuple_subclass',
field_names, **kwargs)

class DataType(namedtuple_cls):
# TODO: remove this? namedtuple already does this
__slots__ = ()

@classmethod
def make_type_error(cls, msg):
return TypeCheckError(cls.__name__, msg)

def __new__(cls, *args, **kwargs):
this_object = super(DataType, cls).__new__(cls, *args, **kwargs)

# TODO(cosmicexplorer): Make this kind of exception pattern (filter for
# errors then display them all at once) more ergonomic.
type_failure_msgs = []
for field_name, field_constraint in fields_with_constraints.items():
field_value = getattr(this_object, field_name)
try:
field_constraint.validate_satisfied_by(field_value)
except TypeConstraintError as e:
type_failure_msgs.append(
"field '{}' was invalid: {}".format(field_name, e))
if type_failure_msgs:
raise cls.make_type_error('\n'.join(type_failure_msgs))

return this_object

def __eq__(self, other):
if self is other:
return True
Expand All @@ -40,20 +80,55 @@ def __ne__(self, other):
def __iter__(self):
raise TypeError("'{}' object is not iterable".format(type(self).__name__))

def _super_iter(self):
return super(DataType, self).__iter__()

def _asdict(self):
'''Return a new OrderedDict which maps field names to their values'''
return OrderedDict(zip(self._fields, super(DataType, self).__iter__()))
return OrderedDict(zip(self._fields, self._super_iter()))

def _replace(_self, **kwds):
'''Return a new datatype object replacing specified fields with new values'''
result = _self._make(map(kwds.pop, _self._fields, super(DataType, _self).__iter__()))
result = _self._make(map(kwds.pop, _self._fields, _self._super_iter()))
if kwds:
raise ValueError('Got unexpected field names: %r' % kwds.keys())
return result

def __getnewargs__(self):
'''Return self as a plain tuple. Used by copy and pickle.'''
return tuple(super(DataType, self).__iter__())
return tuple(self._super_iter())

def __repr__(self):
args_formatted = []
for field_name in field_names:
field_value = getattr(self, field_name)
args_formatted.append("{}={!r}".format(field_name, field_value))
return '{class_name}({args_joined})'.format(
class_name=type(self).__name__,
args_joined=', '.join(args_formatted))

def __str__(self):
elements_formatted = []
for field_name in field_names:
constraint_for_field = fields_with_constraints.get(field_name, None)
field_value = getattr(self, field_name)
if not constraint_for_field:
elements_formatted.append(
"{field_name}={field_value}"
.format(field_name=field_name,
field_value=field_value))
else:
elements_formatted.append(
"{field_name}<{type_constraint}>={field_value}"
.format(field_name=field_name,
type_constraint=constraint_for_field,
field_value=field_value))
return '{class_name}({typed_tagged_elements})'.format(
class_name=type(self).__name__,
typed_tagged_elements=', '.join(elements_formatted))

# TODO: remove!
DataType.__name__ = str(name)

return DataType

Expand Down Expand Up @@ -146,7 +221,7 @@ def validate_satisfied_by(self, obj):
return obj

raise TypeConstraintError(
"value {!r} (with type {!r}) does not satisfy this type constraint: {!r}."
"value {!r} (with type {!r}) must satisfy this type constraint: {!r}."
.format(obj, type(obj).__name__, self))

def __hash__(self):
Expand Down Expand Up @@ -212,238 +287,12 @@ def satisfied_by_type(self, obj_type):
return issubclass(obj_type, self._types)


class FieldType(Exactly):
"""A TypeConstraint which matches exactly one type, with a name string.
Create using `FieldType.create_from_type(cls)` to generate a FieldType with a
predictable, unique name string from the class `cls` which is used in
`typed_datatype()` to generate property names passed in to `datatype()`.
"""

class FieldTypeConstructionError(Exception):
"""Raised on invalid arguments on creation."""

class FieldTypeNameError(FieldTypeConstructionError):
"""Raised if a type object has an invalid name."""

CAMEL_CASE_TYPE_NAME = re.compile('\A([A-Z][a-z]*)+\Z')
CAMEL_CASE_SPLIT_PATTERN = re.compile('[A-Z][a-z]*')

LOWER_CASE_TYPE_NAME = re.compile('\A[a-z]+\Z')

@classmethod
def _transform_type_field_name(cls, type_name):
if cls.LOWER_CASE_TYPE_NAME.match(type_name):
# double underscore here ensures no clash with camel-case type names
return 'primitive__{}'.format(type_name)

if cls.CAMEL_CASE_TYPE_NAME.match(type_name):
split_by_camel_downcased = []
for m in cls.CAMEL_CASE_SPLIT_PATTERN.finditer(type_name):
camel_group = m.group(0)
downcased = camel_group.lower()
split_by_camel_downcased.append(downcased)
return '_'.join(split_by_camel_downcased)

raise cls.FieldTypeNameError(
"Type name {!r} must be camel-cased with an initial capital, "
"or all lowercase. Only ASCII alphabetical characters are allowed."
.format(type_name))

def __init__(self, single_type, field_name):
if not isinstance(single_type, type):
raise self.FieldTypeConstructionError(
"single_type is not a type: was {!r} (type {!r})."
.format(single_type, type(single_type).__name__))
if not isinstance(field_name, str):
raise self.FieldTypeConstructionError(
"field_name is not a str: was {!r} (type {!r})."
.format(field_name, type(field_name).__name__))

super(FieldType, self).__init__(single_type)

self._field_name = field_name

@property
def field_name(self):
return self._field_name

@property
def field_type(self):
return self.types[0]

def validate_satisfies_field(self, obj):
"""Return `obj` if it satisfies this type constraint, or raise.
Use this method over `self.validate_satisfied_by()` to provide a more
specific error message for the FieldType class.
:raises: `TypeConstraintError` if the given object does not satisfy this
type constraint.
"""
if self.satisfied_by(obj):
return obj

raise TypeConstraintError(
"value {!r} (with type {!r}) must be an instance of type {!r}."
.format(obj, type(obj).__name__, self.field_type.__name__))

def __repr__(self):
fmt_str = 'FieldType({field_type}, {field_name!r})'
return fmt_str.format(field_type=self.field_type.__name__,
field_name=self.field_name)

@classmethod
def create_from_type(cls, type_obj):
"""Generate a FieldType with a predictable name string from the given type.
The field name will be a deterministic and unique string generated from the
type `type_obj`. `type_obj` must have a camel-cased name (e.g. MyType) or
all-lowercased name (e.g. int).
"""
if not isinstance(type_obj, type):
raise cls.FieldTypeConstructionError(
"type_obj is not a type: was {!r} (type {!r})"
.format(type_obj, type(type_obj).__name__))
transformed_type_name = cls._transform_type_field_name(type_obj.__name__)
return cls(type_obj, str(transformed_type_name))


def typed_datatype(type_name, field_decls):
"""A wrapper over namedtuple which accepts a dict of field names and types.
This can be used to very concisely define classes which have fields that are
type-checked at construction.
"""

type_name = str(type_name)

if not isinstance(field_decls, tuple):
raise TypedDatatypeClassConstructionError(
type_name,
"field_decls is not a tuple: {!r}".format(field_decls))
if field_decls is ():
raise TypedDatatypeClassConstructionError(
type_name,
"no fields were declared")

# TODO(cosmicexplorer): Make this kind of exception pattern (filter for errors
# then display them all at once) more ergonomic.
type_constraints = OrderedSet()
invalid_decl_errs = []
for maybe_decl in field_decls:
try:
field_constraint = FieldType.create_from_type(maybe_decl)
except FieldType.FieldTypeConstructionError as e:
invalid_decl_errs.append(str(e))
continue

if field_constraint in type_constraints:
invalid_decl_errs.append(
"type {!r} was already used as a field"
.format(field_constraint.field_type.__name__))
else:
type_constraints.add(field_constraint)
if invalid_decl_errs:
raise TypedDatatypeClassConstructionError(
type_name,
"invalid field declarations:\n{}".format('\n'.join(invalid_decl_errs)))

# This is a tuple of FieldType instances for the arguments given.
field_type_tuple = tuple(type_constraints)
# This is a tuple of type names, for use in error messages.
type_names_joined = "({})".format(
' '.join("{},".format(f.field_type.__name__) for f in field_type_tuple))

datatype_cls = datatype(type_name, [t.field_name for t in field_type_tuple])

class TypedDatatype(datatype_cls):

def __new__(cls, *args, **kwargs):
if kwargs:
raise TypedDatatypeInstanceConstructionError(
type_name,
"""typed_datatype() subclasses can only be constructed with positional arguments! The class {class_name} requires {field_types} as arguments.
The args provided were: {args!r}.
The kwargs provided were: {kwargs!r}."""
.format(class_name=cls.__name__,
field_types=type_names_joined,
args=args,
kwargs=kwargs))

if len(args) != len(field_type_tuple):
raise TypedDatatypeInstanceConstructionError(
type_name,
"""{num_args} args were provided, but expected {expected_num_args}: {field_types}.
The args provided were: {args!r}."""
.format(num_args=len(args),
expected_num_args=len(field_type_tuple),
field_types=type_names_joined,
args=args))

type_failure_msgs = []
for field_idx, field_value in enumerate(args):
constraint_for_field = field_type_tuple[field_idx]
try:
constraint_for_field.validate_satisfies_field(field_value)
except TypeConstraintError as e:
type_failure_msgs.append(
"field '{}' was invalid: {}"
.format(constraint_for_field.field_name, e))
if type_failure_msgs:
raise TypeCheckError(type_name, '\n'.join(type_failure_msgs))

return super(TypedDatatype, cls).__new__(cls, *args)

def __repr__(self):
formatted_args = [repr(arg) for arg in self.__getnewargs__()]
return '{class_name}({args_joined})'.format(
class_name=type(self).__name__,
args_joined=', '.join(formatted_args))

def __str__(self):
arg_tuple = self.__getnewargs__()
elements_formatted = []
for field_idx, field_value in enumerate(arg_tuple):
constraint_for_field = field_type_tuple[field_idx]
elements_formatted.append("{field_name}<{type_name}>={arg}".format(
field_name=constraint_for_field.field_name,
type_name=constraint_for_field.field_type.__name__,
arg=field_value))
return '{class_name}({typed_tagged_elements})'.format(
class_name=type(self).__name__,
typed_tagged_elements=', '.join(elements_formatted))

@classmethod
def make_type_error(cls, msg):
return TypeCheckError(cls.__name__, msg)

return TypedDatatype


# @typed_data(int, str)
# class MyTypedData(SomeMixin):
# # source code...
#
# |
# |
# V
#
# class MyTypedData(typed_datatype('MyTypedData', (int, str)), SomeMixin):
# # source code...
def typed_data(*fields):

def from_class(cls):
if not inspect.isclass(cls):
raise ValueError("The @typed_data() decorator must be applied "
"innermost of all decorators.")

typed_base = typed_datatype(cls.__name__, tuple(fields))
all_bases = (typed_base,) + cls.__bases__

return type(cls.__name__, all_bases, dict(cls.__dict__))
# def typed_datatype(type_name, field_decls):
# """A wrapper over namedtuple which accepts a dict of field names and types.

return from_class
# This can be used to very concisely define classes which have fields that are
# type-checked at construction.
# """


class Collection(object):
Expand Down
Loading

0 comments on commit 9e8055f

Please sign in to comment.