Source code for panoptes.utils.social.twitter

import tweepy
from loguru import logger


[docs]class SocialTwitter(object): """Social Messaging sink to output to Twitter.""" def __init__(self, **kwargs): consumer_key = kwargs.get('consumer_key', '') if consumer_key == '': raise ValueError('consumer_key parameter is not defined.') consumer_secret = kwargs.get('consumer_secret', '') if consumer_secret == '': raise ValueError('consumer_secret parameter is not defined.') access_token = kwargs.get('access_token', '') if access_token == '': raise ValueError('access_token parameter is not defined.') access_token_secret = kwargs.get('access_token_secret', '') if access_token_secret == '': raise ValueError('access_token_secret parameter is not defined.') # Output timestamp should always be True by default otherwise Twitter will reject duplicate statuses. self.output_timestamp = kwargs.get("output_timestamp", True) # Create a new twitter api object try: auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) self.api = tweepy.API(auth) except tweepy.TweepError: # pragma: no cover msg = 'Error authenicating with Twitter. Please check your Twitter configuration.' logger.warning(msg) raise ValueError(msg)
[docs] def send_message(self, msg, timestamp): try: # update_status returns a tweepy Status instance, but we # drop it on the floor because we don't have anything we # can do with it. if self.output_timestamp: self.api.update_status('{} - {}'.format(msg, timestamp)) else: self.api.update_status(msg) except tweepy.TweepError: # pragma: no cover logger.debug('Error tweeting message. Please check your Twitter configuration.')