Source code for pylab_ml.common.mqtt_client

# -*- coding: utf-8 -*-
"""
This script contains a MQTT client class that can be used to communicate with a MQTT broker.
The MQTT client class uses the paho-mqtt library to handle the communication.

Created on Tue Feb 18 09:52:09 2020

@author: Zlin526F

Bugs:
    wenn device ohne close in spyder beendet wird, so ist
    thread mqtt_deviceattributes._mqtt_message weiterhin aktiv

todo:

"""

import time
import platform
import json
from abc import abstractmethod
import paho.mqtt.client as mqttc
import socket
from .singleton import Singleton
from PyQt5 import QtCore
from . import common

__author__ = "Zlin526F"
__copyright__ = "Copyright 2023, Lab"
__credits__ = ["Zlin526F"]
__email__ = "Zlin526F@github"
__version__ = "0.0.1"


TOPIC_PREFIX = "ate"
TOPIC_CONTROL = "semictrl"
TOPIC_INSTRUMENT = "instruments"
TOPIC_INSTNAME = "tcc"


[docs] class mylogger(object): """ Class for logging messages. """
[docs] def __init__(self, output=None, enable=False, parent=None): self.output = output self.enable = enable self.parent = parent
[docs] def debug(self, msg): """ Log a debug message. """ if not self.enable: return self.display("DEBUG", msg)
[docs] def info(self, msg): """ Log an info message. """ if not self.enable: return self.display("INFO", msg)
[docs] def measure(self, msg): """ Log a measure message. """ if not self.enable: return self.display("MEASURE", msg)
[docs] def warning(self, msg): """ Log a warning message. """ self.display("WARNING", msg)
[docs] def error(self, msg): """ Log an error message. """ self.display("ERROR", msg)
[docs] def display(self, typ, msg): """ Display a message with a specific type. Parameters ---------- typ : str The type of the message (e.g., "DEBUG", "INFO", "WARNING", "ERROR"). msg : str The message to be displayed. Returns ------- None """ msg = f"{self.parent} {typ} | {msg}" if self.output is None: print(msg) else: self.output.append(msg)
[docs] def log_message(self, level, msg): """ Log a message with a specific level. """ self.display(level, msg)
[docs] class mqtt_init(object, metaclass=Singleton): """ Initialise the mqtt connection and provide functions for the communication. """
[docs] def __init__(self, typ="control", logger=None): """ Initialise the class mqtt_init. Parameters ---------- typ : str 'instrument' for instruments 'control' for guis or extern controlling (default) logger : mylogger or None Logger instance for logging messages. If None, no logging is performed. """ def on_connect(client, userdata, flags, rc): """ Callback function for when the client receives a CONNACK response from the server. Parameters ---------- client : paho.mqtt.client.Client The client instance for this callback. userdata : any The private user data as set in Client() or userdata_set(). flags : dict Response flags sent by the broker. rc : int The connection result. Returns ------- None """ if rc == 0: msg = f"MQTT Broker {self.broker}, user {self.username} connected" self.logger.info(msg) elif rc == 5: msg = f"MQTT Broker {self.broker}, user {self.username} authentication error" self.logger.error(msg) else: msg = f"MQTT Broker {self.broker}, user {self.username} connection failed ({mqttc.connack_string(rc)})" self.logger.error(msg) def on_disconnect(client, userdata, rc): """ Callback function for when the client disconnects from the broker. Parameters ---------- client : paho.mqtt.client.Client The client instance for this callback. userdata : any The private user data as set in Client() or userdata_set(). rc : int The disconnection result. Returns ------- None """ msg = "MQTT Broker {} user {} disconnected".format( self.broker, self.username ) if rc > 0: msg += ", {}".format(mqttc.connack_string(rc)) print(msg) self.typ = typ print(f'create mqtt_init as Singleton typ={typ}') self.broker = "" self.username = "" self.qos = 0 self.retain = False if logger is not None: self.logger = logger else: self.logger = mylogger(parent=self.typ) # enable=True self.mqttreceive = mqtt_signal() client = mqttc.Client(clean_session=True) client.enable_logger(logger=None) client.on_connect = on_connect client.on_disconnect = on_disconnect self.client = client self.instruments = {}
[docs] def init( self, broker="127.0.0.1", port=1883, message_client=None, username="", userpasswd="", qos=0, retain=False, ): """ Normally call from plugin semi-control, you have not to call this function, if you use semi-control. Parameters ---------- broker : str The address of the MQTT broker (default: "127.0.0.1") port : int The port of the MQTT broker (default: 1883) message_client : str or None None if client the instrument, like e.q. smu, digital-multimeter or thermostreamer. eg. 'DT1604092' if control and you want connect from an extern computer to DT1604092 (but is not checked until now!!) username : str The username for MQTT authentication (default: "") userpasswd : str The password for MQTT authentication (default: "") qos : int The Quality of Service level for MQTT messages (default: 0) retain : bool Whether to retain MQTT messages (default: False) Returns ------- bool True if the MQTT client was successfully initialized and connected to the broker, False otherwise. """ import socket self.qos = qos self.retain = retain self.username = username self.hostname = socket.gethostname() if broker.find(".") > 0: self.broker = broker else: self.broker = socket.gethostname(broker) if username != "": self.client.username_pw_set(username, password=userpasswd) try: self.client.connect( self.broker, port, 60 ) # blocking until connection establish or timeout except Exception: self.logger.error( "ERROR: MQTT couldn't connect to broker {}".format(self.broker) ) self.topic = message_client return False self.client.loop_start() if message_client is None: message_client = f"{TOPIC_PREFIX}" self.client.subscribe(f"{message_client}/#", qos=qos) self.logger.info( f"{self.__class__.__name__}.init: subscribe for {message_client}/#" ) self.topic = message_client self.instruments = {} return True
[docs] def mqtt_add(self, instrument, hostname=None): """ Add first callback to get mqtt-commands from control, and append instrument to instruments-list. Parameters ---------- instrument : object The instrument to be added to the MQTT client. hostname : str or None The hostname for the MQTT topic. If None, the client's hostname will be used (default: None). Returns ------- None """ if not hasattr(self, "hostname"): return if hostname is None: hostname = self.hostname if self.instruments == {}: if hostname == "": callback = f"{TOPIC_PREFIX}/#" else: callback = f"{TOPIC_PREFIX}/{hostname}/{TOPIC_CONTROL}/#" self.client.message_callback_add(callback, self._mqtt_message) self.logger.debug( f"{self.__class__.__name__}.mqtt_add: watching for {callback}" ) self.instruments[instrument.instName] = instrument self.logger.debug( f"{self.__class__.__name__}.mqtt_add: add {instrument.instName} to instruments-list" )
[docs] def mqtt_disconnect(self, instrument=None, hostname=None): """ Remove instrument from the instruments-list, if instruments-list is empty than remove callback. Parameters ---------- instrument : object or None The instrument to be removed from the MQTT client. If None, no instrument will be removed (default: None). hostname : str or None The hostname for the MQTT topic. If None, the client's hostname will be used (default: None). Returns ------- None """ if hostname is None: if not hasattr(self, "hostname"): return hostname = self.hostname if instrument is not None and instrument.instName in self.instruments.keys(): self.instruments.pop(instrument.instName) if self.instruments == {}: callback = f"{TOPIC_PREFIX}/{self.hostname}/{TOPIC_CONTROL}/#" self.client.message_callback_remove(callback) # remove subscribe self.logger.debug( f"{self.__class__.__name__}.mqtt_disconnect: remove watching for {callback}" )
def _mqtt_message(self, client, userdata, message): """ Automatically call, if message received from broker. If message from an instrument than send it via channel to mqtt_receive in your application, else if message from control than call functions/attribute from the instrument. Parameters ---------- client : object The MQTT client instance. userdata : object The private user data as set in Client() or userdata_set(). message : object An instance of MQTTMessage, which contains topic, payload, qos, retain. Returns ------- None """ # if self.mqtt_debug: print('{}.mqtt_message value/payload: {} = {}'.format(self.__class__.__name__,message.topic,value)) msg = message.payload.decode("utf-8", "ignore") try: payload = json.loads(str(msg)) except Exception: # print("failed:",type(value)) payload = msg endtopic = message.topic.split("/")[-1] if ( self.typ == "control" and type(payload) is dict and endtopic != TOPIC_CONTROL ): # message from an instrument received -> send it to a GUI or control self.mqttreceive.channel.emit(message.topic, msg) elif ( self.typ == "instrument" and type(payload) is dict and endtopic == TOPIC_CONTROL ): # message from control received, call function from instrument instname = list(payload.keys())[0] payload = payload[instname] if instname == TOPIC_INSTNAME: instname = payload["cmd"][0] payload["cmd"].pop(0) if instname in self.instruments: instrument = self.instruments[instname] else: if "cmd" in payload and payload["cmd"] != "mqtt_status": self.logger.warning( f"mqtt_message: {instname} not in my instrument list {self.instruments.keys()} payload = {payload} -> do nothing" ) return cmd = payload["cmd"] value = payload["payload"] typ = payload["type"] result = common.strcall( instrument, cmd, value=value, typ=typ, mqttcheck=True ) if result == "ERROR": self.logger.warning( f"{instname} get an 'not found' from a mqtt command = {cmd}, {value}" ) return elif self.typ == "instrument" or ( self.typ == "control" and endtopic != TOPIC_CONTROL ): print( f"unknown mqtt message {message.topic} = {message.payload} -> do nothing" )
[docs] def publish(self, attr, value, qos=None, retain=None): """ Publish a message to the MQTT broker. Parameters ---------- attr : str The topic attribute to publish. value : any The value to publish. This will be converted to a JSON string before publishing. qos : int or None The Quality of Service level for the MQTT message. If None, the default QoS level of the client will be used (default: None). retain : bool or None Whether to retain the MQTT message. If None, the default retain flag of the client will be used (default: None). Returns ------- None """ if qos is None: qos = self.qos if retain is None: retain = self.retain try: value = json.dumps(value) except Exception as ex: self.logger.error( f"publish: couldn't send {attr} = {value} excepton occure {ex}" ) return self.client.publish( f"{attr}", str(value), qos=qos, retain=retain ) # payload must be string,bytearray,int,float or None
[docs] def clearpublish(self, attr): """ Remove retained publish message from broker Parameters ---------- attr : str The topic attribute for which the retained message should be removed. Returns ------- None """ self.client.publish( attr, "", 0, True ) # send 0 message to remove retained flag
[docs] def close(self): """ Close the MQTT client connection and clean up resources. """ if hasattr(self, "broker") and self.broker is not None: self.client.loop_stop() self.client.disconnect() self.client.on_connect = None self.client.on_disconnect = None
def __del__(self): """ Ensure that the MQTT client connection is closed when the instance is deleted. """ self.close() @abstractmethod def mqtt_receive(self, topic, msg): pass
[docs] class mqtt_deviceattributes(object): """ Handle MQTT messages for the instrument (=devices is 'transmitter'). - topic for the device: f'{TOPIC_PREFIX}/'Hostname'/{TOPIC_INSTRUMENT} - payload: {"instrumentname": {"type": "set/get", "cmd": "function/attributename", "payload": yourvalues}} mqtt_all will be overwriten, to uncover attributes for sending mqtt-messages - if you define the function '_mqtt2json(value, attributename)' in your device, than this function will be call if a mqtt message should be send With this function you can translate the value, or it the result 'nomqtt' than the message will not be send. """ import json mqtt_enable = False mqtt_list = [] command = { "set": {"type": "cmd", "command": "menu", "payload": None}, }
[docs] def __init__(self): _setattr = object.__setattr__.__get__( self, self.__class__ ) # set the values direct, without sending a mqtt-message _setattr("mqtt_debug", False) _setattr("_mqttclient", None) _setattr("_mqtt_status", "disconnect") _setattr("mqtt_enable", False) _setattr("hostname", socket.gethostname()) _setattr("topic", f"{TOPIC_PREFIX}/{self.hostname}/{TOPIC_INSTRUMENT}") _setattr("mqtt_all", []) _setattr( "_mqttclient", None ) # normally, overwritten in setup_inst from instrument
[docs] def mqtt_add(self, client, instrument, liste="#", qos=0): """ Add the instrument to mqtt. Calling from base_instrument, after the instrument (device) has been create. Normally you have not to use this function, only base_instrument use it. Parameters ---------- client : mqtt_init The MQTT client instance to which the instrument should be added. instrument : object The instrument to be added to the MQTT client. liste : list or str A list of attributes for which MQTT messages should be sent. If set to "#", all attributes will be included (default: "#"). qos : int The Quality of Service level for MQTT messages (default: 0). Returns ------- None """ self._mqttclient = client if liste == "#": self.mqtt_list = self.mqtt_all else: self.mqtt_list = liste if client is None: print("mqtt_add: client not defined") return client.mqtt_add(instrument) if self._mqttclient is not None: self.mqtt_enable = True if ( hasattr(self, "gui") and self.gui is not None ): # send a message direct to semictrl that the gui is available payload = { "semictrl": { "type": "cmd", "cmd": "button", "payload": {self.instName: self.gui}, } } self._mqttclient.publish(self.topic, payload) self._mqtt_status = "disconnect" self.publish_set( "mqtt_status", "disconnect" ) # send mqtt_status default = disconnect
[docs] def mqtt_disconnect(self): """ Remove the instrument from mqtt. Calling from base_instrument, if the instrument are closing. Normally you have not to use this function, only base_instrument use it. """ if hasattr(self, "_mqttclient") and self._mqttclient is not None: self.publish_set("mqtt_status", "disconnect") self._mqtt_status = "disconnect" self._mqttclient.mqtt_disconnect(self) # remove from list self.mqtt_enable = False
def __setattr__(self, attr, value): """ Publish attribute and value, if attribute was set and it is in mqtt_list. Publish means: send 'hostname/instName/attribute/set value' to the broker. This function will be called from _mqtt_message, normally you have not to use this function. Parameters ---------- attr : str The name of the attribute being set. value : any The value being assigned to the attribute. Returns ------- None """ object.__setattr__(self, attr, value) if ( self.mqtt_enable and attr in self.mqtt_list ): # attr == 'mqtt_status' or payload = { f"{self.instName}": { "type": "set", "cmd": attr, "payload": value, } } self._mqttclient.publish(self.topic, payload) if self.mqtt_debug: print( "{} {} publish: {} {}".format( self.__class__.__name__, self.instName, self.topic, payload, ) ) def __getattribute__(self, attr, *values): """ Publish attribute and value, if attribute was get and it is in mqtt_list. Publish means: send 'hostname/instName/attribute/set value' to the broker. This function will be called from _mqtt_message, normally you have not to use this function. Parameters ---------- attr : str The name of the attribute being accessed. *values : tuple Additional values that may be used for function calls (not implemented in this version). Returns ------- any The value of the attribute being accessed. """ value = super(__class__, self).__getattribute__(attr) if attr == "mqtt_enable": return value if ( attr not in ["__class__", "mqtt_list", "INST_FUNCTION"] and attr in self.mqtt_list ): if f"{attr}()" in self.mqtt_list: # TODO: function-call have to add their own call from self._mqttclient.publish(), because unitl now # I did'nt know to get the function parameters # payload = {f"{self.instName}": {'type': 'get', 'cmd': attr, 'payload': values}} print( f" ({value} sorry, I did'nt know to get the function parameters -> improve it, until now no mqtt publish" ) return value elif hasattr(self, "_mqtt2json"): # print(f" call _mqtt2json({value})") payload = self._mqtt2json(value, attr) if payload != "nomqtt": payload = { f"{self.instName}": { "type": "get", "cmd": attr, "payload": payload, } } else: payload = { f"{self.instName}": { "type": "get", "cmd": attr, "payload": value, } } if payload != "nomqtt": if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload) else: print(f" _mqttclient is None: coulnt't publish {attr}, {value}") if self.mqtt_debug: print( "{} {} publish: {} {}".format( self.__class__.__name__, self.instName, self.topic, payload, ) ) return value
[docs] def publish(self, topic, value): """ Publish topic as type='cmd' with payload=value. Parameters ---------- topic : str The name of the topic to publish. value : any The value to be published. Returns ------- None """ # function_name=inspect.stack()[1][3] payload = { f"{self.instName}": {"type": "cmd", "cmd": topic, "payload": value} } if self.mqtt_debug: print( f"{self.__class__.__name__}.publish: {self.topic} = {payload}" ) if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload)
[docs] def publish_get(self, function_name, value): """ Publish function_name as type='get' with paylad=value. Parameters ---------- function_name : str The name of the function to be published as a 'get' command. value : any The value to be published as the payload of the 'get' command. Returns ------- None """ # function_name=inspect.stack()[1][3] if self.mqtt_enable: payload = { f"{self.instName}": { "type": "set", "cmd": function_name, "payload": value, } } if self.mqtt_debug: print( f"{self.__class__.__name__}.publish_get: {self.topic} = {payload}" ) if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload)
[docs] def publish_set(self, function_name, value): """ Publish function_name as type='set' with paylad=value. Parameters ---------- function_name : str The name of the function to be published as a 'set' command. value : any The value to be published as the payload of the 'set' command. Returns ------- None """ # function_name=inspect.stack()[1][3] if self.mqtt_enable: payload = { f"{self.instName}": { "type": "set", "cmd": function_name, "payload": value, } } if self.mqtt_debug: print( f"{self.__class__.__name__}.publish_set: {self.topic} = {payload}" ) if self._mqttclient is not None: self._mqttclient.publish(self.topic, payload)
@property def mqtt_status(self): """ Getter for the mqtt_status. """ if self.mqtt_debug: print(f"{self.instName}.mqtt_status == {self._mqtt_status}") return self._mqtt_status @mqtt_status.setter def mqtt_status(self, value): """The mqtt_status should be set only from an extern mqtt message. value = disconnect/connect """ if self.mqtt_debug: print(f" {self.instName}.mqtt_status := {value}") self._mqtt_status = value if value == "connect": self.publish_set("mqtt_status", "connect") def close(self): self.mqtt_disconnect()
[docs] class mqtt_signal(QtCore.QObject): """Class for communicate with mqtt messages . We need a channel to communicate with some gui widgets (e.q. QTextEdit). otherwhile we get the error: 'Cannot create children for a parent that is in a different thread' """ channel = QtCore.pyqtSignal(str, str)
[docs] class mqtt_displayattributes(object): """ MQTT messages for display and controlling (='receiver'). docu muss überarbeitet werden: publish: - topic for display application: f'{TOPIC_PREFIX}/'Hostname'/{TOPIC_CONTROL} - payload: f'{"instrumentname": {"type": "set/get", "cmd": "function/attributename", "payload": yourvalues}}' """ import json
[docs] def __init__(self, client, message_client, mqtt_receive=None): """ Initialize the MQTT display attributes. Parameters ---------- client : mqtt_init The MQTT client instance to be used for communication. message_client : str The name of the message client, used for constructing the MQTT topic. mqtt_receive : function or None A custom function to handle received MQTT messages. If None, the default mqtt_receive method will be used (default: None). """ self.mqtt_debug = False self.mqttclient = client self.instName = message_client # self._mqttreceive = client.mqtt_signal() if mqtt_receive is None: mqtt_receive = self.mqtt_receive self.mqttclient.mqttreceive.channel.connect(mqtt_receive) self.topic = f"{message_client}/{platform.node()}/{TOPIC_CONTROL}"
[docs] def publish(self, attr, value): """ Send message to broker. Parameters ---------- attr : str The attribute name to be published. value : any The value to be published. Returns ------- None """ self.mqttclient.publish(attr, value)
[docs] def publish_set(self, instName, function_name, value): """ Publish attribute with value as set. Parameters ---------- instName : str The name of the instrument for which the command is being published. function_name : str The name of the function or attribute to be set. value : any The value to be set for the specified function or attribute. Returns ------- None """ # function_name=inspect.stack()[1][3] payload = { f"{instName}": { "type": "set", "cmd": function_name, "payload": value, } } if self.mqtt_debug: self.mqttclient.logger.debug( f"publish_set: {self.topic} = {payload}" ) self.mqttclient.publish(self.topic, payload)
[docs] def publish_get(self, instName, function_name): """ Publish attribute with value as get. Parameters ---------- instName : str The name of the instrument for which the command is being published. function_name : str The name of the function or attribute to be retrieved. Returns ------- None """ # function_name=inspect.stack()[1][3] payload = { f"{instName}": {"type": "get", "cmd": function_name, "payload": ""} } if self.mqtt_debug: self.mqttclient.logger.debug( f"publish_get: {self.topic} = {payload}" ) self.mqttclient.publish(self.topic, payload)
[docs] def mqtt_add(self): """ Add the module as member for the mqtt communication. """ self.mqttclient.mqtt_add(self, "")
[docs] def mqtt_disconnect(self): """ Remove the module as member for the mqtt communication. """ self.mqttclient.client.message_callback_remove( self.instName + "/#" ) # remove subscribe for this client
@abstractmethod def mqtt_receive(self, topic, msg): print("ERROR: You have to overwrite mqtt_receive in your class")
if __name__ == "__main__": mqtt = mqtt_init() # mqtt.init('172.27.112.62', message_client='ate', username='') # broker mosquitto on 'ubuntu-0001' mqtt.init( "127.0.0.1", message_client=f"ate/{socket.gethostname()}/", username="" ) # local mosquitto broker i = 0 while True: time.sleep(0.5) mqtt.client.publish("mqtt_client/temperature", "25") i += 1 if i > 10: break mqtt.close()