Source code for panoptes.utils.serial.device

import operator
from collections import deque
from contextlib import suppress
from dataclasses import dataclass
from typing import Optional, Union, Callable

import serial
from loguru import logger
from serial.threaded import LineReader, ReaderThread
from serial.tools.list_ports import comports as get_comports

from panoptes.utils import error


[docs]@dataclass class SerialDeviceDefaults: """Dataclass for the serial port defaults. This can be instantiated with changed values and then passed to the serial device via the `apply_settings` method. Values are: write_timeout, inter_byte_timeout, dsrdtr, baudrate, timeout, parity, bytesize, rtscts, stopbits, xonxoff See https://pyserial.readthedocs.io/en/latest/pyserial_api.html#serial.Serial.get_settings .. doctest: >>> import serial >>> from panoptes.utils.serial.device import SerialDeviceDefaults >>> serial_settings = SerialDeviceDefaults(baudrate=115200) >>> serial_settings SerialDeviceDefaults(baudrate=115200, timeout=1.0, write_timeout=1.0, bytesize=8, parity='N', stopbits=1, xonxoff=False, rtscts=False, dsrdtr=False) >>> # Create a serial device and apply changed settings. >>> ser0 = serial.Serial() >>> serial_settings.write_timeout = 2.5 >>> serial_settings.baudrate = 9600 >>> ser0.apply_settings(serial_settings.to_dict()) >>> ser0.write_timeout 2.5 >>> ser0.baudrate 9600 """ baudrate: int = 9600 timeout: float = 1. write_timeout: float = 1. bytesize: int = serial.EIGHTBITS parity: str = serial.PARITY_NONE stopbits: int = serial.STOPBITS_ONE xonxoff: bool = False rtscts: bool = False dsrdtr: bool = False
[docs] def to_dict(self): """Return fields as dict.""" return {field: getattr(self, field) for field in self.__dataclass_fields__.keys()}
[docs]def get_serial_port_info(): """Returns the serial ports defined on the system sorted by device name. .. doctest:: >>> from panoptes.utils.serial.device import get_serial_port_info >>> devices = get_serial_port_info() >>> devices # doctest: +SKIP [<serial.tools.list_ports_linux.SysFS object at 0x7f6c9cbd9460>] >>> devices[0].hwid # doctest: +SKIP 'USB VID:PID=2886:802D SER=3C788B875337433838202020FF122204 LOCATION=3-5:1.0' Returns: a list of PySerial's ListPortInfo objects. See: https://github.com/pyserial/pyserial/blob/master/serial/tools/list_ports_common.py """ return sorted(get_comports(include_links=True), key=operator.attrgetter('device'))
[docs]def find_serial_port(vendor_id, product_id, return_all=False): # pragma: no cover """Finds the serial port that matches the given vendor and product id. .. doctest:: >>> from panoptes.utils.serial.device import find_serial_port >>> vendor_id = 0x2a03 # arduino >>> product_id = 0x0043 # Uno Rev 3 >>> find_serial_port(vendor_id, product_id) # doctest: +SKIP '/dev/ttyACM0' >>> # Raises error when not found. >>> find_serial_port(0x1234, 0x4321) Traceback (most recent call last): ... panoptes.utils.error.NotFound: NotFound: No serial ports... Args: vendor_id (int): The vendor id, can be hex or int. product_id (int): The product id, can be hex or int. return_all (bool): If more than one serial port matches, return all devices, default False. Returns: str or list: Either the path to the detected port or a list of all comports that match. """ # Get all serial ports. matched_ports = [p for p in get_serial_port_info() if p.vid == vendor_id and p.pid == product_id] if len(matched_ports) == 1: return matched_ports[0].device elif return_all: return matched_ports else: raise error.NotFound( f'No serial ports for vendor_id={vendor_id:x} and product_id={product_id:x}')
[docs]class SerialDevice(object): def __init__(self, port: str = None, name: str = None, reader_callback: Callable = None, serial_settings: Optional[Union[SerialDeviceDefaults, dict]] = None, reader_queue_size: int = 50, ): """A SerialDevice class with helper methods for serial communications. The device need not exist at the time this is called, in which case is_connected will be false. The serial device settings can be passed as a dictionary. See SerialDeviceDefaults for possible values. .. doctest:: >>> from panoptes.utils.serial.device import SerialDevice >>> dev0 = SerialDevice(port='loop://', name='My device') >>> dev0.is_connected True >>> str(dev0) 'My device on port=loop:// [9600/8-N-1]' >>> dev0.write('Hello World!') # doctest: +SKIP >>> len(dev0.readings) # doctest: +SKIP 1 >>> dev0.readings[0] == 'Hello World!' # doctest: +SKIP True >>> # We can also pass a custom callback. >>> from panoptes.utils.serializers import from_json, to_json >>> dev1 = SerialDevice(port='loop://', reader_callback=from_json) >>> str(dev1) 'SerialDevice loop:// [9600/8-N-1]' >>> dev1.write(to_json(dict(message='Hello JSON World!'))) # doctest: +SKIP >>> len(dev0.readings) # doctest: +SKIP 1 >>> dev0.readings[0] # doctest: +SKIP '{"message": "Hello JSON World!"}' Args: port (str): The port (e.g. /dev/tty123 or socket://host:port) to which to open a connection. name (str): Name of this object. Defaults to the name of the port. reader_callback (Callable): A callback from the reader thread. This should accept and return a single parameter. The return item will get appended to the `readings` deque. serial_settings (dict): The settings to apply to the serial device. See docstring for details. reader_queue_size (int, optional): The size of the deque for readings, default 50. Raises: ValueError: If the serial parameters are invalid (e.g. a negative baudrate). """ self.name = name or port self.readings = deque(maxlen=reader_queue_size) self.reader_thread = None self._reader_callback = reader_callback self.serial: serial.Serial = serial.serial_for_url(port) logger.debug(f'SerialDevice for {self.name} created. Connected={self.is_connected}') serial_settings = serial_settings or SerialDeviceDefaults() if isinstance(serial_settings, SerialDeviceDefaults): serial_settings = serial_settings.to_dict() logger.debug(f'Applying settings to serial class: {serial_settings!r}') self.serial.apply_settings(serial_settings) self._add_stream_reader() @property def port(self): """Name of the port.""" return self.serial.port @property def is_connected(self): """True if serial port is open, False otherwise.""" return self.serial.is_open
[docs] def connect(self): """Connect to device and add default reader.""" if not self.is_connected: self.serial.open() self._add_stream_reader()
[docs] def disconnect(self): """Disconnect from the device and reset the reader thread.""" with suppress(AttributeError): self.serial.close() self.reader_thread = None
[docs] def write(self, line): """Write to the serial device. Note that this expects unicode and will handle adding a newline at the end. """ return self.reader_thread.protocol.write_line(line)
def _add_stream_reader(self, callback=None): """Add a reader to the device.""" callback = callback or self._reader_callback # Set up a custom threaded reader class that calls user callback. class CustomReader(LineReader): # Use `this` so `self` still refers to device instance. def connection_made(this, transport): super(LineReader, this).connection_made(transport) def connection_lost(this, exc): logger.trace(f'Disconnected from {self}') def handle_line(this, data): try: if callback and callable(callback): data = callback(data) if data is not None: self.readings.append(data) except Exception as e: logger.trace(f'Error with callback: {e!r}') self.reader_thread = ReaderThread(self.serial, CustomReader) self.reader_thread.start() def __str__(self): if self.name == self.port: full_name = f'SerialDevice {self.name}' else: full_name = f'{self.name} on port={self.port}' with self.serial as s: serial_summary = f'{s.baudrate}/{s.bytesize}-{s.parity}-{s.stopbits}' return f'{full_name} [{serial_summary}]' def __del__(self): """Close the serial device on delete. This is to avoid leaving a file or device open if there are multiple references to the serial.Serial object. """ # If an exception is thrown when running __init__, then self.serial may not have # been set, in which case reading that field will generate a AttributeError. with suppress(AttributeError): # pragma: no cover if self.serial and self.serial.is_open: self.serial.close()