# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets

from __future__ import absolute_import, print_function

9
import logging
10
import re
Aaron Hill's avatar
Aaron Hill committed
11
12
import requests
from requests.exceptions import Timeout
13
from threading import Thread
14
from time import sleep
Mark Smith's avatar
Mark Smith committed
15
16
17

import six

18
import ssl
19

20
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
21
22
from tweepy.api import API
from tweepy.error import TweepError
23

24
from tweepy.utils import import_simplejson
25
json = import_simplejson()
26

Joshua Roesslein's avatar
Joshua Roesslein committed
27
STREAM_VERSION = '1.1'
28

Josh Roesslein's avatar
Josh Roesslein committed
29

30
31
class StreamListener(object):
    """Base class for objects that receive messages from a stream.

    ``on_data`` decodes each raw message and forwards it to one of the
    ``on_*`` hooks below. Subclasses override the hooks they care about.
    A hook that returns ``False`` makes ``on_data`` return ``False``;
    any other return value is ignored.
    """

    def __init__(self, api=None):
        """Store ``api``; build a default ``API()`` when none is given."""
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        :param raw_data: JSON text of a single stream message.
        :return: ``False`` when the selected hook returned ``False``,
            otherwise ``None``.
        """
        data = json.loads(raw_data)

        # The message kind is recognised by the presence of a marker key.
        # The order of these checks is significant: the first match wins.
        if 'in_reply_to_status_id' in data:
            verdict = self.on_status(Status.parse(self.api, data))
        elif 'delete' in data:
            deleted = data['delete']['status']
            verdict = self.on_delete(deleted['id'], deleted['user_id'])
        elif 'event' in data:
            verdict = self.on_event(Status.parse(self.api, data))
        elif 'direct_message' in data:
            verdict = self.on_direct_message(Status.parse(self.api, data))
        elif 'friends' in data:
            verdict = self.on_friends(data['friends'])
        elif 'limit' in data:
            verdict = self.on_limit(data['limit']['track'])
        elif 'disconnect' in data:
            verdict = self.on_disconnect(data['disconnect'])
        elif 'warning' in data:
            verdict = self.on_warning(data['warning'])
        else:
            # Lazy %-formatting: the message is only built if it is emitted.
            logging.error("Unknown message type: %s", raw_data)
            return None

        # Only an explicit False stops the stream; None means "carry on".
        if verdict is False:
            return False
        return None

    def keep_alive(self):
        """Called when a keep-alive arrived"""
        return

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned

        The default implementation returns ``False``.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a disconnection warning message arrives"""
        return
137

138

Timo Ewalds's avatar
Timo Ewalds committed
139
140
141
142
143
144
145
146
147
148
149
150
151
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size, encoding='utf-8'):
        """
        :param stream: object exposing ``read(n)`` (returning bytes) and a
            ``closed`` attribute.
        :param chunk_size: minimum number of bytes requested per read.
        :param encoding: codec used to decode the bytes handed back.
        """
        self._stream = stream
        self._buffer = b''
        self._chunk_size = chunk_size
        self._encoding = encoding

    def read_len(self, length):
        """Return the next ``length`` bytes of the stream, decoded.

        :return: the decoded text, or an empty string if the stream is
            closed before ``length`` bytes are available.
        """
        while not self._stream.closed:
            if len(self._buffer) >= length:
                return self._pop(length)
            # Ask for at least a full chunk, more if the message needs it.
            wanted = max(self._chunk_size, length - len(self._buffer))
            self._buffer += self._stream.read(wanted)
        # An empty string (rather than an implicit None) keeps callers that
        # treat the result as text from blowing up when the stream closes.
        return u''

    def read_line(self, sep=b'\n'):
        """Read the data stream until a given separator is found (default \\n)

        :param sep: Separator to read until. Must be of the bytes type (str in python 2,
            bytes in python 3)
        :return: The str of the data read until sep (separator included), or
            an empty string if the stream is closed before sep is seen
        """
        start = 0
        while not self._stream.closed:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            # Resume just before the end of what was already scanned, so a
            # multi-byte separator split across two reads is still found.
            start = max(0, len(self._buffer) - len(sep) + 1)
            self._buffer += self._stream.read(self._chunk_size)
        return u''

    def _pop(self, length):
        """Remove the first ``length`` buffered bytes and return them decoded."""
        head = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return head.decode(self._encoding)
Timo Ewalds's avatar
Timo Ewalds committed
185
186


