"""
This script contains the base classes for different types of instruments, such as LocalInstrument, PxieInstrument, NetworkInstrument, and GenericInstrument.
It also includes a TimeoutBudget class for managing timeouts across instrument transactions.
The Instrument class is an abstract base class that provides a basic set of methods and properties to be overridden by actual instrument implementations.
"""
import sys
import os
try:
import grp
except ImportError:
pass
import time
from abc import ABC, abstractmethod
from serial.tools import list_ports
import logging
from pylab_ml.common.singleton import Singleton
from pylab_ml.dummy import Dummy
from pylab_ml.ident import Ident
from pylab_ml.collate_instrument import Interface, DefInter, CollateInstrument
from pylab_ml.scope.lecroy.vicp import VICP
from pylab_ml.common.mqtt_client import mqtt_deviceattributes, mqtt_init
[docs]
def measure(self, message, *args, **kws):
"""
Custom logging level for measurements. Logs messages at the MEASURE level (15) if the logger is configured to handle that level.
Parameters
----------
message : str
The message to be logged.
*args : tuple
Additional arguments to be passed to the logger.
**kws : dict
Additional keyword arguments to be passed to the logger.
"""
if self.isEnabledFor(MEASURE_LEVEL_NUM):
# Yes, logger takes its '*args' as 'args'.
self._log(MEASURE_LEVEL_NUM, message, args, **kws)
[docs]
def choicelogger():
"""
Chooses the logger based on command line arguments. If '--labml' is found in the command line arguments,
it uses the specified logger; otherwise, it defaults to a standard logger.
Returns
-------
logging.Logger
The configured logger instance.
"""
for px in sys.argv:
if px == '--labml':
idx = sys.argv.index(px)
logger = sys.argv[idx+1]
return logger
return logging.getLogger(__name__)
MEASURE_LEVEL_NUM = 15
logging.addLevelName(MEASURE_LEVEL_NUM, "MEASURE")
logging.MEASURE = MEASURE_LEVEL_NUM
logging.Logger.measure = measure
logger = choicelogger()
mqttc = mqtt_init(typ='instrument', logger=logger) # TODO: wenn base_instrument importiert wird dann wird mqtt initialisiert!! das darf so nicht sein!!
# da dann der Typ dann nicht mehr geƤndert werden kann!!
_createDummyifInvalid = False
[docs]
def createDummyifInvalid(val):
"""
Global switch to create Dummy instance if error occured in instrument._init or instrument have no connection.
Parameters
----------
val : bool
True/False(default)
Returns
-------
None.
"""
global _createDummyifInvalid
_createDummyifInvalid = val
[docs]
def mqttclose():
"""Close the MQTT client connection."""
mqttc.close()
[docs]
def logsetup():
"""Set up the logger with console and file handlers, and configure the logging level."""
logger.setLevel(logging.DEBUG)
# remove existing handlers
for handler in logger.handlers:
print(handler)
logger.removeHandler(handler)
logger.handlers = []
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create file handler and set level to debug
fh = logging.FileHandler('instrument.log', 'w')
fh.setLevel(logging.MEASURE)
# create formatter
ch_formatter = logging.Formatter('%(levelname)s - %(message)s')
fh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch & fh
ch.setFormatter(ch_formatter)
fh.setFormatter(fh_formatter)
# add ch & fh to logger
logger.addHandler(ch)
logger.addHandler(fh)
try:
logger.ipython_shell = get_ipython().__class__.__name__
# ZMQInteractiveShell -- Jupyter notebook or qtconcole
# TerminalInteractiveShell -- Terminal running iPython
except NameError:
logger.ipython_shell = None
[docs]
class InvalidInstrumentCreateSessionFunction(Exception):
pass
[docs]
class InvalidInstrumentConnection(Exception):
pass
[docs]
class LocalInstrument(object, metaclass=Singleton):
"""
Class for local instruments that are connected via interfaces such as GPIB, USB, or serial.
This class handles the initialization and communication with the instrument using the PyVISA library.
"""
has_visa = False
try:
import pyvisa
has_visa = True
except ImportError:
logger.error("import pyvisa not found")
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
if os.sys.platform != 'win32':
self.check_group_dialout()
def _init(self, instrument, identify=True):
"""
Initialize the local instrument by searching for its address, setting up the resource manager, and opening the connection.
Parameters
----------
instrument : Instrument
The instrument object to be initialized.
identify : bool, optional
If True, the instrument will be identified after initialization (default is True).
"""
instrument.addr = self.search4vidpid(instrument.addr, instrument.backend)
if instrument.init_implict or identify:
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.warning("No such {!r} instrument at {!r} to init".format(instrument.interface.name, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.warning("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}"
.format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('Local instrument: no address specified, empty instrument')
instrument.inst = None
return
if not self.has_visa:
logger.error('local instrument: import pyvisa failed')
logger.error('please install pyvisa first')
logger.debug("Local instrument backend {}".format(instrument.backend))
rm = self.pyvisa.ResourceManager(instrument.backend)
logger.debug("Local instrument resource manager {}".format(rm))
instrument.resource_manager = rm
try:
self.rid = self.resource_id(instrument.interface, instrument.addr)
logger.debug("Local instrument resource Id {}".format(self.rid))
instrument.inst = rm.open_resource(self.rid)
except Exception as e:
logger.error(f'Exception in {instrument.instName}, could not open addr {instrument.addr}')
logger.error('local_instrument resource_manager.open_resource exception {}'.format(repr(e)))
if self.pyvisa.__version__ < '1.11.3':
logger.error('local instrument: wrong pyvisa version, need Version >= 1.11.3')
raise Exception
instrument.inst = None
logger.error('could not open resource {}'.format(instrument.addr))
logger.error('perhaps disabled or in use?')
if instrument.interface == Interface.usbserial:
print('found following devices:')
self.serial_list(instrument.backend)
if os.sys.platform == 'win32' and hasattr(logger, 'ipython_shell') and logger.ipython_shell is None:
input('press any key to continue ...')
instrument.mqtt_add(mqttc, instrument)
if instrument.inst:
if instrument.debugInst:
return
instrument.setup_inst()
iid = instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if iid is None or iid != -1:
try:
iid = instrument.id
except Exception:
iid = None
if iid is not None and iid != "" and iid != -1:
instrument.is_inited = instrument.init_implict or not identify
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if iid is None or iid == "" or iid == -1 or iid == '-1':
msg = "Invalid instrument connection {ident}, no identification id".format(ident=instrument)
if instrument.interface == Interface.usbserial:
msg = msg+'\nplease check the serial parameter on the device: Baudrate, Data bits, Parity, Terminator'
if instrument.debug:
logger.error(msg)
elif not _createDummyifInvalid:
raise InvalidInstrumentConnection(msg)
elif _createDummyifInvalid:
instrument.inst = Dummy(instrument, logger)
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
def resource_id(self, interface, address):
"""
Generate the resource ID for the instrument based on its interface and address.
Parameters
----------
interface : Interface
The interface type of the instrument (e.g., GPIB, USB, serial).
address : str or int
The address of the instrument.
Returns
-------
id_name : str
The resource ID string for the instrument.
"""
id_name = "UNDEFINED_INTERFACE"
if os.sys.platform == 'win32':
if interface == Interface.gpib and str(address).find('.') < -1:
id_name = 'GPIB0::{}::INSTR'.format(address)
elif interface == Interface.gpib and str(address).find('.') > -1:
id_name = 'TCPIP::{}::INSTR'.format(address)
elif interface == Interface.usbserial:
if isinstance(address, (str)) and address.find('COM') > -1: # pyvisa.__version__> 1.11 needs no 'COM' in the adress
address = address[3:]
id_name = 'ASRL{}::INSTR'.format(address)
else:
if interface == Interface.gpib:
id_name = 'GPIB0::{}::INSTR'.format(address)
logger.error("No GPIB support on UNIX")
elif interface == Interface.usbserial:
if isinstance(address, (str)) and address.find("/") == 0: # address is already an unix device
id_name = 'ASRL{}::INSTR'.format(address)
else:
id_name = 'ASRL/dev/ttyUSB{!r}::INSTR'.format(address)
return id_name
[docs]
def check_group_dialout(self):
"""
Check if the user is in the 'dialout' group on UNIX systems, which is required for accessing serial ports.
Returns
-------
bool
True if the user is in the 'dialout' group or if the platform is Windows, False otherwise.
"""
if os.sys.platform == 'win32':
return True
if "dialout" not in [grp.getgrgid(g).gr_name for g in os.getgroups()]:
logger.error("User not in group dialout")
return False
else:
return True
# def backend_windows_usbserial(self, libpath="Z:/pytestsharing/ms_win/libusb/MS64/dll/libusb-1.0.dll"):
# import usb.backend.libusb1
# print("Backend %s" % libpath)
# backend = usb.backend.libusb1.get_backend(find_library=lambda x: libpath)
# dev = usb.core.find(backend=backend)
[docs]
def list_com(self):
"""List all COM ports available on the system."""
ports = list(list_ports.comports())
for p, d, a in ports:
print("{} {!r} {!r}".format(p, d, a))
[docs]
def list_usb_serial_ports(self):
"""List all USB serial ports available on the system."""
ports = list_ports.comports()
for port in ports:
print(port)
[docs]
def search4vidpid(self, addr, backend=None):
"""
Search for VID:PID in the addr.
Parameters
----------
addr : str or int
The address of the device in the format 'VID:PID' (e.g., '16C0:0483').
backend : str, optional
The backend to use for the search.
Returns
-------
str or int
The device name or address if found, otherwise None.
"""
if isinstance(addr, int) and os.sys.platform == 'win32':
# return 'COM' + str(addr)
return addr
elif addr is None or isinstance(addr, int) or addr.find(':') != 4:
return addr # addr is not vid:pid
try:
vid = int(addr.split(':')[0], 16)
pid = int(addr.split(':')[1], 16)
except Exception:
return addr
result = None
ports = []
for port in list_ports.comports():
if port.vid is not None and port.vid == vid and port.pid == pid:
if result is not None:
ports.append(result)
result = port.device
if backend is not None:
list_resources_info = self.pyvisa.ResourceManager(backend).list_resources_info()
for port in list_resources_info:
if list_resources_info[port].alias == result:
result = list_resources_info[port].interface_board_number
elif result is not None and DefInter().check4nidriver():
result = result[3:]
if result is None:
logger.error("couldn't find device with vid:pid = {}".format(addr))
print('found following devices:')
self.serial_list(backend)
if len(ports) > 0:
logger.error('found more than 1 device with vid:pid = {}'.format(addr))
for port in ports:
logger.error(' Port: {}'.format(port))
logger.error(' Port: {}'.format(result))
result = None
return result
[docs]
def serial_list(self, backend=None):
"""
List all devices from serial-port (USB-Ports).
Parameters
----------
backend : str, optional
The backend to use for the search. If None, the default backend is used.
Can be '@py' or '@ni' to also print ASRL::INSTR.
"""
ports = list(list_ports.comports())
dl = len("Port")
ml = len("Manufacturer")
il = len("USB info")
for i in range(0, len(ports)):
dl = max(dl, len(ports[i].device))
if ports[i].manufacturer is not None:
ml = max(ml, len(ports[i].manufacturer))
if ports[i].usb_info() is not None:
il = max(il, len(ports[i].usb_info()))
if backend is not None and os.sys.platform == 'win32':
vports = self.pyvisa.ResourceManager(backend).list_resources_info()
vl = len("Visa")
for port in vports:
vl = max(vl, len(port))
print('{:{dl}} {:{vl}} {:{ml}} {:{il}}'.format("Port", "Visa", "Manufacturer", "USB info", dl=dl, vl=vl, ml=ml, il=il))
print('-----------------------------------------------------------------')
for i in range(0, len(ports)):
asrl = "?????::?????"
addr = "??"
for port in vports:
device = ports[i].device
if device.find('COM') > -1:
device = int(device[3:])
if (vports[port].alias is not None and vports[port].alias == ports[i].device) or vports[port].interface_board_number == device:
asrl = port
addr = asrl[4:asrl.find(':')]
print('{!s:{dl}} {:{vl}} {!r:{ml}} {!r:{il}}'.format(addr, asrl, ports[i].manufacturer, ports[i].usb_info(), dl=dl, vl=vl, ml=ml, il=il))
else:
print('{:{dl}} {:{ml}} {:{il}}'.format("Port", "Manufacturer", "USB info", dl=dl, ml=ml, il=il))
print('-----------------------------------------------------')
for i in range(0, len(ports)):
print('{!s:{dl}} {!r:{ml}} {!r:{il}}'.format(ports[i].device, ports[i].manufacturer, ports[i].usb_info(), dl=dl, ml=ml, il=il))
[docs]
def idtry(self, instrument):
"""
Attempt to fix VISA termination characters to Query IDN.
Parameters
----------
instrument : Instrument
The instrument object for which to attempt the IDN query.
Returns
-------
str
The IDN string returned by the instrument, with termination characters removed.
"""
import pyvisa.errors
# budget.set_slack(self)
try:
value = instrument.query('*IDN?')
except pyvisa.errors.VisaIOError as x:
logger.warning("Issue with first {} id request, adjusting termination characters, was {!r},{!r}"
.format(instrument.__class__.__name__, str(instrument.write_termination), str(instrument.read_termination)))
if instrument.read_termination == '\n':
instrument.read_termination = '\r'
else:
instrument.read_termination = '\n'
if instrument.write_termination == '\n':
instrument.write_termination = '\r'
else:
instrument.write_termination = '\n'
try:
value = instrument.query('*IDN?')
logger.info("Termination adjustment required for {} id request, is {!r},{!r}"
.format(self.__class__.__name__, str(instrument.write_termination), str(instrument.read_termination)))
except Exception:
value = ""
except Exception:
value = ""
return value.replace('\r', '').replace('\n', '')
def __del__(self):
self.close()
[docs]
class PxieInstrument(object, metaclass=Singleton):
"""
Class for PXIe instruments that are connected via the NI-VISA backend.
This class handles the initialization and communication with the instrument using the NI-VISA library.
"""
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
def _init(self, instrument, identify=True):
"""
Initialize the PXIe instrument by searching for its address, setting up the resource manager, and opening the connection.
Parameters
----------
instrument : Instrument
The instrument object to initialize.
identify : bool, optional
Whether to identify the instrument during initialization. Default is True.
"""
if instrument.init_implict or identify:
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.warning("No such {!r} instrument at {!r} to init".format(instrument.interface.name, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.warning("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}".
format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('PXIe instrument: no address slot specified, empty instrument')
instrument.inst = None
return
try:
if instrument.channels is None:
instrument.inst = instrument.backend.Session(resource_name=instrument.addr)
else:
instrument.inst = instrument.backend.Session(resource_name=instrument.addr, channels=str(instrument.channels))
except Exception:
instrument.inst = None
logger.error('could not open resource {} for {}'.format(instrument.addr, instrument))
logger.error(sys.exc_info())
if os.sys.platform == 'win32' and hasattr(logger, 'ipython_shell') and logger.ipython_shell is None:
input('press any key to continue ...')
raise InvalidInstrumentConnection("Invalid instrument connection {ident}".format(ident=instrument))
if instrument.inst:
instrument.setup_inst()
instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
instrument.is_inited = True
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if hasattr(instrument, "id"):
instrument.is_inited = instrument.init_implict or not identify
iid = instrument.id
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if iid == "":
raise InvalidInstrumentConnection("Invalid instrument connection {ident}, no identification id".format(ident=instrument))
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
class NetworkInstrument(object, metaclass=Singleton):
"""
Class for network instruments that are connected via the TCP/IP interface.
This class handles the initialization and communication with the instrument using the VICP protocol.
"""
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
def _init(self, instrument, identify=True):
"""
Initialize the network instrument by searching for its address, setting up the connection, and identifying the instrument.
Parameters
----------
instrument : Instrument
The instrument object to initialize.
identify : bool, optional
Whether to identify the instrument during initialization. Default is True.
"""
if instrument.init_implict or identify:
if instrument.addr and not instrument.hostname:
instrument.hostname = instrument.addr
elif not instrument.addr and instrument.hostname:
instrument.addr = instrument.hostname
if instrument.port is None:
instrument.port = 1861
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.error("No such {!r} instrument at {!r} to init".format(instrument.interface, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.error("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}".
format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('local instrument: no address specified, empty instrument')
instrument.inst = None
return
try:
if not hasattr(instrument, "create_session"):
instrument.inst = VICP(addr=instrument.addr, port=instrument.port, debug=instrument.debug)
else:
instrument.inst = instrument.create_session()
except Exception:
instrument.inst = None
logger.error('could not open resource {} for {}'.format(instrument.addr, instrument))
logger.error(sys.exc_info())
if os.sys.platform == 'win32' and logger.ipython_shell is None:
input('press any key to continue ...')
raise InvalidInstrumentConnection("Invalid instrument connection {ident}".format(ident=instrument))
if instrument.inst:
instrument.setup_inst()
instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
instrument.is_inited = True
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if hasattr(instrument, "id"):
instrument.is_inited = instrument.init_implict or not identify
iid = instrument.id
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if iid == "":
raise InvalidInstrumentConnection("Invalid instrument connection {ident}, no identification id".format(ident=instrument))
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
class GenericInstrument(object, metaclass=Singleton):
"""Class for generic instruments that do not fit into the local, PXIe, or network categories."""
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
def _init(self, instrument, identify=True):
"""
Initialize the generic instrument by checking for the create_session function, adding it to the collation, and setting up the connection.
Parameters
----------
instrument : Instrument
The instrument object to initialize.
identify : bool, optional
Whether to identify the instrument during initialization. Default is True.
"""
if not hasattr(instrument, "create_session"):
logger.error("No 'create_session(instrument)' function defined in instrument {!r} instrument at {!r} to create interface inst of instrument".
format(instrument.__class__.__name__, instrument.addr))
raise InvalidInstrumentCreateSessionFunction("Invalid instrument create_session(instrument) function for {ident}".format(ident=instrument))
if instrument.init_implict or identify:
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.warning("No such {!r} instrument at {!r} to init".format(instrument.interface.name, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.warning("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}".
format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('Generic instrument: no address specified, empty instrument')
instrument.inst = None
return
try:
instrument.inst = instrument.create_session()
except Exception:
instrument.inst = None
logger.error('could not open resource {} for {}'.format(instrument.addr, instrument))
logger.error(sys.exc_info())
if os.sys.platform == 'win32' and hasattr(logger, 'ipython_shell') and logger.ipython_shell is None:
input('press any key to continue ...')
raise InvalidInstrumentConnection("Invalid instrument connection {ident}".format(ident=instrument))
if instrument.inst:
instrument.setup_inst()
instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
instrument.is_inited = True
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if hasattr(instrument, "id"):
instrument.is_inited = instrument.init_implict or not identify
iid = instrument.id
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if iid == "":
raise InvalidInstrumentConnection("Invalid instrument connection {ident}, no identification id".format(ident=instrument))
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
class TimeoutBudget(object, metaclass=Singleton):
"""
This TimeoutBudget class is a singleton to provide a shared budget object within each Instrument object.
Allowing each instrument transaction to accumulate timeout for their needs, which gradually bleeds away as time
elapses
Example:
>>> instrument.budget = TimeoutBudget()
>>> instrument.budget.cut_slack(instrument, 3)
add 3sec delay to whats left of the accumulated timeout to instrument.inst.timeout
>>> instrument.budget.cut_slack(instrument)
add minimal delay to whats left of the accumulated timeout to instrument.inst.timeout
"""
[docs]
def __init__(self):
self.slack_time = 0
self.scale = 1.0
self.relax = 0.1
self.minim = 1.0
self.debug = False
[docs]
def slack(self):
"""Seconds remaining of accumulated timeout, lower limit at minim"""
now = time.time()
todo = self.slack_time - now
if todo < self.minim:
todo = self.minim
return (todo)
[docs]
def cut_slack(self, need):
"""
Add needed delay to whats left of the accumulated timeout
Parameters
----------
need : float
The additional time in seconds to add to the accumulated timeout.
Returns
-------
float
The total time in seconds to wait, which is the sum of the remaining slack time, the needed time, and the relaxation time.
"""
todo = self.slack()
todo += need * self.scale + self.relax
now = time.time()
self.slack_time = todo + now
return (todo)
[docs]
def set_slack(self, instrument, need=None):
"""
Add needed delay to whats left of the accumulated timeout to given instrument.inst.timeout
Parameters
----------
instrument : Instrument
The instrument object whose timeout will be adjusted.
need : float, optional
The additional time in seconds to add to the accumulated timeout. Default is 0.2 seconds.
"""
if need is None:
# default time for mundane operations (could depend on instrument interface)
need = 0.2
todo = self.cut_slack(need)
if self.debug:
if instrument:
if hasattr(instrument, "inst"):
if instrument.inst and hasattr(instrument.inst, "timeout"):
instrument.inst.timeout = todo * 1000
logger.debug("{!r} timeout now {}".format(instrument.instName, instrument.inst.timeout/1000))
elif instrument.inst:
instrument.inst.timeout = todo * 1000
[docs]
class Instrument(ABC, Ident, mqtt_deviceattributes):
"""
This is the abstract Instrument class.
It cannot be used directly but
provides a basic set of methods & properties to override with actual instruments
Initialization arguments of a derived class:
addr : int or str
Interface address, hostname, IP-address, PXI-slot or other address required by interface
interface : Interface
GPIB, USBSerial, TCPIP, PXIe, Generic.
backend : str
Backend could be path to a DLL, custom interface item or for
PyVisa backend is either '@ivi' (or '@ni') for NI-Library or
'@py' for pure python pyvisa-py backend.
On default it uses '@ivi' (or '@ni') on win32 and '@py' on
other platforms.
hostname : str
Optional interface hostname or IP-address required by interface, when not addr, for TCPIP network instruments
port : int
Optional interface port number, for TCPIP network instruments
instName : str
Instrument object name passed as string for later reference in messages to user
debug : bool
Debug information
Example: Initialization of a derived class
>>> vdd = DerivedInstrument (addr=24) # GPIB or USB address
>>> vdd.init() # connect and initialize instrument if this method provided, otherwise implicitly
Methods:
init()
Connect and initialize instrument if this method overidden, otherwise implicitly initialize instrument when this method not locally implemented
setup_inst()
Post init method to override with specific instrument interface initialisation
reset()
Abstract reset
identify()
Instrument message, reflect address & interface - if message() implemented
message("")
Abstract instrument message ("string") or ()
close()
Terminate interface
inst.write('*RST')
Write directly to instrument, using underlying instruments command language
ask=inst.query(':READ?')
Write and read the answer, using underlying instruments command language
Properties:
id
Abstract get IDN string
Objects:
collation
A collate_instrument.CollateInstrument singleton cataloging instruments connected
budget
A TimeoutBudget singleton to accumulate timeout across all instrument transactions
com
An instrument resource to generate interface instances, a singleton of LocalInstrument, PXIeInstrument, NetworkInstrument or GenericInstrument
inst
An instance of instrument interface once a connection session to instrument is successful, i.e. a visa object with write(),read(),query() API
"""
@property
@abstractmethod
def interchoices(self):
"""Abstract for static class variable interchoices, to list interfaces statically."""
return []
[docs]
def __init__(self, **kwargs):
super().__init__()
logger.debug("Class {}".format(self.__class__.__name__))
logger.debug("Instrument {} ({})".format(self.__class__.__name__, kwargs))
self.init_implict = self.init.__func__ == Instrument.init
logger.debug("Implicit {}".format(self.init_implict))
self.msg_row_col = (40, 80)
self.budget = TimeoutBudget()
self.interface = None
if "interface" in kwargs and kwargs["interface"] is not None:
self.interface = kwargs["interface"]
else:
if hasattr(self, "interchoices"):
logger.debug("Interchoices {}".format(self.interchoices))
for inter in DefInter().default_interface:
if inter in self.interchoices:
self.interface = inter
break
if not self.interface:
self.interface = self.interchoices[0]
logger.debug("Defaulting to instruments preferred interface of {} from {}".format(self.interface.name, self.interchoices))
else:
logger.debug("Instrument interface of {}".format(self.interface.name))
else:
logger.debug("No interface choices predefined in interchoices")
if not self.interface:
self.interface = DefInter().default_interface[0]
logger.debug("Interface {}".format(self.interface.name))
if self.interface == Interface.tcpip:
self.com = NetworkInstrument()
if "hostname" in kwargs:
self.hostname = kwargs["hostname"]
else:
self.hostname = None
if "port" in kwargs:
self.port = kwargs["port"]
else:
self.port = None
self.backend = None
elif self.interface == Interface.pxie:
self.com = PxieInstrument()
self.hostname = None
self.port = None
if "backend" in kwargs:
self.backend = kwargs["backend"]
else:
self.backend = None
if "channels" in kwargs:
self.channels = kwargs["channels"]
else:
self.channels = None
elif self.interface == Interface.usbserial or self.interface == Interface.gpib:
self.com = LocalInstrument()
if "backend" in kwargs and kwargs["backend"] is not None:
self.backend = kwargs["backend"]
else:
self.backend = DefInter().default_backend
self.hostname = None
self.port = None
elif self.interface == Interface.generic:
self.com = GenericInstrument()
if "hostname" in kwargs:
self.hostname = kwargs["hostname"]
else:
self.hostname = None
if "port" in kwargs:
self.port = kwargs["port"]
else:
self.port = None
if "backend" in kwargs:
self.backend = kwargs["backend"]
else:
self.backend = None
else:
logger.error("No interface type {interface} for {instrument}".format(instrument=self.__class__.__name__, interface=self.interface.name))
if "addr" in kwargs:
self.addr = kwargs["addr"]
if type(self.addr) is str and self.addr.find('COM') > -1:
self.addr = self.addr[3:]
else:
self.addr = None
if "instName" in kwargs:
self.instName = kwargs["instName"]
else:
self.instName = None
if "debug" in kwargs:
self.debug = kwargs["debug"]
else:
self.debug = False
if "debugInst" in kwargs:
self.debugInst = True
else:
self.debugInst = False
self.collation = CollateInstrument()
self.mqtt_all = ['']
self.is_inited = False
def __repr__(self):
args = ['addr={!r}'.format(self.addr)]
if self.backend is not None:
args.append('backend={!r}'.format(self.backend))
if self.interface is not None:
args.append('interface={!r}'.format(self.interface.name))
if self.is_inited:
return "{classname}({args})->{id}".format(
classname=self.__class__.__name__,
args=', '.join(args),
id=self.id)
else:
return "{classname}({args})".format(
classname=self.__class__.__name__,
args=', '.join(args))
[docs]
def help(self):
"""Print the docstring of the instrument class."""
print(self.__doc__)
[docs]
@abstractmethod
def reset(self):
"""Reset and switch beep off."""
logger.warning("Attention: reset() method unimplemented")
[docs]
@abstractmethod
def message(self, message=None):
"""Message display."""
logger.warning("Attention: message() method unimplemented")
@property
@abstractmethod
def id(self):
"""Query IDN."""
logger.warning("Attention: id property unimplemented")
[docs]
def identify(self, showInstName=False):
"""Identify message."""
if showInstName and self.instName and self.instName != "":
msg = self.instName
else:
msg = '{address} :{interface}'.format(interface=self.interface.name, address=self.addr)
self.message(msg)
return msg
[docs]
def setup_inst(self):
"""Setup the instrument settings."""
self.budget.set_slack(self, 5.5) # initial timeout longer for communication startup delay
if self.inst is not None:
self.inst.read_termination = '\n'
self.inst.write_termination = '\n'
[docs]
def init(self, identify=False):
"""Optional init for interlock startup after identification."""
if self.init_implict and not identify:
logger.warning("Attention: init() method unimplemented, reinitialising")
if self.is_inited:
self.close()
self.com._init(self, identify)
self.mqtt_add(mqttc, self)
[docs]
def close(self, force=False):
"""Close connection to instrument."""
self.collation.drop(self.interface, self.addr, force)
self.mqtt_disconnect()
if hasattr(self, 'inst') and self.inst is not None:
try:
self.inst.close()
except Exception:
pass
self.is_inited = False
self.inst = None
logger.info('{} closed'.format(self.instName))
def __del__(self):
self.close()
[docs]
class GeneralVisa (Instrument):
"""
Interface to any Visa Instrument.
The GeneralVisa baseclass can connect to Visa usbserial & gpib instruments
Very limited capabilities, but general purpose for low level access to inst
Use this class to debug an instrument.inst, note there is no init()
Initialization arguments:
addr : int or str
Interface address
interface : Interface
GPIB, USBSerial
backend : str
VISA backend is either '@ivi' (or '@ni') for NI-Library or
'@py' for pure python pyvisa-py backend.
On default it uses '@ivi' (or '@ni') on win32 and '@py' on
other platforms.
Example: Initialization
>>> instrument = GeneralVisa(addr=24) # GPIB or USB address
Methods:
close()
terminate interface
inst.write('*RST')
write direct to instrument
ask=inst.query(':READ?')
write and read the answer
Properties:
id
Get IDN string
"""
interchoices = [Interface.usbserial, Interface.gpib]
[docs]
def __init__(self, **kwargs):
self.is_local = False
super().__init__(**kwargs)
logger.debug("Class {}".format(self.__class__.__name__))
self.com._init(self)
@property
def id(self):
"""Query IDN."""
self.budget.set_slack(self)
try:
value = self.inst.query('*IDN?')
except Exception:
value = ""
return value.replace('\r', '').replace('\n', '')
@property
def idtry(self):
"""Query IDN."""
import pyvisa.errors
self.budget.set_slack(self)
try:
value = self.inst.query('*IDN?')
except pyvisa.errors.VisaIOError as x:
logger.warning("Issue with first {} id request, adjusting termination characters, was {!r},{!r}"
.format(self.__class__.__name__, str(self.inst.write_termination), str(self.inst.read_termination)))
if self.inst.read_termination == '\n':
self.inst.read_termination = '\r'
else:
self.inst.read_termination = '\n'
if self.inst.write_termination == '\n':
self.inst.write_termination = '\r'
else:
self.inst.write_termination = '\n'
try:
value = self.inst.query('*IDN?')
logger.info("Termination adjustment required for {} id request, is {!r},{!r}"
.format(self.__class__.__name__, str(self.inst.write_termination), str(self.inst.read_termination)))
except Exception:
value = ""
except Exception:
value = ""
return value.replace('\r', '').replace('\n', '')
[docs]
def message(self, msg=None):
"""There is no message implemented."""
super().message(msg)
[docs]
def reset(self):
"""There is no reset implemented."""
super().reset()