Source code for pylab_ml.misc.registermaster

# $Id: $
"""
User interface to the register master excel file.

This excel file descibes all registers of a sensor. For each register
the following items are mentioned:
    - name
    - address
    - bank        (optional)
    - description
    - bitslices

A bitslice is a range of one or more consecutive bits in a register. Each
bitslice is descripted by the following items:
    - name
    - index_startbit
    - index_stopbit
    - read/write direction
    - reset value
    - description
"""

import re
import fnmatch
import ast
from typing import Iterable, List, Union
import sys
import os
import pandas as pd
import ipywidgets as widgets

try:
    from itertools import zip_longest
except ImportError:
    from itertools import izip_longest as zip_longest
from collections import OrderedDict  # , namedtuple
from ate_common.logger import LogLevel
from pylab_ml.common.mqtt_client import mqtt_deviceattributes
from pylab_ml.common import environment
from pylab_ml.common import common

__copyright__ = "Copyright 2023, Lab"
__version__ = "0.0.3"


mylogger = None


[docs] class Logger(): """Basic Logger, if a real logger not necessary."""
[docs] def log_message(self, level, msg): """Print out the message.""" print(msg)
[docs] class RegDB(object): """ Class to read the register master excel file and to build a database of registers. """
[docs] def __init__(self, filename=""): """ Initialize the class by reading the excel file and building the database. Parameters ---------- filename : str The path to the register master excel file. """ import xlrd self.bk = xlrd.open_workbook(filename) self.database = None self.database = []
[docs] def get_row_data(self, bk, sh, rowx, colrange): """ Utility function to extract a single row out of an Excel sheet Parameters ---------- bk : xlrd.Book The workbook object. sh : xlrd.Sheet The sheet object. rowx : int The index of the row to extract. colrange : range The range of column indices to extract. Returns ------- result : list A list of tuples containing the column index, cell type, cell value, and cell formatting index. """ import xlrd result = [] dmode = bk.datemode ctys = sh.row_types(rowx) cvals = sh.row_values(rowx) for colx in colrange: cty = ctys[colx] cval = cvals[colx] if bk.formatting_info: cxfx = str(sh.cell_xf_index(rowx, colx)) else: cxfx = "" if cty == xlrd.XL_CELL_DATE: try: showval = xlrd.xldate_as_tuple(cval, dmode) except xlrd.XLDateError: e1, e2 = sys.exc_info()[:2] showval = "%s:%s" % (e1.__name__, e2) cty = xlrd.XL_CELL_ERROR elif cty == xlrd.XL_CELL_NUMBER: showval = "%.0f" % (cval) elif cty == xlrd.XL_CELL_ERROR: showval = xlrd.error_text_from_code.get(cval, "<Unknown error code 0x%02x>" % cval) else: showval = "%s" % (cval) result.append((colx, cty, showval, cxfx)) return result
[docs] def build_database(self): """ Fill database with register master fields by iterating through all sheets and rows of the excel file. The database is a list of dictionaries, each dictionary represents a register block and contains the following keys: - name: The name of the register block. - id: The row index of the block definition in the excel sheet. - cells: A dictionary mapping column indices to cell names (e.g., 'blk', 'pos', etc.). - types: A dictionary mapping cell names to type conversion definitions (e.g., 'int', 'hex', etc.). - checks: A dictionary mapping cell names to check definitions (e.g., 'hu', 'bu', etc.). - registers: A list of dictionaries, each representing a register defined in the block, containing the following keys: - blk: The name of the register block. - pos: The position of the register within the block. - slices: A list of dictionaries, each representing a bitslice defined for the register, containing the following keys: - posmin: The starting bit index of the bitslice. - posmax: The ending bit index of the bitslice. - poswidth: The width of the bitslice (posmax - posmin + 1). - posmask: A bitmask corresponding to the bitslice (e.g., for bits 3:1, posmask would be 0b1110). - other fields as defined in the excel sheet (e.g., type conversions, checks, etc.). """ shxrange = range(self.bk.nsheets) # # Iterating over all sheets of the file # for shx in shxrange: sh = self.bk.sheet_by_index(shx) nrows, ncols = sh.nrows, sh.ncols colrange = range(ncols) rowrange = range(nrows) # print "Processing sheet %d: name = %r; nrows = %d; ncols = %d" % (shx, sh.name, sh.nrows, sh.ncols) if nrows and ncols: # Attempt to access the RHS corners sh.row_types(0)[ncols - 1] sh.row_values(0)[ncols - 1] sh.row_types(nrows - 1)[ncols - 1] sh.row_values(nrows - 1)[ncols - 1] cell_row = -1 cell_col = -1 pos_col = -1 actual_block = None actual_header = None # # Iterating over all rows of the sheet # for rowx in rowrange: row_data = {} ############################################################################# # # Fill 'row_data' dictionary with data of a complete row # for colx, ty, val, _unused in self.get_row_data(self.bk, sh, rowx, colrange): # # First: idling until the keyword '::blk(xxx)' is found # if not actual_block: try: m = re.match("^::blk\(([^\)]*)\)", val) except Exception: m = None if m: # # '::blk(xxx)' found --> preparing 'actual_block' dictionary # cell_col = colx cell_row = rowx actual_block = {"cells": {}, "types": {}, "checks": {}, "registers": []} actual_block["cells"][colx] = "blk" actual_block["name"] = m.group(1) actual_block["id"] = rowx else: # # While in the definition row, fill up the 'cells' dictionary # if cell_row == rowx: try: m = re.match("^([a-zA-Z_]+[a-zA-Z0-9_]*)$", val) except Exception: m = None if m: if m.group(1) == "pos": pos_col = colx actual_block["cells"][colx] = m.group(1) else: # # Otherwise just filling 'row_data' # try: m = re.match(".*", val) except Exception: m = None if m: row_data[colx] = val ############################################################################# # # If 'row_data" was filled up, processing it # if row_data: ######################################################################### # # Check for a type conversion definition row ('::typ' in 'blk' column) # try: m = re.match("^::typ", row_data[cell_col]) except Exception: m = None if m: # # If found, fill up the 'types' dictionary with valid type conversions # for col in actual_block["cells"].keys(): try: dat = row_data[col] except Exception: dat = None if dat: try: m = re.match("^(int|hex|dec|bin|flt|str)$", row_data[col]) except Exception: m = None if m: actual_block["types"][actual_block["cells"][col]] = row_data[col] continue continue ######################################################################### # # Check for a check definition row ('::check' in 'blk' column) # try: m = re.match("^::check", row_data[cell_col]) except Exception: m = None if m: # # If found, fill up the 'checks' dictionary with valid checks # for col in actual_block["cells"].keys(): try: dat = row_data[col] except Exception: dat = None if dat: try: m = re.match("^(hu|bu|br|,)+$", row_data[col]) except Exception: m = None if m: actual_block["checks"][actual_block["cells"][col]] = row_data[col] continue continue ######################################################################### # # Check for a header definition row ('hdr' in 'pos' column) # try: m = re.match("^hdr$", row_data[pos_col]) except Exception: m = None if m: # # If found, prepare a new 'actual_header' dictionary # (if there already exists a filled dictionary, add it to # the list of registers of the 'actual_block') # if actual_header: for col in actual_block["cells"].keys(): try: s = actual_header[actual_block["cells"][col]] except Exception: actual_header[actual_block["cells"][col]] = "" actual_block["registers"].append(actual_header) actual_header = {"slices": [], "id": rowx} # # Iterate through all valid columns # for col in actual_block["cells"].keys(): try: s = row_data[col] except Exception: s = None if s is not None and s != "": # # If a valid entry is found, perform type conversion # conversion_error = None try: conv = actual_block["types"][actual_block["cells"][col]] except Exception: conv = "str" if conv == "int" or conv == "dec": try: dat = int(s) except Exception: dat = s conversion_error = True elif conv == "hex": try: dat = int(s, 16) except Exception: dat = s conversion_error = True elif conv == "bin": try: dat = int(s, 2) except Exception: dat = s conversion_error = True elif conv == "flt": try: dat = float(s) except Exception: dat = s conversion_error = True else: dat = s # if conversion_error: # print "ERROR: While converting value '%s' to type '%s' for cell '%s' in header '%s'"%(dat, conv, actual_block['cells'][col], actual_header['blk']) # # If a check is specified, perform it # try: c = re.match("hu", actual_block["checks"][actual_block["cells"][col]]) except Exception: c = None if c: # # Check for unique value in header cell # for register in actual_block["registers"]: try: p = register[actual_block["cells"][col]] except Exception: p = None # if p and p == dat: # print "ERROR: Duplicate value '%s' in cell '%s' for header '%s' (same value found in header '%s')"%(dat, actual_block['cells'][col], actual_header['blk'], register['blk']) # # Store value into 'actual_header' dictionary # actual_header[actual_block["cells"][col]] = dat # # If an inheritance is requested, perform this # # ### --> TO BE DONE (adapt 'blk' value and copy slice array from somewhere) continue ######################################################################### # # Check for a slice definition row (slice definition in 'pos' column) # try: m = re.match("^([0-9][0-9:]*)$", row_data[pos_col]) except Exception: m = None if m: # # If found, prepare a new 'actual_slice' dictionary # flds = m.group(0).split(":") if len(flds) == 2: mn = int(flds[1]) mx = int(flds[0]) actual_slice = {"id": rowx, "posmin": mn, "posmax": mx, "poswidth": (mx - mn), "posmask": (((1 << (mx + 1)) - 1) - ((1 << mn) - 1))} else: mn = int(flds[0]) actual_slice = {"id": rowx, "posmin": mn, "posmax": mn, "poswidth": 1, "posmask": (1 << mn)} # # Iterate through all valid columns # for col in actual_block["cells"].keys(): try: s = row_data[col] except Exception: s = None if s is not None and s != "": # # If a valid entry is found, perform type conversion # conversion_error = None try: conv = actual_block["types"][actual_block["cells"][col]] except Exception: conv = "str" if conv == "int" or conv == "dec": try: dat = int(s) except Exception: dat = s conversion_error = True elif conv == "hex": try: dat = int(s, 16) except Exception: dat = s conversion_error = True elif conv == "bin": try: dat = int(s, 2) except Exception: dat = s conversion_error = True elif conv == "flt": try: dat = float(s) except Exception: dat = s conversion_error = True else: dat = s # if conversion_error: # print "ERROR: While converting value '%s' to type '%s' for cell '%s' in slice '%s' of header '%s'"%(dat, conv, actual_block['cells'][col], m.group(0), actual_header['blk']) # # If a check is specified, perform it # try: c = re.match("bu", actual_block["checks"][actual_block["cells"][col]]) except Exception: c = None if c: # # Check for unique value in slice cell # for slice in actual_header["slices"]: try: p = slice[actual_block["cells"][col]] except Exception: p = None # if p and p == dat: # print "ERROR: Duplicate value '%s' in cell '%s' for slice '%s' of header '%s'"%(dat, actual_block['cells'][col], actual_slice['pos'], actual_header['blk']) # # Store value into 'actual_slice' dictionary # actual_slice[actual_block["cells"][col]] = dat # # Check whether all required entries are existant # for check in actual_block["checks"].keys(): try: c = re.match("br", actual_block["checks"][check]) except Exception: c = None if c: try: p = re.match("^.*$", actual_slice[check]) except Exception: p = None # if not p: # print "ERROR: Required cell '%s' not found for slice '%s' of header '%s'"%(check, actual_slice['pos'], actual_header['blk']) # # Add 'actual_slice' dictionary to 'actual_header' dictionary # if actual_header and actual_slice: for col in actual_block["cells"].keys(): try: s = actual_slice[actual_block["cells"][col]] except Exception: actual_slice[actual_block["cells"][col]] = "" actual_header["slices"].append(actual_slice) continue # # Add 'actual_header' dictionary to 'actual_block' dictionary (if one exists) # if actual_header: for col in actual_block["cells"].keys(): try: s = actual_header[actual_block["cells"][col]] except Exception: actual_header[actual_block["cells"][col]] = "" actual_block["registers"].append(actual_header) # # Add 'actual_block' dictionary to 'database' array (if one exists) # if actual_block: self.database.append(actual_block) actual_block = None return
[docs] class Register: """ Class to represent a register defined in the register master excel file. """ # read-only attributes def __setattr__(self, name, value): """ Set attribute value, but only for a predefined set of attributes. (e.g., '_cache', 'value', 'read', 'write', etc.) and the bit-slice names defined in the register master excel file. Parameters ---------- name : str The name of the attribute to set. value : any The value to set for the attribute. Raises ------ AttributeError If the attribute name is not valid. """ valid = "__cache__", "_cache", "_debug", "value", "value_bin", "read", "write", "res" if name not in valid and name not in self._slices: raise AttributeError("can't set attribute {}".format(repr(name))) object.__setattr__(self, name, value) def _get_addr(self): """ Get the register address based on the current protocol type and the defined addresses in the register master excel file. Returns ------- paddr : int The register address to access. Raises ------ ValueError If the interface is in a mode that does not allow access to the register. (e.g., if the protocol type is 'tin' but the CPU address is not defined). """ paddr = self._addr # set to default if self._rm._protocol_typ == "tin": paddr = self._cpuaddr if paddr == -1: msg = "access not possible, interface in {!r} mode, please use e.g. tin".format(self._rm._protocol_typ) self._rm.handle_exception(msg, ValueError, self._name) return paddr
[docs] def writebase(self, bank): """ Write the bank address to the interface if the current bank is different from the given bank or if the forcebank flag is set. Parameters ---------- bank : int The bank address to write to the interface. """ if self._rm._forcebank or (bank != self._rm._bank): self._rm._protocol.writebase(bank) self._rm._bank = bank
[docs] def __init__(self, cpuaddr, addr, bank=0, name="", slices={}, description="", rm=None): """ Initialize the Register object with the given parameters and set up the necessary attributes. Parameters ---------- cpuaddr : int The CPU address of the register. addr : int The address of the register. bank : int, optional The bank address of the register (default is 0). name : str, optional The name of the register (default is ""). slices : dict, optional The bit-slices of the register (default is {}). description : str, optional The description of the register (default is ""). rm : RegisterMaster, optional The RegisterMaster instance (default is None). """ global mylogger _setattr = object.__setattr__.__get__(self, self.__class__) if bank == "": mylogger.log_message(LogLevel.Warning(), f"Regisermaster {name} entray 'bank' is empty, use 0 instead") bank = 0 _setattr("_cpuaddr", cpuaddr) _setattr("_addr", addr) _setattr("_bank", bank) _setattr("_name", name) _setattr("_slices", slices) _setattr("_desc", description) _setattr("_rm", rm) _setattr("_reset", 0) _setattr("__cache__", None) _setattr("_rw", all([slc["dir"] == "RW" or slc["dir"] == "R" for slc in slices.values()])) _setattr("mqtt_list", ["read", "write"])
def _validate(self, value, length): """ Validate the given value against the specified length (number of bits) and ensure it is a non-negative integer that fits within the allowed range for the register. Parameters ---------- value : any The value to validate. length : int The number of bits for the register (used to determine the valid range for the value). Returns ------- value : int The validated value if it is valid. Raises ------ ValueError If the value is not an integer, is negative, or exceeds the maximum allowed value for the specified length. """ if isinstance(value, int): if value > 2**length - 1: msg = "{!r} too big for {} bits".format(value, length) self._rm.handle_exception(msg, ValueError, self._name) elif value < 0: msg = "value = {!r} must be positive".format(value) self._rm.handle_exception(msg, ValueError, self._name) else: msg = "value {!r} is not an integer".format(value) self._rm.handle_exception(msg, ValueError, self._name) return value @property def _cache(self): """ Get the cached value of the register if it is available and valid, otherwise raise an error if the register is in non-atomic mode and the cache is empty. Returns ------- __cache__ : int or None The cached value of the register if it is available and valid, or None if the cache is empty. """ if not self._rm._atomic and self.__cache__ is None: msg = "cache is empty\n" msg += "use either _cache = <value>, _use_reset(), " msg += "_read() or _write(<value>)" self._rm.handle_exception(msg, ValueError, self._name) return self.__cache__ @_cache.setter def _cache(self, value): if value is None: object.__setattr__(self, "__cache__", None) else: self._validate(value, len(self)) object.__setattr__(self, "__cache__", value) @property def value(self): """ Bit-slice over full register,shadow register, read/write the true regiser only if None Returns ------- value : int The value of the register, either from the cache or by reading from the hardware if the cache is empty and the register is in atomic mode. """ if self._rm._atomic and self.__cache__ is None: if self._bank is not None and self._bank != "": self.writebase(self._bank) value = self._rm._protocol.readreg(self._get_addr()) value &= 2 ** self._len_slices() - 1 self._check_r_err() else: value = self._cache return value @value.setter def value(self, value): self._validate(value, len(self)) if self._rm._atomic and self.__cache__ is None: if self._bank is not None and self._bank != "": self.writebase(self._bank) self._rm._protocol.writereg(self._get_addr(), value) self._check_w_err() else: self._cache = value @property def addr(self): """ Returns the register address Returns ------- int The address of the register to access, determined by the current protocol type and the defined addresses in the register master excel file. """ return self._get_addr() def _get_slice(self, name): """ Get the value of a specific bit-slice of the register by applying the appropriate bitmask and shift operations to either the cached value or by reading from the hardware if the cache is empty and the register is in atomic mode. Parameters ---------- name : str The name of the bit-slice to retrieve. Returns ------- int The value of the specified bit-slice. Raises ------ ValueError If the specified bit-slice is not readable or if there is an error while reading from the hardware. """ lsb = self._slices[name]["lsb"] msb = self._slices[name]["msb"] length = msb - lsb + 1 mask = (2**length - 1) << lsb if self._rm._atomic and self.__cache__ is None: if self._bank is not None and self._bank != "": self.writebase(self._bank) value = self._rm._protocol.readreg(self._get_addr()) value &= 2 ** self._len_slices() - 1 self._check_r_err() else: value = self._cache value = (value & mask) >> lsb return value def _set_slice(self, name, value, force=False): """ Set the value of a specific bit-slice of the register by applying the appropriate bitmask and shift operations to either the cached value or by writing to the hardware if the cache is empty and the register is in atomic mode. Parameters ---------- name : str The name of the bit-slice to set. value : int The value to set for the specified bit-slice. force : bool, optional If True, allows setting the value of a bit-slice even if it is not marked as writable (default is False). Returns ------- None Raises ------ ValueError If the specified bit-slice is not writable and force is False, or if there is an error while writing to the hardware. """ if not force and "W" not in self._slices[name]["dir"].upper(): msg = "can't write slice {!r}, it's not writeable" msg = msg.format(name) raise ValueError(msg) lsb = self._slices[name]["lsb"] msb = self._slices[name]["msb"] length = msb - lsb + 1 self._validate(value, length) if self._rm._atomic and self.__cache__ is None: if not self._rw: msg = "can´t use read-modify-write because " msg += "not all bits are readable\n" msg += "use either _cache = <value>, _use_reset(), " msg += "_read() or _write(<value>)" raise ValueError(msg.format(self._name)) if self._bank is not None and self._bank != "": self.writebase(self._bank) current = self._rm._protocol.readreg(self._get_addr()) current &= 2 ** self._len_slices() - 1 self._check_r_err() elif self.__cache__ is None: current = self.read() else: current = self._cache # clear all bits between msb:lsb mask = (2**length - 1) << lsb current &= ~mask # & (2**len(obj) - 1) # set value current = current | (value << lsb) if self._rm._atomic and self.__cache__ is None: self._rm._protocol.writereg(self._get_addr(), current) self._check_w_err() else: self._cache = current def _use_reset(self, default=None, force=False): """ Copy reset values from all bit-slices into _cache. If force == False -> Only a writable slice sees a reset. == True -> readable and writable slice sees a reset. Note: All bit-slices must have a reset value otherwise an error occurs. Parameters ---------- default : int, optional The default reset value to use for bit-slices that do not have a defined reset value (default is None). If None, an error will be raised for bit-slices without a reset value. force : bool, optional If True, applies the reset value to all bit-slices regardless of their writability (default is False). If False, only writable bit-slices will have their reset values applied. Returns ------- None """ cache_old = self.__cache__ self._cache = 0 for name, slice in self._slices.items(): valres = slice["res"] if (valres is None) and (default is None): self.__cache__ = cache_old msg = "bit-slice {!r} has no reset value" raise ValueError(msg.format(name)) elif valres is None: valres = default msg = "WARNING: register {!r} [{!r}:{!r}] has no reset value, set to default={!r}" print(msg.format(name, slice["msb"], slice["lsb"], default)) pass self._set_slice(name, valres, force) @property def __has_reset(self): """ Check if all bit-slices of the register have a defined reset value. Returns ------- bool True if all bit-slices have a defined reset value, False otherwise. """ for name, slice in self._slices.items(): valres = slice["res"] if valres is None: return False return True def __slices2attr(self): """ Dynamically create properties for each bit-slice defined in the register master excel file, allowing access to the bit-slices as attributes of the Register object. Returns ------- new_reg : Register A new Register object with properties for each bit-slice defined in the register master excel file. """ _setattr = object.__setattr__.__get__(self, self.__class__) dct = {} for name, slice in self._slices.items(): def getter(self, name=name): if "R" not in self._slices[name]["dir"].upper(): msg = "can't read slice {!r}, it's not readable" msg = msg.format(name) raise ValueError(msg) return self._get_slice(name) def setter(obj, value, name=name): obj._set_slice(name, value) dct[name] = property(getter, setter, doc=slice["desc"]) dct["__doc__"] = self.__class__.__doc__ SliceRegister = type("Register", (self.__class__,), dct) new_reg = SliceRegister(self._cpuaddr, self._addr, self._bank, self._name, self._slices, self._desc, self._rm) object.__setattr__(new_reg, "_reset", self._reset) return new_reg @property def value_bin(self): """ Get the binary representation of the register value. eg. for a 16-bit register with value 0xABCD, the binary representation would be "1010 1011 1100 1101". Returns ------- str Binary representation of the register value, grouped in 4-bit segments. """ fmt = "{{:0{}b}}".format(len(self)) groups = self.__grouper(fmt.format(self.value), 4) return " ".join(groups) @value_bin.setter def value_bin(self, value): self.value = int(value.replace(" ", ""), base=2)
[docs] def read(self, compare=None, onlycheck=True, tolerance=0, mask=None): """ Read from the Register with selected protocol. Parameters ---------- compare : int or None compare value, if None than no compare onlycheck: bool True (default) result is the compare value 0 or 1 False result is compare value and the read value tolerance : int tolerance for compare, default is 0 mask : int or None mask for compare, default is None (no mask) Returns ------- error : int 0 if compare is successful, 1 if compare fails, or the value read from the register if onlycheck is False. value : int The value read from the register (only returned if onlycheck is False). -1 : int If the protocol is not defined or if there is an error while reading from the hardware. """ global mylogger if self._rm._protocol is None: mylogger.log_message(LogLevel.Error(), f"{self.__class__.__name__} protocol not defined!!") return -1 value = self._read() if not self._rm._protocol.board.error and value is not None: mylogger.log_message(LogLevel.Measure(), f"{self.__class__.__name__}.{self._name} == {hex(value)}") self._rm.publish_set(f"{self._name}.read", value) if compare is not None: error = common.check(f"{self.__class__.__name__}.{self._name}", compare, value, tolerance, mask) if onlycheck: return error else: return error, value return value
[docs] def write(self, value=None): """ Write to the Register with selected protocol. Parameters ---------- value : int or None The value to write to the register. If None, the method will attempt to write the cached value of the register. Returns ------- result : int The result of the write operation. Returns -1 if the protocol is not defined. """ global mylogger if self._rm._protocol is None: mylogger.log_message(LogLevel.Error(), f"{self.__class__.__name__} protocol not defined!!") return -1 result = self._write(value) msg = f"{self.__class__.__name__}.{self._name} := " msg = f"{msg} {hex(result)}" if value is not None else f"{msg} {result}" mylogger.log_message(LogLevel.Measure(), msg) self._rm.publish_get(f"{self._name}.write", result) return result
[docs] def res(self): """ Get the reset value of the register by checking the reset values of all bit-slices defined in the register master excel file. Returns ------- valres : int or str The reset value of the register if all bit-slices have a defined reset value, or a string indicating that the reset value is not defined if any bit-slice does not have a defined reset value. """ for name, slice in self._slices.items(): valres = slice["res"] return valres
def _read(self, protocol=None): """ Read the value of the register from the hardware using the selected protocol, and update the cache with the read value if the read operation is successful and the register is in atomic mode. Parameters ---------- protocol : any, optional The protocol to use for reading the register (default is None, which means using the current protocol defined in the RegisterMaster instance). Returns ------- value : int or None The value read from the register, or None if the read operation failed. """ if self._bank is not None and self._bank != "": self.writebase(self._bank) value = self._rm._protocol.readreg(self._get_addr()) if not self._rm._protocol.board.error and value is not None: value &= 2 ** self._len_slices() - 1 self._check_r_err() if value is not None and value > -1: self._cache = value return value def _write(self, value=None, protocol=None, verify=False): """ Write the given value to the register using the selected protocol, or write the cached value if no value is provided. Parameters ---------- value : int or None The value to write to the register. If None, the method will attempt to write the cached value of the register (default is None). protocol : any, optional The protocol to use for writing to the register. (default is None, which means using the current protocol defined in the RegisterMaster instance). verify : bool, optional If True, the method will read back the value after writing and verify that it was written correctly (default is False). Returns ------- _value : int or None The value that was written to the register, or None if the write operation failed. """ if value is None: _value = self._cache else: _value = self._validate(value, len(self)) if _value is not None: if self._bank is not None and self._bank != "": self.writebase(self._bank) self._rm._protocol.writereg(self._get_addr(), _value) self._check_w_err() else: msg = "nothing to write (neither argument nor _cache value is given)" raise ValueError(msg) if self._rm._atomic and _value is None: self._cache = None else: self._cache = _value return _value def _check_r_err(self): """ Check for errors after reading from the hardware and handle any errors that occur by raising an exception with an appropriate error message. """ if self._rm._protocol.board.error is True: msg = "can't read from interface - {!r}" msg = msg.format(self._rm._protocol.board.errortext) self._rm.handle_exception(msg, ValueError, self._name) def _check_w_err(self): """ Check for errors after writing to the hardware and handle any errors that occur by raising an exception with an appropriate error message. """ if self._rm._protocol.board.error is True: msg = "can't write to interface - {!r}" msg = msg.format(self._rm._protocol.board.errortext) self._rm.handle_exception(msg, ValueError, self._name) @property def value_table(self): """ Generate a table of the bit-slices of the register, including their names, directions, values in decimal, hexadecimal, and binary formats, and reset values. Returns ------- df : pandas.DataFrame A DataFrame containing the bit-slice information for the register, with columns for location, name, direction, decimal value, hexadecimal value, binary value, and reset value. """ cache_old = self._cache self._cache = self.value slices = [] for name, dct in self._slices.items(): slice = OrderedDict() slice["name"] = name if dct["msb"] == dct["lsb"]: loc = "[{msb}]".format(**dct) else: loc = "[{msb}:{lsb}]".format(**dct) slice["loc"] = loc length = dct["msb"] - dct["lsb"] + 1 value = self._get_slice(name) slice["dec"] = str(value) fmt = "0x{{:0{}x}}".format(length // 4 + (1 if length % 4 else 0)) slice["hex"] = fmt.format(value) fmt = "{{:0{}b}}".format(length) groups = self.__grouper(fmt.format(value), 4) slice["bin"] = "" + " ".join(groups) valres = dct["res"] slice["res"] = "-" if valres is None else valres slice["dir"] = dict(R="R", W="W", RW="RW").get(dct["dir"].upper()) slices.append(slice) df = pd.DataFrame(slices, columns=["loc", "name", "dir", "dec", "hex", "bin", "res"]) df = df.set_index("loc") df.index.name = None self._cache = cache_old return df @property def help(self): """ Print the documentation for the register. """ print(self.__doc__) @property def __doc__(self): """ Generate a documentation string for the register, including its description and information about its bit-slices. Returns ------- str A formatted string containing the documentation for the register, including its description and information about its bit-slices such as their names, locations, directions, and reset values. """ lines = [self._desc, ""] width_left = 0 for name, s in self._slices.items(): if s["res"] != 0: name = name + " (={})".format(s["res"]) width_left = max(width_left, len(name)) if s["msb"] == s["lsb"]: slc = "[{}]".format(s["msb"]) else: slc = "[{msb}:{lsb}]".format(**s) rw = dict(RW="RW", R="R", W="W").get(s["dir"], "") field = " ".join([slc, rw]) width_left = max(width_left, len(field)) for name, s in self._slices.items(): if s["res"] != 0: name = name + " (={!r})".format(s["res"]) if s["msb"] == s["lsb"]: slc = "[{}]".format(s["msb"]) else: slc = "[{msb}:{lsb}]".format(**s) rw = dict(RW="RW", R="R", W="W").get(s["dir"], "") field = " ".join([slc, rw]) left = name, field right = s["desc"].split("\n") for le, r in zip_longest(left, right, fillvalue=""): fmt = "{{:{}}} {{}}".format(width_left) lines.append(fmt.format(le, r)) lines.append("") # lines.append('note: direction is RW if not marked as RO or WO') lines.append("note: reset value is zero if not marked with (=<res>)") return "\n".join(lines) def _len_slices(self): """ Calculate the total number of bits covered by all bit-slices defined in the register master excel file by summing the lengths of each bit-slice. eg. if there are two bit-slices defined as [7:4] and [3:0], the total number of bits covered would be (7 - 4 + 1) + (3 - 0 + 1) = 4 + 4 = 8 bits. Returns ------- int The total number of bits covered by all bit-slices defined in the register master excel file. """ return sum(s["msb"] - s["lsb"] + 1 for s in self._slices.values()) def __len__(self): """ Return the length of the register in bits, which is determined by the total number of bits covered by all bit-slices defined in the register master excel file. eg. if there are two bit-slices defined as [7:4] and [3:0], the length of the register would be 8 bits. Returns ------- int The length of the register in bits. """ return self._rm._len_reg def __repr__(self): """ Return a string representation of the Register object, including its class name and key attributes such as CPU address, register address, bank, and name. Returns ------- str A string representation of the Register object, including its class name and key attributes. """ args = ["cpuaddr=0x{:x}".format(self._cpuaddr)] args.append("addr=0x{:x}".format(self._addr)) args.append("bank={}".format(self._bank)) args.append("name={!r}".format(self._name)) return "{classname}({args})".format(classname=self.__class__.__name__, args=", ".join(args)) @staticmethod def __grouper(seq, n): """ Group a sequence into chunks of a specified size and reverse the order of the groups while keeping the order of characters within each group intact. eg. grouper('ABCDEFG', 3) --> A BCD EFG Parameters ---------- seq : str The sequence to be grouped. n : int The size of each group. Returns ------- list A list of grouped sequences. """ rseq = seq[::-1] return [rseq[i : i + n][::-1] for i in range(0, len(seq), n)][::-1] @staticmethod def __int2dec(value, length=None): """ Convert an integer value to a decimal string representation, grouped in segments of three digits for better readability. eg. int2dec(1234567) --> "1 234 567 Parameters ---------- value : int The integer value to be converted to a decimal string representation. length : int, optional The number of bits for the register (used for validation, default is None). Returns ------- value : str The decimal string representation of the integer value. """ try: groups = Register.__grouper("{}".format(value), 3) return " ".join(groups) except ValueError: return value @staticmethod def __int2hex(value, length): """ Convert an integer value to a hexadecimal string representation, grouped in segments of four bits for better readability. eg. int2hex(12000, 16) --> "0x 2e e0" int2hex(333, 32) --> "0x 00 00 01 4d" Parameters ---------- value : int The integer value to be converted to a hexadecimal string representation. length : int The number of bits for the register. Returns ------- value : str The hexadecimal string representation of the integer value. """ try: fmt = "{{:0{}X}}".format(length // 4 + (1 if length % 4 else 0)) groups = Register.__grouper(fmt.format(value), 2) return "0x " + " ".join(groups) except ValueError: return value @staticmethod def __int2bin(value, length): """ Convert an integer value to a binary string representation, grouped in segments of four bits for better readability. eg. int2bin(10, 8) --> "0b 0000 1010 int2bin(255, 16) --> "0b 0000 0000 1111 1111 Parameters ---------- value : int The integer value to be converted to a binary string representation. length : int The number of bits for the register. Returns ------- value : str The binary string representation of the integer value. """ try: fmt = "{{:0{}b}}".format(length) groups = Register.__grouper(fmt.format(value), 4) return "0b " + " ".join(groups) except ValueError: return value def _calc(self): """ Calculate the value of the register and its bit-slices, and generate a visual representation of the register using ipywidgets, including fields for the CPU address, register address, bank, name, reset value, decimal value, hexadecimal value, binary value, and description. Returns ------- widgets.VBox A VBox widget containing the visual representation of the register, including its fields and values. """ names = OrderedDict() names["cpuaddr"] = dict(width=50) names["addr"] = dict(width=50) names["bank"] = dict(width=50) names["name"] = dict(width=100) names["res"] = dict(width=60) names["dec"] = dict(width=75) names["hex"] = dict(width=75) names["bin"] = dict(width=175) names["description"] = dict(width=380, margin="0px 0px 10px 10px") reg_header = [] for name, dct in names.items(): params = dct.copy() if "width" in params: params["width"] = "{}px".format(dct["width"]) box = widgets.HTML("<b>{}</b>".format(name), layout=params) reg_header.append(box) reg_header = widgets.HBox(reg_header) reg_hline = widgets.Box(layout=dict(border="1px solid", margin="0px 2px 5px 2px")) width_max = sum(dct["width"] for dct in names.values()) reg_hline.layout.width = "{}px".format(width_max) reg_line = [] reg_line.append(widgets.Label("0x{:X}".format(self._cpuaddr))) reg_line.append(widgets.Label("0x{:X}".format(self._addr))) reg_line.append(widgets.Label("{}".format(self._bank))) reg_line.append(widgets.Label(str(self._name))) freset = widgets.Button(description="Reset", disabled=not self.__has_reset) reg_line.append(freset) def on_reset(obj): self._use_reset() reg_line.children[4].value = self.__int2dec(self._cache, len(self)) reg_line.children[5].value = self.__int2hex(self._cache, len(self)) reg_line.children[6].value = self.__int2bin(self._cache, len(self)) for (name, dct), line in zip(self._slices.items(), lines): value = self._get_slice(name) slen = dct["msb"] - dct["lsb"] + 1 line.children[4].value = self.__int2dec(value, slen) line.children[5].value = self.__int2hex(value, slen) line.children[6].value = self.__int2bin(value, slen) freset.on_click(on_reset) try: value = self._cache except ValueError: value = "" length = len(self) fdec = widgets.Text("", placeholder="0d ...") fdec.value = self.__int2dec(value, length) reg_line.append(fdec) fhex = widgets.Text("", placeholder="0d ...") fhex.value = self.__int2hex(value, length) reg_line.append(fhex) fbin = widgets.Text("", placeholder="0d ...") fbin.value = self.__int2bin(value, length) reg_line.append(fbin) fmt = '<p style="line-height:1.2">{}</p>' field = widgets.HTML(fmt.format(self._desc.replace("\n", "<br>"))) field.layout.margin = "0px 0px 10px 10px" reg_line.append(field) for field, dct in zip(reg_line, names.values()): field.layout.width = "{}px".format(dct["width"]) reg_line = widgets.HBox(reg_line) reg_line.layout.margin = "0px 0px 10px 0px" def dec2other(obj, length=len(self), fdec=fdec, fhex=fhex, fbin=fbin): """ Convert the decimal value entered in the text field to other representations (hexadecimal and binary) and update the corresponding text fields. Parameters ---------- obj : widget The widget that triggered the event. length : int The number of bits for the register. fdec : widget The decimal text field. fhex : widget The hexadecimal text field. fbin : widget The binary text field. Returns ------- None Raises ------ ValueError If the input value is not a valid integer. """ try: value = int(fdec.value.replace(" ", "")) self._cache = value fdec.value = self.__int2dec(value, length) fhex.value = self.__int2hex(value, length) fbin.value = self.__int2bin(value, length) for name, line in zip(self._slices, lines): value = self._get_slice(name) dct = self._slices[name] slen = dct["msb"] - dct["lsb"] + 1 line.children[4].value = self.__int2dec(value, slen) line.children[5].value = self.__int2hex(value, slen) line.children[6].value = self.__int2bin(value, slen) except ValueError: fdec.value = self.__int2dec("", length) fdec.on_submit(dec2other) def hex2other(obj, length=len(self), fdec=fdec, fhex=fhex, fbin=fbin): """ Convert the hexadecimal value entered in the text field to other representations (decimal and binary) and update the corresponding text fields. Parameters ---------- obj : widget The widget that triggered the event. length : int The number of bits for the register. fdec : widget The decimal text field. fhex : widget The hexadecimal text field. fbin : widget The binary text field. Returns ------- None Raises ------ ValueError If the input value is not a valid hexadecimal integer. """ try: value = int(fhex.value.replace(" ", ""), 16) self._cache = value fdec.value = self.__int2dec(value, length) fhex.value = self.__int2hex(value, length) fbin.value = self.__int2bin(value, length) for name, line in zip(self._slices, lines): value = self._get_slice(name) dct = self._slices[name] slen = dct["msb"] - dct["lsb"] + 1 line.children[4].value = self.__int2dec(value, slen) line.children[5].value = self.__int2hex(value, slen) line.children[6].value = self.__int2bin(value, slen) except ValueError: fhex.value = self.__int2hex("", length) fhex.on_submit(hex2other) def bin2other(obj, length=len(self), fdec=fdec, fhex=fhex, fbin=fbin): """ Convert the binary value entered in the text field to other representations (decimal and hexadecimal) and update the corresponding text fields. Parameters ---------- obj : widget The widget that triggered the event. length : int The number of bits for the register. fdec : widget The decimal text field. fhex : widget The hexadecimal text field. fbin : widget The binary text field. Returns ------- None Raises ------ ValueError If the input value is not a valid binary integer. """ try: value = int(fbin.value.replace(" ", ""), base=2) self._cache = value fdec.value = self.__int2dec(value, length) fhex.value = self.__int2hex(value, length) fbin.value = self.__int2bin(value, length) for name, line in zip(self._slices, lines): value = self._get_slice(name) dct = self._slices[name] slen = dct["msb"] - dct["lsb"] + 1 line.children[4].value = self.__int2dec(value, slen) line.children[5].value = self.__int2hex(value, slen) line.children[6].value = self.__int2bin(value, slen) except ValueError: fbin.value = self.__int2bin("", length) fbin.on_submit(bin2other) names = OrderedDict() names["bits"] = dict(width=50) names["dir"] = dict(width=50) names["slice"] = dict(width=100) names["res"] = dict(width=60) names["dec"] = dict(width=75) names["hex"] = dict(width=75) names["bin"] = dict(width=175) names["description"] = dict(width=380, margin="0px 0px 10px 10px") header = [] for name, dct in names.items(): params = dct.copy() if "width" in params: params["width"] = "{}px".format(dct["width"]) box = widgets.HTML("<b>{}</b>".format(name), layout=params) header.append(box) header = widgets.HBox(header) hline = widgets.Box(layout=dict(border="1px solid", margin="0px 2px 5px 2px")) width_max = sum(dct["width"] for dct in names.values()) hline.layout.width = "{}px".format(width_max) lines = [] for name, dct in self._slices.items(): if dct["msb"] == dct["lsb"]: loc = "[{msb}]".format(**dct) else: loc = "[{msb}:{lsb}]".format(**dct) ro = not "W" in dct["dir"].upper() length = dct["msb"] - dct["lsb"] + 1 res = "-" if dct["res"] is None else dct["res"] try: value = self._get_slice(name) except ValueError: value = "" line = [] line.append(widgets.Label(loc)) dir = dict(R="R", W="W", RW="RW").get(dct["dir"].upper()) line.append(widgets.Label(dir)) line.append(widgets.Label(name)) line.append(widgets.Label("{}".format(res))) fdec = widgets.Text("", placeholder="0d ...", disabled=ro) fdec.value = self.__int2dec(value, length) line.append(fdec) fhex = widgets.Text("", placeholder="0x ...", disabled=ro) fhex.value = self.__int2hex(value, length) line.append(fhex) fbin = widgets.Text("", placeholder="0b ...", disabled=ro) fbin.value = self.__int2bin(value, length) line.append(fbin) fmt = '<p style="line-height:1.2">{}</p>' field = widgets.HTML(fmt.format(dct["desc"].replace("\n", "<br>"))) field.layout.margin = "0px 0px 10px 10px" line.append(field) def dec2other(obj, self=self, name=name, length=length, fdec=fdec, fhex=fhex, fbin=fbin): """ Convert the decimal value entered in the text field to other representations (hexadecimal and binary) and update the corresponding text fields for the specific bit-slice. Parameters ---------- obj : widget The widget that triggered the event. self : Register The Register instance that contains the bit-slice. name : str The name of the bit-slice. length : int The number of bits for the bit-slice. fdec : widget The decimal text field for the bit-slice. fhex : widget The hexadecimal text field for the bit-slice. fbin : widget The binary text field for the bit-slice. Returns ------- None Raises ------ ValueError If the input value is not a valid integer. """ old = self._get_slice(name) try: value = int(fdec.value.replace(" ", "")) self._set_slice(name, value) fdec.value = self.__int2dec(value, length) fhex.value = self.__int2hex(value, length) fbin.value = self.__int2bin(value, length) reg_line.children[4].value = self.__int2dec(self._cache, len(self)) reg_line.children[5].value = self.__int2hex(self._cache, len(self)) reg_line.children[6].value = self.__int2bin(self._cache, len(self)) except ValueError: fdec.value = self.__int2dec("", length) if not "W" in self._slices[name]["dir"].upper(): fdec.value = self.__int2dec(old, length) else: fdec.value = self.__int2dec("", length) fdec.on_submit(dec2other) def hex2other(obj, self=self, name=name, length=length, fdec=fdec, fhex=fhex, fbin=fbin): """ Convert the hexadecimal value entered in the text field to other representations (decimal and binary) and update the corresponding text fields for the specific bit-slice. Parameters ---------- obj : widget The widget that triggered the event. self : Register The Register instance that contains the bit-slice. name : str The name of the bit-slice. length : int The number of bits for the bit-slice. fdec : widget The decimal text field for the bit-slice. fhex : widget The hexadecimal text field for the bit-slice. fbin : widget The binary text field for the bit-slice. Returns ------- None Raises ------ ValueError If the input value is not a valid integer. """ old = self._get_slice(name) try: value = int(fhex.value.replace(" ", ""), 16) self._set_slice(name, value) fdec.value = self.__int2dec(value, length) fhex.value = self.__int2hex(value, length) fbin.value = self.__int2bin(value, length) reg_line.children[4].value = self.__int2dec(self._cache, len(self)) reg_line.children[5].value = self.__int2hex(self._cache, len(self)) reg_line.children[6].value = self.__int2bin(self._cache, len(self)) except ValueError: if not "W" in self._slices[name]["dir"].upper(): fhex.value = self.__int2hex(old, length) else: fhex.value = self.__int2hex("", length) fhex.on_submit(hex2other) def bin2other(obj, self=self, name=name, length=length, fdec=fdec, fhex=fhex, fbin=fbin): """ Convert the binary value entered in the text field to other representations (decimal and hexadecimal) and update the corresponding text fields for the specific bit-slice. Parameters ---------- obj : widget The widget that triggered the event. self : Register The Register instance that contains the bit-slice. name : str The name of the bit-slice. length : int The number of bits for the bit-slice. fdec : widget The decimal text field for the bit-slice. fhex : widget The hexadecimal text field for the bit-slice. fbin : widget The binary text field for the bit-slice. Returns ------- None Raises ------ ValueError If the input value is not a valid integer. """ old = self._get_slice(name) try: value = int(fbin.value.replace(" ", ""), base=2) self._set_slice(name, value) fdec.value = self.__int2dec(value, length) fhex.value = self.__int2hex(value, length) fbin.value = self.__int2bin(value, length) reg_line.children[4].value = self.__int2dec(self._cache, len(self)) reg_line.children[5].value = self.__int2hex(self._cache, len(self)) reg_line.children[6].value = self.__int2bin(self._cache, len(self)) except ValueError: if not "W" in self._slices[name]["dir"].upper(): fbin.value = self.__int2bin(old, length) else: fbin.value = self.__int2bin("", length) fbin.on_submit(bin2other) for field, dct in zip(line, names.values()): field.layout.width = "{}px".format(dct["width"]) lines.append(widgets.HBox(line)) boxes = [reg_header, reg_hline, reg_line, header, hline] + lines return widgets.VBox(boxes)
[docs] class RegisterMaster(mqtt_deviceattributes): """ Returns a container object for registers imported from registermaster. filename: name of registermaster (excel file), default from ENV:registermaster atomic: enables read-modify-write behavoiur of bit-slices (reset: False) interface: hardware object for STI/BiPhase protocol """ # read-only attributes def __setattr__(self, name, value): """ Set the value of an attribute for the RegisterMaster instance, while ensuring that only valid attributes can be set and that any changes to attributes that are part of the MQTT publishing mechanism trigger the appropriate MQTT updates. Parameters ---------- name : str The name of the attribute to be set. value : any The value to be assigned to the attribute. Raises ------ AttributeError If the attribute name is not valid. """ valid = ( "__class__", "_cached", "_debug", "_interface", "_protocol", "_atomic", "_forcebank", "_bank", "_len_slices", "register", "use", "mqtt_status", "_mqtt_status", "_mqttclient", "mqtt_list", "mqtt_enable", "gui", "mqtt_all", "filename", ) if name not in valid: raise AttributeError("can't set attribute {}".format(repr(name))) object.__setattr__(self, name, value) if hasattr(self, "mqtt_all") and name in self.mqtt_all: self.publish_set(name, value)
[docs] def __init__(self, logger=None, filename=None, interface=None, instname="regs", read_mod_write=False): """ Initialize a RegisterMaster instance, which serves as a container for registers imported from a register master excel file. The initialization process includes setting up attributes for caching, debugging, hardware interface, protocol, atomic operations, bank management, and MQTT publishing. Parameters ---------- logger : Logger, optional A Logger instance for logging messages (default is None, which creates a new Logger). filename : str, optional The name of the register master excel file (default is None, which uses the environment variable "registermaster"). interface : object, optional The hardware interface object for STI/BiPhase protocol (default is None). instname : str, optional The instance name for the RegisterMaster (default is "regs"). read_mod_write : bool, optional Enables read-modify-write behavior for bit-slices (default is False). """ global mylogger self.gui = "pylab_ml.gui.instruments.regs.registermaster" _setattr = object.__setattr__.__get__(self, self.__class__) super().__init__() self.mqtt_all = ["filename", "use"] filename = os.environ.get("REGISTERMASTER") if filename is None else filename mylogger = logger if logger is not None else Logger() _setattr("instName", instname) _setattr("filename", filename) _setattr("_cached", False) _setattr("_debug", False) _setattr("_buffer", []) _setattr("_regs", {}) _setattr("_cpuregs", {}) _setattr("_protocol_typ", None) _setattr("_interface", interface) _setattr("_protocol", None) _setattr("_forcebank", True) _setattr("_bank", -1) _setattr("_atomic", read_mod_write) _setattr("_len_reg", 0)
def __repr__(self): """ Return a string representation of the RegisterMaster instance, including its class name and key attributes such as the filename, hardware interface, and read-modify-write behavior. Returns ------- str A string representation of the RegisterMaster instance, including its class name and key attributes. """ args = ["{!r}".format(self.filename)] if self._interface is not None: args.append("interface={!r}".format(self._interface)) args.append("read_mod_write={!r}".format(self._atomic)) return "{classname}({args})".format(classname=self.__class__.__name__, args=", ".join(args))
[docs] def init(self): """ Initialize the RegisterMaster instance by loading the register definitions from the specified register master excel file, building the register database, and creating Register objects for each register defined in the database. """ try: from labml_adjutancy.ctrl.labctrl import mqttc except Exception: mqttc = None filename = self.filename if filename is None: print(f"{__class__}: no filename defined") return # raise IOError(f"{__class__}: no filename defined") print(f' {self.instName}.init: self._mqttclient = {self._mqttclient}') if self._mqttclient is None and mqttc is not None: self.mqtt_add(mqttc, self) blocked_regs = tuple(self.__class__.__dict__) blocked_slices = tuple(Register.__dict__) db = RegDB(filename) db.build_database() for item in db.database[0]["registers"]: slices = OrderedDict() for bsl in item["slices"]: name = bsl["bsn"] if name in blocked_slices: msg = "WARNING: can't use name {!r} for slice\n" "Change slice name {!r} (line {!r}) in file {!r}" msg = msg.format(name, name, bsl["id"], filename) print(msg) elif not isinstance(bsl["posmin"], int): msg = "WARNING: bit-slice {!r} in register {!r} has " msg += "invalid lsb: {!r}" print(msg.format(name, item["blk"], bsl["posmin"])) elif not isinstance(bsl["posmax"], int): msg = "WARNING: bit-slice {!r} in register {!r} has " msg += "invalid msb: {!r}" print(msg.format(name, item["blk"], bsl["posmax"])) else: try: valres = common.str2num(bsl["res"]) except ValueError: valres = None slices[name] = dict( lsb=bsl["posmin"], msb=bsl["posmax"], res=valres, dir=bsl["dir"], desc=bsl["des"], ) name = item["blk"] if name in blocked_regs: msg = "WARNING: can't use name {!r} for register\n" "Change register name {!} in file {!r}" msg = msg.format(name, name, filename) print(msg) elif item.get("adr") is None and item.get("prgadr") is None: msg = "WARNING: register {!r} has invalid address: {!r}" # print(msg.format(name, item['adr'])) print(msg.format(name, item)) # elif item.get ('bank'): # to avoid Keyerror if 'bank' not exist --> not necessary anymore # msg = 'WARNING: register {!r} has invalid bank: {!r}' # print(msg.format(name, item['bank'])) else: addr = -1 cpuaddr = -1 if item.get("adr") is not None and item["adr"] != "": addr = item["adr"] elif item.get("prgadr") is not None and item["prgadr"] != "": addr = item["prgadr"] if item.get("cpuadr") is not None and item["cpuadr"] != "": cpuaddr = item["cpuadr"] if name != "": reg = Register( cpuaddr=cpuaddr, addr=addr, bank=item.get("bank"), name=name, description=item["des"], slices=slices, rm=self, ) reg = reg._Register__slices2attr() if not isinstance(reg._addr, int): msg = f"ERROR: {name} addr is not a integer! addr= {addr}" print(msg) continue if reg._addr >= 0: self._regs.setdefault(reg._addr, []).append(reg) if reg._cpuaddr >= 0: self._cpuregs.setdefault(reg._cpuaddr, []).append(reg) object.__setattr__(self, item["blk"], reg) len_max = max(reg._len_slices() for reg in self) object.__setattr__(self, "_len_reg", len_max) for reg in self: if reg._len_slices() < len_max: msg = "WARNING: misses some slices ({} {}) in register {!r}" num = len_max - reg._len_slices() unit = "bits" if num > 1 else "bit" msg = msg.format(num, unit, reg._name) try: self.use = "tin" except AttributeError: pass object.__setattr__(self, "mapping", {}) object.__setattr__(self, "_len_slices", None) object.__setattr__(self, "banks", {}) self.register = self.__dict__.copy() # create attribute register with all registernames for reg in self.__dict__: if reg != "register" and not hasattr(self.register[reg], "_Register__has_reset"): self.register.pop(reg, None) elif reg != "register": bank = self.register[reg]._bank if bank is not None and (bank not in self.banks or self.banks[bank] < self.register[reg].addr): self.banks[bank] = self.register[reg].addr if bank is not None and bank != "": self.mapping[bank] = self.register[reg]._len_slices() self.mqtt_list = list(self.register) + self.mqtt_all return self
def __iter__(self): """ Iterate over the registers in the RegisterMaster instance, yielding each register object one by one. The iteration order depends on the protocol type being used (either "tin" or another protocol), and the method ensures that all registers are included in the iteration regardless of their organization in the internal data structures. """ global mylogger if self._protocol_typ == "tin": for regs in self._cpuregs.values(): for reg in regs: yield reg else: if not bool(self._regs.values()): mylogger.log_message(LogLevel.Error(), "\n####################################################") mylogger.log_message(LogLevel.Error(), 'ERROR: No "addr" has been defined in register master') mylogger.log_message(LogLevel.Error(), "####################################################\n") for regs in self._regs.values(): for reg in regs: yield reg def __len__(self): """ Return the total number of registers in the RegisterMaster instance, which is calculated based on the protocol type being used. If the protocol type is "tin", the method counts the number of registers in the "_cpuregs" data structure, while for other protocol types, it counts the registers in the "_regs" data structure. Returns ------- int The total number of registers in the RegisterMaster instance. """ if self._protocol_typ == "tin": return sum(len(regs) for regs in self._cpuregs.values()) else: return sum(len(regs) for regs in self._regs.values()) def _mqtt2json(self, value, attr=None): """ Convert the value of an attribute to a JSON-compatible format for MQTT publishing, while ensuring that only attributes that are part of the MQTT publishing mechanism are included in the conversion. Parameters ---------- value : any The value of the attribute to be converted to JSON format. attr : str, optional The name of the attribute being converted (default is None). Returns ------- any The value of the attribute in a JSON-compatible format if the attribute is part of the MQTT publishing mechanism, or the string "nomqtt" if the attribute is not part of the MQTT publishing mechanism. """ if attr in self.mqtt_all: return value return "nomqtt" def _register_to_list(patterns: Union[str, Iterable[str]]) ->List[str]: """ Convert a string or an iterable of strings into a list of strings. If the input is a single string, it will be wrapped in a list. If the input is already an iterable of strings, it will be converted to a list. Parameters ---------- patterns : str or Iterable[str] A string or an iterable of strings to be converted into a list of strings. Returns ------- List[str] A list of strings derived from the input patterns. If the input is a single string, the output will be a list containing that string. If the input is an iterable of strings, the output will be a list containing all the strings from the iterable. """ if isinstance(patterns, str): return[patterns] return list(patterns) def close(self): pass
[docs] def find(self, addr): """ Find the name of a register based on its address, depending on the protocol type being used. Parameters ---------- addr : int The address of the register to be found. Returns ------- str or None The name of the register if found, or None if no register with the specified address exists. """ try: if self._protocol_typ == "tin": return self._cpuregs[addr][0] else: return self._regs[addr][0]._name except Exception: return None
@property def use(self): """ Get the current protocol type. eg. self.regs.use returns "tin" or "spi" or "biphase" depending on the protocol type being used. Returns ------- str The current protocol type. """ return self._protocol_typ @use.setter def use(self, typ): """ Set/Get interface to 'typ'. typ must be defined in the interface """ interface = getattr(self._interface, typ) object.__setattr__(self, "_protocol", interface) object.__setattr__(self, "_protocol_typ", typ) object.__setattr__(self, "_bank", -1) if hasattr(self._protocol, "init"): self._protocol.init()
[docs] def set_bank(self, adr): """ Set the bank from a giving adr and recalculate adr. Parameters ---------- adr : int address. Returns ------- adr : int address without bank. """ if self.banks: for bank in self.banks.keys(): if adr <= self.banks[bank]: self._bank = bank break if self.mapping != {}: mybank = 0 for bank in self.mapping: mybank = bank if adr >= bank else mybank # self._bank = mybank self._len_slices = self.mapping[mybank] # adr = adr - mybank # TODO: cj 7.2.2026 das ist doch falsch, check it with a real project else: self._bank = None self._len_slices = None return adr
def _call_from_string(self, adr, callstr): """ Call a function for all registers matching the given address pattern, using a string representation of the function call. Parameters ---------- adr : str The address pattern to match registers against, which can include wildcards for flexible matching. callstr : str A string representation of the function call to be executed for each matching register, including the function name and its arguments in a format that can be parsed and evaluated. Returns ------- result : list or int The result of the function calls for each matching register, aggregated based on the type of the input data. """ result = [] regs = self._register_to_list() call = ast.parse(callstr, mode='eval').body args = [ast.literal_eval(a) for a in call.args] kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in call.keywords} index = 0 dat = args[0] result = 0 if type(dat) is list else [] for reg in regs: if fnmatch.fnmatchcase(reg._name, adr): function = getattr(getattr(self, reg._name), call.func.id) if type(dat) is list: args[0] = dat[index] resu = function(*args, **kwargs) if type(dat) is list: result += resu else: result.append(resu) result.append(function(*args, **kwargs)) index += 1 return result
[docs] def readreg(self, adr, bank=None, compare=None, onlycheck=True, tolerance=0, mask=None): """ Read Register with selected protocol. Parameters ---------- adr : int or str or list The address or addresses of the register(s) to read. bank : int, optional The bank address, if applicable. compare : int or list, optional The value(s) to compare against, if any. onlycheck : bool, optional If True, only perform the check without returning the value. tolerance : int, optional The tolerance for the comparison. mask : int, optional The mask to apply to the value before comparison. Returns ------- int or list or tuple The result of the read operation, which can be an integer value, a list of values, or a tuple containing the comparison result and the value, depending on the parameters provided. """ if type(adr) is str: if len(adr) == 0: mylogger.log_message(LogLevel.Error(), 'readreg: adr is empty') return elif adr[0].isdigit(): adr = list(common.arange(adr)) else: return self._call_from_string(adr, f"read({compare}, {onlycheck}, {tolerance}, {mask})") if type(adr) is list: result = [] if compare is None else 0 index = 0 for a in adr: comp = compare[index] if type(compare) is list else compare resu = self._readreg(a, bank, comp, onlycheck, tolerance, mask) if compare is None: result.append(resu) else: result += resu index += 1 return result return self._readreg(adr, bank, compare, onlycheck, tolerance, mask)
def _readreg(self, adr, bank=None, compare=None, onlycheck=True, tolerance=0, mask=None): """ Read Register with selected protokoll. Parameters ---------- adr : int address. bank : int bank adress compare : int compare value, if None than no compare onlycheck : bool if True, only perform the check without returning the value. tolerance : int tolerance for the comparison. mask : int mask to apply to the value before comparison. Returns ------- value : int The value read from the register, potentially after applying a mask and comparison, depending on the parameters provided. error : int The error code resulting from the comparison, if applicable. check : int If compare is defined, this is the compare result: 0 = ok 1 = error """ hadr = adr if bank is not None: self._bank = bank else: adr = self.set_bank(adr) if self._bank is not None and self._bank != "": self._protocol.writebase(self._bank) value = self._protocol.readreg(adr, compare=compare, tolerance=tolerance, mask=mask) mylogger.log_message(LogLevel.Measure(), f"readreg(0x{hadr:02x}) == {hex(value)}") self.publish_set(f"readreg(0x{hadr:02x})", value) if self._len_slices is not None: value &= 2**self._len_slices - 1 if compare is not None: bank = 0 if self._bank is None else self._bank if value is not None: error = common.check(f"{hex(bank+adr)}", compare, value, tolerance, mask) if onlycheck and value is not None: return error elif onlycheck: return 0 else: return error, value return value
[docs] def writereg(self, adr, dat, bank=None): """ Write Register with selected protocol. Parameters ---------- adr : int or str or list The address or addresses of the register(s) to write to. dat : int or list The data to write to the register(s). bank : int, optional The bank address. If None, the current bank is used. Returns ------- None. """ if type(adr) is str: if len(adr) == 0: mylogger.log_message(LogLevel.Error(), 'writereg: adr is empty') return elif adr[0].isdigit(): adr = list(common.arange(adr)) else: return self._call_from_string(adr, f"write({dat})") if type(adr) is list: index = 0 for a in adr: d = dat[index] if type(dat) is list else dat self._writereg(a, d, bank) index += 1 return self._writereg(adr, dat, bank)
def _writereg(self, adr, dat, bank=None): """ Write Register with selected protokoll. Parameters ---------- adr : int address. dat : int data. bank : int bank adress Returns ------- None. """ hadr = adr if bank is not None: self._bank = bank else: adr = self.set_bank(adr) if self._bank is not None and self._bank != "": self._protocol.writebase(self._bank) self._protocol.writereg(adr, dat) msg = f"writereg(0x{hadr:02x}) := {hex(dat)}" mylogger.log_message(LogLevel.Measure(), msg) self.publish_set(f"writereg({hadr})", dat)
[docs] def reset(self): """ Reset the RegisterMaster instance by resetting the protocol and clearing the cache for all registers. """ self._protocol.reset() object.__setattr__(self, "_bank", -1) for reg in self: object.__setattr__(reg, "__cache__", None)
[docs] def reset_internal(self): """ Reset the internal state of the RegisterMaster instance by resetting the protocol's internal state and clearing the cache for all registers. """ object.__setattr__(self, "_bank", -1) try: self._protocol.reset_internal() except AttributeError: msg = "{!r} protocol has no 'reset_internal'." raise AttributeError(msg.format(self._protocol.__class__.__name__)) for reg in self: object.__setattr__(reg, "__cache__", None)
[docs] def reset_regs(self, default=None, force=False): """ Reset the slices to the reset value Parameters ---------- default : int, optional The value to set the register to. If None, default values 0 will be used. force : bool, optional False -> Only a writable slice sees a reset. True -> Readable and writable slice sees a reset. Returns ------- None. """ global mylogger for regname in self: try: # print (regname) for slices in regname._slices: if "W" in regname._slices[slices]["dir"] or force: regname._use_reset(default, force) except Exception: mylogger.log_message(LogLevel.Warning(), f"Error in {regname._name} with resetvalues -> Check your Registermaster") pass mylogger.log_message(LogLevel.Info(), "Reset register Cache to resetvalues")
[docs] def handle_exception(self, msg, typ=None, name=None): """ Handle exceptions by logging an error message using the logger instance, and optionally raising a specified exception type with the provided message. Parameters ---------- msg : str The error message to be logged and potentially included in the raised exception. typ : Exception class, optional The type of exception to be raised. If None, no exception is raised. name : str, optional The name associated with the error message. Returns ------- None. """ global mylogger if mylogger is not None: if name is None: mylogger.log_message(LogLevel.Error(), msg) else: mylogger.log_message(LogLevel.Error(), f'{name}: {msg}') elif typ is not None: raise typ(msg) else: ValueError(msg)
[docs] def set_configuration_values(self, data): """ Only empty dummy function. Parameters ---------- data : dict A dictionary containing configuration values to be set for the RegisterMaster instance. Returns ------- None. """ global mylogger mylogger.log_message(LogLevel.Warning(), "pylab_ml.RegisterMaster: set_configuration_values only dummy function.....") pass
[docs] def apply_configuration(self, data): """ Call from the Plugin if the modul placed in the hardwaresetups. Parameters ---------- data : dict A dictionary containing configuration values to be applied to the RegisterMaster instance, such as the filename. Returns ------- None. """ _setattr = object.__setattr__.__get__(self, self.__class__) config = environment.replaceEnvs(data) filename = config["filename"] if "filename" in config and config["filename"] != "" else self.filename filename = environment.checkNetworkPath(filename) if filename is not None and filename.find("..") == 0 and "WORKING_DIR" in os.environ: filename = filename.replace("..", os.environ["WORKING_DIR"]) _setattr("filename", filename) instname = config["instance name"] if "instance name" in config and config["instance name"] != "" else self.instName _setattr("instName", instname) read_mod_write = config["read mod write"] if "read mod write" in config and config["read mod write"] != "" else self._atomic _setattr("_atomic", read_mod_write) forcebank = config["force bank"] if "force bank" in config and config["force bank"] != "" else self._forcebank _setattr("_forcebank", forcebank) resetvalue = config["reset value"] if "reset value" in config and config["reset value"] != "" else 0 self.init()
# self.reset_regs(resetvalue) # TODO: definition interface is missing if __name__ == "__main__": filename = "your_registermaster.xls" regs = RegisterMaster(filename=filename) regs.init()