Commit a592ff55 authored by Aaron Hill's avatar Aaron Hill
Browse files

Merge branch 'backoff' of https://github.com/tewalds/tweepy into tewalds-backoff

Conflicts:
	test_requirements.txt
	tweepy/streaming.py
parents 81658a99 e44bf2a3
import random
import string
def mock_tweet():
"""Generate some random tweet text."""
count = random.randint(70, 140)
return ''.join([random.choice(string.letters) for i in xrange(count)])
...@@ -2,11 +2,13 @@ from time import sleep ...@@ -2,11 +2,13 @@ from time import sleep
import unittest2 as unittest import unittest2 as unittest
from tweepy.api import API from tweepy.api import API
from tweepy.auth import OAuthHandler
from tweepy.models import Status from tweepy.models import Status
from tweepy.streaming import Stream, StreamListener from tweepy.streaming import Stream, StreamListener
from config import create_auth from config import create_auth
from mock import mock_tweet from test_utils import mock_tweet
from mock import MagicMock, patch
class MockStreamListener(StreamListener): class MockStreamListener(StreamListener):
def __init__(self, test_case): def __init__(self, test_case):
...@@ -24,6 +26,10 @@ class MockStreamListener(StreamListener): ...@@ -24,6 +26,10 @@ class MockStreamListener(StreamListener):
self.test_case.fail('timeout') self.test_case.fail('timeout')
return False return False
def on_error(self, code):
print "response: %s" % code
return True
def on_status(self, status): def on_status(self, status):
self.status_count += 1 self.status_count += 1
self.test_case.assertIsInstance(status, Status) self.test_case.assertIsInstance(status, Status)
...@@ -62,3 +68,38 @@ class TweepyStreamTests(unittest.TestCase): ...@@ -62,3 +68,38 @@ class TweepyStreamTests(unittest.TestCase):
self.assertEquals(self.listener.status_count, self.assertEquals(self.listener.status_count,
self.listener.status_stop_count) self.listener.status_stop_count)
class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
#bad auth causes twitter to return 401 errors
self.auth = OAuthHandler("bad-key", "bad-secret")
self.auth.set_access_token("bad-token", "bad-token-secret")
self.listener = MockStreamListener(self)
self.stream = Stream(self.auth, self.listener)
def tearDown(self):
self.stream.disconnect()
def test_exp_backoff(self):
self.stream = Stream(self.auth, self.listener, timeout=3.0,
retry_count=1, retry_time=1.0, retry_time_cap=100.0)
self.stream.sample()
# 1 retry, should be 4x the retry_time
self.assertEqual(self.stream.retry_time, 4.0)
def test_exp_backoff_cap(self):
self.stream = Stream(self.auth, self.listener, timeout=3.0,
retry_count=1, retry_time=1.0, retry_time_cap=3.0)
self.stream.sample()
# 1 retry, but 4x the retry_time exceeds the cap, so should be capped
self.assertEqual(self.stream.retry_time, 3.0)
mock_resp = MagicMock()
mock_resp.return_value.status = 420
@patch('httplib.HTTPConnection.getresponse', mock_resp)
def test_420(self):
self.stream = Stream(self.auth, self.listener, timeout=3.0, retry_count=0,
retry_time=1.0, retry_420=1.5, retry_time_cap=20.0)
self.stream.sample()
# no retries, but error 420, should be double the retry_420, not double the retry_time
self.assertEqual(self.stream.retry_time, 3.0)
...@@ -2,6 +2,15 @@ from unittest2 import TestCase ...@@ -2,6 +2,15 @@ from unittest2 import TestCase
from tweepy.utils import * from tweepy.utils import *
import random
import string
def mock_tweet():
"""Generate some random tweet text."""
count = random.randint(70, 140)
return ''.join([random.choice(string.letters) for i in xrange(count)])
class TweepyUtilsTests(TestCase): class TweepyUtilsTests(TestCase):
def testparse_datetime(self): def testparse_datetime(self):
......
...@@ -117,10 +117,12 @@ class Stream(object): ...@@ -117,10 +117,12 @@ class Stream(object):
self.running = False self.running = False
self.timeout = options.get("timeout", 300.0) self.timeout = options.get("timeout", 300.0)
self.retry_count = options.get("retry_count") self.retry_count = options.get("retry_count")
self.retry_time_start = options.get("retry_time", 10.0) # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
self.retry_time_cap = options.get("retry_time_cap", 240.0) self.retry_time_start = options.get("retry_time", 5.0)
self.snooze_time_start = options.get("snooze_time", 0.25) self.retry_420_start = options.get("retry_420", 60.0)
self.snooze_time_cap = options.get("snooze_time_cap", 16) self.retry_time_cap = options.get("retry_time_cap", 320.0)
self.snooze_time_step = options.get("snooze_time", 0.25)
self.snooze_time_cap = options.get("snooze_time_cap", 16)
self.buffer_size = options.get("buffer_size", 1500) self.buffer_size = options.get("buffer_size", 1500)
if options.get("secure", True): if options.get("secure", True):
self.scheme = "https" self.scheme = "https"
...@@ -132,7 +134,7 @@ class Stream(object): ...@@ -132,7 +134,7 @@ class Stream(object):
self.parameters = None self.parameters = None
self.body = None self.body = None
self.retry_time = self.retry_time_start self.retry_time = self.retry_time_start
self.snooze_time = self.snooze_time_start self.snooze_time = self.snooze_time_step
def _run(self): def _run(self):
# Authenticate # Authenticate
...@@ -159,12 +161,14 @@ class Stream(object): ...@@ -159,12 +161,14 @@ class Stream(object):
if self.listener.on_error(resp.status) is False: if self.listener.on_error(resp.status) is False:
break break
error_counter += 1 error_counter += 1
if resp.status == 420:
self.retry_time = max(self.retry_420_start, self.retry_time)
sleep(self.retry_time) sleep(self.retry_time)
self.retry_time = min(self.retry_time * 2, self.retry_time_cap) self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
else: else:
error_counter = 0 error_counter = 0
self.retry_time = self.retry_time_start self.retry_time = self.retry_time_start
self.snooze_time = self.snooze_time_start self.snooze_time = self.snooze_time_step
self.listener.on_connect() self.listener.on_connect()
self._read_loop(resp) self._read_loop(resp)
except (timeout, ssl.SSLError), exc: except (timeout, ssl.SSLError), exc:
...@@ -179,7 +183,8 @@ class Stream(object): ...@@ -179,7 +183,8 @@ class Stream(object):
break break
conn.close() conn.close()
sleep(self.snooze_time) sleep(self.snooze_time)
self.snooze_time = min(self.snooze_time+0.25, self.snooze_time_cap) self.snooze_time = min(self.snooze_time + self.snooze_time_step,
self.snooze_time_cap)
except Exception, exception: except Exception, exception:
# any other exception is fatal, so kill loop # any other exception is fatal, so kill loop
break break
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment