From 9285f93953a276e2ef67ce9a11f7a9b3d1b5003f Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 3 Jul 2024 00:25:32 +0200 Subject: [PATCH] use f-strings --- demo/anti_flood_ftpd.py | 6 +- demo/unix_daemon.py | 10 +- docs/conf.py | 4 +- pyftpdlib/__main__.py | 2 +- pyftpdlib/_asyncore.py | 18 +- pyftpdlib/authorizers.py | 26 +-- pyftpdlib/filesystems.py | 10 +- pyftpdlib/handlers.py | 236 +++++++++++++------------- pyftpdlib/ioloop.py | 34 ++-- pyftpdlib/log.py | 4 +- pyftpdlib/servers.py | 10 +- pyftpdlib/test/__init__.py | 24 ++- pyftpdlib/test/conftest.py | 8 +- pyftpdlib/test/test_authorizers.py | 20 +-- pyftpdlib/test/test_cli.py | 2 +- pyftpdlib/test/test_functional.py | 78 ++++----- pyftpdlib/test/test_functional_ssl.py | 4 +- pyproject.toml | 2 - scripts/internal/git_pre_commit.py | 10 +- scripts/internal/print_announce.py | 2 +- scripts/internal/winmake.py | 64 +++---- setup.py | 2 +- 22 files changed, 280 insertions(+), 296 deletions(-) diff --git a/demo/anti_flood_ftpd.py b/demo/anti_flood_ftpd.py index 87877f7a..b19c9a48 100755 --- a/demo/anti_flood_ftpd.py +++ b/demo/anti_flood_ftpd.py @@ -51,8 +51,8 @@ def process_command(self, *args, **kwargs): def ban(self, ip): # ban ip and schedule next un-ban if ip not in self.banned_ips: - self.log('banned IP %s for command flooding' % ip) - self.respond('550 You are banned for %s seconds.' % self.ban_for) + self.log(f'banned IP {ip} for command flooding') + self.respond(f'550 You are banned for {self.ban_for} seconds.') self.close() self.banned_ips.append(ip) @@ -63,7 +63,7 @@ def unban(self, ip): except ValueError: pass else: - self.log('unbanning IP %s' % ip) + self.log(f'unbanning IP {ip}') def close(self): FTPHandler.close(self) diff --git a/demo/unix_daemon.py b/demo/unix_daemon.py index ca88722a..3057545c 100755 --- a/demo/unix_daemon.py +++ b/demo/unix_daemon.py @@ -87,12 +87,12 @@ def stop(): try: os.kill(pid, sig) except ProcessLookupError: - print("\nstopped (pid %s)" % pid) + print(f"\nstopped (pid {pid})") i += 1 if i == 25: sig = signal.SIGKILL elif i == 50: - sys.exit("\ncould not kill daemon (pid %s)" % pid) + sys.exit(f"\ncould not kill daemon (pid {pid})") time.sleep(0.1) @@ -102,7 +102,7 @@ def status(): if not pid or not pid_exists(pid): print("daemon not running") else: - print("daemon running with pid %s" % pid) + print(f"daemon running with pid {pid}") sys.exit(0) @@ -148,12 +148,12 @@ def _daemonize(): # write pidfile pid = str(os.getpid()) with open(PID_FILE, 'w') as f: - f.write("%s\n" % pid) + f.write(f"{pid}\n") atexit.register(lambda: os.remove(PID_FILE)) pid = get_pid() if pid and pid_exists(pid): - sys.exit('daemon already running (pid %s)' % pid) + sys.exit(f'daemon already running (pid {pid})') # instance FTPd before daemonizing, so that in case of problems we # get an exception here and exit immediately server = get_server() diff --git a/docs/conf.py b/docs/conf.py index bf9ca828..b2abbb5d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -86,7 +86,7 @@ def get_version(): # General information about the project. project = PROJECT_NAME -copyright = '2009-%s, %s' % (THIS_YEAR, AUTHOR) +copyright = f'2009-{THIS_YEAR}, {AUTHOR}' author = AUTHOR # The version info for the project you're documenting, acts as replacement for @@ -269,7 +269,7 @@ def get_version(): # html_search_scorer = 'scorer.js' # Output file base name for HTML help builder. -htmlhelp_basename = '%s-doc' % PROJECT_NAME +htmlhelp_basename = f'{PROJECT_NAME}-doc' # -- Options for LaTeX output --------------------------------------------- diff --git a/pyftpdlib/__main__.py b/pyftpdlib/__main__.py index 6167207c..a0bcea36 100644 --- a/pyftpdlib/__main__.py +++ b/pyftpdlib/__main__.py @@ -111,7 +111,7 @@ def main(args=None): options = parser.parse_args(args=args) if options.version: - sys.exit("pyftpdlib %s" % __ver__) + sys.exit(f"pyftpdlib {__ver__}") if options.debug: config_logging(level=logging.DEBUG) diff --git a/pyftpdlib/_asyncore.py b/pyftpdlib/_asyncore.py index 0bb4df11..4f213420 100644 --- a/pyftpdlib/_asyncore.py +++ b/pyftpdlib/_asyncore.py @@ -60,7 +60,7 @@ def _strerror(err): except (ValueError, OverflowError, NameError): if err in errorcode: return errorcode[err] - return "Unknown error %s" % err + return f"Unknown error {err}" _reraised_exceptions = (KeyboardInterrupt, SystemExit) @@ -244,7 +244,7 @@ def __repr__(self): status.append('%s:%d' % self.addr) except TypeError: status.append(repr(self.addr)) - return '<%s at %#x>' % (' '.join(status), id(self)) + return '<%s at %#x>' % (' '.join(status), id(self)) # noqa def add_channel(self, map=None): # self.log_info('adding channel %s' % self) @@ -369,11 +369,11 @@ def close(self): raise def log(self, message): - sys.stderr.write('log: %s\n' % str(message)) + sys.stderr.write(f'log: {str(message)}\n') def log_info(self, message, type='info'): if type not in self.ignore_log_types: - print('%s: %s' % (type, message)) # noqa + print(f'{type}: {message}') # noqa def handle_read_event(self): if self.accepting: @@ -415,10 +415,10 @@ def handle_error(self): try: self_repr = repr(self) except Exception: - self_repr = '<__repr__(self) failed for object at %0x>' % id(self) + self_repr = f'<__repr__(self) failed for object at {id(self):0x}>' self.log_info( - 'uncaptured python exception, closing channel %s (%s:%s %s)' + 'uncaptured python exception, closing channel %s (%s:%s %s)' # noqa % (self_repr, t, v, tbinfo), 'error', ) @@ -469,7 +469,7 @@ def writable(self): def send(self, data): if self.debug: - self.log_info('sending %s' % repr(data)) + self.log_info(f'sending {repr(data)}') self.out_buffer = self.out_buffer + data self.initiate_send() @@ -498,7 +498,7 @@ def compact_traceback(): del tb file, function, line = tbinfo[-1] - info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) + info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) # noqa: UP031 return (file, function, line), t, v, info @@ -534,7 +534,7 @@ def __init__(self, fd): def __del__(self): if self.fd >= 0: warnings.warn( - "unclosed file %r" % self, + f"unclosed file {self!r}", ResourceWarning, source=self, stacklevel=2, diff --git a/pyftpdlib/authorizers.py b/pyftpdlib/authorizers.py index 4e1b05ab..772acfc9 100644 --- a/pyftpdlib/authorizers.py +++ b/pyftpdlib/authorizers.py @@ -104,9 +104,9 @@ def add_user( provide customized response strings when user log-in and quit. """ if self.has_user(username): - raise ValueError('user %r already exists' % username) + raise ValueError(f'user {username!r} already exists') if not os.path.isdir(homedir): - raise ValueError('no such directory: %r' % homedir) + raise ValueError(f'no such directory: {homedir!r}') homedir = os.path.realpath(homedir) self._check_permissions(username, perm) dic = { @@ -145,7 +145,7 @@ def override_perm(self, username, directory, perm, recursive=False): """Override permissions for a given directory.""" self._check_permissions(username, perm) if not os.path.isdir(directory): - raise ValueError('no such directory: %r' % directory) + raise ValueError(f'no such directory: {directory!r}') directory = os.path.normcase(os.path.realpath(directory)) home = os.path.normcase(self.get_home_dir(username)) if directory == home: @@ -242,7 +242,7 @@ def _check_permissions(self, username, perm): warned = 0 for p in perm: if p not in self.read_perms + self.write_perms: - raise ValueError('no such permission %r' % p) + raise ValueError(f'no such permission {p!r}') if ( username == 'anonymous' and p in self.write_perms @@ -305,15 +305,15 @@ def __init__(self): if user == 'anonymous': raise AuthorizerError('invalid username "anonymous"') if user not in users: - raise AuthorizerError('unknown user %s' % user) + raise AuthorizerError(f'unknown user {user}') if self.anonymous_user is not None: if not self.has_user(self.anonymous_user): - raise AuthorizerError('no such user %s' % self.anonymous_user) + raise AuthorizerError(f'no such user {self.anonymous_user}') home = self.get_home_dir(self.anonymous_user) if not os.path.isdir(home): raise AuthorizerError( - 'no valid home set for user %s' % self.anonymous_user + f'no valid home set for user {self.anonymous_user}' ) def override_user( @@ -339,13 +339,13 @@ def override_user( "at least one keyword argument must be specified" ) if self.allowed_users and username not in self.allowed_users: - raise AuthorizerError('%s is not an allowed user' % username) + raise AuthorizerError(f'{username} is not an allowed user') if self.rejected_users and username in self.rejected_users: - raise AuthorizerError('%s is not an allowed user' % username) + raise AuthorizerError(f'{username} is not an allowed user') if username == "anonymous" and password: raise AuthorizerError("can't assign password to anonymous user") if not self.has_user(username): - raise AuthorizerError('no such user %s' % username) + raise AuthorizerError(f'no such user {username}') if username in self._dummy_authorizer.user_table: # re-set parameters @@ -426,7 +426,7 @@ def __init__(self, anonymous_user=None): try: pwd.getpwnam(self.anonymous_user).pw_dir # noqa except KeyError: - raise AuthorizerError('no such user %s' % anonymous_user) + raise AuthorizerError(f'no such user {anonymous_user}') # --- overridden / private API @@ -586,7 +586,7 @@ def __init__( for username in self.allowed_users: if not self._has_valid_shell(username): raise AuthorizerError( - "user %s has not a valid shell" % username + f"user {username} has not a valid shell" ) def override_user( @@ -757,7 +757,7 @@ def get_home_dir(self, username): key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) except OSError: raise AuthorizerError( - "No profile directory defined for user %s" % username + f"No profile directory defined for user {username}" ) value = winreg.QueryValueEx(key, "ProfileImagePath")[0] home = win32api.ExpandEnvironmentStrings(value) diff --git a/pyftpdlib/filesystems.py b/pyftpdlib/filesystems.py index 9a9d7ac4..351e59f1 100644 --- a/pyftpdlib/filesystems.py +++ b/pyftpdlib/filesystems.py @@ -441,7 +441,7 @@ def get_group_by_gid(gid): # https://github.com/giampaolo/pyftpdlib/issues/187 fmtstr = '%d %Y' if now - st.st_mtime > SIX_MONTHS else '%d %H:%M' try: - mtimestr = "%s %s" % ( + mtimestr = "%s %s" % ( # noqa: UP031 _months_map[mtime.tm_mon], time.strftime(fmtstr, mtime), ) @@ -450,7 +450,7 @@ def get_group_by_gid(gid): # old (prior to year 1900) in which case we return # the current time as last mtime. mtime = timefunc() - mtimestr = "%s %s" % ( + mtimestr = "%s %s" % ( # noqa: UP031 _months_map[mtime.tm_mon], time.strftime("%d %H:%M", mtime), ) @@ -590,13 +590,13 @@ def format_mlsx(self, basedir, listing, perms, facts, ignore_err=True): # platforms should use some platform-specific method (e.g. # on Windows NTFS filesystems MTF records could be used). if show_unique: - retfacts['unique'] = "%xg%x" % (st.st_dev, st.st_ino) + retfacts['unique'] = f"{st.st_dev:x}g{st.st_ino:x}" # facts can be in any order but we sort them by name factstring = "".join( - ["%s=%s;" % (x, retfacts[x]) for x in sorted(retfacts.keys())] + [f"{x}={retfacts[x]};" for x in sorted(retfacts.keys())] ) - line = "%s %s\r\n" % (factstring, basename) + line = f"{factstring} {basename}\r\n" yield line.encode('utf8', self.cmd_channel.unicode_errors) diff --git a/pyftpdlib/handlers.py b/pyftpdlib/handlers.py index a0040dc7..8a51fe34 100644 --- a/pyftpdlib/handlers.py +++ b/pyftpdlib/handlers.py @@ -438,7 +438,7 @@ def __init__(self, cmd_channel, extmode=False): self.bind((local_ip, port)) except PermissionError: self.cmd_channel.log( - "ignoring EPERM when bind()ing port %s" % port, + f"ignoring EPERM when bind()ing port {port}", logfun=logger.debug, ) except OSError as err: @@ -487,7 +487,7 @@ def __init__(self, cmd_channel, extmode=False): self.cmd_channel.respond(resp) else: self.cmd_channel.respond( - '229 Entering extended passive mode (|||%d|).' % port + f'229 Entering extended passive mode (|||{int(port)}|).' ) if self.timeout: self.call_later(self.timeout, self.handle_timeout) @@ -510,7 +510,7 @@ def handle_accepted(self, sock, addr): pass msg = ( '425 Rejected data connection from foreign address ' - + '%s:%s.' % (addr[0], addr[1]) + f'{addr[0]}:{addr[1]}.' ) self.cmd_channel.respond_w_warning(msg) # do not close listening socket: it couldn't be client's blame @@ -519,7 +519,7 @@ def handle_accepted(self, sock, addr): # site-to-site FTP allowed msg = ( 'Established data connection with foreign address ' - + '%s:%s.' % (addr[0], addr[1]) + f'{addr[0]}:{addr[1]}.' ) self.cmd_channel.log(msg, logfun=logger.warning) # Immediately close the current channel (we accept only one @@ -586,10 +586,10 @@ def __init__(self, ip, port, cmd_channel): if ip.count('.') == 3: self._cmd = "PORT" - self._normalized_addr = "%s:%s" % (ip, port) + self._normalized_addr = f"{ip}:{port}" else: self._cmd = "EPRT" - self._normalized_addr = "[%s]:%s" % (ip, port) + self._normalized_addr = f"[{ip}]:{port}" source_ip = self.cmd_channel.socket.getsockname()[0] # dual stack IPv4/IPv6 support @@ -737,9 +737,9 @@ def __init__(self, sock, cmd_channel): ) def __repr__(self): - return '<%s(%s)>' % ( + return '<%s(%s)>' % ( # noqa: UP031 self.__class__.__name__, - self.cmd_channel.get_repr_info(as_str=True), + self.cmd_channel.get_repr_info(as_str=True), # noqa: UP031 ) __str__ = __repr__ @@ -978,7 +978,7 @@ def handle_error(self): self.log_exception(self) error = "Internal error" try: - self._resp = ("426 %s; transfer aborted." % error, logger.warning) + self._resp = (f"426 {error}; transfer aborted.", logger.warning) self.close() except Exception: logger.critical(traceback.format_exc()) @@ -1002,8 +1002,10 @@ def handle_close(self): else: tot_bytes = self.get_transmitted_bytes() self._resp = ( - "426 Transfer aborted; %d bytes transmitted." - % tot_bytes, + ( + f"426 Transfer aborted; {int(tot_bytes)} bytes" + " transmitted." + ), logger.debug, ) finally: @@ -1323,7 +1325,7 @@ class FTPHandler(AsyncChat): # session attributes (explained in the docstring) timeout = 300 - banner = "pyftpdlib %s ready." % __ver__ + banner = f"pyftpdlib {__ver__} ready." max_login_attempts = 3 permit_foreign_addresses = False permit_privileged_ports = False @@ -1392,7 +1394,7 @@ def __init__(self, conn, server, ioloop=None): # https://github.com/giampaolo/pyftpdlib/issues/188 AsyncChat.__init__(self, socket.socket(), ioloop=ioloop) self.close() - debug("call: FTPHandler.__init__, err %r" % err, self) + debug(f"call: FTPHandler.__init__, err {err!r}", self) if err.errno == errno.EINVAL: # https://github.com/giampaolo/pyftpdlib/issues/143 return @@ -1405,7 +1407,7 @@ def __init__(self, conn, server, ioloop=None): self.remote_ip, self.remote_port = self.socket.getpeername()[:2] except OSError as err: debug( - "call: FTPHandler.__init__, err on getpeername() %r" % err, + f"call: FTPHandler.__init__, err on getpeername() {err!r}", self, ) # A race condition may occur if the other end is closing @@ -1425,7 +1427,7 @@ def __init__(self, conn, server, ioloop=None): self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1) except OSError as err: debug( - "call: FTPHandler.__init__, err on SO_OOBINLINE %r" % err, self + f"call: FTPHandler.__init__, err on SO_OOBINLINE {err!r}", self ) # disable Nagle algorithm for the control socket only, resulting @@ -1435,7 +1437,7 @@ def __init__(self, conn, server, ioloop=None): self.socket.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) except OSError as err: debug( - "call: FTPHandler.__init__, err on TCP_NODELAY %r" % err, + f"call: FTPHandler.__init__, err on TCP_NODELAY {err!r}", self, ) @@ -1454,7 +1456,7 @@ def get_repr_info(self, as_str=False, extra_info=None): extra_info = {} info = {} info['id'] = id(self) - info['addr'] = "%s:%s" % (self.remote_ip, self.remote_port) + info['addr'] = f"{self.remote_ip}:{self.remote_port}" if _is_ssl_sock(self.socket): info['ssl'] = True if self.username: @@ -1474,11 +1476,11 @@ def get_repr_info(self, as_str=False, extra_info=None): info['bytes-trans'] = dc.get_transmitted_bytes() info.update(extra_info) if as_str: - return ', '.join(['%s=%r' % (k, v) for (k, v) in info.items()]) + return ', '.join([f'{k}={v!r}' for (k, v) in info.items()]) return info def __repr__(self): - return '<%s(%s)>' % (self.__class__.__name__, self.get_repr_info(True)) + return f'<{self.__class__.__name__}({self.get_repr_info(True)})>' __str__ = __repr__ @@ -1489,9 +1491,9 @@ def handle(self): self.on_connect() if not self._closed and not self._closing: if len(self.banner) <= 75: - self.respond("220 %s" % str(self.banner)) + self.respond(f"220 {str(self.banner)}") else: - self.push('220-%s\r\n' % str(self.banner)) + self.push(f'220-{str(self.banner)}\r\n') self.respond('220 ') def handle_max_cons(self): @@ -1574,20 +1576,20 @@ def found_terminator(self): self.pre_process_command(line, cmd, arg) except UnicodeEncodeError: self.respond( - "501 can't decode path (server filesystem encoding is %s)" - % sys.getfilesystemencoding() + "501 can't decode path (server filesystem encoding is" + f" {sys.getfilesystemencoding()})" ) def pre_process_command(self, line, cmd, arg): kwargs = {} if cmd == "SITE" and arg: - cmd = "SITE %s" % arg.split(' ')[0].upper() + cmd = f"SITE {arg.split(' ')[0].upper()}" arg = line[len(cmd) + 1 :] if cmd != 'PASS': - self.logline("<- %s" % line) + self.logline(f"<- {line}") else: - self.logline("<- %s %s" % (line.split(' ')[0], '*' * 6)) + self.logline(f"<- {line.split(' ')[0]} {'*' * 6}") # Recognize those commands having a "special semantic". They # should be sent by following the RFC-959 procedure of sending @@ -1598,7 +1600,7 @@ def pre_process_command(self, line, cmd, arg): if cmd[-4:] in ('ABOR', 'STAT', 'QUIT'): cmd = cmd[-4:] else: - msg = 'Command "%s" not understood.' % cmd + msg = f'Command "{cmd}" not understood.' self.respond('500 ' + msg) if cmd: self.log_cmd(cmd, arg, 500, msg) @@ -1674,9 +1676,9 @@ def pre_process_command(self, line, cmd, arg): if not self.fs.validpath(arg): line = self.fs.fs2ftp(arg) - msg = "%r points to a path which is outside " % line + msg = f"{line!r} points to a path which is outside " msg += "the user's root directory" - self.respond("550 %s." % msg) + self.respond(f"550 {msg}.") self.log_cmd(cmd, arg, 550, msg) return @@ -1890,7 +1892,7 @@ def respond(self, resp, logfun=logger.debug): self._last_response = resp self.push(resp + '\r\n') if self._log_debug: - self.logline('-> %s' % resp, logfun=logfun) + self.logline(f'-> {resp}', logfun=logfun) else: self.log(resp[4:], logfun=logfun) @@ -1976,7 +1978,7 @@ def run_as_current_user(self, function, *args, **kwargs): def log(self, msg, logfun=logger.info): """Log a message, including additional identifying session data.""" prefix = self.log_prefix % self.__dict__ - logfun("%s %s" % (prefix, msg)) + logfun(f"{prefix} {msg}") def logline(self, msg, logfun=logger.debug): """Log a line including additional identifying session data. @@ -1984,12 +1986,12 @@ def logline(self, msg, logfun=logger.debug): """ if self._log_debug: prefix = self.log_prefix % self.__dict__ - logfun("%s %s" % (prefix, msg)) + logfun(f"{prefix} {msg}") def logerror(self, msg): """Log an error including additional identifying session data.""" prefix = self.log_prefix % self.__dict__ - logger.error("%s %s" % (prefix, msg)) + logger.error(f"{prefix} {msg}") def log_exception(self, instance): """Log an unhandled exception. 'instance' is the instance @@ -2043,9 +2045,9 @@ def log_cmd(self, cmd, arg, respcode, respstr): further commands. """ if not self._log_debug and cmd in self.log_cmds_list: - line = '%s %s' % (' '.join([cmd, arg]).strip(), respcode) + line = f"{' '.join([cmd, arg]).strip()} {respcode}" if str(respcode)[0] in ('4', '5'): - line += ' %r' % respstr + line += f' {respstr!r}' self.log(line) def log_transfer(self, cmd, filename, receive, completed, elapsed, bytes): @@ -2070,7 +2072,7 @@ def log_transfer(self, cmd, filename, receive, completed, elapsed, bytes): - (int) bytes: number of bytes transmitted. """ - line = '%s %s completed=%s bytes=%s seconds=%s' % ( + line = '%s %s completed=%s bytes=%s seconds=%s' % ( # noqa cmd, filename, completed and 1 or 0, @@ -2099,9 +2101,8 @@ def _make_eport(self, ip, port): # common IPv4 address. remote_ip = remote_ip[7:] if not self.permit_foreign_addresses and ip != remote_ip: - msg = "501 Rejected data connection to foreign address %s:%s." % ( - ip, - port, + msg = ( + f"501 Rejected data connection to foreign address {ip}:{port}." ) self.respond_w_warning(msg) return @@ -2109,7 +2110,7 @@ def _make_eport(self, ip, port): # ...another RFC-2577 recommendation is rejecting connections # to privileged ports (< 1024) for security reasons. if not self.permit_privileged_ports and port < 1024: - msg = '501 PORT against the privileged port "%s" refused.' % port + msg = f'501 PORT against the privileged port "{port}" refused.' self.respond_w_warning(msg) return @@ -2283,9 +2284,9 @@ def ftp_QUIT(self, line): else: msg_quit = "Goodbye." if len(msg_quit) <= 75: - self.respond("221 %s" % msg_quit) + self.respond(f"221 {msg_quit}") else: - self.push("221-%s\r\n" % msg_quit) + self.push(f"221-{msg_quit}\r\n") self.respond("221 ") # From RFC-959: @@ -2324,7 +2325,7 @@ def ftp_LIST(self, path): iterator = self.fs.format_list(basedir, [filename]) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: producer = BufferedIteratorProducer(iterator) self.push_dtp_data(producer, isproducer=True, cmd="LIST") @@ -2343,7 +2344,7 @@ def ftp_NLST(self, path): self.fs.lstat(path) # raise exc in case of problems listing = [os.path.basename(path)] except (OSError, FilesystemError) as err: - self.respond('550 %s.' % _strerror(err)) + self.respond(f'550 {_strerror(err)}.') else: data = '' if listing: @@ -2380,14 +2381,14 @@ def ftp_MLST(self, path): ) data = b''.join(iterator) except (OSError, FilesystemError) as err: - self.respond('550 %s.' % _strerror(err)) + self.respond(f'550 {_strerror(err)}.') else: data = data.decode('utf8', self.unicode_errors) # since TVFS is supported (see RFC-3659 chapter 6), a fully # qualified pathname should be returned - data = data.split(' ')[0] + ' %s\r\n' % line + data = data.split(' ')[0] + f' {line}\r\n' # response is expected on the command channel - self.push('250-Listing "%s":\r\n' % line) + self.push(f'250-Listing "{line}":\r\n') # the fact set must be preceded by a space self.push(' ' + data) self.respond('250 End MLST.') @@ -2406,7 +2407,7 @@ def ftp_MLSD(self, path): listing = self.run_as_current_user(self.fs.listdir, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: perms = self.authorizer.get_perms(self.username) iterator = self.fs.format_mlsx( @@ -2426,7 +2427,7 @@ def ftp_RETR(self, file): fd = self.run_as_current_user(self.fs.open, file, 'rb') except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') return try: @@ -2444,15 +2445,12 @@ def ftp_RETR(self, file): fd.seek(rest_pos) ok = 1 except ValueError: - why = "REST position (%s) > file size (%s)" % ( - rest_pos, - fsize, - ) + why = f"REST position ({rest_pos}) > file size ({fsize})" except (OSError, FilesystemError) as err: why = _strerror(err) if not ok: fd.close() - self.respond('554 %s' % why) + self.respond(f'554 {why}') return producer = FileProducer(fd, self._current_type) self.push_dtp_data(producer, isproducer=True, file=fd, cmd="RETR") @@ -2479,7 +2477,7 @@ def ftp_STOR(self, file, mode='w'): fd = self.run_as_current_user(self.fs.open, file, mode + 'b') except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') return try: @@ -2497,15 +2495,12 @@ def ftp_STOR(self, file, mode='w'): fd.seek(rest_pos) ok = 1 except ValueError: - why = "REST position (%s) > file size (%s)" % ( - rest_pos, - fsize, - ) + why = f"REST position ({rest_pos}) > file size ({fsize})" except (OSError, FilesystemError) as err: why = _strerror(err) if not ok: fd.close() - self.respond('554 %s' % why) + self.respond(f'554 {why}') return if self.data_channel is not None: @@ -2559,7 +2554,7 @@ def ftp_STOU(self, line): # something else happened else: why = _strerror(err) - self.respond("450 %s." % why) + self.respond(f"450 {why}.") return try: @@ -2575,11 +2570,11 @@ def ftp_STOU(self, line): # now just acts like STOR except that restarting isn't allowed filename = os.path.basename(fd.name) if self.data_channel is not None: - self.respond("125 FILE: %s" % filename) + self.respond(f"125 FILE: {filename}") self.data_channel.file_obj = fd self.data_channel.enable_receiving(self._current_type, "STOU") else: - self.respond("150 FILE: %s" % filename) + self.respond(f"150 FILE: {filename}") self._in_dtp_queue = (fd, "STOU") return filename except Exception: @@ -2608,7 +2603,7 @@ def ftp_REST(self, line): except (ValueError, OverflowError): self.respond("501 Invalid parameter.") else: - self.respond("350 Restarting at position %s." % marker) + self.respond(f"350 Restarting at position {marker}.") self._restart_position = marker def ftp_ABOR(self, line): @@ -2671,7 +2666,7 @@ def ftp_USER(self, line): # login sequence again. self.flush_account() msg = 'Previous account information was flushed' - self.respond('331 %s, send password.' % msg, logfun=logger.info) + self.respond(f'331 {msg}, send password.', logfun=logger.info) self.username = line def handle_auth_failed(self, msg, password): @@ -2685,7 +2680,7 @@ def callback(username, password, msg): self.close_when_done() else: self.respond("530 " + msg) - self.log("USER '%s' failed login." % username) + self.log(f"USER '{username}' failed login.") self.on_login_failed(username, password) self.del_channel() @@ -2709,11 +2704,11 @@ def callback(username, password, msg): def handle_auth_success(self, home, password, msg_login): if len(msg_login) <= 75: - self.respond('230 %s' % msg_login) + self.respond(f'230 {msg_login}') else: - self.push("230-%s\r\n" % msg_login) + self.push(f"230-{msg_login}\r\n") self.respond("230 ") - self.log("USER '%s' logged in." % self.username) + self.log(f"USER '{self.username}' logged in.") self.authenticated = True self.password = password self.attempted_logins = 0 @@ -2762,7 +2757,8 @@ def ftp_PWD(self, line): # they must be doubled (see RFC-959, chapter 7, appendix 2). cwd = self.fs.cwd self.respond( - '257 "%s" is the current directory.' % cwd.replace('"', '""') + '257 "%s" is the current directory.' # noqa: UP031 + % cwd.replace('"', '""') # noqa ) def ftp_CWD(self, path): @@ -2781,10 +2777,10 @@ def ftp_CWD(self, path): self.run_as_current_user(self.fs.chdir, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: cwd = self.fs.cwd - self.respond('250 "%s" is the current directory.' % cwd) + self.respond(f'250 "{cwd}" is the current directory.') if os.getcwd() != init_cwd: os.chdir(init_cwd) return path @@ -2816,19 +2812,19 @@ def ftp_SIZE(self, path): line = self.fs.fs2ftp(path) if self._current_type == 'a': why = "SIZE not allowed in ASCII mode" - self.respond("550 %s." % why) + self.respond(f"550 {why}.") return if not self.fs.isfile(self.fs.realpath(path)): - why = "%s is not retrievable" % line - self.respond("550 %s." % why) + why = f"{line} is not retrievable" + self.respond(f"550 {why}.") return try: size = self.run_as_current_user(self.fs.getsize, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: - self.respond("213 %s" % size) + self.respond(f"213 {size}") def ftp_MDTM(self, path): """Return last modification time of file to the client as an ISO @@ -2837,7 +2833,7 @@ def ftp_MDTM(self, path): """ line = self.fs.fs2ftp(path) if not self.fs.isfile(self.fs.realpath(path)): - self.respond("550 %s is not retrievable" % line) + self.respond(f"550 {line} is not retrievable") return timefunc = time.gmtime if self.use_gmt_times else time.localtime try: @@ -2850,9 +2846,9 @@ def ftp_MDTM(self, path): why = "Can't determine file's last modification time" else: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: - self.respond("213 %s" % lmt) + self.respond(f"213 {lmt}") return path def ftp_MFMT(self, path, timeval): @@ -2869,10 +2865,10 @@ def ftp_MFMT(self, path, timeval): if len(timeval) != len("YYYYMMDDHHMMSS"): why = "Invalid time format; expected: YYYYMMDDHHMMSS" - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') return if not self.fs.isfile(self.fs.realpath(path)): - self.respond("550 %s is not retrievable" % line) + self.respond(f"550 {line} is not retrievable") return timefunc = time.gmtime if self.use_gmt_times else time.localtime try: @@ -2882,7 +2878,7 @@ def ftp_MFMT(self, path, timeval): timeval_secs = (timeval_datetime_obj - epoch).total_seconds() except ValueError: why = "Invalid time format; expected: YYYYMMDDHHMMSS" - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') return try: # Modify Time @@ -2897,9 +2893,9 @@ def ftp_MFMT(self, path, timeval): why = "Can't determine file's last modification time" else: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: - self.respond("213 Modify=%s; %s." % (lmt, line)) + self.respond(f"213 Modify={lmt}; {line}.") return (lmt, path) def ftp_MKD(self, path): @@ -2911,13 +2907,13 @@ def ftp_MKD(self, path): self.run_as_current_user(self.fs.mkdir, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: # The 257 response is supposed to include the directory # name and in case it contains embedded double-quotes # they must be doubled (see RFC-959, chapter 7, appendix 2). self.respond( - '257 "%s" directory created.' % line.replace('"', '""') + '257 "%s" directory created.' % line.replace('"', '""') # noqa ) return path @@ -2927,13 +2923,13 @@ def ftp_RMD(self, path): """ if self.fs.realpath(path) == self.fs.realpath(self.fs.root): msg = "Can't remove root directory." - self.respond("550 %s" % msg) + self.respond(f"550 {msg}") return try: self.run_as_current_user(self.fs.rmdir, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: self.respond("250 Directory removed.") @@ -2945,7 +2941,7 @@ def ftp_DELE(self, path): self.run_as_current_user(self.fs.remove, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: self.respond("250 File removed.") return path @@ -2975,7 +2971,7 @@ def ftp_RNTO(self, path): self.run_as_current_user(self.fs.rename, src, path) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: self.respond("250 Renaming ok.") return (src, path) @@ -2992,7 +2988,7 @@ def ftp_TYPE(self, line): self.respond("200 Type set to: Binary.") self._current_type = 'i' else: - self.respond('504 Unsupported type "%s".' % line) + self.respond(f'504 Unsupported type "{line}".') def ftp_STRU(self, line): """Set file structure ("F" is the only one supported (noop)).""" @@ -3043,16 +3039,18 @@ def ftp_STAT(self, path): # return STATus information about ftpd if not path: s = [] - s.append('Connected to: %s:%s' % self.socket.getsockname()[:2]) + s.append( + 'Connected to: %s:%s' % self.socket.getsockname()[:2] # noqa + ) if self.authenticated: - s.append('Logged in as: %s' % self.username) + s.append(f'Logged in as: {self.username}') else: if not self.username: s.append("Waiting for username.") else: s.append("Waiting for password.") type = 'ASCII' if self._current_type == 'a' else 'Binary' - s.append("TYPE: %s; STRUcture: File; MODE: Stream" % type) + s.append(f"TYPE: {type}; STRUcture: File; MODE: Stream") if self._dtp_acceptor is not None: s.append('Passive data channel waiting for connection.') elif self.data_channel is not None: @@ -3060,14 +3058,14 @@ def ftp_STAT(self, path): bytes_recv = self.data_channel.tot_bytes_received elapsed_time = self.data_channel.get_elapsed_time() s.append('Data connection open:') - s.append('Total bytes sent: %s' % bytes_sent) - s.append('Total bytes received: %s' % bytes_recv) - s.append('Transfer elapsed time: %s secs' % elapsed_time) + s.append(f'Total bytes sent: {bytes_sent}') + s.append(f'Total bytes received: {bytes_recv}') + s.append(f'Transfer elapsed time: {elapsed_time} secs') else: s.append('Data connection closed.') self.push('211-FTP server status:\r\n') - self.push(''.join([' %s\r\n' % item for item in s])) + self.push(''.join([f' {item}\r\n' for item in s])) self.respond('211 End of status.') # return directory LISTing over the command channel else: @@ -3086,9 +3084,9 @@ def ftp_STAT(self, path): iterator = self.fs.format_list(basedir, [filename]) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: - self.push('213-Status of "%s":\r\n' % line) + self.push(f'213-Status of "{line}":\r\n') self.push_with_producer(BufferedIteratorProducer(iterator)) self.respond('213 End of status.') return path @@ -3116,7 +3114,7 @@ def ftp_FEAT(self, line): features.add('REST STREAM') features = sorted(features) self.push("211-Features supported:\r\n") - self.push("".join([" %s\r\n" % x for x in features])) + self.push("".join([f" {x}\r\n" for x in features])) self.respond('211 End FEAT.') def ftp_OPTS(self, line): @@ -3132,9 +3130,9 @@ def ftp_OPTS(self, line): cmd, arg = line, '' # actually the only command able to accept options is MLST if cmd.upper() != 'MLST' or 'MLST' not in self.proto_cmds: - raise ValueError('Unsupported command "%s"' % cmd) + raise ValueError(f'Unsupported command "{cmd}"') except ValueError as err: - self.respond('501 %s.' % err) + self.respond(f'501 {err}.') else: facts = [x.lower() for x in arg.split(';')] self._current_facts = [ @@ -3166,7 +3164,7 @@ def ftp_HELP(self, line): if line: line = line.upper() if line in self.proto_cmds: - self.respond("214 %s" % self.proto_cmds[line]['help']) + self.respond(f"214 {self.proto_cmds[line]['help']}") else: self.respond("501 Unrecognized command.") else: @@ -3210,7 +3208,7 @@ def ftp_SITE_CHMOD(self, path, mode): self.run_as_current_user(self.fs.chmod, path, mode) except (OSError, FilesystemError) as err: why = _strerror(err) - self.respond('550 %s.' % why) + self.respond(f'550 {why}.') else: self.respond('200 SITE CHMOD successful.') return (path, mode) @@ -3220,7 +3218,7 @@ def ftp_SITE_HELP(self, line): if line: line = line.upper() if line in self.proto_cmds: - self.respond("214 %s" % self.proto_cmds[line]['help']) + self.respond(f"214 {self.proto_cmds[line]['help']}") else: self.respond("501 Unrecognized SITE command.") else: @@ -3228,7 +3226,7 @@ def ftp_SITE_HELP(self, line): site_cmds = [] for cmd in sorted(self.proto_cmds.keys()): if cmd.startswith('SITE '): - site_cmds.append(' %s\r\n' % cmd[5:]) + site_cmds.append(f' {cmd[5:]}\r\n') self.push(''.join(site_cmds)) self.respond("214 Help SITE command successful.") @@ -3306,7 +3304,7 @@ def secure_connection(self, ssl_context): # very quickly debug( "call: secure_connection(); can't secure SSL connection " - "%r; closing" % err, + f"{err!r}; closing", self, ) self.close() @@ -3362,7 +3360,7 @@ def _do_ssl_handshake(self): "call: _do_ssl_handshake, err: ssl-want-write", inst=self ) except SSL.SysCallError as err: - debug("call: _do_ssl_handshake, err: %r" % err, inst=self) + debug(f"call: _do_ssl_handshake, err: {err!r}", inst=self) retval, desc = err.args if (retval == -1 and desc == 'Unexpected EOF') or retval > 0: # Happens when the other side closes the socket before @@ -3376,7 +3374,7 @@ def _do_ssl_handshake(self): else: raise except SSL.Error as err: - debug("call: _do_ssl_handshake, err: %r" % err, inst=self) + debug(f"call: _do_ssl_handshake, err: {err!r}", inst=self) self.handle_failed_ssl_handshake() else: debug("SSL connection established", self) @@ -3454,7 +3452,7 @@ def send(self, data): super().handle_close() return 0 except SSL.SysCallError as err: - debug("call: send(), err: %r" % err, inst=self) + debug(f"call: send(), err: {err!r}", inst=self) errnum, errstr = err.args if errnum == errno.EWOULDBLOCK: return 0 @@ -3485,7 +3483,7 @@ def recv(self, buffer_size): super().handle_close() return b'' except SSL.SysCallError as err: - debug("call: recv(), err: %r" % err, inst=self) + debug(f"call: recv(), err: {err!r}", inst=self) errnum, errstr = err.args if ( errnum in _ERRNOS_DISCONNECTED @@ -3510,7 +3508,7 @@ def _do_ssl_shutdown(self): os.write(self.socket.fileno(), b'') except OSError as err: debug( - "call: _do_ssl_shutdown() -> os.write, err: %r" % err, + f"call: _do_ssl_shutdown() -> os.write, err: {err!r}", inst=self, ) if err.errno in { @@ -3560,7 +3558,7 @@ def _do_ssl_shutdown(self): super().close() except SSL.SysCallError as err: debug( - "call: _do_ssl_shutdown() -> shutdown(), err: %r" % err, + f"call: _do_ssl_shutdown() -> shutdown(), err: {err!r}", inst=self, ) errnum, errstr = err.args @@ -3573,7 +3571,7 @@ def _do_ssl_shutdown(self): raise except SSL.Error as err: debug( - "call: _do_ssl_shutdown() -> shutdown(), err: %r" % err, + f"call: _do_ssl_shutdown() -> shutdown(), err: {err!r}", inst=self, ) # see: @@ -3585,7 +3583,7 @@ def _do_ssl_shutdown(self): raise except OSError as err: debug( - "call: _do_ssl_shutdown() -> shutdown(), err: %r" % err, + f"call: _do_ssl_shutdown() -> shutdown(), err: {err!r}", inst=self, ) if err.errno in _ERRNOS_DISCONNECTED: @@ -3808,7 +3806,7 @@ def ftp_AUTH(self, line): # From RFC-4217: "As the SSL/TLS protocols self-negotiate # their levels, there is no need to distinguish between SSL # and TLS in the application layer". - self.respond('234 AUTH %s successful.' % arg) + self.respond(f'234 AUTH {arg} successful.') self.secure_connection(self.ssl_context) else: self.respond( @@ -3846,6 +3844,6 @@ def ftp_PROT(self, line): self.respond('200 Protection set to Private') self._prot = True elif arg in ('S', 'E'): - self.respond('521 PROT %s unsupported (use C or P).' % arg) + self.respond(f'521 PROT {arg} unsupported (use C or P).') else: self.respond("502 Unrecognized PROT type (use C or P).") diff --git a/pyftpdlib/ioloop.py b/pyftpdlib/ioloop.py index 3ce0975e..a90ff3fe 100644 --- a/pyftpdlib/ioloop.py +++ b/pyftpdlib/ioloop.py @@ -148,7 +148,7 @@ def poll(self): if self._cancellations > 512 and self._cancellations > ( len(self._tasks) >> 1 ): - debug("re-heapifying %s cancelled tasks" % self._cancellations) + debug(f"re-heapifying {self._cancellations} cancelled tasks") self.reheapify() try: @@ -199,10 +199,10 @@ class _CallLater: ) def __init__(self, seconds, target, *args, **kwargs): - assert callable(target), "%s is not callable" % target - assert sys.maxsize >= seconds >= 0, ( - "%s is not greater than or equal to 0 seconds" % seconds - ) + assert callable(target), f"{target} is not callable" + assert ( + sys.maxsize >= seconds >= 0 + ), f"{seconds} is not greater than or equal to 0 seconds" self._delay = seconds self._target = target self._args = args @@ -229,13 +229,13 @@ def __repr__(self): sig = object.__repr__(self) else: sig = repr(self._target) - sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % ( + sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % ( # noqa: UP031 self._args or '[]', self._kwargs or '{}', self.cancelled, self._delay, ) - return '<%s>' % sig + return f'<{sig}>' __str__ = __repr__ @@ -306,10 +306,9 @@ def __exit__(self, *args): def __repr__(self): status = [self.__class__.__module__ + "." + self.__class__.__name__] status.append( - "(fds=%s, tasks=%s)" - % (len(self.socket_map), len(self.sched._tasks)) + f"(fds={len(self.socket_map)}, tasks={len(self.sched._tasks)})" ) - return '<%s at %#x>' % (' '.join(status), id(self)) + return '<%s at %#x>' % (' '.join(status), id(self)) # noqa: UP031 __str__ = __repr__ @@ -523,8 +522,8 @@ def unregister(self, fd): except OSError as err: if err.errno in (errno.ENOENT, errno.EBADF): debug( - "call: unregister(); poller returned %r; ignoring it" - % err, + f"call: unregister(); poller returned {err!r};" + " ignoring it", self, ) else: @@ -695,8 +694,8 @@ def unregister(self, fd): except OSError as err: if err.errno in (errno.ENOENT, errno.EBADF): debug( - "call: unregister(); poller returned %r; " - "ignoring it" % err, + f"call: unregister(); poller returned {err!r};" + " ignoring it", self, ) else: @@ -842,8 +841,7 @@ def modify_ioloop_events(self, events, logdebug=False): else: ev = events debug( - "call: IOLoop.modify(); setting %r IO events" - % (ev), + f"call: IOLoop.modify(); setting {ev!r} IO events", self, ) self.ioloop.modify(self._fileno, events) @@ -932,7 +930,7 @@ def send(self, data): try: return self.socket.send(data) except OSError as err: - debug("call: send(), err: %s" % err, inst=self) + debug(f"call: send(), err: {err}", inst=self) if err.errno in _ERRNOS_RETRY: return 0 elif err.errno in _ERRNOS_DISCONNECTED: @@ -945,7 +943,7 @@ def recv(self, buffer_size): try: data = self.socket.recv(buffer_size) except OSError as err: - debug("call: recv(), err: %s" % err, inst=self) + debug(f"call: recv(), err: {err}", inst=self) if err.errno in _ERRNOS_DISCONNECTED: self.handle_close() return b'' diff --git a/pyftpdlib/log.py b/pyftpdlib/log.py index 905172a9..5de1fb14 100644 --- a/pyftpdlib/log.py +++ b/pyftpdlib/log.py @@ -89,7 +89,7 @@ def format(self, record): try: record.message = record.getMessage() except Exception as err: - record.message = "Bad message (%r): %r" % (err, record.__dict__) + record.message = f"Bad message ({err!r}): {record.__dict__!r}" record.asctime = time.strftime( TIME_FORMAT, self.converter(record.created) @@ -134,7 +134,7 @@ def format(self, record): def debug(s, inst=None): s = "[debug] " + s if inst is not None: - s += " (%r)" % inst + s += f" ({inst!r})" logger.debug(s) diff --git a/pyftpdlib/servers.py b/pyftpdlib/servers.py index 1a9015f8..90c0b4d6 100644 --- a/pyftpdlib/servers.py +++ b/pyftpdlib/servers.py @@ -156,7 +156,7 @@ def get_fqname(obj): config_logging(prefix=PREFIX_MPROC if prefork else PREFIX) if self.handler.passive_ports: - pasv_ports = "%s->%s" % ( + pasv_ports = "%s->%s" % ( # noqa: UP031 self.handler.passive_ports[0], self.handler.passive_ports[-1], ) @@ -375,8 +375,8 @@ def _refresh_tasks(self): """ if self._active_tasks: logger.debug( - "refreshing tasks (%s join() potentials)" - % len(self._active_tasks) + f"refreshing tasks ({len(self._active_tasks)} join()" + " potentials)" ) with self._lock: new = [] @@ -512,7 +512,7 @@ def serve_forever(self, timeout=1.0, blocking=True, handle_exit=True): def _terminate_task(self, t): if hasattr(t, 'terminate'): - logger.debug("terminate()ing task %r" % t) + logger.debug(f"terminate()ing task {t!r}") try: if not _BSD: t.terminate() @@ -525,7 +525,7 @@ def _terminate_task(self, t): pass def _join_task(self, t): - logger.debug("join()ing task %r" % t) + logger.debug(f"join()ing task {t!r}") t.join(self.join_timeout) if t.is_alive(): logger.warning( diff --git a/pyftpdlib/test/__init__.py b/pyftpdlib/test/__init__.py index bd34b990..d63ae624 100644 --- a/pyftpdlib/test/__init__.py +++ b/pyftpdlib/test/__init__.py @@ -51,7 +51,7 @@ PASSWD = '12345' HOME = os.getcwd() # Use PID to disambiguate file name for parallel testing. -TESTFN_PREFIX = 'pyftpd-tmp-%s-' % os.getpid() +TESTFN_PREFIX = f'pyftpd-tmp-{os.getpid()}-' GLOBAL_TIMEOUT = 2 BUFSIZE = 1024 INTERRUPTED_TRANSF_SIZE = 32768 @@ -78,11 +78,7 @@ def __str__(self): fqmod = self.__class__.__module__ if not fqmod.startswith('pyftpdlib.'): fqmod = 'pyftpdlib.test.' + fqmod - return "%s.%s.%s" % ( - fqmod, - self.__class__.__name__, - self._testMethodName, - ) + return f"{fqmod}.{self.__class__.__name__}.{self._testMethodName}" def get_testfn(self, suffix="", dir=None): fname = get_testfn(suffix=suffix, dir=dir) @@ -155,7 +151,7 @@ def retry_fun(fun): except OSError as _: err = _ warnings.warn( - "ignoring %s" % str(err), UserWarning, stacklevel=2 + f"ignoring {str(err)}", UserWarning, stacklevel=2 ) time.sleep(0.01) raise err @@ -269,7 +265,7 @@ def wrapper(self, *args, **kwargs): except AssertionError as exc: if x + 1 >= NO_RETRIES: raise - msg = "%r, retrying" % exc + msg = f"{exc!r}, retrying" print(msg, file=sys.stderr) # NOQA if PYTEST_PARALLEL: warnings.warn(msg, ResourceWarning, stacklevel=2) @@ -289,7 +285,7 @@ def call_until(fun, expr, timeout=GLOBAL_TIMEOUT): if eval(expr): # noqa return ret time.sleep(0.001) - raise RuntimeError('timed out (ret=%r)' % ret) + raise RuntimeError(f'timed out (ret={ret!r})') def get_server_handler(): @@ -331,8 +327,8 @@ def assert_free_resources(parent_pid=None): children = this_proc.children() if children: warnings.warn( - "some children didn't terminate (pid=%r) %r" - % (os.getpid(), str(children)), + f"some children didn't terminate (pid={os.getpid()!r})" + f" {str(children)!r}", UserWarning, stacklevel=2, ) @@ -351,8 +347,8 @@ def assert_free_resources(parent_pid=None): ] if cons: warnings.warn( - "some connections didn't close (pid=%r) %r" - % (os.getpid(), str(cons)), + f"some connections didn't close (pid={os.getpid()!r})" + f" {str(cons)!r}", UserWarning, stacklevel=2, ) @@ -471,7 +467,7 @@ def __init__(self, addr=None): def run(self): assert not self._started self._started = True - self.name = "%s(%s)" % (self.__class__.__name__, self.pid) + self.name = f"{self.__class__.__name__}({self.pid})" self.server.serve_forever() def stop(self): diff --git a/pyftpdlib/test/conftest.py b/pyftpdlib/test/conftest.py index ece36afa..3ec5b2be 100644 --- a/pyftpdlib/test/conftest.py +++ b/pyftpdlib/test/conftest.py @@ -60,9 +60,9 @@ def assert_closed_resources(setup_ctx, request): for key, value in before.items(): if key.startswith("_"): continue - msg = "%r left some unclosed %r resources behind: " % ( - setup_ctx['_origin'], - key, + msg = ( + f"{setup_ctx['_origin']!r} left some unclosed {key!r} resources" + " behind: " ) extra = after[key] - before[key] if extra: @@ -70,7 +70,7 @@ def assert_closed_resources(setup_ctx, request): msg += repr(extra) warn(msg) elif extra > 0: # unused, here just in case we extend it later - msg += "before=%r, after=%r" % (before[key], after[key]) + msg += f"before={before[key]!r}, after={after[key]!r}" warn(msg) diff --git a/pyftpdlib/test/test_authorizers.py b/pyftpdlib/test/test_authorizers.py index 699d69b9..3c078780 100644 --- a/pyftpdlib/test/test_authorizers.py +++ b/pyftpdlib/test/test_authorizers.py @@ -90,7 +90,7 @@ def test_common_methods(self): # raise exc if user already exists auth.add_user(USER, PASSWD, HOME) auth.add_anonymous(HOME) - with pytest.raises(ValueError, match='user %r already exists' % USER): + with pytest.raises(ValueError, match=f'user {USER!r} already exists'): auth.add_user(USER, PASSWD, HOME) with pytest.raises( ValueError, match="user 'anonymous' already exists" @@ -231,13 +231,13 @@ def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs): except excClass as err: if str(err) == msg: return - raise self.failureException("%s != %s" % (str(err), msg)) + raise self.failureException(f"{str(err)} != {msg}") else: if hasattr(excClass, '__name__'): excName = excClass.__name__ else: excName = str(excClass) - raise self.failureException("%s not raised" % excName) + raise self.failureException(f"{excName} not raised") # --- /utils @@ -350,13 +350,13 @@ def test_error_options(self): ) self.assertRaisesWithMsg( AuthorizerError, - 'unknown user %s' % wrong_user, + f'unknown user {wrong_user}', self.authorizer_class, allowed_users=[wrong_user], ) self.assertRaisesWithMsg( AuthorizerError, - 'unknown user %s' % wrong_user, + f'unknown user {wrong_user}', self.authorizer_class, rejected_users=[wrong_user], ) @@ -433,7 +433,7 @@ def test_override_user_errors(self): ) self.assertRaisesWithMsg( AuthorizerError, - 'no such user %s' % nonexistent_user, + f'no such user {nonexistent_user}', auth.override_user, nonexistent_user, perm='r', @@ -447,7 +447,7 @@ def test_override_user_errors(self): auth.override_user(this_user, perm='r') self.assertRaisesWithMsg( AuthorizerError, - '%s is not an allowed user' % another_user, + f'{another_user} is not an allowed user', auth.override_user, another_user, perm='r', @@ -461,7 +461,7 @@ def test_override_user_errors(self): auth.override_user(another_user, perm='r') self.assertRaisesWithMsg( AuthorizerError, - '%s is not an allowed user' % this_user, + f'{this_user} is not an allowed user', auth.override_user, this_user, perm='r', @@ -571,7 +571,7 @@ def get_fake_shell_user(): user = get_fake_shell_user() self.assertRaisesWithMsg( AuthorizerError, - "user %s has not a valid shell" % user, + f"user {user} has not a valid shell", UnixAuthorizer, allowed_users=[user], ) @@ -585,7 +585,7 @@ def get_fake_shell_user(): assert not auth._has_valid_shell(user) self.assertRaisesWithMsg( AuthorizerError, - "User %s doesn't have a valid shell." % user, + f"User {user} doesn't have a valid shell.", auth.override_user, user, perm='r', diff --git a/pyftpdlib/test/test_cli.py b/pyftpdlib/test/test_cli.py index b40aba0d..e89b65ed 100644 --- a/pyftpdlib/test/test_cli.py +++ b/pyftpdlib/test/test_cli.py @@ -131,7 +131,7 @@ def test_version_opt(self): for opt in ("-v", "--version"): with pytest.raises(SystemExit) as cm: main([opt, "-p", "0"]) - assert str(cm.value) == "pyftpdlib %s" % __ver__ + assert str(cm.value) == f"pyftpdlib {__ver__}" def test_verbose_opt(self): for opt in ("-V", "--verbose"): diff --git a/pyftpdlib/test/test_functional.py b/pyftpdlib/test/test_functional.py index eebdc576..ae753cfa 100644 --- a/pyftpdlib/test/test_functional.py +++ b/pyftpdlib/test/test_functional.py @@ -303,7 +303,7 @@ def test_quit(self): def test_help(self): self.client.sendcmd('help') cmd = random.choice(list(FTPHandler.proto_cmds.keys())) - self.client.sendcmd('help %s' % cmd) + self.client.sendcmd(f'help {cmd}') with pytest.raises(ftplib.error_perm, match="Unrecognized"): self.client.sendcmd('help ?!?') @@ -541,11 +541,11 @@ def test_cdup(self): os.mkdir(os.path.join(self.tempdir, subfolder)) assert self.client.pwd() == '/' self.client.cwd(self.tempdir) - assert self.client.pwd() == '/%s' % self.tempdir + assert self.client.pwd() == f'/{self.tempdir}' self.client.cwd(subfolder) - assert self.client.pwd() == '/%s/%s' % (self.tempdir, subfolder) + assert self.client.pwd() == f'/{self.tempdir}/{subfolder}' self.client.sendcmd('cdup') - assert self.client.pwd() == '/%s' % self.tempdir + assert self.client.pwd() == f'/{self.tempdir}' self.client.sendcmd('cdup') assert self.client.pwd() == '/' @@ -969,10 +969,10 @@ def test_rest_on_stor(self): # on stor file_size = self.client.size(self.testfn) assert file_size == bytes_sent - self.client.sendcmd('rest %s' % (file_size + 1)) + self.client.sendcmd(f'rest {file_size + 1}') with pytest.raises(ftplib.error_perm, match="> file size"): self.client.sendcmd('stor ' + self.testfn) - self.client.sendcmd('rest %s' % bytes_sent) + self.client.sendcmd(f'rest {bytes_sent}') self.client.storbinary('stor ' + self.testfn, self.dummy_sendfile) self.client.retrbinary( @@ -1146,13 +1146,13 @@ def test_restore_on_retr(self): # file size stored on the server should result in an error # on retr (RFC-1123) file_size = self.client.size(self.testfn) - self.client.sendcmd('rest %s' % (file_size + 1)) + self.client.sendcmd(f'rest {file_size + 1}') with pytest.raises( ftplib.error_perm, match="position .*? > file size" ): self.client.sendcmd('retr ' + self.testfn) # test resume - self.client.sendcmd('rest %s' % received_bytes) + self.client.sendcmd(f'rest {received_bytes}') self.client.retrbinary("retr " + self.testfn, self.dummyfile.write) self.dummyfile.seek(0) datafile = self.dummyfile.read() @@ -1210,7 +1210,7 @@ def _test_listing_cmds(self, cmd): if cmd.lower() != 'mlsd': # if pathname is a file one line is expected x = [] - self.client.retrlines('%s ' % cmd + self.testfn, x.append) + self.client.retrlines(f'{cmd} ' + self.testfn, x.append) assert len(x) == 1 assert ''.join(x).endswith(self.testfn) # non-existent path, 550 response is expected @@ -1218,14 +1218,14 @@ def _test_listing_cmds(self, cmd): with pytest.raises( ftplib.error_perm, match="No such (file|directory)" ): - self.client.retrlines('%s ' % cmd + bogus, lambda x: x) + self.client.retrlines(f'{cmd} ' + bogus, lambda x: x) # for an empty directory we excpect that the data channel is # opened anyway and that no data is received x = [] tempdir = self.get_testfn() os.mkdir(tempdir) try: - self.client.retrlines('%s %s' % (cmd, tempdir), x.append) + self.client.retrlines(f'{cmd} {tempdir}', x.append) assert x == [] finally: safe_rmpath(tempdir) @@ -1943,28 +1943,28 @@ def on_disconnect(self): self.write("on_disconnect,") def on_login(self, username): - self.write("on_login:%s," % username) + self.write(f"on_login:{username},") def on_login_failed(self, username, password): - self.write("on_login_failed:%s+%s," % (username, password)) + self.write(f"on_login_failed:{username}+{password},") def on_logout(self, username): - self.write("on_logout:%s," % username) + self.write(f"on_logout:{username},") def on_file_sent(self, file): - self.write("on_file_sent:%s," % os.path.basename(file)) + self.write(f"on_file_sent:{os.path.basename(file)},") def on_file_received(self, file): - self.write("on_file_received:%s," % os.path.basename(file)) + self.write(f"on_file_received:{os.path.basename(file)},") def on_incomplete_file_sent(self, file): self.write( - "on_incomplete_file_sent:%s," % os.path.basename(file) + f"on_incomplete_file_sent:{os.path.basename(file)}," ) def on_incomplete_file_received(self, file): self.write( - "on_incomplete_file_received:%s," % os.path.basename(file) + f"on_incomplete_file_received:{os.path.basename(file)}," ) self.testfn = testfn = self.get_testfn() @@ -1988,24 +1988,24 @@ def read_file(self, text): if data == text: return time.sleep(0.01) - self.fail("data: %r; expected: %r" % (data, text)) + self.fail(f"data: {data!r}; expected: {text!r}") def test_on_disconnect(self): self.client.login(USER, PASSWD) self.client.close() - self.read_file('on_connect,on_login:%s,on_disconnect,' % USER) + self.read_file(f'on_connect,on_login:{USER},on_disconnect,') def test_on_logout_quit(self): self.client.login(USER, PASSWD) self.client.sendcmd('quit') self.read_file( - 'on_connect,on_login:%s,on_logout:%s,on_disconnect,' % (USER, USER) + f'on_connect,on_login:{USER},on_logout:{USER},on_disconnect,' ) def test_on_logout_rein(self): self.client.login(USER, PASSWD) self.client.sendcmd('rein') - self.read_file('on_connect,on_login:%s,on_logout:%s,' % (USER, USER)) + self.read_file(f'on_connect,on_login:{USER},on_logout:{USER},') def test_on_logout_no_pass(self): # make sure on_logout() is not called if USER was provided @@ -2019,8 +2019,7 @@ def test_on_logout_user_issued_twice(self): self.client.login(USER, PASSWD) self.client.login("anonymous") self.read_file( - 'on_connect,on_login:%s,on_logout:%s,on_login:anonymous,' - % (USER, USER) + f'on_connect,on_login:{USER},on_logout:{USER},on_login:anonymous,' ) def test_on_login_failed(self): @@ -2036,8 +2035,7 @@ def test_on_file_received(self): self.client.login(USER, PASSWD) self.client.storbinary('stor ' + self.testfn2, dummyfile) self.read_file( - 'on_connect,on_login:%s,on_file_received:%s,' - % (USER, self.testfn2) + f'on_connect,on_login:{USER},on_file_received:{self.testfn2},' ) def test_on_file_sent(self): @@ -2047,7 +2045,7 @@ def test_on_file_sent(self): f.write(data) self.client.retrbinary("retr " + self.testfn2, lambda x: x) self.read_file( - 'on_connect,on_login:%s,on_file_sent:%s,' % (USER, self.testfn2) + f'on_connect,on_login:{USER},on_file_sent:{self.testfn2},' ) @retry_on_failure @@ -2076,8 +2074,7 @@ def test_on_incomplete_file_received(self): resp = self.client.getmultiline() assert resp.startswith("226") self.read_file( - 'on_connect,on_login:%s,on_incomplete_file_received:%s,' - % (USER, self.testfn2) + f'on_connect,on_login:{USER},on_incomplete_file_received:{self.testfn2},' ) @retry_on_failure @@ -2097,8 +2094,7 @@ def test_on_incomplete_file_sent(self): break assert self.client.getline()[:3] == "426" self.read_file( - 'on_connect,on_login:%s,on_incomplete_file_sent:%s,' - % (USER, self.testfn2) + f'on_connect,on_login:{USER},on_incomplete_file_sent:{self.testfn2},' ) @@ -2141,7 +2137,7 @@ def test_eprt(self): # test wrong proto try: self.client.sendcmd( - 'eprt |%s|%s|%s|' + 'eprt |%s|%s|%s|' # noqa: UP031 % (self.other_proto, self.server.host, self.server.port) ) except ftplib.error_perm as err: @@ -2156,17 +2152,15 @@ def test_eprt(self): # len('|') < 3 assert self.cmdresp('eprt ||') == msg # port > 65535 - assert ( - self.cmdresp('eprt |%s|%s|65536|' % (self.proto, self.HOST)) == msg - ) + assert self.cmdresp(f'eprt |{self.proto}|{self.HOST}|65536|') == msg # port < 0 - assert self.cmdresp('eprt |%s|%s|-1|' % (self.proto, self.HOST)) == msg + assert self.cmdresp(f'eprt |{self.proto}|{self.HOST}|-1|') == msg # port < 1024 - resp = self.cmdresp('eprt |%s|%s|222|' % (self.proto, self.HOST)) + resp = self.cmdresp(f'eprt |{self.proto}|{self.HOST}|222|') assert resp[:3] == '501' assert 'privileged port' in resp # proto > 2 - _cmd = 'eprt |3|%s|%s|' % (self.server.host, self.server.port) + _cmd = f'eprt |3|{self.server.host}|{self.server.port}|' with pytest.raises(ftplib.error_perm): self.client.sendcmd(_cmd) @@ -2185,7 +2179,7 @@ def test_eprt(self): sock.listen(5) sock.settimeout(GLOBAL_TIMEOUT) ip, port = sock.getsockname()[:2] - self.client.sendcmd('eprt |%s|%s|%s|' % (self.proto, ip, port)) + self.client.sendcmd(f'eprt |{self.proto}|{ip}|{port}|') try: s = sock.accept() s[0].close() @@ -2225,7 +2219,7 @@ def test_epsv_all(self): self.client.sendport(self.HOST, 2000) with pytest.raises(ftplib.error_perm): self.client.sendcmd( - 'eprt |%s|%s|%s|' % (self.proto, self.HOST, 2000), + f'eprt |{self.proto}|{self.HOST}|{2000}|', ) @@ -2257,7 +2251,7 @@ def test_port_v4(self): ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535 ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0 # port < 1024 - resp = self.cmdresp('port %s,1,1' % self.HOST.replace('.', ',')) + resp = self.cmdresp(f"port {self.HOST.replace('.', ',')},1,1") assert resp[:3] == '501' assert 'privileged port' in resp if self.HOST != "1.2.3.4": @@ -2374,7 +2368,7 @@ def test_eprt_v4(self): sock.listen(5) sock.settimeout(2) ip, port = sock.getsockname()[:2] - self.client.sendcmd('eprt |1|%s|%s|' % (ip, port)) + self.client.sendcmd(f'eprt |1|{ip}|{port}|') try: sock2, _ = sock.accept() sock2.close() diff --git a/pyftpdlib/test/test_functional_ssl.py b/pyftpdlib/test/test_functional_ssl.py index 1c9f3e3b..d484b0b0 100644 --- a/pyftpdlib/test/test_functional_ssl.py +++ b/pyftpdlib/test/test_functional_ssl.py @@ -204,13 +204,13 @@ def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs): except excClass as err: if str(err) == msg: return - raise self.failureException("%s != %s" % (str(err), msg)) + raise self.failureException(f"{str(err)} != {msg}") else: if hasattr(excClass, '__name__'): excName = excClass.__name__ else: excName = str(excClass) - raise self.failureException("%s not raised" % excName) + raise self.failureException(f"{excName} not raised") def test_auth(self): # unsecured diff --git a/pyproject.toml b/pyproject.toml index e516d0fa..49248924 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,6 @@ line-length = 79 preview = true select = [ "ALL", # to get a list of all values: `python3 -m ruff linter` - "PLR2044", # [*] Line with empty comment ] ignore = [ "A", # flake8-builtins @@ -68,7 +67,6 @@ ignore = [ "TRY003", # Avoid specifying long messages outside the exception class "TRY300", # Consider moving this statement to an `else` block "TRY301", # Abstract `raise` to an inner function - "UP031", # [*] Use format specifiers instead of percent format ] [tool.ruff.lint.per-file-ignores] diff --git a/scripts/internal/git_pre_commit.py b/scripts/internal/git_pre_commit.py index 086363ad..b30c9c55 100755 --- a/scripts/internal/git_pre_commit.py +++ b/scripts/internal/git_pre_commit.py @@ -47,7 +47,7 @@ def hilite(s, ok=True, bold=False): attr.append("31") if bold: attr.append("1") - return "\x1b[%sm%s\x1b[0m" % (";".join(attr), s) + return f"\x1b[{';'.join(attr)}m{s}\x1b[0m" def exit(msg): @@ -97,7 +97,7 @@ def git_committed_files(): def ruff(files): - print("running ruff (%s)" % len(files)) + print(f"running ruff ({len(files)})") cmd = [PYTHON, "-m", "ruff", "check", "--no-cache"] + files if subprocess.call(cmd) != 0: msg = "Python code didn't pass 'ruff' style check. " @@ -106,17 +106,17 @@ def ruff(files): def rstcheck(files): - print("running rst linter (%s)" % len(files)) + print(f"running rst linter ({len(files)})") cmd = ["rstcheck", "--config=pyproject.toml"] + files if subprocess.call(cmd) != 0: return sys.exit("RST code didn't pass style check") def toml_sort(files): - print("running toml linter (%s)" % len(files)) + print(f"running toml linter ({len(files)})") cmd = ["toml-sort", "--check"] + files if subprocess.call(cmd) != 0: - return sys.exit("%s didn't pass style check" % ' '.join(files)) + return sys.exit(f"{' '.join(files)} didn't pass style check") def main(): diff --git a/scripts/internal/print_announce.py b/scripts/internal/print_announce.py index 571995e3..19edcec5 100755 --- a/scripts/internal/print_announce.py +++ b/scripts/internal/print_announce.py @@ -77,7 +77,7 @@ def get_changes(): if re.match(r"^- \d+_: ", line): num, _, rest = line.partition(': ') num = ''.join([x for x in num if x.isdigit()]) - line = "- #%s: %s" % (num, rest) + line = f"- #{num}: {rest}" if line.startswith('===='): break diff --git a/scripts/internal/winmake.py b/scripts/internal/winmake.py index ae6e0864..56396411 100755 --- a/scripts/internal/winmake.py +++ b/scripts/internal/winmake.py @@ -117,7 +117,7 @@ def onerror(fun, path, excinfo): existed = os.path.isdir(path) shutil.rmtree(path, onerror=onerror) if existed: - safe_print("rmdir -f %s" % path) + safe_print(f"rmdir -f {path}") if "*" not in pattern: if directory: @@ -134,10 +134,10 @@ def onerror(fun, path, excinfo): for name in found: path = os.path.join(root, name) if directory: - safe_print("rmdir -f %s" % path) + safe_print(f"rmdir -f {path}") safe_rmtree(path) else: - safe_print("rm %s" % path) + safe_print(f"rm {path}") safe_remove(path) @@ -147,7 +147,7 @@ def safe_remove(path): except FileNotFoundError: pass else: - safe_print("rm %s" % path) + safe_print(f"rm {path}") def safe_rmtree(path): @@ -159,7 +159,7 @@ def onerror(fun, path, excinfo): existed = os.path.isdir(path) shutil.rmtree(path, onerror=onerror) if existed: - safe_print("rmdir -f %s" % path) + safe_print(f"rmdir -f {path}") def recursive_rm(*patterns): @@ -187,7 +187,7 @@ def build(): """Build / compile.""" # Make sure setuptools is installed (needed for 'develop' / # edit mode). - sh('%s -c "import setuptools"' % PYTHON) + sh(f'{PYTHON} -c "import setuptools"') cmd = [PYTHON, "setup.py", "build"] # Print coloured warnings in real time. @@ -213,33 +213,33 @@ def build(): p.wait() # Make sure it actually worked. - sh('%s -c "import pyftpdlib"' % PYTHON) + sh(f'{PYTHON} -c "import pyftpdlib"') win_colorprint("build + import successful", GREEN) def wheel(): """Create wheel file.""" build() - sh("%s setup.py bdist_wheel" % PYTHON) + sh(f"{PYTHON} setup.py bdist_wheel") def upload_wheels(): """Upload wheel files on PyPI.""" build() - sh("%s -m twine upload dist/*.whl" % PYTHON) + sh(f"{PYTHON} -m twine upload dist/*.whl") def install_pip(): """Install pip.""" try: - sh('%s -c "import pip"' % PYTHON) + sh(f'{PYTHON} -c "import pip"') except SystemExit: if hasattr(ssl, '_create_unverified_context'): ctx = ssl._create_unverified_context() else: ctx = None kw = dict(context=ctx) if ctx else {} - safe_print("downloading %s" % GET_PIP_URL) + safe_print(f"downloading {GET_PIP_URL}") req = urlopen(GET_PIP_URL, **kw) data = req.read() @@ -248,7 +248,7 @@ def install_pip(): f.write(data) try: - sh('%s %s --user' % (PYTHON, tfile)) + sh(f'{PYTHON} {tfile} --user') finally: os.remove(tfile) @@ -256,7 +256,7 @@ def install_pip(): def install(): """Install in develop / edit mode.""" build() - sh("%s setup.py develop" % PYTHON) + sh(f"{PYTHON} setup.py develop") def uninstall(): @@ -272,7 +272,7 @@ def uninstall(): except ImportError: break else: - sh("%s -m pip uninstall -y pyftpdlib" % PYTHON) + sh(f"{PYTHON} -m pip uninstall -y pyftpdlib") finally: os.chdir(here) @@ -297,7 +297,7 @@ def uninstall(): if 'pyftpdlib' not in line: f.write(line) else: - print("removed line %r from %r" % (line, path)) + print(f"removed line {line!r} from {path!r}") def clean(): @@ -330,7 +330,7 @@ def setup_dev_env(): """Install useful deps.""" install_pip() install_git_hooks() - sh("%s -m pip install -U %s" % (PYTHON, " ".join(DEPS))) + sh(f"{PYTHON} -m pip install -U {' '.join(DEPS)}") def lint(): @@ -339,57 +339,57 @@ def lint(): py_files = py_files.decode() py_files = [x for x in py_files.split() if x.endswith('.py')] py_files = ' '.join(py_files) - sh("%s -m flake8 %s" % (PYTHON, py_files), nolog=True) + sh(f"{PYTHON} -m flake8 {py_files}", nolog=True) def test(args=""): """Run tests.""" build() - sh("%s -m pytest %s %s" % (PYTHON, PYTEST_ARGS, args)) + sh(f"{PYTHON} -m pytest {PYTEST_ARGS} {args}") def test_authorizers(): build() - sh("%s pyftpdlib\\test\\test_authorizers.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_authorizers.py") def test_filesystems(): build() - sh("%s pyftpdlib\\test\\test_filesystems.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_filesystems.py") def test_functional(): build() - sh("%s pyftpdlib\\test\\test_functional.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_functional.py") def test_functional_ssl(): build() - sh("%s pyftpdlib\\test\\test_functional_ssl.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_functional_ssl.py") def test_ioloop(): build() - sh("%s pyftpdlib\\test\\test_ioloop.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_ioloop.py") def test_cli(): build() - sh("%s pyftpdlib\\test\\test_cli.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_cli.py") def test_servers(): build() - sh("%s pyftpdlib\\test\\test_servers.py" % PYTHON) + sh(f"{PYTHON} pyftpdlib\\test\\test_servers.py") def coverage(): """Run coverage tests.""" build() - sh("%s -m coverage run -m pytest %s" % (PYTHON, PYTEST_ARGS)) - sh("%s -m coverage report" % PYTHON) - sh("%s -m coverage html" % PYTHON) - sh("%s -m webbrowser -t htmlcov/index.html" % PYTHON) + sh(f"{PYTHON} -m coverage run -m pytest {PYTEST_ARGS}") + sh(f"{PYTHON} -m coverage report") + sh(f"{PYTHON} -m coverage html") + sh(f"{PYTHON} -m webbrowser -t htmlcov/index.html") def test_by_name(name): @@ -401,7 +401,7 @@ def test_by_name(name): def test_failed(): """Re-run tests which failed on last run.""" build() - sh("%s -m pytest %s --last-failed" % (PYTHON, PYTEST_ARGS)) + sh(f"{PYTHON} -m pytest {PYTEST_ARGS} --last-failed") def install_git_hooks(): @@ -441,7 +441,7 @@ def get_python(path): '39-64', ) for v in vers: - pypath = r'C:\\python%s\python.exe' % v + pypath = r'C:\\python%s\python.exe' % v # noqa: UP031 if path in pypath and os.path.isfile(pypath): return pypath @@ -481,7 +481,7 @@ def main(): PYTHON = get_python(args.python) if not PYTHON: return sys.exit( - "can't find any python installation matching %r" % args.python + f"can't find any python installation matching {args.python!r}" ) os.putenv('PYTHON', PYTHON) win_colorprint("using " + PYTHON) diff --git a/setup.py b/setup.py index 09cce803..ef4f906c 100644 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ def hilite(s, ok=True, bold=False): attr.append('31') # red if bold: attr.append('1') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), s) + return f"\x1b[{';'.join(attr)}m{s}\x1b[0m" if sys.version_info[0] < 3: