test_streaming.py 4.11 KB
Newer Older
Joshua Roesslein's avatar
Joshua Roesslein committed
1
from time import sleep
2
import unittest2 as unittest
Joshua Roesslein's avatar
Joshua Roesslein committed
3
4

from tweepy.api import API
5
from tweepy.auth import OAuthHandler
6
from tweepy.models import Status
Joshua Roesslein's avatar
Joshua Roesslein committed
7
8
9
from tweepy.streaming import Stream, StreamListener

from config import create_auth
10
11
from test_utils import mock_tweet
from mock import MagicMock, patch
Joshua Roesslein's avatar
Joshua Roesslein committed
12
13

class MockStreamListener(StreamListener):
14
    def __init__(self, test_case):
Joshua Roesslein's avatar
Joshua Roesslein committed
15
        super(MockStreamListener, self).__init__()
16
        self.test_case = test_case
Joshua Roesslein's avatar
Joshua Roesslein committed
17
        self.status_count = 0
18
19
20
21
22
23
        self.status_stop_count = 0
        self.connect_cb = None

    def on_connect(self):
        if self.connect_cb:
            self.connect_cb()
Joshua Roesslein's avatar
Joshua Roesslein committed
24

25
26
27
28
    def on_timeout(self):
        self.test_case.fail('timeout')
        return False

29
30
31
32
    def on_error(self, code):
        print "response: %s" % code
        return True

Joshua Roesslein's avatar
Joshua Roesslein committed
33
34
    def on_status(self, status):
        self.status_count += 1
35
36
37
        self.test_case.assertIsInstance(status, Status)
        if self.status_stop_count == self.status_count:
            return False
Joshua Roesslein's avatar
Joshua Roesslein committed
38
39
40
41

class TweepyStreamTests(unittest.TestCase):
    def setUp(self):
        self.auth = create_auth()
42
        self.listener = MockStreamListener(self)
43
        self.stream = Stream(self.auth, self.listener, timeout=3.0)
Joshua Roesslein's avatar
Joshua Roesslein committed
44
45
46
47
48
49

    def tearDown(self):
        self.stream.disconnect()

    def test_userstream(self):
        # Generate random tweet which should show up in the stream.
50
51
        def on_connect():
            API(self.auth).update_status(mock_tweet())
Joshua Roesslein's avatar
Joshua Roesslein committed
52

53
54
55
        self.listener.connect_cb = on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream()
Joshua Roesslein's avatar
Joshua Roesslein committed
56
57
        self.assertEqual(self.listener.status_count, 1)

58
59
60
61
62
63
    def test_sample(self):
        self.listener.status_stop_count = 10
        self.stream.sample()
        self.assertEquals(self.listener.status_count,
                          self.listener.status_stop_count)

64
65
66
67
68
69
70
    def test_filter_track(self):
        self.listener.status_stop_count = 5
        phrases = ['twitter']
        self.stream.filter(track=phrases)
        self.assertEquals(self.listener.status_count,
                          self.listener.status_stop_count)

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
    def test_track_encoding(self):
        s = Stream(None, None)
        s._start = lambda async: None
        s.filter(track=[u'Caf\xe9'])

        # Should be UTF-8 encoded
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.parameters['track'])

    def test_follow_encoding(self):
        s = Stream(None, None)
        s._start = lambda async: None
        s.filter(follow=[u'Caf\xe9'])

        # Should be UTF-8 encoded
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.parameters['follow'])
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

class TweepyStreamBackoffTests(unittest.TestCase):
    def setUp(self):
        #bad auth causes twitter to return 401 errors
        self.auth = OAuthHandler("bad-key", "bad-secret")
        self.auth.set_access_token("bad-token", "bad-token-secret")
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener)

    def tearDown(self):
        self.stream.disconnect()

    def test_exp_backoff(self):
        self.stream = Stream(self.auth, self.listener, timeout=3.0,
                             retry_count=1, retry_time=1.0, retry_time_cap=100.0)
        self.stream.sample()
        # 1 retry, should be 4x the retry_time
        self.assertEqual(self.stream.retry_time, 4.0)

    def test_exp_backoff_cap(self):
        self.stream = Stream(self.auth, self.listener, timeout=3.0,
                             retry_count=1, retry_time=1.0, retry_time_cap=3.0)
        self.stream.sample()
        # 1 retry, but 4x the retry_time exceeds the cap, so should be capped
        self.assertEqual(self.stream.retry_time, 3.0)

    mock_resp = MagicMock()
    mock_resp.return_value.status = 420
    @patch('httplib.HTTPConnection.getresponse', mock_resp)
    def test_420(self):
        self.stream = Stream(self.auth, self.listener, timeout=3.0, retry_count=0,
                             retry_time=1.0, retry_420=1.5, retry_time_cap=20.0)
        self.stream.sample()
        # no retries, but error 420, should be double the retry_420, not double the retry_time
        self.assertEqual(self.stream.retry_time, 3.0)