# Tweepy # Copyright 2009-2010 Joshua Roesslein # See LICENSE for details. import httplib from socket import timeout from threading import Thread from time import sleep import urllib from tweepy.models import Status from tweepy.api import API from tweepy.error import TweepError from tweepy.utils import import_simplejson json = import_simplejson() STREAM_VERSION = 1 class StreamListener(object): def __init__(self, api=None): self.api = api or API() def on_data(self, data): """Called when raw data is received from connection. Override this method if you wish to manually handle the stream data. Return False to stop stream and close connection. """ if 'in_reply_to_status_id' in data: status = Status.parse(self.api, json.loads(data)) if self.on_status(status) is False: return False elif 'delete' in data: delete = json.loads(data)['delete']['status'] if self.on_delete(delete['id'], delete['user_id']) is False: return False elif 'limit' in data: if self.on_limit(json.loads(data)['limit']['track']) is False: return False def on_status(self, status): """Called when a new status arrives""" return def on_delete(self, status_id, user_id): """Called when a delete notice arrives for a status""" return def on_limit(self, track): """Called when a limitation notice arrvies""" return def on_error(self, status_code): """Called when a non-200 status code is returned""" return False def on_timeout(self): """Called when stream connection times out""" return class Stream(object): host = 'stream.twitter.com' def __init__(self, auth, listener, **options): self.auth = auth self.listener = listener self.running = False self.timeout = options.get("timeout", 300.0) self.retry_count = options.get("retry_count") self.retry_time = options.get("retry_time", 10.0) self.snooze_time = options.get("snooze_time", 5.0) self.buffer_size = options.get("buffer_size", 1500) if options.get("secure"): self.scheme = "https" else: self.scheme = "http" self.api = API() self.headers = options.get("headers") or {} self.parameters = None self.body = None def _run(self): # Authenticate url = "%s://%s%s" % (self.scheme, self.host, self.url) # Connect and process the stream error_counter = 0 conn = None exception = None while self.running: if self.retry_count is not None and error_counter > self.retry_count: # quit if error count greater than retry count break try: if self.scheme == "http": conn = httplib.HTTPConnection(self.host) else: conn = httplib.HTTPSConnection(self.host) self.auth.apply_auth(url, 'POST', self.headers, self.parameters) conn.connect() conn.sock.settimeout(self.timeout) conn.request('POST', self.url, self.body, headers=self.headers) resp = conn.getresponse() if resp.status != 200: if self.listener.on_error(resp.status) is False: break error_counter += 1 sleep(self.retry_time) else: error_counter = 0 self._read_loop(resp) except timeout: if self.listener.on_timeout() == False: break if self.running is False: break conn.close() sleep(self.snooze_time) except Exception, exception: # any other exception is fatal, so kill loop break # cleanup self.running = False if conn: conn.close() if exception: raise def _read_loop(self, resp): while self.running: if resp.isclosed(): break # read length data = '' while True: c = resp.read(1) if c == '\n': break data += c data = data.strip() # read data and pass into listener if self.listener.on_data(data) is False: self.running = False def _start(self, async): self.running = True if async: Thread(target=self._run).start() else: self._run() def userstream(self, count=None, async=False, secure=True): if self.running: raise TweepError('Stream object already connected!') self.url = '/2/user.json' self.host='userstream.twitter.com' if count: self.url += '&count=%s' % count self._start(async) def firehose(self, count=None, async=False): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION if count: self.url += '&count=%s' % count self._start(async) def retweet(self, async=False): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/%i/statuses/retweet.json?delimited=length' % STREAM_VERSION self._start(async) def sample(self, count=None, async=False): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION if count: self.url += '&count=%s' % count self._start(async) def filter(self, follow=None, track=None, async=False, locations=None, count = None): self.parameters = {} self.headers['Content-type'] = "application/x-www-form-urlencoded" if self.running: raise TweepError('Stream object already connected!') self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION if follow: self.parameters['follow'] = ','.join(map(str, follow)) if track: self.parameters['track'] = ','.join(map(str, track)) if locations and len(locations) > 0: assert len(locations) % 4 == 0 self.parameters['locations'] = ','.join(['%.2f' % l for l in locations]) if count: self.parameters['count'] = count self.body = urllib.urlencode(self.parameters) self.parameters['delimited'] = 'length' self._start(async) def disconnect(self): if self.running is False: return self.running = False