Source code for pylab_ml.matrix.base_matrix

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interface to a relay matrix.

:Date: |today|
:Author: Semi-ATE <info@Semi-ATE.org>

[Categories]
Pos=struct                     ('Name', 'Pos',                      'Type', 'Exclusive', 'List', {'Position1', 'Position2'},                    'Protected', 0, 'Color', [1,0.6,0])
Supplies=struct				   ('Name', 'Supplies',                 'Type', 'Exclusive', 'List', {'C_Board_Vsup', 'ContactTestGND'},            'Protected', 0, 'Color', [0.2,0.6,1])
CommunicationBoardConfig=struct('Name', 'CommunicationBoardConfig', 'Type', 'Exclusive', 'List', {'C_BoardOut_BUS1', 'C_BoardOut_Only_BUS1'},   'Protected', 0, 'Color', [0.2,0.8,0])
Additonal=struct               ('Name', 'Additonal',                'Type', 'Stackable', 'List', {'Scope_BUS1', 'US1_VSUP'},                    'Protected', 0, 'Color', [0.74902,0,0.74902])
BUS1Config=struct              ('Name', 'BUS1Config',               'Type', 'Exclusive', 'List', {'BUS1_IO1_Position1'},                        'Protected', 0, 'Color', [0.93333,0.46667,0])
BUS2Config=struct              ('Name', 'BUS2Config',               'Type', 'Exclusive', 'List', {'BUS2_IO1_Position1'},                        'Protected', 0, 'Color', [0.8,0.33333,0])

