Source code for pylab_ml.scope.softscope

# -*- coding: utf-8 -*-
"""
Created on Thu Jan  7 17:18:03 2021

@author: jung
"""
from ate_common.logger import LogLevel
from pylab_ml.common.mqtt_client import mqtt_deviceattributes
from pylab_ml.common import common
from time import time
from threading import Timer

__author__ = "Zlin526F"
__credits__ = ["Zlin526F"]
__email__ = "Zlin526F@github"


class Softscope(mqtt_deviceattributes):
    """Data source for the softscope GUI, controlled via MQTT.

    Commands approved for MQTT (listed in ``mqtt_all``):

    * ``setchannel(value)``: select the attribute that is called on every
      timing event.
    * ``on()``: start sampling. A chain of one-shot ``threading.Timer``
      threads calls the selected channel about every ``sampleTime`` seconds.
    * ``off()``: stop sampling and cancel the pending timer.
    * ``sampleTime``: loop time in seconds.
    * ``test``: sawtooth test signal.

    TODO: very simple..... max sample rate ~50Hz
    """

    def __init__(self, mqttc, logger, instName="softscope"):
        """Initialise the scope and optionally register it for MQTT.

        Args:
            mqttc: MQTT client handed to ``mqtt_add``; ``None`` skips the
                registration.
            logger: object providing ``log_message(level, text)``.
            instName: instance name used as prefix in log messages.
        """
        self.instName = instName
        self.logger = logger
        super().__init__()
        # semi-ctrl use this lib for the scope gui
        self.gui = "pylab_ml.gui.instruments.softscope.softscope"
        # approved commands for mqtt
        self.mqtt_all = ["setchannel()", "on()", "off()", "sampleTime", "test"]
        if mqttc is not None:
            # subscribe for mqtt and send information about the gui
            self.mqtt_add(mqttc, self)
        self.scopeChannel = None
        self._samplethread = None
        self.sampleTime = 0.1
        self._minSampleTime = 0.01
        self._sawtooth = 0
        self.topinstname = None
        self.busy = False

    def init(self, topinstname):
        """Store the top instance that channel names are resolved against."""
        self.topinstname = topinstname

    def setchannel(self, value):
        """Set the scopeChannel via mqtt.

        The channel is probed once with ``common.strcall``; if that returns
        ``"ERROR"`` the channel is stored as ``"ERROR"`` instead of ``value``.
        The resulting channel is then published as ``scopeChannel``.
        """
        result = common.strcall(self.topinstname, value)
        self.scopeChannel = value if result != "ERROR" else "ERROR"
        self.publish_set("scopeChannel", self.scopeChannel)

    def on(self):
        """Start sampling the selected channel.

        A running sampling loop is stopped first. Without a selected channel
        an error is logged and nothing is started. The first sample is taken
        synchronously in the calling thread.
        """
        # NOTE(review): on()/off() are not synchronised with a sample that is
        # currently executing in a timer thread - confirm this is acceptable.
        if self.busy:
            self.off()
        if self.scopeChannel is None:
            self.logger.log_message(LogLevel.Error(), f"{self.instName} scopeChannel not defined")
            return
        self.busy = True
        self._newsample()

    def off(self):
        """Stop sampling and cancel the pending timer, if any."""
        self.busy = False
        pending = self._samplethread
        if pending is not None:
            pending.cancel()
            # drop the reference so a cancelled timer is not kept around
            self._samplethread = None

    @property
    def sampleTime(self):
        """Loop time of the sampling in seconds."""
        return self._sampleTime

    @sampleTime.setter
    def sampleTime(self, value):
        # Non-numeric values are ignored, the previous loop time stays active.
        # bool is a subclass of int and is rejected explicitly, so True/False
        # are still not accepted as a loop time.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return
        self._sampleTime = value

    def close(self):
        """Stop sampling and disconnect from MQTT."""
        self.off()
        self.mqtt_disconnect()

    def _newsample(self):
        """Take one sample and schedule the next one.

        Does nothing while sampling is switched off. If the channel call
        returns ``"ERROR"`` the error is logged and sampling stops.
        """
        if not self.busy:
            return
        starttime = time()
        result = common.strcall(self.topinstname, self.scopeChannel)
        if result == "ERROR":
            # Sampling ends here, so the busy flag must not claim otherwise.
            self.busy = False
            self.logger.log_message(LogLevel.Error(), f"{self.instName} get ERROR from {self.scopeChannel} -->stop sampling")
            return
        # Subtract the time the sample itself took to keep the loop time.
        mytime = self._sampleTime - (time() - starttime)
        if mytime <= 0:
            mytime = self._minSampleTime
        self._samplethread = Timer(mytime, self._newsample)
        self._samplethread.start()

    @property
    def test(self):
        """Sawtooth test signal: counts up by one per read and wraps to 0 after 256."""
        self._sawtooth += 1
        if self._sawtooth > 256:
            self._sawtooth = 0
        return self._sawtooth

    def __repr__(self):
        return f"{self.__class__}"
if __name__ == "__main__":
    # Stand-alone start: create a scope without an MQTT client, using the
    # mylogger object provided by pylab_ml.common.mqtt_client.
    from pylab_ml.common.mqtt_client import mylogger

    scope = Softscope(
        instName="softscope",
        logger=mylogger,
        mqttc=None,
    )