Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flowmeter #197

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 67 additions & 132 deletions basil/HL/bronkhorst_elflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import logging
import struct
import time

from basil.HL.RegisterHardwareLayer import HardwareLayer

Expand All @@ -16,160 +17,94 @@

class Bronkhorst_ELFLOW(HardwareLayer):
''' Bronkhorst ELFLOW
Manual can be found here:
https://www.bronkhorst.com/getmedia/77a1438f-e547-4a79-95ad-53e81fd38a97/917027-Manual-RS232-interface.pdf
'''

CMDS = {
'get_measure_flow': ':06800401210120',
'get_capacity': ':068004014D014D',
'get_control_mode': ':06800401040104',
'set_control_mode': ':0580010104',
'set_setpoint': ':0680010121',
'get_setpoint': ':06800401210121',
}

def __init__(self, intf, conf):
self.debug = 0
self.node = "80"
super(Bronkhorst_ELFLOW, self).__init__(intf, conf)
self.pre_time = time.time()

def init(self):
super(Bronkhorst_ELFLOW, self).init()

def write(self, cmd):
cmd_s = ""
for c in cmd:
cmd_s = cmd_s + "%02X" % c
cmd_s = ":%02X%s%s" % (len(cmd) + 1, self.node, cmd_s)
if self.debug != 0:
logger.debug("ELFLOW.write() %s" % str(cmd_s))
self._intf.write(cmd_s)
if time.time() - self.pre_time < 1.0:
time.sleep(1.0)
self._intf.write(str(cmd))
self.pre_time = time.time()

def read(self):
ret_s = self._intf.read()
if self.debug != 0:
logger.debug("ELFLOW.read() %s" % str(ret_s))
if len(ret_s) < 5 or ret_s[0] != ":" or ret_s[3:5] != self.node:
logger.debug("ELFLOW.read() format error ret=%s" % str(ret_s))
return []
ret_len = int(ret_s[1:3])
if ret_len * 2 != len(ret_s[3:-2]):
logger.debug("ELFLOW.read() data lenth error ret=%s" % str(ret_s))
return []
ret = []
for i in range(ret_len - 1):
ret.append(int(ret_s[5 + 2 * i:5 + 2 * (i + 1)], 16))
return ret
ret = self._intf.read()
if len(ret) < 2 or ret[-2:] != "\r\n":
logger.warning("read() termination error")
return ret.strip()

def set_setpoint(self, value):
cmd = [1, 1, 0x21, (value >> 8) & 0xFF, value & 0xFF]
self.write(cmd)
"""value range from 0 - 32000
"""

if not isinstance(value, int):
raise ValueError(f"Given value has to be of type integer, is {type(value)}!")

hex_val = hex(value)[2:] # [2:] to remove the 0x from the beginning of the hex number
command = f"{self.CMDS['set_setpoint']}" + f"{hex_val.zfill(2)}" # hex should have at least two digits
self._intf.write(command)
ret = self.read()
if len(ret) != 3:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 0 and ret[1] == 0 and ret[2] == 5:
return 0
else:
logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret))
return -1
return ret

def get_setpoint(self):
cmd = [4, 1, 0x21, 1, 0x21]
self.write(cmd)
self._intf.write(self.CMDS['get_setpoint'])
ret = self.read()
if len(ret) != 5:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]:
return ((ret[3] << 8) & 0xFF00) | (ret[4] & 0xFF)
else:
logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret))
return -1

def set_control_mode(self, value):
answer_in_hex = ret[11:] # read from the 11th digits to translate what point is set
answer = int(answer_in_hex, 16)
return answer

def set_mode(self, value):
""" 0 setpoint source RS232
3 valve close
4 freeze valuve out
4 freeze valve out
8 valve fully open
20 valve steering (valve=setpoint)"""
cmd = [1, 1, 4, value & 0xFF]
self.write(cmd)
ret = self.read()
if len(ret) != 3:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 0 and ret[1] == 0 and ret[2] == 4:
return 0
else:
logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret))
return -1

