# $Id: $
"""
User interface to the register master excel file.
This excel file descibes all registers of a sensor. For each register
the following items are mentioned:
- name
- address
- bank (optional)
- description
- bitslices
A bitslice is a range of one or more consecutive bits in a register. Each
bitslice is descripted by the following items:
- name
- index_startbit
- index_stopbit
- read/write direction
- reset value
- description
"""
import re
import fnmatch
import ast
from typing import Iterable, List, Union
import sys
import os
import pandas as pd
import ipywidgets as widgets
try:
from itertools import zip_longest
except ImportError:
from itertools import izip_longest as zip_longest
from collections import OrderedDict # , namedtuple
from ate_common.logger import LogLevel
from pylab_ml.common.mqtt_client import mqtt_deviceattributes
from pylab_ml.common import environment
from pylab_ml.common import common
__copyright__ = "Copyright 2023, Lab"
__version__ = "0.0.3"
mylogger = None
[docs]
class Logger():
"""Basic Logger, if a real logger not necessary."""
[docs]
def log_message(self, level, msg):
"""Print out the message."""
print(msg)
[docs]
class RegDB(object):
""" Class to read the register master excel file and to build a database of registers. """
[docs]
def __init__(self, filename=""):
"""
Initialize the class by reading the excel file and building the database.
Parameters
----------
filename : str
The path to the register master excel file.
"""
import xlrd
self.bk = xlrd.open_workbook(filename)
self.database = None
self.database = []
[docs]
def get_row_data(self, bk, sh, rowx, colrange):
"""
Utility function to extract a single row out of an Excel sheet
Parameters
----------
bk : xlrd.Book
The workbook object.
sh : xlrd.Sheet
The sheet object.
rowx : int
The index of the row to extract.
colrange : range
The range of column indices to extract.
Returns
-------
result : list
A list of tuples containing the column index, cell type, cell value, and cell formatting index.
"""
import xlrd
result = []
dmode = bk.datemode
ctys = sh.row_types(rowx)
cvals = sh.row_values(rowx)
for colx in colrange:
cty = ctys[colx]
cval = cvals[colx]
if bk.formatting_info:
cxfx = str(sh.cell_xf_index(rowx, colx))
else:
cxfx = ""
if cty == xlrd.XL_CELL_DATE:
try:
showval = xlrd.xldate_as_tuple(cval, dmode)
except xlrd.XLDateError:
e1, e2 = sys.exc_info()[:2]
showval = "%s:%s" % (e1.__name__, e2)
cty = xlrd.XL_CELL_ERROR
elif cty == xlrd.XL_CELL_NUMBER:
showval = "%.0f" % (cval)
elif cty == xlrd.XL_CELL_ERROR:
showval = xlrd.error_text_from_code.get(cval, "<Unknown error code 0x%02x>" % cval)
else:
showval = "%s" % (cval)
result.append((colx, cty, showval, cxfx))
return result
[docs]
def build_database(self):
""" Fill database with register master fields by iterating through all sheets and rows of the excel file.
The database is a list of dictionaries, each dictionary represents a register block and contains the following keys:
- name: The name of the register block.
- id: The row index of the block definition in the excel sheet.
- cells: A dictionary mapping column indices to cell names (e.g., 'blk', 'pos', etc.).
- types: A dictionary mapping cell names to type conversion definitions (e.g., 'int', 'hex', etc.).
- checks: A dictionary mapping cell names to check definitions (e.g., 'hu', 'bu', etc.).
- registers: A list of dictionaries, each representing a register defined in the block, containing the following keys:
- blk: The name of the register block.
- pos: The position of the register within the block.
- slices: A list of dictionaries, each representing a bitslice defined for the register, containing the following keys:
- posmin: The starting bit index of the bitslice.
- posmax: The ending bit index of the bitslice.
- poswidth: The width of the bitslice (posmax - posmin + 1).
- posmask: A bitmask corresponding to the bitslice (e.g., for bits 3:1, posmask would be 0b1110).
- other fields as defined in the excel sheet (e.g., type conversions, checks, etc.).
"""
shxrange = range(self.bk.nsheets)
#
# Iterating over all sheets of the file
#
for shx in shxrange:
sh = self.bk.sheet_by_index(shx)
nrows, ncols = sh.nrows, sh.ncols
colrange = range(ncols)
rowrange = range(nrows)
# print "Processing sheet %d: name = %r; nrows = %d; ncols = %d" % (shx, sh.name, sh.nrows, sh.ncols)
if nrows and ncols:
# Attempt to access the RHS corners
sh.row_types(0)[ncols - 1]
sh.row_values(0)[ncols - 1]
sh.row_types(nrows - 1)[ncols - 1]
sh.row_values(nrows - 1)[ncols - 1]
cell_row = -1
cell_col = -1
pos_col = -1
actual_block = None
actual_header = None
#
# Iterating over all rows of the sheet
#
for rowx in rowrange:
row_data = {}
#############################################################################
#
# Fill 'row_data' dictionary with data of a complete row
#
for colx, ty, val, _unused in self.get_row_data(self.bk, sh, rowx, colrange):
#
# First: idling until the keyword '::blk(xxx)' is found
#
if not actual_block:
try:
m = re.match("^::blk\(([^\)]*)\)", val)
except Exception:
m = None
if m:
#
# '::blk(xxx)' found --> preparing 'actual_block' dictionary
#
cell_col = colx
cell_row = rowx
actual_block = {"cells": {}, "types": {}, "checks": {}, "registers": []}
actual_block["cells"][colx] = "blk"
actual_block["name"] = m.group(1)
actual_block["id"] = rowx
else:
#
# While in the definition row, fill up the 'cells' dictionary
#
if cell_row == rowx:
try:
m = re.match("^([a-zA-Z_]+[a-zA-Z0-9_]*)$", val)
except Exception:
m = None
if m:
if m.group(1) == "pos":
pos_col = colx
actual_block["cells"][colx] = m.group(1)
else:
#
# Otherwise just filling 'row_data'
#
try:
m = re.match(".*", val)
except Exception:
m = None
if m:
row_data[colx] = val
#############################################################################
#
# If 'row_data" was filled up, processing it
#
if row_data:
#########################################################################
#
# Check for a type conversion definition row ('::typ' in 'blk' column)
#
try:
m = re.match("^::typ", row_data[cell_col])
except Exception:
m = None
if m:
#
# If found, fill up the 'types' dictionary with valid type conversions
#
for col in actual_block["cells"].keys():
try:
dat = row_data[col]
except Exception:
dat = None
if dat:
try:
m = re.match("^(int|hex|dec|bin|flt|str)$", row_data[col])
except Exception:
m = None
if m:
actual_block["types"][actual_block["cells"][col]] = row_data[col]
continue
continue
#########################################################################
#
# Check for a check definition row ('::check' in 'blk' column)
#
try:
m = re.match("^::check", row_data[cell_col])
except Exception:
m = None
if m:
#
# If found, fill up the 'checks' dictionary with valid checks
#
for col in actual_block["cells"].keys():
try:
dat = row_data[col]
except Exception:
dat = None
if dat:
try:
m = re.match("^(hu|bu|br|,)+$", row_data[col])
except Exception:
m = None
if m:
actual_block["checks"][actual_block["cells"][col]] = row_data[col]
continue
continue
#########################################################################
#
# Check for a header definition row ('hdr' in 'pos' column)
#
try:
m = re.match("^hdr$", row_data[pos_col])
except Exception:
m = None
if m:
#
# If found, prepare a new 'actual_header' dictionary
# (if there already exists a filled dictionary, add it to
# the list of registers of the 'actual_block')
#
if actual_header:
for col in actual_block["cells"].keys():
try:
s = actual_header[actual_block["cells"][col]]
except Exception:
actual_header[actual_block["cells"][col]] = ""
actual_block["registers"].append(actual_header)
actual_header = {"slices": [], "id": rowx}
#
# Iterate through all valid columns
#
for col in actual_block["cells"].keys():
try:
s = row_data[col]
except Exception:
s = None
if s is not None and s != "":
#
# If a valid entry is found, perform type conversion
#
conversion_error = None
try:
conv = actual_block["types"][actual_block["cells"][col]]
except Exception:
conv = "str"
if conv == "int" or conv == "dec":
try:
dat = int(s)
except Exception:
dat = s
conversion_error = True
elif conv == "hex":
try:
dat = int(s, 16)
except Exception:
dat = s
conversion_error = True
elif conv == "bin":
try:
dat = int(s, 2)
except Exception:
dat = s
conversion_error = True
elif conv == "flt":
try:
dat = float(s)
except Exception:
dat = s
conversion_error = True
else:
dat = s
# if conversion_error:
# print "ERROR: While converting value '%s' to type '%s' for cell '%s' in header '%s'"%(dat, conv, actual_block['cells'][col], actual_header['blk'])
#
# If a check is specified, perform it
#
try:
c = re.match("hu", actual_block["checks"][actual_block["cells"][col]])
except Exception:
c = None
if c:
#
# Check for unique value in header cell
#
for register in actual_block["registers"]:
try:
p = register[actual_block["cells"][col]]
except Exception:
p = None
# if p and p == dat:
# print "ERROR: Duplicate value '%s' in cell '%s' for header '%s' (same value found in header '%s')"%(dat, actual_block['cells'][col], actual_header['blk'], register['blk'])
#
# Store value into 'actual_header' dictionary
#
actual_header[actual_block["cells"][col]] = dat
#
# If an inheritance is requested, perform this
#
# ### --> TO BE DONE (adapt 'blk' value and copy slice array from somewhere)
continue
#########################################################################
#
# Check for a slice definition row (slice definition in 'pos' column)
#
try:
m = re.match("^([0-9][0-9:]*)$", row_data[pos_col])
except Exception:
m = None
if m:
#
# If found, prepare a new 'actual_slice' dictionary
#
flds = m.group(0).split(":")
if len(flds) == 2:
mn = int(flds[1])
mx = int(flds[0])
actual_slice = {"id": rowx, "posmin": mn, "posmax": mx, "poswidth": (mx - mn), "posmask": (((1 << (mx + 1)) - 1) - ((1 << mn) - 1))}
else:
mn = int(flds[0])
actual_slice = {"id": rowx, "posmin": mn, "posmax": mn, "poswidth": 1, "posmask": (1 << mn)}
#
# Iterate through all valid columns
#
for col in actual_block["cells"].keys():
try:
s = row_data[col]
except Exception:
s = None
if s is not None and s != "":
#
# If a valid entry is found, perform type conversion
#
conversion_error = None
try:
conv = actual_block["types"][actual_block["cells"][col]]
except Exception:
conv = "str"
if conv == "int" or conv == "dec":
try:
dat = int(s)
except Exception:
dat = s
conversion_error = True
elif conv == "hex":
try:
dat = int(s, 16)
except Exception:
dat = s
conversion_error = True
elif conv == "bin":
try:
dat = int(s, 2)
except Exception:
dat = s
conversion_error = True
elif conv == "flt":
try:
dat = float(s)
except Exception:
dat = s
conversion_error = True
else:
dat = s
# if conversion_error:
# print "ERROR: While converting value '%s' to type '%s' for cell '%s' in slice '%s' of header '%s'"%(dat, conv, actual_block['cells'][col], m.group(0), actual_header['blk'])
#
# If a check is specified, perform it
#
try:
c = re.match("bu", actual_block["checks"][actual_block["cells"][col]])
except Exception:
c = None
if c:
#
# Check for unique value in slice cell
#
for slice in actual_header["slices"]:
try:
p = slice[actual_block["cells"][col]]
except Exception:
p = None
# if p and p == dat:
# print "ERROR: Duplicate value '%s' in cell '%s' for slice '%s' of header '%s'"%(dat, actual_block['cells'][col], actual_slice['pos'], actual_header['blk'])
#
# Store value into 'actual_slice' dictionary
#
actual_slice[actual_block["cells"][col]] = dat
#
# Check whether all required entries are existant
#
for check in actual_block["checks"].keys():
try:
c = re.match("br", actual_block["checks"][check])
except Exception:
c = None
if c:
try:
p = re.match("^.*$", actual_slice[check])
except Exception:
p = None
# if not p:
# print "ERROR: Required cell '%s' not found for slice '%s' of header '%s'"%(check, actual_slice['pos'], actual_header['blk'])
#
# Add 'actual_slice' dictionary to 'actual_header' dictionary
#
if actual_header and actual_slice:
for col in actual_block["cells"].keys():
try:
s = actual_slice[actual_block["cells"][col]]
except Exception:
actual_slice[actual_block["cells"][col]] = ""
actual_header["slices"].append(actual_slice)
continue
#
# Add 'actual_header' dictionary to 'actual_block' dictionary (if one exists)
#
if actual_header:
for col in actual_block["cells"].keys():
try:
s = actual_header[actual_block["cells"][col]]
except Exception:
actual_header[actual_block["cells"][col]] = ""
actual_block["registers"].append(actual_header)
#
# Add 'actual_block' dictionary to 'database' array (if one exists)
#
if actual_block:
self.database.append(actual_block)
actual_block = None
return
[docs]
class Register:
""" Class to represent a register defined in the register master excel file. """
# read-only attributes
def __setattr__(self, name, value):
"""
Set attribute value, but only for a predefined set of attributes.
(e.g., '_cache', 'value', 'read', 'write', etc.) and the bit-slice names defined in the register master excel file.
Parameters
----------
name : str
The name of the attribute to set.
value : any
The value to set for the attribute.
Raises
------
AttributeError
If the attribute name is not valid.
"""
valid = "__cache__", "_cache", "_debug", "value", "value_bin", "read", "write", "res"
if name not in valid and name not in self._slices:
raise AttributeError("can't set attribute {}".format(repr(name)))
object.__setattr__(self, name, value)
def _get_addr(self):
"""
Get the register address based on the current protocol type and the defined addresses in the register master excel file.
Returns
-------
paddr : int
The register address to access.
Raises
------
ValueError
If the interface is in a mode that does not allow access to the register.
(e.g., if the protocol type is 'tin' but the CPU address is not defined).
"""
paddr = self._addr # set to default
if self._rm._protocol_typ == "tin":
paddr = self._cpuaddr
if paddr == -1:
msg = "access not possible, interface in {!r} mode, please use e.g. tin".format(self._rm._protocol_typ)
self._rm.handle_exception(msg, ValueError, self._name)
return paddr
[docs]
def writebase(self, bank):
"""
Write the bank address to the interface if the current bank is different from the given bank or if the forcebank flag is set.
Parameters
----------
bank : int
The bank address to write to the interface.
"""
if self._rm._forcebank or (bank != self._rm._bank):
self._rm._protocol.writebase(bank)
self._rm._bank = bank
[docs]
def __init__(self, cpuaddr, addr, bank=0, name="", slices={}, description="", rm=None):
"""
Initialize the Register object with the given parameters and set up the necessary attributes.
Parameters
----------
cpuaddr : int
The CPU address of the register.
addr : int
The address of the register.
bank : int, optional
The bank address of the register (default is 0).
name : str, optional
The name of the register (default is "").
slices : dict, optional
The bit-slices of the register (default is {}).
description : str, optional
The description of the register (default is "").
rm : RegisterMaster, optional
The RegisterMaster instance (default is None).
"""
global mylogger
_setattr = object.__setattr__.__get__(self, self.__class__)
if bank == "":
mylogger.log_message(LogLevel.Warning(), f"Regisermaster {name} entray 'bank' is empty, use 0 instead")
bank = 0
_setattr("_cpuaddr", cpuaddr)
_setattr("_addr", addr)
_setattr("_bank", bank)
_setattr("_name", name)
_setattr("_slices", slices)
_setattr("_desc", description)
_setattr("_rm", rm)
_setattr("_reset", 0)
_setattr("__cache__", None)
_setattr("_rw", all([slc["dir"] == "RW" or slc["dir"] == "R" for slc in slices.values()]))
_setattr("mqtt_list", ["read", "write"])
def _validate(self, value, length):
"""
Validate the given value against the specified length (number of bits) and ensure it is a non-negative integer that fits within the allowed range for the register.
Parameters
----------
value : any
The value to validate.
length : int
The number of bits for the register (used to determine the valid range for the value).
Returns
-------
value : int
The validated value if it is valid.
Raises
------
ValueError
If the value is not an integer, is negative, or exceeds the maximum allowed value for the specified length.
"""
if isinstance(value, int):
if value > 2**length - 1:
msg = "{!r} too big for {} bits".format(value, length)
self._rm.handle_exception(msg, ValueError, self._name)
elif value < 0:
msg = "value = {!r} must be positive".format(value)
self._rm.handle_exception(msg, ValueError, self._name)
else:
msg = "value {!r} is not an integer".format(value)
self._rm.handle_exception(msg, ValueError, self._name)
return value
@property
def _cache(self):
"""
Get the cached value of the register if it is available and valid,
otherwise raise an error if the register is in non-atomic mode and the cache is empty.
Returns
-------
__cache__ : int or None
The cached value of the register if it is available and valid, or None if the cache is empty.
"""
if not self._rm._atomic and self.__cache__ is None:
msg = "cache is empty\n"
msg += "use either _cache = <value>, _use_reset(), "
msg += "_read() or _write(<value>)"
self._rm.handle_exception(msg, ValueError, self._name)
return self.__cache__
@_cache.setter
def _cache(self, value):
if value is None:
object.__setattr__(self, "__cache__", None)
else:
self._validate(value, len(self))
object.__setattr__(self, "__cache__", value)
@property
def value(self):
"""
Bit-slice over full register,shadow register, read/write the true regiser only if None
Returns
-------
value : int
The value of the register, either from the cache or by reading from the hardware if the cache is empty and the register is in atomic mode.
"""
if self._rm._atomic and self.__cache__ is None:
if self._bank is not None and self._bank != "":
self.writebase(self._bank)
value = self._rm._protocol.readreg(self._get_addr())
value &= 2 ** self._len_slices() - 1
self._check_r_err()
else:
value = self._cache
return value
@value.setter
def value(self, value):
self._validate(value, len(self))
if self._rm._atomic and self.__cache__ is None:
if self._bank is not None and self._bank != "":
self.writebase(self._bank)
self._rm._protocol.writereg(self._get_addr(), value)
self._check_w_err()
else:
self._cache = value
@property
def addr(self):
"""
Returns the register address
Returns
-------
int
The address of the register to access, determined by the current protocol type and
the defined addresses in the register master excel file.
"""
return self._get_addr()
def _get_slice(self, name):
"""
Get the value of a specific bit-slice of the register by applying the appropriate bitmask and
shift operations to either the cached value or by reading from the hardware if the cache is empty and
the register is in atomic mode.
Parameters
----------
name : str
The name of the bit-slice to retrieve.
Returns
-------
int
The value of the specified bit-slice.
Raises
------
ValueError
If the specified bit-slice is not readable or if there is an error while reading from the hardware.
"""
lsb = self._slices[name]["lsb"]
msb = self._slices[name]["msb"]
length = msb - lsb + 1
mask = (2**length - 1) << lsb
if self._rm._atomic and self.__cache__ is None:
if self._bank is not None and self._bank != "":
self.writebase(self._bank)
value = self._rm._protocol.readreg(self._get_addr())
value &= 2 ** self._len_slices() - 1
self._check_r_err()
else:
value = self._cache
value = (value & mask) >> lsb
return value
def _set_slice(self, name, value, force=False):
"""
Set the value of a specific bit-slice of the register by applying the appropriate bitmask and
shift operations to either the cached value or by writing to the hardware if the cache is empty and
the register is in atomic mode.
Parameters
----------
name : str
The name of the bit-slice to set.
value : int
The value to set for the specified bit-slice.
force : bool, optional
If True, allows setting the value of a bit-slice even if it is not marked as writable (default is False).
Returns
-------
None
Raises
------
ValueError
If the specified bit-slice is not writable and force is False, or if there is an error while writing to the hardware.
"""
if not force and "W" not in self._slices[name]["dir"].upper():
msg = "can't write slice {!r}, it's not writeable"
msg = msg.format(name)
raise ValueError(msg)
lsb = self._slices[name]["lsb"]
msb = self._slices[name]["msb"]
length = msb - lsb + 1
self._validate(value, length)
if self._rm._atomic and self.__cache__ is None:
if not self._rw:
msg = "can´t use read-modify-write because "
msg += "not all bits are readable\n"
msg += "use either _cache = <value>, _use_reset(), "
msg += "_read() or _write(<value>)"
raise ValueError(msg.format(self._name))
if self._bank is not None and self._bank != "":
self.writebase(self._bank)
current = self._rm._protocol.readreg(self._get_addr())
current &= 2 ** self._len_slices() - 1
self._check_r_err()
elif self.__cache__ is None:
current = self.read()
else:
current = self._cache
# clear all bits between msb:lsb
mask = (2**length - 1) << lsb
current &= ~mask # & (2**len(obj) - 1)
# set value
current = current | (value << lsb)
if self._rm._atomic and self.__cache__ is None:
self._rm._protocol.writereg(self._get_addr(), current)
self._check_w_err()
else:
self._cache = current
def _use_reset(self, default=None, force=False):
"""
Copy reset values from all bit-slices into _cache.
If force == False -> Only a writable slice sees a reset.
== True -> readable and writable slice sees a reset.
Note: All bit-slices must have a reset value otherwise an error occurs.
Parameters
----------
default : int, optional
The default reset value to use for bit-slices that do not have a defined reset value (default is None).
If None, an error will be raised for bit-slices without a reset value.
force : bool, optional
If True, applies the reset value to all bit-slices regardless of their writability (default is False).
If False, only writable bit-slices will have their reset values applied.
Returns
-------
None
"""
cache_old = self.__cache__
self._cache = 0
for name, slice in self._slices.items():
valres = slice["res"]
if (valres is None) and (default is None):
self.__cache__ = cache_old
msg = "bit-slice {!r} has no reset value"
raise ValueError(msg.format(name))
elif valres is None:
valres = default
msg = "WARNING: register {!r} [{!r}:{!r}] has no reset value, set to default={!r}"
print(msg.format(name, slice["msb"], slice["lsb"], default))
pass
self._set_slice(name, valres, force)
@property
def __has_reset(self):
"""
Check if all bit-slices of the register have a defined reset value.
Returns
-------
bool
True if all bit-slices have a defined reset value, False otherwise.
"""
for name, slice in self._slices.items():
valres = slice["res"]
if valres is None:
return False
return True
def __slices2attr(self):
"""
Dynamically create properties for each bit-slice defined in the register master excel file, allowing access to the bit-slices as attributes of the Register object.
Returns
-------
new_reg : Register
A new Register object with properties for each bit-slice defined in the register master excel file.
"""
_setattr = object.__setattr__.__get__(self, self.__class__)
dct = {}
for name, slice in self._slices.items():
def getter(self, name=name):
if "R" not in self._slices[name]["dir"].upper():
msg = "can't read slice {!r}, it's not readable"
msg = msg.format(name)
raise ValueError(msg)
return self._get_slice(name)
def setter(obj, value, name=name):
obj._set_slice(name, value)
dct[name] = property(getter, setter, doc=slice["desc"])
dct["__doc__"] = self.__class__.__doc__
SliceRegister = type("Register", (self.__class__,), dct)
new_reg = SliceRegister(self._cpuaddr, self._addr, self._bank, self._name, self._slices, self._desc, self._rm)
object.__setattr__(new_reg, "_reset", self._reset)
return new_reg
@property
def value_bin(self):
"""
Get the binary representation of the register value.
eg. for a 16-bit register with value 0xABCD, the binary representation would be "1010 1011 1100 1101".
Returns
-------
str
Binary representation of the register value, grouped in 4-bit segments.
"""
fmt = "{{:0{}b}}".format(len(self))
groups = self.__grouper(fmt.format(self.value), 4)
return " ".join(groups)
@value_bin.setter
def value_bin(self, value):
self.value = int(value.replace(" ", ""), base=2)
[docs]
def read(self, compare=None, onlycheck=True, tolerance=0, mask=None):
"""
Read from the Register with selected protocol.
Parameters
----------
compare : int or None
compare value, if None than no compare
onlycheck: bool
True (default) result is the compare value 0 or 1
False result is compare value and the read value
tolerance : int
tolerance for compare, default is 0
mask : int or None
mask for compare, default is None (no mask)
Returns
-------
error : int
0 if compare is successful, 1 if compare fails, or the value read from the register if onlycheck is False.
value : int
The value read from the register (only returned if onlycheck is False).
-1 : int
If the protocol is not defined or if there is an error while reading from the hardware.
"""
global mylogger
if self._rm._protocol is None:
mylogger.log_message(LogLevel.Error(), f"{self.__class__.__name__} protocol not defined!!")
return -1
value = self._read()
if not self._rm._protocol.board.error and value is not None:
mylogger.log_message(LogLevel.Measure(), f"{self.__class__.__name__}.{self._name} == {hex(value)}")
self._rm.publish_set(f"{self._name}.read", value)
if compare is not None:
error = common.check(f"{self.__class__.__name__}.{self._name}", compare, value, tolerance, mask)
if onlycheck:
return error
else:
return error, value
return value
[docs]
def write(self, value=None):
"""
Write to the Register with selected protocol.
Parameters
----------
value : int or None
The value to write to the register. If None, the method will attempt to write the cached value of the register.
Returns
-------
result : int
The result of the write operation. Returns -1 if the protocol is not defined.
"""
global mylogger
if self._rm._protocol is None:
mylogger.log_message(LogLevel.Error(), f"{self.__class__.__name__} protocol not defined!!")
return -1
result = self._write(value)
msg = f"{self.__class__.__name__}.{self._name} := "
msg = f"{msg} {hex(result)}" if value is not None else f"{msg} {result}"
mylogger.log_message(LogLevel.Measure(), msg)
self._rm.publish_get(f"{self._name}.write", result)
return result
[docs]
def res(self):
"""
Get the reset value of the register by checking the reset values of all bit-slices defined in the register master excel file.
Returns
-------
valres : int or str
The reset value of the register if all bit-slices have a defined reset value,
or a string indicating that the reset value is not defined if any bit-slice does not have a defined reset value.
"""
for name, slice in self._slices.items():
valres = slice["res"]
return valres
def _read(self, protocol=None):
"""
Read the value of the register from the hardware using the selected protocol,
and update the cache with the read value if the read operation is successful and the register is in atomic mode.
Parameters
----------
protocol : any, optional
The protocol to use for reading the register (default is None, which means using the current protocol defined in the RegisterMaster instance).
Returns
-------
value : int or None
The value read from the register, or None if the read operation failed.
"""
if self._bank is not None and self._bank != "":
self.writebase(self._bank)
value = self._rm._protocol.readreg(self._get_addr())
if not self._rm._protocol.board.error and value is not None:
value &= 2 ** self._len_slices() - 1
self._check_r_err()
if value is not None and value > -1:
self._cache = value
return value
def _write(self, value=None, protocol=None, verify=False):
"""
Write the given value to the register using the selected protocol, or write the cached value if no value is provided.
Parameters
----------
value : int or None
The value to write to the register.
If None, the method will attempt to write the cached value of the register (default is None).
protocol : any, optional
The protocol to use for writing to the register.
(default is None, which means using the current protocol defined in the RegisterMaster instance).
verify : bool, optional
If True, the method will read back the value after writing and verify that it was written correctly (default is False).
Returns
-------
_value : int or None
The value that was written to the register, or None if the write operation failed.
"""
if value is None:
_value = self._cache
else:
_value = self._validate(value, len(self))
if _value is not None:
if self._bank is not None and self._bank != "":
self.writebase(self._bank)
self._rm._protocol.writereg(self._get_addr(), _value)
self._check_w_err()
else:
msg = "nothing to write (neither argument nor _cache value is given)"
raise ValueError(msg)
if self._rm._atomic and _value is None:
self._cache = None
else:
self._cache = _value
return _value
def _check_r_err(self):
"""
Check for errors after reading from the hardware and handle any errors that occur by
raising an exception with an appropriate error message.
"""
if self._rm._protocol.board.error is True:
msg = "can't read from interface - {!r}"
msg = msg.format(self._rm._protocol.board.errortext)
self._rm.handle_exception(msg, ValueError, self._name)
def _check_w_err(self):
"""
Check for errors after writing to the hardware and handle any errors that occur by
raising an exception with an appropriate error message.
"""
if self._rm._protocol.board.error is True:
msg = "can't write to interface - {!r}"
msg = msg.format(self._rm._protocol.board.errortext)
self._rm.handle_exception(msg, ValueError, self._name)
@property
def value_table(self):
"""
Generate a table of the bit-slices of the register, including their names, directions,
values in decimal, hexadecimal, and binary formats, and reset values.
Returns
-------
df : pandas.DataFrame
A DataFrame containing the bit-slice information for the register, with columns for
location, name, direction, decimal value, hexadecimal value, binary value, and reset value.
"""
cache_old = self._cache
self._cache = self.value
slices = []
for name, dct in self._slices.items():
slice = OrderedDict()
slice["name"] = name
if dct["msb"] == dct["lsb"]:
loc = "[{msb}]".format(**dct)
else:
loc = "[{msb}:{lsb}]".format(**dct)
slice["loc"] = loc
length = dct["msb"] - dct["lsb"] + 1
value = self._get_slice(name)
slice["dec"] = str(value)
fmt = "0x{{:0{}x}}".format(length // 4 + (1 if length % 4 else 0))
slice["hex"] = fmt.format(value)
fmt = "{{:0{}b}}".format(length)
groups = self.__grouper(fmt.format(value), 4)
slice["bin"] = "" + " ".join(groups)
valres = dct["res"]
slice["res"] = "-" if valres is None else valres
slice["dir"] = dict(R="R", W="W", RW="RW").get(dct["dir"].upper())
slices.append(slice)
df = pd.DataFrame(slices, columns=["loc", "name", "dir", "dec", "hex", "bin", "res"])
df = df.set_index("loc")
df.index.name = None
self._cache = cache_old
return df
@property
def help(self):
""" Print the documentation for the register. """
print(self.__doc__)
@property
def __doc__(self):
"""
Generate a documentation string for the register, including its description and information about its bit-slices.
Returns
-------
str
A formatted string containing the documentation for the register, including its description and information about its bit-slices such as their names, locations, directions, and reset values.
"""
lines = [self._desc, ""]
width_left = 0
for name, s in self._slices.items():
if s["res"] != 0:
name = name + " (={})".format(s["res"])
width_left = max(width_left, len(name))
if s["msb"] == s["lsb"]:
slc = "[{}]".format(s["msb"])
else:
slc = "[{msb}:{lsb}]".format(**s)
rw = dict(RW="RW", R="R", W="W").get(s["dir"], "")
field = " ".join([slc, rw])
width_left = max(width_left, len(field))
for name, s in self._slices.items():
if s["res"] != 0:
name = name + " (={!r})".format(s["res"])
if s["msb"] == s["lsb"]:
slc = "[{}]".format(s["msb"])
else:
slc = "[{msb}:{lsb}]".format(**s)
rw = dict(RW="RW", R="R", W="W").get(s["dir"], "")
field = " ".join([slc, rw])
left = name, field
right = s["desc"].split("\n")
for le, r in zip_longest(left, right, fillvalue=""):
fmt = "{{:{}}} {{}}".format(width_left)
lines.append(fmt.format(le, r))
lines.append("")
# lines.append('note: direction is RW if not marked as RO or WO')
lines.append("note: reset value is zero if not marked with (=<res>)")
return "\n".join(lines)
def _len_slices(self):
"""
Calculate the total number of bits covered by all bit-slices defined in the register master excel file by summing the lengths of each bit-slice.
eg. if there are two bit-slices defined as [7:4] and [3:0], the total number of bits covered would be (7 - 4 + 1) + (3 - 0 + 1) = 4 + 4 = 8 bits.
Returns
-------
int
The total number of bits covered by all bit-slices defined in the register master excel file.
"""
return sum(s["msb"] - s["lsb"] + 1 for s in self._slices.values())
def __len__(self):
"""
Return the length of the register in bits, which is determined by the total number of bits covered by all bit-slices defined in the register master excel file.
eg. if there are two bit-slices defined as [7:4] and [3:0], the length of the register would be 8 bits.
Returns
-------
int
The length of the register in bits.
"""
return self._rm._len_reg
def __repr__(self):
"""
Return a string representation of the Register object, including its class name and key attributes such as CPU address, register address, bank, and name.
Returns
-------
str
A string representation of the Register object, including its class name and key attributes.
"""
args = ["cpuaddr=0x{:x}".format(self._cpuaddr)]
args.append("addr=0x{:x}".format(self._addr))
args.append("bank={}".format(self._bank))
args.append("name={!r}".format(self._name))
return "{classname}({args})".format(classname=self.__class__.__name__, args=", ".join(args))
@staticmethod
def __grouper(seq, n):
"""
Group a sequence into chunks of a specified size and reverse the order of the groups while keeping the order of characters within each group intact.
eg. grouper('ABCDEFG', 3) --> A BCD EFG
Parameters
----------
seq : str
The sequence to be grouped.
n : int
The size of each group.
Returns
-------
list
A list of grouped sequences.
"""
rseq = seq[::-1]
return [rseq[i : i + n][::-1] for i in range(0, len(seq), n)][::-1]
@staticmethod
def __int2dec(value, length=None):
"""
Convert an integer value to a decimal string representation, grouped in segments of three digits for better readability.
eg. int2dec(1234567) --> "1 234 567
Parameters
----------
value : int
The integer value to be converted to a decimal string representation.
length : int, optional
The number of bits for the register (used for validation, default is None).
Returns
-------
value : str
The decimal string representation of the integer value.
"""
try:
groups = Register.__grouper("{}".format(value), 3)
return " ".join(groups)
except ValueError:
return value
@staticmethod
def __int2hex(value, length):
"""
Convert an integer value to a hexadecimal string representation, grouped in segments of four bits for better readability.
eg. int2hex(12000, 16) --> "0x 2e e0"
int2hex(333, 32) --> "0x 00 00 01 4d"
Parameters
----------
value : int
The integer value to be converted to a hexadecimal string representation.
length : int
The number of bits for the register.
Returns
-------
value : str
The hexadecimal string representation of the integer value.
"""
try:
fmt = "{{:0{}X}}".format(length // 4 + (1 if length % 4 else 0))
groups = Register.__grouper(fmt.format(value), 2)
return "0x " + " ".join(groups)
except ValueError:
return value
@staticmethod
def __int2bin(value, length):
"""
Convert an integer value to a binary string representation, grouped in segments of four bits for better readability.
eg. int2bin(10, 8) --> "0b 0000 1010
int2bin(255, 16) --> "0b 0000 0000 1111 1111
Parameters
----------
value : int
The integer value to be converted to a binary string representation.
length : int
The number of bits for the register.
Returns
-------
value : str
The binary string representation of the integer value.
"""
try:
fmt = "{{:0{}b}}".format(length)
groups = Register.__grouper(fmt.format(value), 4)
return "0b " + " ".join(groups)
except ValueError:
return value
def _calc(self):
"""
Calculate the value of the register and its bit-slices, and generate a visual representation of the register using ipywidgets, including fields for the CPU address, register address, bank, name, reset value, decimal value, hexadecimal value, binary value, and description.
Returns
-------
widgets.VBox
A VBox widget containing the visual representation of the register, including its fields and values.
"""
names = OrderedDict()
names["cpuaddr"] = dict(width=50)
names["addr"] = dict(width=50)
names["bank"] = dict(width=50)
names["name"] = dict(width=100)
names["res"] = dict(width=60)
names["dec"] = dict(width=75)
names["hex"] = dict(width=75)
names["bin"] = dict(width=175)
names["description"] = dict(width=380, margin="0px 0px 10px 10px")
reg_header = []
for name, dct in names.items():
params = dct.copy()
if "width" in params:
params["width"] = "{}px".format(dct["width"])
box = widgets.HTML("<b>{}</b>".format(name), layout=params)
reg_header.append(box)
reg_header = widgets.HBox(reg_header)
reg_hline = widgets.Box(layout=dict(border="1px solid", margin="0px 2px 5px 2px"))
width_max = sum(dct["width"] for dct in names.values())
reg_hline.layout.width = "{}px".format(width_max)
reg_line = []
reg_line.append(widgets.Label("0x{:X}".format(self._cpuaddr)))
reg_line.append(widgets.Label("0x{:X}".format(self._addr)))
reg_line.append(widgets.Label("{}".format(self._bank)))
reg_line.append(widgets.Label(str(self._name)))
freset = widgets.Button(description="Reset", disabled=not self.__has_reset)
reg_line.append(freset)
def on_reset(obj):
self._use_reset()
reg_line.children[4].value = self.__int2dec(self._cache, len(self))
reg_line.children[5].value = self.__int2hex(self._cache, len(self))
reg_line.children[6].value = self.__int2bin(self._cache, len(self))
for (name, dct), line in zip(self._slices.items(), lines):
value = self._get_slice(name)
slen = dct["msb"] - dct["lsb"] + 1
line.children[4].value = self.__int2dec(value, slen)
line.children[5].value = self.__int2hex(value, slen)
line.children[6].value = self.__int2bin(value, slen)
freset.on_click(on_reset)
try:
value = self._cache
except ValueError:
value = ""
length = len(self)
fdec = widgets.Text("", placeholder="0d ...")
fdec.value = self.__int2dec(value, length)
reg_line.append(fdec)
fhex = widgets.Text("", placeholder="0d ...")
fhex.value = self.__int2hex(value, length)
reg_line.append(fhex)
fbin = widgets.Text("", placeholder="0d ...")
fbin.value = self.__int2bin(value, length)
reg_line.append(fbin)
fmt = '<p style="line-height:1.2">{}</p>'
field = widgets.HTML(fmt.format(self._desc.replace("\n", "<br>")))
field.layout.margin = "0px 0px 10px 10px"
reg_line.append(field)
for field, dct in zip(reg_line, names.values()):
field.layout.width = "{}px".format(dct["width"])
reg_line = widgets.HBox(reg_line)
reg_line.layout.margin = "0px 0px 10px 0px"
def dec2other(obj, length=len(self), fdec=fdec, fhex=fhex, fbin=fbin):
"""
Convert the decimal value entered in the text field to other representations (hexadecimal and binary)
and update the corresponding text fields.
Parameters
----------
obj : widget
The widget that triggered the event.
length : int
The number of bits for the register.
fdec : widget
The decimal text field.
fhex : widget
The hexadecimal text field.
fbin : widget
The binary text field.
Returns
-------
None
Raises
------
ValueError
If the input value is not a valid integer.
"""
try:
value = int(fdec.value.replace(" ", ""))
self._cache = value
fdec.value = self.__int2dec(value, length)
fhex.value = self.__int2hex(value, length)
fbin.value = self.__int2bin(value, length)
for name, line in zip(self._slices, lines):
value = self._get_slice(name)
dct = self._slices[name]
slen = dct["msb"] - dct["lsb"] + 1
line.children[4].value = self.__int2dec(value, slen)
line.children[5].value = self.__int2hex(value, slen)
line.children[6].value = self.__int2bin(value, slen)
except ValueError:
fdec.value = self.__int2dec("", length)
fdec.on_submit(dec2other)
def hex2other(obj, length=len(self), fdec=fdec, fhex=fhex, fbin=fbin):
"""
Convert the hexadecimal value entered in the text field to other representations (decimal and binary)
and update the corresponding text fields.
Parameters
----------
obj : widget
The widget that triggered the event.
length : int
The number of bits for the register.
fdec : widget
The decimal text field.
fhex : widget
The hexadecimal text field.
fbin : widget
The binary text field.
Returns
-------
None
Raises
------
ValueError
If the input value is not a valid hexadecimal integer.
"""
try:
value = int(fhex.value.replace(" ", ""), 16)
self._cache = value
fdec.value = self.__int2dec(value, length)
fhex.value = self.__int2hex(value, length)
fbin.value = self.__int2bin(value, length)
for name, line in zip(self._slices, lines):
value = self._get_slice(name)
dct = self._slices[name]
slen = dct["msb"] - dct["lsb"] + 1
line.children[4].value = self.__int2dec(value, slen)
line.children[5].value = self.__int2hex(value, slen)
line.children[6].value = self.__int2bin(value, slen)
except ValueError:
fhex.value = self.__int2hex("", length)
fhex.on_submit(hex2other)
def bin2other(obj, length=len(self), fdec=fdec, fhex=fhex, fbin=fbin):
"""
Convert the binary value entered in the text field to other representations (decimal and hexadecimal)
and update the corresponding text fields.
Parameters
----------
obj : widget
The widget that triggered the event.
length : int
The number of bits for the register.
fdec : widget
The decimal text field.
fhex : widget
The hexadecimal text field.
fbin : widget
The binary text field.
Returns
-------
None
Raises
------
ValueError
If the input value is not a valid binary integer.
"""
try:
value = int(fbin.value.replace(" ", ""), base=2)
self._cache = value
fdec.value = self.__int2dec(value, length)
fhex.value = self.__int2hex(value, length)
fbin.value = self.__int2bin(value, length)
for name, line in zip(self._slices, lines):
value = self._get_slice(name)
dct = self._slices[name]
slen = dct["msb"] - dct["lsb"] + 1
line.children[4].value = self.__int2dec(value, slen)
line.children[5].value = self.__int2hex(value, slen)
line.children[6].value = self.__int2bin(value, slen)
except ValueError:
fbin.value = self.__int2bin("", length)
fbin.on_submit(bin2other)
names = OrderedDict()
names["bits"] = dict(width=50)
names["dir"] = dict(width=50)
names["slice"] = dict(width=100)
names["res"] = dict(width=60)
names["dec"] = dict(width=75)
names["hex"] = dict(width=75)
names["bin"] = dict(width=175)
names["description"] = dict(width=380, margin="0px 0px 10px 10px")
header = []
for name, dct in names.items():
params = dct.copy()
if "width" in params:
params["width"] = "{}px".format(dct["width"])
box = widgets.HTML("<b>{}</b>".format(name), layout=params)
header.append(box)
header = widgets.HBox(header)
hline = widgets.Box(layout=dict(border="1px solid", margin="0px 2px 5px 2px"))
width_max = sum(dct["width"] for dct in names.values())
hline.layout.width = "{}px".format(width_max)
lines = []
for name, dct in self._slices.items():
if dct["msb"] == dct["lsb"]:
loc = "[{msb}]".format(**dct)
else:
loc = "[{msb}:{lsb}]".format(**dct)
ro = not "W" in dct["dir"].upper()
length = dct["msb"] - dct["lsb"] + 1
res = "-" if dct["res"] is None else dct["res"]
try:
value = self._get_slice(name)
except ValueError:
value = ""
line = []
line.append(widgets.Label(loc))
dir = dict(R="R", W="W", RW="RW").get(dct["dir"].upper())
line.append(widgets.Label(dir))
line.append(widgets.Label(name))
line.append(widgets.Label("{}".format(res)))
fdec = widgets.Text("", placeholder="0d ...", disabled=ro)
fdec.value = self.__int2dec(value, length)
line.append(fdec)
fhex = widgets.Text("", placeholder="0x ...", disabled=ro)
fhex.value = self.__int2hex(value, length)
line.append(fhex)
fbin = widgets.Text("", placeholder="0b ...", disabled=ro)
fbin.value = self.__int2bin(value, length)
line.append(fbin)
fmt = '<p style="line-height:1.2">{}</p>'
field = widgets.HTML(fmt.format(dct["desc"].replace("\n", "<br>")))
field.layout.margin = "0px 0px 10px 10px"
line.append(field)
def dec2other(obj, self=self, name=name, length=length, fdec=fdec, fhex=fhex, fbin=fbin):
"""
Convert the decimal value entered in the text field to other representations (hexadecimal and binary)
and update the corresponding text fields for the specific bit-slice.
Parameters
----------
obj : widget
The widget that triggered the event.
self : Register
The Register instance that contains the bit-slice.
name : str
The name of the bit-slice.
length : int
The number of bits for the bit-slice.
fdec : widget
The decimal text field for the bit-slice.
fhex : widget
The hexadecimal text field for the bit-slice.
fbin : widget
The binary text field for the bit-slice.
Returns
-------
None
Raises
------
ValueError
If the input value is not a valid integer.
"""
old = self._get_slice(name)
try:
value = int(fdec.value.replace(" ", ""))
self._set_slice(name, value)
fdec.value = self.__int2dec(value, length)
fhex.value = self.__int2hex(value, length)
fbin.value = self.__int2bin(value, length)
reg_line.children[4].value = self.__int2dec(self._cache, len(self))
reg_line.children[5].value = self.__int2hex(self._cache, len(self))
reg_line.children[6].value = self.__int2bin(self._cache, len(self))
except ValueError:
fdec.value = self.__int2dec("", length)
if not "W" in self._slices[name]["dir"].upper():
fdec.value = self.__int2dec(old, length)
else:
fdec.value = self.__int2dec("", length)
fdec.on_submit(dec2other)
def hex2other(obj, self=self, name=name, length=length, fdec=fdec, fhex=fhex, fbin=fbin):
"""
Convert the hexadecimal value entered in the text field to other representations (decimal and binary)
and update the corresponding text fields for the specific bit-slice.
Parameters
----------
obj : widget
The widget that triggered the event.
self : Register
The Register instance that contains the bit-slice.
name : str
The name of the bit-slice.
length : int
The number of bits for the bit-slice.
fdec : widget
The decimal text field for the bit-slice.
fhex : widget
The hexadecimal text field for the bit-slice.
fbin : widget
The binary text field for the bit-slice.
Returns
-------
None
Raises
------
ValueError
If the input value is not a valid integer.
"""
old = self._get_slice(name)
try:
value = int(fhex.value.replace(" ", ""), 16)
self._set_slice(name, value)
fdec.value = self.__int2dec(value, length)
fhex.value = self.__int2hex(value, length)
fbin.value = self.__int2bin(value, length)
reg_line.children[4].value = self.__int2dec(self._cache, len(self))
reg_line.children[5].value = self.__int2hex(self._cache, len(self))
reg_line.children[6].value = self.__int2bin(self._cache, len(self))
except ValueError:
if not "W" in self._slices[name]["dir"].upper():
fhex.value = self.__int2hex(old, length)
else:
fhex.value = self.__int2hex("", length)
fhex.on_submit(hex2other)
def bin2other(obj, self=self, name=name, length=length, fdec=fdec, fhex=fhex, fbin=fbin):
"""
Convert the binary value entered in the text field to other representations (decimal and hexadecimal)
and update the corresponding text fields for the specific bit-slice.
Parameters
----------
obj : widget
The widget that triggered the event.
self : Register
The Register instance that contains the bit-slice.
name : str
The name of the bit-slice.
length : int
The number of bits for the bit-slice.
fdec : widget
The decimal text field for the bit-slice.
fhex : widget
The hexadecimal text field for the bit-slice.
fbin : widget
The binary text field for the bit-slice.
Returns
-------
None
Raises
------
ValueError
If the input value is not a valid integer.
"""
old = self._get_slice(name)
try:
value = int(fbin.value.replace(" ", ""), base=2)
self._set_slice(name, value)
fdec.value = self.__int2dec(value, length)
fhex.value = self.__int2hex(value, length)
fbin.value = self.__int2bin(value, length)
reg_line.children[4].value = self.__int2dec(self._cache, len(self))
reg_line.children[5].value = self.__int2hex(self._cache, len(self))
reg_line.children[6].value = self.__int2bin(self._cache, len(self))
except ValueError:
if not "W" in self._slices[name]["dir"].upper():
fbin.value = self.__int2bin(old, length)
else:
fbin.value = self.__int2bin("", length)
fbin.on_submit(bin2other)
for field, dct in zip(line, names.values()):
field.layout.width = "{}px".format(dct["width"])
lines.append(widgets.HBox(line))
boxes = [reg_header, reg_hline, reg_line, header, hline] + lines
return widgets.VBox(boxes)
[docs]
class RegisterMaster(mqtt_deviceattributes):
"""
Returns a container object for registers imported from registermaster.
filename: name of registermaster (excel file), default from ENV:registermaster
atomic: enables read-modify-write behavoiur of bit-slices
(reset: False)
interface: hardware object for STI/BiPhase protocol
"""
# read-only attributes
def __setattr__(self, name, value):
"""
Set the value of an attribute for the RegisterMaster instance, while ensuring that only valid attributes
can be set and that any changes to attributes that are part of the MQTT publishing mechanism trigger the appropriate MQTT updates.
Parameters
----------
name : str
The name of the attribute to be set.
value : any
The value to be assigned to the attribute.
Raises
------
AttributeError
If the attribute name is not valid.
"""
valid = (
"__class__",
"_cached",
"_debug",
"_interface",
"_protocol",
"_atomic",
"_forcebank",
"_bank",
"_len_slices",
"register",
"use",
"mqtt_status",
"_mqtt_status",
"_mqttclient",
"mqtt_list",
"mqtt_enable",
"gui",
"mqtt_all",
"filename",
)
if name not in valid:
raise AttributeError("can't set attribute {}".format(repr(name)))
object.__setattr__(self, name, value)
if hasattr(self, "mqtt_all") and name in self.mqtt_all:
self.publish_set(name, value)
[docs]
def __init__(self, logger=None, filename=None, interface=None, instname="regs", read_mod_write=False):
"""
Initialize a RegisterMaster instance, which serves as a container for registers imported from a register master excel file.
The initialization process includes setting up attributes for caching, debugging, hardware interface, protocol, atomic operations, bank management, and MQTT publishing.
Parameters
----------
logger : Logger, optional
A Logger instance for logging messages (default is None, which creates a new Logger).
filename : str, optional
The name of the register master excel file (default is None, which uses the environment variable "registermaster").
interface : object, optional
The hardware interface object for STI/BiPhase protocol (default is None).
instname : str, optional
The instance name for the RegisterMaster (default is "regs").
read_mod_write : bool, optional
Enables read-modify-write behavior for bit-slices (default is False).
"""
global mylogger
self.gui = "pylab_ml.gui.instruments.regs.registermaster"
_setattr = object.__setattr__.__get__(self, self.__class__)
super().__init__()
self.mqtt_all = ["filename", "use"]
filename = os.environ.get("REGISTERMASTER") if filename is None else filename
mylogger = logger if logger is not None else Logger()
_setattr("instName", instname)
_setattr("filename", filename)
_setattr("_cached", False)
_setattr("_debug", False)
_setattr("_buffer", [])
_setattr("_regs", {})
_setattr("_cpuregs", {})
_setattr("_protocol_typ", None)
_setattr("_interface", interface)
_setattr("_protocol", None)
_setattr("_forcebank", True)
_setattr("_bank", -1)
_setattr("_atomic", read_mod_write)
_setattr("_len_reg", 0)
def __repr__(self):
"""
Return a string representation of the RegisterMaster instance, including its class name and key attributes such as the
filename, hardware interface, and read-modify-write behavior.
Returns
-------
str
A string representation of the RegisterMaster instance, including its class name and key attributes.
"""
args = ["{!r}".format(self.filename)]
if self._interface is not None:
args.append("interface={!r}".format(self._interface))
args.append("read_mod_write={!r}".format(self._atomic))
return "{classname}({args})".format(classname=self.__class__.__name__, args=", ".join(args))
[docs]
def init(self):
"""
Initialize the RegisterMaster instance by loading the register definitions from the specified register master excel file,
building the register database, and creating Register objects for each register defined in the database.
"""
try:
from labml_adjutancy.ctrl.labctrl import mqttc
except Exception:
mqttc = None
filename = self.filename
if filename is None:
print(f"{__class__}: no filename defined")
return
# raise IOError(f"{__class__}: no filename defined")
print(f' {self.instName}.init: self._mqttclient = {self._mqttclient}')
if self._mqttclient is None and mqttc is not None:
self.mqtt_add(mqttc, self)
blocked_regs = tuple(self.__class__.__dict__)
blocked_slices = tuple(Register.__dict__)
db = RegDB(filename)
db.build_database()
for item in db.database[0]["registers"]:
slices = OrderedDict()
for bsl in item["slices"]:
name = bsl["bsn"]
if name in blocked_slices:
msg = "WARNING: can't use name {!r} for slice\n" "Change slice name {!r} (line {!r}) in file {!r}"
msg = msg.format(name, name, bsl["id"], filename)
print(msg)
elif not isinstance(bsl["posmin"], int):
msg = "WARNING: bit-slice {!r} in register {!r} has "
msg += "invalid lsb: {!r}"
print(msg.format(name, item["blk"], bsl["posmin"]))
elif not isinstance(bsl["posmax"], int):
msg = "WARNING: bit-slice {!r} in register {!r} has "
msg += "invalid msb: {!r}"
print(msg.format(name, item["blk"], bsl["posmax"]))
else:
try:
valres = common.str2num(bsl["res"])
except ValueError:
valres = None
slices[name] = dict(
lsb=bsl["posmin"],
msb=bsl["posmax"],
res=valres,
dir=bsl["dir"],
desc=bsl["des"],
)
name = item["blk"]
if name in blocked_regs:
msg = "WARNING: can't use name {!r} for register\n" "Change register name {!} in file {!r}"
msg = msg.format(name, name, filename)
print(msg)
elif item.get("adr") is None and item.get("prgadr") is None:
msg = "WARNING: register {!r} has invalid address: {!r}"
# print(msg.format(name, item['adr']))
print(msg.format(name, item))
# elif item.get ('bank'): # to avoid Keyerror if 'bank' not exist --> not necessary anymore
# msg = 'WARNING: register {!r} has invalid bank: {!r}'
# print(msg.format(name, item['bank']))
else:
addr = -1
cpuaddr = -1
if item.get("adr") is not None and item["adr"] != "":
addr = item["adr"]
elif item.get("prgadr") is not None and item["prgadr"] != "":
addr = item["prgadr"]
if item.get("cpuadr") is not None and item["cpuadr"] != "":
cpuaddr = item["cpuadr"]
if name != "":
reg = Register(
cpuaddr=cpuaddr,
addr=addr,
bank=item.get("bank"),
name=name,
description=item["des"],
slices=slices,
rm=self,
)
reg = reg._Register__slices2attr()
if not isinstance(reg._addr, int):
msg = f"ERROR: {name} addr is not a integer! addr= {addr}"
print(msg)
continue
if reg._addr >= 0:
self._regs.setdefault(reg._addr, []).append(reg)
if reg._cpuaddr >= 0:
self._cpuregs.setdefault(reg._cpuaddr, []).append(reg)
object.__setattr__(self, item["blk"], reg)
len_max = max(reg._len_slices() for reg in self)
object.__setattr__(self, "_len_reg", len_max)
for reg in self:
if reg._len_slices() < len_max:
msg = "WARNING: misses some slices ({} {}) in register {!r}"
num = len_max - reg._len_slices()
unit = "bits" if num > 1 else "bit"
msg = msg.format(num, unit, reg._name)
try:
self.use = "tin"
except AttributeError:
pass
object.__setattr__(self, "mapping", {})
object.__setattr__(self, "_len_slices", None)
object.__setattr__(self, "banks", {})
self.register = self.__dict__.copy() # create attribute register with all registernames
for reg in self.__dict__:
if reg != "register" and not hasattr(self.register[reg], "_Register__has_reset"):
self.register.pop(reg, None)
elif reg != "register":
bank = self.register[reg]._bank
if bank is not None and (bank not in self.banks or self.banks[bank] < self.register[reg].addr):
self.banks[bank] = self.register[reg].addr
if bank is not None and bank != "":
self.mapping[bank] = self.register[reg]._len_slices()
self.mqtt_list = list(self.register) + self.mqtt_all
return self
def __iter__(self):
"""
Iterate over the registers in the RegisterMaster instance, yielding each register object one by one.
The iteration order depends on the protocol type being used (either "tin" or another protocol),
and the method ensures that all registers are included in the iteration regardless of their organization in the internal data structures.
"""
global mylogger
if self._protocol_typ == "tin":
for regs in self._cpuregs.values():
for reg in regs:
yield reg
else:
if not bool(self._regs.values()):
mylogger.log_message(LogLevel.Error(), "\n####################################################")
mylogger.log_message(LogLevel.Error(), 'ERROR: No "addr" has been defined in register master')
mylogger.log_message(LogLevel.Error(), "####################################################\n")
for regs in self._regs.values():
for reg in regs:
yield reg
def __len__(self):
"""
Return the total number of registers in the RegisterMaster instance, which is calculated based on the protocol type being used.
If the protocol type is "tin", the method counts the number of registers in the "_cpuregs" data structure, while for other protocol types,
it counts the registers in the "_regs" data structure.
Returns
-------
int
The total number of registers in the RegisterMaster instance.
"""
if self._protocol_typ == "tin":
return sum(len(regs) for regs in self._cpuregs.values())
else:
return sum(len(regs) for regs in self._regs.values())
def _mqtt2json(self, value, attr=None):
"""
Convert the value of an attribute to a JSON-compatible format for MQTT publishing,
while ensuring that only attributes that are part of the MQTT publishing mechanism are included in the conversion.
Parameters
----------
value : any
The value of the attribute to be converted to JSON format.
attr : str, optional
The name of the attribute being converted (default is None).
Returns
-------
any
The value of the attribute in a JSON-compatible format if the attribute is part of the MQTT
publishing mechanism, or the string "nomqtt" if the attribute is not part of the MQTT publishing mechanism.
"""
if attr in self.mqtt_all:
return value
return "nomqtt"
def _register_to_list(patterns: Union[str, Iterable[str]]) ->List[str]:
"""
Convert a string or an iterable of strings into a list of strings.
If the input is a single string, it will be wrapped in a list.
If the input is already an iterable of strings, it will be converted to a list.
Parameters
----------
patterns : str or Iterable[str]
A string or an iterable of strings to be converted into a list of strings.
Returns
-------
List[str]
A list of strings derived from the input patterns.
If the input is a single string, the output will be a list containing that string.
If the input is an iterable of strings, the output will be a list containing all the strings from the iterable.
"""
if isinstance(patterns, str):
return[patterns]
return list(patterns)
def close(self):
pass
[docs]
def find(self, addr):
"""
Find the name of a register based on its address, depending on the protocol type being used.
Parameters
----------
addr : int
The address of the register to be found.
Returns
-------
str or None
The name of the register if found, or None if no register with the specified address exists.
"""
try:
if self._protocol_typ == "tin":
return self._cpuregs[addr][0]
else:
return self._regs[addr][0]._name
except Exception:
return None
@property
def use(self):
"""
Get the current protocol type.
eg. self.regs.use returns "tin" or "spi" or "biphase" depending on the protocol type being used.
Returns
-------
str
The current protocol type.
"""
return self._protocol_typ
@use.setter
def use(self, typ):
"""
Set/Get interface to 'typ'.
typ must be defined in the interface
"""
interface = getattr(self._interface, typ)
object.__setattr__(self, "_protocol", interface)
object.__setattr__(self, "_protocol_typ", typ)
object.__setattr__(self, "_bank", -1)
if hasattr(self._protocol, "init"):
self._protocol.init()
[docs]
def set_bank(self, adr):
"""
Set the bank from a giving adr and recalculate adr.
Parameters
----------
adr : int
address.
Returns
-------
adr : int
address without bank.
"""
if self.banks:
for bank in self.banks.keys():
if adr <= self.banks[bank]:
self._bank = bank
break
if self.mapping != {}:
mybank = 0
for bank in self.mapping:
mybank = bank if adr >= bank else mybank
# self._bank = mybank
self._len_slices = self.mapping[mybank]
# adr = adr - mybank # TODO: cj 7.2.2026 das ist doch falsch, check it with a real project
else:
self._bank = None
self._len_slices = None
return adr
def _call_from_string(self, adr, callstr):
"""
Call a function for all registers matching the given address pattern, using a string representation of the function call.
Parameters
----------
adr : str
The address pattern to match registers against, which can include wildcards for flexible matching.
callstr : str
A string representation of the function call to be executed for each matching register,
including the function name and its arguments in a format that can be parsed and evaluated.
Returns
-------
result : list or int
The result of the function calls for each matching register, aggregated based on the type of the input data.
"""
result = []
regs = self._register_to_list()
call = ast.parse(callstr, mode='eval').body
args = [ast.literal_eval(a) for a in call.args]
kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in call.keywords}
index = 0
dat = args[0]
result = 0 if type(dat) is list else []
for reg in regs:
if fnmatch.fnmatchcase(reg._name, adr):
function = getattr(getattr(self, reg._name), call.func.id)
if type(dat) is list:
args[0] = dat[index]
resu = function(*args, **kwargs)
if type(dat) is list:
result += resu
else:
result.append(resu)
result.append(function(*args, **kwargs))
index += 1
return result
[docs]
def readreg(self, adr, bank=None, compare=None, onlycheck=True, tolerance=0, mask=None):
"""
Read Register with selected protocol.
Parameters
----------
adr : int or str or list
The address or addresses of the register(s) to read.
bank : int, optional
The bank address, if applicable.
compare : int or list, optional
The value(s) to compare against, if any.
onlycheck : bool, optional
If True, only perform the check without returning the value.
tolerance : int, optional
The tolerance for the comparison.
mask : int, optional
The mask to apply to the value before comparison.
Returns
-------
int or list or tuple
The result of the read operation, which can be an integer value, a list of values, or a tuple
containing the comparison result and the value, depending on the parameters provided.
"""
if type(adr) is str:
if len(adr) == 0:
mylogger.log_message(LogLevel.Error(), 'readreg: adr is empty')
return
elif adr[0].isdigit():
adr = list(common.arange(adr))
else:
return self._call_from_string(adr, f"read({compare}, {onlycheck}, {tolerance}, {mask})")
if type(adr) is list:
result = [] if compare is None else 0
index = 0
for a in adr:
comp = compare[index] if type(compare) is list else compare
resu = self._readreg(a, bank, comp, onlycheck, tolerance, mask)
if compare is None:
result.append(resu)
else:
result += resu
index += 1
return result
return self._readreg(adr, bank, compare, onlycheck, tolerance, mask)
def _readreg(self, adr, bank=None, compare=None, onlycheck=True, tolerance=0, mask=None):
"""
Read Register with selected protokoll.
Parameters
----------
adr : int
address.
bank : int
bank adress
compare : int
compare value, if None than no compare
onlycheck : bool
if True, only perform the check without returning the value.
tolerance : int
tolerance for the comparison.
mask : int
mask to apply to the value before comparison.
Returns
-------
value : int
The value read from the register, potentially after applying a mask and comparison, depending on the parameters provided.
error : int
The error code resulting from the comparison, if applicable.
check : int
If compare is defined, this is the compare result:
0 = ok
1 = error
"""
hadr = adr
if bank is not None:
self._bank = bank
else:
adr = self.set_bank(adr)
if self._bank is not None and self._bank != "":
self._protocol.writebase(self._bank)
value = self._protocol.readreg(adr, compare=compare, tolerance=tolerance, mask=mask)
mylogger.log_message(LogLevel.Measure(), f"readreg(0x{hadr:02x}) == {hex(value)}")
self.publish_set(f"readreg(0x{hadr:02x})", value)
if self._len_slices is not None:
value &= 2**self._len_slices - 1
if compare is not None:
bank = 0 if self._bank is None else self._bank
if value is not None:
error = common.check(f"{hex(bank+adr)}", compare, value, tolerance, mask)
if onlycheck and value is not None:
return error
elif onlycheck:
return 0
else:
return error, value
return value
[docs]
def writereg(self, adr, dat, bank=None):
"""
Write Register with selected protocol.
Parameters
----------
adr : int or str or list
The address or addresses of the register(s) to write to.
dat : int or list
The data to write to the register(s).
bank : int, optional
The bank address. If None, the current bank is used.
Returns
-------
None.
"""
if type(adr) is str:
if len(adr) == 0:
mylogger.log_message(LogLevel.Error(), 'writereg: adr is empty')
return
elif adr[0].isdigit():
adr = list(common.arange(adr))
else:
return self._call_from_string(adr, f"write({dat})")
if type(adr) is list:
index = 0
for a in adr:
d = dat[index] if type(dat) is list else dat
self._writereg(a, d, bank)
index += 1
return
self._writereg(adr, dat, bank)
def _writereg(self, adr, dat, bank=None):
"""
Write Register with selected protokoll.
Parameters
----------
adr : int
address.
dat : int
data.
bank : int
bank adress
Returns
-------
None.
"""
hadr = adr
if bank is not None:
self._bank = bank
else:
adr = self.set_bank(adr)
if self._bank is not None and self._bank != "":
self._protocol.writebase(self._bank)
self._protocol.writereg(adr, dat)
msg = f"writereg(0x{hadr:02x}) := {hex(dat)}"
mylogger.log_message(LogLevel.Measure(), msg)
self.publish_set(f"writereg({hadr})", dat)
[docs]
def reset(self):
""" Reset the RegisterMaster instance by resetting the protocol and clearing the cache for all registers. """
self._protocol.reset()
object.__setattr__(self, "_bank", -1)
for reg in self:
object.__setattr__(reg, "__cache__", None)
[docs]
def reset_internal(self):
""" Reset the internal state of the RegisterMaster instance by resetting the protocol's internal state and clearing the cache for all registers. """
object.__setattr__(self, "_bank", -1)
try:
self._protocol.reset_internal()
except AttributeError:
msg = "{!r} protocol has no 'reset_internal'."
raise AttributeError(msg.format(self._protocol.__class__.__name__))
for reg in self:
object.__setattr__(reg, "__cache__", None)
[docs]
def reset_regs(self, default=None, force=False):
"""
Reset the slices to the reset value
Parameters
----------
default : int, optional
The value to set the register to. If None, default values 0 will be used.
force : bool, optional
False -> Only a writable slice sees a reset.
True -> Readable and writable slice sees a reset.
Returns
-------
None.
"""
global mylogger
for regname in self:
try:
# print (regname)
for slices in regname._slices:
if "W" in regname._slices[slices]["dir"] or force:
regname._use_reset(default, force)
except Exception:
mylogger.log_message(LogLevel.Warning(), f"Error in {regname._name} with resetvalues -> Check your Registermaster")
pass
mylogger.log_message(LogLevel.Info(), "Reset register Cache to resetvalues")
[docs]
def handle_exception(self, msg, typ=None, name=None):
"""
Handle exceptions by logging an error message using the logger instance, and optionally raising a specified exception type with the provided message.
Parameters
----------
msg : str
The error message to be logged and potentially included in the raised exception.
typ : Exception class, optional
The type of exception to be raised. If None, no exception is raised.
name : str, optional
The name associated with the error message.
Returns
-------
None.
"""
global mylogger
if mylogger is not None:
if name is None:
mylogger.log_message(LogLevel.Error(), msg)
else:
mylogger.log_message(LogLevel.Error(), f'{name}: {msg}')
elif typ is not None:
raise typ(msg)
else:
ValueError(msg)
[docs]
def set_configuration_values(self, data):
"""
Only empty dummy function.
Parameters
----------
data : dict
A dictionary containing configuration values to be set for the RegisterMaster instance.
Returns
-------
None.
"""
global mylogger
mylogger.log_message(LogLevel.Warning(), "pylab_ml.RegisterMaster: set_configuration_values only dummy function.....")
pass
[docs]
def apply_configuration(self, data):
"""
Call from the Plugin if the modul placed in the hardwaresetups.
Parameters
----------
data : dict
A dictionary containing configuration values to be applied to the RegisterMaster instance, such as the filename.
Returns
-------
None.
"""
_setattr = object.__setattr__.__get__(self, self.__class__)
config = environment.replaceEnvs(data)
filename = config["filename"] if "filename" in config and config["filename"] != "" else self.filename
filename = environment.checkNetworkPath(filename)
if filename is not None and filename.find("..") == 0 and "WORKING_DIR" in os.environ:
filename = filename.replace("..", os.environ["WORKING_DIR"])
_setattr("filename", filename)
instname = config["instance name"] if "instance name" in config and config["instance name"] != "" else self.instName
_setattr("instName", instname)
read_mod_write = config["read mod write"] if "read mod write" in config and config["read mod write"] != "" else self._atomic
_setattr("_atomic", read_mod_write)
forcebank = config["force bank"] if "force bank" in config and config["force bank"] != "" else self._forcebank
_setattr("_forcebank", forcebank)
resetvalue = config["reset value"] if "reset value" in config and config["reset value"] != "" else 0
self.init()
# self.reset_regs(resetvalue) # TODO: definition interface is missing
if __name__ == "__main__":
filename = "your_registermaster.xls"
regs = RegisterMaster(filename=filename)
regs.init()