# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets

from __future__ import absolute_import, print_function

9
import logging
10
import re
Aaron Hill's avatar
Aaron Hill committed
11
import requests
12
import sys
Aaron Hill's avatar
Aaron Hill committed
13
from requests.exceptions import Timeout
14
from threading import Thread
15
from time import sleep
Mark Smith's avatar
Mark Smith committed
16
17
18

import six

19
import ssl
20

21
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
22
23
from tweepy.api import API
from tweepy.error import TweepError
24

25
from tweepy.utils import import_simplejson
26
json = import_simplejson()
27

Joshua Roesslein's avatar
Joshua Roesslein committed
28
STREAM_VERSION = '1.1'
29

Josh Roesslein's avatar
Josh Roesslein committed
30

31
32
class StreamListener(object):
    """Base class for objects that receive messages from a stream.

    ``on_data`` decodes each raw JSON message and forwards it to the
    matching ``on_*`` hook. Apart from ``on_error`` (which returns
    ``False``), every hook defined here does nothing and returns ``None``;
    subclasses override the ones they care about. A hook that returns
    ``False`` makes ``on_data`` return ``False`` too.
    """

    def __init__(self, api=None):
        """Keep *api* for parsing models; build a default ``API`` if it is falsy."""
        self.api = api if api else API()

    def on_connect(self):
        """Called once connected to the streaming server.

        Invoked after a successful response has been received, giving the
        listener a chance to do some work before the read loop starts.
        """
        return None

    def on_data(self, raw_data):
        """Decode a raw message and route it to the matching hook.

        The message kind is decided by the first of these keys found in the
        decoded JSON, in this order: ``in_reply_to_status_id``, ``delete``,
        ``event``, ``direct_message``, ``friends``, ``limit``,
        ``disconnect``, ``warning``. A message with none of them is logged
        as an error.

        Override this method to handle the raw stream data manually.

        Returns ``False`` when the invoked hook returned ``False`` (the
        signal to stop the stream and close the connection), else ``None``.
        """
        message = json.loads(raw_data)

        def parsed():
            # Status, event and direct-message payloads are all parsed
            # into a Status model from the whole message.
            return Status.parse(self.api, message)

        def deleted():
            target = message['delete']['status']
            return self.on_delete(target['id'], target['user_id'])

        # Order matters: the first key present selects the handler.
        routes = (
            ('in_reply_to_status_id', lambda: self.on_status(parsed())),
            ('delete', deleted),
            ('event', lambda: self.on_event(parsed())),
            ('direct_message', lambda: self.on_direct_message(parsed())),
            ('friends', lambda: self.on_friends(message['friends'])),
            ('limit', lambda: self.on_limit(message['limit']['track'])),
            ('disconnect', lambda: self.on_disconnect(message['disconnect'])),
            ('warning', lambda: self.on_warning(message['warning'])),
        )
        for marker, handle in routes:
            if marker in message:
                if handle() is False:
                    return False
                return None

        logging.error("Unknown message type: " + str(raw_data))

    def keep_alive(self):
        """Called when a keep-alive arrives."""
        return None

    def on_status(self, status):
        """Called when a new status arrives."""
        return None

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return None

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status."""
        return None

    def on_event(self, status):
        """Called when a new event arrives."""
        return None

    def on_direct_message(self, status):
        """Called when a new direct message arrives."""
        return None

    def on_friends(self, friends):
        """Called when a friends list arrives.

        *friends* is a list that contains user ids.
        """
        return None

    def on_limit(self, track):
        """Called when a limitation notice arrives."""
        return None

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returns ``False`` by default.
        """
        return False

    def on_timeout(self):
        """Called when the stream connection times out."""
        return None

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice.

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return None

    def on_warning(self, notice):
        """Called when a disconnection warning message arrives."""
        return None
138

139

