diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 index 7564492..6660006 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,7 @@ -.idea/ *pyc + +build/ + +.idea/ +.tmp/ + diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 0f8293a..27cda9b --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -### [Alfred Workflow](https://www.alfredapp.com/workflows/) to generate random values for different data types 🎲️ +## ALFRED-RANDOMER + +[Alfred Workflow](https://www.alfredapp.com/workflows/) to generate random values for different data types 🎲️ ![all](./img/screenshots/all.png) diff --git a/alfred-randomer b/alfred-randomer old mode 100644 new mode 100755 diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..b7298c8 --- /dev/null +++ b/build.sh @@ -0,0 +1,4 @@ +poetry run pyinstaller --onefile --paths ./src/ src/main.py + +rm -rf ./build +rm *.spec diff --git a/dist/main b/dist/main new file mode 100755 index 0000000..fbe4b53 Binary files /dev/null and b/dist/main differ diff --git a/icon.png b/icon.png index 20a4145..a663ec5 100644 Binary files a/icon.png and b/icon.png differ diff --git a/img/screenshots/all.png b/img/screenshots/all.png old mode 100644 new mode 100755 diff --git a/img/screenshots/imei.png b/img/screenshots/imei.png old mode 100644 new mode 100755 diff --git a/info.plist b/info.plist index 1338358..386aee1 100644 --- a/info.plist +++ b/info.plist @@ -8,11 +8,11 @@ Tools connections - EC765E5B-D5EC-4893-B319-32CDD2BC7C5C + FE37874C-BB78-4D05-8F02-EA114E8BA312 destinationuid - 8C2A867D-8342-47CC-B605-C5A3D7740379 + 88A52F5F-CC59-4060-9DCC-796BEDC66EEE modifiers 0 modifiersubtext @@ -47,7 +47,7 @@ type alfred.workflow.output.clipboard uid - 8C2A867D-8342-47CC-B605-C5A3D7740379 + 88A52F5F-CC59-4060-9DCC-796BEDC66EEE version 3 @@ -71,15 +71,15 @@ queuedelaycustom 3 queuedelayimmediatelyinitially - + queuedelaymode 0 queuemode - 1 + 2 runningsubtext - ... + script - python3 ./main.py $@ + ./dist/main $@ scriptargtype 1 scriptfile @@ -87,7 +87,7 @@ subtext title - + Generate random values for different data types type 0 withspace @@ -96,7 +96,7 @@ type alfred.workflow.input.scriptfilter uid - EC765E5B-D5EC-4893-B319-32CDD2BC7C5C + FE37874C-BB78-4D05-8F02-EA114E8BA312 version 3 @@ -105,23 +105,25 @@ Generate random values for different data types 🎲️ uidata - 8C2A867D-8342-47CC-B605-C5A3D7740379 + 88A52F5F-CC59-4060-9DCC-796BEDC66EEE xpos - 305 + 280 ypos - 40 + 15 - EC765E5B-D5EC-4893-B319-32CDD2BC7C5C + FE37874C-BB78-4D05-8F02-EA114E8BA312 xpos - 55 + 30 ypos - 40 + 15 + userconfigurationconfig + version - 1.2 + 1.3.0 webaddress https://github.com/fedecalendino/alfred-randomer diff --git a/poetry.lock b/poetry.lock new file mode 100644 index 0000000..910337a --- /dev/null +++ b/poetry.lock @@ -0,0 +1,209 @@ +[[package]] +name = "alfred-pyflow" +version = "0.2.0" +description = "Minimal library for the development of Alfred Workflows." +category = "main" +optional = false +python-versions = ">=3.8,<4.0" + +[[package]] +name = "altgraph" +version = "0.17.2" +description = "Python graph (network) package" +category = "dev" +optional = false +python-versions = "*" + +[[package]] +name = "black" +version = "22.6.0" +description = "The uncompromising code formatter." +category = "dev" +optional = false +python-versions = ">=3.6.2" + +[package.dependencies] +click = ">=8.0.0" +mypy-extensions = ">=0.4.3" +pathspec = ">=0.9.0" +platformdirs = ">=2" +tomli = {version = ">=1.1.0", markers = "python_full_version < \"3.11.0a7\""} +typing-extensions = {version = ">=3.10.0.0", markers = "python_version < \"3.10\""} + +[package.extras] +colorama = ["colorama (>=0.4.3)"] +d = ["aiohttp (>=3.7.4)"] +jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] +uvloop = ["uvloop (>=0.15.2)"] + +[[package]] +name = "click" +version = "8.1.3" +description = "Composable command line interface toolkit" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + +[[package]] +name = "colorama" +version = "0.4.5" +description = "Cross-platform colored terminal text." +category = "dev" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" + +[[package]] +name = "coverage" +version = "6.4.2" +description = "Code coverage measurement for Python" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +toml = ["tomli"] + +[[package]] +name = "ddt" +version = "1.5.0" +description = "Data-Driven/Decorated Tests" +category = "dev" +optional = false +python-versions = "*" + +[[package]] +name = "future" +version = "0.18.2" +description = "Clean single-source support for Python 3 and 2" +category = "dev" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "macholib" +version = "1.16" +description = "Mach-O header analysis and editing" +category = "dev" +optional = false +python-versions = "*" + +[package.dependencies] +altgraph = ">=0.15" + +[[package]] +name = "mypy-extensions" +version = "0.4.3" +description = "Experimental type system extensions for programs checked with the mypy typechecker." +category = "dev" +optional = false +python-versions = "*" + +[[package]] +name = "pathspec" +version = "0.9.0" +description = "Utility library for gitignore style pattern matching of file paths." +category = "dev" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" + +[[package]] +name = "pefile" +version = "2022.5.30" +description = "Python PE parsing module" +category = "dev" +optional = false +python-versions = ">=3.6.0" + +[package.dependencies] +future = "*" + +[[package]] +name = "platformdirs" +version = "2.5.2" +description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +docs = ["furo (>=2021.7.5b38)", "proselint (>=0.10.2)", "sphinx-autodoc-typehints (>=1.12)", "sphinx (>=4)"] +test = ["appdirs (==1.4.4)", "pytest-cov (>=2.7)", "pytest-mock (>=3.6)", "pytest (>=6)"] + +[[package]] +name = "pyinstaller" +version = "5.2" +description = "PyInstaller bundles a Python application and all its dependencies into a single package." +category = "dev" +optional = false +python-versions = "<3.11,>=3.7" + +[package.dependencies] +altgraph = "*" +macholib = {version = ">=1.8", markers = "sys_platform == \"darwin\""} +pefile = {version = ">=2022.5.30", markers = "sys_platform == \"win32\""} +pyinstaller-hooks-contrib = ">=2021.4" +pywin32-ctypes = {version = ">=0.2.0", markers = "sys_platform == \"win32\""} + +[package.extras] +encryption = ["tinyaes (>=1.0.0)"] +hook_testing = ["pytest (>=2.7.3)", "execnet (>=1.5.0)", "psutil"] + +[[package]] +name = "pyinstaller-hooks-contrib" +version = "2022.8" +description = "Community maintained hooks for PyInstaller" +category = "dev" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "pywin32-ctypes" +version = "0.2.0" +description = "" +category = "dev" +optional = false +python-versions = "*" + +[[package]] +name = "tomli" +version = "2.0.1" +description = "A lil' TOML parser" +category = "dev" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "typing-extensions" +version = "4.3.0" +description = "Backported and Experimental Type Hints for Python 3.7+" +category = "dev" +optional = false +python-versions = ">=3.7" + +[metadata] +lock-version = "1.1" +python-versions = ">=3.8,<3.11" +content-hash = "7d39aea303f4516890cf3536edcee0bd07ff020a3900f3b93692f08e74f2205b" + +[metadata.files] +alfred-pyflow = [] +altgraph = [] +black = [] +click = [] +colorama = [] +coverage = [] +ddt = [] +future = [] +macholib = [] +mypy-extensions = [] +pathspec = [] +pefile = [] +platformdirs = [] +pyinstaller = [] +pyinstaller-hooks-contrib = [] +pywin32-ctypes = [] +tomli = [] +typing-extensions = [] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9a9d687 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[tool.poetry] +name = "Randomer" +version = "1.3.0" +description = "Generate random values for different data types" +documentation = "https://github.com/fedecalendino/alfred-randomer/blob/main/README.md" +homepage = "https://github.com/fedecalendino/alfred-randomer" +license = "MIT" + +authors = [ + "Fede Calendino " +] + +[tool.poetry.dependencies] +python = ">=3.8,<3.11" +alfred-pyflow = "latest" + +[tool.poetry.dev-dependencies] +black = "latest" +coverage = "latest" +ddt = "latest" +pyinstaller = "latest" + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/workflow/.alfredversionchecked b/src/__init__.py similarity index 100% rename from workflow/.alfredversionchecked rename to src/__init__.py diff --git a/generators.py b/src/generators.py similarity index 94% rename from generators.py rename to src/generators.py index 77d9549..48ca970 100644 --- a/generators.py +++ b/src/generators.py @@ -43,7 +43,7 @@ def random_unit_number(): ] values = list(map(lambda d: int(LETTER_VALUES.get(d, d)), number)) - checksum = sum(d * 2 ** i for i, d in enumerate(values)) % 11 + checksum = sum(d * 2**i for i, d in enumerate(values)) % 11 if checksum > 9: return random_unit_number() diff --git a/main.py b/src/main.py similarity index 86% rename from main.py rename to src/main.py index eadd653..c82756b 100644 --- a/main.py +++ b/src/main.py @@ -1,7 +1,6 @@ -# coding=utf-8 import sys -from workflow import Workflow +from pyflow import Workflow import generators @@ -25,7 +24,7 @@ def main(workflow): for name in items: value = GENERATORS[name]() - workflow.add_item( + workflow.new_item( title=value, subtitle=name, arg=value, @@ -33,7 +32,7 @@ def main(workflow): ) -if __name__ == u"__main__": +if __name__ == "__main__": wf = Workflow() wf.run(main) wf.send_feedback() diff --git a/workflow/Notify.tgz b/workflow/Notify.tgz deleted file mode 100644 index 174e9a7..0000000 Binary files a/workflow/Notify.tgz and /dev/null differ diff --git a/workflow/__init__.py b/workflow/__init__.py deleted file mode 100644 index f93fb60..0000000 --- a/workflow/__init__.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -# -# Copyright (c) 2014 Dean Jackson -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2014-02-15 -# - -"""A helper library for `Alfred `_ workflows.""" - -import os - -# Filter matching rules -# Icons -# Exceptions -# Workflow objects -from .workflow import ( - ICON_ACCOUNT, - ICON_BURN, - ICON_CLOCK, - ICON_COLOR, - ICON_COLOUR, - ICON_EJECT, - ICON_ERROR, - ICON_FAVORITE, - ICON_FAVOURITE, - ICON_GROUP, - ICON_HELP, - ICON_HOME, - ICON_INFO, - ICON_NETWORK, - ICON_NOTE, - ICON_SETTINGS, - ICON_SWIRL, - ICON_SWITCH, - ICON_SYNC, - ICON_TRASH, - ICON_USER, - ICON_WARNING, - ICON_WEB, - MATCH_ALL, - MATCH_ALLCHARS, - MATCH_ATOM, - MATCH_CAPITALS, - MATCH_INITIALS, - MATCH_INITIALS_CONTAIN, - MATCH_INITIALS_STARTSWITH, - MATCH_STARTSWITH, - MATCH_SUBSTRING, - KeychainError, - PasswordNotFound, - Workflow, - manager, -) -from .workflow3 import Variables, Workflow3 - -__title__ = "Alfred-Workflow" -__version__ = open(os.path.join(os.path.dirname(__file__), "version")).read() -__author__ = "Dean Jackson" -__licence__ = "MIT" -__copyright__ = "Copyright 2014-2019 Dean Jackson" - -__all__ = [ - "Variables", - "Workflow", - "Workflow3", - "manager", - "PasswordNotFound", - "KeychainError", - "ICON_ACCOUNT", - "ICON_BURN", - "ICON_CLOCK", - "ICON_COLOR", - "ICON_COLOUR", - "ICON_EJECT", - "ICON_ERROR", - "ICON_FAVORITE", - "ICON_FAVOURITE", - "ICON_GROUP", - "ICON_HELP", - "ICON_HOME", - "ICON_INFO", - "ICON_NETWORK", - "ICON_NOTE", - "ICON_SETTINGS", - "ICON_SWIRL", - "ICON_SWITCH", - "ICON_SYNC", - "ICON_TRASH", - "ICON_USER", - "ICON_WARNING", - "ICON_WEB", - "MATCH_ALL", - "MATCH_ALLCHARS", - "MATCH_ATOM", - "MATCH_CAPITALS", - "MATCH_INITIALS", - "MATCH_INITIALS_CONTAIN", - "MATCH_INITIALS_STARTSWITH", - "MATCH_STARTSWITH", - "MATCH_SUBSTRING", -] diff --git a/workflow/background.py b/workflow/background.py deleted file mode 100644 index bed4c15..0000000 --- a/workflow/background.py +++ /dev/null @@ -1,293 +0,0 @@ -# encoding: utf-8 -# -# Copyright (c) 2014 deanishe@deanishe.net -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2014-04-06 -# - -"""This module provides an API to run commands in background processes. - -Combine with the :ref:`caching API ` to work from cached data -while you fetch fresh data in the background. - -See :ref:`the User Manual ` for more information -and examples. -""" - - -import os -import pickle -import signal -import subprocess -import sys - -from workflow import Workflow - -__all__ = ["is_running", "run_in_background"] - -_wf = None - - -def wf(): - global _wf - if _wf is None: - _wf = Workflow() - return _wf - - -def _log(): - return wf().logger - - -def _arg_cache(name): - """Return path to pickle cache file for arguments. - - :param name: name of task - :type name: ``unicode`` - :returns: Path to cache file - :rtype: ``unicode`` filepath - - """ - return wf().cachefile(name + ".argcache") - - -def _pid_file(name): - """Return path to PID file for ``name``. - - :param name: name of task - :type name: ``unicode`` - :returns: Path to PID file for task - :rtype: ``unicode`` filepath - - """ - return wf().cachefile(name + ".pid") - - -def _process_exists(pid): - """Check if a process with PID ``pid`` exists. - - :param pid: PID to check - :type pid: ``int`` - :returns: ``True`` if process exists, else ``False`` - :rtype: ``Boolean`` - - """ - try: - os.kill(pid, 0) - except OSError: # not running - return False - return True - - -def _job_pid(name): - """Get PID of job or `None` if job does not exist. - - Args: - name (str): Name of job. - - Returns: - int: PID of job process (or `None` if job doesn't exist). - """ - pidfile = _pid_file(name) - if not os.path.exists(pidfile): - return - - with open(pidfile, "rb") as fp: - read = fp.read() - print(str(read)) - pid = int.from_bytes(read, sys.byteorder) - print(pid) - - if _process_exists(pid): - return pid - - os.unlink(pidfile) - - -def is_running(name): - """Test whether task ``name`` is currently running. - - :param name: name of task - :type name: unicode - :returns: ``True`` if task with name ``name`` is running, else ``False`` - :rtype: bool - - """ - if _job_pid(name) is not None: - return True - - return False - - -def _background( - pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null" -): # pragma: no cover - """Fork the current process into a background daemon. - - :param pidfile: file to write PID of daemon process to. - :type pidfile: filepath - :param stdin: where to read input - :type stdin: filepath - :param stdout: where to write stdout output - :type stdout: filepath - :param stderr: where to write stderr output - :type stderr: filepath - - """ - - def _fork_and_exit_parent(errmsg, wait=False, write=False): - try: - pid = os.fork() - if pid > 0: - if write: # write PID of child process to `pidfile` - tmp = pidfile + ".tmp" - with open(tmp, "wb") as fp: - fp.write(pid.to_bytes(4, sys.byteorder)) - os.rename(tmp, pidfile) - if wait: # wait for child process to exit - os.waitpid(pid, 0) - os._exit(0) - except OSError as err: - _log().critical("%s: (%d) %s", errmsg, err.errno, err.strerror) - raise err - - # Do first fork and wait for second fork to finish. - _fork_and_exit_parent("fork #1 failed", wait=True) - - # Decouple from parent environment. - os.chdir(wf().workflowdir) - os.setsid() - - # Do second fork and write PID to pidfile. - _fork_and_exit_parent("fork #2 failed", write=True) - - # Now I am a daemon! - # Redirect standard file descriptors. - si = open(stdin, "r", 1) - so = open(stdout, "a+", 1) - se = open(stderr, "a+", 1) - if hasattr(sys.stdin, "fileno"): - os.dup2(si.fileno(), sys.stdin.fileno()) - if hasattr(sys.stdout, "fileno"): - os.dup2(so.fileno(), sys.stdout.fileno()) - if hasattr(sys.stderr, "fileno"): - os.dup2(se.fileno(), sys.stderr.fileno()) - - -def kill(name, sig=signal.SIGTERM): - """Send a signal to job ``name`` via :func:`os.kill`. - - .. versionadded:: 1.29 - - Args: - name (str): Name of the job - sig (int, optional): Signal to send (default: SIGTERM) - - Returns: - bool: `False` if job isn't running, `True` if signal was sent. - """ - pid = _job_pid(name) - if pid is None: - return False - - os.kill(pid, sig) - return True - - -def run_in_background(name, args, **kwargs): - r"""Cache arguments then call this script again via :func:`subprocess.call`. - - :param name: name of job - :type name: unicode - :param args: arguments passed as first argument to :func:`subprocess.call` - :param \**kwargs: keyword arguments to :func:`subprocess.call` - :returns: exit code of sub-process - :rtype: int - - When you call this function, it caches its arguments and then calls - ``background.py`` in a subprocess. The Python subprocess will load the - cached arguments, fork into the background, and then run the command you - specified. - - This function will return as soon as the ``background.py`` subprocess has - forked, returning the exit code of *that* process (i.e. not of the command - you're trying to run). - - If that process fails, an error will be written to the log file. - - If a process is already running under the same name, this function will - return immediately and will not run the specified command. - - """ - if is_running(name): - _log().info("[%s] job already running", name) - return - - argcache = _arg_cache(name) - - # Cache arguments - with open(argcache, "wb") as fp: - pickle.dump({"args": args, "kwargs": kwargs}, fp) - _log().debug("[%s] command cached: %s", name, argcache) - - # Call this script - cmd = [sys.executable, "-m", "workflow.background", name] - _log().debug("[%s] passing job to background runner: %r", name, cmd) - retcode = subprocess.call(cmd, env={"PYTHONPATH": ":".join(sys.path)}) - - if retcode: # pragma: no cover - _log().error("[%s] background runner failed with %d", name, retcode) - else: - _log().debug("[%s] background job started", name) - - return retcode - - -def main(wf): # pragma: no cover - """Run command in a background process. - - Load cached arguments, fork into background, then call - :meth:`subprocess.call` with cached arguments. - - """ - log = wf.logger - name = wf.args[0] - argcache = _arg_cache(name) - if not os.path.exists(argcache): - msg = "[{0}] command cache not found: {1}".format(name, argcache) - log.critical(msg) - raise IOError(msg) - - # Fork to background and run command - pidfile = _pid_file(name) - _background(pidfile) - - # Load cached arguments - with open(argcache, "rb") as fp: - data = pickle.load(fp) - - # Cached arguments - args = data["args"] - kwargs = data["kwargs"] - - # Delete argument cache file - os.unlink(argcache) - - try: - # Run the command - log.debug("[%s] running command: %r", name, args) - - retcode = subprocess.call(args, **kwargs) - - if retcode: - log.error("[%s] command failed with status %d", name, retcode) - finally: - os.unlink(pidfile) - - log.debug("[%s] job complete", name) - - -if __name__ == "__main__": # pragma: no cover - wf().run(main) diff --git a/workflow/notify.py b/workflow/notify.py deleted file mode 100644 index fa582f6..0000000 --- a/workflow/notify.py +++ /dev/null @@ -1,344 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -# -# Copyright (c) 2015 deanishe@deanishe.net -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2015-11-26 -# - -# TODO: Exclude this module from test and code coverage in py2.6 - -""" -Post notifications via the macOS Notification Center. - -This feature is only available on Mountain Lion (10.8) and later. -It will silently fail on older systems. - -The main API is a single function, :func:`~workflow.notify.notify`. - -It works by copying a simple application to your workflow's data -directory. It replaces the application's icon with your workflow's -icon and then calls the application to post notifications. -""" - - -import os -import plistlib -import shutil -import subprocess -import sys -import tarfile -import tempfile -import uuid -from typing import List - -from . import workflow - -_wf = None -_log = None - - -#: Available system sounds from System Preferences > Sound > Sound Effects -SOUNDS = ( - "Basso", - "Blow", - "Bottle", - "Frog", - "Funk", - "Glass", - "Hero", - "Morse", - "Ping", - "Pop", - "Purr", - "Sosumi", - "Submarine", - "Tink", -) - - -def wf(): - """Return Workflow object for this module. - - Returns: - workflow.Workflow: Workflow object for current workflow. - """ - global _wf - if _wf is None: - _wf = workflow.Workflow() - return _wf - - -def log(): - """Return logger for this module. - - Returns: - logging.Logger: Logger for this module. - """ - global _log - if _log is None: - _log = wf().logger - return _log - - -def notifier_program(): - """Return path to notifier applet executable. - - Returns: - unicode: Path to Notify.app ``applet`` executable. - """ - return wf().datafile("Notify.app/Contents/MacOS/applet") - - -def notifier_icon_path(): - """Return path to icon file in installed Notify.app. - - Returns: - unicode: Path to ``applet.icns`` within the app bundle. - """ - return wf().datafile("Notify.app/Contents/Resources/applet.icns") - - -def install_notifier(): - """Extract ``Notify.app`` from the workflow to data directory. - - Changes the bundle ID of the installed app and gives it the - workflow's icon. - """ - archive = os.path.join(os.path.dirname(__file__), "Notify.tgz") - destdir = wf().datadir - app_path = os.path.join(destdir, "Notify.app") - n = notifier_program() - log().debug("installing Notify.app to %r ...", destdir) - # z = zipfile.ZipFile(archive, 'r') - # z.extractall(destdir) - tgz = tarfile.open(archive, "r:gz") - tgz.extractall(destdir) - if not os.path.exists(n): # pragma: nocover - raise RuntimeError("Notify.app could not be installed in " + destdir) - - # Replace applet icon - icon = notifier_icon_path() - workflow_icon = wf().workflowfile("icon.png") - if os.path.exists(icon): - os.unlink(icon) - - png_to_icns(workflow_icon, icon) - - # Set file icon - # PyObjC isn't available for 2.6, so this is 2.7 only. Actually, - # none of this code will "work" on pre-10.8 systems. Let it run - # until I figure out a better way of excluding this module - # from coverage in py2.6. - if sys.version_info >= (2, 7): # pragma: no cover - from AppKit import NSImage, NSWorkspace - - ws = NSWorkspace.sharedWorkspace() - img = NSImage.alloc().init() - img.initWithContentsOfFile_(icon) - ws.setIcon_forFile_options_(img, app_path, 0) - - # Change bundle ID of installed app - ip_path = os.path.join(app_path, "Contents/Info.plist") - bundle_id = "{0}.{1}".format(wf().bundleid, uuid.uuid4().hex) - data = plistlib.readPlist(ip_path) - log().debug("changing bundle ID to %r", bundle_id) - data["CFBundleIdentifier"] = bundle_id - plistlib.writePlist(data, ip_path) - - -def validate_sound(sound): - """Coerce ``sound`` to valid sound name. - - Returns ``None`` for invalid sounds. Sound names can be found - in ``System Preferences > Sound > Sound Effects``. - - Args: - sound (str): Name of system sound. - - Returns: - str: Proper name of sound or ``None``. - """ - if not sound: - return None - - # Case-insensitive comparison of `sound` - if sound.lower() in [s.lower() for s in SOUNDS]: - # Title-case is correct for all system sounds as of macOS 10.11 - return sound.title() - return None - - -def notify(title="", text="", sound=None): - """Post notification via Notify.app helper. - - Args: - title (str, optional): Notification title. - text (str, optional): Notification body text. - sound (str, optional): Name of sound to play. - - Raises: - ValueError: Raised if both ``title`` and ``text`` are empty. - - Returns: - bool: ``True`` if notification was posted, else ``False``. - """ - if title == text == "": - raise ValueError("Empty notification") - - sound = validate_sound(sound) or "" - - n = notifier_program() - - if not os.path.exists(n): - install_notifier() - - env = os.environ.copy() - enc = "utf-8" - env["NOTIFY_TITLE"] = title.encode(enc) - env["NOTIFY_MESSAGE"] = text.encode(enc) - env["NOTIFY_SOUND"] = sound.encode(enc) - cmd = [n] - retcode = subprocess.call(cmd, env=env) - if retcode == 0: - return True - - log().error("Notify.app exited with status {0}.".format(retcode)) - return False - - -def usr_bin_env(*args: str) -> List[str]: - return ["/usr/bin/env", f'PATH={os.environ["PATH"]}'] + list(args) - - -def convert_image(inpath, outpath, size): - """Convert an image file using ``sips``. - - Args: - inpath (str): Path of source file. - outpath (str): Path to destination file. - size (int): Width and height of destination image in pixels. - - Raises: - RuntimeError: Raised if ``sips`` exits with non-zero status. - """ - cmd = ["sips", "-z", str(size), str(size), inpath, "--out", outpath] - # log().debug(cmd) - with open(os.devnull, "w") as pipe: - retcode = subprocess.call( - cmd, shell=True, stdout=pipe, stderr=subprocess.STDOUT - ) - - if retcode != 0: - raise RuntimeError("sips exited with %d" % retcode) - - -def png_to_icns(png_path, icns_path): - """Convert PNG file to ICNS using ``iconutil``. - - Create an iconset from the source PNG file. Generate PNG files - in each size required by macOS, then call ``iconutil`` to turn - them into a single ICNS file. - - Args: - png_path (str): Path to source PNG file. - icns_path (str): Path to destination ICNS file. - - Raises: - RuntimeError: Raised if ``iconutil`` or ``sips`` fail. - """ - tempdir = tempfile.mkdtemp(prefix="aw-", dir=wf().datadir) - - try: - iconset = os.path.join(tempdir, "Icon.iconset") - - if os.path.exists(iconset): # pragma: nocover - raise RuntimeError("iconset already exists: " + iconset) - - os.makedirs(iconset) - - # Copy source icon to icon set and generate all the other - # sizes needed - configs = [] - for i in (16, 32, 128, 256, 512): - configs.append(("icon_{0}x{0}.png".format(i), i)) - configs.append((("icon_{0}x{0}@2x.png".format(i), i * 2))) - - shutil.copy(png_path, os.path.join(iconset, "icon_256x256.png")) - shutil.copy(png_path, os.path.join(iconset, "icon_128x128@2x.png")) - - for name, size in configs: - outpath = os.path.join(iconset, name) - if os.path.exists(outpath): - continue - convert_image(png_path, outpath, size) - - cmd = ["iconutil", "-c", "icns", "-o", icns_path, iconset] - - retcode = subprocess.call(cmd) - if retcode != 0: - raise RuntimeError("iconset exited with %d" % retcode) - - if not os.path.exists(icns_path): # pragma: nocover - raise ValueError("generated ICNS file not found: " + repr(icns_path)) - finally: - try: - shutil.rmtree(tempdir) - except OSError: # pragma: no cover - pass - - -if __name__ == "__main__": # pragma: nocover - # Simple command-line script to test module with - # This won't work on 2.6, as `argparse` isn't available - # by default. - import argparse - from unicodedata import normalize - - def ustr(s): - """Coerce `s` to normalised Unicode.""" - return normalize("NFD", s.decode("utf-8")) - - p = argparse.ArgumentParser() - p.add_argument("-p", "--png", help="PNG image to convert to ICNS.") - p.add_argument( - "-l", "--list-sounds", help="Show available sounds.", action="store_true" - ) - p.add_argument("-t", "--title", help="Notification title.", type=ustr, default="") - p.add_argument( - "-s", "--sound", type=ustr, help="Optional notification sound.", default="" - ) - p.add_argument( - "text", type=ustr, help="Notification body text.", default="", nargs="?" - ) - o = p.parse_args() - - # List available sounds - if o.list_sounds: - for sound in SOUNDS: - print(sound) - sys.exit(0) - - # Convert PNG to ICNS - if o.png: - icns = os.path.join( - os.path.dirname(o.png), - os.path.splitext(os.path.basename(o.png))[0] + ".icns", - ) - - print("converting {0!r} to {1!r} ...".format(o.png, icns), file=sys.stderr) - - if os.path.exists(icns): - raise ValueError("destination file already exists: " + icns) - - png_to_icns(o.png, icns) - sys.exit(0) - - # Post notification - if o.title == o.text == "": - print("ERROR: empty notification.", file=sys.stderr) - sys.exit(1) - else: - notify(o.title, o.text, o.sound) diff --git a/workflow/update.py b/workflow/update.py deleted file mode 100644 index 643681b..0000000 --- a/workflow/update.py +++ /dev/null @@ -1,584 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -# -# Copyright (c) 2014 Fabio Niephaus , -# Dean Jackson -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2014-08-16 -# - -"""Self-updating from GitHub. - -.. versionadded:: 1.9 - -.. note:: - - This module is not intended to be used directly. Automatic updates - are controlled by the ``update_settings`` :class:`dict` passed to - :class:`~workflow.workflow.Workflow` objects. - -""" - - -import json -import os -import re -import subprocess -import tempfile -from collections import defaultdict -from functools import total_ordering -from itertools import zip_longest - -import requests - -from workflow.util import atomic_writer - -from . import workflow - -# __all__ = [] - - -RELEASES_BASE = "https://api.github.com/repos/{}/releases" -match_workflow = re.compile(r"\.alfred(\d+)?workflow$").search - -_wf = None - - -def wf(): - """Lazy `Workflow` object.""" - global _wf - if _wf is None: - _wf = workflow.Workflow() - return _wf - - -@total_ordering -class Download(object): - """A workflow file that is available for download. - - .. versionadded: 1.37 - - Attributes: - url (str): URL of workflow file. - filename (str): Filename of workflow file. - version (Version): Semantic version of workflow. - prerelease (bool): Whether version is a pre-release. - alfred_version (Version): Minimum compatible version - of Alfred. - - """ - - @classmethod - def from_dict(cls, d): - """Create a `Download` from a `dict`.""" - return cls( - url=d["url"], - filename=d["filename"], - version=Version(d["version"]), - prerelease=d["prerelease"], - ) - - @classmethod - def from_releases(cls, js): - """Extract downloads from GitHub releases. - - Searches releases with semantic tags for assets with - file extension .alfredworkflow or .alfredXworkflow where - X is a number. - - Files are returned sorted by latest version first. Any - releases containing multiple files with the same (workflow) - extension are rejected as ambiguous. - - Args: - js (str): JSON response from GitHub's releases endpoint. - - Returns: - list: Sequence of `Download`. - """ - releases = json.loads(js) - downloads = [] - for release in releases: - tag = release["tag_name"] - dupes = defaultdict(int) - try: - version = Version(tag) - except ValueError as err: - wf().logger.debug('ignored release: bad version "%s": %s', tag, err) - continue - - dls = [] - for asset in release.get("assets", []): - url = asset.get("browser_download_url") - filename = os.path.basename(url) - m = match_workflow(filename) - if not m: - wf().logger.debug("unwanted file: %s", filename) - continue - - ext = m.group(0) - dupes[ext] = dupes[ext] + 1 - dls.append(Download(url, filename, version, release["prerelease"])) - - valid = True - for ext, n in list(dupes.items()): - if n > 1: - wf().logger.debug( - 'ignored release "%s": multiple assets ' 'with extension "%s"', - tag, - ext, - ) - valid = False - break - - if valid: - downloads.extend(dls) - - downloads.sort(reverse=True) - return downloads - - def __init__(self, url, filename, version, prerelease=False): - """Create a new Download. - - Args: - url (str): URL of workflow file. - filename (str): Filename of workflow file. - version (Version): Version of workflow. - prerelease (bool, optional): Whether version is - pre-release. Defaults to False. - - """ - if isinstance(version, str): - version = Version(version) - - self.url = url - self.filename = filename - self.version = version - self.prerelease = prerelease - - @property - def alfred_version(self): - """Minimum Alfred version based on filename extension.""" - m = match_workflow(self.filename) - if not m or not m.group(1): - return Version("0") - return Version(m.group(1)) - - @property - def dict(self): - """Convert `Download` to `dict`.""" - return dict( - url=self.url, - filename=self.filename, - version=str(self.version), - prerelease=self.prerelease, - ) - - def __str__(self): - """Format `Download` for printing.""" - return ( - "Download(" - "url={dl.url!r}, " - "filename={dl.filename!r}, " - "version={dl.version!r}, " - "prerelease={dl.prerelease!r}" - ")" - ).format(dl=self) - - def __repr__(self): - """Code-like representation of `Download`.""" - return str(self) - - def __eq__(self, other): - """Compare Downloads based on version numbers.""" - if ( - self.url != other.url - or self.filename != other.filename - or self.version != other.version - or self.prerelease != other.prerelease - ): - return False - return True - - def __ne__(self, other): - """Compare Downloads based on version numbers.""" - return not self.__eq__(other) - - def __lt__(self, other): - """Compare Downloads based on version numbers.""" - if self.version != other.version: - return self.version < other.version - return self.alfred_version < other.alfred_version - - -class Version(object): - """Mostly semantic versioning. - - The main difference to proper :ref:`semantic versioning ` - is that this implementation doesn't require a minor or patch version. - - Version strings may also be prefixed with "v", e.g.: - - >>> v = Version('v1.1.1') - >>> v.tuple - (1, 1, 1, '') - - >>> v = Version('2.0') - >>> v.tuple - (2, 0, 0, '') - - >>> Version('3.1-beta').tuple - (3, 1, 0, 'beta') - - >>> Version('1.0.1') > Version('0.0.1') - True - """ - - #: Match version and pre-release/build information in version strings - match_version = re.compile(r"([0-9][0-9\.]*)(.+)?").match - - def __init__(self, vstr): - """Create new `Version` object. - - Args: - vstr (basestring): Semantic version string. - """ - if not vstr: - raise ValueError("invalid version number: {!r}".format(vstr)) - - self.vstr = vstr - self.major = 0 - self.minor = 0 - self.patch = 0 - self.suffix = "" - self.build = "" - self._parse(vstr) - - def _parse(self, vstr): - vstr = str(vstr) - if vstr.startswith("v"): - m = self.match_version(vstr[1:]) - else: - m = self.match_version(vstr) - if not m: - raise ValueError("invalid version number: " + vstr) - - version, suffix = m.groups() - parts = self._parse_dotted_string(version) - self.major = parts.pop(0) - if len(parts): - self.minor = parts.pop(0) - if len(parts): - self.patch = parts.pop(0) - if not len(parts) == 0: - raise ValueError("version number too long: " + vstr) - - if suffix: - # Build info - idx = suffix.find("+") - if idx > -1: - self.build = suffix[idx + 1 :] - suffix = suffix[:idx] - if suffix: - if not suffix.startswith("-"): - raise ValueError("suffix must start with - : " + suffix) - self.suffix = suffix[1:] - - def _parse_dotted_string(self, s): - """Parse string ``s`` into list of ints and strings.""" - parsed = [] - parts = s.split(".") - for p in parts: - if p.isdigit(): - p = int(p) - parsed.append(p) - return parsed - - @property - def tuple(self): - """Version number as a tuple of major, minor, patch, pre-release.""" - return (self.major, self.minor, self.patch, self.suffix) - - def __lt__(self, other): - """Implement comparison.""" - if not isinstance(other, Version): - raise ValueError("not a Version instance: {0!r}".format(other)) - t = self.tuple[:3] - o = other.tuple[:3] - if t < o: - return True - if t == o: # We need to compare suffixes - if self.suffix and not other.suffix: - return True - if other.suffix and not self.suffix: - return False - - self_suffix = self._parse_dotted_string(self.suffix) - other_suffix = self._parse_dotted_string(other.suffix) - - for s, o in zip_longest(self_suffix, other_suffix): - if s is None: # shorter value wins - return True - elif o is None: # longer value loses - return False - elif type(s) != type(o): # type coersion - s, o = str(s), str(o) - if s == o: # next if the same compare - continue - return s < o # finally compare - # t > o - return False - - def __eq__(self, other): - """Implement comparison.""" - if not isinstance(other, Version): - raise ValueError("not a Version instance: {0!r}".format(other)) - return self.tuple == other.tuple - - def __ne__(self, other): - """Implement comparison.""" - return not self.__eq__(other) - - def __gt__(self, other): - """Implement comparison.""" - if not isinstance(other, Version): - raise ValueError("not a Version instance: {0!r}".format(other)) - return other.__lt__(self) - - def __le__(self, other): - """Implement comparison.""" - if not isinstance(other, Version): - raise ValueError("not a Version instance: {0!r}".format(other)) - return not other.__lt__(self) - - def __ge__(self, other): - """Implement comparison.""" - return not self.__lt__(other) - - def __str__(self): - """Return semantic version string.""" - vstr = "{0}.{1}.{2}".format(self.major, self.minor, self.patch) - if self.suffix: - vstr = "{0}-{1}".format(vstr, self.suffix) - if self.build: - vstr = "{0}+{1}".format(vstr, self.build) - return vstr - - def __repr__(self): - """Return 'code' representation of `Version`.""" - return "Version('{0}')".format(str(self)) - - -def retrieve_download(dl): - """Saves a download to a temporary file and returns path. - - .. versionadded: 1.37 - - Args: - url (unicode): URL to .alfredworkflow file in GitHub repo - - Returns: - unicode: path to downloaded file - - """ - if not match_workflow(dl.filename): - raise ValueError("attachment not a workflow: " + dl.filename) - - path = os.path.join(tempfile.gettempdir(), dl.filename) - wf().logger.debug("downloading update from " "%r to %r ...", dl.url, path) - - r = requests.get(dl.url) - r.raise_for_status() - - with atomic_writer(path, "wb") as file_obj: - file_obj.write(r.content) - - return path - - -def build_api_url(repo): - """Generate releases URL from GitHub repo. - - Args: - repo (unicode): Repo name in form ``username/repo`` - - Returns: - unicode: URL to the API endpoint for the repo's releases - - """ - if len(repo.split("/")) != 2: - raise ValueError("invalid GitHub repo: {!r}".format(repo)) - - return RELEASES_BASE.format(repo) - - -def get_downloads(repo): - """Load available ``Download``s for GitHub repo. - - .. versionadded: 1.37 - - Args: - repo (unicode): GitHub repo to load releases for. - - Returns: - list: Sequence of `Download` contained in GitHub releases. - """ - url = build_api_url(repo) - - def _fetch(): - wf().logger.info("retrieving releases for %r ...", repo) - r = requests.get(url) - r.raise_for_status() - return r.content - - key = "github-releases-" + repo.replace("/", "-") - js = wf().cached_data(key, _fetch, max_age=60) - - return Download.from_releases(js) - - -def latest_download(dls, alfred_version=None, prereleases=False): - """Return newest `Download`.""" - alfred_version = alfred_version or os.getenv("alfred_version") - version = None - if alfred_version: - version = Version(alfred_version) - - dls.sort(reverse=True) - for dl in dls: - if dl.prerelease and not prereleases: - wf().logger.debug("ignored prerelease: %s", dl.version) - continue - if version and dl.alfred_version > version: - wf().logger.debug( - "ignored incompatible (%s > %s): %s", - dl.alfred_version, - version, - dl.filename, - ) - continue - - wf().logger.debug("latest version: %s (%s)", dl.version, dl.filename) - return dl - - return None - - -def check_update(repo, current_version, prereleases=False, alfred_version=None): - """Check whether a newer release is available on GitHub. - - Args: - repo (unicode): ``username/repo`` for workflow's GitHub repo - current_version (unicode): the currently installed version of the - workflow. :ref:`Semantic versioning ` is required. - prereleases (bool): Whether to include pre-releases. - alfred_version (unicode): version of currently-running Alfred. - if empty, defaults to ``$alfred_version`` environment variable. - - Returns: - bool: ``True`` if an update is available, else ``False`` - - If an update is available, its version number and download URL will - be cached. - - """ - key = "__workflow_latest_version" - # data stored when no update is available - no_update = {"available": False, "download": None, "version": None} - current = Version(current_version) - - dls = get_downloads(repo) - if not len(dls): - wf().logger.warning("no valid downloads for %s", repo) - wf().cache_data(key, no_update) - return False - - wf().logger.info("%d download(s) for %s", len(dls), repo) - - dl = latest_download(dls, alfred_version, prereleases) - - if not dl: - wf().logger.warning("no compatible downloads for %s", repo) - wf().cache_data(key, no_update) - return False - - wf().logger.debug("latest=%r, installed=%r", dl.version, current) - - if dl.version > current: - wf().cache_data( - key, {"version": str(dl.version), "download": dl.dict, "available": True} - ) - return True - - wf().cache_data(key, no_update) - return False - - -def install_update(): - """If a newer release is available, download and install it. - - :returns: ``True`` if an update is installed, else ``False`` - - """ - key = "__workflow_latest_version" - # data stored when no update is available - no_update = {"available": False, "download": None, "version": None} - status = wf().cached_data(key, max_age=0) - - if not status or not status.get("available"): - wf().logger.info("no update available") - return False - - dl = status.get("download") - if not dl: - wf().logger.info("no download information") - return False - - path = retrieve_download(Download.from_dict(dl)) - - wf().logger.info("installing updated workflow ...") - subprocess.call(["open", path]) # nosec - - wf().cache_data(key, no_update) - return True - - -if __name__ == "__main__": # pragma: nocover - import sys - - prereleases = False - - def show_help(status=0): - """Print help message.""" - print("usage: update.py (check|install) " "[--prereleases] ") - sys.exit(status) - - argv = sys.argv[:] - if "-h" in argv or "--help" in argv: - show_help() - - if "--prereleases" in argv: - argv.remove("--prereleases") - prereleases = True - - if len(argv) != 4: - show_help(1) - - action = argv[1] - repo = argv[2] - version = argv[3] - - try: - - if action == "check": - check_update(repo, version, prereleases) - elif action == "install": - install_update() - else: - show_help(1) - - except Exception as err: # ensure traceback is in log file - wf().logger.exception(err) - raise err diff --git a/workflow/util.py b/workflow/util.py deleted file mode 100644 index 998456b..0000000 --- a/workflow/util.py +++ /dev/null @@ -1,647 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -# -# Copyright (c) 2017 Dean Jackson -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2017-12-17 -# - -"""A selection of helper functions useful for building workflows.""" - - -import atexit -import errno -import fcntl -import functools -import json -import os -import signal -import subprocess -import sys -import time -from collections import namedtuple -from contextlib import contextmanager -from threading import Event - -# JXA scripts to call Alfred's API via the Scripting Bridge -# {app} is automatically replaced with "Alfred 3" or -# "com.runningwithcrayons.Alfred" depending on version. -# -# Open Alfred in search (regular) mode -JXA_SEARCH = "Application({app}).search({arg});" -# Open Alfred's File Actions on an argument -JXA_ACTION = "Application({app}).action({arg});" -# Open Alfred's navigation mode at path -JXA_BROWSE = "Application({app}).browse({arg});" -# Set the specified theme -JXA_SET_THEME = "Application({app}).setTheme({arg});" -# Call an External Trigger -JXA_TRIGGER = "Application({app}).runTrigger({arg}, {opts});" -# Save a variable to the workflow configuration sheet/info.plist -JXA_SET_CONFIG = "Application({app}).setConfiguration({arg}, {opts});" -# Delete a variable from the workflow configuration sheet/info.plist -JXA_UNSET_CONFIG = "Application({app}).removeConfiguration({arg}, {opts});" -# Tell Alfred to reload a workflow from disk -JXA_RELOAD_WORKFLOW = "Application({app}).reloadWorkflow({arg});" - - -class AcquisitionError(Exception): - """Raised if a lock cannot be acquired.""" - - -AppInfo = namedtuple("AppInfo", ["name", "path", "bundleid"]) -"""Information about an installed application. - -Returned by :func:`appinfo`. All attributes are Unicode. - -.. py:attribute:: name - - Name of the application, e.g. ``u'Safari'``. - -.. py:attribute:: path - - Path to the application bundle, e.g. ``u'/Applications/Safari.app'``. - -.. py:attribute:: bundleid - - Application's bundle ID, e.g. ``u'com.apple.Safari'``. - -""" - - -def jxa_app_name(): - """Return name of application to call currently running Alfred. - - .. versionadded: 1.37 - - Returns 'Alfred 3' or 'com.runningwithcrayons.Alfred' depending - on which version of Alfred is running. - - This name is suitable for use with ``Application(name)`` in JXA. - - Returns: - unicode: Application name or ID. - - """ - if os.getenv("alfred_version", "").startswith("3"): - # Alfred 3 - return "Alfred 3" - # Alfred 4+ - return "com.runningwithcrayons.Alfred" - - -def unicodify(s, encoding="utf-8", norm=None): - """Ensure string is Unicode. - - .. versionadded:: 1.31 - - Decode encoded strings using ``encoding`` and normalise Unicode - to form ``norm`` if specified. - - Args: - s (str): String to decode. May also be Unicode. - encoding (str, optional): Encoding to use on bytestrings. - norm (None, optional): Normalisation form to apply to Unicode string. - - Returns: - unicode: Decoded, optionally normalised, Unicode string. - - """ - if not isinstance(s, str): - s = str(s, encoding) - - if norm: - from unicodedata import normalize - - s = normalize(norm, s) - - return s - - -def utf8ify(s): - """Ensure string is a bytestring. - - .. versionadded:: 1.31 - - Returns `str` objects unchanced, encodes `unicode` objects to - UTF-8, and calls :func:`str` on anything else. - - Args: - s (object): A Python object - - Returns: - str: UTF-8 string or string representation of s. - - """ - if isinstance(s, str): - return s - - if isinstance(s, str): - return s.encode("utf-8") - - return str(s) - - -def applescriptify(s): - """Escape string for insertion into an AppleScript string. - - .. versionadded:: 1.31 - - Replaces ``"`` with `"& quote &"`. Use this function if you want - to insert a string into an AppleScript script: - - >>> applescriptify('g "python" test') - 'g " & quote & "python" & quote & "test' - - Args: - s (unicode): Unicode string to escape. - - Returns: - unicode: Escaped string. - - """ - return s.replace('"', '" & quote & "') - - -def run_command(cmd, **kwargs): - """Run a command and return the output. - - .. versionadded:: 1.31 - - A thin wrapper around :func:`subprocess.check_output` that ensures - all arguments are encoded to UTF-8 first. - - Args: - cmd (list): Command arguments to pass to :func:`~subprocess.check_output`. - **kwargs: Keyword arguments to pass to :func:`~subprocess.check_output`. - - Returns: - str: Output returned by :func:`~subprocess.check_output`. - - """ - cmd = [str(s) for s in cmd] - return subprocess.check_output(cmd, **kwargs).decode() - - -def run_applescript(script, *args, **kwargs): - """Execute an AppleScript script and return its output. - - .. versionadded:: 1.31 - - Run AppleScript either by filepath or code. If ``script`` is a valid - filepath, that script will be run, otherwise ``script`` is treated - as code. - - Args: - script (str, optional): Filepath of script or code to run. - *args: Optional command-line arguments to pass to the script. - **kwargs: Pass ``lang`` to run a language other than AppleScript. - Any other keyword arguments are passed to :func:`run_command`. - - Returns: - str: Output of run command. - - """ - lang = "AppleScript" - if "lang" in kwargs: - lang = kwargs["lang"] - del kwargs["lang"] - - cmd = ["/usr/bin/osascript", "-l", lang] - - if os.path.exists(script): - cmd += [script] - else: - cmd += ["-e", script] - - cmd.extend(args) - - return run_command(cmd, **kwargs) - - -def run_jxa(script, *args): - """Execute a JXA script and return its output. - - .. versionadded:: 1.31 - - Wrapper around :func:`run_applescript` that passes ``lang=JavaScript``. - - Args: - script (str): Filepath of script or code to run. - *args: Optional command-line arguments to pass to script. - - Returns: - str: Output of script. - - """ - return run_applescript(script, *args, lang="JavaScript") - - -def run_trigger(name, bundleid=None, arg=None): - """Call an Alfred External Trigger. - - .. versionadded:: 1.31 - - If ``bundleid`` is not specified, the bundle ID of the calling - workflow is used. - - Args: - name (str): Name of External Trigger to call. - bundleid (str, optional): Bundle ID of workflow trigger belongs to. - arg (str, optional): Argument to pass to trigger. - - """ - bundleid = bundleid or os.getenv("alfred_workflow_bundleid") - appname = jxa_app_name() - opts = {"inWorkflow": bundleid} - if arg: - opts["withArgument"] = arg - - script = JXA_TRIGGER.format( - app=json.dumps(appname), - arg=json.dumps(name), - opts=json.dumps(opts, sort_keys=True), - ) - - run_applescript(script, lang="JavaScript") - - -def set_theme(theme_name): - """Change Alfred's theme. - - .. versionadded:: 1.39.0 - - Args: - theme_name (unicode): Name of theme Alfred should use. - - """ - appname = jxa_app_name() - script = JXA_SET_THEME.format(app=json.dumps(appname), arg=json.dumps(theme_name)) - run_applescript(script, lang="JavaScript") - - -def set_config(name, value, bundleid=None, exportable=False): - """Set a workflow variable in ``info.plist``. - - .. versionadded:: 1.33 - - If ``bundleid`` is not specified, the bundle ID of the calling - workflow is used. - - Args: - name (str): Name of variable to set. - value (str): Value to set variable to. - bundleid (str, optional): Bundle ID of workflow variable belongs to. - exportable (bool, optional): Whether variable should be marked - as exportable (Don't Export checkbox). - - """ - bundleid = bundleid or os.getenv("alfred_workflow_bundleid") - appname = jxa_app_name() - opts = {"toValue": value, "inWorkflow": bundleid, "exportable": exportable} - - script = JXA_SET_CONFIG.format( - app=json.dumps(appname), - arg=json.dumps(name), - opts=json.dumps(opts, sort_keys=True), - ) - - run_applescript(script, lang="JavaScript") - - -def unset_config(name, bundleid=None): - """Delete a workflow variable from ``info.plist``. - - .. versionadded:: 1.33 - - If ``bundleid`` is not specified, the bundle ID of the calling - workflow is used. - - Args: - name (str): Name of variable to delete. - bundleid (str, optional): Bundle ID of workflow variable belongs to. - - """ - bundleid = bundleid or os.getenv("alfred_workflow_bundleid") - appname = jxa_app_name() - opts = {"inWorkflow": bundleid} - - script = JXA_UNSET_CONFIG.format( - app=json.dumps(appname), - arg=json.dumps(name), - opts=json.dumps(opts, sort_keys=True), - ) - - run_applescript(script, lang="JavaScript") - - -def search_in_alfred(query=None): - """Open Alfred with given search query. - - .. versionadded:: 1.39.0 - - Omit ``query`` to simply open Alfred's main window. - - Args: - query (unicode, optional): Search query. - - """ - query = query or "" - appname = jxa_app_name() - script = JXA_SEARCH.format(app=json.dumps(appname), arg=json.dumps(query)) - run_applescript(script, lang="JavaScript") - - -def browse_in_alfred(path): - """Open Alfred's filesystem navigation mode at ``path``. - - .. versionadded:: 1.39.0 - - Args: - path (unicode): File or directory path. - - """ - appname = jxa_app_name() - script = JXA_BROWSE.format(app=json.dumps(appname), arg=json.dumps(path)) - run_applescript(script, lang="JavaScript") - - -def action_in_alfred(paths): - """Action the give filepaths in Alfred. - - .. versionadded:: 1.39.0 - - Args: - paths (list): Unicode paths to files/directories to action. - - """ - appname = jxa_app_name() - script = JXA_ACTION.format(app=json.dumps(appname), arg=json.dumps(paths)) - run_applescript(script, lang="JavaScript") - - -def reload_workflow(bundleid=None): - """Tell Alfred to reload a workflow from disk. - - .. versionadded:: 1.39.0 - - If ``bundleid`` is not specified, the bundle ID of the calling - workflow is used. - - Args: - bundleid (unicode, optional): Bundle ID of workflow to reload. - - """ - bundleid = bundleid or os.getenv("alfred_workflow_bundleid") - appname = jxa_app_name() - script = JXA_RELOAD_WORKFLOW.format( - app=json.dumps(appname), arg=json.dumps(bundleid) - ) - - run_applescript(script, lang="JavaScript") - - -def appinfo(name): - """Get information about an installed application. - - .. versionadded:: 1.31 - - Args: - name (str): Name of application to look up. - - Returns: - AppInfo: :class:`AppInfo` tuple or ``None`` if app isn't found. - - """ - cmd = [ - "mdfind", - "-onlyin", - "/Applications", - "-onlyin", - "/System/Applications", - "-onlyin", - os.path.expanduser("~/Applications"), - "(kMDItemContentTypeTree == com.apple.application &&" - '(kMDItemDisplayName == "{0}" || kMDItemFSName == "{0}.app"))'.format(name), - ] - - output = run_command(cmd).strip() - if not output: - return None - - path = output.split("\n")[0] - - cmd = ["mdls", "-raw", "-name", "kMDItemCFBundleIdentifier", path] - bid = run_command(cmd).strip() - if not bid: # pragma: no cover - return None - - return AppInfo(name, path, bid) - - -@contextmanager -def atomic_writer(fpath, mode): - """Atomic file writer. - - .. versionadded:: 1.12 - - Context manager that ensures the file is only written if the write - succeeds. The data is first written to a temporary file. - - :param fpath: path of file to write to. - :type fpath: ``unicode`` - :param mode: sames as for :func:`open` - :type mode: string - - """ - suffix = ".{}.tmp".format(os.getpid()) - temppath = fpath + suffix - with open(temppath, mode) as fp: - try: - yield fp - os.rename(temppath, fpath) - finally: - try: - os.remove(temppath) - except OSError: - pass - - -class LockFile(object): - """Context manager to protect filepaths with lockfiles. - - .. versionadded:: 1.13 - - Creates a lockfile alongside ``protected_path``. Other ``LockFile`` - instances will refuse to lock the same path. - - >>> path = '/path/to/file' - >>> with LockFile(path): - >>> with open(path, 'w') as fp: - >>> fp.write(data) - - Args: - protected_path (unicode): File to protect with a lockfile - timeout (float, optional): Raises an :class:`AcquisitionError` - if lock cannot be acquired within this number of seconds. - If ``timeout`` is 0 (the default), wait forever. - delay (float, optional): How often to check (in seconds) if - lock has been released. - - Attributes: - delay (float): How often to check (in seconds) whether the lock - can be acquired. - lockfile (unicode): Path of the lockfile. - timeout (float): How long to wait to acquire the lock. - - """ - - def __init__(self, protected_path, timeout=0.0, delay=0.05): - """Create new :class:`LockFile` object.""" - self.lockfile = protected_path + ".lock" - self._lockfile = None - self.timeout = timeout - self.delay = delay - self._lock = Event() - atexit.register(self.release) - - @property - def locked(self): - """``True`` if file is locked by this instance.""" - return self._lock.is_set() - - def acquire(self, blocking=True): - """Acquire the lock if possible. - - If the lock is in use and ``blocking`` is ``False``, return - ``False``. - - Otherwise, check every :attr:`delay` seconds until it acquires - lock or exceeds attr:`timeout` and raises an :class:`AcquisitionError`. - - """ - if self.locked and not blocking: - return False - - start = time.time() - while True: - # Raise error if we've been waiting too long to acquire the lock - if self.timeout and (time.time() - start) >= self.timeout: - raise AcquisitionError("lock acquisition timed out") - - # If already locked, wait then try again - if self.locked: - time.sleep(self.delay) - continue - - # Create in append mode so we don't lose any contents - if self._lockfile is None: - self._lockfile = open(self.lockfile, "a") - - # Try to acquire the lock - try: - fcntl.lockf(self._lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) - self._lock.set() - break - except IOError as err: # pragma: no cover - if err.errno not in (errno.EACCES, errno.EAGAIN): - raise - - # Don't try again - if not blocking: # pragma: no cover - return False - - # Wait, then try again - time.sleep(self.delay) - - return True - - def release(self): - """Release the lock by deleting `self.lockfile`.""" - if not self._lock.is_set(): - return False - - try: - fcntl.lockf(self._lockfile, fcntl.LOCK_UN) - except IOError: # pragma: no cover - pass - finally: - self._lock.clear() - self._lockfile = None - try: - os.unlink(self.lockfile) - except OSError: # pragma: no cover - pass - - return True # noqa: B012 - - def __enter__(self): - """Acquire lock.""" - self.acquire() - return self - - def __exit__(self, typ, value, traceback): - """Release lock.""" - self.release() - - def __del__(self): - """Clear up `self.lockfile`.""" - self.release() # pragma: no cover - - -class uninterruptible(object): - """Decorator that postpones SIGTERM until wrapped function returns. - - .. versionadded:: 1.12 - - .. important:: This decorator is NOT thread-safe. - - As of version 2.7, Alfred allows Script Filters to be killed. If - your workflow is killed in the middle of critical code (e.g. - writing data to disk), this may corrupt your workflow's data. - - Use this decorator to wrap critical functions that *must* complete. - If the script is killed while a wrapped function is executing, - the SIGTERM will be caught and handled after your function has - finished executing. - - Alfred-Workflow uses this internally to ensure its settings, data - and cache writes complete. - - """ - - def __init__(self, func, class_name=""): - """Decorate `func`.""" - self.func = func - functools.update_wrapper(self, func) - self._caught_signal = None - - def signal_handler(self, signum, frame): - """Called when process receives SIGTERM.""" - self._caught_signal = (signum, frame) - - def __call__(self, *args, **kwargs): - """Trap ``SIGTERM`` and call wrapped function.""" - self._caught_signal = None - # Register handler for SIGTERM, then call `self.func` - self.old_signal_handler = signal.getsignal(signal.SIGTERM) - signal.signal(signal.SIGTERM, self.signal_handler) - - self.func(*args, **kwargs) - - # Restore old signal handler - signal.signal(signal.SIGTERM, self.old_signal_handler) - - # Handle any signal caught during execution - if self._caught_signal is not None: - signum, frame = self._caught_signal - if callable(self.old_signal_handler): - self.old_signal_handler(signum, frame) - elif self.old_signal_handler == signal.SIG_DFL: - sys.exit(0) - - def __get__(self, obj=None, klass=None): - """Decorator API.""" - return self.__class__(self.func.__get__(obj, klass), klass.__name__) diff --git a/workflow/version b/workflow/version deleted file mode 100644 index ebc91b4..0000000 --- a/workflow/version +++ /dev/null @@ -1 +0,0 @@ -1.40.0 \ No newline at end of file diff --git a/workflow/workflow.py b/workflow/workflow.py deleted file mode 100644 index 22a8da0..0000000 --- a/workflow/workflow.py +++ /dev/null @@ -1,2882 +0,0 @@ -# encoding: utf-8 -# -# Copyright (c) 2014 Dean Jackson -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2014-02-15 -# - -"""The :class:`Workflow` object is the main interface to this library. - -:class:`Workflow` is targeted at Alfred 2. Use -:class:`~workflow.Workflow3` if you want to use Alfred 3's new -features, such as :ref:`workflow variables ` or -more powerful modifiers. - -See :ref:`setup` in the :ref:`user-manual` for an example of how to set -up your Python script to best utilise the :class:`Workflow` object. - -""" - - -import binascii -import json -import logging -import logging.handlers -import os -import pickle -import plistlib -import re -import shutil -import string -import subprocess -import sys -import time -import unicodedata -from contextlib import contextmanager -from copy import deepcopy -from typing import Optional - -try: - import xml.etree.cElementTree as ET -except ImportError: # pragma: no cover - import xml.etree.ElementTree as ET - -# imported to maintain API -from workflow.util import AcquisitionError # noqa: F401 -from workflow.util import LockFile, atomic_writer, uninterruptible - -assert sys.version_info[0] == 3 - -#: Sentinel for properties that haven't been set yet (that might -#: correctly have the value ``None``) -UNSET = object() - -#################################################################### -# Standard system icons -#################################################################### - -# These icons are default macOS icons. They are super-high quality, and -# will be familiar to users. -# This library uses `ICON_ERROR` when a workflow dies in flames, so -# in my own workflows, I use `ICON_WARNING` for less fatal errors -# (e.g. bad user input, no results etc.) - -# The system icons are all in this directory. There are many more than -# are listed here - -ICON_ROOT = "/System/Library/CoreServices/CoreTypes.bundle/Contents/Resources" - -ICON_ACCOUNT = os.path.join(ICON_ROOT, "Accounts.icns") -ICON_BURN = os.path.join(ICON_ROOT, "BurningIcon.icns") -ICON_CLOCK = os.path.join(ICON_ROOT, "Clock.icns") -ICON_COLOR = os.path.join(ICON_ROOT, "ProfileBackgroundColor.icns") -ICON_COLOUR = ICON_COLOR # Queen's English, if you please -ICON_EJECT = os.path.join(ICON_ROOT, "EjectMediaIcon.icns") -# Shown when a workflow throws an error -ICON_ERROR = os.path.join(ICON_ROOT, "AlertStopIcon.icns") -ICON_FAVORITE = os.path.join(ICON_ROOT, "ToolbarFavoritesIcon.icns") -ICON_FAVOURITE = ICON_FAVORITE -ICON_GROUP = os.path.join(ICON_ROOT, "GroupIcon.icns") -ICON_HELP = os.path.join(ICON_ROOT, "HelpIcon.icns") -ICON_HOME = os.path.join(ICON_ROOT, "HomeFolderIcon.icns") -ICON_INFO = os.path.join(ICON_ROOT, "ToolbarInfo.icns") -ICON_NETWORK = os.path.join(ICON_ROOT, "GenericNetworkIcon.icns") -ICON_NOTE = os.path.join(ICON_ROOT, "AlertNoteIcon.icns") -ICON_SETTINGS = os.path.join(ICON_ROOT, "ToolbarAdvanced.icns") -ICON_SWIRL = os.path.join(ICON_ROOT, "ErasingIcon.icns") -ICON_SWITCH = os.path.join(ICON_ROOT, "General.icns") -ICON_SYNC = os.path.join(ICON_ROOT, "Sync.icns") -ICON_TRASH = os.path.join(ICON_ROOT, "TrashIcon.icns") -ICON_USER = os.path.join(ICON_ROOT, "UserIcon.icns") -ICON_WARNING = os.path.join(ICON_ROOT, "AlertCautionIcon.icns") -ICON_WEB = os.path.join(ICON_ROOT, "BookmarkIcon.icns") - -#################################################################### -# non-ASCII to ASCII diacritic folding. -# Used by `fold_to_ascii` method -#################################################################### - -ASCII_REPLACEMENTS = { - "À": "A", - "Á": "A", - "Â": "A", - "Ã": "A", - "Ä": "A", - "Å": "A", - "Æ": "AE", - "Ç": "C", - "È": "E", - "É": "E", - "Ê": "E", - "Ë": "E", - "Ì": "I", - "Í": "I", - "Î": "I", - "Ï": "I", - "Ð": "D", - "Ñ": "N", - "Ò": "O", - "Ó": "O", - "Ô": "O", - "Õ": "O", - "Ö": "O", - "Ø": "O", - "Ù": "U", - "Ú": "U", - "Û": "U", - "Ü": "U", - "Ý": "Y", - "Þ": "Th", - "ß": "ss", - "à": "a", - "á": "a", - "â": "a", - "ã": "a", - "ä": "a", - "å": "a", - "æ": "ae", - "ç": "c", - "è": "e", - "é": "e", - "ê": "e", - "ë": "e", - "ì": "i", - "í": "i", - "î": "i", - "ï": "i", - "ð": "d", - "ñ": "n", - "ò": "o", - "ó": "o", - "ô": "o", - "õ": "o", - "ö": "o", - "ø": "o", - "ù": "u", - "ú": "u", - "û": "u", - "ü": "u", - "ý": "y", - "þ": "th", - "ÿ": "y", - "Ł": "L", - "ł": "l", - "Ń": "N", - "ń": "n", - "Ņ": "N", - "ņ": "n", - "Ň": "N", - "ň": "n", - "Ŋ": "ng", - "ŋ": "NG", - "Ō": "O", - "ō": "o", - "Ŏ": "O", - "ŏ": "o", - "Ő": "O", - "ő": "o", - "Œ": "OE", - "œ": "oe", - "Ŕ": "R", - "ŕ": "r", - "Ŗ": "R", - "ŗ": "r", - "Ř": "R", - "ř": "r", - "Ś": "S", - "ś": "s", - "Ŝ": "S", - "ŝ": "s", - "Ş": "S", - "ş": "s", - "Š": "S", - "š": "s", - "Ţ": "T", - "ţ": "t", - "Ť": "T", - "ť": "t", - "Ŧ": "T", - "ŧ": "t", - "Ũ": "U", - "ũ": "u", - "Ū": "U", - "ū": "u", - "Ŭ": "U", - "ŭ": "u", - "Ů": "U", - "ů": "u", - "Ű": "U", - "ű": "u", - "Ŵ": "W", - "ŵ": "w", - "Ŷ": "Y", - "ŷ": "y", - "Ÿ": "Y", - "Ź": "Z", - "ź": "z", - "Ż": "Z", - "ż": "z", - "Ž": "Z", - "ž": "z", - "ſ": "s", - "Α": "A", - "Β": "B", - "Γ": "G", - "Δ": "D", - "Ε": "E", - "Ζ": "Z", - "Η": "E", - "Θ": "Th", - "Ι": "I", - "Κ": "K", - "Λ": "L", - "Μ": "M", - "Ν": "N", - "Ξ": "Ks", - "Ο": "O", - "Π": "P", - "Ρ": "R", - "Σ": "S", - "Τ": "T", - "Υ": "U", - "Φ": "Ph", - "Χ": "Kh", - "Ψ": "Ps", - "Ω": "O", - "α": "a", - "β": "b", - "γ": "g", - "δ": "d", - "ε": "e", - "ζ": "z", - "η": "e", - "θ": "th", - "ι": "i", - "κ": "k", - "λ": "l", - "μ": "m", - "ν": "n", - "ξ": "x", - "ο": "o", - "π": "p", - "ρ": "r", - "ς": "s", - "σ": "s", - "τ": "t", - "υ": "u", - "φ": "ph", - "χ": "kh", - "ψ": "ps", - "ω": "o", - "А": "A", - "Б": "B", - "В": "V", - "Г": "G", - "Д": "D", - "Е": "E", - "Ж": "Zh", - "З": "Z", - "И": "I", - "Й": "I", - "К": "K", - "Л": "L", - "М": "M", - "Н": "N", - "О": "O", - "П": "P", - "Р": "R", - "С": "S", - "Т": "T", - "У": "U", - "Ф": "F", - "Х": "Kh", - "Ц": "Ts", - "Ч": "Ch", - "Ш": "Sh", - "Щ": "Shch", - "Ъ": "'", - "Ы": "Y", - "Ь": "'", - "Э": "E", - "Ю": "Iu", - "Я": "Ia", - "а": "a", - "б": "b", - "в": "v", - "г": "g", - "д": "d", - "е": "e", - "ж": "zh", - "з": "z", - "и": "i", - "й": "i", - "к": "k", - "л": "l", - "м": "m", - "н": "n", - "о": "o", - "п": "p", - "р": "r", - "с": "s", - "т": "t", - "у": "u", - "ф": "f", - "х": "kh", - "ц": "ts", - "ч": "ch", - "ш": "sh", - "щ": "shch", - "ъ": "'", - "ы": "y", - "ь": "'", - "э": "e", - "ю": "iu", - "я": "ia", - # 'ᴀ': '', - # 'ᴁ': '', - # 'ᴂ': '', - # 'ᴃ': '', - # 'ᴄ': '', - # 'ᴅ': '', - # 'ᴆ': '', - # 'ᴇ': '', - # 'ᴈ': '', - # 'ᴉ': '', - # 'ᴊ': '', - # 'ᴋ': '', - # 'ᴌ': '', - # 'ᴍ': '', - # 'ᴎ': '', - # 'ᴏ': '', - # 'ᴐ': '', - # 'ᴑ': '', - # 'ᴒ': '', - # 'ᴓ': '', - # 'ᴔ': '', - # 'ᴕ': '', - # 'ᴖ': '', - # 'ᴗ': '', - # 'ᴘ': '', - # 'ᴙ': '', - # 'ᴚ': '', - # 'ᴛ': '', - # 'ᴜ': '', - # 'ᴝ': '', - # 'ᴞ': '', - # 'ᴟ': '', - # 'ᴠ': '', - # 'ᴡ': '', - # 'ᴢ': '', - # 'ᴣ': '', - # 'ᴤ': '', - # 'ᴥ': '', - "ᴦ": "G", - "ᴧ": "L", - "ᴨ": "P", - "ᴩ": "R", - "ᴪ": "PS", - "ẞ": "Ss", - "Ỳ": "Y", - "ỳ": "y", - "Ỵ": "Y", - "ỵ": "y", - "Ỹ": "Y", - "ỹ": "y", -} - -#################################################################### -# Smart-to-dumb punctuation mapping -#################################################################### - -DUMB_PUNCTUATION = { - "‘": "'", - "’": "'", - "‚": "'", - "“": '"', - "”": '"', - "„": '"', - "–": "-", - "—": "-", -} - - -#################################################################### -# Used by `Workflow.filter` -#################################################################### - -# Anchor characters in a name -#: Characters that indicate the beginning of a "word" in CamelCase -INITIALS = string.ascii_uppercase + string.digits - -#: Split on non-letters, numbers -split_on_delimiters = re.compile("[^a-zA-Z0-9]").split - -# Match filter flags -#: Match items that start with ``query`` -MATCH_STARTSWITH = 1 -#: Match items whose capital letters start with ``query`` -MATCH_CAPITALS = 2 -#: Match items with a component "word" that matches ``query`` -MATCH_ATOM = 4 -#: Match items whose initials (based on atoms) start with ``query`` -MATCH_INITIALS_STARTSWITH = 8 -#: Match items whose initials (based on atoms) contain ``query`` -MATCH_INITIALS_CONTAIN = 16 -#: Combination of :const:`MATCH_INITIALS_STARTSWITH` and -#: :const:`MATCH_INITIALS_CONTAIN` -MATCH_INITIALS = 24 -#: Match items if ``query`` is a substring -MATCH_SUBSTRING = 32 -#: Match items if all characters in ``query`` appear in the item in order -MATCH_ALLCHARS = 64 -#: Combination of all other ``MATCH_*`` constants -MATCH_ALL = 127 - - -#################################################################### -# Used by `Workflow.check_update` -#################################################################### - -# Number of days to wait between checking for updates to the workflow -DEFAULT_UPDATE_FREQUENCY = 1 - - -#################################################################### -# Keychain access errors -#################################################################### - - -class KeychainError(Exception): - """Raised for unknown Keychain errors. - - Raised by methods :meth:`Workflow.save_password`, - :meth:`Workflow.get_password` and :meth:`Workflow.delete_password` - when ``security`` CLI app returns an unknown error code. - - """ - - -class PasswordNotFound(KeychainError): - """Password not in Keychain. - - Raised by method :meth:`Workflow.get_password` when ``account`` - is unknown to the Keychain. - - """ - - -class PasswordExists(KeychainError): - """Raised when trying to overwrite an existing account password. - - You should never receive this error: it is used internally - by the :meth:`Workflow.save_password` method to know if it needs - to delete the old password first (a Keychain implementation detail). - - """ - - -#################################################################### -# Helper functions -#################################################################### - - -def isascii(text): - """Test if ``text`` contains only ASCII characters. - - :param text: text to test for ASCII-ness - :type text: ``unicode`` - :returns: ``True`` if ``text`` contains only ASCII characters - :rtype: ``Boolean`` - - """ - try: - text.encode("ascii") - except UnicodeEncodeError: - return False - return True - - -#################################################################### -# Implementation classes -#################################################################### - - -class SerializerManager(object): - """Contains registered serializers. - - .. versionadded:: 1.8 - - A configured instance of this class is available at - :attr:`workflow.manager`. - - Use :meth:`register()` to register new (or replace - existing) serializers, which you can specify by name when calling - :class:`~workflow.Workflow` data storage methods. - - See :ref:`guide-serialization` and :ref:`guide-persistent-data` - for further information. - - """ - - def __init__(self): - """Create new SerializerManager object.""" - self._serializers = {} - - def register(self, name, serializer): - """Register ``serializer`` object under ``name``. - - Raises :class:`AttributeError` if ``serializer`` in invalid. - - .. note:: - - ``name`` will be used as the file extension of the saved files. - - :param name: Name to register ``serializer`` under - :type name: ``unicode`` or ``str`` - :param serializer: object with ``load()`` and ``dump()`` - methods - - """ - # Basic validation - serializer.load - serializer.dump - - self._serializers[name] = serializer - - def serializer(self, name): - """Return serializer object for ``name``. - - :param name: Name of serializer to return - :type name: ``unicode`` or ``str`` - :returns: serializer object or ``None`` if no such serializer - is registered. - - """ - return self._serializers.get(name) - - def unregister(self, name): - """Remove registered serializer with ``name``. - - Raises a :class:`ValueError` if there is no such registered - serializer. - - :param name: Name of serializer to remove - :type name: ``unicode`` or ``str`` - :returns: serializer object - - """ - if name not in self._serializers: - raise ValueError("No such serializer registered : {0}".format(name)) - - serializer = self._serializers[name] - del self._serializers[name] - - return serializer - - @property - def serializers(self): - """Return names of registered serializers.""" - return sorted(self._serializers.keys()) - - -class BaseSerializer: - is_binary: Optional[bool] = None - - @classmethod - def binary_mode(cls): - return "b" if cls.is_binary else "" - - @classmethod - def _opener(cls, opener, path, mode="r"): - with opener(path, mode + cls.binary_mode()) as fp: - yield fp - - @classmethod - @contextmanager - def atomic_writer(cls, path, mode): - yield from cls._opener(atomic_writer, path, mode) - - @classmethod - @contextmanager - def open(cls, path, mode): - yield from cls._opener(open, path, mode) - - -class JSONSerializer(BaseSerializer): - """Wrapper around :mod:`json`. Sets ``indent`` and ``encoding``. - - .. versionadded:: 1.8 - - Use this serializer if you need readable data files. JSON doesn't - support Python objects as well as ``pickle``, so be - careful which data you try to serialize as JSON. - - """ - - is_binary = False - - @classmethod - def load(cls, file_obj): - """Load serialized object from open JSON file. - - .. versionadded:: 1.8 - - :param file_obj: file handle - :type file_obj: ``file`` object - :returns: object loaded from JSON file - :rtype: object - - """ - return json.load(file_obj) - - @classmethod - def dump(cls, obj, file_obj): - """Serialize object ``obj`` to open JSON file. - - .. versionadded:: 1.8 - - :param obj: Python object to serialize - :type obj: JSON-serializable data structure - :param file_obj: file handle - :type file_obj: ``file`` object - - """ - return json.dump(obj, file_obj, indent=2) - - -class PickleSerializer(BaseSerializer): - """Wrapper around :mod:`pickle`. Sets ``protocol``. - - .. versionadded:: 1.8 - - Use this serializer if you need to add custom pickling. - - """ - - is_binary = True - - @classmethod - def load(cls, file_obj): - """Load serialized object from open pickle file. - - .. versionadded:: 1.8 - - :param file_obj: file handle - :type file_obj: ``file`` object - :returns: object loaded from pickle file - :rtype: object - - """ - return pickle.load(file_obj) - - @classmethod - def dump(cls, obj, file_obj): - """Serialize object ``obj`` to open pickle file. - - .. versionadded:: 1.8 - - :param obj: Python object to serialize - :type obj: Python object - :param file_obj: file handle - :type file_obj: ``file`` object - - """ - return pickle.dump(obj, file_obj, protocol=-1) - - -# Set up default manager and register built-in serializers -manager = SerializerManager() -manager.register("pickle", PickleSerializer) -manager.register("json", JSONSerializer) - - -class Item(object): - """Represents a feedback item for Alfred. - - Generates Alfred-compliant XML for a single item. - - You probably shouldn't use this class directly, but via - :meth:`Workflow.add_item`. See :meth:`~Workflow.add_item` - for details of arguments. - - """ - - def __init__( - self, - title, - subtitle="", - modifier_subtitles=None, - arg=None, - autocomplete=None, - valid=False, - uid=None, - icon=None, - icontype=None, - type=None, - largetext=None, - copytext=None, - quicklookurl=None, - ): - """Same arguments as :meth:`Workflow.add_item`.""" - self.title = title - self.subtitle = subtitle - self.modifier_subtitles = modifier_subtitles or {} - self.arg = arg - self.autocomplete = autocomplete - self.valid = valid - self.uid = uid - self.icon = icon - self.icontype = icontype - self.type = type - self.largetext = largetext - self.copytext = copytext - self.quicklookurl = quicklookurl - - @property - def elem(self): - """Create and return feedback item for Alfred. - - :returns: :class:`ElementTree.Element ` - instance for this :class:`Item` instance. - - """ - # Attributes on element - attr = {} - if self.valid: - attr["valid"] = "yes" - else: - attr["valid"] = "no" - # Allow empty string for autocomplete. This is a useful value, - # as TABing the result will revert the query back to just the - # keyword - if self.autocomplete is not None: - attr["autocomplete"] = self.autocomplete - - # Optional attributes - for name in ("uid", "type"): - value = getattr(self, name, None) - if value: - attr[name] = value - - root = ET.Element("item", attr) - ET.SubElement(root, "title").text = self.title - ET.SubElement(root, "subtitle").text = self.subtitle - - # Add modifier subtitles - for mod in ("cmd", "ctrl", "alt", "shift", "fn"): - if mod in self.modifier_subtitles: - ET.SubElement( - root, "subtitle", {"mod": mod} - ).text = self.modifier_subtitles[mod] - - # Add arg as element instead of attribute on , as it's more - # flexible (newlines aren't allowed in attributes) - if self.arg: - ET.SubElement(root, "arg").text = self.arg - - # Add icon if there is one - if self.icon: - if self.icontype: - attr = dict(type=self.icontype) - else: - attr = {} - ET.SubElement(root, "icon", attr).text = self.icon - - if self.largetext: - ET.SubElement(root, "text", {"type": "largetype"}).text = self.largetext - - if self.copytext: - ET.SubElement(root, "text", {"type": "copy"}).text = self.copytext - - if self.quicklookurl: - ET.SubElement(root, "quicklookurl").text = self.quicklookurl - - return root - - -class Settings(dict): - """A dictionary that saves itself when changed. - - Dictionary keys & values will be saved as a JSON file - at ``filepath``. If the file does not exist, the dictionary - (and settings file) will be initialised with ``defaults``. - - :param filepath: where to save the settings - :type filepath: :class:`unicode` - :param defaults: dict of default settings - :type defaults: :class:`dict` - - - An appropriate instance is provided by :class:`Workflow` instances at - :attr:`Workflow.settings`. - - """ - - def __init__(self, filepath, defaults=None): - """Create new :class:`Settings` object.""" - super(Settings, self).__init__() - self._filepath = filepath - self._nosave = False - self._original = {} - if os.path.exists(self._filepath): - self._load() - elif defaults: - for key, val in list(defaults.items()): - self[key] = val - self.save() # save default settings - - def _load(self): - """Load cached settings from JSON file `self._filepath`.""" - data = {} - with LockFile(self._filepath, 0.5): - with open(self._filepath, "r") as fp: - data.update(json.load(fp)) - - self._original = deepcopy(data) - - self._nosave = True - self.update(data) - self._nosave = False - - @uninterruptible - def save(self): - """Save settings to JSON file specified in ``self._filepath``. - - If you're using this class via :attr:`Workflow.settings`, which - you probably are, ``self._filepath`` will be ``settings.json`` - in your workflow's data directory (see :attr:`~Workflow.datadir`). - """ - if self._nosave: - return - - data = {} - data.update(self) - - with LockFile(self._filepath, 0.5): - with atomic_writer(self._filepath, "w") as fp: - json.dump(data, fp, sort_keys=True, indent=2) - - # dict methods - def __setitem__(self, key, value): - """Implement :class:`dict` interface.""" - if self._original.get(key) != value: - super(Settings, self).__setitem__(key, value) - self.save() - - def __delitem__(self, key): - """Implement :class:`dict` interface.""" - super(Settings, self).__delitem__(key) - self.save() - - def update(self, *args, **kwargs): - """Override :class:`dict` method to save on update.""" - super(Settings, self).update(*args, **kwargs) - self.save() - - def setdefault(self, key, value=None): - """Override :class:`dict` method to save on update.""" - ret = super(Settings, self).setdefault(key, value) - self.save() - return ret - - -class Workflow(object): - """The ``Workflow`` object is the main interface to Alfred-Workflow. - - It provides APIs for accessing the Alfred/workflow environment, - storing & caching data, using Keychain, and generating Script - Filter feedback. - - ``Workflow`` is compatible with Alfred 2+. Subclass - :class:`~workflow.Workflow3` provides additional features, - only available in Alfred 3+, such as workflow variables. - - :param default_settings: default workflow settings. If no settings file - exists, :class:`Workflow.settings` will be pre-populated with - ``default_settings``. - :type default_settings: :class:`dict` - :param update_settings: settings for updating your workflow from - GitHub releases. The only required key is ``github_slug``, - whose value must take the form of ``username/repo``. - If specified, ``Workflow`` will check the repo's releases - for updates. Your workflow must also have a semantic version - number. Please see the :ref:`User Manual ` and - `update API docs ` for more information. - :type update_settings: :class:`dict` - :param input_encoding: encoding of command line arguments. You - should probably leave this as the default (``utf-8``), which - is the encoding Alfred uses. - :type input_encoding: :class:`unicode` - :param normalization: normalisation to apply to CLI args. - See :meth:`Workflow.decode` for more details. - :type normalization: :class:`unicode` - :param capture_args: Capture and act on ``workflow:*`` arguments. See - :ref:`Magic arguments ` for details. - :type capture_args: :class:`Boolean` - :param libraries: sequence of paths to directories containing - libraries. These paths will be prepended to ``sys.path``. - :type libraries: :class:`tuple` or :class:`list` - :param help_url: URL to webpage where a user can ask for help with - the workflow, report bugs, etc. This could be the GitHub repo - or a page on AlfredForum.com. If your workflow throws an error, - this URL will be displayed in the log and Alfred's debugger. It can - also be opened directly in a web browser with the ``workflow:help`` - :ref:`magic argument `. - :type help_url: :class:`unicode` or :class:`str` - - """ - - # Which class to use to generate feedback items. You probably - # won't want to change this - item_class = Item - - def __init__( - self, - default_settings=None, - update_settings=None, - input_encoding="utf-8", - normalization="NFC", - capture_args=True, - libraries=None, - help_url=None, - ): - """Create new :class:`Workflow` object.""" - - seralizer = "pickle" - - self._default_settings = default_settings or {} - self._update_settings = update_settings or {} - self._input_encoding = input_encoding - self._normalizsation = normalization - self._capture_args = capture_args - self.help_url = help_url - self._workflowdir = None - self._settings_path = None - self._settings = None - self._bundleid = None - self._debugging = None - self._name = None - self._cache_serializer = seralizer - self._data_serializer = seralizer - self._info = None - self._info_loaded = False - self._logger = None - self._items = [] - self._alfred_env = None - # Version number of the workflow - self._version = UNSET - # Version from last workflow run - self._last_version_run = UNSET - # Cache for regex patterns created for filter keys - self._search_pattern_cache = {} - #: Prefix for all magic arguments. - #: The default value is ``workflow:`` so keyword - #: ``config`` would match user query ``workflow:config``. - self.magic_prefix = "workflow:" - #: Mapping of available magic arguments. The built-in magic - #: arguments are registered by default. To add your own magic arguments - #: (or override built-ins), add a key:value pair where the key is - #: what the user should enter (prefixed with :attr:`magic_prefix`) - #: and the value is a callable that will be called when the argument - #: is entered. If you would like to display a message in Alfred, the - #: function should return a ``unicode`` string. - #: - #: By default, the magic arguments documented - #: :ref:`here ` are registered. - self.magic_arguments = {} - - self._register_default_magic() - - if libraries: - sys.path = libraries + sys.path - - #################################################################### - # API methods - #################################################################### - - # info.plist contents and alfred_* environment variables ---------- - - @property - def alfred_version(self): - """Alfred version as :class:`~workflow.update.Version` object.""" - from .update import Version - - return Version(self.alfred_env.get("version")) - - @property - def alfred_env(self): - """Dict of Alfred's environmental variables minus ``alfred_`` prefix. - - .. versionadded:: 1.7 - - The variables Alfred 2.4+ exports are: - - ============================ ========================================= - Variable Description - ============================ ========================================= - debug Set to ``1`` if Alfred's debugger is - open, otherwise unset. - preferences Path to Alfred.alfredpreferences - (where your workflows and settings are - stored). - preferences_localhash Machine-specific preferences are stored - in ``Alfred.alfredpreferences/preferences/local/`` - (see ``preferences`` above for - the path to ``Alfred.alfredpreferences``) - theme ID of selected theme - theme_background Background colour of selected theme in - format ``rgba(r,g,b,a)`` - theme_subtext Show result subtext. - ``0`` = Always, - ``1`` = Alternative actions only, - ``2`` = Selected result only, - ``3`` = Never - version Alfred version number, e.g. ``'2.4'`` - version_build Alfred build number, e.g. ``277`` - workflow_bundleid Bundle ID, e.g. - ``net.deanishe.alfred-mailto`` - workflow_cache Path to workflow's cache directory - workflow_data Path to workflow's data directory - workflow_name Name of current workflow - workflow_uid UID of workflow - workflow_version The version number specified in the - workflow configuration sheet/info.plist - ============================ ========================================= - - **Note:** all values are Unicode strings except ``version_build`` and - ``theme_subtext``, which are integers. - - :returns: ``dict`` of Alfred's environmental variables without the - ``alfred_`` prefix, e.g. ``preferences``, ``workflow_data``. - - """ - if self._alfred_env is not None: - return self._alfred_env - - data = {} - - for key in ( - "debug", - "preferences", - "preferences_localhash", - "theme", - "theme_background", - "theme_subtext", - "version", - "version_build", - "workflow_bundleid", - "workflow_cache", - "workflow_data", - "workflow_name", - "workflow_uid", - "workflow_version", - ): - - value = os.getenv("alfred_" + key, "") - - if value: - if key in ("debug", "version_build", "theme_subtext"): - if value.isdigit(): - value = int(value) - else: - value = False - else: - value = self.decode(value) - - data[key] = value - - self._alfred_env = data - - return self._alfred_env - - @property - def info(self): - """:class:`dict` of ``info.plist`` contents.""" - if not self._info_loaded: - self._load_info_plist() - return self._info - - @property - def bundleid(self): - """Workflow bundle ID from environmental vars or ``info.plist``. - - :returns: bundle ID - :rtype: ``unicode`` - - """ - if not self._bundleid: - if self.alfred_env.get("workflow_bundleid"): - self._bundleid = self.alfred_env.get("workflow_bundleid") - else: - self._bundleid = self.info["bundleid"] - - return self._bundleid - - @property - def debugging(self): - """Whether Alfred's debugger is open. - - :returns: ``True`` if Alfred's debugger is open. - :rtype: ``bool`` - - """ - return bool( - self.alfred_env.get("debug") == 1 or os.environ.get("PYTEST_RUNNING") - ) - - @property - def name(self): - """Workflow name from Alfred's environmental vars or ``info.plist``. - - :returns: workflow name - :rtype: ``unicode`` - - """ - if not self._name: - if self.alfred_env.get("workflow_name"): - self._name = self.decode(self.alfred_env.get("workflow_name")) - else: - self._name = self.decode(self.info["name"]) - - return self._name - - @property - def version(self): - """Return the version of the workflow. - - .. versionadded:: 1.9.10 - - Get the workflow version from environment variable, - the ``update_settings`` dict passed on - instantiation, the ``version`` file located in the workflow's - root directory or ``info.plist``. Return ``None`` if none - exists or :class:`ValueError` if the version number is invalid - (i.e. not semantic). - - :returns: Version of the workflow (not Alfred-Workflow) - :rtype: :class:`~workflow.update.Version` object - - """ - if self._version is UNSET: - - version = None - # environment variable has priority - if self.alfred_env.get("workflow_version"): - version = self.alfred_env["workflow_version"] - - # Try `update_settings` - elif self._update_settings: - version = self._update_settings.get("version") - - # `version` file - if not version: - filepath = self.workflowfile("version") - - if os.path.exists(filepath): - with open(filepath, "r") as fileobj: - version = fileobj.read() - - # info.plist - if not version: - version = self.info.get("version") - - if version: - from .update import Version - - version = Version(version) - - self._version = version - - return self._version - - # Workflow utility methods ----------------------------------------- - - @property - def args(self): - """Return command line args as normalised unicode. - - Args are decoded and normalised via :meth:`~Workflow.decode`. - - The encoding and normalisation are the ``input_encoding`` and - ``normalization`` arguments passed to :class:`Workflow` (``UTF-8`` - and ``NFC`` are the defaults). - - If :class:`Workflow` is called with ``capture_args=True`` - (the default), :class:`Workflow` will look for certain - ``workflow:*`` args and, if found, perform the corresponding - actions and exit the workflow. - - See :ref:`Magic arguments ` for details. - - """ - msg = None - args = [self.decode(arg) for arg in sys.argv[1:]] - - # Handle magic args - if len(args) and self._capture_args: - for name in self.magic_arguments: - key = "{0}{1}".format(self.magic_prefix, name) - if key in args: - msg = self.magic_arguments[name]() - - if msg: - self.logger.debug(msg) - if not sys.stdout.isatty(): # Show message in Alfred - self.add_item(msg, valid=False, icon=ICON_INFO) - self.send_feedback() - sys.exit(0) - return args - - @property - def cachedir(self): - """Path to workflow's cache directory. - - The cache directory is a subdirectory of Alfred's own cache directory - in ``~/Library/Caches``. The full path is in Alfred 4+ is: - - ``~/Library/Caches/com.runningwithcrayons.Alfred/Workflow Data/`` - - For earlier versions: - - ``~/Library/Caches/com.runningwithcrayons.Alfred-X/Workflow Data/`` - - where ``Alfred-X`` may be ``Alfred-2`` or ``Alfred-3``. - - Returns: - unicode: full path to workflow's cache directory - - """ - if self.alfred_env.get("workflow_cache"): - dirpath = self.alfred_env.get("workflow_cache") - - else: - dirpath = self._default_cachedir - - return self._create(dirpath) - - @property - def _default_cachedir(self): - """Alfred 2's default cache directory.""" - return os.path.join( - os.path.expanduser( - "~/Library/Caches/com.runningwithcrayons.Alfred-2/" "Workflow Data/" - ), - self.bundleid, - ) - - @property - def datadir(self): - """Path to workflow's data directory. - - The data directory is a subdirectory of Alfred's own data directory in - ``~/Library/Application Support``. The full path for Alfred 4+ is: - - ``~/Library/Application Support/Alfred/Workflow Data/`` - - For earlier versions, the path is: - - ``~/Library/Application Support/Alfred X/Workflow Data/`` - - where ``Alfred X` is ``Alfred 2`` or ``Alfred 3``. - - Returns: - unicode: full path to workflow data directory - - """ - if self.alfred_env.get("workflow_data"): - dirpath = self.alfred_env.get("workflow_data") - - else: - dirpath = self._default_datadir - - return self._create(dirpath) - - @property - def _default_datadir(self): - """Alfred 2's default data directory.""" - return os.path.join( - os.path.expanduser("~/Library/Application Support/Alfred 2/Workflow Data/"), - self.bundleid, - ) - - @property - def workflowdir(self): - """Path to workflow's root directory (where ``info.plist`` is). - - Returns: - unicode: full path to workflow root directory - - """ - if not self._workflowdir: - # Try the working directory first, then the directory - # the library is in. CWD will be the workflow root if - # a workflow is being run in Alfred - candidates = [ - os.path.abspath(os.getcwd()), - os.path.dirname(os.path.abspath(os.path.dirname(__file__))), - ] - - # climb the directory tree until we find `info.plist` - for dirpath in candidates: - - # Ensure directory path is Unicode - dirpath = self.decode(dirpath) - - while True: - if os.path.exists(os.path.join(dirpath, "info.plist")): - self._workflowdir = dirpath - break - - elif dirpath == "/": - # no `info.plist` found - break - - # Check the parent directory - dirpath = os.path.dirname(dirpath) - - # No need to check other candidates - if self._workflowdir: - break - - if not self._workflowdir: - raise IOError("'info.plist' not found in directory tree") - - return self._workflowdir - - def cachefile(self, filename): - """Path to ``filename`` in workflow's cache directory. - - Return absolute path to ``filename`` within your workflow's - :attr:`cache directory `. - - :param filename: basename of file - :type filename: ``unicode`` - :returns: full path to file within cache directory - :rtype: ``unicode`` - - """ - return os.path.join(self.cachedir, filename) - - def datafile(self, filename): - """Path to ``filename`` in workflow's data directory. - - Return absolute path to ``filename`` within your workflow's - :attr:`data directory `. - - :param filename: basename of file - :type filename: ``unicode`` - :returns: full path to file within data directory - :rtype: ``unicode`` - - """ - return os.path.join(self.datadir, filename) - - def workflowfile(self, filename): - """Return full path to ``filename`` in workflow's root directory. - - :param filename: basename of file - :type filename: ``unicode`` - :returns: full path to file within data directory - :rtype: ``unicode`` - - """ - return os.path.join(self.workflowdir, filename) - - @property - def logfile(self): - """Path to logfile. - - :returns: path to logfile within workflow's cache directory - :rtype: ``unicode`` - - """ - return self.cachefile("%s.log" % self.bundleid) - - @property - def logger(self): - """Logger that logs to both console and a log file. - - If Alfred's debugger is open, log level will be ``DEBUG``, - else it will be ``INFO``. - - Use :meth:`open_log` to open the log file in Console. - - :returns: an initialised :class:`~logging.Logger` - - """ - if self._logger: - return self._logger - - # Initialise new logger and optionally handlers - logger = logging.getLogger("") - - # Only add one set of handlers - # Exclude from coverage, as pytest will have configured the - # root logger already - if not len(logger.handlers): # pragma: no cover - - fmt = logging.Formatter( - "%(asctime)s %(filename)s:%(lineno)s" " %(levelname)-8s %(message)s", - datefmt="%H:%M:%S", - ) - - logfile = logging.handlers.RotatingFileHandler( - self.logfile, maxBytes=1024 * 1024, backupCount=1 - ) - logfile.setFormatter(fmt) - logger.addHandler(logfile) - - console = logging.StreamHandler() - console.setFormatter(fmt) - logger.addHandler(console) - - if self.debugging: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.INFO) - - self._logger = logger - - return self._logger - - @logger.setter - def logger(self, logger): - """Set a custom logger. - - :param logger: The logger to use - :type logger: `~logging.Logger` instance - - """ - self._logger = logger - - @property - def settings_path(self): - """Path to settings file within workflow's data directory. - - :returns: path to ``settings.json`` file - :rtype: ``unicode`` - - """ - if not self._settings_path: - self._settings_path = self.datafile("settings.json") - return self._settings_path - - @property - def settings(self): - """Return a dictionary subclass that saves itself when changed. - - See :ref:`guide-settings` in the :ref:`user-manual` for more - information on how to use :attr:`settings` and **important - limitations** on what it can do. - - :returns: :class:`~workflow.workflow.Settings` instance - initialised from the data in JSON file at - :attr:`settings_path` or if that doesn't exist, with the - ``default_settings`` :class:`dict` passed to - :class:`Workflow` on instantiation. - :rtype: :class:`~workflow.workflow.Settings` instance - - """ - if not self._settings: - self.logger.debug("reading settings from %s", self.settings_path) - self._settings = Settings(self.settings_path, self._default_settings) - return self._settings - - @property - def cache_serializer(self): - """Name of default cache serializer. - - .. versionadded:: 1.8 - - This serializer is used by :meth:`cache_data()` and - :meth:`cached_data()` - - See :class:`SerializerManager` for details. - - :returns: serializer name - :rtype: ``unicode`` - - """ - return self._cache_serializer - - @cache_serializer.setter - def cache_serializer(self, serializer_name): - """Set the default cache serialization format. - - .. versionadded:: 1.8 - - This serializer is used by :meth:`cache_data()` and - :meth:`cached_data()` - - The specified serializer must already by registered with the - :class:`SerializerManager` at `~workflow.workflow.manager`, - otherwise a :class:`ValueError` will be raised. - - :param serializer_name: Name of default serializer to use. - :type serializer_name: - - """ - if manager.serializer(serializer_name) is None: - raise ValueError( - "Unknown serializer : `{0}`. Register your serializer " - "with `manager` first.".format(serializer_name) - ) - - self.logger.debug("default cache serializer: %s", serializer_name) - - self._cache_serializer = serializer_name - - @property - def data_serializer(self): - """Name of default data serializer. - - .. versionadded:: 1.8 - - This serializer is used by :meth:`store_data()` and - :meth:`stored_data()` - - See :class:`SerializerManager` for details. - - :returns: serializer name - :rtype: ``unicode`` - - """ - return self._data_serializer - - @data_serializer.setter - def data_serializer(self, serializer_name): - """Set the default cache serialization format. - - .. versionadded:: 1.8 - - This serializer is used by :meth:`store_data()` and - :meth:`stored_data()` - - The specified serializer must already by registered with the - :class:`SerializerManager` at `~workflow.workflow.manager`, - otherwise a :class:`ValueError` will be raised. - - :param serializer_name: Name of serializer to use by default. - - """ - if manager.serializer(serializer_name) is None: - raise ValueError( - "Unknown serializer : `{0}`. Register your serializer " - "with `manager` first.".format(serializer_name) - ) - - self.logger.debug("default data serializer: %s", serializer_name) - - self._data_serializer = serializer_name - - def stored_data(self, name): - """Retrieve data from data directory. - - Returns ``None`` if there are no data stored under ``name``. - - .. versionadded:: 1.8 - - :param name: name of datastore - - """ - metadata_path = self.datafile(".{0}.alfred-workflow".format(name)) - - if not os.path.exists(metadata_path): - self.logger.debug("no data stored for `%s`", name) - return None - - with open(metadata_path, "r") as file_obj: - serializer_name = file_obj.read().strip() - - serializer = manager.serializer(serializer_name) - - if serializer is None: - raise ValueError( - "Unknown serializer `{0}`. Register a corresponding " - "serializer with `manager.register()` " - "to load this data.".format(serializer_name) - ) - - self.logger.debug("data `%s` stored as `%s`", name, serializer_name) - - filename = "{0}.{1}".format(name, serializer_name) - data_path = self.datafile(filename) - - if not os.path.exists(data_path): - self.logger.debug("no data stored: %s", name) - if os.path.exists(metadata_path): - os.unlink(metadata_path) - - return None - - with open(data_path, "rb") as file_obj: - data = serializer.load(file_obj) - - self.logger.debug("stored data loaded: %s", data_path) - - return data - - def store_data(self, name, data, serializer=None): - """Save data to data directory. - - .. versionadded:: 1.8 - - If ``data`` is ``None``, the datastore will be deleted. - - Note that the datastore does NOT support mutliple threads. - - :param name: name of datastore - :param data: object(s) to store. **Note:** some serializers - can only handled certain types of data. - :param serializer: name of serializer to use. If no serializer - is specified, the default will be used. See - :class:`SerializerManager` for more information. - :returns: data in datastore or ``None`` - - """ - # Ensure deletion is not interrupted by SIGTERM - @uninterruptible - def delete_paths(paths): - """Clear one or more data stores""" - for path in paths: - if os.path.exists(path): - os.unlink(path) - self.logger.debug("deleted data file: %s", path) - - serializer_name = serializer or self.data_serializer - - # In order for `stored_data()` to be able to load data stored with - # an arbitrary serializer, yet still have meaningful file extensions, - # the format (i.e. extension) is saved to an accompanying file - metadata_path = self.datafile(".{0}.alfred-workflow".format(name)) - filename = "{0}.{1}".format(name, serializer_name) - data_path = self.datafile(filename) - - if data_path == self.settings_path: - raise ValueError( - "Cannot save data to" - + "`{0}` with format `{1}`. ".format(name, serializer_name) - + "This would overwrite Alfred-Workflow's settings file." - ) - - serializer = manager.serializer(serializer_name) - - if serializer is None: - raise ValueError( - "Invalid serializer `{0}`. Register your serializer with " - "`manager.register()` first.".format(serializer_name) - ) - - if data is None: # Delete cached data - delete_paths((metadata_path, data_path)) - return - - if isinstance(data, str): - data = bytearray(data) - - # Ensure write is not interrupted by SIGTERM - @uninterruptible - def _store(): - # Save file extension - with atomic_writer(metadata_path, "w") as file_obj: - file_obj.write(serializer_name) - - with serializer.atomic_writer(data_path, "w") as file_obj: - serializer.dump(data, file_obj) - - _store() - - self.logger.debug("saved data: %s", data_path) - - def cached_data(self, name, data_func=None, max_age=60): - """Return cached data if younger than ``max_age`` seconds. - - Retrieve data from cache or re-generate and re-cache data if - stale/non-existant. If ``max_age`` is 0, return cached data no - matter how old. - - :param name: name of datastore - :param data_func: function to (re-)generate data. - :type data_func: ``callable`` - :param max_age: maximum age of cached data in seconds - :type max_age: ``int`` - :returns: cached data, return value of ``data_func`` or ``None`` - if ``data_func`` is not set - - """ - serializer = manager.serializer(self.cache_serializer) - - cache_path = self.cachefile("%s.%s" % (name, self.cache_serializer)) - age = self.cached_data_age(name) - - if (age < max_age or max_age == 0) and os.path.exists(cache_path): - - with open(cache_path, "rb") as file_obj: - self.logger.debug("loading cached data: %s", cache_path) - return serializer.load(file_obj) - - if not data_func: - return None - - data = data_func() - self.cache_data(name, data) - - return data - - def cache_data(self, name, data): - """Save ``data`` to cache under ``name``. - - If ``data`` is ``None``, the corresponding cache file will be - deleted. - - :param name: name of datastore - :param data: data to store. This may be any object supported by - the cache serializer - - """ - serializer = manager.serializer(self.cache_serializer) - - cache_path = self.cachefile("%s.%s" % (name, self.cache_serializer)) - - if data is None: - if os.path.exists(cache_path): - os.unlink(cache_path) - self.logger.debug("deleted cache file: %s", cache_path) - return - - with serializer.atomic_writer(cache_path, "w") as file_obj: - serializer.dump(data, file_obj) - - self.logger.debug("cached data: %s", cache_path) - - def cached_data_fresh(self, name, max_age): - """Whether cache `name` is less than `max_age` seconds old. - - :param name: name of datastore - :param max_age: maximum age of data in seconds - :type max_age: ``int`` - :returns: ``True`` if data is less than ``max_age`` old, else - ``False`` - - """ - age = self.cached_data_age(name) - - if not age: - return False - - return age < max_age - - def cached_data_age(self, name): - """Return age in seconds of cache `name` or 0 if cache doesn't exist. - - :param name: name of datastore - :type name: ``unicode`` - :returns: age of datastore in seconds - :rtype: ``int`` - - """ - cache_path = self.cachefile("%s.%s" % (name, self.cache_serializer)) - - if not os.path.exists(cache_path): - return 0 - - return time.time() - os.stat(cache_path).st_mtime - - def filter( - self, - query, - items, - key=lambda x: x, - ascending=False, - include_score=False, - min_score=0, - max_results=0, - match_on=MATCH_ALL, - fold_diacritics=True, - ): - """Fuzzy search filter. Returns list of ``items`` that match ``query``. - - ``query`` is case-insensitive. Any item that does not contain the - entirety of ``query`` is rejected. - - If ``query`` is an empty string or contains only whitespace, - all items will match. - - :param query: query to test items against - :type query: ``unicode`` - :param items: iterable of items to test - :type items: ``list`` or ``tuple`` - :param key: function to get comparison key from ``items``. - Must return a ``unicode`` string. The default simply returns - the item. - :type key: ``callable`` - :param ascending: set to ``True`` to get worst matches first - :type ascending: ``Boolean`` - :param include_score: Useful for debugging the scoring algorithm. - If ``True``, results will be a list of tuples - ``(item, score, rule)``. - :type include_score: ``Boolean`` - :param min_score: If non-zero, ignore results with a score lower - than this. - :type min_score: ``int`` - :param max_results: If non-zero, prune results list to this length. - :type max_results: ``int`` - :param match_on: Filter option flags. Bitwise-combined list of - ``MATCH_*`` constants (see below). - :type match_on: ``int`` - :param fold_diacritics: Convert search keys to ASCII-only - characters if ``query`` only contains ASCII characters. - :type fold_diacritics: ``Boolean`` - :returns: list of ``items`` matching ``query`` or list of - ``(item, score, rule)`` `tuples` if ``include_score`` is ``True``. - ``rule`` is the ``MATCH_*`` rule that matched the item. - :rtype: ``list`` - - **Matching rules** - - By default, :meth:`filter` uses all of the following flags (i.e. - :const:`MATCH_ALL`). The tests are always run in the given order: - - 1. :const:`MATCH_STARTSWITH` - Item search key starts with ``query`` (case-insensitive). - 2. :const:`MATCH_CAPITALS` - The list of capital letters in item search key starts with - ``query`` (``query`` may be lower-case). E.g., ``of`` - would match ``OmniFocus``, ``gc`` would match ``Google Chrome``. - 3. :const:`MATCH_ATOM` - Search key is split into "atoms" on non-word characters - (.,-,' etc.). Matches if ``query`` is one of these atoms - (case-insensitive). - 4. :const:`MATCH_INITIALS_STARTSWITH` - Initials are the first characters of the above-described - "atoms" (case-insensitive). - 5. :const:`MATCH_INITIALS_CONTAIN` - ``query`` is a substring of the above-described initials. - 6. :const:`MATCH_INITIALS` - Combination of (4) and (5). - 7. :const:`MATCH_SUBSTRING` - ``query`` is a substring of item search key (case-insensitive). - 8. :const:`MATCH_ALLCHARS` - All characters in ``query`` appear in item search key in - the same order (case-insensitive). - 9. :const:`MATCH_ALL` - Combination of all the above. - - - :const:`MATCH_ALLCHARS` is considerably slower than the other - tests and provides much less accurate results. - - **Examples:** - - To ignore :const:`MATCH_ALLCHARS` (tends to provide the worst - matches and is expensive to run), use - ``match_on=MATCH_ALL ^ MATCH_ALLCHARS``. - - To match only on capitals, use ``match_on=MATCH_CAPITALS``. - - To match only on startswith and substring, use - ``match_on=MATCH_STARTSWITH | MATCH_SUBSTRING``. - - **Diacritic folding** - - .. versionadded:: 1.3 - - If ``fold_diacritics`` is ``True`` (the default), and ``query`` - contains only ASCII characters, non-ASCII characters in search keys - will be converted to ASCII equivalents (e.g. **ü** -> **u**, - **ß** -> **ss**, **é** -> **e**). - - See :const:`ASCII_REPLACEMENTS` for all replacements. - - If ``query`` contains non-ASCII characters, search keys will not be - altered. - - """ - if not query: - return items - - # Remove preceding/trailing spaces - query = query.strip() - - if not query: - return items - - # Use user override if there is one - fold_diacritics = self.settings.get( - "__workflow_diacritic_folding", fold_diacritics - ) - - results = [] - - for item in items: - skip = False - score = 0 - words = [s.strip() for s in query.split(" ")] - value = key(item).strip() - if value == "": - continue - for word in words: - if word == "": - continue - s, rule = self._filter_item(value, word, match_on, fold_diacritics) - - if not s: # Skip items that don't match part of the query - skip = True - score += s - - if skip: - continue - - if score: - # use "reversed" `score` (i.e. highest becomes lowest) and - # `value` as sort key. This means items with the same score - # will be sorted in alphabetical not reverse alphabetical order - results.append( - ((100.0 / score, value.lower(), score), (item, score, rule)) - ) - - # sort on keys, then discard the keys - results.sort(reverse=ascending) - results = [t[1] for t in results] - - if min_score: - results = [r for r in results if r[1] > min_score] - - if max_results and len(results) > max_results: - results = results[:max_results] - - # return list of ``(item, score, rule)`` - if include_score: - return results - # just return list of items - return [t[0] for t in results] - - def _filter_item(self, value, query, match_on, fold_diacritics): - """Filter ``value`` against ``query`` using rules ``match_on``. - - :returns: ``(score, rule)`` - - """ - query = query.lower() - - if not isascii(query): - fold_diacritics = False - - if fold_diacritics: - value = self.fold_to_ascii(value) - - # pre-filter any items that do not contain all characters - # of ``query`` to save on running several more expensive tests - if not set(query) <= set(value.lower()): - - return (0, None) - - # item starts with query - if match_on & MATCH_STARTSWITH and value.lower().startswith(query): - score = 100.0 - (len(value) / len(query)) - - return (score, MATCH_STARTSWITH) - - # query matches capitalised letters in item, - # e.g. of = OmniFocus - if match_on & MATCH_CAPITALS: - initials = "".join([c for c in value if c in INITIALS]) - if initials.lower().startswith(query): - score = 100.0 - (len(initials) / len(query)) - - return (score, MATCH_CAPITALS) - - # split the item into "atoms", i.e. words separated by - # spaces or other non-word characters - if ( - match_on & MATCH_ATOM - or match_on & MATCH_INITIALS_CONTAIN - or match_on & MATCH_INITIALS_STARTSWITH - ): - atoms = [s.lower() for s in split_on_delimiters(value)] - # print('atoms : %s --> %s' % (value, atoms)) - # initials of the atoms - initials = "".join([s[0] for s in atoms if s]) - - if match_on & MATCH_ATOM: - # is `query` one of the atoms in item? - # similar to substring, but scores more highly, as it's - # a word within the item - if query in atoms: - score = 100.0 - (len(value) / len(query)) - - return (score, MATCH_ATOM) - - # `query` matches start (or all) of the initials of the - # atoms, e.g. ``himym`` matches "How I Met Your Mother" - # *and* "how i met your mother" (the ``capitals`` rule only - # matches the former) - if match_on & MATCH_INITIALS_STARTSWITH and initials.startswith(query): - score = 100.0 - (len(initials) / len(query)) - - return (score, MATCH_INITIALS_STARTSWITH) - - # `query` is a substring of initials, e.g. ``doh`` matches - # "The Dukes of Hazzard" - elif match_on & MATCH_INITIALS_CONTAIN and query in initials: - score = 95.0 - (len(initials) / len(query)) - - return (score, MATCH_INITIALS_CONTAIN) - - # `query` is a substring of item - if match_on & MATCH_SUBSTRING and query in value.lower(): - score = 90.0 - (len(value) / len(query)) - - return (score, MATCH_SUBSTRING) - - # finally, assign a score based on how close together the - # characters in `query` are in item. - if match_on & MATCH_ALLCHARS: - search = self._search_for_query(query) - match = search(value) - if match: - score = 100.0 / ( - (1 + match.start()) * (match.end() - match.start() + 1) - ) - - return (score, MATCH_ALLCHARS) - - # Nothing matched - return (0, None) - - def _search_for_query(self, query): - if query in self._search_pattern_cache: - return self._search_pattern_cache[query] - - # Build pattern: include all characters - pattern = [] - for c in query: - # pattern.append('[^{0}]*{0}'.format(re.escape(c))) - pattern.append(".*?{0}".format(re.escape(c))) - pattern = "".join(pattern) - search = re.compile(pattern, re.IGNORECASE).search - - self._search_pattern_cache[query] = search - return search - - def run(self, func, text_errors=False): - """Call ``func`` to run your workflow. - - :param func: Callable to call with ``self`` (i.e. the :class:`Workflow` - instance) as first argument. - :param text_errors: Emit error messages in plain text, not in - Alfred's XML/JSON feedback format. Use this when you're not - running Alfred-Workflow in a Script Filter and would like - to pass the error message to, say, a notification. - :type text_errors: ``Boolean`` - - ``func`` will be called with :class:`Workflow` instance as first - argument. - - ``func`` should be the main entry point to your workflow. - - Any exceptions raised will be logged and an error message will be - output to Alfred. - - """ - start = time.time() - - # Write to debugger to ensure "real" output starts on a new line - print(".", file=sys.stderr) - - # Call workflow's entry function/method within a try-except block - # to catch any errors and display an error message in Alfred - try: - if self.version: - self.logger.debug( - "---------- %s (%s) ----------", self.name, self.version - ) - else: - self.logger.debug("---------- %s ----------", self.name) - - # Run update check if configured for self-updates. - # This call has to go in the `run` try-except block, as it will - # initialise `self.settings`, which will raise an exception - # if `settings.json` isn't valid. - if self._update_settings: - self.check_update() - - # Run workflow's entry function/method - func(self) - - # Set last version run to current version after a successful - # run - self.set_last_version() - - except Exception as err: - self.logger.exception(err) - if self.help_url: - self.logger.info("for assistance, see: %s", self.help_url) - - if not sys.stdout.isatty(): # Show error in Alfred - if text_errors: - print(str(err).encode("utf-8"), end="") - else: - self._items = [] - if self._name: - name = self._name - elif self._bundleid: # pragma: no cover - name = self._bundleid - else: # pragma: no cover - name = os.path.dirname(__file__) - self.add_item( - "Error in workflow '%s'" % name, str(err), icon=ICON_ERROR - ) - self.send_feedback() - return 1 - - finally: - self.logger.debug( - "---------- finished in %0.3fs ----------", time.time() - start - ) - - return 0 - - # Alfred feedback methods ------------------------------------------ - - def add_item( - self, - title, - subtitle="", - modifier_subtitles=None, - arg=None, - autocomplete=None, - valid=False, - uid=None, - icon=None, - icontype=None, - type=None, - largetext=None, - copytext=None, - quicklookurl=None, - ): - """Add an item to be output to Alfred. - - :param title: Title shown in Alfred - :type title: ``unicode`` - :param subtitle: Subtitle shown in Alfred - :type subtitle: ``unicode`` - :param modifier_subtitles: Subtitles shown when modifier - (CMD, OPT etc.) is pressed. Use a ``dict`` with the lowercase - keys ``cmd``, ``ctrl``, ``shift``, ``alt`` and ``fn`` - :type modifier_subtitles: ``dict`` - :param arg: Argument passed by Alfred as ``{query}`` when item is - actioned - :type arg: ``unicode`` - :param autocomplete: Text expanded in Alfred when item is TABbed - :type autocomplete: ``unicode`` - :param valid: Whether or not item can be actioned - :type valid: ``Boolean`` - :param uid: Used by Alfred to remember/sort items - :type uid: ``unicode`` - :param icon: Filename of icon to use - :type icon: ``unicode`` - :param icontype: Type of icon. Must be one of ``None`` , ``'filetype'`` - or ``'fileicon'``. Use ``'filetype'`` when ``icon`` is a filetype - such as ``'public.folder'``. Use ``'fileicon'`` when you wish to - use the icon of the file specified as ``icon``, e.g. - ``icon='/Applications/Safari.app', icontype='fileicon'``. - Leave as `None` if ``icon`` points to an actual - icon file. - :type icontype: ``unicode`` - :param type: Result type. Currently only ``'file'`` is supported - (by Alfred). This will tell Alfred to enable file actions for - this item. - :type type: ``unicode`` - :param largetext: Text to be displayed in Alfred's large text box - if user presses CMD+L on item. - :type largetext: ``unicode`` - :param copytext: Text to be copied to pasteboard if user presses - CMD+C on item. - :type copytext: ``unicode`` - :param quicklookurl: URL to be displayed using Alfred's Quick Look - feature (tapping ``SHIFT`` or ``⌘+Y`` on a result). - :type quicklookurl: ``unicode`` - :returns: :class:`Item` instance - - See :ref:`icons` for a list of the supported system icons. - - .. note:: - - Although this method returns an :class:`Item` instance, you don't - need to hold onto it or worry about it. All generated :class:`Item` - instances are also collected internally and sent to Alfred when - :meth:`send_feedback` is called. - - The generated :class:`Item` is only returned in case you want to - edit it or do something with it other than send it to Alfred. - - """ - item = self.item_class( - title, - subtitle, - modifier_subtitles, - arg, - autocomplete, - valid, - uid, - icon, - icontype, - type, - largetext, - copytext, - quicklookurl, - ) - self._items.append(item) - return item - - def send_feedback(self): - """Print stored items to console/Alfred as XML.""" - root = ET.Element("items") - for item in self._items: - root.append(item.elem) - sys.stdout.write('\n') - sys.stdout.write(ET.tostring(root, encoding="unicode")) - sys.stdout.flush() - - #################################################################### - # Updating methods - #################################################################### - - @property - def first_run(self): - """Return ``True`` if it's the first time this version has run. - - .. versionadded:: 1.9.10 - - Raises a :class:`ValueError` if :attr:`version` isn't set. - - """ - if not self.version: - raise ValueError("No workflow version set") - - if not self.last_version_run: - return True - - return self.version != self.last_version_run - - @property - def last_version_run(self): - """Return version of last version to run (or ``None``). - - .. versionadded:: 1.9.10 - - :returns: :class:`~workflow.update.Version` instance - or ``None`` - - """ - if self._last_version_run is UNSET: - - version = self.settings.get("__workflow_last_version") - if version: - from .update import Version - - version = Version(version) - - self._last_version_run = version - - self.logger.debug("last run version: %s", self._last_version_run) - - return self._last_version_run - - def set_last_version(self, version=None): - """Set :attr:`last_version_run` to current version. - - .. versionadded:: 1.9.10 - - :param version: version to store (default is current version) - :type version: :class:`~workflow.update.Version` instance - or ``unicode`` - :returns: ``True`` if version is saved, else ``False`` - - """ - if not version: - if not self.version: - self.logger.warning("Can't save last version: workflow has no version") - return False - - version = self.version - - if isinstance(version, str): - from .update import Version - - version = Version(version) - - self.settings["__workflow_last_version"] = str(version) - - self.logger.debug("set last run version: %s", version) - - return True - - @property - def update_available(self): - """Whether an update is available. - - .. versionadded:: 1.9 - - See :ref:`guide-updates` in the :ref:`user-manual` for detailed - information on how to enable your workflow to update itself. - - :returns: ``True`` if an update is available, else ``False`` - - """ - key = "__workflow_latest_version" - # Create a new workflow object to ensure standard serialiser - # is used (update.py is called without the user's settings) - status = Workflow().cached_data(key, max_age=0) - - # self.logger.debug('update status: %r', status) - if not status or not status.get("available"): - return False - - return status["available"] - - @property - def prereleases(self): - """Whether workflow should update to pre-release versions. - - .. versionadded:: 1.16 - - :returns: ``True`` if pre-releases are enabled with the :ref:`magic - argument ` or the ``update_settings`` dict, else - ``False``. - - """ - if self._update_settings.get("prereleases"): - return True - - return self.settings.get("__workflow_prereleases") or False - - def check_update(self, force=False): - """Call update script if it's time to check for a new release. - - .. versionadded:: 1.9 - - The update script will be run in the background, so it won't - interfere in the execution of your workflow. - - See :ref:`guide-updates` in the :ref:`user-manual` for detailed - information on how to enable your workflow to update itself. - - :param force: Force update check - :type force: ``Boolean`` - - """ - key = "__workflow_latest_version" - frequency = self._update_settings.get("frequency", DEFAULT_UPDATE_FREQUENCY) - - if not force and not self.settings.get("__workflow_autoupdate", True): - self.logger.debug("Auto update turned off by user") - return - - # Check for new version if it's time - if force or not self.cached_data_fresh(key, frequency * 86400): - repo = self._update_settings["github_slug"] - # version = self._update_settings['version'] - version = str(self.version) - - from .background import run_in_background - - # update.py is adjacent to this file - update_script = os.path.join(os.path.dirname(__file__), "update.py") - - cmd = [sys.executable, update_script, "check", repo, version] - if self.prereleases: - cmd.append("--prereleases") - - self.logger.info("checking for update ...") - - run_in_background("__workflow_update_check", cmd) - - else: - self.logger.debug("update check not due") - - def start_update(self): - """Check for update and download and install new workflow file. - - .. versionadded:: 1.9 - - See :ref:`guide-updates` in the :ref:`user-manual` for detailed - information on how to enable your workflow to update itself. - - :returns: ``True`` if an update is available and will be - installed, else ``False`` - - """ - from . import update - - repo = self._update_settings["github_slug"] - # version = self._update_settings['version'] - version = str(self.version) - - if not update.check_update(repo, version, self.prereleases): - return False - - from .background import run_in_background - - # update.py is adjacent to this file - update_script = os.path.join(os.path.dirname(__file__), "update.py") - - cmd = [sys.executable, update_script, "install", repo, version] - - if self.prereleases: - cmd.append("--prereleases") - - self.logger.debug("downloading update ...") - run_in_background("__workflow_update_install", cmd) - - return True - - #################################################################### - # Keychain password storage methods - #################################################################### - - def save_password(self, account, password, service=None): - """Save account credentials. - - If the account exists, the old password will first be deleted - (Keychain throws an error otherwise). - - If something goes wrong, a :class:`KeychainError` exception will - be raised. - - :param account: name of the account the password is for, e.g. - "Pinboard" - :type account: ``unicode`` - :param password: the password to secure - :type password: ``unicode`` - :param service: Name of the service. By default, this is the - workflow's bundle ID - :type service: ``unicode`` - - """ - if not service: - service = self.bundleid - - try: - self._call_security( - "add-generic-password", service, account, "-w", password - ) - self.logger.debug("saved password : %s:%s", service, account) - - except PasswordExists: - self.logger.debug("password exists : %s:%s", service, account) - current_password = self.get_password(account, service) - - if current_password == password: - self.logger.debug("password unchanged") - - else: - self.delete_password(account, service) - self._call_security( - "add-generic-password", service, account, "-w", password - ) - self.logger.debug("save_password : %s:%s", service, account) - - def get_password(self, account, service=None): - """Retrieve the password saved at ``service/account``. - - Raise :class:`PasswordNotFound` exception if password doesn't exist. - - :param account: name of the account the password is for, e.g. - "Pinboard" - :type account: ``unicode`` - :param service: Name of the service. By default, this is the workflow's - bundle ID - :type service: ``unicode`` - :returns: account password - :rtype: ``unicode`` - - """ - if not service: - service = self.bundleid - - output = self._call_security("find-generic-password", service, account, "-g") - - # Parsing of `security` output is adapted from python-keyring - # by Jason R. Coombs - # https://pypi.python.org/pypi/keyring - m = re.search( - r'password:\s*(?:0x(?P[0-9A-F]+)\s*)?(?:"(?P.*)")?', output - ) - - if m: - groups = m.groupdict() - h = groups.get("hex") - password = groups.get("pw") - if h: - password = str(binascii.unhexlify(h), "utf-8") - - self.logger.debug("got password : %s:%s", service, account) - - return password - - def delete_password(self, account, service=None): - """Delete the password stored at ``service/account``. - - Raise :class:`PasswordNotFound` if account is unknown. - - :param account: name of the account the password is for, e.g. - "Pinboard" - :type account: ``unicode`` - :param service: Name of the service. By default, this is the workflow's - bundle ID - :type service: ``unicode`` - - """ - if not service: - service = self.bundleid - - self._call_security("delete-generic-password", service, account) - - self.logger.debug("deleted password : %s:%s", service, account) - - #################################################################### - # Methods for workflow:* magic args - #################################################################### - - def _register_default_magic(self): # noqa: C901 - """Register the built-in magic arguments.""" - # TODO: refactor & simplify - # Wrap callback and message with callable - def callback(func, msg): - def wrapper(): - func() - return msg - - return wrapper - - self.magic_arguments["delcache"] = callback( - self.clear_cache, "Deleted workflow cache" - ) - self.magic_arguments["deldata"] = callback( - self.clear_data, "Deleted workflow data" - ) - self.magic_arguments["delsettings"] = callback( - self.clear_settings, "Deleted workflow settings" - ) - self.magic_arguments["reset"] = callback(self.reset, "Reset workflow") - self.magic_arguments["openlog"] = callback( - self.open_log, "Opening workflow log file" - ) - self.magic_arguments["opencache"] = callback( - self.open_cachedir, "Opening workflow cache directory" - ) - self.magic_arguments["opendata"] = callback( - self.open_datadir, "Opening workflow data directory" - ) - self.magic_arguments["openworkflow"] = callback( - self.open_workflowdir, "Opening workflow directory" - ) - self.magic_arguments["openterm"] = callback( - self.open_terminal, "Opening workflow root directory in Terminal" - ) - - # Diacritic folding - def fold_on(): - self.settings["__workflow_diacritic_folding"] = True - return "Diacritics will always be folded" - - def fold_off(): - self.settings["__workflow_diacritic_folding"] = False - return "Diacritics will never be folded" - - def fold_default(): - if "__workflow_diacritic_folding" in self.settings: - del self.settings["__workflow_diacritic_folding"] - return "Diacritics folding reset" - - self.magic_arguments["foldingon"] = fold_on - self.magic_arguments["foldingoff"] = fold_off - self.magic_arguments["foldingdefault"] = fold_default - - # Updates - def update_on(): - self.settings["__workflow_autoupdate"] = True - return "Auto update turned on" - - def update_off(): - self.settings["__workflow_autoupdate"] = False - return "Auto update turned off" - - def prereleases_on(): - self.settings["__workflow_prereleases"] = True - return "Prerelease updates turned on" - - def prereleases_off(): - self.settings["__workflow_prereleases"] = False - return "Prerelease updates turned off" - - def do_update(): - if self.start_update(): - return "Downloading and installing update ..." - else: - return "No update available" - - self.magic_arguments["autoupdate"] = update_on - self.magic_arguments["noautoupdate"] = update_off - self.magic_arguments["prereleases"] = prereleases_on - self.magic_arguments["noprereleases"] = prereleases_off - self.magic_arguments["update"] = do_update - - # Help - def do_help(): - if self.help_url: - self.open_help() - return "Opening workflow help URL in browser" - else: - return "Workflow has no help URL" - - def show_version(): - if self.version: - return "Version: {0}".format(self.version) - else: - return "This workflow has no version number" - - def list_magic(): - """Display all available magic args in Alfred.""" - isatty = sys.stderr.isatty() - for name in sorted(self.magic_arguments.keys()): - if name == "magic": - continue - arg = self.magic_prefix + name - self.logger.debug(arg) - - if not isatty: - self.add_item(arg, icon=ICON_INFO) - - if not isatty: - self.send_feedback() - - self.magic_arguments["help"] = do_help - self.magic_arguments["magic"] = list_magic - self.magic_arguments["version"] = show_version - - def clear_cache(self, filter_func=lambda f: True): - """Delete all files in workflow's :attr:`cachedir`. - - :param filter_func: Callable to determine whether a file should be - deleted or not. ``filter_func`` is called with the filename - of each file in the data directory. If it returns ``True``, - the file will be deleted. - By default, *all* files will be deleted. - :type filter_func: ``callable`` - """ - self._delete_directory_contents(self.cachedir, filter_func) - - def clear_data(self, filter_func=lambda f: True): - """Delete all files in workflow's :attr:`datadir`. - - :param filter_func: Callable to determine whether a file should be - deleted or not. ``filter_func`` is called with the filename - of each file in the data directory. If it returns ``True``, - the file will be deleted. - By default, *all* files will be deleted. - :type filter_func: ``callable`` - """ - self._delete_directory_contents(self.datadir, filter_func) - - def clear_settings(self): - """Delete workflow's :attr:`settings_path`.""" - if os.path.exists(self.settings_path): - os.unlink(self.settings_path) - self.logger.debug("deleted : %r", self.settings_path) - - def reset(self): - """Delete workflow settings, cache and data. - - File :attr:`settings ` and directories - :attr:`cache ` and :attr:`data ` are deleted. - - """ - self.clear_cache() - self.clear_data() - self.clear_settings() - - def open_log(self): - """Open :attr:`logfile` in default app (usually Console.app).""" - subprocess.call(["open", self.logfile]) # nosec - - def open_cachedir(self): - """Open the workflow's :attr:`cachedir` in Finder.""" - subprocess.call(["open", self.cachedir]) # nosec - - def open_datadir(self): - """Open the workflow's :attr:`datadir` in Finder.""" - subprocess.call(["open", self.datadir]) # nosec - - def open_workflowdir(self): - """Open the workflow's :attr:`workflowdir` in Finder.""" - subprocess.call(["open", self.workflowdir]) # nosec - - def open_terminal(self): - """Open a Terminal window at workflow's :attr:`workflowdir`.""" - subprocess.call(["open", "-a", "Terminal", self.workflowdir]) # nosec - - def open_help(self): - """Open :attr:`help_url` in default browser.""" - subprocess.call(["open", self.help_url]) # nosec - - return "Opening workflow help URL in browser" - - #################################################################### - # Helper methods - #################################################################### - - def decode(self, text, encoding=None, normalization=None): - """Return ``text`` as normalised unicode. - - If ``encoding`` and/or ``normalization`` is ``None``, the - ``input_encoding``and ``normalization`` parameters passed to - :class:`Workflow` are used. - - :param text: string - :type text: encoded or Unicode string. If ``text`` is already a - Unicode string, it will only be normalised. - :param encoding: The text encoding to use to decode ``text`` to - Unicode. - :type encoding: ``unicode`` or ``None`` - :param normalization: The nomalisation form to apply to ``text``. - :type normalization: ``unicode`` or ``None`` - :returns: decoded and normalised ``unicode`` - - :class:`Workflow` uses "NFC" normalisation by default. This is the - standard for Python and will work well with data from the web (via - :mod:`~workflow.web` or :mod:`json`). - - macOS, on the other hand, uses "NFD" normalisation (nearly), so data - coming from the system (e.g. via :mod:`subprocess` or - :func:`os.listdir`/:mod:`os.path`) may not match. You should either - normalise this data, too, or change the default normalisation used by - :class:`Workflow`. - - """ - encoding = encoding or self._input_encoding - normalization = normalization or self._normalizsation - if not isinstance(text, str): - text = str(text, encoding) - return unicodedata.normalize(normalization, text) - - def fold_to_ascii(self, text): - """Convert non-ASCII characters to closest ASCII equivalent. - - .. versionadded:: 1.3 - - .. note:: This only works for a subset of European languages. - - :param text: text to convert - :type text: ``unicode`` - :returns: text containing only ASCII characters - :rtype: ``unicode`` - - """ - if isascii(text): - return text - text = "".join([ASCII_REPLACEMENTS.get(c, c) for c in text]) - return unicodedata.normalize("NFKD", text) - - def dumbify_punctuation(self, text): - """Convert non-ASCII punctuation to closest ASCII equivalent. - - This method replaces "smart" quotes and n- or m-dashes with their - workaday ASCII equivalents. This method is currently not used - internally, but exists as a helper method for workflow authors. - - .. versionadded: 1.9.7 - - :param text: text to convert - :type text: ``unicode`` - :returns: text with only ASCII punctuation - :rtype: ``unicode`` - - """ - if isascii(text): - return text - - text = "".join([DUMB_PUNCTUATION.get(c, c) for c in text]) - return text - - def _delete_directory_contents(self, dirpath, filter_func): - """Delete all files in a directory. - - :param dirpath: path to directory to clear - :type dirpath: ``unicode`` or ``str`` - :param filter_func function to determine whether a file shall be - deleted or not. - :type filter_func ``callable`` - - """ - if os.path.exists(dirpath): - for filename in os.listdir(dirpath): - if not filter_func(filename): - continue - path = os.path.join(dirpath, filename) - if os.path.isdir(path): - shutil.rmtree(path) - else: - os.unlink(path) - self.logger.debug("deleted : %r", path) - - def _load_info_plist(self): - """Load workflow info from ``info.plist``.""" - # info.plist should be in the directory above this one - with open(self.workflowfile("info.plist"), "rb") as file_obj: - self._info = plistlib.load(file_obj) - self._info_loaded = True - - def _create(self, dirpath): - """Create directory `dirpath` if it doesn't exist. - - :param dirpath: path to directory - :type dirpath: ``unicode`` - :returns: ``dirpath`` argument - :rtype: ``unicode`` - - """ - if not os.path.exists(dirpath): - os.makedirs(dirpath) - return dirpath - - def _call_security(self, action, service, account, *args): - """Call ``security`` CLI program that provides access to keychains. - - May raise `PasswordNotFound`, `PasswordExists` or `KeychainError` - exceptions (the first two are subclasses of `KeychainError`). - - :param action: The ``security`` action to call, e.g. - ``add-generic-password`` - :type action: ``unicode`` - :param service: Name of the service. - :type service: ``unicode`` - :param account: name of the account the password is for, e.g. - "Pinboard" - :type account: ``unicode`` - :param password: the password to secure - :type password: ``unicode`` - :param *args: list of command line arguments to be passed to - ``security`` - :type *args: `list` or `tuple` - :returns: ``(retcode, output)``. ``retcode`` is an `int`, ``output`` a - ``unicode`` string. - :rtype: `tuple` (`int`, ``unicode``) - - """ - cmd = ["security", action, "-s", service, "-a", account] + list(args) - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, _ = p.communicate() - if p.returncode == 44: # password does not exist - raise PasswordNotFound() - elif p.returncode == 45: # password already exists - raise PasswordExists() - elif p.returncode > 0: - err = KeychainError("Unknown Keychain error : %s" % stdout) - err.retcode = p.returncode - raise err - return stdout.strip().decode("utf-8") diff --git a/workflow/workflow3.py b/workflow/workflow3.py deleted file mode 100644 index 3a06e33..0000000 --- a/workflow/workflow3.py +++ /dev/null @@ -1,767 +0,0 @@ -# encoding: utf-8 -# -# Copyright (c) 2016 Dean Jackson -# -# MIT Licence. See http://opensource.org/licenses/MIT -# -# Created on 2016-06-25 -# - -"""An Alfred 3+ version of :class:`~workflow.Workflow`. - -:class:`~workflow.Workflow3` supports new features, such as -setting :ref:`workflow-variables` and -:class:`the more advanced modifiers ` supported by Alfred 3+. - -In order for the feedback mechanism to work correctly, it's important -to create :class:`Item3` and :class:`Modifier` objects via the -:meth:`Workflow3.add_item()` and :meth:`Item3.add_modifier()` methods -respectively. If you instantiate :class:`Item3` or :class:`Modifier` -objects directly, the current :class:`Workflow3` object won't be aware -of them, and they won't be sent to Alfred when you call -:meth:`Workflow3.send_feedback()`. - -""" - - -import json -import os -import sys - -from .workflow import ICON_WARNING, Workflow - - -class Variables(dict): - """Workflow variables for Run Script actions. - - .. versionadded: 1.26 - - This class allows you to set workflow variables from - Run Script actions. - - It is a subclass of :class:`dict`. - - >>> v = Variables(username='deanishe', password='hunter2') - >>> v.arg = u'output value' - >>> print(v) - - See :ref:`variables-run-script` in the User Guide for more - information. - - Args: - arg (unicode or list, optional): Main output/``{query}``. - **variables: Workflow variables to set. - - In Alfred 4.1+ and Alfred-Workflow 1.40+, ``arg`` may also be a - :class:`list` or :class:`tuple`. - - Attributes: - arg (unicode or list): Output value (``{query}``). - In Alfred 4.1+ and Alfred-Workflow 1.40+, ``arg`` may also be a - :class:`list` or :class:`tuple`. - config (dict): Configuration for downstream workflow element. - - """ - - def __init__(self, arg=None, **variables): - """Create a new `Variables` object.""" - self.arg = arg - self.config = {} - super(Variables, self).__init__(**variables) - - @property - def obj(self): - """``alfredworkflow`` :class:`dict`.""" - o = {} - if self: - d2 = {} - for k, v in list(self.items()): - d2[k] = v - o["variables"] = d2 - - if self.config: - o["config"] = self.config - - if self.arg is not None: - o["arg"] = self.arg - - return {"alfredworkflow": o} - - def __str__(self): - """Convert to ``alfredworkflow`` JSON object. - - Returns: - unicode: ``alfredworkflow`` JSON object - - """ - if not self and not self.config: - if not self.arg: - return "" - if isinstance(self.arg, str): - return self.arg - - return json.dumps(self.obj) - - -class Modifier(object): - """Modify :class:`Item3` arg/icon/variables when modifier key is pressed. - - Don't use this class directly (as it won't be associated with any - :class:`Item3`), but rather use :meth:`Item3.add_modifier()` - to add modifiers to results. - - >>> it = wf.add_item('Title', 'Subtitle', valid=True) - >>> it.setvar('name', 'default') - >>> m = it.add_modifier('cmd') - >>> m.setvar('name', 'alternate') - - See :ref:`workflow-variables` in the User Guide for more information - and :ref:`example usage `. - - Args: - key (unicode): Modifier key, e.g. ``"cmd"``, ``"alt"`` etc. - subtitle (unicode, optional): Override default subtitle. - arg (unicode, optional): Argument to pass for this modifier. - valid (bool, optional): Override item's validity. - icon (unicode, optional): Filepath/UTI of icon to use - icontype (unicode, optional): Type of icon. See - :meth:`Workflow.add_item() ` - for valid values. - - Attributes: - arg (unicode): Arg to pass to following action. - config (dict): Configuration for a downstream element, such as - a File Filter. - icon (unicode): Filepath/UTI of icon. - icontype (unicode): Type of icon. See - :meth:`Workflow.add_item() ` - for valid values. - key (unicode): Modifier key (see above). - subtitle (unicode): Override item subtitle. - valid (bool): Override item validity. - variables (dict): Workflow variables set by this modifier. - - """ - - def __init__( - self, key, subtitle=None, arg=None, valid=None, icon=None, icontype=None - ): - """Create a new :class:`Modifier`. - - Don't use this class directly (as it won't be associated with any - :class:`Item3`), but rather use :meth:`Item3.add_modifier()` - to add modifiers to results. - - Args: - key (unicode): Modifier key, e.g. ``"cmd"``, ``"alt"`` etc. - subtitle (unicode, optional): Override default subtitle. - arg (unicode, optional): Argument to pass for this modifier. - valid (bool, optional): Override item's validity. - icon (unicode, optional): Filepath/UTI of icon to use - icontype (unicode, optional): Type of icon. See - :meth:`Workflow.add_item() ` - for valid values. - - """ - self.key = key - self.subtitle = subtitle - self.arg = arg - self.valid = valid - self.icon = icon - self.icontype = icontype - - self.config = {} - self.variables = {} - - def setvar(self, name, value): - """Set a workflow variable for this Item. - - Args: - name (unicode): Name of variable. - value (unicode): Value of variable. - - """ - self.variables[name] = value - - def getvar(self, name, default=None): - """Return value of workflow variable for ``name`` or ``default``. - - Args: - name (unicode): Variable name. - default (None, optional): Value to return if variable is unset. - - Returns: - unicode or ``default``: Value of variable if set or ``default``. - - """ - return self.variables.get(name, default) - - @property - def obj(self): - """Modifier formatted for JSON serialization for Alfred 3. - - Returns: - dict: Modifier for serializing to JSON. - - """ - o = {} - - if self.subtitle is not None: - o["subtitle"] = self.subtitle - - if self.arg is not None: - o["arg"] = self.arg - - if self.valid is not None: - o["valid"] = self.valid - - if self.variables: - o["variables"] = self.variables - - if self.config: - o["config"] = self.config - - icon = self._icon() - if icon: - o["icon"] = icon - - return o - - def _icon(self): - """Return `icon` object for item. - - Returns: - dict: Mapping for item `icon` (may be empty). - - """ - icon = {} - if self.icon is not None: - icon["path"] = self.icon - - if self.icontype is not None: - icon["type"] = self.icontype - - return icon - - -class Item3(object): - """Represents a feedback item for Alfred 3+. - - Generates Alfred-compliant JSON for a single item. - - Don't use this class directly (as it then won't be associated with - any :class:`Workflow3 ` object), but rather use - :meth:`Workflow3.add_item() `. - See :meth:`~workflow.Workflow3.add_item` for details of arguments. - - """ - - def __init__( - self, - title, - subtitle="", - arg=None, - autocomplete=None, - match=None, - valid=False, - uid=None, - icon=None, - icontype=None, - type=None, - largetext=None, - copytext=None, - quicklookurl=None, - ): - """Create a new :class:`Item3` object. - - Use same arguments as for - :class:`Workflow.Item `. - - Argument ``subtitle_modifiers`` is not supported. - - """ - self.title = title - self.subtitle = subtitle - self.arg = arg - self.autocomplete = autocomplete - self.match = match - self.valid = valid - self.uid = uid - self.icon = icon - self.icontype = icontype - self.type = type - self.quicklookurl = quicklookurl - self.largetext = largetext - self.copytext = copytext - - self.modifiers = {} - - self.config = {} - self.variables = {} - - def setvar(self, name, value): - """Set a workflow variable for this Item. - - Args: - name (unicode): Name of variable. - value (unicode): Value of variable. - - """ - self.variables[name] = value - - def getvar(self, name, default=None): - """Return value of workflow variable for ``name`` or ``default``. - - Args: - name (unicode): Variable name. - default (None, optional): Value to return if variable is unset. - - Returns: - unicode or ``default``: Value of variable if set or ``default``. - - """ - return self.variables.get(name, default) - - def add_modifier( - self, key, subtitle=None, arg=None, valid=None, icon=None, icontype=None - ): - """Add alternative values for a modifier key. - - Args: - key (unicode): Modifier key, e.g. ``"cmd"`` or ``"alt"`` - subtitle (unicode, optional): Override item subtitle. - arg (unicode, optional): Input for following action. - valid (bool, optional): Override item validity. - icon (unicode, optional): Filepath/UTI of icon. - icontype (unicode, optional): Type of icon. See - :meth:`Workflow.add_item() ` - for valid values. - - In Alfred 4.1+ and Alfred-Workflow 1.40+, ``arg`` may also be a - :class:`list` or :class:`tuple`. - - Returns: - Modifier: Configured :class:`Modifier`. - - """ - mod = Modifier(key, subtitle, arg, valid, icon, icontype) - - # Add Item variables to Modifier - mod.variables.update(self.variables) - - self.modifiers[key] = mod - - return mod - - @property - def obj(self): - """Item formatted for JSON serialization. - - Returns: - dict: Data suitable for Alfred 3 feedback. - - """ - # Required values - o = {"title": self.title, "subtitle": self.subtitle, "valid": self.valid} - - # Optional values - if self.arg is not None: - o["arg"] = self.arg - - if self.autocomplete is not None: - o["autocomplete"] = self.autocomplete - - if self.match is not None: - o["match"] = self.match - - if self.uid is not None: - o["uid"] = self.uid - - if self.type is not None: - o["type"] = self.type - - if self.quicklookurl is not None: - o["quicklookurl"] = self.quicklookurl - - if self.variables: - o["variables"] = self.variables - - if self.config: - o["config"] = self.config - - # Largetype and copytext - text = self._text() - if text: - o["text"] = text - - icon = self._icon() - if icon: - o["icon"] = icon - - # Modifiers - mods = self._modifiers() - if mods: - o["mods"] = mods - - return o - - def _icon(self): - """Return `icon` object for item. - - Returns: - dict: Mapping for item `icon` (may be empty). - - """ - icon = {} - if self.icon is not None: - icon["path"] = self.icon - - if self.icontype is not None: - icon["type"] = self.icontype - - return icon - - def _text(self): - """Return `largetext` and `copytext` object for item. - - Returns: - dict: `text` mapping (may be empty) - - """ - text = {} - if self.largetext is not None: - text["largetype"] = self.largetext - - if self.copytext is not None: - text["copy"] = self.copytext - - return text - - def _modifiers(self): - """Build `mods` dictionary for JSON feedback. - - Returns: - dict: Modifier mapping or `None`. - - """ - if self.modifiers: - mods = {} - for k, mod in list(self.modifiers.items()): - mods[k] = mod.obj - - return mods - - return None - - -class Workflow3(Workflow): - """Workflow class that generates Alfred 3+ feedback. - - It is a subclass of :class:`~workflow.Workflow` and most of its - methods are documented there. - - Attributes: - item_class (class): Class used to generate feedback items. - variables (dict): Top level workflow variables. - - """ - - item_class = Item3 - - def __init__(self, **kwargs): - """Create a new :class:`Workflow3` object. - - See :class:`~workflow.Workflow` for documentation. - - """ - Workflow.__init__(self, **kwargs) - self.variables = {} - self._rerun = 0 - # Get session ID from environment if present - self._session_id = os.getenv("_WF_SESSION_ID") or None - if self._session_id: - self.setvar("_WF_SESSION_ID", self._session_id) - - @property - def _default_cachedir(self): - """Alfred 4's default cache directory.""" - return os.path.join( - os.path.expanduser( - "~/Library/Caches/com.runningwithcrayons.Alfred/" "Workflow Data/" - ), - self.bundleid, - ) - - @property - def _default_datadir(self): - """Alfred 4's default data directory.""" - return os.path.join( - os.path.expanduser("~/Library/Application Support/Alfred/Workflow Data/"), - self.bundleid, - ) - - @property - def rerun(self): - """How often (in seconds) Alfred should re-run the Script Filter.""" - return self._rerun - - @rerun.setter - def rerun(self, seconds): - """Interval at which Alfred should re-run the Script Filter. - - Args: - seconds (int): Interval between runs. - """ - self._rerun = seconds - - @property - def session_id(self): - """A unique session ID every time the user uses the workflow. - - .. versionadded:: 1.25 - - The session ID persists while the user is using this workflow. - It expires when the user runs a different workflow or closes - Alfred. - - """ - if not self._session_id: - from uuid import uuid4 - - self._session_id = uuid4().hex - self.setvar("_WF_SESSION_ID", self._session_id) - - return self._session_id - - def setvar(self, name, value, persist=False): - """Set a "global" workflow variable. - - .. versionchanged:: 1.33 - - These variables are always passed to downstream workflow objects. - - If you have set :attr:`rerun`, these variables are also passed - back to the script when Alfred runs it again. - - Args: - name (unicode): Name of variable. - value (unicode): Value of variable. - persist (bool, optional): Also save variable to ``info.plist``? - - """ - self.variables[name] = value - if persist: - from .util import set_config - - set_config(name, value, self.bundleid) - self.logger.debug( - "saved variable %r with value %r to info.plist", name, value - ) - - def getvar(self, name, default=None): - """Return value of workflow variable for ``name`` or ``default``. - - Args: - name (unicode): Variable name. - default (None, optional): Value to return if variable is unset. - - Returns: - unicode or ``default``: Value of variable if set or ``default``. - - """ - return self.variables.get(name, default) - - def add_item( - self, - title, - subtitle="", - arg=None, - autocomplete=None, - valid=False, - uid=None, - icon=None, - icontype=None, - type=None, - largetext=None, - copytext=None, - quicklookurl=None, - match=None, - ): - """Add an item to be output to Alfred. - - Args: - match (unicode, optional): If you have "Alfred filters results" - turned on for your Script Filter, Alfred (version 3.5 and - above) will filter against this field, not ``title``. - - In Alfred 4.1+ and Alfred-Workflow 1.40+, ``arg`` may also be a - :class:`list` or :class:`tuple`. - - See :meth:`Workflow.add_item() ` for - the main documentation and other parameters. - - The key difference is that this method does not support the - ``modifier_subtitles`` argument. Use the :meth:`~Item3.add_modifier()` - method instead on the returned item instead. - - Returns: - Item3: Alfred feedback item. - - """ - item = self.item_class( - title, - subtitle, - arg, - autocomplete, - match, - valid, - uid, - icon, - icontype, - type, - largetext, - copytext, - quicklookurl, - ) - - # Add variables to child item - item.variables.update(self.variables) - - self._items.append(item) - return item - - @property - def _session_prefix(self): - """Filename prefix for current session.""" - return "_wfsess-{0}-".format(self.session_id) - - def _mk_session_name(self, name): - """New cache name/key based on session ID.""" - return self._session_prefix + name - - def cache_data(self, name, data, session=False): - """Cache API with session-scoped expiry. - - .. versionadded:: 1.25 - - Args: - name (str): Cache key - data (object): Data to cache - session (bool, optional): Whether to scope the cache - to the current session. - - ``name`` and ``data`` are the same as for the - :meth:`~workflow.Workflow.cache_data` method on - :class:`~workflow.Workflow`. - - If ``session`` is ``True``, then ``name`` is prefixed - with :attr:`session_id`. - - """ - if session: - name = self._mk_session_name(name) - - return super(Workflow3, self).cache_data(name, data) - - def cached_data(self, name, data_func=None, max_age=60, session=False): - """Cache API with session-scoped expiry. - - .. versionadded:: 1.25 - - Args: - name (str): Cache key - data_func (callable): Callable that returns fresh data. It - is called if the cache has expired or doesn't exist. - max_age (int): Maximum allowable age of cache in seconds. - session (bool, optional): Whether to scope the cache - to the current session. - - ``name``, ``data_func`` and ``max_age`` are the same as for the - :meth:`~workflow.Workflow.cached_data` method on - :class:`~workflow.Workflow`. - - If ``session`` is ``True``, then ``name`` is prefixed - with :attr:`session_id`. - - """ - if session: - name = self._mk_session_name(name) - - return super(Workflow3, self).cached_data(name, data_func, max_age) - - def clear_session_cache(self, current=False): - """Remove session data from the cache. - - .. versionadded:: 1.25 - .. versionchanged:: 1.27 - - By default, data belonging to the current session won't be - deleted. Set ``current=True`` to also clear current session. - - Args: - current (bool, optional): If ``True``, also remove data for - current session. - - """ - - def _is_session_file(filename): - if current: - return filename.startswith("_wfsess-") - return filename.startswith("_wfsess-") and not filename.startswith( - self._session_prefix - ) - - self.clear_cache(_is_session_file) - - @property - def obj(self): - """Feedback formatted for JSON serialization. - - Returns: - dict: Data suitable for Alfred 3 feedback. - - """ - items = [] - for item in self._items: - items.append(item.obj) - - o = {"items": items} - if self.variables: - o["variables"] = self.variables - if self.rerun: - o["rerun"] = self.rerun - return o - - def warn_empty(self, title, subtitle="", icon=None): - """Add a warning to feedback if there are no items. - - .. versionadded:: 1.31 - - Add a "warning" item to Alfred feedback if no other items - have been added. This is a handy shortcut to prevent Alfred - from showing its fallback searches, which is does if no - items are returned. - - Args: - title (unicode): Title of feedback item. - subtitle (unicode, optional): Subtitle of feedback item. - icon (str, optional): Icon for feedback item. If not - specified, ``ICON_WARNING`` is used. - - Returns: - Item3: Newly-created item. - - """ - if len(self._items): - return - - icon = icon or ICON_WARNING - return self.add_item(title, subtitle, icon=icon) - - def send_feedback(self): - """Print stored items to console/Alfred as JSON.""" - if self.debugging: - json.dump(self.obj, sys.stdout, indent=2, separators=(",", ": ")) - else: - json.dump(self.obj, sys.stdout) - sys.stdout.flush()