Source code for pylab_ml.scope.softscope

# -*- coding: utf-8 -*-
"""
This script defines the Softscope class, which is responsible for communicating with a softscope GUI via MQTT. 
The class allows for starting and stopping data sampling, setting the channel to be monitored, and adjusting the sample time. 
It uses a Timer thread to manage the sampling loop and handles MQTT commands to control its behavior. 
The class also includes error handling for cases where the specified channel is not defined or returns an error.

Created on Thu Jan  7 17:18:03 2021

@author: jung
"""
from ate_common.logger import LogLevel
from pylab_ml.common.mqtt_client import mqtt_deviceattributes
from pylab_ml.common import common
from time import time
from threading import Timer

# Module metadata: author, credits and contact handle.
__author__ = "Zlin526F"
__credits__ = ["Zlin526F"]
__email__ = "Zlin526F@github"


class Softscope(mqtt_deviceattributes):
    """
    Class for the communication with the softscope-GUI.

    Via mqtt some commands control the sending of data:

        on():              create a Timer-thread with the loop-time = sampleTime
        off():             stop the Timer-thread
        setchannel(value): call the value as attribute for each timing event
        sampleTime:        loop time

    TODO: very simple..... max sample Time ~50Hz
    """

    def __init__(self, mqttc, logger, instName="softscope"):
        """
        Initialise the softscope class and subscribe for mqtt if mqttc is not None.

        Parameters
        ----------
        mqttc : MQTT client object
            The MQTT client to use for communication.
            If None, the call to ``mqtt_add`` is skipped.
        logger : Logger object
            The logger to use for logging messages (``log_message`` is called on it).
        instName : str, optional
            The name of the instrument. Default is "softscope".
        """
        self.instName = instName
        self.logger = logger
        super().__init__()
        self.gui = "pylab_ml.gui.instruments.softscope.softscope"  # semi-ctrl use this lib for the scope gui
        self.mqtt_all = ["setchannel()", "on()", "off()", "sampleTime", "test"]  # approved commands for mqtt
        if mqttc is not None:
            self.mqtt_add(mqttc, self)  # subscribe for mqtt and send information about the gui
        # NOTE(review): the state below is assigned after mqtt_add(); if
        # mqtt_add() can already dispatch commands, these attributes would
        # not exist yet -- confirm against mqtt_deviceattributes.
        self.scopeChannel = None
        self._samplethread = None
        self.sampleTime = 0.1
        self._minSampleTime = 0.01  # delay used when one sample took longer than sampleTime
        self._sawtooth = 0
        self.topinstname = None
        self.busy = False

    def init(self, topinstname):
        """Initialize the topinstname for the strcall to get the value for the scopeChannel."""
        self.topinstname = topinstname

    def setchannel(self, value):
        """
        Set the scopeChannel via mqtt.

        The channel is probed once with ``common.strcall``. If that returns
        "ERROR", scopeChannel is set to "ERROR" instead of ``value``. The
        resulting scopeChannel is published in either case.
        """
        result = common.strcall(self.topinstname, value)
        self.scopeChannel = value if result != "ERROR" else "ERROR"
        self.publish_set("scopeChannel", self.scopeChannel)

    def on(self):
        """
        Start the softscope sampling.

        A running sampling loop is stopped first. If no scopeChannel has been
        set, an error is logged and sampling is not started.
        """
        if self.busy:
            self.off()
        if self.scopeChannel is None:
            self.logger.log_message(LogLevel.Error(), f"{self.instName} scopeChannel not defined")
            return
        self.busy = True
        self._newsample()

    def off(self):
        """Stop the softscope sampling and cancel a pending Timer, if any."""
        self.busy = False
        if self._samplethread is not None:
            self._samplethread.cancel()

    @property
    def sampleTime(self):
        """Get the current sample time for the softscope."""
        return self._sampleTime

    @sampleTime.setter
    def sampleTime(self, value):
        """
        Set the sample time for the softscope.

        Values that are not int or float (bool included) are silently ignored
        and the previous sample time is kept.
        """
        # bool is a subclass of int; reject it explicitly, as the original
        # exact-type check did.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return
        self._sampleTime = value

    def close(self):
        """Close the softscope and disconnect from MQTT."""
        self.off()
        self.mqtt_disconnect()

    def _newsample(self):
        """
        Internal method to handle the sampling loop.

        Calls the scopeChannel once through ``common.strcall`` and then
        re-arms a Timer that calls this method again. The delay is sampleTime
        minus the time the call took, or ``_minSampleTime`` when that
        difference is not positive. The loop ends when ``busy`` is False or
        when the call returns "ERROR".
        """
        if not self.busy:
            return
        starttime = time()
        result = common.strcall(self.topinstname, self.scopeChannel)
        if result == "ERROR":
            self.logger.log_message(LogLevel.Error(), f"{self.instName} get ERROR from {self.scopeChannel} -->stop sampling")
            # Sampling really has stopped here, so do not keep reporting busy.
            self.busy = False
            return
        mytime = self._sampleTime - (time() - starttime)
        if mytime <= 0:
            mytime = self._minSampleTime
        self._samplethread = Timer(mytime, self._newsample)
        self._samplethread.start()

    @property
    def test(self):
        """
        Get the current test value for the softscope.

        Returns
        -------
        int
            A sawtooth value that is incremented by one on every read and
            wraps to 0 once it would exceed 256.
        """
        self._sawtooth += 1
        if self._sawtooth > 256:
            self._sawtooth = 0
        return self._sawtooth

    def __repr__(self):
        return f"{self.__class__}"
if __name__ == "__main__":
    # Manual smoke test: build a scope without an MQTT client.
    from pylab_ml.common.mqtt_client import mylogger

    scope = Softscope(None, mylogger, instName="softscope")