Source code for pylab_ml.thermostreamer.base_thermostreamer

"""
Basic Thermostreamer class.

:Date: |today|
:Author: Semi-ATE <info@Semi-ATE.org>
"""

from pylab_ml.base_instrument import logger
from pylab_ml.collate_instrument import Interface
from pylab_ml.base_instrument import Instrument


class Base_Thermostreamer(Instrument):
    """
    Interface to the thermostreamer.

    :Date: |today|
    :Author: Semi-ATE <info@Semi-ATE.org>

    The thermostreamer base class can connect to a thermostreamer.

    Initialization arguments:
        addr (int): Interface address
        interface (Interface): GPIB, USB
        backend (str): VISA backend is either '@ni' for NI-Library or '@py'
            for pure python pyvisa-py backend. By default it uses '@ni' on
            win32 and '@py' on other platforms.

    Example: Initialization

    >>> instrument = Base_Thermostreamer(addr=24)  # GPIB or USB address
    """

    # Interfaces offered by this instrument class.
    interchoices = [Interface.usbserial, Interface.gpib]

    def __init__(self, **kwargs):
        """
        Initialise the Thermostreamer.

        All keyword arguments are forwarded to the ``Instrument`` base class.
        If no instrument handle is available afterwards (``self.inst`` is
        None), a :class:`Dummy` backend is installed instead.
        """
        self.is_local = False
        super().__init__(**kwargs)
        logger.debug("Class {}".format(self.__class__.__name__))
        self.com._init(self)
        if self.inst is None:
            # No real instrument: fall back to the in-memory dummy.
            kwargs['message'] = 'Use Dummy Thermostreamer instead:'
            self.inst = Dummy(self, **kwargs)
        # NOTE(review): the flattened source does not show whether this call
        # was nested inside the ``if`` above - confirm against the repository.
        self.setup_inst()

    def write(self, msg):
        """Write ``msg`` to the instrument handle."""
        self.inst.write(msg)

    def read(self):
        """Read from the instrument handle and return the reply."""
        return self.inst.read()

    def _get_ack(self):
        """
        Acknowledge check, currently disabled.

        Returns immediately because the MPI_TA5000 does not send an
        acknowledge. The disabled check read one reply from the instrument
        and raised ``ValueError`` unless that reply was ``'@@@OK'``.
        """
        return

    def reset(self):
        """Reset the instrument, switch to local control, clear the setpoint."""
        self.budget.set_slack(self)
        self.inst.write('*RST')
        self._get_ack()
        self.local()
        self._setpoint = None

    def clear(self):
        """Clear error status."""
        self.budget.set_slack(self)
        self.inst.clear()

    def close(self):
        """
        Close connection to Thermostreamer.

        Sets ``flow`` to 0 and flushes the handle before closing.
        """
        if self.inst is not None:
            self.flow = 0
            self.inst.flush(2)
            logger.info('{} close interface to {}'.format(self.instName, self.id))
            # NOTE(review): the flattened source does not show whether this
            # call was nested inside the ``if`` - confirm against the
            # repository.
            super().close()

    def error_list(self):
        """
        List of outstanding errors.

        The reply to ``:SYST:ERR:ALL?`` is split on commas; fields at even
        positions are taken as codes and fields at odd positions as messages.

        Returns
        -------
        errorlist : list of tuple
            ``(code, message)`` pairs, each of which is also logged.
        """
        self.budget.set_slack(self)
        # str() keeps this usable with the Dummy backend, whose query()
        # answers unknown commands with an integer placeholder.
        fields = str(self.inst.query(':SYST:ERR:ALL?')).split(",")
        errorlist = list(zip(fields[0::2], fields[1::2]))
        for code, text in errorlist:
            logger.info("{} : {}".format(code, text))
        return errorlist

    def message(self, message=None):
        """Message display to Python console (logged at info level)."""
        if message is not None:
            logger.info(message)

    @property
    def id(self):
        """
        Query IDN.

        Returns the ``*IDN?`` reply with carriage returns and line feeds
        removed, or an empty string if the query raised.
        """
        self.budget.set_slack(self)
        try:
            value = self.inst.query('*IDN?')
        except Exception as exc:
            # Best effort: identification must not break the caller, but the
            # failure should at least be visible in the debug log.
            logger.debug("IDN query failed: {}".format(exc))
            value = ""
        return value.replace('\r', '').replace('\n', '')

    def local(self):
        """Switch back to local instrument control."""
        self.budget.set_slack(self)
        self.inst.write('%GL')  # go to local button
        self._get_ack()
        self.is_local = True
