Source code for pylab_ml.common.mqtt_client

# -*- coding: utf-8 -*-
"""
Created on Tue Feb 18 09:52:09 2020

@author: Zlin526F

Bugs:
    wenn device ohne close in spyder beendet wird, so ist
    thread mqtt_deviceattributes._mqtt_message weiterhin aktiv

todo:

"""

import time
import platform
import json
from abc import abstractmethod
import paho.mqtt.client as mqttc
import socket
from .singleton import Singleton
from PyQt5 import QtCore
from . import common

__author__ = "Zlin526F"
__copyright__ = "Copyright 2023, Lab"
__credits__ = ["Zlin526F"]
__email__ = "Zlin526F@github"
__version__ = "0.0.1"


TOPIC_PREFIX = "ate"
TOPIC_CONTROL = "semictrl"
TOPIC_INSTRUMENT = "instruments"
TOPIC_INSTNAME = "tcc"


[docs] class mylogger(object):
[docs] def __init__(self, output=None, enable=False, parent=None): self.output = output self.enable = enable self.parent = parent
def debug(self, msg): if not self.enable: return self.display("DEBUG", msg) def info(self, msg): if not self.enable: return self.display("INFO", msg) def measure(self, msg): if not self.enable: return self.display("MEASURE", msg) def warning(self, msg): self.display("WARNING", msg) def error(self, msg): self.display("ERROR", msg) def display(self, typ, msg): msg = f"{self.parent} {typ} | {msg}" if self.output is None: print(msg) else: self.output.append(msg) def log_message(self, level, msg): self.display(level, msg)
[docs] class mqtt_init(object, metaclass=Singleton): """Initialise the mqtt connection and provide functions for the communication."""
[docs] def __init__(self, typ="control", logger=None): """Initialise the class mqtt_init. typ = 'instrument' for instruments = 'control' for guis or extern controlling (default) """ def on_connect(client, userdata, flags, rc): if rc == 0: msg = f"MQTT Broker {self.broker}, user {self.username} connected" self.logger.info(msg) elif rc == 5: msg = f"MQTT Broker {self.broker}, user {self.username} authentication error" self.logger.error(msg) else: msg = f"MQTT Broker {self.broker}, user {self.username} connection failed ({mqttc.connack_string(rc)})" self.logger.error(msg) def on_disconnect(client, userdata, rc): msg = "MQTT Broker {} user {} disconnected".format( self.broker, self.username ) if rc > 0: msg += ", {}".format(mqttc.connack_string(rc)) print(msg) self.typ = typ print(f'create mqtt_init as Singleton typ={typ}') self.broker = "" self.username = "" self.qos = 0 self.retain = False if logger is not None: self.logger = logger else: self.logger = mylogger(parent=self.typ) # enable=True self.mqttreceive = mqtt_signal() client = mqttc.Client(clean_session=True) client.enable_logger(logger=None) client.on_connect = on_connect client.on_disconnect = on_disconnect self.client = client self.instruments = {}
[docs] def init( self, broker="127.0.0.1", port=1883, message_client=None, username="", userpasswd="", qos=0, retain=False, ): """ Normally call from plugin semi-control, you have not to call this function, if you use semi-control. message_client = None if client the instrument, like e.q. smu, digital-multimeter or thermostreamer. = e.q. 'DT1604092' if control and you want connect from an extern computer to DT1604092 (but is not checked until now!!) """ import socket self.qos = qos self.retain = retain self.username = username self.hostname = socket.gethostname() if broker.find(".") > 0: self.broker = broker else: self.broker = socket.gethostname(broker) if username != "": self.client.username_pw_set(username, password=userpasswd) try: self.client.connect( self.broker, port, 60 ) # blocking until connection establish or timeout except Exception: self.logger.error( "ERROR: MQTT couldn't connect to broker {}".format(self.broker) ) self.topic = message_client return False self.client.loop_start() if message_client is None: message_client = f"{TOPIC_PREFIX}" self.client.subscribe(f"{message_client}/#", qos=qos) self.logger.info( f"{self.__class__.__name__}.init: subscribe for {message_client}/#" ) self.topic = message_client self.instruments = {} return True
[docs] def mqtt_add(self, instrument, hostname=None): """ Add first callback to get mqtt-commands from control, and append instrument to instruments-list. Returns: None. """ if not hasattr(self, "hostname"): return if hostname is None: hostname = self.hostname if self.instruments == {}: if hostname == "": callback = f"{TOPIC_PREFIX}/#" else: callback = f"{TOPIC_PREFIX}/{hostname}/{TOPIC_CONTROL}/#" self.client.message_callback_add(callback, self._mqtt_message) self.logger.debug( f"{self.__class__.__name__}.mqtt_add: watching for {callback}" ) self.instruments[instrument.instName] = instrument self.logger.debug( f"{self.__class__.__name__}.mqtt_add: add {instrument.instName} to instruments-list" )
[docs] def mqtt_disconnect(self, instrument=None, hostname=None): """ Remove instrument from the instruments-list, if instruments-list is empty than remove callback. Returns: None. """ if hostname is None: if not hasattr(self, "hostname"): return hostname = self.hostname if instrument is not None and instrument.instName in self.instruments.keys(): self.instruments.pop(instrument.instName) if self.instruments == {}: callback = f"{TOPIC_PREFIX}/{self.hostname}/{TOPIC_CONTROL}/#" self.client.message_callback_remove(callback) # remove subscribe self.logger.debug( f"{self.__class__.__name__}.mqtt_disconnect: remove watching for {callback}" )
def _mqtt_message(self, client, userdata, message): """ Automatically call, if message received from broker. if message from an instrument than send it via channel to mqtt_receive in your application else if message from control than call functions/attribute from the instrument """ # if self.mqtt_debug: print('{}.mqtt_message value/payload: {} = {}'.format(self.__class__.__name__,message.topic,value)) msg = message.payload.decode("utf-8", "ignore") try: payload = json.loads(str(msg)) except Exception: # print("failed:",type(value)) payload = msg endtopic = message.topic.split("/")[-1] if ( self.typ == "control" and type(payload) is dict and endtopic != TOPIC_CONTROL ): # message from an instrument received -> send it to a GUI or control self.mqttreceive.channel.emit(message.topic, msg) elif ( self.typ == "instrument" and type(payload) is dict and endtopic == TOPIC_CONTROL ): # message from control received, call function from instrument instname = list(payload.keys())[0] payload = payload[instname] if instname == TOPIC_INSTNAME: instname = payload["cmd"][0] payload["cmd"].pop(0) if instname in self.instruments: instrument = self.instruments[instname] else: if "cmd" in payload and payload["cmd"] != "mqtt_status": self.logger.warning( f"mqtt_message: {instname} not in my instrument list {self.instruments.keys()} payload = {payload} -> do nothing" ) return cmd = payload["cmd"] value = payload["payload"] typ = payload["type"] result = common.strcall( instrument, cmd, value=value, typ=typ, mqttcheck=True ) if result == "ERROR": self.logger.warning( f"{instname} get an 'not found' from a mqtt command = {cmd}, {value}" ) return elif self.typ == "instrument" or ( self.typ == "control" and endtopic != TOPIC_CONTROL ): print( f"unknown mqtt message {message.topic} = {message.payload} -> do nothing" )
[docs] def publish(self, attr, value, qos=None, retain=None): """Send message to broker.""" if qos is None: qos = self.qos if retain is None: retain = self.retain try: value = json.dumps(value) except Exception as ex: self.logger.error( f"publish: couldn't send {attr} = {value} excepton occure {ex}" ) return self.client.publish( f"{attr}", str(value), qos=qos, retain=retain ) # payload must be string,bytearray,int,float or None
[docs] def clearpublish(self, attr): """Remove retained publish message from broker.""" self.client.publish( attr, "", 0, True ) # send 0 message to remove retained flag
[docs] def close(self): """Disconnect from broker.""" if hasattr(self, "broker") and self.broker is not None: self.client.loop_stop() self.client.disconnect() self.client.on_connect = None self.client.on_disconnect = None
def __del__(self): self.close() @abstractmethod def mqtt_receive(self, topic, msg): pass
[docs] class mqtt_deviceattributes(object): """ Handle mqqt messages for the instrument (=devices is 'transmitter'). - topic for the device: f'{TOPIC_PREFIX}/'Hostname'/{TOPIC_INSTRUMENT} - payload: {"instrumentname": {"type": "set/get", "cmd": "function/attributename", "payload": yourvalues}} mqtt_all will be overwriten, to uncover attributes for sending mqqt-messages - if you define the function '_mqtt2json(value, attributename)' in your device, than this function will be call if a mqtt message should be send With this function you can translate the value, or it the result 'nomqtt' than the message will not be send. """ import json mqtt_enable = False mqtt_list = [] command = { "set": {"type": "cmd", "command": "menu", "payload": None}, }
[docs] def __init__(self): _setattr = object.__setattr__.__get__( self, self.__class__ ) # set the values direct, without sending a mqtt-message _setattr("mqtt_debug", False) _setattr("_mqttclient", None) _setattr("_mqtt_status", "disconnect") _setattr("mqtt_enable", False) _setattr("hostname", socket.gethostname()) _setattr("topic", f"{TOPIC_PREFIX}/{self.hostname}/{TOPIC_INSTRUMENT}") _setattr("mqtt_all", []) _setattr( "_mqttclient", None ) # normally, overwritten in setup_inst from instrument
[docs] def mqtt_add(self, client, instrument, liste="#", qos=0): """ Add the instrument to mqtt. calling from base_instrument, after the instrument (device) has been create. Normally you have not to use this function, only base_instrument use it. Returns: None. """ self._mqttclient = client if liste == "#": self.mqtt_list = self.mqtt_all else: self.mqtt_list = liste if client is None: print("mqtt_add: client not defined") return client.mqtt_add(instrument) if self._mqttclient is not None: self.mqtt_enable = True if ( hasattr(self, "gui") and self.gui is not None ): # send a message direct to semictrl that the gui is available payload = { "semictrl": { "type": "cmd", "cmd": "button", "payload": {self.instName: self.gui}, } } self._mqttclient.publish(self.topic, payload) self._mqtt_status = "disconnect" self.publish_set( "mqtt_status", "disconnect" ) # send mqtt_status default = disconnect
[docs] def mqtt_disconnect(self): """ Remove the instrument from mqtt. calling from base_instrument, if the instrument are closing. Normally you have not to use this function, only base_instrument use it. Returns: None. """ if hasattr(self, "_mqttclient") and self._mqttclient is not None: self.publish_set("mqtt_status", "disconnect") self._mqtt_status = "disconnect" self._mqttclient.mqtt_disconnect(self) # remove from list self.mqtt_enable = False
def __setattr__(self, attr, value): """ Publish attribute and value, if attribute was set and it is in mqtt_list. pulish mean: send 'hostname/instName/attibute/set value' to the broker This function will call from _mqtt_message, normally you have not to use this function. """ object.__setattr__(self, attr, value) if ( self.mqtt_enable and attr in self.mqtt_list ): # attr == 'mqtt_status' or payload = { f"{self.instName}": { "type": "set", "cmd": attr, "payload": value, } } self._mqttclient.publish(self.topic, payload) if self.mqtt_debug: print( "{} {} publish: {} {}".format( self.__class__.__name__, self.instName, self.topic, payload, ) ) def __getattribute__(self, attr, *values): """ Publish attribute and value, if attribute was get and it is in mqtt_list. pulish mean: send 'hostname/instName/attibute/set value' to the broker This function will call from _mqtt_message, normally you have not to use this function. """ value = super(__class__, self).__getattribute__(attr) if attr == "mqtt_enable": return value if ( attr not in ["__class__", "mqtt_list", "INST_FUNCTION"] and attr in self.mqtt_list ): if f"{attr}()" in self.mqtt_list: # TODO: function-call have to add their own call from self._mqttclient.publish(), because unitl now # I did'nt know to get the function parameters # payload = {f"{self.instName}": {'type': 'get', 'cmd': attr, 'payload': values}} print( f" ({value} sorry, I did'nt know to get the function parameters -> improve it, until now no mqtt publish" ) return value elif hasattr(self, "_mqtt2json"): # print(f" call _mqtt2json({value})") payload = self._mqtt2json(value, attr) if payload != "nomqtt": payload = { f"{self.instName}": { "type": "get", "cmd": attr, "payload": payload, } } else: payload = { f"{self.instName}": { "type": "get", "cmd": attr, "payload": value, } } if payload != "nomqtt": if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload) else: print(f" _mqttclient is None: coulnt't publish {attr}, {value}") if self.mqtt_debug: print( "{} {} publish: {} {}".format( self.__class__.__name__, self.instName, self.topic, payload, ) ) return value
[docs] def publish(self, topic, value): """Publish topic as type='cmd' with paylad=value.""" # function_name=inspect.stack()[1][3] payload = { f"{self.instName}": {"type": "cmd", "cmd": topic, "payload": value} } if self.mqtt_debug: print( f"{self.__class__.__name__}.publish: {self.topic} = {payload}" ) if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload)
[docs] def publish_get(self, function_name, value): """Publish function_name as type='get' with paylad=value.""" # function_name=inspect.stack()[1][3] if self.mqtt_enable: payload = { f"{self.instName}": { "type": "set", "cmd": function_name, "payload": value, } } if self.mqtt_debug: print( f"{self.__class__.__name__}.publish_get: {self.topic} = {payload}" ) if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload)
[docs] def publish_set(self, function_name, value): """Publish function_name as type='set' with paylad=value.""" # function_name=inspect.stack()[1][3] if self.mqtt_enable: payload = { f"{self.instName}": { "type": "set", "cmd": function_name, "payload": value, } } if self.mqtt_debug: print( f"{self.__class__.__name__}.publish_set: {self.topic} = {payload}" ) if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload)
@property def mqtt_status(self): """Getter for the mqtt_status.""" if self.mqtt_debug: print(f"{self.instName}.mqtt_status == {self._mqtt_status}") return self._mqtt_status @mqtt_status.setter def mqtt_status(self, value): """The mqtt_status should be set only from an extern mqtt message. value = disconnect/connect """ if self.mqtt_debug: print(f" {self.instName}.mqtt_status := {value}") self._mqtt_status = value if value == "connect": self.publish_set("mqtt_status", "connect") def close(self): self.mqtt_disconnect()
[docs] class mqtt_signal(QtCore.QObject): """Class for communicate with mqtt messages . We need a channel to communicate with some gui widgets (e.q. QTextEdit). otherwhile we get the error: 'Cannot create children for a parent that is in a different thread' """ channel = QtCore.pyqtSignal(str, str)
[docs] class mqtt_displayattributes(object): """ mqtt messages for display and controlling (='receiver'). docu muss überarbeitet werden: publish: - topic for display application: f'{TOPIC_PREFIX}/'Hostname'/{TOPIC_CONTROL} - payload: f'{"instrumentname": {"type": "set/get", "cmd": "function/attributename", "payload": yourvalues}}' """ import json
[docs] def __init__(self, client, message_client, mqtt_receive=None): self.mqtt_debug = False self.mqttclient = client self.instName = message_client # self._mqttreceive = client.mqtt_signal() if mqtt_receive is None: mqtt_receive = self.mqtt_receive self.mqttclient.mqttreceive.channel.connect(mqtt_receive) self.topic = f"{message_client}/{platform.node()}/{TOPIC_CONTROL}"
[docs] def publish(self, attr, value): """Send message to broker.""" self.mqttclient.publish(attr, value)
[docs] def publish_set(self, instName, function_name, value): """Publish attribute with value as set.""" # function_name=inspect.stack()[1][3] payload = { f"{instName}": { "type": "set", "cmd": function_name, "payload": value, } } if self.mqtt_debug: self.mqttclient.logger.debug( f"publish_set: {self.topic} = {payload}" ) self.mqttclient.publish(self.topic, payload)
[docs] def publish_get(self, instName, function_name): """Publish attribute with value as get.""" # function_name=inspect.stack()[1][3] payload = { f"{instName}": {"type": "get", "cmd": function_name, "payload": ""} } if self.mqtt_debug: self.mqttclient.logger.debug( f"publish_get: {self.topic} = {payload}" ) self.mqttclient.publish(self.topic, payload)
[docs] def mqtt_add(self): """Add the modul as member for the mqtt communication.""" self.mqttclient.mqtt_add(self, "")
[docs] def mqtt_disconnect(self): """Remove the modul as member for the mqtt communication.""" self.mqttclient.client.message_callback_remove( self.instName + "/#" ) # remove subscribe for this client
@abstractmethod def mqtt_receive(self, topic, msg): print("ERROR: You have to overwrite mqtt_receive in your class")
if __name__ == "__main__": mqtt = mqtt_init() # mqtt.init('172.27.112.62', message_client='ate', username='') # broker mosquitto on 'ubuntu-0001' mqtt.init( "127.0.0.1", message_client=f"ate/{socket.gethostname()}/", username="" ) # local mosquitto broker i = 0 while True: time.sleep(0.5) mqtt.client.publish("mqtt_client/temperature", "25") i += 1 if i > 10: break mqtt.close()