Commit dcbd075e authored by Josh Roesslein's avatar Josh Roesslein
Browse files

Update stream API to use new API method URLs. Added StreamListener to replace callback function.

parent 0012f922
......@@ -13,7 +13,11 @@ during upgrade will be listed here.
+ added new() method. shortcut for setting up new API instances
example: API.new(auth='basic', username='testuser', password='testpass')
+ update_profile_image() and update_profile_background_image() method added.
+ Streaming:
+ Update to new streaming API methods
+ New StreamListener class replacing callback function
+ Fixes
+ User.following is now set to False instead of None
when user is not followed.
+ python 2.5 import syntax error fixed
+ python 2.5 timeout support for streaming API
......@@ -5,32 +5,38 @@ from getpass import getpass
import tweepy
def callback(t, stream_object):
if t == 'status':
print stream_object.text
elif t == 'delete':
print 'delete!!! id = %s' % stream_object['id']
elif t == 'limit':
print 'limit!!! track=%s' % stream_object['track']
class StreamWatcherListener(tweepy.StreamListener):
def on_status(self, status):
print status.text
def on_error(self, status_code):
print 'An error has occured! Status code = %s' % status_code
return True # keep stream alive
# Prompt for login credentials and setup stream object
username = raw_input('Twitter username: ')
password = getpass('Twitter password: ')
stream = tweepy.Stream(username, password, callback)
stream = tweepy.Stream(username, password, StreamWatcherListener())
# Prompt for mode of streaming and connect
while True:
mode = raw_input('Mode? [spritzer/follow/track] ')
if mode == 'spritzer':
stream.spritzer()
break
elif mode == 'follow':
follow_list = raw_input('Users to follow (comma separated): ')
stream.follow(follow_list)
mode = raw_input('Mode? [sample/filter] ')
if mode == 'sample':
stream.sample()
break
elif mode == 'track':
track_list = raw_input('Keywords to track (comma separated): ')
stream.track(track_list)
elif mode == 'filter':
follow_list = raw_input('Users to follow (comma separated): ').strip()
track_list = raw_input('Keywords to track (comma seperated): ').strip()
if follow_list:
follow_list = [u for u in follow_list.split(',')]
else:
follow_list = None
if track_list:
track_list = [k for k in track_list.split(',')]
else:
track_list = None
stream.filter(follow_list, track_list)
break
else:
print 'Invalid choice! Try again.'
......
......@@ -12,7 +12,7 @@ from . error import TweepError
from . api import API
from . cache import Cache, MemoryCache, FileCache, MemCache
from . auth import BasicAuthHandler, OAuthHandler
from . streaming import Stream
from . streaming import Stream, StreamListener
# Global, unauthenticated instance of API
api = API()
......@@ -17,12 +17,32 @@ try:
except ImportError:
import simplejson as json
STREAM_VERSION = 1
class StreamListener(object):
def on_status(self, status):
"""Called when a new status arrives"""
return
def on_delete(self, status_id, user_id):
"""Called when a delete notice arrives for a status"""
return
def on_limit(self, track):
"""Called when a limitation notice arrvies"""
return
def on_error(self, status_code):
"""Called when a non-200 status code is returned"""
return False
class Stream(object):
host = 'stream.twitter.com'
def __init__(self, username, password, callback, timeout=2.0, retry_count = 3,
retry_time = 3.0, snooze_time = 10.0, buffer_size=1500):
def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
retry_time = 10.0, snooze_time = 5.0, buffer_size=1500):
self.auth = BasicAuthHandler(username, password)
self.running = False
self.timeout = timeout
......@@ -30,7 +50,7 @@ class Stream(object):
self.retry_time = retry_time
self.snooze_time = snooze_time
self.buffer_size = buffer_size
self.callback = callback
self.listener = listener
self.api = API()
def _run(self):
......@@ -42,7 +62,7 @@ class Stream(object):
error_counter = 0
conn = None
while self.running:
if error_counter > self.retry_count:
if self.retry_count and error_counter > self.retry_count:
# quit if error count greater than retry count
break
try:
......@@ -52,12 +72,15 @@ class Stream(object):
conn.request('POST', self.url, headers=headers)
resp = conn.getresponse()
if resp.status != 200:
if self.listener.on_error(resp.status) is False:
break
error_counter += 1
sleep(self.retry_time)
else:
error_counter = 0
self._read_loop(resp)
except timeout:
print 'timeout!'
if self.running is False:
break
conn.close()
......@@ -96,68 +119,40 @@ class Stream(object):
# turn json data into status object
if 'in_reply_to_status_id' in data:
status = parse_status(data, self.api)
self.callback('status', status)
self.listener.on_status(status)
elif 'delete' in data:
self.callback('delete', json.loads(data)['delete']['status'])
delete = json.loads(data)['delete']['status']
self.listener.on_delete(delete['id'], delete['user_id'])
elif 'limit' in data:
self.callback('limit', json.loads(data)['limit'])
self.listener.on_limit(json.loads(data)['limit']['track'])
def firehose(self, count=None, ):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/firehose.json?delimited=length'
if count:
self.url += '&count=%s' % count
self.running = True
Thread(target=self._run).start()
def gardenhose(self):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/gardenhose.json?delimited=length'
self.running = True
Thread(target=self._run).start()
def birddog(self, follow, count=None):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/birddog.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self.running = True
Thread(target=self._run).start()
def shadow(self, follow, count=None):
def sample(self, count=None):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/shadow.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self.running = True
Thread(target=self._run).start()
def spritzer(self):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/spritzer.json?delimited=length'
self.running = True
Thread(target=self._run).start()
def follow(self, follow=None):
def filter(self, follow=None, track=None):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/follow.json?delimited=length'
self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
if follow:
self.url += '&follow=%s' % str(follow).strip('[]').replace(' ', '')
self.running = True
Thread(target=self._run).start()
def track(self, track=None):
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/track.json?delimited=length'
self.url += '&follow=%s' % ','.join(follow)
if track:
self.url += '&track=%s' % str(track).strip('[]').replace(' ', '')
self.url += '&track=%s' % ','.join(track)
print self.url
self.running = True
Thread(target=self._run).start()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment