Skip to content

Commit

Permalink
fix flake8
Browse files Browse the repository at this point in the history
  • Loading branch information
pabera committed Nov 16, 2024
1 parent c792037 commit 76402e1
Showing 1 changed file with 93 additions and 121 deletions.
214 changes: 93 additions & 121 deletions src/jukebox/run_rpc_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,30 @@
or for debugging.
The tool features auto-completion and command history.
Now supports quoted arguments for handling spaces in arguments.
The list of available commands is fetched from the running Jukebox service.
.. todo:
- kwargs support
"""

import argparse
import zmq
import curses
import curses.ascii
import jukebox.rpc.client as rpc
import shlex # Added for proper command parsing

# Developers note: Scripting at it's dirty end :-)


# Careful: curses and default outputs don't mix!
# In case you'll get an error, most likely your terminal may become funny
# Best bet: Just don't configure any logger at all!
# import logging
# import misc.loggingext
# logger = misc.loggingext.configure_default(logging.ERROR)


url: str
client: rpc.RpcClient
Expand Down Expand Up @@ -64,6 +77,7 @@ def format_help(scr, topic):
sign: str = value['signature']
sign = sign[sign.find('('):]
func = f"{key}{sign}"
# print(f"{func:50}: {value['description']}")
if key.startswith(topic):
scr.addstr(f"{func:50}: {value['description']}\n")
[y, x] = scr.getyx()
Expand Down Expand Up @@ -156,8 +170,12 @@ def get_common_beginning(strings):


def autocomplete(msg):
# logger.debug(f"Autocomplete {msg}")
# Get all stings that match the beginning
# candidates = ["ap1", 'ap2', 'appbbb3', 'appbbb4', 'appbbb5', 'appbbb6', 'exit']
matches = [s for s in candidates if s.startswith(msg)]
if len(matches) == 0:
# Matches is empty: nothing found
return msg, matches
common = get_common_beginning(matches)
return common, matches
Expand All @@ -175,7 +193,7 @@ def reprompt(scr, msg, y, x):
scr.move(y, x)


def get_input(scr):
def get_input(scr): # noqa: C901
curses.noecho()
ch = 0
msg = ''
Expand All @@ -184,11 +202,6 @@ def get_input(scr):
[y, x] = scr.getyx()
reprompt(scr, msg, y, len(prompt) + len(msg))
scr.refresh()

# Track if we're inside quotes
in_quotes = False
escape_next = False

while ch != ord(b'\n'):
try:
ch = scr.getch()
Expand All @@ -197,41 +210,26 @@ def get_input(scr):
break
[y, x] = scr.getyx()
pos = x - len(prompt)

if ch == ord(b'\t') and not in_quotes:
if ch == ord(b'\t'):
msg, matches = autocomplete(msg)
if len(matches) > 1:
scr.addstr('\n')
scr.addstr(', '.join(matches))
scr.addstr('\n')
scr.clrtobot()
reprompt(scr, msg, y, len(prompt) + len(msg))
elif ch == ord(b'\n'):
# Only accept newline if we're not in the middle of a quote
if not in_quotes:
break
else:
# Add the newline to multi-line quoted string
msg = msg[0:pos] + "\\n" + msg[pos:]
reprompt(scr, msg, y, x + 2)
elif ch == 4 and not in_quotes:
if ch == ord(b'\n'):
break
if ch == 4:
msg = 'exit'
break
elif ch == curses.KEY_BACKSPACE or ch == 127:
if pos > 0:
# Handle backspace in quotes - need to check if we're deleting a quote char
if msg[pos-1] == '"' and not escape_next:
in_quotes = not in_quotes
elif msg[pos-1] == '\\':
escape_next = False
scr.delch(y, x - 1)
msg = msg[0:pos - 1] + msg[pos:]
elif ch == curses.KEY_DC:
if pos < len(msg):
if msg[pos] == '"' and not escape_next:
in_quotes = not in_quotes
scr.delch(y, x)
msg = msg[0:pos] + msg[pos + 1:]
scr.delch(y, x)
msg = msg[0:pos] + msg[pos + 1:]
elif ch == curses.KEY_LEFT:
if pos > 0:
scr.move(y, x - 1)
Expand All @@ -242,62 +240,31 @@ def get_input(scr):
scr.move(y, len(prompt))
elif ch == curses.KEY_END:
scr.move(y, len(prompt) + len(msg))
elif ch == curses.KEY_UP and not in_quotes:
elif ch == curses.KEY_UP:
if hidx == len(history):
ihist = msg
hidx = max(hidx - 1, 0)
msg = history[hidx]
reprompt(scr, msg, y, len(prompt) + len(msg))
elif ch == curses.KEY_DOWN and not in_quotes:
elif ch == curses.KEY_DOWN:
hidx = min(hidx + 1, len(history))
if hidx == len(history):
msg = ihist
else:
msg = history[hidx]
reprompt(scr, msg, y, len(prompt) + len(msg))
elif is_printable(ch):
char = curses.ascii.unctrl(ch)
if char == '"' and not escape_next:
in_quotes = not in_quotes
elif char == '\\':
escape_next = True
else:
escape_next = False
msg = msg[0:pos] + char + msg[pos:]
msg = msg[0:pos] + curses.ascii.unctrl(ch) + msg[pos:]
reprompt(scr, msg, y, x + 1)

# else:
# print(f" {ch} -- {type(ch)}")
scr.refresh()

scr.refresh()
if msg:
history.append(msg)
history.append(msg)
return msg


def parse_command(cmd_str):
"""
Parse command string using shlex to handle quoted arguments properly.
Returns (command_parts, args) tuple.
"""
try:
parts = shlex.split(cmd_str)
if not parts:
return [], []

# Split the command on dots for package.plugin.method
cmd_parts = [v for v in parts[0].split('.') if v]

# Convert args to appropriate types
args = [tonum(arg) for arg in parts[1:]]

return cmd_parts, args
except ValueError as e:
# Handle unclosed quotes
return None, str(e)


def tonum(string_value):
"""Convert string to number if possible, otherwise return original string."""
ret = string_value
try:
ret = int(string_value)
Expand Down Expand Up @@ -332,94 +299,99 @@ def main(scr):
while cmd != 'exit':
cmd = get_input(scr)
scr.addstr("\n")

if cmd == '':
# Split on whitespaces to separate cmd and arg list
dec = [v for v in cmd.strip().split(' ') if len(v) > 0]
if len(dec) == 0:
continue

# Handle built-in commands
if cmd.startswith('help'):
elif dec[0] == 'help':
topic = ''
try:
_, args = parse_command(cmd)
if args:
topic = args[0]
except:
pass
if len(dec) > 1:
topic = dec[1]
format_help(scr, topic)
continue
elif cmd == 'usage':
elif dec[0] == 'usage':
format_usage(scr)
continue
elif cmd == 'exit':
break

# Parse the command
cmd_parts, args = parse_command(cmd)

if isinstance(args, str):
scr.addstr(f"Error parsing command: {args}\n")
# scr.addstr(f"\n{cmd}\n")
# Split cmd on '.' into package.plugin.method
# Remove duplicate '.' along the way
sl = [v for v in dec[0].split('.') if len(v) > 0]
fargs = [tonum(a) for a in dec[1:]]
scr.addstr(f"\n:: Command = {sl}, args = {fargs}\n")
response = None
method = None
if not (2 <= len(sl) <= 3):
scr.addstr(":: Error = Ill-formatted command\n")
continue

if not (2 <= len(cmd_parts) <= 3):
scr.addstr("Error: Invalid command format. Use: package.plugin.method or package.plugin\n")
continue

method = cmd_parts[2] if len(cmd_parts) == 3 else None

if len(sl) == 3:
method = sl[2]
try:
response = client.enque(cmd_parts[0], cmd_parts[1], method, args=args)
scr.addstr(f"\n:: Response =\n{response}\n\n")
response = client.enque(sl[0], sl[1], method, args=fargs)
except zmq.error.Again:
scr.addstr("\n\n" + '-' * 70 + "\n")
scr.addstr("Could not reach RPC Server. Jukebox running? Correct Port?\n")
scr.addstr('-' * 70 + "\n\n")
scr.refresh()
except Exception as e:
scr.addstr(f":: Error: {str(e)}\n")
scr.addstr(f":: Exception response =\n{e}\n")
else:
scr.addstr(f"\n:: Response =\n{response}\n\n")


def runcmd(cmd):
"""Run a single command and exit."""
cmd_parts, args = parse_command(cmd)
"""
Just run a command.
Right now duplicates more or less main()
:todo remove duplication of code
"""

if isinstance(args, str):
print(f"Error parsing command: {args}")
# Split on whitespaces to separate cmd and arg list
dec = [v for v in cmd.strip().split(' ') if len(v) > 0]
if len(dec) == 0:
return

if not (2 <= len(cmd_parts) <= 3):
print("Error: Invalid command format. Use: package.plugin.method or package.plugin")
# Split cmd on '.' into package.plugin.method
# Remove duplicate '.' along the way
sl = [v for v in dec[0].split('.') if len(v) > 0]
fargs = [tonum(a) for a in dec[1:]]
response = None
method = None
if not (2 <= len(sl) <= 3):
print(":: Error = Ill-formatted command\n")
return

method = cmd_parts[2] if len(cmd_parts) == 3 else None

if len(sl) == 3:
method = sl[2]
try:
response = client.enque(cmd_parts[0], cmd_parts[1], method, args=args)
print(f"\n:: Response =\n{response}\n")
response = client.enque(sl[0], sl[1], method, args=fargs)
except zmq.error.Again:
print("\n\n" + '-' * 70)
print("Could not reach RPC Server. Jukebox running? Correct Port?")
print('-' * 70 + "\n")
print("\n\n" + '-' * 70 + "\n")
print("Could not reach RPC Server. Jukebox running? Correct Port?\n")
print('-' * 70 + "\n\n")
return
except Exception as e:
print(f":: Error: {str(e)}")
print(f":: Exception response =\n{e}\n")
return
else:
print(f"\n:: Response =\n{response}\n\n")


if __name__ == '__main__':
default_tcp = 5555
default_ws = 5556
url = f"tcp://localhost:{default_tcp}"
argparser = argparse.ArgumentParser(description='The Jukebox RPC command line tool',
epilog=f'Default connection: {url}')
epilog=f'Default connection: {url}')
port_group = argparser.add_mutually_exclusive_group()
port_group.add_argument("-w", "--websocket",
help=f"Use websocket protocol on PORT [default: {default_ws}]",
nargs='?', const=default_ws,
metavar="PORT", default=None)
help=f"Use websocket protocol on PORT [default: {default_ws}]",
nargs='?', const=default_ws,
metavar="PORT", default=None)
port_group.add_argument("-t", "--tcp",
help=f"Use tcp protocol on PORT [default: {default_tcp}]",
nargs='?', const=default_tcp,
metavar="PORT", default=None)
help=f"Use tcp protocol on PORT [default: {default_tcp}]",
nargs='?', const=default_tcp,
metavar="PORT", default=None)
port_group.add_argument("-c", "--command",
help="Send command to Jukebox server",
default=None)
help="Send command to Jukebox server",
default=None)
args = argparser.parse_args()

if args.websocket is not None:
Expand Down

0 comments on commit 76402e1

Please sign in to comment.