test_streaming.py 7.74 KB
Newer Older
Mark Smith's avatar
Mark Smith committed
1
2
3
4
5
6
7
8
9
10
11
from __future__ import absolute_import, print_function

from .config import tape

import six
if six.PY3:
    import unittest
    from unittest.case import skip
else:
    import unittest2 as unittest
    from unittest2.case import skip
Joshua Roesslein's avatar
Joshua Roesslein committed
12
13

from tweepy.api import API
14
from tweepy.auth import OAuthHandler
15
from tweepy.models import Status
Timo Ewalds's avatar
Timo Ewalds committed
16
from tweepy.streaming import Stream, StreamListener, ReadBuffer
Joshua Roesslein's avatar
Joshua Roesslein committed
17

Mark Smith's avatar
Mark Smith committed
18
19
from .config import create_auth
from .test_utils import mock_tweet
20
from mock import MagicMock, patch
Joshua Roesslein's avatar
Joshua Roesslein committed
21

Mark Smith's avatar
Mark Smith committed
22
23
24
25
26
27
28

# Dotted path to HTTPConnection.getresponse for the running interpreter:
# the 'http.client' spelling is selected on Python 3, 'httplib' otherwise.
getresponse_location = (
    'http.client.HTTPConnection.getresponse' if six.PY3
    else 'httplib.HTTPConnection.getresponse'
)


Joshua Roesslein's avatar
Joshua Roesslein committed
29
class MockStreamListener(StreamListener):
    """Stream listener used by the tests to count and validate statuses.

    Attributes:
        test_case: the running ``TestCase``; used for assertions/failures.
        status_count: number of statuses received so far.
        status_stop_count: ``on_status`` returns ``False`` once
            ``status_count`` equals this value (0 never matches, because the
            counter is incremented before the comparison).
        connect_cb: optional zero-argument callable run from ``on_connect``.
    """

    def __init__(self, test_case):
        super(MockStreamListener, self).__init__()
        self.connect_cb = None
        self.status_stop_count = 0
        self.status_count = 0
        self.test_case = test_case

    def on_connect(self):
        """Run the connect callback, if one has been registered."""
        callback = self.connect_cb
        if not callback:
            return
        callback()

    def on_timeout(self):
        """Fail the owning test case with 'timeout' and return ``False``."""
        self.test_case.fail('timeout')
        return False

    def on_error(self, code):
        """Print the response code and return ``True``."""
        print("response: %s" % code)
        return True

    def on_status(self, status):
        """Count the status, check its type, and stop at the configured count.

        Returns ``False`` when ``status_count`` has just reached
        ``status_stop_count`` (presumably telling the stream to stop --
        confirm against ``StreamListener``); otherwise returns ``None``.
        """
        self.status_count += 1
        self.test_case.assertIsInstance(status, Status)
        reached_stop_count = self.status_count == self.status_stop_count
        if reached_stop_count:
            return False
        return None
Joshua Roesslein's avatar
Joshua Roesslein committed
54

Mark Smith's avatar
Mark Smith committed
55

Joshua Roesslein's avatar
Joshua Roesslein committed
56
57
58
class TweepyStreamTests(unittest.TestCase):
    """Tests for ``Stream`` endpoints, observed through ``MockStreamListener``.

    Every test gets a fresh stream built from ``create_auth()`` credentials
    with a 3 second timeout; the stream is disconnected in ``tearDown``.
    """

    def setUp(self):
        """Create the auth handler, the counting listener and the stream."""
        self.auth = create_auth()
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener, timeout=3.0)

    def tearDown(self):
        """Disconnect the stream created in ``setUp``."""
        self.stream.disconnect()

    def on_connect(self):
        """Connect callback: post a mock tweet that should show up in the stream."""
        API(self.auth).update_status(mock_tweet())

    def test_userstream(self):
        """userstream() should deliver the tweet posted on connect."""
        # Generate random tweet which should show up in the stream.
        self.listener.connect_cb = self.on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream()
        self.assertEqual(self.listener.status_count, 1)

    @skip("Sitestream only available to whitelisted accounts.")
    def test_sitestream(self):
        """sitestream() should deliver the tweet posted on connect."""
        self.listener.connect_cb = self.on_connect
        self.listener.status_stop_count = 1
        self.stream.sitestream(follow=[self.auth.get_username()])
        self.assertEqual(self.listener.status_count, 1)

    def test_userstream_with_params(self):
        """userstream() with extra parameters should still deliver the tweet."""
        # Generate random tweet which should show up in the stream.
        # Reuses the shared on_connect helper instead of a local duplicate.
        self.listener.connect_cb = self.on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream(_with='user', replies='all', stall_warnings=True)
        self.assertEqual(self.listener.status_count, 1)

    def test_sample(self):
        """sample() should run until the listener has counted 10 statuses."""
        self.listener.status_stop_count = 10
        self.stream.sample()
        # assertEqual, not the deprecated assertEquals alias.
        self.assertEqual(self.listener.status_count,
                         self.listener.status_stop_count)

    def test_filter_track(self):
        """filter(track=...) should run until the listener has counted 5 statuses."""
        self.listener.status_stop_count = 5
        phrases = ['twitter']
        self.stream.filter(track=phrases)
        # assertEqual, not the deprecated assertEquals alias.
        self.assertEqual(self.listener.status_count,
                         self.listener.status_stop_count)

    def test_track_encoding(self):
        """filter() should store the ``track`` value UTF-8 encoded in the body."""
        s = Stream(None, None)
        # Replace _start with a no-op so only the request body is built.
        # The stub accepts any arguments: `async` is a reserved keyword from
        # Python 3.7 on and can no longer be used as a parameter name.
        s._start = lambda *args, **kwargs: None
        s.filter(track=[u'Caf\xe9'])

        # Should be UTF-8 encoded
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.body['track'])

    def test_follow_encoding(self):
        """filter() should store the ``follow`` value UTF-8 encoded in the body."""
        s = Stream(None, None)
        # No-op replacement for _start; see test_track_encoding for why the
        # stub takes *args/**kwargs rather than a parameter named `async`.
        s._start = lambda *args, **kwargs: None
        s.filter(follow=[u'Caf\xe9'])

        # Should be UTF-8 encoded
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.body['follow'])
121

