Source code for pylab_ml.scope.lecroy.vicp

"""
This script provides an interface to the LeCroy Oscilloscopes using the VICP protocol.

:Date: |today|
:Author: Semi-ATE <info@Semi-ATE.org>

"""
import socket
import struct


[docs] class OP(): DATA = 0b10000000 REMOTE = 0b01000000 LOCKOUT = 0b00100000 CLEAR = 0b00010000 SQR = 0b00001000 SERIAL_POLL = 0b00000100 Reserved = 0b00000010 EOI = 0b00000001
[docs] class VICP(): """ Interface to LeCroy Oscilloscopes using the VICP protocol. """
[docs] def __init__(self, addr='LCRY3703N15966', port=1861, timeout=5, debug=False): """ Initialize the VICP interface. Parameters ---------- addr : str The IP address or hostname of the LeCroy Oscilloscope. port : int The TCP/IP port of the LeCroy Oscilloscope (default is 1861). timeout : float The timeout for socket operations in seconds (default is 5). debug : bool If True, enables debug output (default is False). """ self.debug = debug self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addr = addr self.s.connect((self.addr, port))
def __del__(self): """ Ensure the socket is closed when the object is deleted. """ self.s.close()
[docs] def write_raw(self, data): """ Write raw bytes to the VICP interface. Parameters ---------- data : bytes The raw bytes to send to the VICP interface. """ if self.debug: print(' write_raw:{!r}'.format(data)) header = [0] * 4 header[0] = OP.DATA + OP.EOI header[1] = 1 # header version header[2] = 0 # sequence number header[3] = 0 # unused header = bytes(header) + struct.pack('>I', len(data)) self.s.send(header + bytes(data))
[docs] def write(self, data, term=''): """ Write a string to the VICP interface, optionally with a termination string. Parameters ---------- data : str The string to send to the VICP interface. term : str An optional termination string to append to the data (default is ''). """ data += term self.write_raw(data.encode())
[docs] def read_bytes(self, size=1): """ Read a specified number of bytes from the VICP interface. Parameters ---------- size : int The number of bytes to read (default is 1). Returns ------- data : bytes The bytes read from the VICP interface. """ data = b'' while len(data) < size: data += self.s.recv(size - len(data)) if self.debug: print(' read_bytes: {} / {}'.format(len(data), size)) return data
[docs] def read_chunk(self, size=None): """ Read a chunk of data from the VICP interface, optionally specifying the size. Parameters ---------- size : int The number of bytes to read (if None, the size is determined from the header). Returns ------- data : bytes The chunk of data read from the VICP interface. """ header = self.read_bytes(8) if size is None: size = struct.unpack('>I', header[4:])[0] if self.debug: print(' read_chunk: {!r} => size = {}'.format(header, size)) data = self.read_bytes(size) return data
[docs] def read_raw(self): """ Read raw data from the VICP interface until a newline character is encountered. Returns ------- data : bytes The raw data read from the VICP interface. """ chunks = [] while True: chunk = self.read_chunk() chunks.append(chunk) if chunk.endswith(b'\n'): break return b''.join(chunks)
[docs] def read(self): """ Read a string from the VICP interface until a newline character is encountered. Returns ------- data : str The string read from the VICP interface. """ return self.read_raw().decode().rstrip('\n')
[docs] def query(self, data): """ Send a query to the VICP interface and read the response. Parameters ---------- data : str The query string to send to the VICP interface. Returns ------- str The response string read from the VICP interface. """ self.write(data) return self.read()
[docs] def close(self): """ Close the VICP interface. """ self.s.close()