import sys
import os
try:
import grp
except ImportError:
pass
import time
from abc import ABC, abstractmethod
from serial.tools import list_ports
import logging
from pylab_ml.common.singleton import Singleton
from pylab_ml.dummy import Dummy
from pylab_ml.ident import Ident
from pylab_ml.collate_instrument import Interface, DefInter, CollateInstrument
from pylab_ml.scope.lecroy.vicp import VICP
from pylab_ml.common.mqtt_client import mqtt_deviceattributes, mqtt_init
[docs]
def measure(self, message, *args, **kws):
if self.isEnabledFor(MEASURE_LEVEL_NUM):
# Yes, logger takes its '*args' as 'args'.
self._log(MEASURE_LEVEL_NUM, message, args, **kws)
[docs]
def choicelogger():
for px in sys.argv:
if px == '--labml':
idx = sys.argv.index(px)
logger = sys.argv[idx+1]
return logger
return logging.getLogger(__name__)
MEASURE_LEVEL_NUM = 15
logging.addLevelName(MEASURE_LEVEL_NUM, "MEASURE")
logging.MEASURE = MEASURE_LEVEL_NUM
logging.Logger.measure = measure
logger = choicelogger()
mqttc = mqtt_init(typ='instrument', logger=logger) # TODO: wenn base_instrument importiert wird dann wird mqtt initialisiert!! das darf so nicht sein!!
# da dann der Typ dann nicht mehr geƤndert werden kann!!
_createDummyifInvalid = False
[docs]
def createDummyifInvalid(val):
"""
Global switch to create Dummy instance if error occured in instrument._init or instrument have no connection.
Parameters
----------
val : bool
True/False(default)
Returns
-------
None.
"""
global _createDummyifInvalid
_createDummyifInvalid = val
[docs]
def mqttclose():
mqttc.close()
[docs]
def logsetup():
logger.setLevel(logging.DEBUG)
# remove existing handlers
for handler in logger.handlers:
print(handler)
logger.removeHandler(handler)
logger.handlers = []
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create file handler and set level to debug
fh = logging.FileHandler('instrument.log', 'w')
fh.setLevel(logging.MEASURE)
# create formatter
ch_formatter = logging.Formatter('%(levelname)s - %(message)s')
fh_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch & fh
ch.setFormatter(ch_formatter)
fh.setFormatter(fh_formatter)
# add ch & fh to logger
logger.addHandler(ch)
logger.addHandler(fh)
try:
logger.ipython_shell = get_ipython().__class__.__name__
# ZMQInteractiveShell -- Jupyter notebook or qtconcole
# TerminalInteractiveShell -- Terminal running iPython
except NameError:
logger.ipython_shell = None
[docs]
class InvalidInstrumentCreateSessionFunction(Exception):
pass
[docs]
class InvalidInstrumentConnection(Exception):
pass
[docs]
class LocalInstrument(object, metaclass=Singleton):
has_visa = False
try:
import pyvisa
has_visa = True
except ImportError:
logger.error("import pyvisa not found")
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
if os.sys.platform != 'win32':
self.check_group_dialout()
def _init(self, instrument, identify=True):
instrument.addr = self.search4vidpid(instrument.addr, instrument.backend)
if instrument.init_implict or identify:
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.warning("No such {!r} instrument at {!r} to init".format(instrument.interface.name, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.warning("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}"
.format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('Local instrument: no address specified, empty instrument')
instrument.inst = None
return
if not self.has_visa:
logger.error('local instrument: import pyvisa failed')
logger.error('please install pyvisa first')
logger.debug("Local instrument backend {}".format(instrument.backend))
rm = self.pyvisa.ResourceManager(instrument.backend)
logger.debug("Local instrument resource manager {}".format(rm))
instrument.resource_manager = rm
try:
self.rid = self.resource_id(instrument.interface, instrument.addr)
logger.debug("Local instrument resource Id {}".format(self.rid))
instrument.inst = rm.open_resource(self.rid)
except Exception as e:
logger.error(f'Exception in {instrument.instName}, could not open addr {instrument.addr}')
logger.error('local_instrument resource_manager.open_resource exception {}'.format(repr(e)))
if self.pyvisa.__version__ < '1.11.3':
logger.error('local instrument: wrong pyvisa version, need Version >= 1.11.3')
raise Exception
instrument.inst = None
logger.error('could not open resource {}'.format(instrument.addr))
logger.error('perhaps disabled or in use?')
if instrument.interface == Interface.usbserial:
print('found following devices:')
self.serial_list(instrument.backend)
if os.sys.platform == 'win32' and hasattr(logger, 'ipython_shell') and logger.ipython_shell is None:
input('press any key to continue ...')
instrument.mqtt_add(mqttc, instrument)
if instrument.inst:
if instrument.debugInst:
return
instrument.setup_inst()
iid = instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if iid is None or iid != -1:
try:
iid = instrument.id
except Exception:
iid = None
if iid is not None and iid != "" and iid != -1:
instrument.is_inited = instrument.init_implict or not identify
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if iid is None or iid == "" or iid == -1 or iid == '-1':
msg = "Invalid instrument connection {ident}, no identification id".format(ident=instrument)
if instrument.interface == Interface.usbserial:
msg = msg+'\nplease check the serial parameter on the device: Baudrate, Data bits, Parity, Terminator'
if instrument.debug:
logger.error(msg)
elif not _createDummyifInvalid:
raise InvalidInstrumentConnection(msg)
elif _createDummyifInvalid:
instrument.inst = Dummy(instrument, logger)
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
def resource_id(self, interface, address):
id_name = "UNDEFINED_INTERFACE"
if os.sys.platform == 'win32':
if interface == Interface.gpib and str(address).find('.') < -1:
id_name = 'GPIB0::{}::INSTR'.format(address)
elif interface == Interface.gpib and str(address).find('.') > -1:
id_name = 'TCPIP::{}::INSTR'.format(address)
elif interface == Interface.usbserial:
if isinstance(address, (str)) and address.find('COM') > -1: # pyvisa.__version__> 1.11 needs no 'COM' in the adress
address = address[3:]
id_name = 'ASRL{}::INSTR'.format(address)
else:
if interface == Interface.gpib:
id_name = 'GPIB0::{}::INSTR'.format(address)
logger.error("No GPIB support on UNIX")
elif interface == Interface.usbserial:
if isinstance(address, (str)) and address.find("/") == 0: # address is already an unix device
id_name = 'ASRL{}::INSTR'.format(address)
else:
id_name = 'ASRL/dev/ttyUSB{!r}::INSTR'.format(address)
return id_name
def check_group_dialout(self):
if os.sys.platform == 'win32':
return True
if "dialout" not in [grp.getgrgid(g).gr_name for g in os.getgroups()]:
logger.error("User not in group dialout")
return False
else:
return True
# def backend_windows_usbserial(self, libpath="Z:/pytestsharing/ms_win/libusb/MS64/dll/libusb-1.0.dll"):
# import usb.backend.libusb1
# print("Backend %s" % libpath)
# backend = usb.backend.libusb1.get_backend(find_library=lambda x: libpath)
# dev = usb.core.find(backend=backend)
def list_com(self):
ports = list(list_ports.comports())
for p, d, a in ports:
print("{} {!r} {!r}".format(p, d, a))
def list_usb_serial_ports(self):
ports = list_ports.comports()
for port in ports:
print(port)
[docs]
def search4vidpid(self, addr, backend=None):
"""
Search for VID:PID in the addr.
addr = 'VID:PID' e.q addr = '16C0:0483'
return: device name
"""
if isinstance(addr, int) and os.sys.platform == 'win32':
# return 'COM' + str(addr)
return addr
elif addr is None or isinstance(addr, int) or addr.find(':') != 4:
return addr # addr is not vid:pid
try:
vid = int(addr.split(':')[0], 16)
pid = int(addr.split(':')[1], 16)
except Exception:
return addr
result = None
ports = []
for port in list_ports.comports():
if port.vid is not None and port.vid == vid and port.pid == pid:
if result is not None:
ports.append(result)
result = port.device
if backend is not None:
list_resources_info = self.pyvisa.ResourceManager(backend).list_resources_info()
for port in list_resources_info:
if list_resources_info[port].alias == result:
result = list_resources_info[port].interface_board_number
elif result is not None and DefInter().check4nidriver():
result = result[3:]
if result is None:
logger.error("couldn't find device with vid:pid = {}".format(addr))
print('found following devices:')
self.serial_list(backend)
if len(ports) > 0:
logger.error('found more than 1 device with vid:pid = {}'.format(addr))
for port in ports:
logger.error(' Port: {}'.format(port))
logger.error(' Port: {}'.format(result))
result = None
return result
[docs]
def serial_list(self, backend=None):
"""
List all devices from serial-port (USB-Ports).
backend = None
or '@py'/'@ni', than also print ASRL::INSTR
"""
ports = list(list_ports.comports())
dl = len("Port")
ml = len("Manufacturer")
il = len("USB info")
for i in range(0, len(ports)):
dl = max(dl, len(ports[i].device))
if ports[i].manufacturer is not None:
ml = max(ml, len(ports[i].manufacturer))
if ports[i].usb_info() is not None:
il = max(il, len(ports[i].usb_info()))
if backend is not None and os.sys.platform == 'win32':
vports = self.pyvisa.ResourceManager(backend).list_resources_info()
vl = len("Visa")
for port in vports:
vl = max(vl, len(port))
print('{:{dl}} {:{vl}} {:{ml}} {:{il}}'.format("Port", "Visa", "Manufacturer", "USB info", dl=dl, vl=vl, ml=ml, il=il))
print('-----------------------------------------------------------------')
for i in range(0, len(ports)):
asrl = "?????::?????"
addr = "??"
for port in vports:
device = ports[i].device
if device.find('COM') > -1:
device = int(device[3:])
if (vports[port].alias is not None and vports[port].alias == ports[i].device) or vports[port].interface_board_number == device:
asrl = port
addr = asrl[4:asrl.find(':')]
print('{!s:{dl}} {:{vl}} {!r:{ml}} {!r:{il}}'.format(addr, asrl, ports[i].manufacturer, ports[i].usb_info(), dl=dl, vl=vl, ml=ml, il=il))
else:
print('{:{dl}} {:{ml}} {:{il}}'.format("Port", "Manufacturer", "USB info", dl=dl, ml=ml, il=il))
print('-----------------------------------------------------')
for i in range(0, len(ports)):
print('{!s:{dl}} {!r:{ml}} {!r:{il}}'.format(ports[i].device, ports[i].manufacturer, ports[i].usb_info(), dl=dl, ml=ml, il=il))
[docs]
def idtry(self, instrument):
"""Attempt to fix VISA termination characters to Query IDN."""
import pyvisa.errors
# budget.set_slack(self)
try:
value = instrument.query('*IDN?')
except pyvisa.errors.VisaIOError as x:
logger.warning("Issue with first {} id request, adjusting termination characters, was {!r},{!r}"
.format(instrument.__class__.__name__, str(instrument.write_termination), str(instrument.read_termination)))
if instrument.read_termination == '\n':
instrument.read_termination = '\r'
else:
instrument.read_termination = '\n'
if instrument.write_termination == '\n':
instrument.write_termination = '\r'
else:
instrument.write_termination = '\n'
try:
value = instrument.query('*IDN?')
logger.info("Termination adjustment required for {} id request, is {!r},{!r}"
.format(self.__class__.__name__, str(instrument.write_termination), str(instrument.read_termination)))
except Exception:
value = ""
except Exception:
value = ""
return value.replace('\r', '').replace('\n', '')
def __del__(self):
self.close()
[docs]
class PxieInstrument(object, metaclass=Singleton):
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
def _init(self, instrument, identify=True):
if instrument.init_implict or identify:
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.warning("No such {!r} instrument at {!r} to init".format(instrument.interface.name, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.warning("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}".
format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('PXIe instrument: no address slot specified, empty instrument')
instrument.inst = None
return
try:
if instrument.channels is None:
instrument.inst = instrument.backend.Session(resource_name=instrument.addr)
else:
instrument.inst = instrument.backend.Session(resource_name=instrument.addr, channels=str(instrument.channels))
except Exception:
instrument.inst = None
logger.error('could not open resource {} for {}'.format(instrument.addr, instrument))
logger.error(sys.exc_info())
if os.sys.platform == 'win32' and hasattr(logger, 'ipython_shell') and logger.ipython_shell is None:
input('press any key to continue ...')
raise InvalidInstrumentConnection("Invalid instrument connection {ident}".format(ident=instrument))
if instrument.inst:
instrument.setup_inst()
instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
instrument.is_inited = True
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if hasattr(instrument, "id"):
instrument.is_inited = instrument.init_implict or not identify
iid = instrument.id
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if iid == "":
raise InvalidInstrumentConnection("Invalid instrument connection {ident}, no identification id".format(ident=instrument))
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
class NetworkInstrument(object, metaclass=Singleton):
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
def _init(self, instrument, identify=True):
if instrument.init_implict or identify:
if instrument.addr and not instrument.hostname:
instrument.hostname = instrument.addr
elif not instrument.addr and instrument.hostname:
instrument.addr = instrument.hostname
if instrument.port is None:
instrument.port = 1861
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.error("No such {!r} instrument at {!r} to init".format(instrument.interface, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.error("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}".
format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('local instrument: no address specified, empty instrument')
instrument.inst = None
return
try:
if not hasattr(instrument, "create_session"):
instrument.inst = VICP(addr=instrument.addr, port=instrument.port, debug=instrument.debug)
else:
instrument.inst = instrument.create_session()
except Exception:
instrument.inst = None
logger.error('could not open resource {} for {}'.format(instrument.addr, instrument))
logger.error(sys.exc_info())
if os.sys.platform == 'win32' and logger.ipython_shell is None:
input('press any key to continue ...')
raise InvalidInstrumentConnection("Invalid instrument connection {ident}".format(ident=instrument))
if instrument.inst:
instrument.setup_inst()
instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
instrument.is_inited = True
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if hasattr(instrument, "id"):
instrument.is_inited = instrument.init_implict or not identify
iid = instrument.id
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if iid == "":
raise InvalidInstrumentConnection("Invalid instrument connection {ident}, no identification id".format(ident=instrument))
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
class GenericInstrument(object, metaclass=Singleton):
[docs]
def __init__(self):
logger.debug("Class {}".format(self.__class__.__name__))
def _init(self, instrument, identify=True):
if not hasattr(instrument, "create_session"):
logger.error("No 'create_session(instrument)' function defined in instrument {!r} instrument at {!r} to create interface inst of instrument".
format(instrument.__class__.__name__, instrument.addr))
raise InvalidInstrumentCreateSessionFunction("Invalid instrument create_session(instrument) function for {ident}".format(ident=instrument))
if instrument.init_implict or identify:
instrument = instrument.collation.add(instrument)
else:
instrumentItem = instrument.collation.get_instrument(instrument.interface, instrument.addr)
if instrumentItem is None:
logger.warning("No such {!r} instrument at {!r} to init".format(instrument.interface.name, instrument.addr))
instrument.inst = None
return
if instrumentItem.instance != instrument and instrument.addr is not None:
logger.warning("Cannot initialise {!r} {!r} instrument at {!r} already occupied by {!r} - {!r}".
format(instrument.__class__.__name__, instrument.interface.name, instrument.addr,
instrumentItem.instance.__class__.__name__, instrumentItem.instance.instName))
instrument.inst = None
return
if instrument.addr is None:
logger.warning('Generic instrument: no address specified, empty instrument')
instrument.inst = None
return
try:
instrument.inst = instrument.create_session()
except Exception:
instrument.inst = None
logger.error('could not open resource {} for {}'.format(instrument.addr, instrument))
logger.error(sys.exc_info())
if os.sys.platform == 'win32' and hasattr(logger, 'ipython_shell') and logger.ipython_shell is None:
input('press any key to continue ...')
raise InvalidInstrumentConnection("Invalid instrument connection {ident}".format(ident=instrument))
if instrument.inst:
instrument.setup_inst()
instrument.reset()
instrument.identify()
if instrument.init_implict:
logger.debug("{} has implicit instrument init()".format(instrument.__class__.__name__))
instrument.is_inited = True
elif identify:
logger.info("{} need to call instrument init()".format(instrument.__class__.__name__))
if hasattr(instrument, "id"):
instrument.is_inited = instrument.init_implict or not identify
iid = instrument.id
instrument.collation.identification(
instrument.interface, instrument.addr,
identity=iid, instance=instrument, initialized=instrument.is_inited
)
if iid == "":
raise InvalidInstrumentConnection("Invalid instrument connection {ident}, no identification id".format(ident=instrument))
if not instrument.init_implict and identify:
logger.info("Validate identity {ident} display on '{iid}', then init()".format(ident=instrument, iid=iid))
instrument.close()
else:
logger.info("Device {!r} via {!r}".format(instrument, instrument.inst))
instrument.message()
if instrument.init_implict:
instrument.mqtt_add(mqttc, instrument)
[docs]
class TimeoutBudget(object, metaclass=Singleton):
"""This TimeoutBudget class is a singleton to provide a shared budget object within each Instrument object.
Allowing each instrument transaction to accumulate timeout for their needs, which gradually bleeds away as time
elapses
Example:
>>> instrument.budget = TimeoutBudget()
>>> instrument.budget.cut_slack(instrument, 3)
add 3sec delay to whats left of the accumulated timeout to instrument.inst.timeout
>>> instrument.budget.cut_slack(instrument)
add minimal delay to whats left of the accumulated timeout to instrument.inst.timeout
"""
[docs]
def __init__(self):
self.slack_time = 0
self.scale = 1.0
self.relax = 0.1
self.minim = 1.0
self.debug = False
[docs]
def slack(self):
"""Seconds remaining of accumulated timeout, lower limit at minim"""
now = time.time()
todo = self.slack_time - now
if todo < self.minim:
todo = self.minim
return (todo)
[docs]
def cut_slack(self, need):
"""add needed delay to whats left of the accumulated timeout"""
todo = self.slack()
todo += need * self.scale + self.relax
now = time.time()
self.slack_time = todo + now
return (todo)
[docs]
def set_slack(self, instrument, need=None):
"""add needed delay to whats left of the accumulated timeout to given instrument.inst.timeout"""
if need is None:
# default time for mundane operations (could depend on instrument interface)
need = 0.2
todo = self.cut_slack(need)
if self.debug:
if instrument:
if hasattr(instrument, "inst"):
if instrument.inst and hasattr(instrument.inst, "timeout"):
instrument.inst.timeout = todo * 1000
logger.debug("{!r} timeout now {}".format(instrument.instName, instrument.inst.timeout/1000))
elif instrument.inst:
instrument.inst.timeout = todo * 1000
[docs]
class Instrument(ABC, Ident, mqtt_deviceattributes):
"""This is the abstract Instrument class.
It cannot be used directly but
provides a basic set of methods & properties to override with actual instruments
Initialization arguments of a derived class:
addr (int | str):
interface address, hostname, ip-address, pxi-slot or other address required by interface
interface (Interface):
gpib, usbserial, tcpip, pxie, generic.
backend (str):
backend could be path to a DLL, custom interface item or for
pyvisa backend is either '@ivi' (or '@ni') for NI-Library or
'@py' for pure python pyvisa-py backend.
On default it uses '@ivi' (or '@ni') on win32 and '@py' on
other platforms.
hostname (str):
optional interface hostname or ip-address required by interface, when not addr, for tcpip network instruments
port (int):
optional interface port number, for tcpip network instruments
instName (str):
Instrument object name passed as string for later reference in messages to user
debug (bool):
debug information
Example: Initialization of a derived class
>>> vdd = DerivedInstrument (addr=24) # GPIB or USB address
>>> vdd.init() # connect and initialize instrument if this method provided, otherwise implicitly
Methods:
init()
connect and initialize instrument if this method overidden, otherwise implicitly initialize instrument when this method not locally implemented
setup_inst()
post init method to override with specific instrument interface initialisation
reset()
abstract reset
identify()
instrument message, reflect address & interface - if message() implemented
message("")
abstract instrument message ("string") or ()
close()
terminate interface
inst.write('*RST')
write directly to instrument, using underlying instruments command language
ask=inst.query(':READ?')
write and read the answer, using underlying instruments command language
Properties:
id
abstract get IDN string
Objects:
collation
a collate_instrument.CollateInstrument singleton cataloging instruments connected
budget
a TimeoutBudget singleton to accumulate timeout across all instrument transactions
com
an instrument resource to generate interface instances, a singleton of LocalInstrument, PXIeInstrument, NetworkInstrument or GenericInstrument
inst
an instance of instrument interface once a connection session to instrument is successful, i.e. a visa object with write(),read(),query() API
"""
@property
@abstractmethod
def interchoices(self):
"""Abstract for static class variable interchoices, to list interfaces statically."""
return []
[docs]
def __init__(self, **kwargs):
super().__init__()
logger.debug("Class {}".format(self.__class__.__name__))
logger.debug("Instrument {} ({})".format(self.__class__.__name__, kwargs))
self.init_implict = self.init.__func__ == Instrument.init
logger.debug("Implicit {}".format(self.init_implict))
self.msg_row_col = (40, 80)
self.budget = TimeoutBudget()
self.interface = None
if "interface" in kwargs and kwargs["interface"] is not None:
self.interface = kwargs["interface"]
else:
if hasattr(self, "interchoices"):
logger.debug("Interchoices {}".format(self.interchoices))
for inter in DefInter().default_interface:
if inter in self.interchoices:
self.interface = inter
break
if not self.interface:
self.interface = self.interchoices[0]
logger.debug("Defaulting to instruments preferred interface of {} from {}".format(self.interface.name, self.interchoices))
else:
logger.debug("Instrument interface of {}".format(self.interface.name))
else:
logger.debug("No interface choices predefined in interchoices")
if not self.interface:
self.interface = DefInter().default_interface[0]
logger.debug("Interface {}".format(self.interface.name))
if self.interface == Interface.tcpip:
self.com = NetworkInstrument()
if "hostname" in kwargs:
self.hostname = kwargs["hostname"]
else:
self.hostname = None
if "port" in kwargs:
self.port = kwargs["port"]
else:
self.port = None
self.backend = None
elif self.interface == Interface.pxie:
self.com = PxieInstrument()
self.hostname = None
self.port = None
if "backend" in kwargs:
self.backend = kwargs["backend"]
else:
self.backend = None
if "channels" in kwargs:
self.channels = kwargs["channels"]
else:
self.channels = None
elif self.interface == Interface.usbserial or self.interface == Interface.gpib:
self.com = LocalInstrument()
if "backend" in kwargs and kwargs["backend"] is not None:
self.backend = kwargs["backend"]
else:
self.backend = DefInter().default_backend
self.hostname = None
self.port = None
elif self.interface == Interface.generic:
self.com = GenericInstrument()
if "hostname" in kwargs:
self.hostname = kwargs["hostname"]
else:
self.hostname = None
if "port" in kwargs:
self.port = kwargs["port"]
else:
self.port = None
if "backend" in kwargs:
self.backend = kwargs["backend"]
else:
self.backend = None
else:
logger.error("No interface type {interface} for {instrument}".format(instrument=self.__class__.__name__, interface=self.interface.name))
if "addr" in kwargs:
self.addr = kwargs["addr"]
if type(self.addr) is str and self.addr.find('COM') > -1:
self.addr = self.addr[3:]
else:
self.addr = None
if "instName" in kwargs:
self.instName = kwargs["instName"]
else:
self.instName = None
if "debug" in kwargs:
self.debug = kwargs["debug"]
else:
self.debug = False
if "debugInst" in kwargs:
self.debugInst = True
else:
self.debugInst = False
self.collation = CollateInstrument()
self.mqtt_all = ['']
self.is_inited = False
def __repr__(self):
args = ['addr={!r}'.format(self.addr)]
if self.backend is not None:
args.append('backend={!r}'.format(self.backend))
if self.interface is not None:
args.append('interface={!r}'.format(self.interface.name))
if self.is_inited:
return "{classname}({args})->{id}".format(
classname=self.__class__.__name__,
args=', '.join(args),
id=self.id)
else:
return "{classname}({args})".format(
classname=self.__class__.__name__,
args=', '.join(args))
def help(self):
print(self.__doc__)
[docs]
@abstractmethod
def reset(self):
"""Reset and switch beep off."""
logger.warning("Attention: reset() method unimplemented")
[docs]
@abstractmethod
def message(self, message=None):
"""Message display."""
logger.warning("Attention: message() method unimplemented")
@property
@abstractmethod
def id(self):
"""Query IDN."""
logger.warning("Attention: id property unimplemented")
[docs]
def identify(self, showInstName=False):
"""Identify message."""
if showInstName and self.instName and self.instName != "":
msg = self.instName
else:
msg = '{address} :{interface}'.format(interface=self.interface.name, address=self.addr)
self.message(msg)
return msg
[docs]
def setup_inst(self):
"""Setup the instrument settings."""
self.budget.set_slack(self, 5.5) # initial timeout longer for communication startup delay
if self.inst is not None:
self.inst.read_termination = '\n'
self.inst.write_termination = '\n'
[docs]
def init(self, identify=False):
"""Optional init for interlock startup after identification."""
if self.init_implict and not identify:
logger.warning("Attention: init() method unimplemented, reinitialising")
if self.is_inited:
self.close()
self.com._init(self, identify)
self.mqtt_add(mqttc, self)
[docs]
def close(self, force=False):
"""Close connection to instrument."""
self.collation.drop(self.interface, self.addr, force)
self.mqtt_disconnect()
if hasattr(self, 'inst') and self.inst is not None:
try:
self.inst.close()
except Exception:
pass
self.is_inited = False
self.inst = None
logger.info('{} closed'.format(self.instName))
def __del__(self):
self.close()
[docs]
class GeneralVisa (Instrument):
"""Interface to any Visa Instrument.
The GeneralVisa baseclass can connect to Visa usbserial & gpib instruments
Very limited capabilities, but general purpose for low level access to inst
Use this class to debug an instrument.inst, note there is no init()
Initialization arguments:
addr (int):
interface address
interface (Interface):
gpib, usbserial
backend (str):
visa backend is either '@ivi' (or '@ni') for NI-Library or
'@py' for pure python pyvisa-py backend.
On default it uses '@ivi' (or '@ni') on win32 and '@py' on
other platforms.
Example: Initialization
>>> instrument = GeneralVisa(addr=24) # GPIB or USB address
Methods:
close()
terminate interface
inst.write('*RST')
write direct to instrument
ask=inst.query(':READ?')
write and read the answer
Properties:
id get IDN string
"""
interchoices = [Interface.usbserial, Interface.gpib]
[docs]
def __init__(self, **kwargs):
self.is_local = False
super().__init__(**kwargs)
logger.debug("Class {}".format(self.__class__.__name__))
self.com._init(self)
@property
def id(self):
"""Query IDN."""
self.budget.set_slack(self)
try:
value = self.inst.query('*IDN?')
except Exception:
value = ""
return value.replace('\r', '').replace('\n', '')
@property
def idtry(self):
"""Query IDN."""
import pyvisa.errors
self.budget.set_slack(self)
try:
value = self.inst.query('*IDN?')
except pyvisa.errors.VisaIOError as x:
logger.warning("Issue with first {} id request, adjusting termination characters, was {!r},{!r}"
.format(self.__class__.__name__, str(self.inst.write_termination), str(self.inst.read_termination)))
if self.inst.read_termination == '\n':
self.inst.read_termination = '\r'
else:
self.inst.read_termination = '\n'
if self.inst.write_termination == '\n':
self.inst.write_termination = '\r'
else:
self.inst.write_termination = '\n'
try:
value = self.inst.query('*IDN?')
logger.info("Termination adjustment required for {} id request, is {!r},{!r}"
.format(self.__class__.__name__, str(self.inst.write_termination), str(self.inst.read_termination)))
except Exception:
value = ""
except Exception:
value = ""
return value.replace('\r', '').replace('\n', '')
[docs]
def message(self, msg=None):
"""There is no message implemented."""
super().message(msg)
[docs]
def reset(self):
"""There is no reset implemented."""
super().reset()