From de86710c7d4084ae1cca0319bb404cdc1ae035a8 Mon Sep 17 00:00:00 2001 From: mal Date: Sat, 14 May 2022 05:36:31 +0000 Subject: [PATCH 1/9] safety: rework --- sopel/modules/safety.py | 331 +++++++++++++++++++++++++--------------- sopel/modules/url.py | 20 ++- 2 files changed, 225 insertions(+), 126 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 9537eea2ad..2a6620e5d2 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -7,75 +7,112 @@ """ from __future__ import annotations +from base64 import urlsafe_b64encode import json import logging import os.path import re import threading import time +from typing import Dict, Optional from urllib.parse import urlparse from urllib.request import urlretrieve - import requests from sopel import formatting, plugin, tools -from sopel.config import types +from sopel.bot import Sopel +from sopel.config import Config, types +from sopel.trigger import Trigger LOGGER = logging.getLogger(__name__) PLUGIN_OUTPUT_PREFIX = '[safety] ' -vt_base_api_url = 'https://www.virustotal.com/vtapi/v2/url/' -malware_domains = set() +SAFETY_MODES = ["off", "local", "local strict", "on", "strict"] +VT_API_URL = "https://www.virustotal.com/api/v3/urls" +CACHE_LIMIT = 512 known_good = [] -cache_limit = 512 class SafetySection(types.StaticSection): - enabled_by_default = types.BooleanAttribute('enabled_by_default', default=True) - """Whether to enable URL safety in all channels where it isn't explicitly disabled.""" + enabled_by_default = types.BooleanAttribute("enabled_by_default", default=True) + """Deprecated: Sets default_mode to "off" or "on".""" + default_mode = types.ValidatedAttribute("default_mode") + """Which mode to use in channels without a mode set.""" known_good = types.ListAttribute('known_good') """List of "known good" domains to ignore.""" vt_api_key = types.ValidatedAttribute('vt_api_key') """Optional VirusTotal API key (improves malicious URL detection).""" + domain_blocklist_url = types.ValidatedAttribute("domain_blocklist_url") + """Optional hosts-file formatted domain blocklist to use instead of StevenBlack's.""" -def configure(config): +def configure(settings: Config): """ | name | example | purpose | | ---- | ------- | ------- | - | enabled\\_by\\_default | True | Enable URL safety in all channels where it isn't explicitly disabled. | - | known\\_good | sopel.chat,dftba.net | List of "known good" domains to ignore. | + | default\\_mode | on | Which mode to use in channels without a mode set. | + | known\\_good | sopel.chat,dftba.net | List of "known good" domains or regexes to ignore. This can save VT API calls. | | vt\\_api\\_key | 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef | Optional VirusTotal API key to improve malicious URL detection | + | domain\\_blocklist\\_url | https://example.com/bad-hosts.txt | Optional hosts-file formatted domain blocklist to use instead of StevenBlack's. | """ - config.define_section('safety', SafetySection) - config.safety.configure_setting( - 'enabled_by_default', - "Enable URL safety in channels that don't specifically disable it?", + settings.define_section("safety", SafetySection) + settings.safety.configure_setting( + "default_mode", + ( + "Which mode should be used in channels that haven't specifically set one?" + "\n({})".format("/".join(SAFETY_MODES)) + ), + default="on", ) - config.safety.configure_setting( + settings.safety.configure_setting( 'known_good', - 'Enter any domains to whitelist', + "Enter any domains to allowlist", ) - config.safety.configure_setting( + settings.safety.configure_setting( 'vt_api_key', "Optionally, enter a VirusTotal API key to improve malicious URL " - "protection.\nOtherwise, only the StevenBlack list will be used." + "protection.\nOtherwise, only the configured hosts list will be used.", + ) + settings.safety.configure_setting( + "domain_blocklist_url", + "Optionally, provide the URL for a hosts-file formatted domain " + "blocklist to use instead of StevenBlack's.", ) -def setup(bot): - bot.config.define_section('safety', SafetySection) +def setup(bot: Sopel): + bot.settings.define_section("safety", SafetySection) + + if bot.settings.safety.default_mode is None: + bot.settings.safety.default_mode = "on" + # migrate from enabled_by_default to default_mode. remove in v8.1 or v9 + if not bot.settings.safety.enabled_by_default: + bot.settings.safety.default_mode = "off" + LOGGER.info( + "config: enabled_by_default is deprecated, please use default_mode=off", + ) if 'safety_cache' not in bot.memory: bot.memory['safety_cache'] = tools.SopelMemory() if 'safety_cache_lock' not in bot.memory: bot.memory['safety_cache_lock'] = threading.Lock() - for item in bot.config.safety.known_good: + for item in bot.settings.safety.known_good: known_good.append(re.compile(item, re.I)) - old_file = os.path.join(bot.config.homedir, 'malwaredomains.txt') + update_local_cache(bot, init=True) + + +def update_local_cache(bot: Sopel, init: bool = False): + """Download the current malware domain list and load it into memory. + + :param init: Load the file even if it's unchanged + """ + + malware_domains = set() + + old_file = os.path.join(bot.settings.homedir, "malwaredomains.txt") if os.path.exists(old_file) and os.path.isfile(old_file): LOGGER.info('Removing old malwaredomains file from %s', old_file) try: @@ -85,13 +122,18 @@ def setup(bot): # Python on Windows throws an exception if the file is in use LOGGER.info('Could not delete %s: %s', old_file, str(err)) - loc = os.path.join(bot.config.homedir, 'unsafedomains.txt') - if os.path.isfile(loc): - if os.path.getmtime(loc) < time.time() - 24 * 60 * 60: - # File exists but older than one day — update it - _download_domain_list(loc) - else: - _download_domain_list(loc) + loc = os.path.join(bot.settings.homedir, "unsafedomains.txt") + if not os.path.isfile(loc) or os.path.getmtime(loc) < time.time() - 24 * 60 * 60: + # File doesn't exist or is older than one day — update it + url = bot.settings.safety.domain_blocklist_url + if url is None or not url.startswith("http"): + url = "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts" + LOGGER.info("Downloading malicious domain list from %s", url) + # TODO: Can we use a cache header to avoid the download if it's unmodified? + urlretrieve(url, loc) + elif not init: + return + with open(loc, 'r') as f: for line in f: clean_line = str(line).strip().lower() @@ -110,129 +152,180 @@ def setup(bot): # only publicly routable domains matter; skip loopback/link-local stuff malware_domains.add(domain) + bot.memory["safety_cache_local"] = malware_domains -def shutdown(bot): + +def shutdown(bot: Sopel): bot.memory.pop('safety_cache', None) + bot.memory.pop('safety_cache_local', None) bot.memory.pop('safety_cache_lock', None) -def _download_domain_list(path): - url = 'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts' - LOGGER.info('Downloading malicious domain list from %s', url) - urlretrieve(url, path) - - @plugin.rule(r'(?u).*(https?://\S+).*') @plugin.priority('high') @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) -def url_handler(bot, trigger): +def url_handler(bot: Sopel, trigger: Trigger): """Checks for malicious URLs""" - check = True # Enable URL checking - strict = False # Strict mode: kick on malicious URL - positives = 0 # Number of engines saying it's malicious - total = 0 # Number of total engines - use_vt = True # Use VirusTotal - check = bot.config.safety.enabled_by_default - if check is None: - # If not set, assume default - check = True - # DB overrides config: - setting = bot.db.get_channel_value(trigger.sender, 'safety') - if setting is not None: - if setting == 'off': - return # Not checking - elif setting in ['on', 'strict', 'local', 'local strict']: - check = True - if setting == 'strict' or setting == 'local strict': - strict = True - if setting == 'local' or setting == 'local strict': - use_vt = False - - if not check: - return # Not overridden by DB, configured default off + mode = bot.db.get_channel_value( + trigger.sender, + "safety", + bot.settings.safety.default_mode, + ) + if mode == "off": + return + local_only = "local" in mode or bot.settings.safety.vt_api_key is None + strict = "strict" in mode - try: - netloc = urlparse(trigger.group(1)).netloc - except ValueError: - return # Invalid IPv6 URL + for url in tools.web.search_urls(trigger): + safe_url = "hxx" + url[3:] + + positives = 0 # Number of engines saying it's malicious + total = 0 # Number of total engines + + try: + netloc = urlparse(url).netloc.lower() + except ValueError: + pass # Invalid address + else: + if any(regex.search(netloc) for regex in known_good): + continue # explicitly allowed + + if netloc in bot.memory["safety_cache_local"]: + LOGGER.debug("[local] domain in blocklist: %r", netloc) + positives += 1 + total += 1 + + result = virustotal_lookup(bot, url, local_only=local_only) + if result: + positives += result["positives"] + total += result["total"] + + if positives >= 1: + # Possibly malicious URL detected! + LOGGER.info( + "Possibly malicious link (%s/%s) posted in %s by %s: %r", + positives, + total, + trigger.sender, + trigger.nick, + safe_url, + ) + bot.say( + "{} {} of {} engine{} flagged a link {} posted as malicious".format( + formatting.bold(formatting.color("WARNING:", "red")), + positives, + total, + "" if total == 1 else "s", + formatting.bold(trigger.nick), + ) + ) + if strict: + bot.kick(trigger.nick, trigger.sender, "Posted a malicious link") + + +def virustotal_lookup(bot: Sopel, url: str, local_only: bool = False) -> Optional[Dict]: + """Check VirusTotal for flags on a URL as malicious. + + :param url: The URL to look up + :param local_only: If set, only check cache, do not make a new request. + :returns: A dict containing information about findings, or None if not found. + """ + safe_url = "hxx" + url[3:] - if any(regex.search(netloc) for regex in known_good): - return # Whitelisted + if url in bot.memory["safety_cache"]: + LOGGER.debug("[VirusTotal] Using cached data for %r", safe_url) + return bot.memory["safety_cache"].get(url) + if local_only: + return None - apikey = bot.config.safety.vt_api_key + LOGGER.debug("[VirusTotal] Looking up %r", safe_url) + url_id = urlsafe_b64encode(url.encode("utf-8")).rstrip(b"=").decode("ascii") try: - if apikey is not None and use_vt: - payload = {'resource': str(trigger), - 'apikey': apikey, - 'scan': '1'} - - if trigger not in bot.memory['safety_cache']: - r = requests.post(vt_base_api_url + 'report', data=payload) - r.raise_for_status() - result = r.json() - fetched = time.time() - if all(k in result for k in ['positives', 'total']): - # cache result only if it contains a scan report - # TODO: handle checking back for results from queued scans - data = {'positives': result['positives'], - 'total': result['total'], - 'fetched': fetched} - bot.memory['safety_cache'][trigger] = data - if len(bot.memory['safety_cache']) >= (2 * cache_limit): - _clean_cache(bot) - else: - LOGGER.debug('using cache') - result = bot.memory['safety_cache'][trigger] - positives = result.get('positives', 0) - total = result.get('total', 0) + r = requests.get( + VT_API_URL + "/" + url_id, + headers={"x-apikey": bot.settings.safety.vt_api_key}, + ) + + if r.status_code == 404: + # Not analyzed - submit new + LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url) + # TODO: handle checking back for results from queued scans + r = requests.post( + VT_API_URL, + data={"url": url}, + headers={"x-apikey": bot.settings.safety.vt_api_key}, + ) + return None + r.raise_for_status() + vt_data = r.json() except requests.exceptions.RequestException: # Ignoring exceptions with VT so domain list will always work - LOGGER.debug('[VirusTotal] Error obtaining response.', exc_info=True) + LOGGER.debug( + "[VirusTotal] Error obtaining response for %r", safe_url, exc_info=True + ) except json.JSONDecodeError: # Ignoring exceptions with VT so domain list will always work - LOGGER.debug('[VirusTotal] Malformed response (invalid JSON).', exc_info=True) - - if str(netloc).lower() in malware_domains: - positives += 1 - total += 1 - - if positives >= 1: - # Possibly malicious URL detected! - confidence = '{}%'.format(round((positives / total) * 100)) - msg = ( - 'link posted by %s is possibly malicious ' - % formatting.bold(trigger.nick) + LOGGER.debug( + "[VirusTotal] Malformed response (invalid JSON) for %r", + safe_url, + exc_info=True, ) - msg += '(confidence %s - %s/%s)' % (confidence, positives, total) - warning = formatting.bold(formatting.color('WARNING:', 'red')) - bot.say(warning + ' ' + msg) - if strict: - bot.kick(trigger.nick, trigger.sender, 'Posted a malicious link') + fetched = time.time() + last_analysis = vt_data["data"]["attributes"]["last_analysis_stats"] + # Only count strong opinions (ignore suspicious/timeout/undetected) + result = { + "positives": last_analysis["malicious"], + "total": last_analysis["malicious"] + last_analysis["harmless"], + "fetched": fetched, + "virustotal_data": vt_data, + } + bot.memory["safety_cache"][url] = result + if len(bot.memory["safety_cache"]) >= (2 * CACHE_LIMIT): + _clean_cache(bot) + return result @plugin.command('safety') @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) -def toggle_safety(bot, trigger): +def toggle_safety(bot: Sopel, trigger: Trigger): """Set safety setting for channel""" if not trigger.admin and bot.channels[trigger.sender].privileges[trigger.nick] < plugin.OP: bot.reply('Only channel operators can change safety settings') return - allowed_states = ['strict', 'on', 'off', 'local', 'local strict'] - if not trigger.group(2) or trigger.group(2).lower() not in allowed_states: - options = ' / '.join(allowed_states) - bot.reply('Available options: %s' % options) + + new_mode = None + if trigger.group(2): + new_mode = trigger.group(2).lower() + + if not new_mode or (new_mode != "default" and new_mode not in SAFETY_MODES): + bot.reply( + "Current mode: {}. Available modes: {}, or default ({})".format( + bot.db.get_channel_value( + trigger.sender, + "safety", + "default", + ), + ", ".join(SAFETY_MODES), + bot.settings.safety.default_mode, + ) + ) return - channel = trigger.sender.lower() - bot.db.set_channel_value(channel, 'safety', trigger.group(2).lower()) - bot.say('Safety is now set to "%s" on this channel' % trigger.group(2)) + if new_mode == "default": + bot.db.delete_channel_value(trigger.sender, "safety") + else: + bot.db.set_channel_value(trigger.sender, "safety", new_mode) + bot.say('Safety is now set to "%s" for this channel' % new_mode) # Clean the cache every day # Code above also calls this if there are too many cache entries @plugin.interval(24 * 60 * 60) -def _clean_cache(bot): +def _clean_cache(bot: Sopel): """Cleans up old entries in URL safety cache.""" + + update_local_cache(bot) + if bot.memory['safety_cache_lock'].acquire(False): LOGGER.info('Starting safety cache cleanup...') try: @@ -246,7 +339,7 @@ def _clean_cache(bot): bot.memory['safety_cache'].pop(key, None) # clean up more values if the cache is still too big - overage = len(bot.memory['safety_cache']) - cache_limit + overage = len(bot.memory['safety_cache']) - CACHE_LIMIT if overage > 0: extra_keys = sorted( (data.fetched, key) diff --git a/sopel/modules/url.py b/sopel/modules/url.py index 6147d3ba44..c7e45136aa 100644 --- a/sopel/modules/url.py +++ b/sopel/modules/url.py @@ -297,14 +297,20 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): if re.match(bot.config.core.prefix + r'\S+', trigger): return - # Avoid fetching known malicious links - if 'safety_cache' in bot.memory and trigger in bot.memory['safety_cache']: - if bot.memory['safety_cache'][trigger]['positives'] > 1: - return - - urls = web.search_urls( + unchecked_urls = web.search_urls( trigger, exclusion_char=bot.config.url.exclusion_char, clean=True) + urls = [] + for url in unchecked_urls: + # Avoid fetching known malicious links + if url in bot.memory.get("safety_cache", {}): + if bot.memory["safety_cache"][url]["positives"] > 0: + continue + netloc = urlparse(url).netloc.lower() + if netloc in bot.memory.get("safety_cache_local", {}): + continue + urls.append(url) + for url, title, domain, tinyurl, dispatched in process_urls(bot, trigger, urls): if not dispatched: message = '%s | %s' % (title, domain) @@ -313,7 +319,7 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): # Guard against responding to other instances of this bot. if message != trigger: bot.say(message) - bot.memory["last_seen_url"][trigger.sender] = url + bot.memory['last_seen_url'][trigger.sender] = url def process_urls( From 979aa5273f55ba3be50315f22b36d59ee220ac71 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sun, 15 May 2022 14:10:30 -0400 Subject: [PATCH 2/9] squashme: suggestions & domain list download refactor --- sopel/modules/safety.py | 261 +++++++++++++++++++++++++++++----------- 1 file changed, 193 insertions(+), 68 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 2a6620e5d2..98977af75f 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -8,21 +8,22 @@ from __future__ import annotations from base64 import urlsafe_b64encode +from datetime import datetime, timedelta, timezone import json import logging import os.path import re import threading -import time +from time import sleep from typing import Dict, Optional from urllib.parse import urlparse -from urllib.request import urlretrieve import requests -from sopel import formatting, plugin, tools +from sopel import plugin, tools from sopel.bot import Sopel from sopel.config import Config, types +from sopel.formatting import bold, color, colors from sopel.trigger import Trigger @@ -41,7 +42,7 @@ class SafetySection(types.StaticSection): default_mode = types.ValidatedAttribute("default_mode") """Which mode to use in channels without a mode set.""" known_good = types.ListAttribute('known_good') - """List of "known good" domains to ignore.""" + """List of "known good" domains or regexes to consider trusted.""" vt_api_key = types.ValidatedAttribute('vt_api_key') """Optional VirusTotal API key (improves malicious URL detection).""" domain_blocklist_url = types.ValidatedAttribute("domain_blocklist_url") @@ -53,7 +54,7 @@ def configure(settings: Config): | name | example | purpose | | ---- | ------- | ------- | | default\\_mode | on | Which mode to use in channels without a mode set. | - | known\\_good | sopel.chat,dftba.net | List of "known good" domains or regexes to ignore. This can save VT API calls. | + | known\\_good | sopel.chat,dftba.net | List of "known good" domains or regexes to consider trusted. This can save VT API calls. | | vt\\_api\\_key | 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef | Optional VirusTotal API key to improve malicious URL detection | | domain\\_blocklist\\_url | https://example.com/bad-hosts.txt | Optional hosts-file formatted domain blocklist to use instead of StevenBlack's. | """ @@ -68,7 +69,7 @@ def configure(settings: Config): ) settings.safety.configure_setting( 'known_good', - "Enter any domains to allowlist", + "Enter any domains or regexes to consider trusted", ) settings.safety.configure_setting( 'vt_api_key', @@ -101,17 +102,7 @@ def setup(bot: Sopel): for item in bot.settings.safety.known_good: known_good.append(re.compile(item, re.I)) - update_local_cache(bot, init=True) - - -def update_local_cache(bot: Sopel, init: bool = False): - """Download the current malware domain list and load it into memory. - - :param init: Load the file even if it's unchanged - """ - - malware_domains = set() - + # clean up old files old_file = os.path.join(bot.settings.homedir, "malwaredomains.txt") if os.path.exists(old_file) and os.path.isfile(old_file): LOGGER.info('Removing old malwaredomains file from %s', old_file) @@ -122,19 +113,67 @@ def update_local_cache(bot: Sopel, init: bool = False): # Python on Windows throws an exception if the file is in use LOGGER.info('Could not delete %s: %s', old_file, str(err)) - loc = os.path.join(bot.settings.homedir, "unsafedomains.txt") - if not os.path.isfile(loc) or os.path.getmtime(loc) < time.time() - 24 * 60 * 60: - # File doesn't exist or is older than one day — update it - url = bot.settings.safety.domain_blocklist_url - if url is None or not url.startswith("http"): - url = "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts" - LOGGER.info("Downloading malicious domain list from %s", url) - # TODO: Can we use a cache header to avoid the download if it's unmodified? - urlretrieve(url, loc) - elif not init: + update_local_cache(bot, init=True) + + +def zwsp(text: str) -> str: + """Insert a zero-width space between each character. + + Useful for reducing highlights and clickability of links. + """ + return "\u200B".join(text) + + +def download_domain_list(bot: Sopel, path: str) -> bool: + """Download the current unsafe domain list. + + :param path: Where to save the unsafe domain list + :returns: True if the list was updated + """ + url = bot.settings.safety.domain_blocklist_url + if url is None or not url.startswith("http"): + url = "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts" + LOGGER.info("Downloading unsafe domain list from %s", url) + old_etag = bot.db.get_plugin_value("safety", "unsafe_domain_list_etag") + if old_etag: + r = requests.head(url) + if r.headers["ETag"] == old_etag and os.path.isfile(path): + LOGGER.debug("Unsafe domain list unchanged, skipping") + return False + + r = requests.get(url, stream=True) + try: + r.raise_for_status() + with open(path + ".new", "wb") as f: + for data in r.iter_content(None): + f.write(data) + except Exception: + # don't bother handling, we'll try again tomorrow + LOGGER.warning("Unsafe domain list download failed, using cache") + return False + # .new+move so we don't clobber it if the download fails in the middle + os.rename(path + ".new", path) + bot.db.set_plugin_value("safety", "unsafe_domain_list_etag", r.headers.get("etag")) + return True + + +def update_local_cache(bot: Sopel, init: bool = False): + """Download the current malware domain list and load it into memory. + + :param init: Load the file even if it's unchanged + """ + path = os.path.join(bot.settings.homedir, "unsafedomains.txt") + updated = download_domain_list(bot, path) + if not os.path.isfile(path): + LOGGER.warning("Could not load unsafe domain list") + return + + if not updated and not init: return - with open(loc, 'r') as f: + LOGGER.debug("Loading new unsafedomains list") + malware_domains = set() + with open(path, "r") as f: for line in f: clean_line = str(line).strip().lower() if not clean_line or clean_line[0] == '#': @@ -151,7 +190,6 @@ def update_local_cache(bot: Sopel, init: bool = False): if '.' in domain: # only publicly routable domains matter; skip loopback/link-local stuff malware_domains.add(domain) - bot.memory["safety_cache_local"] = malware_domains @@ -188,7 +226,7 @@ def url_handler(bot: Sopel, trigger: Trigger): pass # Invalid address else: if any(regex.search(netloc) for regex in known_good): - continue # explicitly allowed + continue # explicitly trusted if netloc in bot.memory["safety_cache_local"]: LOGGER.debug("[local] domain in blocklist: %r", netloc) @@ -212,27 +250,43 @@ def url_handler(bot: Sopel, trigger: Trigger): ) bot.say( "{} {} of {} engine{} flagged a link {} posted as malicious".format( - formatting.bold(formatting.color("WARNING:", "red")), + bold(color("WARNING:", colors.RED)), positives, total, "" if total == 1 else "s", - formatting.bold(trigger.nick), + bold(trigger.nick), ) ) if strict: bot.kick(trigger.nick, trigger.sender, "Posted a malicious link") -def virustotal_lookup(bot: Sopel, url: str, local_only: bool = False) -> Optional[Dict]: +def virustotal_lookup( + bot: Sopel, + url: str, + local_only: bool = False, + max_cache_age: Optional[timedelta] = None, +) -> Optional[Dict]: """Check VirusTotal for flags on a URL as malicious. :param url: The URL to look up - :param local_only: If set, only check cache, do not make a new request. - :returns: A dict containing information about findings, or None if not found. + :param local_only: If set, only check cache, do not make a new request + :param max_cache_age: If set, don't use cache older than this value. + :returns: A dict containing information about findings, or None if not found """ + if url.startswith("hxxp"): + url = "htt" + url[3:] + elif not url.startswith("http"): + # VT only does http/https URLs + return None + safe_url = "hxx" + url[3:] - if url in bot.memory["safety_cache"]: + oldest_cache = datetime(1970, 1, 1, 0, 0) # default: use any cache available + if max_cache_age is not None: + oldest_cache = datetime.now(timezone.utc) - max_cache_age + cache = bot.memory["safety_cache"] + if url in cache and cache[url]["fetched"] > oldest_cache: LOGGER.debug("[VirusTotal] Using cached data for %r", safe_url) return bot.memory["safety_cache"].get(url) if local_only: @@ -240,44 +294,57 @@ def virustotal_lookup(bot: Sopel, url: str, local_only: bool = False) -> Optiona LOGGER.debug("[VirusTotal] Looking up %r", safe_url) url_id = urlsafe_b64encode(url.encode("utf-8")).rstrip(b"=").decode("ascii") - try: - r = requests.get( - VT_API_URL + "/" + url_id, - headers={"x-apikey": bot.settings.safety.vt_api_key}, - ) - - if r.status_code == 404: - # Not analyzed - submit new - LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url) - # TODO: handle checking back for results from queued scans - r = requests.post( - VT_API_URL, - data={"url": url}, + attempts = 5 + requested = False + while attempts > 0: + attempts -= 1 + try: + r = requests.get( + VT_API_URL + "/" + url_id, headers={"x-apikey": bot.settings.safety.vt_api_key}, ) + if r.status_code == 200: + vt_data = r.json() + last_analysis = vt_data["data"]["attributes"]["last_analysis_stats"] + # VT returns 200s for recent submissions before scan results are in... + if not requested or sum(last_analysis.values()) > 0: + break + elif not requested and r.status_code == 404: + # Not analyzed - submit new + LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url) + r = requests.post( + VT_API_URL, + data={"url": url}, + headers={"x-apikey": bot.settings.safety.vt_api_key}, + ) + requested = True + sleep(2) # Scans seem to take ~5s minimum, so add 2s + except requests.exceptions.RequestException: + # Ignoring exceptions with VT so domain list will always work + LOGGER.debug( + "[VirusTotal] Error obtaining response for %r", safe_url, exc_info=True + ) return None - r.raise_for_status() - vt_data = r.json() - except requests.exceptions.RequestException: - # Ignoring exceptions with VT so domain list will always work - LOGGER.debug( - "[VirusTotal] Error obtaining response for %r", safe_url, exc_info=True - ) - except json.JSONDecodeError: - # Ignoring exceptions with VT so domain list will always work - LOGGER.debug( - "[VirusTotal] Malformed response (invalid JSON) for %r", - safe_url, - exc_info=True, - ) - fetched = time.time() - last_analysis = vt_data["data"]["attributes"]["last_analysis_stats"] + except json.JSONDecodeError: + # Ignoring exceptions with VT so domain list will always work + LOGGER.debug( + "[VirusTotal] Malformed response (invalid JSON) for %r", + safe_url, + exc_info=True, + ) + return None + sleep(3) + else: # Still no results + LOGGER.debug("[VirusTotal] Scan failed or unfinished for %r", safe_url) + return None + fetched = datetime.now(timezone.utc) # Only count strong opinions (ignore suspicious/timeout/undetected) result = { "positives": last_analysis["malicious"], "total": last_analysis["malicious"] + last_analysis["harmless"], "fetched": fetched, - "virustotal_data": vt_data, + # Subject to change formats! + "virustotal_data": vt_data["data"]["attributes"], } bot.memory["safety_cache"][url] = result if len(bot.memory["safety_cache"]) >= (2 * CACHE_LIMIT): @@ -285,7 +352,65 @@ def virustotal_lookup(bot: Sopel, url: str, local_only: bool = False) -> Optiona return result +@plugin.command("virustotal") +@plugin.example(".virustotal https://malware.wicar.org/") +@plugin.example(".virustotal hxxps://malware.wicar.org/") +@plugin.output_prefix("[safety][VirusTotal] ") +def vt_command(bot: Sopel, trigger: Trigger): + """Look up VT results on demand""" + if not bot.settings.safety.vt_api_key: + bot.reply("Sorry, I don't have a VirusTotal API key configured.") + return + + url = trigger.group(2) + zwsp_safe_url = zwsp("hxx" + url[3:]) + + result = virustotal_lookup(bot, url, max_cache_age=timedelta(minutes=1)) + if not result: + bot.reply("Sorry, an error occurred while looking that up.") + return + + analysis = result["virustotal_data"]["last_analysis_stats"] + + result_types = { + "malicious": colors.RED, + "suspicious": colors.YELLOW, + "harmless": colors.GREEN, + "undetected": colors.GREY, + } + result_strs = [] + for result_type, result_color in result_types.items(): + if analysis[result_type] == 0: + result_strs.append("0 " + result_type) + else: + result_strs.append( + bold( + color(str(analysis[result_type]) + " " + result_type, result_color) + ) + ) + results_str = ", ".join(result_strs) + + vt_scan_time = datetime.fromtimestamp( + result["virustotal_data"]["last_analysis_date"], + timezone.utc, + ) + bot.reply( + "Results for {}: {} as of {}".format( + zwsp_safe_url, + results_str, + tools.time.format_time( + bot.db, + bot.config, + nick=trigger.nick, + channel=trigger.sender, + time=vt_scan_time, + ), + ) + ) + + @plugin.command('safety') +@plugin.example(".safety on") @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) def toggle_safety(bot: Sopel, trigger: Trigger): """Set safety setting for channel""" @@ -328,9 +453,9 @@ def _clean_cache(bot: Sopel): if bot.memory['safety_cache_lock'].acquire(False): LOGGER.info('Starting safety cache cleanup...') + cutoff = datetime.now(timezone.utc) - timedelta(days=7) try: # clean up by age first - cutoff = time.time() - (7 * 24 * 60 * 60) # 7 days ago old_keys = [] for key, data in bot.memory['safety_cache'].items(): if data['fetched'] <= cutoff: @@ -339,7 +464,7 @@ def _clean_cache(bot: Sopel): bot.memory['safety_cache'].pop(key, None) # clean up more values if the cache is still too big - overage = len(bot.memory['safety_cache']) - CACHE_LIMIT + overage = len(bot.memory["safety_cache"]) - CACHE_LIMIT if overage > 0: extra_keys = sorted( (data.fetched, key) From 37dc73adb994b871ce2160680fcca88aa6915618 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sun, 15 May 2022 20:02:09 -0400 Subject: [PATCH 3/9] squashme: suggestions and stuff --- sopel/modules/safety.py | 47 +++++++++++++++++++++-------------------- sopel/modules/url.py | 4 ++-- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 98977af75f..995470b823 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -16,7 +16,7 @@ import threading from time import sleep from typing import Dict, Optional -from urllib.parse import urlparse +from urllib.parse import urlparse, urlunparse import requests @@ -116,12 +116,13 @@ def setup(bot: Sopel): update_local_cache(bot, init=True) -def zwsp(text: str) -> str: - """Insert a zero-width space between each character. - - Useful for reducing highlights and clickability of links. - """ - return "\u200B".join(text) +def safeify_url(url: str) -> str: + """Replace bits of a URL to make it hard to browse to.""" + parts = urlparse(url) + scheme = "hxx" + parts.scheme[3:] # hxxp + netloc = parts.netloc.replace(".", "[.]") # google[.]com and IPv4 + netloc = netloc.replace(":", "[:]") # IPv6 addresses (bad lazy method) + return urlunparse((scheme, netloc) + parts[2:]) def download_domain_list(bot: Sopel, path: str) -> bool: @@ -171,8 +172,8 @@ def update_local_cache(bot: Sopel, init: bool = False): if not updated and not init: return - LOGGER.debug("Loading new unsafedomains list") - malware_domains = set() + LOGGER.debug("Loading new unsafe domain list") + unsafe_domains = set() with open(path, "r") as f: for line in f: clean_line = str(line).strip().lower() @@ -189,8 +190,8 @@ def update_local_cache(bot: Sopel, init: bool = False): if '.' in domain: # only publicly routable domains matter; skip loopback/link-local stuff - malware_domains.add(domain) - bot.memory["safety_cache_local"] = malware_domains + unsafe_domains.add(domain) + bot.memory["safety_cache_local"] = unsafe_domains def shutdown(bot: Sopel): @@ -215,21 +216,21 @@ def url_handler(bot: Sopel, trigger: Trigger): strict = "strict" in mode for url in tools.web.search_urls(trigger): - safe_url = "hxx" + url[3:] + safe_url = safeify_url(url) positives = 0 # Number of engines saying it's malicious total = 0 # Number of total engines try: - netloc = urlparse(url).netloc.lower() + hostname = urlparse(url).hostname.lower() except ValueError: pass # Invalid address else: - if any(regex.search(netloc) for regex in known_good): + if any(regex.search(hostname) for regex in known_good): continue # explicitly trusted - if netloc in bot.memory["safety_cache_local"]: - LOGGER.debug("[local] domain in blocklist: %r", netloc) + if hostname in bot.memory["safety_cache_local"]: + LOGGER.debug("[local] domain in blocklist: %r", hostname) positives += 1 total += 1 @@ -280,9 +281,10 @@ def virustotal_lookup( # VT only does http/https URLs return None - safe_url = "hxx" + url[3:] + safe_url = safeify_url(url) - oldest_cache = datetime(1970, 1, 1, 0, 0) # default: use any cache available + # default: use any cache available + oldest_cache = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc) if max_cache_age is not None: oldest_cache = datetime.now(timezone.utc) - max_cache_age cache = bot.memory["safety_cache"] @@ -312,7 +314,7 @@ def virustotal_lookup( elif not requested and r.status_code == 404: # Not analyzed - submit new LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url) - r = requests.post( + requests.post( VT_API_URL, data={"url": url}, headers={"x-apikey": bot.settings.safety.vt_api_key}, @@ -343,7 +345,6 @@ def virustotal_lookup( "positives": last_analysis["malicious"], "total": last_analysis["malicious"] + last_analysis["harmless"], "fetched": fetched, - # Subject to change formats! "virustotal_data": vt_data["data"]["attributes"], } bot.memory["safety_cache"][url] = result @@ -363,7 +364,7 @@ def vt_command(bot: Sopel, trigger: Trigger): return url = trigger.group(2) - zwsp_safe_url = zwsp("hxx" + url[3:]) + safe_url = safeify_url(url) result = virustotal_lookup(bot, url, max_cache_age=timedelta(minutes=1)) if not result: @@ -395,8 +396,7 @@ def vt_command(bot: Sopel, trigger: Trigger): timezone.utc, ) bot.reply( - "Results for {}: {} as of {}".format( - zwsp_safe_url, + "Results: {} at {} for {}".format( results_str, tools.time.format_time( bot.db, @@ -405,6 +405,7 @@ def vt_command(bot: Sopel, trigger: Trigger): channel=trigger.sender, time=vt_scan_time, ), + safe_url, ) ) diff --git a/sopel/modules/url.py b/sopel/modules/url.py index c7e45136aa..04140233f4 100644 --- a/sopel/modules/url.py +++ b/sopel/modules/url.py @@ -306,8 +306,8 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): if url in bot.memory.get("safety_cache", {}): if bot.memory["safety_cache"][url]["positives"] > 0: continue - netloc = urlparse(url).netloc.lower() - if netloc in bot.memory.get("safety_cache_local", {}): + hostname = urlparse(url).hostname.lower() + if hostname in bot.memory.get("safety_cache_local", {}): continue urls.append(url) From 5e4e26023a1068d385acbb7084398d97065074f3 Mon Sep 17 00:00:00 2001 From: mal Date: Mon, 16 May 2022 18:31:28 +0000 Subject: [PATCH 4/9] squashme use sopelwrapper --- sopel/modules/safety.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 995470b823..f5dd1d2d3b 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -21,7 +21,7 @@ import requests from sopel import plugin, tools -from sopel.bot import Sopel +from sopel.bot import Sopel, SopelWrapper from sopel.config import Config, types from sopel.formatting import bold, color, colors from sopel.trigger import Trigger @@ -203,7 +203,7 @@ def shutdown(bot: Sopel): @plugin.rule(r'(?u).*(https?://\S+).*') @plugin.priority('high') @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) -def url_handler(bot: Sopel, trigger: Trigger): +def url_handler(bot: SopelWrapper, trigger: Trigger): """Checks for malicious URLs""" mode = bot.db.get_channel_value( trigger.sender, @@ -263,7 +263,7 @@ def url_handler(bot: Sopel, trigger: Trigger): def virustotal_lookup( - bot: Sopel, + bot: SopelWrapper, url: str, local_only: bool = False, max_cache_age: Optional[timedelta] = None, @@ -357,7 +357,7 @@ def virustotal_lookup( @plugin.example(".virustotal https://malware.wicar.org/") @plugin.example(".virustotal hxxps://malware.wicar.org/") @plugin.output_prefix("[safety][VirusTotal] ") -def vt_command(bot: Sopel, trigger: Trigger): +def vt_command(bot: SopelWrapper, trigger: Trigger): """Look up VT results on demand""" if not bot.settings.safety.vt_api_key: bot.reply("Sorry, I don't have a VirusTotal API key configured.") @@ -413,7 +413,7 @@ def vt_command(bot: Sopel, trigger: Trigger): @plugin.command('safety') @plugin.example(".safety on") @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) -def toggle_safety(bot: Sopel, trigger: Trigger): +def toggle_safety(bot: SopelWrapper, trigger: Trigger): """Set safety setting for channel""" if not trigger.admin and bot.channels[trigger.sender].privileges[trigger.nick] < plugin.OP: bot.reply('Only channel operators can change safety settings') From de7a3b40678cd12f2869bdee88c2b5086212fe32 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 25 May 2022 19:37:12 -0400 Subject: [PATCH 5/9] squashme: conditional typing imports --- sopel/modules/safety.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index f5dd1d2d3b..f203c54558 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -15,17 +15,21 @@ import re import threading from time import sleep -from typing import Dict, Optional +from typing import TYPE_CHECKING from urllib.parse import urlparse, urlunparse import requests from sopel import plugin, tools -from sopel.bot import Sopel, SopelWrapper -from sopel.config import Config, types +from sopel.config import types from sopel.formatting import bold, color, colors -from sopel.trigger import Trigger +if TYPE_CHECKING: + from typing import Dict, Optional + + from sopel.bot import Sopel, SopelWrapper + from sopel.config import Config + from sopel.trigger import Trigger LOGGER = logging.getLogger(__name__) PLUGIN_OUTPUT_PREFIX = '[safety] ' From 0ff6bd3601e67aaefb9bd8fd5c81c330d9141ac4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 2 Jun 2022 23:28:20 -0400 Subject: [PATCH 6/9] suggestions and loglevel tweaks --- sopel/modules/safety.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index f203c54558..987ec84299 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -92,10 +92,10 @@ def setup(bot: Sopel): if bot.settings.safety.default_mode is None: bot.settings.safety.default_mode = "on" - # migrate from enabled_by_default to default_mode. remove in v8.1 or v9 + # migrate from enabled_by_default to default_mode. TODO: remove in v8.1 or v9 if not bot.settings.safety.enabled_by_default: bot.settings.safety.default_mode = "off" - LOGGER.info( + LOGGER.warning( "config: enabled_by_default is deprecated, please use default_mode=off", ) @@ -106,7 +106,7 @@ def setup(bot: Sopel): for item in bot.settings.safety.known_good: known_good.append(re.compile(item, re.I)) - # clean up old files + # clean up old files. TODO: remove in v8.1 or 9 old_file = os.path.join(bot.settings.homedir, "malwaredomains.txt") if os.path.exists(old_file) and os.path.isfile(old_file): LOGGER.info('Removing old malwaredomains file from %s', old_file) @@ -143,7 +143,7 @@ def download_domain_list(bot: Sopel, path: str) -> bool: if old_etag: r = requests.head(url) if r.headers["ETag"] == old_etag and os.path.isfile(path): - LOGGER.debug("Unsafe domain list unchanged, skipping") + LOGGER.info("Unsafe domain list unchanged, skipping") return False r = requests.get(url, stream=True) @@ -154,7 +154,7 @@ def download_domain_list(bot: Sopel, path: str) -> bool: f.write(data) except Exception: # don't bother handling, we'll try again tomorrow - LOGGER.warning("Unsafe domain list download failed, using cache") + LOGGER.warning("Unsafe domain list download failed; using cache") return False # .new+move so we don't clobber it if the download fails in the middle os.rename(path + ".new", path) @@ -457,7 +457,7 @@ def _clean_cache(bot: Sopel): update_local_cache(bot) if bot.memory['safety_cache_lock'].acquire(False): - LOGGER.info('Starting safety cache cleanup...') + LOGGER.debug('Starting safety cache cleanup...') cutoff = datetime.now(timezone.utc) - timedelta(days=7) try: # clean up by age first @@ -481,7 +481,7 @@ def _clean_cache(bot: Sopel): # No matter what errors happen (or not), release the lock bot.memory['safety_cache_lock'].release() - LOGGER.info('Safety cache cleanup finished.') + LOGGER.debug('Safety cache cleanup finished.') else: LOGGER.info( 'Skipping safety cache cleanup: Cache is locked, ' From 3215a5cf17798306a00d3e988b751dfb1a6dd18c Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 4 Jun 2022 01:30:26 -0400 Subject: [PATCH 7/9] [skip ci] mystery quotes --- sopel/modules/url.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sopel/modules/url.py b/sopel/modules/url.py index 04140233f4..75a6b84302 100644 --- a/sopel/modules/url.py +++ b/sopel/modules/url.py @@ -319,7 +319,7 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): # Guard against responding to other instances of this bot. if message != trigger: bot.say(message) - bot.memory['last_seen_url'][trigger.sender] = url + bot.memory["last_seen_url"][trigger.sender] = url def process_urls( From 97797fbae3efe5cd2dd51e5507e31bb01c4d8f9a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 4 Jun 2022 14:20:41 -0400 Subject: [PATCH 8/9] key constants, single lookup --- sopel/modules/safety.py | 43 ++++++++++++++++++++++------------------- sopel/modules/url.py | 10 +++++----- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 987ec84299..4875428c61 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -34,6 +34,9 @@ LOGGER = logging.getLogger(__name__) PLUGIN_OUTPUT_PREFIX = '[safety] ' +SAFETY_CACHE_KEY = "safety_cache" +SAFETY_CACHE_LOCK_KEY = SAFETY_CACHE_KEY + "_lock" +SAFETY_CACHE_LOCAL_KEY = SAFETY_CACHE_KEY + "_local" SAFETY_MODES = ["off", "local", "local strict", "on", "strict"] VT_API_URL = "https://www.virustotal.com/api/v3/urls" CACHE_LIMIT = 512 @@ -99,10 +102,10 @@ def setup(bot: Sopel): "config: enabled_by_default is deprecated, please use default_mode=off", ) - if 'safety_cache' not in bot.memory: - bot.memory['safety_cache'] = tools.SopelMemory() - if 'safety_cache_lock' not in bot.memory: - bot.memory['safety_cache_lock'] = threading.Lock() + if SAFETY_CACHE_KEY not in bot.memory: + bot.memory[SAFETY_CACHE_KEY] = tools.SopelMemory() + if SAFETY_CACHE_LOCK_KEY not in bot.memory: + bot.memory[SAFETY_CACHE_LOCK_KEY] = threading.Lock() for item in bot.settings.safety.known_good: known_good.append(re.compile(item, re.I)) @@ -195,13 +198,13 @@ def update_local_cache(bot: Sopel, init: bool = False): if '.' in domain: # only publicly routable domains matter; skip loopback/link-local stuff unsafe_domains.add(domain) - bot.memory["safety_cache_local"] = unsafe_domains + bot.memory[SAFETY_CACHE_LOCAL_KEY] = unsafe_domains def shutdown(bot: Sopel): - bot.memory.pop('safety_cache', None) - bot.memory.pop('safety_cache_local', None) - bot.memory.pop('safety_cache_lock', None) + bot.memory.pop(SAFETY_CACHE_KEY, None) + bot.memory.pop(SAFETY_CACHE_LOCAL_KEY, None) + bot.memory.pop(SAFETY_CACHE_LOCK_KEY, None) @plugin.rule(r'(?u).*(https?://\S+).*') @@ -233,7 +236,7 @@ def url_handler(bot: SopelWrapper, trigger: Trigger): if any(regex.search(hostname) for regex in known_good): continue # explicitly trusted - if hostname in bot.memory["safety_cache_local"]: + if hostname in bot.memory[SAFETY_CACHE_LOCAL_KEY]: LOGGER.debug("[local] domain in blocklist: %r", hostname) positives += 1 total += 1 @@ -291,10 +294,10 @@ def virustotal_lookup( oldest_cache = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc) if max_cache_age is not None: oldest_cache = datetime.now(timezone.utc) - max_cache_age - cache = bot.memory["safety_cache"] + cache = bot.memory[SAFETY_CACHE_KEY] if url in cache and cache[url]["fetched"] > oldest_cache: LOGGER.debug("[VirusTotal] Using cached data for %r", safe_url) - return bot.memory["safety_cache"].get(url) + return bot.memory[SAFETY_CACHE_KEY].get(url) if local_only: return None @@ -351,8 +354,8 @@ def virustotal_lookup( "fetched": fetched, "virustotal_data": vt_data["data"]["attributes"], } - bot.memory["safety_cache"][url] = result - if len(bot.memory["safety_cache"]) >= (2 * CACHE_LIMIT): + bot.memory[SAFETY_CACHE_KEY][url] = result + if len(bot.memory[SAFETY_CACHE_KEY]) >= (2 * CACHE_LIMIT): _clean_cache(bot) return result @@ -456,30 +459,30 @@ def _clean_cache(bot: Sopel): update_local_cache(bot) - if bot.memory['safety_cache_lock'].acquire(False): + if bot.memory[SAFETY_CACHE_LOCK_KEY].acquire(False): LOGGER.debug('Starting safety cache cleanup...') cutoff = datetime.now(timezone.utc) - timedelta(days=7) try: # clean up by age first old_keys = [] - for key, data in bot.memory['safety_cache'].items(): + for key, data in bot.memory[SAFETY_CACHE_KEY].items(): if data['fetched'] <= cutoff: old_keys.append(key) for key in old_keys: - bot.memory['safety_cache'].pop(key, None) + bot.memory[SAFETY_CACHE_KEY].pop(key, None) # clean up more values if the cache is still too big - overage = len(bot.memory["safety_cache"]) - CACHE_LIMIT + overage = len(bot.memory[SAFETY_CACHE_KEY]) - CACHE_LIMIT if overage > 0: extra_keys = sorted( (data.fetched, key) for (key, data) - in bot.memory['safety_cache'].items())[:overage] + in bot.memory[SAFETY_CACHE_KEY].items())[:overage] for (_, key) in extra_keys: - bot.memory['safety_cache'].pop(key, None) + bot.memory[SAFETY_CACHE_KEY].pop(key, None) finally: # No matter what errors happen (or not), release the lock - bot.memory['safety_cache_lock'].release() + bot.memory[SAFETY_CACHE_LOCK_KEY].release() LOGGER.debug('Safety cache cleanup finished.') else: diff --git a/sopel/modules/url.py b/sopel/modules/url.py index 75a6b84302..55a6d8c5ca 100644 --- a/sopel/modules/url.py +++ b/sopel/modules/url.py @@ -301,13 +301,13 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): trigger, exclusion_char=bot.config.url.exclusion_char, clean=True) urls = [] + safety_cache = bot.memory.get("safety_cache", {}) + safety_cache_local = bot.memory.get("safety_cache_local", {}) for url in unchecked_urls: # Avoid fetching known malicious links - if url in bot.memory.get("safety_cache", {}): - if bot.memory["safety_cache"][url]["positives"] > 0: - continue - hostname = urlparse(url).hostname.lower() - if hostname in bot.memory.get("safety_cache_local", {}): + if url in safety_cache and safety_cache[url]["positives"] > 0: + continue + if urlparse(url).hostname.lower() in safety_cache_local: continue urls.append(url) From 8f5dcb8524e114c9f8021cbb37d08fec1da11239 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 4 Jun 2022 15:19:49 -0400 Subject: [PATCH 9/9] periods --- sopel/modules/safety.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sopel/modules/safety.py b/sopel/modules/safety.py index 4875428c61..8a1e51abff 100644 --- a/sopel/modules/safety.py +++ b/sopel/modules/safety.py @@ -211,7 +211,7 @@ def shutdown(bot: Sopel): @plugin.priority('high') @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) def url_handler(bot: SopelWrapper, trigger: Trigger): - """Checks for malicious URLs""" + """Checks for malicious URLs.""" mode = bot.db.get_channel_value( trigger.sender, "safety", @@ -279,7 +279,7 @@ def virustotal_lookup( :param url: The URL to look up :param local_only: If set, only check cache, do not make a new request - :param max_cache_age: If set, don't use cache older than this value. + :param max_cache_age: If set, don't use cache older than this value :returns: A dict containing information about findings, or None if not found """ if url.startswith("hxxp"): @@ -365,7 +365,7 @@ def virustotal_lookup( @plugin.example(".virustotal hxxps://malware.wicar.org/") @plugin.output_prefix("[safety][VirusTotal] ") def vt_command(bot: SopelWrapper, trigger: Trigger): - """Look up VT results on demand""" + """Look up VT results on demand.""" if not bot.settings.safety.vt_api_key: bot.reply("Sorry, I don't have a VirusTotal API key configured.") return @@ -421,7 +421,7 @@ def vt_command(bot: SopelWrapper, trigger: Trigger): @plugin.example(".safety on") @plugin.output_prefix(PLUGIN_OUTPUT_PREFIX) def toggle_safety(bot: SopelWrapper, trigger: Trigger): - """Set safety setting for channel""" + """Set safety setting for channel.""" if not trigger.admin and bot.channels[trigger.sender].privileges[trigger.nick] < plugin.OP: bot.reply('Only channel operators can change safety settings') return