"""
Basic Thermostreamer class.
:Date: |today|
:Author: Semi-ATE <info@Semi-ATE.org>
"""
from pylab_ml.base_instrument import logger
from pylab_ml.collate_instrument import Interface
from pylab_ml.base_instrument import Instrument
class Base_Thermostreamer(Instrument):
    """
    Interface to the thermostreamer.

    :Date: |today|
    :Author: Semi-ATE <info@Semi-ATE.org>

    The thermostreamer base class connects to a thermostreamer. If
    ``self.inst`` is still ``None`` after ``self.com._init(self)`` has run,
    a :class:`Dummy` object is installed as ``self.inst`` instead.

    Initialization arguments:
        addr (int):
            Interface address
        interface (Interface):
            GPIB, USB
        backend (str):
            VISA backend is either '@ni' for NI-Library or
            '@py' for pure python pyvisa-py backend.
            On default it uses '@ni' on win32 and '@py' on
            other platforms.

    Example: Initialization

    >>> instrument = Base_Thermostreamer(addr=24)  # GPIB or USB address
    """

    interchoices = [Interface.usbserial, Interface.gpib]

    # Whether the instrument answers a command with an '@@@OK' acknowledge.
    # The MPI_TA5000 does not send one, so the check is disabled by default.
    _expects_ack = False

    def __init__(self, **kwargs):
        """
        Initialise the Thermostreamer.

        All keyword arguments are forwarded to the ``Instrument`` base class
        and, if the dummy fallback is needed, to :class:`Dummy` as well.
        """
        self.is_local = False
        super().__init__(**kwargs)
        logger.debug(f"Class {self.__class__.__name__}")
        self.com._init(self)
        if self.inst is None:
            kwargs['message'] = 'Use Dummy Thermostreamer instead:'
            self.inst = Dummy(self, **kwargs)
        # NOTE(review): the nesting was reconstructed; setup_inst() is assumed
        # to run for real and dummy instruments alike - confirm.
        self.setup_inst()

    def write(self, msg):
        """Send ``msg`` to the instrument via ``self.inst.write``."""
        self.inst.write(msg)

    def read(self):
        """Return whatever ``self.inst.read()`` returns."""
        return self.inst.read()

    def _get_ack(self):
        """
        Read and verify the instrument's acknowledge, if one is expected.

        Does nothing while ``_expects_ack`` is false (the default).

        Raises
        ------
        ValueError
            If an acknowledge is expected and the reply is not ``'@@@OK'``.
        """
        if not self._expects_ack:
            return
        ret = self.inst.read()
        if ret != '@@@OK':
            raise ValueError("No 'OK' in return value after reset: {}".format(ret))

    def reset(self):
        """Reset the instrument with ``*RST`` and switch it back to local."""
        self.budget.set_slack(self)
        self.inst.write('*RST')
        self._get_ack()
        self.local()
        self._setpoint = None

    def clear(self):
        """Clear error status."""
        self.budget.set_slack(self)
        self.inst.clear()

    def close(self):
        """Close connection to Thermostreamer."""
        if self.inst is not None:
            self.flow = 0
            self.inst.flush(2)
        # NOTE(review): the nesting was reconstructed; logging and
        # super().close() are assumed to run unconditionally - confirm.
        logger.info(f'{self.instName} close interface to {self.id}')
        super().close()

    def error_list(self):
        """
        List of outstanding errors.

        Queries ``:SYST:ERR:ALL?`` and treats the reply as a comma-separated
        sequence of alternating codes and messages.

        Returns
        -------
        list of tuple
            ``(code, message)`` pairs, each of which is also logged at INFO
            level. A trailing code without a message is dropped.
        """
        self.budget.set_slack(self)
        # str() keeps this working when the backend answers with a non-string
        # value, e.g. the integer sentinel Dummy.query returns for commands it
        # does not know.
        fields = str(self.inst.query(':SYST:ERR:ALL?')).split(",")
        errorlist = list(zip(fields[0::2], fields[1::2]))
        for code, text in errorlist:
            logger.info(f"{code} : {text}")
        return errorlist

    def message(self, message=None):
        """Log ``message`` at INFO level; do nothing when it is ``None``."""
        if message is not None:
            logger.info(message)

    @property
    def id(self):
        """
        Query IDN.

        Returns the ``*IDN?`` reply with carriage returns and line feeds
        removed, or an empty string if the query raises.
        """
        self.budget.set_slack(self)
        try:
            value = self.inst.query('*IDN?')
        except Exception:
            # Best effort: an unreachable instrument yields an empty id.
            value = ""
        return value.replace('\r', '').replace('\n', '')

    def local(self):
        """Switch back to local instrument control."""
        self.budget.set_slack(self)
        self.inst.write('%GL')  # go to local button
        self._get_ack()
        self.is_local = True
