# -*- coding: utf-8 -*-
"""
Created on Tue May 4 14:18:05 2021
"""
import numpy as np
import ast
import copy
import psutil
from pylab_ml.common.data import str2num
mylogger = None
[docs]
def set_logger(logger):
global mylogger
mylogger = logger
[docs]
def kill_proc_tree(pid=None, instance=None, including_parent=False):
if pid is None and instance is None:
return
if pid is None and instance is not None:
pid = instance.pid
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
child.kill()
gone, still_alive = psutil.wait_procs(children, timeout=5)
if including_parent:
parent.kill()
parent.wait(5)
if instance is not None:
instance.terminate()
del (instance)
[docs]
def multistrcall(self, commandlist):
# if type(command) == str:
# command = command.split(',')
# result = []
for cmd in commandlist:
strcall(self, cmd, commandlist[cmd][0])
[docs]
def strcall(self, command, value=None, typ=None, mqttcheck=False):
"""
make a call from the command:
result = common.strcall(tcc, 'regs.HW_ID.read()')
result = common.strcall(self, 'smu.voltage=5.3' , mqttcheck=True)'
result = common.strcall(self, 'smu.blabla()', 4.7, mqttcheck=True)'
needs the the logger from self
Parameters
----------
command : string
value : int/float/list, optional
DESCRIPTION. The default is None.
typ : None/'set'/'get'
DESCRIPTION. If the command an mqtt-command, than you have coice if it is settable or gettable
mqttcheck : True/False, optional
DESCRIPTION. The default is False. Check if the command is in the mqtt_list. If not, the command will not be execute
Returns
-------
myresult : the result from the call
"""
if self is None:
print("strcall: initialise missing, self is None -> do nothing")
return
parent = self
instname = self.instName + "." if hasattr(self, "instName") else ""
mqttcmd = command
if mqttcmd.find("(") > 0:
mqttcmd = mqttcmd[: mqttcmd.find("(") + 1] + ")"
typ = "func"
elif typ is None and mqttcmd.find(':') < 0:
typ = "get"
if mqttcheck and mqttcmd not in ["mqtt_status"] + parent.mqtt_list:
instname = parent.instName if hasattr(self, "instName") else parent
print(f" Warning strcall: {instname} get {mqttcmd}, but it is not in the mqtt_list")
return "ERROR"
if typ == "set":
value = tuple(value) if type(value) is list else str2num(value)
evalue = f'"{value}"' if isinstance(value, str) else value
try:
exec(f"self.{command} = {evalue}")
print(f" self.{instname}{command} := {value}")
return None
except Exception as ex:
print(f" strcall error: 'self.{instname}{command} := {value}' get an exception: {ex}")
return "ERROR"
elif typ == "get":
try:
value = eval(f"self.{command}")
print(f" self.{instname}{command} == {value}")
return value
except Exception as ex:
print(f" strcall get error: 'self.{instname}{command}' get an exception: {ex}")
return "ERROR"
elif typ == "func":
command = command[: command.find("(")]
para = ""
if type(value) is list:
for val in value:
para += f"'{val}',"
para = para[:-1]
elif type(value) is str and value != "":
para = f"'{value}'"
elif value is not None:
para = value
try:
result = eval(f"self.{command}({para})")
print(f" self.{instname}{command}({para})")
return result
except Exception as ex:
print(f" strcall error func: self.{instname}{command}({para}), get an exception: {ex}")
return 'ERROR'
[docs]
def convertExpr2Expression(Expr):
Expr.lineno = 0
Expr.col_offset = 0
result = ast.Expression(Expr.value, lineno=0, col_offset=0)
return result
[docs]
def exec_with_return(code, parent=None):
"""
Exec with return
implement from https://stackoverflow.com/questions/33409207/how-to-return-value-from-exec-in-function
Parameters
----------
code : TYPE
DESCRIPTION.
parent : TYPE, optional
DESCRIPTION. The default is None.
Returns
-------
TYPE
DESCRIPTION.
"""
code_ast = ast.parse(code)
init_ast = copy.deepcopy(code_ast)
init_ast.body = code_ast.body[:-1]
last_ast = copy.deepcopy(code_ast)
last_ast.body = code_ast.body[-1:]
exec(compile(init_ast, "<ast>", "exec"), globals())
if type(last_ast.body[0]) is ast.Expr:
return eval(compile(convertExpr2Expression(last_ast.body[0]), "<ast>", "eval"), globals(), {"self": parent})
else:
exec(compile(last_ast, "<ast>", "exec"), globals(), {"self": parent})
[docs]
def arange(myitems):
"""create from 'myitems' an array like in matlab.
e.q. "18:-1:6:"
"18:-1:6, 5.9:-0.1:4.1"
"18:-1:6, 5.9:-0.1:4.1, 7, 9"
also possible: "18:6:-1"
"""
result = None
if myitems.find(",") > 0:
delimiter = ","
elif myitems.find(";") > 0:
delimiter = ";"
else:
delimiter = " "
for item in myitems.split(delimiter):
if item.find(":"):
split = item.split(":")
start = str2num(split[0])
if len(split) == 1:
myresult = start
else:
if len(split) == 2:
inc = 1
stop = str2num(split[1])
else:
stop = str2num(split[2])
inc = 1 if len(split) < 3 else str2num(split[1])
if inc > stop:
temp = inc
inc = stop
stop = temp
if len(split) == 2 and start > stop:
inc = -1
try:
correctur = inc # to calculate like matlab 3:6:1 -> 3,4,5,6 normaly in python 3,4,5
myresult = np.arange(start, stop + correctur, inc)
except Exception:
myresult = f"Syntax error in {item}"
else:
myresult = item
result = myresult if result is None else np.append(result, myresult)
return result
[docs]
def choice(arg, myargs):
"""
check if arg in myargs
Parameters
----------
arg : string
DESCRIPTION.
myargs : array of strings
DESCRIPTION.
Returns
-------
bool
True/False
"""
if arg not in myargs:
print(f"checkargs: attribute {arg} not valid")
return False
return True
[docs]
def checkargs(myargs, **kwargs):
for arg in kwargs:
if arg not in myargs:
print(f"checkargs: attribute {arg} not valid")
[docs]
def check(msg, target, actual, tolerance=0, mask=None):
"""
Compare target with the acutal value.
Parameters
----------
msg : TYPE
DESCRIPTION.
target : TYPE
the target value
if str and start with 0x than each X is a 4bit mask
if str and start with 0b than each x is a 1bit mask
actual : TYPE
the actual value.
tolerance : float or integer, optional
DESCRIPTION. The default is 0.
mask : Integer or None(default)
Returns
-------
error : bool
True : target = actual value
False : target != actual value.
"""
error = 0
if type(target) is str and len(target) > 3:
if target[1] == "x": # it is a hex number with mask information
target = target.replace("_", "", len(target))
mask = 0
for index in range(2, len(target)):
mask = (mask << 4) + (15 if target[index] != "X" else 0)
target = str2num(target.replace("X", "0", len(target)))
elif target[1] == "b": # it is a bin number with mask information
target = target.replace("_", "", len(target))
mask = 0
for index in range(2, len(target)):
mask = (mask << 1) + (1 if target[index] != "x" else 0)
target = str2num(target.replace("x", "0", len(target)))
if type(actual) is not type(target):
error = 1
msg = f"{msg} different type: target= {target}({type(target)}) <-> actual= {actual}({type(actual)}) -> couldn't check')"
elif type(actual) is list:
if len(target) != len(actual):
logprint("WARNING", f": check len() from target list (={len(target)}) <-> actual list ({len(actual)}), are different!!!!")
for index in range(0, len(actual)):
if actual[index] != target[index]:
logprint("ERROR", f"Wrong value at adr 0x{index:2x}, read 0x{actual[index]:x}, expected 0x{target[index]:x}")
error += 1
msg = f"{msg}: {len(actual)} Word checked ->"
if error == 0:
msg = f"{msg} OK"
else:
msg = f"{msg} {error} Errors"
elif type(actual) is int:
if mask is not None and (actual & mask) != (target & mask):
error = 1
msg = f"{msg} target: 0x{target&mask:x} != actual: 0x{actual&mask:x}"
elif mask is None and not (target - tolerance <= actual <= target + tolerance): # actual != target:
error = 1
msg = f"{msg} target: 0x{target:x} != actual: 0x{actual:x}" if tolerance == 0 \
else f"{msg}: 0x{target:x} +- 0x{tolerance:x} <> 0x{actual:x}"
else:
msg = f"{msg} == 0x{actual:2x}"
elif (type(actual) is str) or (type(actual) is bool):
if target != actual:
error = 1
msg = f"{msg} target: {target} != actual: {actual}"
else:
msg = "{msg} = {actual}"
elif type(actual) is float: # float checked with tolerance
if (target < (actual - tolerance)) or (target > (actual + tolerance)):
error = 1
msg = f"{msg}: expected {target} +- {tolerance} <>{actual}"
else:
msg = f"{msg}: expected {target} +- {tolerance} == {actual}"
else:
logprint("ERROR", "pylab_ml.common.check: type {type(actual)} not yet implant !")
error = 1
if error > 0:
logprint("ERROR", msg)
else:
logprint("MEASURE", msg)
return error
[docs]
def color(n, s):
code = {
"bold": 1,
"faint": 2,
"italic": 3,
"underline": 4,
"blink_slow": 5,
"blink_fast": 6,
"negative": 7,
"conceal": 8,
"strike_th": 9,
"ack": 30,
"red": 31,
"green": 32,
"yellow": 33,
"blue": 34,
"magenda": 35,
"cyan": 36,
"white": 37,
"black": 40,
"b_red": 41,
"b_green": 42,
"b_yellow": 43,
"b_blue": 44,
"b_magenda": 45,
"b_cyan": 46,
"b_white": 47,
}
try:
num = str(code[n])
value = "\033[" + num + "m" + s + "\033[0m"
return value
except Exception:
pass
[docs]
def logprint(level, msg):
if mylogger is None:
print(f"{level}: {msg}")
else:
if level == "DEBUG":
mylogger.debug(msg)
elif level == "MEASURE":
mylogger.measure(msg)
elif level == "INFO":
mylogger.info(msg)
elif level == "WARNING":
mylogger.warning(msg)
elif level == "ERROR":
mylogger.error(msg)
if __name__ == "__main__":
class test:
mqtt_list = [
"CH0.connect()",
"CH0.drv.vdl",
"CH0.drv.vdh",
"CH0.disconnect()",
"CH1.connect()",
"CH1.drv.vdl",
"CH1.drv.vdh",
"CH1.disconnect()",
"CH2.connect()",
]
def strcall_test(self):
strcall(
self,
'CH0.disconnect("PBUS_F")',
value=None,
typ=None,
mqttcheck=True,
)
print(arange("18:5:-1"))
print(arange("6:4:-1"))
print(arange("6:4:-0.1"))
print(arange("-6:-4:0.1"))
print(arange("18:5:-1, 5.9:4.1:-0.1, 4:-18:-1, -18:5:1, 4.1:5.9:0.1, 6:19:1"))
print(arange("18:5:-1, 5.9:4.1:-0.1, 5, 8"))
print(arange("18:5:-1, 4, 8"))
print(arange("18:5:-1 6, 9"))
print(arange("18:5"))
print(arange("5:5"))