test_streaming.py 7.2 KB
Newer Older
Mark Smith's avatar
Mark Smith committed
1
2
3
4
5
6
7
8
9
10
11
from __future__ import absolute_import, print_function

from .config import tape

import six
if six.PY3:
    import unittest
    from unittest.case import skip
else:
    import unittest2 as unittest
    from unittest2.case import skip
Joshua Roesslein's avatar
Joshua Roesslein committed
12
13

from tweepy.api import API
14
from tweepy.auth import OAuthHandler
15
from tweepy.models import Status
Timo Ewalds's avatar
Timo Ewalds committed
16
from tweepy.streaming import Stream, StreamListener, ReadBuffer
Joshua Roesslein's avatar
Joshua Roesslein committed
17

Mark Smith's avatar
Mark Smith committed
18
19
from .config import create_auth
from .test_utils import mock_tweet
20
from mock import MagicMock, patch
Joshua Roesslein's avatar
Joshua Roesslein committed
21

Mark Smith's avatar
Mark Smith committed
22
23
24
25
26
27
28

if six.PY3:
    getresponse_location = 'http.client.HTTPConnection.getresponse'
else:
    getresponse_location = 'httplib.HTTPConnection.getresponse'


Joshua Roesslein's avatar
Joshua Roesslein committed
29
class MockStreamListener(StreamListener):
30
    def __init__(self, test_case):
Joshua Roesslein's avatar
Joshua Roesslein committed
31
        super(MockStreamListener, self).__init__()
32
        self.test_case = test_case
Joshua Roesslein's avatar
Joshua Roesslein committed
33
        self.status_count = 0
34
35
36
37
38
39
        self.status_stop_count = 0
        self.connect_cb = None

    def on_connect(self):
        if self.connect_cb:
            self.connect_cb()
Joshua Roesslein's avatar
Joshua Roesslein committed
40

41
42
43
44
    def on_timeout(self):
        self.test_case.fail('timeout')
        return False

45
    def on_error(self, code):
46
        print("response: %s" % code)
47
48
        return True

Joshua Roesslein's avatar
Joshua Roesslein committed
49
50
    def on_status(self, status):
        self.status_count += 1
51
52
53
        self.test_case.assertIsInstance(status, Status)
        if self.status_stop_count == self.status_count:
            return False
Joshua Roesslein's avatar
Joshua Roesslein committed
54

Mark Smith's avatar
Mark Smith committed
55

Joshua Roesslein's avatar
Joshua Roesslein committed
56
57
58
class TweepyStreamTests(unittest.TestCase):
    def setUp(self):
        self.auth = create_auth()
59
        self.listener = MockStreamListener(self)
60
        self.stream = Stream(self.auth, self.listener, timeout=3.0)
Joshua Roesslein's avatar
Joshua Roesslein committed
61
62
63
64

    def tearDown(self):
        self.stream.disconnect()

Mark Smith's avatar
Mark Smith committed
65
    def on_connect(self):
Aaron Hill's avatar
Aaron Hill committed
66
67
        API(self.auth).update_status(mock_tweet())

Joshua Roesslein's avatar
Joshua Roesslein committed
68
69
    def test_userstream(self):
        # Generate random tweet which should show up in the stream.
Mark Smith's avatar
Mark Smith committed
70

Aaron Hill's avatar
Aaron Hill committed
71
        self.listener.connect_cb = self.on_connect
72
73
        self.listener.status_stop_count = 1
        self.stream.userstream()
Joshua Roesslein's avatar
Joshua Roesslein committed
74
        self.assertEqual(self.listener.status_count, 1)
Mark Smith's avatar
Mark Smith committed
75
76

    @skip("Sitestream only available to whitelisted accounts.")
Aaron Hill's avatar
Aaron Hill committed
77
78
79
    def test_sitestream(self):
        self.listener.connect_cb = self.on_connect
        self.listener.status_stop_count = 1
Mark Smith's avatar
Mark Smith committed
80
        self.stream.sitestream(follow=[self.auth.get_username()])
Aaron Hill's avatar
Aaron Hill committed
81
        self.assertEqual(self.listener.status_count, 1)
Joshua Roesslein's avatar
Joshua Roesslein committed
82

83
84
85
86
87
88
89
90
91
92
    def test_userstream_with_params(self):
        # Generate random tweet which should show up in the stream.
        def on_connect():
            API(self.auth).update_status(mock_tweet())

        self.listener.connect_cb = on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream(_with='user', replies='all', stall_warnings=True)
        self.assertEqual(self.listener.status_count, 1)