class Dummy:
    """
    Dummy object for the Thermostreamer.

    :Date: |today|
    :Author: Semi-ATE <info@Semi-ATE.org>

    Usable if you have no real Thermostreamer as instance. Values written
    with ``"<NAME> <value>"`` are stored as instance attributes and can be
    retrieved again with ``query("<NAME>?")`` or with ``write("<NAME>?")``
    followed by ``read()``.
    """

    # Returned whenever no meaningful value is available.
    _NO_VALUE = 0xdeadbeef

    def __init__(self, parent, **kwargs):
        """
        Initialise the Dummy Thermostreamer.

        Parameters
        ----------
        parent : object
            The object that owns this dummy; stored as ``self.parent``.
        **kwargs
            Only ``message`` is evaluated: when present it is logged as an
            error, otherwise a generic warning is logged. All other keys
            are ignored.
        """
        if 'message' in kwargs:
            logger.error(kwargs['message'])
        else:
            logger.warning('Use Dummy Thermostreamer')
        self.parent = parent
        self._lastcmd = ''
        self.flow = 0
        self.head = 0
        self.HDLK = '0'
        self.HEAD = '1'
        self.TEMP = 25  # default is RT = 25

    def query(self, cmd):
        """
        Query the Dummy Thermostreamer with cmd.

        Parameters
        ----------
        cmd : str
            The command to query. Everything from the first ``?`` onwards
            is ignored; the remainder is used as attribute name.

        Returns
        -------
        value : int or str
            For ``*IDN?`` the class representation followed by ``\\r``.
            Otherwise the stored attribute, converted to ``int`` when
            possible, or ``0xdeadbeef`` when no such attribute exists.
        """
        if cmd == '*IDN?':
            return f'{self.__class__}\r'
        # partition() keeps the whole command when it carries no '?',
        # whereas slicing with find() == -1 would drop the last character.
        name = cmd.partition('?')[0]
        try:
            value = getattr(self, name)
        except AttributeError:
            value = self._NO_VALUE
            logger.debug(f'Dummy Thermostreamer query {name} == {hex(value)}')
            return value
        try:
            value = int(value)
        except (TypeError, ValueError):
            pass  # non-integer values are handed back unchanged
        logger.debug(f'Dummy Thermostreamer query {name} == {value}')
        return value

    def write(self, cmd):
        """
        Write the Dummy Thermostreamer with cmd.

        A command containing ``?`` is remembered (up to the ``?``) for the
        next :meth:`read`; any other command clears that memory. A command
        of the form ``"<NAME> <value>"`` stores ``value`` (as string) in
        the attribute ``NAME``; ``SETP`` additionally updates ``TEMP``.

        Parameters
        ----------
        cmd : str
            The command to write.

        Returns
        -------
        None
        """
        self._lastcmd = cmd[:cmd.find('?')] if '?' in cmd else ''
        parts = cmd.split(' ')
        if len(parts) > 1:
            object.__setattr__(self, parts[0], parts[1])
            if parts[0] == 'SETP':
                # The dummy reaches the setpoint immediately.
                self.TEMP = parts[1]
        logger.debug(f'Dummy Thermostreamer write {parts}')

    def read(self):
        """
        Read from the Dummy Thermostreamer.

        Returns
        -------
        value : int or str
            ``1`` after a ``SOAK``/``FLWM``/``DSNS`` query, ``'1'`` after a
            ``TECR`` query, the stored attribute after a
            ``SETP``/``FLOW``/``HEAD``/``TEMP`` query (``0xdeadbeef`` if it
            was never written), and ``0xdeadbeef`` in every other case.
        """
        last = self._lastcmd
        if last in ('SOAK', 'FLWM', 'DSNS'):
            value = 1
        elif last == 'TECR':
            # Must be an equality test: `last in ('TECR')` is a substring
            # test on a plain string and also matches '' (no pending query).
            value = '1'
        elif last in ('SETP', 'FLOW', 'HEAD', 'TEMP'):
            value = getattr(self, last, self._NO_VALUE)
        else:
            value = self._NO_VALUE
        try:
            shown = f'0x{int(value):0x}'
        except (TypeError, ValueError):
            # Stored values such as '25.5' are no integers; logging must
            # not make the read itself fail.
            shown = repr(value)
        logger.debug(f'Dummy Thermostreamer read {shown}')
        return value

    def flush(self, arg=None):
        """Accept and ignore a flush request; the dummy has no buffers."""
        pass