Skip to content

Commit

Permalink
basic tests pass on the serialmidi handler
Browse files Browse the repository at this point in the history
  • Loading branch information
cpmpercussion committed Aug 15, 2024
1 parent 8cb74dd commit 6aa1d9d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 124 deletions.
174 changes: 59 additions & 115 deletions impsy/impsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def __init__(self, config: dict, callback: Callable[[int, float], None], dense_c
super().__init__(config, callback, dense_callback)
self.serial_port = config["serial"]["port"]
self.baudrate = config["serial"]["baudrate"] # 31250 midi,
# self.dimension = config["modell"]["dimension"]
self.serial = None
self.buffer = ""

Expand All @@ -74,6 +73,10 @@ def send(self, output_values) -> None:

def handle(self) -> None:
"""read in the serial bytes and process lines into value lists for IMPSY"""
# exist quickly if there is no serial connection.
if self.serial is None:
return

# first read in the serial bytes in waiting
while self.serial.in_waiting:
self.buffer += self.serial.read(self.serial.in_waiting).decode()
Expand All @@ -92,12 +95,11 @@ def handle(self) -> None:
def connect(self) -> None:
"""Tries to open a serial port for regular IO."""
try:
click.secho("Serial: Opening port", fg="yellow")
# TODO make the serial port configurable duh.
click.secho(f"Serial: Opening port {self.serial_port} at {self.baudrate} baud.", fg="yellow")
self.serial = serial.Serial( self.serial_port, baudrate=self.baudrate, timeout=0)
except:
self.serial = None
click.secho("Serial: Could not open port.", fg="red")
click.secho(f"Serial: Could not open {self.serial_port}.", fg="red")


def disconnect(self) -> None:
Expand All @@ -108,7 +110,7 @@ def disconnect(self) -> None:


class SerialMIDIServer(IOServer):
"""Handles MIDI over serial."""
"""Handles MIDI IO over serial."""

def __init__(
self,
Expand All @@ -118,87 +120,40 @@ def __init__(
) -> None:
super().__init__(config, callback, dense_callback)
self.parser = mido.parser.Parser()
self.serial_port = config["serial"]["port"]
self.baudrate = 31250 # midi baudrate
self.serial = None
self.buffer = "" # used for storing serial data after reading
self.last_midi_notes = {} # dict to store last played notes via midi
self.midi_output_mapping = self.config["midi"]["output"]
self.midi_input_mapping = self.config["midi"]["input"]

def send_midi_note_on(self, channel, pitch, velocity):
"""Send a MIDI note on (and implicitly handle note_off)"""
try:
midi_msg = mido.Message(
"note_off",
channel=channel,
note=self.last_midi_notes[channel],
velocity=0,
)
self.send_midi_message(midi_msg)
except KeyError:
click.secho("Something wrong with turning MIDI notes off!!", fg="red")
pass
midi_msg = mido.Message(
"note_on", channel=channel, note=pitch, velocity=velocity
)
self.send_midi_message(midi_msg)
self.last_midi_notes[channel] = pitch

def send_control_change(self, channel, control, value):
"""Send a MIDI control change message"""
midi_msg = mido.Message(
"control_change", channel=channel, control=control, value=value
)
self.send_midi_message(midi_msg)

def send_midi_note_offs(self):
"""Sends note offs on any MIDI channels that have been used for notes."""
outconf = self.config["midi"]["output"]
out_channels = [x[1] for x in outconf if x[0] == "note_on"]
for i in out_channels:
try:
midi_msg = mido.Message(
"note_off",
channel=i - 1,
note=self.last_midi_notes[i - 1],
velocity=0,
)
self.send_midi_message(midi_msg)
# click.secho(f"MIDI: note_off: {self.last_midi_notes[i-1]}: msg: {midi_msg.bin()}", fg="blue")
except KeyError:
click.secho("Something wrong with all MIDI Note off!", fg="red")
pass

def send_midi_message(self, message):
"""Sends a mido MIDI message via the very basic serial output on Raspberry Pi GPIO."""
try:
self.serial.write(message.bin())
except:
pass

def send(self, output_values) -> None:
"""Sends sound commands via MIDI"""
assert (
len(output_values) + 1 == self.dimension
), "Dimension not same as prediction size." # Todo more useful error.
start_time = datetime.datetime.now()
outconf = self.config["midi"]["output"]
values = list(map(int, (np.ceil(output_values * 127))))
if self.verbose:
click.secho(f"out: {values}", fg="green")
for i in range(self.dimension - 1):
if outconf[i][0] == "note_on":
self.send_midi_note_on(
outconf[i][1] - 1, values[i], 127
) # note decremented channel (0-15)
if outconf[i][0] == "control_change":
self.send_control_change(
outconf[i][1] - 1, outconf[i][2], values[i]
) # note decrement channel (0-15)

output_midi_messages = output_values_to_midi_messages(output_values, self.midi_output_mapping)
for msg in output_midi_messages:
# send note off if a previous note_on had been sent
if msg.type == 'note_on' and msg.channel in self.last_midi_notes:
note_off_msg = mido.Message("note_off", channel = msg.channel, note=self.last_midi_notes[msg.channel], velocity=0)
self.send_midi_message(note_off_msg)
self.send_midi_message(msg) # actually send the message.
if msg.type == 'note_on':
self.last_midi_notes[msg.channel] = msg.note # store last midi note if it was a note_on.

duration_time = (datetime.datetime.now() - start_time).total_seconds()
if duration_time > 0.02:
click.secho(
f"Sound command sending took a long time: {(duration_time):.3f}s",
f"Sound command sending took a long time: {duration_time:.3f}s",
fg="red",
)
# TODO: is it a good idea to have all this indexing? easy to screw up.

def handle(self) -> None:
"""Read in some bytes from the serial port and try to handle any found MIDI messages."""
if self.serial is None:
return
if self.serial.in_waiting >= 3:
midi_bytes = self.serial.read(3)
self.parser.feed(midi_bytes)
Expand Down Expand Up @@ -227,16 +182,14 @@ def handle(self) -> None:
pass

def connect(self) -> None:
"""Tries to open a serial port for MIDI IO on Raspberry Pi."""
"""Tries to open a serial port for regular IO."""
try:
click.secho(
"Trying to open serial port for MIDI in/out.", fg="yellow"
)
# TODO make the serial port configurable duh.
self.serial = serial.Serial("/dev/ttyAMA0", baudrate=31250)
click.secho(f"Serial: Opening port {self.serial_port} at {self.baudrate} baud. (MIDI mode)", fg="yellow")
self.serial = serial.Serial( self.serial_port, baudrate=self.baudrate, timeout=0)
except:
self.serial = None
click.secho("Could not open serial port.", fg="red")
click.secho(f"Serial: Could not open {self.serial_port}.", fg="red")


def disconnect(self) -> None:
try:
Expand All @@ -245,6 +198,20 @@ def disconnect(self) -> None:
pass


def send_midi_message(self, message):
"""Sends a mido MIDI message via the connected serial port."""
if self.serial is not None:
self.serial.write(message.bin())


def send_midi_note_offs(self):
"""Sends note offs on any MIDI channels that have been used for notes."""
note_off_messages = get_midi_note_offs(self.midi_output_mapping, self.last_midi_notes)
for msg in note_off_messages:
self.send_midi_message(msg)



class WebSocketServer(IOServer):
"""Handles Websocket Serving for IMPSY"""

Expand Down Expand Up @@ -449,19 +416,6 @@ def match_midi_port_to_list(port, port_list):
else:
return contains_list[0]

# def open_raspberry_serial():
# """Tries to open a serial port for MIDI IO on Raspberry Pi."""
# try:
# click.secho(
# "Trying to open Raspberry Pi serial port for MIDI in/out.", fg="yellow"
# )
# ser = serial.Serial("/dev/ttyAMA0", baudrate=31250)
# except:
# ser = None
# click.secho(
# "Could not open serial port, might be in development mode.", fg="red"
# )
# return ser

def __init__(self, config, callback, dense_callback) -> None:
super().__init__(config, callback, dense_callback)
Expand All @@ -470,6 +424,8 @@ def __init__(self, config, callback, dense_callback) -> None:
] # retrieve dimension from the config file.
self.verbose = self.config["verbose"]
self.last_midi_notes = {} # dict to store last played notes via midi
self.midi_output_mapping = self.config["midi"]["output"]
self.midi_input_mapping = self.config["midi"]["input"]
# self.websocket_send_midi = None # TODO implement some kind generic MIDI callback for other output channels.

def connect(self) -> None:
Expand Down Expand Up @@ -511,6 +467,7 @@ def disconnect(self) -> None:
except:
pass


def send_midi_message(self, message):
"""Send a MIDI message across all required outputs"""
if self.midi_out_port is not None:
Expand All @@ -521,8 +478,7 @@ def send_midi_message(self, message):

def send_midi_note_offs(self):
"""Sends note offs on any MIDI channels that have been used for notes."""
midi_output_mapping = self.config["midi"]["output"]
note_off_messages = get_midi_note_offs(midi_output_mapping, self.last_midi_notes)
note_off_messages = get_midi_note_offs(self.midi_output_mapping, self.last_midi_notes)
for msg in note_off_messages:
self.send_midi_message(msg)

Expand All @@ -532,30 +488,18 @@ def send(self, output_values) -> None:
assert (
len(output_values) + 1 == self.dimension
), "Dimension not same as prediction size." # Todo more useful error.
start_time = datetime.datetime.now()

midi_output_mapping = self.config["midi"]["output"]
output_midi_messages = output_values_to_midi_messages(output_values, midi_output_mapping)
output_midi_messages = output_values_to_midi_messages(output_values, self.midi_output_mapping)
for msg in output_midi_messages:
# do the sending
# send note off if a previous note_on had been sent
if msg.type == 'note_on' and msg.channel in self.last_midi_notes:
note_off_msg = mido.Message("note_off", channel = msg.channel, note=self.last_midi_notes[msg.channel], velocity=0)
self.send_midi_message(note_off_msg)
# actually send the message.
self.send_midi_message(msg)
# store last midi note if it was a note_on.
if msg.type == 'note_on':
# check if sent previous note_on
if msg.channel in self.last_midi_notes:
# send a note off.
note_off_msg = mido.Message("note_off", channel = msg.channel, note=self.last_midi_notes[msg.channel], velocity=0)
self.send_midi_message(note_off_msg)
self.send_midi_message(msg)
self.last_midi_notes[msg.channel] = msg.note
else:
# just send other messages, including cc
self.send_midi_message(msg)

duration_time = (datetime.datetime.now() - start_time).total_seconds()
if duration_time > 0.02:
click.secho(
f"Sound command sending took a long time: {(duration_time):.3f}s",
fg="red",
)

def handle(self) -> None:
"""Handle MIDI input messages that might come from mido"""
Expand Down
18 changes: 9 additions & 9 deletions impsy/tests/test_impsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ def test_serial_server(default_config, sparse_callback, dense_callback, output_v
default_config, sparse_callback, dense_callback
)
sender.connect()
# sender.handle()
sender.handle()
sender.send(output_values)
sender.disconnect()


# def test_serial_midi_server(default_config, sparse_callback, dense_callback, output_values):
# sender = impsio.SerialMIDIServer(
# default_config, sparse_callback, dense_callback
# )
# sender.connect()
# # sender.handle()
# sender.send(output_values)
# sender.disconnect()
def test_serial_midi_server(default_config, sparse_callback, dense_callback, output_values):
sender = impsio.SerialMIDIServer(
default_config, sparse_callback, dense_callback
)
sender.connect()
sender.handle()
sender.send(output_values)
sender.disconnect()


def test_midi_server(default_config, sparse_callback, dense_callback, output_values):
Expand Down

0 comments on commit 6aa1d9d

Please sign in to comment.