Source code for panoptes.utils.theskyx

import socket

from panoptes.utils import error
from panoptes.utils.logging import logger


class TheSkyX(object):
    """A TCP socket connection for communicating with TheSkyX.

    Commands are sent as text with :meth:`write` and replies are fetched with
    :meth:`read`, which also interprets the status trailer TheSkyX appends to
    each reply.

    Args:
        host (str): Host name or address TheSkyX is listening on.
        port (int): TCP port TheSkyX is listening on.
        connect (bool): If True, attempt to connect during construction.
        *args: Accepted for call-compatibility; unused.
        **kwargs: Accepted for call-compatibility; unused.
    """

    def __init__(self, host='localhost', port=3040, connect=True, *args, **kwargs):
        self.logger = logger

        self._host = host
        self._port = port

        # Public attribute used by connect/read/write. It stays ``None`` until
        # a connection has actually been established.
        self.socket = None

        self._is_connected = False
        if connect:
            self.connect()

    @property
    def is_connected(self):
        """bool: True once :meth:`connect` has succeeded."""
        return self._is_connected

    def connect(self):
        """Open the TCP connection to TheSkyX if not already connected.

        A refused connection is logged as a warning and leaves the instance
        disconnected. Any other ``OSError`` raised while connecting is
        propagated to the caller. In both cases the half-open socket is closed
        so no file descriptor is leaked.
        """
        self.logger.debug('Making TheSkyX connection at {}:{}'.format(self._host, self._port))
        if self.is_connected:
            return

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self._host, self._port))
        except ConnectionRefusedError:
            sock.close()
            self.logger.warning('Cannot create connection to TheSkyX')
        except OSError:
            # Not handled here, but do not leak the descriptor.
            sock.close()
            raise
        else:
            # Publish the socket only once it is usable, so that read/write
            # report BadConnection rather than failing on a dead socket.
            self.socket = sock
            self._is_connected = True
            self.logger.info('Connected to TheSkyX via {}:{}'.format(self._host, self._port))

    def _require_socket(self):
        """Return the active socket or raise ``error.BadConnection``."""
        # getattr keeps this safe for instances created without __init__.
        sock = getattr(self, 'socket', None)
        if sock is None:
            raise error.BadConnection("Not connected to TheSkyX")
        return sock

    def write(self, value):
        """Send a text command to TheSkyX.

        Args:
            value (str): Command text; it is encoded with the default
                ``str.encode`` codec before being sent.

        Raises:
            AssertionError: If ``value`` is not a ``str``.
            error.BadConnection: If there is no open connection.
        """
        assert isinstance(value, str)
        self._require_socket().sendall(value.encode())

    def read(self, timeout=5):
        """Read one reply from TheSkyX and return its payload.

        Up to 2048 bytes are received and decoded. If the reply contains a
        ``|``, the text after the last ``|`` is treated as the status and the
        text before it as the payload. If the payload contains ``Error:``, it
        is split at the first ``:`` and the remainder becomes the status.

        Args:
            timeout (float): Socket timeout in seconds for this read.

        Returns:
            str: The payload portion of the reply.

        Raises:
            error.BadConnection: If there is no open connection.
            error.TheSkyXTimeout: If nothing is received within ``timeout``.
            error.TheSkyXKeyError: If the status contains ``Error = 303``.
            error.TheSkyXError: For any other status lacking ``No error``.
        """
        sock = self._require_socket()
        sock.settimeout(timeout)

        try:
            response = sock.recv(2048).decode()
        except socket.timeout:  # pragma: no cover
            raise error.TheSkyXTimeout()

        err = None
        if '|' in response:
            # Split on the last separator only, so a payload that itself
            # contains '|' does not break the two-value unpacking.
            response, err = response.rsplit('|', 1)

        if 'Error:' in response:
            # Split once: the error text may itself contain further colons.
            response, err = response.split(':', 1)

        if err is not None and 'No error' not in err:
            if 'Error = 303' in err:
                raise error.TheSkyXKeyError("Invalid TheSkyX key")

            raise error.TheSkyXError(err)

        return response