"""

import os
import pathlib
import configparser
from pylab_ml.collate_instrument import Interface
from pylab_ml.base_instrument import logger


[docs] class BaseMatrix(): """ Basic interface to a Relay Matrix. :Date: |today| :Author: Semi-ATE <info@Semi-ATE.org> """ interchoices = [Interface.generic]
[docs] def __init__(self, connectionTableName=None, identify=False, emulator=False): """Initialise the instrument. Parameters ---------_ connectionTableName : str, optional Filename from setup- or matlab-file with connection definition. The default is $WORKAREA/harness/matrix*.setup identify : bool, optional If True than identify the instrument. The default is False. emulator : bool, optional If True than emulate only a Matrix. No real hardware. The default is False. Example: Initialization >>> tablename = 'matrix_messplatz.setup' >>> matrix = Pickering_40_5xx(addr='Switch', tablename, instName='matrix') >>> matrix.set('Position2','close') # need connectionTable >>> matrix.set('Oszi','close') >>> matrix.set('APB_Vsup','close') >>> matrix.set('APB_Vsup','open') >>> matrix.set('SMU_Vsup','close') >>> matrix.set('SMU_Vsup') # default = open >>> matrix.set('APB_Vsup','close',SwitchOver=True) >>> matrix.connect('1,1,1;1,2,3;1,3,5','close') # if connectionTable not loaded Detailed example of usage: * :download:`examples/pickeringmatrix/matrix_40_541_201.py <../../../examples/pickeringmatrix/matrix_40_541_201.py>` """ self.connectionTableName = connectionTableName self.connectionTable = None self.emulator = emulator self.gui = "pylab_ml.gui.instruments.matrix.matrix" # semi-ctrl use this lib for the matrix gui logger.debug("Class {}".format(self.__class__.__name__))
[docs] def setup_inst(self): """Set instrument settings for setup, called von class Instrument.""" super().setup_inst() breakpoint() errorlist = self.error_list() if errorlist[0] != 0: logger.error("{} found Diagnose error {}".format(self.instName, errorlist)) self.cardnr = 1 # only one card Sub-Unit accept, changes necessary for more sub-units, if more than self.inst.GetSubCounts() self.dic_connectionTable = None # will be set if connectionTable = *.setup self.dic_constantsTable = None # " path = os.environ.get("harness") if self.connectionTableName is not None: self.load_connectionTable(self.connectionTableName) elif path is not None and path != "": # load default $workarea/harness/matrix*.setup load = False with os.scandir(path) as entries: for entry in entries: if entry.is_file() and pathlib.Path(entry.name).suffix == ".setup" and entry.name.find("matrix") == 0: if load: logger.error(f"{self.instName} found more than one setup file, use {entry.path}") self.load_connectionTable(entry.path) load = True self.type = "" self.mqtt_all = ["set()", "clear()", "id", "load_connectionTable()", "close()", "display()"]
def _error_message(self, err): """Get the error_message from instance.""" msg = "{}.Error: {}".format(self.instName, self.inst.ErrorMessage(err)) return msg
[docs] def display(self, mode=None): """ Display state, nodes or connection in ASCII-String. | mode==None or 'state' display actual state | mode=='nodes' display connected nodes | mode=='connection' display available codes for connections | mode==someone else display help """ if mode is None or mode == "state": if self.connectionTable is None: logger.error(" display({}) not possible, connectionTable not loaded".format(mode)) else: msg = "\n" for key in self.connectionTable.keys(): if key.find("ModeSets.") < 0: msg = msg + (" {:<20} : {}\n".format(key, self.ActualState[key])) msg += f"{self.instName}.display('?') for other modes\n" logger.info(msg) print(msg) self.publish_get("display", ("state", msg)) return elif mode == "nodes": self._GetCrosspointState(self.cardnr) elif mode == "connection": if self.connectionTable is None: logger.error(" display({}) not possible, connectionTable not loaded".format(mode)) return msg = "\n" for key in self.connectionTable.keys(): if key.find("ModeSets.") < 0: state = ": {}".format(self.ActualState[key]) else: state = "" msg = msg + (" {:<20}{}\n".format(key, state)) for mode in self.connectionTable[key]: msg = msg + " {}\n".format(mode) logger.info(msg) print(msg) else: print(" usage: display(mode)") print(" with mode= 'state' ") print(" = 'nodes' ") print(" = 'connection' ")
[docs] def clear(self): """ Clear all connections (open). Add the correct command for the instance to this function. """ if self.connectionTable is None: self.ActualState = "open" else: # set all Categories to open self.ActualState = {} for key in self.connectionTable.keys(): self.ActualState.update({key: "open"}) self.publish_set("clear", 0)
[docs] def set(self, connection, state="open", SwitchOver=False): """ Set categories or scenarios to state 'open' or 'close'. Parameters ---------- connection : str Categories or scenarios. state : str, optional 'open' or 'close'. Defaults to 'open'. SwitchOver : bool, optional | True : Connection will be open AFTER the new one was set, | False : default -> first: Open last connection, then: Close new connection Returns: None """ if self.connectionTable is None: logger.warning("set({},{}) not possible: connectionTable not loaded".format(connection, state)) return key = connection # for setup-File if self.dic_connectionTable is None: key = "Category_" + connection # for Matlab-File if key in self.connectionTable: # what is the key, is it in Categories ? if self.ActualState[key] == state: logger.info("set({},{}) already set".format(connection, state)) return vtype = self.dic_connectionTable[key]["Type"] if vtype == "Exclusive": # if Exclusive than open the last setting if self.connect(self.connectionTable[key][self.ActualState[key]], "open"): self.ActualState[key] = "open" self.publish_set("set", (connection, state)) elif vtype == "Stackable": print("found Stackable", connection, state) for connection in self.connectionTable[key]: self.set(connection, "open") else: logger.error("{}.set type {} unknown, please check your setup-file".format(self.instName, vtype)) return if self.dic_connectionTable is None: key = "ModeSets." + connection # for Matlab-File if key in self.connectionTable: # search for ScenariosSets for connection in self.connectionTable[key]: # it is a ScenariosSets self.set(connection, state) self.publish_set("set", (connection, state)) return # it is not a categorie or ScenariosSets, so it is a scenario: found = 0 for key in self.connectionTable.keys(): # scan all keys for the connection if connection in self.connectionTable[key]: found = 1 last_connection = self.ActualState[key] if SwitchOver: break if ( key.find("Category_Supply") == 0 or (self.dic_connectionTable is not None and self.dic_connectionTable[key]["Type"] == "Exclusive") ) and self.ActualState[key].find( "open" ) < 0: # if supply than always open last state if self.connect(self.connectionTable[key][self.ActualState[key]], "open"): self.ActualState[key] = "open" else: return for gkey in self.connectionTable[key].keys(): # search for key 'open*' and if found, than 'open' last connection if gkey.find("open") == 0 and self.ActualState[key].find("open") < 0: # key open indicate open before close if self.connect(self.connectionTable[key][self.ActualState[key]], "open"): self.ActualState[key] = "open" else: return # connection goes wrong... if connection.find(gkey) == 0: found = 2 # do nothing more break if found == 1: if self.ActualState[key] != "mode": if self.connect(self.connectionTable[key][connection], state): if state == "close": self.ActualState[key] = connection else: self.ActualState[key] = state if SwitchOver: self.connect(self.connectionTable[key][last_connection], "open") # open last connection self.publish_set("set", (connection, state)) logger.measure(f"{self.instName}.set('{connection}', '{state}')") elif found == 0: logger.error("{}.set('{}') not found !".format(self.instName, connection)) logger.info(" for available connections, write {}.display('connection')".format(self.instName)) return
[docs] def load_connectionTable(self, connectionTableName=None): """ Load matlab(.m) or setup-file(.setup) with definition from connections. Create constantsTable and connectionTable Parameters ---------- connectionTableName : str, optional Filename from setup- or matlab-file with connection definition. The default is None. """ self.id if connectionTableName is None: if self.connectionTableName is not None: connectionTableName = self.connectionTableName else: logger.error(f"{self.instName}.load_connectionTable: connectionTableName isn't defined") return file_extension = os.path.splitext(connectionTableName)[-1] if file_extension == ".m": constants, contab = self._load_matlabTable(connectionTableName) else: constants, contab = self._load_setupTable(connectionTableName) logger.info(f"{self.instName}.load_connectionTable('{connectionTableName}')") self.connectionTableName = connectionTableName self.publish_set("dic_constantsTable", self.dic_constantsTable) self.publish_set("dic_connectionTable", self.dic_connectionTable) if self.connectionTable != contab: self.connectionTable = contab self.clear() self.constantsTable = constants self.connectionTable = contab self.constantsTable = constants
def _load_setupTable(self, filename): """ Load filename and create dictionary constants and contab. Parameters ---------- filename : str Filename *.setup Returns ------- constants : dict Dictionary with constants from setup-file. contab : dict Dictionary with connectionTable from setup-file. Raises ------ Exception: when file not found. """ if not os.path.isfile(filename): raise Exception(" couldn't find {}".format(filename)) dic_connectionTable = {} dic_constantsTable = {} # constants = {} contab = {} config = configparser.ConfigParser() config.optionxform = str # allow upper character in key config.read(filename) if config.sections() == []: raise IOError(" {} couldn't found necessary information in the file".format(filename)) for key in config["Device"]: dic_constantsTable = self._struc2dic(config["Device"][key]) # have to change for more than one devices # if 'XLables' in dic_constantsTable: # constants = dic_constantsTable['XLables'] # if 'YLables' in dic_constantsTable: # constants.update(dic_constantsTable['YLables']) for categories in config["Categories"]: dic = self._struc2dic(config["Categories"][categories]) for scenario in config["Scenarios"]: if scenario in dic["List"]: dic["List"][scenario] = config["Scenarios"][scenario][1:-1].replace(",'", ";") dic_connectionTable.update({dic["Name"]: dic}) for key in dic_connectionTable: contab.update({dic_connectionTable[key]["Name"]: dic_connectionTable[key]["List"]}) self.dic_connectionTable = dic_connectionTable self.dic_constantsTable = dic_constantsTable self.clear() # return(constants, contab) return (dic_constantsTable, contab)
[docs] def connect(self, crosspointtable, state): """ Set state from crosspoint. Parameters ---------- crosspointtable : str Stringlist with nodes ('1,3,4;1,5,6;1,2,14') card, row, col. state : str 'open' or 'close'. Raises ------ IOError: When state is not 'open' or 'close'. Returns ------- bool: | True : etablish state | False : error """ if state == "close": state = 1 elif state == "open": state = 0 else: logger.error("{}.connect: could not connect to state {}".format(self.instName, state)) return False crosspoints = crosspointtable.split(";") for point in crosspoints: net = point.split(",") if len(net) == 2: cardnr = 1 row = int(net[0]) col = int(net[1]) else: cardnr = int(net[0]) row = int(net[1]) col = int(net[2]) if cardnr != 1: raise IOError("{!r} supports only 1 Card, but Card={}, if you need this, please extend the software....".format(self.instName, cardnr)) err = self.inst.SetCrosspointState(cardnr, row, col, state) if err != 0: logger.error("{}.connect: row={}, col={}, state={} -> {}".format(self.instName, row, col, state, self._error_message(err))) return False else: self.publish_set("SetCrosspointState", (cardnr, row, col, state)) return True
def _GetCrosspointState(self, cardnr): """ Get the state of all crosspoints and display in ASCII-String. Parameters ---------- cardnr : int Card number. Returns ------- None """ msg = "\n {} = {} {}x{}\n ".format(self.instName, self.type, self.cols, self.rows) for cols in range(1, int(self.cols / 10) + 1): msg = msg + (" {}".format(cols)) msg = msg + ("\n ") for cols in range(1, self.cols + 1): msg = msg + "{}".format(cols % 10) for rows in range(1, self.rows + 1): msg = msg + "\n {} ".format(rows) for cols in range(1, self.cols + 1): err, value = self.inst.GetCrosspointState(cardnr, rows, cols) if value == 0: value = " " elif value == 1: value = "*" msg = msg + "{}".format(value) if self.dic_constantsTable is not None: for key in self.dic_constantsTable["YLables"]: if key.find(".") < 0 and self.dic_constantsTable["YLables"][key] == str(rows): msg = msg + "{}".format(key) logger.info(msg) print(msg)
[docs] class Matrix_Emulator(object): """ Emulator from a relay Matrix. :Date: |today| :Author: Semi-ATE <info@Semi-ATE.org> Usable if you have no real Matrix as instance """ import numpy as np
[docs] def __init__(self, addr=None, x=66, y=8): """Initialise the emulator. Parameters ---------- addr : str, optional Address of the emulator. x : int, optional Number of columns in the emulator. y : int, optional Number of rows in the emulator. """ self.addr = addr self.y_max = y self.x_max = x
[docs] def Reset(self): """Reset.""" self.clear()
[docs] def clear(self): """Clear all connections (opem).""" self.ClearCard()
[docs] def Close(self): """Close connection to Pickering Emulator.""" self.matrix_array = None self = None return 0
[docs] def ClearCard(self): """Open all connections.""" self.matrix_array = self.np.zeros((self.y_max, self.x_max))
[docs] def SetCrosspointState(self, cardnr, row, col, state): """ Set state from crosspoint. Parameters ---------- cardnr : int Card number. row : int Row number. col : int Column number. state : int State to set. Returns ------- int 0 if successful, error code otherwise. """ self.matrix_array[row - 1][col - 1] = state return 0
[docs] def GetCrosspointState(self, cardnr, rows, cols): """ Get the state of a crosspoint. Parameters ---------- cardnr : int Card number. rows : int Row number. cols : int Column number. Returns ------- tuple 0 if successful, error code otherwise, and the state of the crosspoint. """ return 0, self.matrix_array[rows - 1][cols - 1]
[docs] def ErrorMessage(self, err): """ Get the error message corresponding to an error code. Parameters ---------- err : int Error code. Raises ------ Exception If an error occurs. """ msg = "Emulator.matrix: something goes wrong, error = {}".format(err) raise Exception(msg)
[docs] def message(self, message=None): """ Display a message. Parameters ---------- message : str, optional Message to display. """ """Device has no display, message display to logger.""" if message is not None: logger.debug(message)
[docs] def GetCardId(self): """ Get the card ID. """ return 0, "Pickering Emulator Matrix"
[docs] def Diagnostic(self): """ Perform a diagnostic check. """ value = 0 return [value]
[docs] def SubInfo(self, cardnr, unknown): """ Get sub-unit information. Parameters ---------- cardnr : int Card number. unknown : int Unknown parameter. Returns ------- tuple 0 if successful, error code otherwise, and the sub-unit information. """ return 0, 100, self.y_max, self.x_max