Timo Ewalds's avatar
Timo Ewalds committed
140
141
142
143
144
145
146
147
148
149
150
151
152
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine),
    socket reads are quite slow. To combat this latency we can read big
    chunks, but the blocking part means we won't get results until enough
    tweets have arrived. That may not be a big deal for high throughput
    systems. For low throughput systems we don't want to sacrifice latency,
    so we use small chunks so it can read the length and the tweet in 2
    read calls.

    Data is accumulated as bytes and only decoded (with *encoding*) when it
    is handed back to the caller.
    """

    def __init__(self, stream, chunk_size, encoding='utf-8'):
        """Wrap *stream*.

        :param stream: object exposing ``read(n)`` returning bytes and a
            ``closed`` attribute
        :param chunk_size: minimum number of bytes requested per read
        :param encoding: codec used to decode the bytes handed back
        """
        self._stream = stream
        self._buffer = b''
        self._chunk_size = chunk_size
        self._encoding = encoding

    def read_len(self, length):
        """Return the next *length* bytes of the stream, decoded to text.

        Reads at least ``chunk_size`` bytes at a time until enough data is
        buffered. Returns empty bytes if the stream is (or becomes) closed
        before *length* bytes are available.
        """
        while not self._stream.closed:
            if len(self._buffer) >= length:
                return self._pop(length)
            read_len = max(self._chunk_size, length - len(self._buffer))
            self._buffer += self._stream.read(read_len)
        return b''

    def read_line(self, sep=b'\n'):
        """Read the data stream until a given separator is found.

        :param sep: Separator to read until (default is a single newline
            byte). Must be of the bytes type (str in python 2, bytes in
            python 3)
        :return: The decoded data read up to and including sep, or empty
            bytes if the stream is (or becomes) closed first
        """
        start = 0
        while not self._stream.closed:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            # Everything before this point has been searched already.
            # Back up len(sep) - 1 bytes so a multi-byte separator that is
            # split across two reads is still found on the next pass.
            start = max(0, len(self._buffer) - len(sep) + 1)
            self._buffer += self._stream.read(self._chunk_size)
        return b''

    def _pop(self, length):
        """Remove the first *length* buffered bytes and return them decoded."""
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r.decode(self._encoding)
Timo Ewalds's avatar
Timo Ewalds committed
188
189


190
191
class Stream(object):
    """Client for Twitter's length-delimited streaming endpoints.

    An endpoint method (``filter``, ``sample``, ``userstream``, ...) sets
    the request url/parameters and starts the run loop, either in the
    calling thread or in a background thread. The run loop POSTs to the
    endpoint through a ``requests`` session, hands every message to
    ``listener.on_data`` and reconnects with back-off on errors and
    timeouts.

    The run-in-thread flag is called ``is_async`` because ``async`` is a
    reserved word from Python 3.7 on; the old ``async=...`` keyword is
    still accepted by every endpoint method for backward compatibility.
    """

    host = 'stream.twitter.com'

    def __init__(self, auth, listener, **options):
        """Create a stream.

        :param auth: object whose ``apply_auth()`` result is passed to
            requests as ``auth``
        :param listener: object receiving the ``on_*`` callbacks
        :param options: optional settings: ``timeout`` (300.0),
            ``retry_count`` (None, i.e. unlimited), ``retry_time`` (5.0),
            ``retry_420`` (60.0), ``retry_time_cap`` (320.0),
            ``snooze_time`` (0.25), ``snooze_time_cap`` (16),
            ``chunk_size`` (512), ``verify`` (True), ``headers`` ({})
        """
        self.auth = auth
        self.listener = listener
        self.running = False
        self.timeout = options.get("timeout", 300.0)
        self.retry_count = options.get("retry_count")
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size", 512)

        self.verify = options.get("verify", True)

        self.api = API()
        self.headers = options.get("headers") or {}
        self.new_session()
        self.body = None
        self.retry_time = self.retry_time_start
        self.snooze_time = self.snooze_time_step

    def new_session(self):
        """Replace the requests session with a fresh one using ``self.headers``."""
        self.session = requests.Session()
        # NOTE: the session shares the very same dict object as self.headers,
        # so header changes made through the session persist across sessions.
        self.session.headers = self.headers
        self.session.params = None

    @staticmethod
    def _pick_async(is_async, legacy):
        """Return the run-in-thread flag, honouring the legacy ``async`` keyword.

        *legacy* is the ``**kwargs`` dict of an endpoint method; any key
        other than ``async`` raises ``TypeError`` like a normal bad keyword.
        """
        if 'async' in legacy:
            is_async = legacy.pop('async')
        if legacy:
            raise TypeError('unexpected keyword argument(s): %s'
                            % ', '.join(sorted(legacy)))
        return is_async

    def _ensure_idle(self):
        """Raise ``TweepError`` if the stream is already running."""
        if self.running:
            raise TweepError('Stream object already connected!')

    def _run(self):
        """Connect, read and reconnect until stopped or a fatal error occurs.

        * Non-200 response: ``listener.on_error`` is consulted (``False``
          stops the loop); otherwise sleep ``retry_time`` and double it up
          to ``retry_time_cap`` (a 420 starts at ``retry_420_start``).
        * Timeout: ``listener.on_timeout`` is consulted (``False`` stops the
          loop); otherwise sleep ``snooze_time`` and grow it by
          ``snooze_time_step`` up to ``snooze_time_cap``.
        * Any other exception ends the loop, is passed to
          ``listener.on_exception`` and is then re-raised.

        On exit the last response is closed and a new session is created.
        """
        # Authenticate
        url = "https://%s%s" % (self.host, self.url)

        # Connect and process the stream
        error_counter = 0
        resp = None
        exc_info = None
        while self.running:
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
            try:
                auth = self.auth.apply_auth()
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth,
                                            verify=self.verify)
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
                        break
                    error_counter += 1
                    if resp.status_code == 420:
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
                    sleep(self.retry_time)
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
                else:
                    error_counter = 0
                    self.retry_time = self.retry_time_start
                    self.snooze_time = self.snooze_time_step
                    self.listener.on_connect()
                    self._read_loop(resp)
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exc_info = sys.exc_info()
                        break
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
            except Exception:
                # any other exception is fatal, so kill loop
                exc_info = sys.exc_info()
                break

        # cleanup
        self.running = False
        # Compare against None: a requests Response is falsy for 4xx/5xx
        # status codes, and those responses must be closed as well.
        if resp is not None:
            resp.close()

        self.new_session()

        if exc_info:
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exc_info[1])
            six.reraise(*exc_info)

    def _data(self, data):
        """Pass one message to the listener; stop running if it returns ``False``."""
        if self.listener.on_data(data) is False:
            self.running = False

    def _read_loop(self, resp):
        """Read length-delimited messages from *resp* until stopped or closed.

        Each message is preceded by a line holding its length in bytes;
        blank lines in between are keep-alives. The text encoding is taken
        from the ``charset=`` part of the content-type header, defaulting to
        utf-8. Calls ``on_closed`` if the response ends up closed.

        :raises TweepError: if a non-blank line that is not a number is
            found where a length is expected
        """
        charset = resp.headers.get('content-type', default='')
        enc_search = re.search(r'charset=(?P<enc>\S*)', charset)
        if enc_search is not None:
            encoding = enc_search.group('enc')
        else:
            encoding = 'utf-8'

        buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)

        while self.running and not resp.raw.closed:
            length = 0
            while not resp.raw.closed:
                line = buf.read_line().strip()
                if not line:
                    self.listener.keep_alive()  # keep-alive new lines are expected
                elif line.isdigit():
                    length = int(line)
                    break
                else:
                    raise TweepError('Expecting length, unexpected value found')

            next_status_obj = buf.read_len(length)
            if self.running and next_status_obj:
                self._data(next_status_obj)

        if resp.raw.closed:
            self.on_closed(resp)

    def _start(self, is_async):
        """Mark the stream running and run the loop, in a thread if *is_async*."""
        self.running = True
        if is_async:
            self._thread = Thread(target=self._run)
            self._thread.start()
        else:
            self._run()

    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   is_async=False,
                   encoding='utf8',
                   **kwargs):
        """Start streaming ``user.json`` from ``userstream.twitter.com``.

        Truthy arguments are sent as query parameters. *locations* must
        contain a multiple of four numbers; *track* items are joined with
        commas and encoded with *encoding*.

        :raises TweepError: if already running or *locations* is malformed
        """
        is_async = self._pick_async(is_async, kwargs)
        # Guard first so a failed call cannot clobber a running stream.
        self._ensure_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/user.json' % STREAM_VERSION
        self.host = 'userstream.twitter.com'
        if stall_warnings:
            self.session.params['stall_warnings'] = stall_warnings
        if _with:
            self.session.params['with'] = _with
        if replies:
            self.session.params['replies'] = replies
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.session.params['locations'] = ','.join(
                ['%.2f' % point for point in locations])
        if track:
            self.session.params['track'] = u','.join(track).encode(encoding)

        self._start(is_async)

    def firehose(self, count=None, is_async=False, **kwargs):
        """Start streaming ``statuses/firehose.json``.

        A truthy *count* is sent as the ``count`` query parameter.

        :raises TweepError: if already running
        """
        is_async = self._pick_async(is_async, kwargs)
        self._ensure_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
        if count:
            # Sent as a query parameter; appending '&count=' to the path
            # produced a malformed url.
            self.session.params['count'] = count
        self._start(is_async)

    def retweet(self, is_async=False, **kwargs):
        """Start streaming ``statuses/retweet.json``.

        :raises TweepError: if already running
        """
        is_async = self._pick_async(is_async, kwargs)
        self._ensure_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
        self._start(is_async)

    def sample(self, is_async=False, languages=None, stall_warnings=False,
               **kwargs):
        """Start streaming ``statuses/sample.json``.

        *languages* are joined with commas into the ``language`` query
        parameter; a truthy *stall_warnings* sends ``stall_warnings=true``.

        :raises TweepError: if already running
        """
        is_async = self._pick_async(is_async, kwargs)
        self._ensure_idle()
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
        if stall_warnings:
            self.session.params['stall_warnings'] = 'true'
        self._start(is_async)

    def filter(self, follow=None, track=None, is_async=False, locations=None,
               stall_warnings=False, languages=None, encoding='utf8',
               filter_level=None, **kwargs):
        """Start streaming ``statuses/filter.json`` from ``stream.twitter.com``.

        Truthy arguments are sent form-encoded in the POST body. *follow*
        and *track* items are joined with commas and encoded with
        *encoding*; *locations* must contain a multiple of four numbers;
        *filter_level* may be text or bytes (bytes are decoded with
        *encoding*).

        :raises TweepError: if already running or *locations* is malformed
        """
        is_async = self._pick_async(is_async, kwargs)
        # Guard first so a failed call cannot clobber a running stream.
        self._ensure_idle()
        self.body = {}
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
        if follow:
            self.body['follow'] = u','.join(follow).encode(encoding)
        if track:
            self.body['track'] = u','.join(track).encode(encoding)
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.body['locations'] = u','.join(
                ['%.4f' % point for point in locations])
        if stall_warnings:
            self.body['stall_warnings'] = stall_warnings
        if languages:
            self.body['language'] = u','.join(map(str, languages))
        if filter_level:
            # ``unicode`` does not exist on Python 3; decode only when the
            # caller handed us bytes (``bytes`` is ``str`` on Python 2).
            if isinstance(filter_level, bytes):
                filter_level = filter_level.decode(encoding)
            self.body['filter_level'] = filter_level
        self.session.params = {'delimited': 'length'}
        self.host = 'stream.twitter.com'
        self._start(is_async)

    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, is_async=False, **kwargs):
        """Start streaming ``site.json``.

        *follow* items are converted to text and joined with commas; all
        values, including ``delimited=length``, go in the POST body.

        :raises TweepError: if already running
        """
        is_async = self._pick_async(is_async, kwargs)
        # Guard first so a failed call cannot clobber a running stream.
        self._ensure_idle()
        self.body = {}
        self.url = '/%s/site.json' % STREAM_VERSION
        self.body['follow'] = u','.join(map(six.text_type, follow))
        self.body['delimited'] = 'length'
        if stall_warnings:
            self.body['stall_warnings'] = stall_warnings
        if with_:
            self.body['with'] = with_
        if replies:
            self.body['replies'] = replies
        self._start(is_async)

    def disconnect(self):
        """Ask the run loop to stop; does nothing if the stream is not running."""
        if self.running is False:
            return
        self.running = False