93
94
95
96
97
98
    def test_sample(self):
        self.listener.status_stop_count = 10
        self.stream.sample()
        self.assertEquals(self.listener.status_count,
                          self.listener.status_stop_count)

99
100
101
102
103
104
105
    def test_filter_track(self):
        self.listener.status_stop_count = 5
        phrases = ['twitter']
        self.stream.filter(track=phrases)
        self.assertEquals(self.listener.status_count,
                          self.listener.status_stop_count)

106
107
108
109
110
111
    def test_track_encoding(self):
        s = Stream(None, None)
        s._start = lambda async: None
        s.filter(track=[u'Caf\xe9'])

        # Should be UTF-8 encoded
112
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['track'])
113
114
115
116
117
118
119

    def test_follow_encoding(self):
        s = Stream(None, None)
        s._start = lambda async: None
        s.filter(follow=[u'Caf\xe9'])

        # Should be UTF-8 encoded
120
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['follow'])
121

Timo Ewalds's avatar
Timo Ewalds committed
122
123
124
125
126
127
128

class TweepyStreamReadBuffer(unittest.TestCase):

    stream = """11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n"""

    def test_read_tweet(self):
        for length in [1, 2, 5, 10, 20, 50]:
Mark Smith's avatar
Mark Smith committed
129
            buf = ReadBuffer(six.StringIO(self.stream), length)
Timo Ewalds's avatar
Timo Ewalds committed
130
131
132
133
134
135
            self.assertEqual('11\n', buf.read_line())
            self.assertEqual('{id:12345}\n', buf.read_len(11))
            self.assertEqual('\n', buf.read_line())
            self.assertEqual('24\n', buf.read_line())
            self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
    def test_read_empty_buffer(self):
        """
        Requests can be closed by twitter.
        The ReadBuffer should not loop infinitely when this happens.
        Instead it should return and let the outer _read_loop handle it.
        """

        # If the test fails, we are in danger of an infinite loop
        # so we need to do some work to block that from happening
        class InfiniteLoopException(Exception):
            pass

        self.called_count = 0
        call_limit = 5
        def on_read(chunk_size):
            self.called_count += 1

            if self.called_count > call_limit:
                # we have failed
                raise InfiniteLoopException("Oops, read() was called a bunch of times")

            return ""

        # Create a fake stream
        stream = six.StringIO('')

        # Mock it's read function so it can't be called too many times
        mock_read = MagicMock(side_effect=on_read)

        try:
Michael Brooks's avatar
Michael Brooks committed
166
            with patch.multiple(stream, create=True, read=mock_read, closed=True):
167
168
169
170
171
172
173
174
175
176
                # Now the stream can't call 'read' more than call_limit times
                # and it looks like a requests stream that is closed
                buf = ReadBuffer(stream, 50)
                buf.read_line("\n")
        except InfiniteLoopException:
            self.fail("ReadBuffer.read_line tried to loop infinitely.")

        # The mocked function not have been called at all since the stream looks closed
        self.assertEqual(mock_read.call_count, 0)

Timo Ewalds's avatar
Timo Ewalds committed
177

178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class TweepyStreamBackoffTests(unittest.TestCase):
    def setUp(self):
        #bad auth causes twitter to return 401 errors
        self.auth = OAuthHandler("bad-key", "bad-secret")
        self.auth.set_access_token("bad-token", "bad-token-secret")
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener)

    def tearDown(self):
        self.stream.disconnect()

    def test_exp_backoff(self):
        self.stream = Stream(self.auth, self.listener, timeout=3.0,
                             retry_count=1, retry_time=1.0, retry_time_cap=100.0)
        self.stream.sample()
        # 1 retry, should be 4x the retry_time
        self.assertEqual(self.stream.retry_time, 4.0)

    def test_exp_backoff_cap(self):
        self.stream = Stream(self.auth, self.listener, timeout=3.0,
                             retry_count=1, retry_time=1.0, retry_time_cap=3.0)
        self.stream.sample()
        # 1 retry, but 4x the retry_time exceeds the cap, so should be capped
        self.assertEqual(self.stream.retry_time, 3.0)

    mock_resp = MagicMock()
    mock_resp.return_value.status = 420
Mark Smith's avatar
Mark Smith committed
205
206

    @patch(getresponse_location, mock_resp)
207
208
209
210
211
212
    def test_420(self):
        self.stream = Stream(self.auth, self.listener, timeout=3.0, retry_count=0,
                             retry_time=1.0, retry_420=1.5, retry_time_cap=20.0)
        self.stream.sample()
        # no retries, but error 420, should be double the retry_420, not double the retry_time
        self.assertEqual(self.stream.retry_time, 3.0)