-
Notifications
You must be signed in to change notification settings - Fork 126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
first iteration - split more internal utilities out of the main package file #101
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
|
||
|
||
class TagTracer(object): | ||
def __init__(self): | ||
self._tag2proc = {} | ||
self.writer = None | ||
self.indent = 0 | ||
|
||
def get(self, name): | ||
return TagTracerSub(self, (name,)) | ||
|
||
def format_message(self, tags, args): | ||
if isinstance(args[-1], dict): | ||
extra = args[-1] | ||
args = args[:-1] | ||
else: | ||
extra = {} | ||
|
||
content = " ".join(map(str, args)) | ||
indent = " " * self.indent | ||
|
||
lines = [ | ||
"%s%s [%s]\n" % (indent, content, ":".join(tags)) | ||
] | ||
|
||
for name, value in extra.items(): | ||
lines.append("%s %s: %s\n" % (indent, name, value)) | ||
return lines | ||
|
||
def processmessage(self, tags, args): | ||
if self.writer is not None and args: | ||
lines = self.format_message(tags, args) | ||
self.writer(''.join(lines)) | ||
try: | ||
self._tag2proc[tags](tags, args) | ||
except KeyError: | ||
pass | ||
|
||
def setwriter(self, writer): | ||
self.writer = writer | ||
|
||
def setprocessor(self, tags, processor): | ||
if isinstance(tags, str): | ||
tags = tuple(tags.split(":")) | ||
else: | ||
assert isinstance(tags, tuple) | ||
self._tag2proc[tags] = processor | ||
|
||
|
||
class TagTracerSub(object): | ||
def __init__(self, root, tags): | ||
self.root = root | ||
self.tags = tags | ||
|
||
def __call__(self, *args): | ||
self.root.processmessage(self.tags, args) | ||
|
||
def setmyprocessor(self, processor): | ||
self.root.setprocessor(self.tags, processor) | ||
|
||
def get(self, name): | ||
return self.__class__(self.root, self.tags + (name,)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,257 @@ | ||
import warnings | ||
|
||
from .parse_funcs import varnames | ||
from .callers import _legacymulticall, _multicall, _Result | ||
|
||
|
||
class _HookRelay(object): | ||
""" hook holder object for performing 1:N hook calls where N is the number | ||
of registered plugins. | ||
|
||
""" | ||
|
||
def __init__(self, trace): | ||
self._trace = trace | ||
|
||
|
||
class _HookCaller(object): | ||
def __init__(self, name, hook_execute, specmodule_or_class=None, spec_opts=None): | ||
self.name = name | ||
self._wrappers = [] | ||
self._nonwrappers = [] | ||
self._hookexec = hook_execute | ||
self.argnames = None | ||
self.kwargnames = None | ||
self.multicall = _multicall | ||
if specmodule_or_class is not None: | ||
assert spec_opts is not None | ||
self.set_specification(specmodule_or_class, spec_opts) | ||
|
||
def has_spec(self): | ||
return hasattr(self, "_specmodule_or_class") | ||
|
||
def set_specification(self, specmodule_or_class, spec_opts): | ||
assert not self.has_spec() | ||
self._specmodule_or_class = specmodule_or_class | ||
specfunc = getattr(specmodule_or_class, self.name) | ||
# get spec arg signature | ||
argnames, self.kwargnames = varnames(specfunc) | ||
self.argnames = ["__multicall__"] + list(argnames) | ||
self.spec_opts = spec_opts | ||
if spec_opts.get("historic"): | ||
self._call_history = [] | ||
|
||
def is_historic(self): | ||
return hasattr(self, "_call_history") | ||
|
||
def _remove_plugin(self, plugin): | ||
def remove(wrappers): | ||
for i, method in enumerate(wrappers): | ||
if method.plugin == plugin: | ||
del wrappers[i] | ||
return True | ||
if remove(self._wrappers) is None: | ||
if remove(self._nonwrappers) is None: | ||
raise ValueError("plugin %r not found" % (plugin,)) | ||
|
||
def _add_hookimpl(self, hookimpl): | ||
"""A an implementation to the callback chain. | ||
""" | ||
if hookimpl.hookwrapper: | ||
methods = self._wrappers | ||
else: | ||
methods = self._nonwrappers | ||
|
||
if hookimpl.trylast: | ||
methods.insert(0, hookimpl) | ||
elif hookimpl.tryfirst: | ||
methods.append(hookimpl) | ||
else: | ||
# find last non-tryfirst method | ||
i = len(methods) - 1 | ||
while i >= 0 and methods[i].tryfirst: | ||
i -= 1 | ||
methods.insert(i + 1, hookimpl) | ||
|
||
if '__multicall__' in hookimpl.argnames: | ||
warnings.warn( | ||
"Support for __multicall__ is now deprecated and will be" | ||
"removed in an upcoming release.", | ||
DeprecationWarning | ||
) | ||
self.multicall = _legacymulticall | ||
|
||
def __repr__(self): | ||
return "<_HookCaller %r>" % (self.name,) | ||
|
||
def __call__(self, *args, **kwargs): | ||
if args: | ||
raise TypeError("hook calling supports only keyword arguments") | ||
assert not self.is_historic() | ||
if self.argnames: | ||
notincall = set(self.argnames) - set(['__multicall__']) - set( | ||
kwargs.keys()) | ||
if notincall: | ||
warnings.warn( | ||
"Argument(s) {0} which are declared in the hookspec " | ||
"can not be found in this hook call" | ||
.format(tuple(notincall)), | ||
stacklevel=2, | ||
) | ||
return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs) | ||
|
||
def call_historic(self, proc=None, kwargs=None): | ||
""" call the hook with given ``kwargs`` for all registered plugins and | ||
for all plugins which will be registered afterwards. | ||
|
||
If ``proc`` is not None it will be called for for each non-None result | ||
obtained from a hook implementation. | ||
""" | ||
self._call_history.append((kwargs or {}, proc)) | ||
# historizing hooks don't return results | ||
res = self._hookexec(self, self._nonwrappers + self._wrappers, kwargs) | ||
for x in res or []: | ||
proc(x) | ||
|
||
def call_extra(self, methods, kwargs): | ||
""" Call the hook with some additional temporarily participating | ||
methods using the specified kwargs as call parameters. """ | ||
old = list(self._nonwrappers), list(self._wrappers) | ||
for method in methods: | ||
opts = dict(hookwrapper=False, trylast=False, tryfirst=False) | ||
hookimpl = HookImpl(None, "<temp>", method, opts) | ||
self._add_hookimpl(hookimpl) | ||
try: | ||
return self(**kwargs) | ||
finally: | ||
self._nonwrappers, self._wrappers = old | ||
|
||
def _maybe_apply_history(self, method): | ||
"""Apply call history to a new hookimpl if it is marked as historic. | ||
""" | ||
if self.is_historic(): | ||
for kwargs, proc in self._call_history: | ||
res = self._hookexec(self, [method], kwargs) | ||
if res and proc is not None: | ||
proc(res[0]) | ||
|
||
|
||
class HookImpl(object): | ||
def __init__(self, plugin, plugin_name, function, hook_impl_opts): | ||
self.function = function | ||
self.argnames, self.kwargnames = varnames(self.function) | ||
self.plugin = plugin | ||
self.opts = hook_impl_opts | ||
self.plugin_name = plugin_name | ||
self.__dict__.update(hook_impl_opts) | ||
|
||
|
||
class HookspecMarker(object): | ||
""" Decorator helper class for marking functions as hook specifications. | ||
|
||
You can instantiate it with a project_name to get a decorator. | ||
Calling PluginManager.add_hookspecs later will discover all marked functions | ||
if the PluginManager uses the same project_name. | ||
""" | ||
|
||
def __init__(self, project_name): | ||
self.project_name = project_name | ||
|
||
def __call__(self, function=None, firstresult=False, historic=False): | ||
""" if passed a function, directly sets attributes on the function | ||
which will make it discoverable to add_hookspecs(). If passed no | ||
function, returns a decorator which can be applied to a function | ||
later using the attributes supplied. | ||
|
||
If firstresult is True the 1:N hook call (N being the number of registered | ||
hook implementation functions) will stop at I<=N when the I'th function | ||
returns a non-None result. | ||
|
||
If historic is True calls to a hook will be memorized and replayed | ||
on later registered plugins. | ||
|
||
""" | ||
def setattr_hookspec_opts(func): | ||
if historic and firstresult: | ||
raise ValueError("cannot have a historic firstresult hook") | ||
setattr(func, self.project_name + "_spec", | ||
dict(firstresult=firstresult, historic=historic)) | ||
return func | ||
|
||
if function is not None: | ||
return setattr_hookspec_opts(function) | ||
else: | ||
return setattr_hookspec_opts | ||
|
||
|
||
class HookimplMarker(object): | ||
""" Decorator helper class for marking functions as hook implementations. | ||
|
||
You can instantiate with a project_name to get a decorator. | ||
Calling PluginManager.register later will discover all marked functions | ||
if the PluginManager uses the same project_name. | ||
""" | ||
def __init__(self, project_name): | ||
self.project_name = project_name | ||
|
||
def __call__(self, function=None, hookwrapper=False, optionalhook=False, | ||
tryfirst=False, trylast=False): | ||
|
||
""" if passed a function, directly sets attributes on the function | ||
which will make it discoverable to register(). If passed no function, | ||
returns a decorator which can be applied to a function later using | ||
the attributes supplied. | ||
|
||
If optionalhook is True a missing matching hook specification will not result | ||
in an error (by default it is an error if no matching spec is found). | ||
|
||
If tryfirst is True this hook implementation will run as early as possible | ||
in the chain of N hook implementations for a specfication. | ||
|
||
If trylast is True this hook implementation will run as late as possible | ||
in the chain of N hook implementations. | ||
|
||
If hookwrapper is True the hook implementations needs to execute exactly | ||
one "yield". The code before the yield is run early before any non-hookwrapper | ||
function is run. The code after the yield is run after all non-hookwrapper | ||
function have run. The yield receives a ``_Result`` object representing | ||
the exception or result outcome of the inner calls (including other hookwrapper | ||
calls). | ||
|
||
""" | ||
def setattr_hookimpl_opts(func): | ||
setattr(func, self.project_name + "_impl", | ||
dict(hookwrapper=hookwrapper, optionalhook=optionalhook, | ||
tryfirst=tryfirst, trylast=trylast)) | ||
return func | ||
|
||
if function is None: | ||
return setattr_hookimpl_opts | ||
else: | ||
return setattr_hookimpl_opts(function) | ||
|
||
|
||
def normalize_hookimpl_opts(opts): | ||
opts.setdefault("tryfirst", False) | ||
opts.setdefault("trylast", False) | ||
opts.setdefault("hookwrapper", False) | ||
opts.setdefault("optionalhook", False) | ||
|
||
|
||
class _TracedHookExecution(object): | ||
def __init__(self, pluginmanager, before, after): | ||
self.pluginmanager = pluginmanager | ||
self.before = before | ||
self.after = after | ||
self.oldcall = pluginmanager._inner_hookexec | ||
assert not isinstance(self.oldcall, _TracedHookExecution) | ||
self.pluginmanager._inner_hookexec = self | ||
|
||
def __call__(self, hook, hook_impls, kwargs): | ||
self.before(hook.name, hook_impls, kwargs) | ||
outcome = _Result.from_call(lambda: self.oldcall(hook, hook_impls, kwargs)) | ||
self.after(outcome, hook.name, hook_impls, kwargs) | ||
return outcome.get_result() | ||
|
||
def undo(self): | ||
self.pluginmanager._inner_hookexec = self.oldcall |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import inspect | ||
|
||
|
||
def varnames(func): | ||
"""Return tuple of positional and keywrord argument names for a function, | ||
method, class or callable. | ||
|
||
In case of a class, its ``__init__`` method is considered. | ||
For methods the ``self`` parameter is not included. | ||
""" | ||
cache = getattr(func, "__dict__", {}) | ||
try: | ||
return cache["_varnames"] | ||
except KeyError: | ||
pass | ||
|
||
if inspect.isclass(func): | ||
try: | ||
func = func.__init__ | ||
except AttributeError: | ||
return (), () | ||
elif not inspect.isroutine(func): # callable object? | ||
try: | ||
func = getattr(func, '__call__', func) | ||
except Exception: | ||
return () | ||
|
||
try: # func MUST be a function or method here or we won't parse any args | ||
spec = inspect.getargspec(func) | ||
except TypeError: | ||
return (), () | ||
|
||
args, defaults = tuple(spec.args), spec.defaults | ||
if defaults: | ||
index = -len(defaults) | ||
args, defaults = args[:index], tuple(args[index:]) | ||
else: | ||
defaults = () | ||
|
||
# strip any implicit instance arg | ||
if args: | ||
if inspect.ismethod(func) or ( | ||
'.' in getattr(func, '__qualname__', ()) and args[0] == 'self' | ||
): | ||
args = args[1:] | ||
|
||
assert "self" not in args # best naming practises check? | ||
try: | ||
cache["_varnames"] = args, defaults | ||
except TypeError: | ||
pass | ||
return args, defaults | ||
|
||
|
||
if hasattr(inspect, 'signature'): | ||
def _formatdef(func): | ||
return "%s%s" % ( | ||
func.__name__, | ||
str(inspect.signature(func)) | ||
) | ||
else: | ||
def _formatdef(func): | ||
return "%s%s" % ( | ||
func.__name__, | ||
inspect.formatargspec(*inspect.getargspec(func)) | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was actually thinking maybe just make this a
utils.py
mod since it's only got the one function and we might later want a place to add miscellaneous stuff.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in general - having a utils module/package is a antipattern ^^ - its just a utterly bad name because it caries no meaning