Timo Ewalds's avatar
Timo Ewalds committed
122

123
class TweepyStreamReadBufferTests(unittest.TestCase):
    """Tests for ``ReadBuffer.read_line`` and ``ReadBuffer.read_len``."""

    # Two payloads, each preceded by a line holding its length (11 and 24).
    stream = six.b("""11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n""")

    def test_read_tweet(self):
        """Lines and fixed-length reads must match for every buffer size tried."""
        for size in (1, 2, 5, 10, 20, 50):
            reader = ReadBuffer(six.BytesIO(self.stream), size)
            self.assertEqual('11\n', reader.read_line())
            self.assertEqual('{id:12345}\n', reader.read_len(11))
            self.assertEqual('\n', reader.read_line())
            self.assertEqual('24\n', reader.read_line())
            self.assertEqual('{id:23456, test:"blah"}\n', reader.read_len(24))

    def test_read_empty_buffer(self):
        """
        Requests can be closed by twitter.
        The ReadBuffer should not loop infinitely when this happens.
        Instead it should return and let the outer _read_loop handle it.
        """

        # A failing implementation could spin forever, so the fake read()
        # below raises this once it has been called too often.
        class InfiniteLoopException(Exception):
            pass

        max_reads = 5
        self.called_count = 0

        def counting_read(chunk_size):
            self.called_count += 1
            if self.called_count <= max_reads:
                return ""
            # we have failed
            raise InfiniteLoopException("Oops, read() was called a bunch of times")

        # Mock its read function so it can't be called too many times
        mock_read = MagicMock(side_effect=counting_read)

        # Create a fake stream and close it, so that it looks like a
        # requests stream that has been closed.
        closed_stream = six.BytesIO(six.b(''))
        closed_stream.close()

        try:
            with patch.multiple(closed_stream, create=True, read=mock_read):
                # Now the stream can't call 'read' more than max_reads times
                reader = ReadBuffer(closed_stream, 50)
                reader.read_line("\n")
        except InfiniteLoopException:
            self.fail("ReadBuffer.read_line tried to loop infinitely.")

        # The mocked function should not have been called at all since the
        # stream looks closed
        self.assertEqual(mock_read.call_count, 0)

    def test_read_unicode_tweet(self):
        """A multi-byte character must survive reads at every buffer size tried."""
        # The second payload is 23 bytes long; \xe3\x81\x93 is the UTF-8
        # encoding of U+3053, which the final assertion expects back.
        raw = six.b('11\n{id:12345}\n\n23\n{id:23456, test:"\xe3\x81\x93"}\n\n')
        for size in (1, 2, 5, 10, 20, 50):
            reader = ReadBuffer(six.BytesIO(raw), size)
            self.assertEqual('11\n', reader.read_line())
            self.assertEqual('{id:12345}\n', reader.read_len(11))
            self.assertEqual('\n', reader.read_line())
            self.assertEqual('23\n', reader.read_line())
            self.assertEqual(u'{id:23456, test:"\u3053"}\n', reader.read_len(23))
187

Timo Ewalds's avatar
Timo Ewalds committed
188

189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class TweepyStreamBackoffTests(unittest.TestCase):
    """Tests for how ``Stream.retry_time`` changes after failed connections."""

    def setUp(self):
        """Build a stream whose credentials are deliberately invalid."""
        # bad auth causes twitter to return 401 errors
        bad_auth = OAuthHandler("bad-key", "bad-secret")
        bad_auth.set_access_token("bad-token", "bad-token-secret")
        self.auth = bad_auth
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener)

    def tearDown(self):
        """Disconnect whichever stream the test left in ``self.stream``."""
        self.stream.disconnect()

    def test_exp_backoff(self):
        """With a high cap, one retry leaves retry_time at 4x its start value."""
        options = dict(timeout=3.0, retry_count=1,
                       retry_time=1.0, retry_time_cap=100.0)
        self.stream = Stream(self.auth, self.listener, **options)
        self.stream.sample()
        # 1 retry, should be 4x the retry_time
        self.assertEqual(self.stream.retry_time, 4.0)

    def test_exp_backoff_cap(self):
        """retry_time must not grow past retry_time_cap."""
        options = dict(timeout=3.0, retry_count=1,
                       retry_time=1.0, retry_time_cap=3.0)
        self.stream = Stream(self.auth, self.listener, **options)
        self.stream.sample()
        # 1 retry, but 4x the retry_time exceeds the cap, so should be capped
        self.assertEqual(self.stream.retry_time, 3.0)

    # Stand-in for requests.Session.request: every call returns a response
    # object whose status_code is 420.
    mock_resp = MagicMock()
    mock_resp.return_value.status_code = 420

    @patch('requests.Session.request', mock_resp)
    def test_420(self):
        """A 420 response should back off from retry_420, not retry_time."""
        options = dict(timeout=3.0, retry_count=0, retry_time=1.0,
                       retry_420=1.5, retry_time_cap=20.0)
        self.stream = Stream(self.auth, self.listener, **options)
        self.stream.sample()
        # no retries, but error 420, should be double the retry_420, not double the retry_time
        self.assertEqual(self.stream.retry_time, 3.0)