class Dummy(object):
    """
    Dummy object for the Thermostreamer.

    :Date: |today|
    :Author: Semi-ATE <info@Semi-ATE.org>

    Usable if you have no real Thermostreamer as instance. Written values are
    kept as instance attributes named after the command mnemonic.
    """

    # Placeholder returned when no value is known for a command.
    _UNKNOWN = 0xdeadbeef

    def __init__(self, parent, **kwargs):
        """
        Initialise the Dummy Thermostreamer.

        Parameters
        ----------
        parent : object
            The instrument object that owns this dummy.
        **kwargs
            Only ``message`` is used here: if present it is logged as an
            error, otherwise a generic warning is logged. The caller may pass
            further keys (addr, interface, backend, identify, instName).
        """
        if 'message' in kwargs:
            logger.error(kwargs['message'])
        else:
            logger.warning('Use Dummy Thermostreamer')
        self.parent = parent
        self._lastcmd = ''
        self.flow = 0
        self.head = 0
        self.HDLK = '0'
        self.HEAD = '1'
        self.TEMP = 25      # default is RT = 25

    def query(self, cmd):
        """
        Query the Dummy Thermostreamer with cmd.

        Parameters
        ----------
        cmd : str
            The command to query. Everything from the first ``?`` onwards is
            ignored; a command without ``?`` is used as is.

        Returns
        -------
        value : int or str
            The stored value (as int where convertible), the class name for
            ``*IDN?``, or ``0xdeadbeef`` if nothing is stored for cmd.
        """
        if cmd == '*IDN?':
            return f'{self.__class__}\r'
        # partition() keeps the full mnemonic when there is no '?'; slicing
        # with find() would drop the last character in that case.
        cmd = cmd.partition('?')[0]
        try:
            value = getattr(self, cmd)
        except (AttributeError, TypeError):
            value = self._UNKNOWN
            logger.debug(f'Dummy Thermostreamer query {cmd} == {hex(value)}')
            return value
        try:
            value = int(value)
        except (TypeError, ValueError, OverflowError):
            pass    # keep non-numeric values unchanged
        logger.debug(f'Dummy Thermostreamer query {cmd} == {value}')
        return value

    def write(self, cmd):
        """
        Write the Dummy Thermostreamer with cmd.

        A command of the form ``NAME VALUE`` stores VALUE (as str) under
        attribute NAME; ``SETP`` additionally updates ``TEMP``. A command
        containing ``?`` is remembered so that a following read() can answer
        it.

        Parameters
        ----------
        cmd : str
            The command to write.

        Returns
        -------
        None
        """
        head, mark, _ = cmd.partition('?')
        self._lastcmd = head if mark else ''
        parts = cmd.split(' ')
        if len(parts) > 1:
            setattr(self, parts[0], parts[1])
            if parts[0] == 'SETP':
                # The dummy reaches its setpoint immediately.
                self.TEMP = parts[1]
        logger.debug(f'Dummy Thermostreamer write {parts}')

    def read(self):
        """
        Read from the Dummy Thermostreamer.

        Returns
        -------
        value : int or str
            The answer to the last query written: 1 for SOAK/FLWM/DSNS,
            '1' for TECR, the stored value for SETP/FLOW/HEAD/TEMP, and
            ``0xdeadbeef`` otherwise.
        """
        value = self._UNKNOWN
        if self._lastcmd in ('SOAK', 'FLWM', 'DSNS'):
            value = 1
        elif self._lastcmd in ('TECR',):
            # One-element tuple: a bare ('TECR') is a string, which made this
            # a substring test that also matched the empty default command.
            value = '1'
        elif self._lastcmd in ('SETP', 'FLOW', 'HEAD', 'TEMP'):
            # Fall back to the placeholder if the value was never written.
            value = getattr(self, self._lastcmd, self._UNKNOWN)
        try:
            shown = f'0x{int(value):0x}'
        except (TypeError, ValueError, OverflowError):
            shown = repr(value)     # e.g. '25.5' cannot be shown as hex
        logger.debug(f'Dummy Thermostreamer read {shown}')
        return value

    def flush(self, arg=None):
        """Accept and ignore a flush request; the dummy has no buffers."""
        pass