187
188
class Stream(object):
    """Connects to a streaming endpoint and feeds messages to a listener.

    The public endpoint methods (``userstream``, ``firehose``, ``retweet``,
    ``sample``, ``filter``, ``sitestream``) configure the request and then
    start the read loop, either in the calling thread or, when ``is_async``
    is true, in a new ``Thread``.

    ``async`` became a reserved word in Python 3.7, so the parameter is
    spelled ``is_async``. It keeps its original position, and the legacy
    ``async=...`` keyword is still honoured where the interpreter can
    express it.
    """

    host = 'stream.twitter.com'

    def __init__(self, auth, listener, **options):
        """
        :param auth: object whose ``apply_auth()`` result is passed to the
            session as ``auth``.
        :param listener: receives the ``on_*`` / ``keep_alive`` callbacks.
        :param options: optional settings: ``timeout``, ``retry_count``,
            ``retry_time``, ``retry_420``, ``retry_time_cap``,
            ``snooze_time``, ``snooze_time_cap``, ``chunk_size``,
            ``verify``, ``headers``.
        """
        self.auth = auth
        self.listener = listener
        self.running = False
        self.timeout = options.get("timeout", 300.0)
        self.retry_count = options.get("retry_count")
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size", 512)

        self.verify = options.get("verify", True)

        self.api = API()
        self.headers = options.get("headers") or {}
        self.new_session()
        self.body = None
        self.retry_time = self.retry_time_start
        self.snooze_time = self.snooze_time_step

    def new_session(self):
        """Replace ``self.session`` with a fresh session using ``self.headers``."""
        self.session = requests.Session()
        self.session.headers = self.headers
        self.session.params = None

    @staticmethod
    def _legacy_async(is_async, extra_kwargs):
        """Return the effective async flag, honouring a legacy ``async=`` keyword.

        :raises TypeError: if any keyword other than ``async`` was passed.
        """
        if 'async' in extra_kwargs:
            is_async = extra_kwargs.pop('async')
        if extra_kwargs:
            raise TypeError("unexpected keyword argument(s): %s"
                            % ', '.join(sorted(extra_kwargs)))
        return is_async

    def _require_idle(self):
        """Raise ``TweepError`` if the stream is already running."""
        if self.running:
            raise TweepError('Stream object already connected!')

    def _run(self):
        """Connect, read until stopped, and reconnect with back-off on errors.

        Non-200 responses are reported to ``listener.on_error`` and retried
        after ``retry_time`` (doubling up to ``retry_time_cap``). Timeouts
        are reported to ``listener.on_timeout`` and retried after
        ``snooze_time``. Any other exception ends the loop, is passed to
        ``listener.on_exception`` and is then re-raised.
        """
        url = "https://%s%s" % (self.host, self.url)

        # Connect and process the stream
        error_counter = 0
        resp = None
        exception = None
        while self.running:
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
            try:
                auth = self.auth.apply_auth()
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth,
                                            verify=self.verify)
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
                        break
                    error_counter += 1
                    if resp.status_code == 420:
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
                    sleep(self.retry_time)
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
                else:
                    error_counter = 0
                    self.retry_time = self.retry_time_start
                    self.snooze_time = self.snooze_time_step
                    self.listener.on_connect()
                    self._read_loop(resp)
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exception = exc
                        break
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
            except Exception as exc:
                exception = exc
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        # Compare against None explicitly: a requests Response is falsy for
        # 4xx/5xx statuses, and those responses must be closed as well.
        if resp is not None:
            resp.close()

        self.new_session()

        if exception is not None:
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
            raise exception

    def _data(self, data):
        """Hand one message to the listener; stop running if it returns False."""
        if self.listener.on_data(data) is False:
            self.running = False

    def _read_loop(self, resp):
        """Read length-delimited messages from ``resp`` until stopped or closed.

        Each message is preceded by a line holding its length in bytes;
        blank lines in between are treated as keep-alives.

        :raises TweepError: if a non-blank line is not a length.
        """
        charset = resp.headers.get('content-type', default='')
        enc_search = re.search(r'charset=(?P<enc>\S*)', charset)
        if enc_search is not None:
            encoding = enc_search.group('enc')
        else:
            encoding = 'utf-8'

        buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)

        while self.running and not resp.raw.closed:
            length = 0
            while not resp.raw.closed:
                raw_line = buf.read_line()
                if not raw_line:
                    # Nothing at all (not even a separator) came back: the
                    # stream went away mid-read, so stop looking for a length.
                    break
                line = raw_line.strip()
                if not line:
                    self.listener.keep_alive()  # keep-alive new lines are expected
                elif line.isdigit():
                    length = int(line)
                    break
                else:
                    raise TweepError('Expecting length, unexpected value found')

            next_status_obj = buf.read_len(length)
            # Skip empty payloads (stream closed before a message arrived)
            # instead of handing the listener something it cannot parse.
            if self.running and next_status_obj:
                self._data(next_status_obj)

        if resp.raw.closed:
            self.on_closed(resp)

    def _start(self, is_async):
        """Mark the stream running and enter ``_run`` (in a thread if async)."""
        self.running = True
        if is_async:
            self._thread = Thread(target=self._run)
            self._thread.start()
        else:
            self._run()

    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   is_async=False,
                   encoding='utf8',
                   **kwargs):
        """Start the user stream (``/user.json`` on ``userstream.twitter.com``).

        :raises TweepError: if already running, or if ``locations`` does not
            hold a multiple of 4 values.
        """
        is_async = self._legacy_async(is_async, kwargs)
        # Refuse before touching any state so that a running stream's
        # request configuration is not clobbered.
        self._require_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/user.json' % STREAM_VERSION
        self.host = 'userstream.twitter.com'
        if stall_warnings:
            self.session.params['stall_warnings'] = stall_warnings
        if _with:
            self.session.params['with'] = _with
        if replies:
            self.session.params['replies'] = replies
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
        if track:
            self.session.params['track'] = u','.join(track).encode(encoding)

        self._start(is_async)

    def firehose(self, count=None, is_async=False, **kwargs):
        """Start the firehose stream (``/statuses/firehose.json``).

        :param count: if truthy, sent as the ``count`` query parameter.
        :raises TweepError: if already running.
        """
        is_async = self._legacy_async(is_async, kwargs)
        self._require_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
        if count:
            # Goes into the query string; appending "&count=" to the path
            # (which has no "?") would produce a malformed URL.
            self.session.params['count'] = count
        self._start(is_async)

    def retweet(self, is_async=False, **kwargs):
        """Start the retweet stream (``/statuses/retweet.json``).

        :raises TweepError: if already running.
        """
        is_async = self._legacy_async(is_async, kwargs)
        self._require_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
        self._start(is_async)

    def sample(self, is_async=False, languages=None, **kwargs):
        """Start the sample stream (``/statuses/sample.json``).

        :param languages: optional iterable joined into the ``language``
            query parameter.
        :raises TweepError: if already running.
        """
        is_async = self._legacy_async(is_async, kwargs)
        self._require_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
        self._start(is_async)

    def filter(self, follow=None, track=None, is_async=False, locations=None,
               stall_warnings=False, languages=None, encoding='utf8',
               filter_level=None, **kwargs):
        """Start the filter stream (``/statuses/filter.json``).

        The given criteria are sent as a form-encoded request body.

        :raises TweepError: if already running, or if ``locations`` does not
            hold a multiple of 4 values.
        """
        is_async = self._legacy_async(is_async, kwargs)
        self._require_idle()
        self.body = {}
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
        if follow:
            self.body['follow'] = u','.join(follow).encode(encoding)
        if track:
            self.body['track'] = u','.join(track).encode(encoding)
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.body['locations'] = u','.join(['%.4f' % l for l in locations])
        if stall_warnings:
            self.body['stall_warnings'] = stall_warnings
        if languages:
            self.body['language'] = u','.join(map(str, languages))
        if filter_level:
            # ``unicode`` does not exist on Python 3; decode byte strings to
            # text explicitly and pass text through unchanged.
            if isinstance(filter_level, bytes):
                filter_level = filter_level.decode(encoding)
            self.body['filter_level'] = filter_level
        self.session.params = {'delimited': 'length'}
        self.host = 'stream.twitter.com'
        self._start(is_async)

    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, is_async=False, **kwargs):
        """Start the site stream (``/site.json``) for the ``follow`` ids.

        :raises TweepError: if already running.
        """
        is_async = self._legacy_async(is_async, kwargs)
        self._require_idle()
        self.body = {}
        self.url = '/%s/site.json' % STREAM_VERSION
        self.body['follow'] = u','.join(map(six.text_type, follow))
        self.body['delimited'] = 'length'
        if stall_warnings:
            self.body['stall_warnings'] = stall_warnings
        if with_:
            self.body['with'] = with_
        if replies:
            self.body['replies'] = replies
        self._start(is_async)

    def disconnect(self):
        """Ask the read loop to stop; does nothing if not running."""
        if self.running is False:
            return
        self.running = False