Source code for panoptes.utils.serial_handlers.protocol_buffers

# This module implements a handler for serial_for_url("buffers://").

import io
import threading

from panoptes.utils.serial_handlers import NoOpSerial
from serial import serialutil

# r_buffer and w_buffer are binary I/O buffers. read(size=N) on an instance
# of Serial reads the next N bytes from r_buffer, and write(data) appends the
# bytes of data to w_buffer.
# NOTE: The caller (a test) is responsible for resetting buffers before tests.
_r_buffer = None
_w_buffer = None

# The above I/O buffers are not thread safe, so we need to lock them during
# access.
_r_lock = threading.Lock()
_w_lock = threading.Lock()


[docs]def ResetBuffers(read_data=None): SetRBufferValue(read_data) with _w_lock: global _w_buffer _w_buffer = io.BytesIO()
[docs]def SetRBufferValue(data): """Sets the r buffer to data (a bytes object).""" if data and not isinstance(data, (bytes, bytearray)): raise TypeError("data must by a bytes or bytearray object.") with _r_lock: global _r_buffer _r_buffer = io.BytesIO(data)
[docs]def GetWBufferValue(): """Returns an immutable bytes object with the value of the w buffer.""" with _w_lock: if _w_buffer: return _w_buffer.getvalue()
[docs]class BuffersSerial(NoOpSerial): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @property def in_waiting(self): if not self.is_open: raise serialutil.portNotOpenError with _r_lock: return len(_r_buffer.getbuffer()) - _r_buffer.tell()
[docs] def read(self, size=1): """Read size bytes. If a timeout is set it may return fewer characters than requested. With no timeout it will block until the requested number of bytes is read. Args: size: Number of bytes to read. Returns: Bytes read from the port, of type 'bytes'. Raises: SerialTimeoutException: In case a write timeout is configured for the port and the time is exceeded. """ if not self.is_open: raise serialutil.portNotOpenError with _r_lock: # TODO(jamessynge): Figure out whether and how to handle timeout. # We might choose to generate a timeout if the caller asks for data # beyond the end of the buffer; or simply return what is left, # including nothing (i.e. bytes()) if there is nothing left. return _r_buffer.read(size)
[docs] def write(self, data): """ Args: data: The data to write. Returns: Number of bytes written. Raises: SerialTimeoutException: In case a write timeout is configured for the port and the time is exceeded. """ if not isinstance(data, (bytes, bytearray)): raise TypeError("data must by a bytes or bytearray object.") if not self.is_open: raise serialutil.portNotOpenError with _w_lock: return _w_buffer.write(data)
Serial = BuffersSerial