def get_control_mode(self):
cmd = [4, 1, 1, 1, 4]
self.write(cmd)
ret = self.read()
if len(ret) != 4:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]:
return ret[3]
else:
logger.debug("ELFLOW.set_setpoint() ret error ret=%s" % str(ret))
return -1

def set_valve_output(self, value):
cmd = [1, 114, 0x41, (value >> 24) & 0xFF, (value >> 16) & 0xFF, (value >> 8) & 0xFF, value & 0xFF]
self.write(cmd)
20 valve steering """
hex_val = hex(value)[2:] # [2:] to remove the 0x from the beginning of the hex number
command = f"{self.CMDS['set_control_mode']}" + f"{hex_val.zfill(2)}" # hex should have at least two digits
self._intf.write(command)
ret = self.read()
if len(ret) != 3:
logger.debug("ELFLOW.set_valve_output() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 0 and ret[1] == 0 and ret[2] == 7:
return 0
else:
logger.debug("ELFLOW.set_valve_output() ret error ret=%s" % str(ret))
return -1

def get_valve_output(self):
cmd = [4, 114, 0x41, 114, 0x41]
self.write(cmd)
ret = self.read()
if len(ret) != 7:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]:
return ((ret[3] << 24) & 0xFF000000) | ((ret[4] << 16) & 0xFF0000) | ((ret[5] << 8) & 0xFF00) | (ret[6] & 0xFF)
else:
logger.debug("ELFLOW.get_valve_output() ret error ret=%s" % str(ret))
return -1

def set_controller_speed(self, value):
value = struct.unpack('<I', struct.pack('<f', value))[0]
cmd = [1, 114, 0x40 + 30, (value >> 24) & 0xFF, (value >> 16) & 0xFF, (value >> 8) & 0xFF, value & 0xFF]
self.write(cmd)
ret = self.read()
if len(ret) != 3:
logger.debug("ELFLOW.set_controller_speed() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 0 and ret[1] == 0 and ret[2] == 7:
return 0
else:
logger.debug("ELFLOW.set_controller_speed() ret error ret=%s" % str(ret))
return -1

def get_controller_speed(self):
cmd = [4, 114, 0x41, 114, 0x40 + 30]
self.write(cmd)
return ret

def get_mode(self):
self._intf.write(self.CMDS['get_control_mode'])
ret = self.read()
if len(ret) != 7:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]:
return struct.unpack('!f', chr(ret[3]) + chr(ret[4]) + chr(ret[5]) + chr(ret[6]))[0]
else:
logger.debug("ELFLOW.get_valve_output() ret error ret=%s" % str(ret))
return -1

def get_measure(self):
cmd = [4, 1, 0x21, 1, 0x20]
self.write(cmd)
answer_in_hex = ret[11:] # read from the 11th digits to translate what mode is on
answer = int(answer_in_hex, 16)
return answer

def get_flow(self):
"""This should give the flow in l/min
"""

# first get the max capacity in %
self._intf.write(self.CMDS['get_capacity'])
ret = self.read()
if len(ret) != 5:
logger.debug("ELFLOW.set_setpoint() data lenth error ret=%s" % str(ret))
return -1
elif ret[0] == 2 and ret[1] == cmd[1] and ret[2] == cmd[2]:
return ((ret[3] << 8) & 0xFF00) | (ret[4] & 0xFF)
else:
logger.debug("ELFLOW.get_valve_output() ret error ret=%s" % str(ret))
return -1
answer_in_hex = ret[11:] # read from the 11th digits to translate what the capacity is
cap_100 = struct.unpack('!f', bytes.fromhex(answer_in_hex))[0]

# now measure the flow
self._intf.write(self.CMDS['get_measure_flow'])
ret1 = self.read()
answer_in_hex = ret1[11:]
answer = int(answer_in_hex, 16)

val = answer / 32000 * cap_100
return val
111 changes: 111 additions & 0 deletions basil/HL/julaboFP50.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#
# ------------------------------------------------------------
# Copyright (c) All rights reserved
# SiLab, Institute of Physics, University of Bonn
# ------------------------------------------------------------
#

"""
This script is used to communicate with the chiller julabo fp50
"""

import logging
import time

from basil.HL.HardwareLayer import HardwareLayer


logger = logging.getLogger(__name__)


class julaboFP50(HardwareLayer):
''' Driver for the Julabo FP50 chiller.
A simple protocol via crossed null modem serial port is used with baud rate of 9600.
All commands were taken from JulaboFP50 manual.
'''

CMDS = {'get_temp': 'in_sp_00',
'set_temp': 'out_sp_00',
'get_curr_temp': 'in_pv_00',
'get_version': 'version',
'get_status': 'status',
'start': 'out_mode_05 1',
'stop': 'out_mode_05 0',
'set_power': 'out_sp_06'
}

def __init__(self, intf, conf):
super(julaboFP50, self).__init__(intf, conf)
self.pre_time = time.time()

def init(self):
super(julaboFP50, self).init()

def read(self):
ret = self._intf.read()
if len(ret) < 2 or ret[-2:] != "\r\n":
logger.warning("read() termination error")
return ret[:-2]

def write(self, cmd):
if time.time() - self.pre_time < 1.0:
time.sleep(1.0)
self._intf.write(str(cmd))
self.pre_time = time.time()

def get_version(self):
''' Read identifier
'''
self.write(self.CMDS['get_version'])
ret = self.read()
return ret

def start_chiller(self):
''' Start chiller
'''
self.write(self.CMDS['start'])

def stop_chiller(self):
''' Stop chiller
'''
self.write(self.CMDS['stop'])

def get_status(self):
''' Get status
'''
self.write(self.CMDS['get_status'])
ret = self.read()
logger.debug("status:{:s}".format(ret))
try:
tmp = ret.split(" ", 1)
status = int(tmp[0])
status_str = tmp[1:]
except (ValueError, AttributeError):
logger.warning("get_status() wrong format: {}".format(repr(ret)))
status = -99
status_str = ret
return status, status_str

def get_set_temp(self):
'''get the set temperature
'''
self.write(self.CMDS['get_temp'])
ret = self.read()
return float(ret)

def set_temp(self, temp):
'''set the temperature
'''
self.write(f"{self.CMDS['set_temp']}={temp}")

def get_temp(self):
'''get the current temperature in chiller
'''
self.write(self.CMDS['get_curr_temp'])
ret = self.read()
return float(ret)

def set_power(self, variable):
'''Set the power for heater/cooler via serial interface (positive value for heating, negative value for cooling)
'''
self.write(f"{self.CMDS['set_power']}={variable}")
2 changes: 1 addition & 1 deletion examples/lab_devices/arduino_ntc_readout.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ transfer_layer:
- name : Serial
type : Serial
init :
port : /dev/ttyUSB1
port : /dev/ttyUSB2
baudrate : 115200
timeout: 2
read_termination: "\r\n" # Needs to be double-quoted string for YAML to parse this correctly
Expand Down
15 changes: 15 additions & 0 deletions examples/lab_devices/julaboFP50.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from basil.dut import Dut

dut = Dut('julaboFP50_pyserial.yaml')
dut.init()


# turn on:
# dut["chiller"].start_chiller()

# dut["chiller"].set_temp(15) # set temp

print("Status: {}".format(dut["chiller"].get_status()))

# turn off:
# dut["chiller"].stop_chiller()
21 changes: 21 additions & 0 deletions examples/lab_devices/julaboFP50_pyserial.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
transfer_layer:
- name : Serial
type : Serial
init :
port : /dev/ttyUSB0
read_termination : "\r\n"
write_termination : "\r\n"
baudrate : 9600
timeout : 5.0
parity : "N" ### serial.PARITY_NONE
xonxoff : True # software handshake on
rtscts : False
dsrdtr : False


hw_drivers:
- name : chiller
type : julaboFP50
interface : Serial
init:
device: julabo FP50