streaming.py 15.9 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
6
# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets

Mark Smith's avatar
Mark Smith committed
7
8
from __future__ import absolute_import, print_function

9
import logging
Aaron Hill's avatar
Aaron Hill committed
10
11
import requests
from requests.exceptions import Timeout
12
from threading import Thread
13
from time import sleep
Mark Smith's avatar
Mark Smith committed
14
15
16

import six

17
import ssl
18

19
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
20
21
from tweepy.api import API
from tweepy.error import TweepError
22

23
from tweepy.utils import import_simplejson
24
json = import_simplejson()
25

Joshua Roesslein's avatar
Joshua Roesslein committed
26
STREAM_VERSION = '1.1'
27

Josh Roesslein's avatar
Josh Roesslein committed
28

29
30
class StreamListener(object):

31
32
    def __init__(self, api=None):
        self.api = api or API()
33

34
35
36
37
38
39
40
41
42
    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

43
    def on_data(self, raw_data):
44
45
46
47
48
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
49
        data = json.loads(raw_data)
50
51

        if 'in_reply_to_status_id' in data:
52
            status = Status.parse(self.api, data)
53
54
55
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
56
            delete = data['delete']['status']
57
58
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
Tetsuya Shinone's avatar
Tetsuya Shinone committed
59
60
61
62
63
64
65
66
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
67
68
69
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
70
        elif 'limit' in data:
71
            if self.on_limit(data['limit']['track']) is False:
72
                return False
73
74
75
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
76
77
78
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
79
80
        else:
            logging.error("Unknown message type: " + str(raw_data))
81

Paul van der Linden's avatar
Paul van der Linden committed
82
83
84
85
    def keep_alive(self):
        """Called when a keep-alive arrived"""
        return

Josh Roesslein's avatar
Josh Roesslein committed
86
87
88
89
    def on_status(self, status):
        """Called when a new status arrives"""
        return

90
91
92
93
    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

Josh Roesslein's avatar
Josh Roesslein committed
94
95
96
    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return
97

Tetsuya Shinone's avatar
Tetsuya Shinone committed
98
99
100
101
102
103
104
105
    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

106
107
108
109
110
111
112
    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

Josh Roesslein's avatar
Josh Roesslein committed
113
    def on_limit(self, track):
114
        """Called when a limitation notice arrives"""
Josh Roesslein's avatar
Josh Roesslein committed
115
        return
116

Josh Roesslein's avatar
Josh Roesslein committed
117
118
119
    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        return False
120

Josh Roesslein's avatar
Josh Roesslein committed
121
122
123
    def on_timeout(self):
        """Called when stream connection times out"""
        return
124

125
126
127
    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

128
        Disconnect codes are listed here:
129
130
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
131
        return
132

133
134
135
    def on_warning(self, notice):
        """Called when a disconnection warning message arrives"""
        return
136

137

Timo Ewalds's avatar
Timo Ewalds committed
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrafice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size):
        self._stream = stream
153
        self._buffer = ''
Timo Ewalds's avatar
Timo Ewalds committed
154
155
156
        self._chunk_size = chunk_size

    def read_len(self, length):
Michael Brooks's avatar
Michael Brooks committed
157
        while not self._stream.closed:
Timo Ewalds's avatar
Timo Ewalds committed
158
159
160
            if len(self._buffer) >= length:
                return self._pop(length)
            read_len = max(self._chunk_size, length - len(self._buffer))
161
            self._buffer += self._stream.read(read_len)
Timo Ewalds's avatar
Timo Ewalds committed
162
163
164

    def read_line(self, sep='\n'):
        start = 0
Michael Brooks's avatar
Michael Brooks committed
165
        while not self._stream.closed:
Timo Ewalds's avatar
Timo Ewalds committed
166
167
168
169
170
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            else:
                start = len(self._buffer)
171
            self._buffer += self._stream.read(self._chunk_size)
Timo Ewalds's avatar
Timo Ewalds committed
172
173
174
175
176
177
178

    def _pop(self, length):
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r


179
180
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
181
182
    host = 'stream.twitter.com'

183
    def __init__(self, auth, listener, **options):
184
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
185
        self.listener = listener
186
        self.running = False
187
        self.timeout = options.get("timeout", 300.0)
188
        self.retry_count = options.get("retry_count")
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
189
190
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
191
192
193
194
195
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
196
197
198
199
200
201
202

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size",  512)
203

204
205
        self.verify = options.get("verify", True)

Josh Roesslein's avatar
Josh Roesslein committed
206
        self.api = API()
207
208
        self.headers = options.get("headers") or {}
        self.new_session()
209
        self.body = None
210
        self.retry_time = self.retry_time_start
211
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
212

213
214
215
216
217
    def new_session(self):
        self.session = requests.Session()
        self.session.headers = self.headers
        self.session.params = None

Josh Roesslein's avatar
Josh Roesslein committed
218
    def _run(self):
219
        # Authenticate
Joshua Roesslein's avatar
Joshua Roesslein committed
220
        url = "https://%s%s" % (self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
221

222
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
223
        error_counter = 0
Aaron Hill's avatar
Aaron Hill committed
224
        resp = None
225
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
226
        while self.running:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
227
228
229
230
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
Josh Roesslein's avatar
Josh Roesslein committed
231
            try:
Aaron Hill's avatar
Aaron Hill committed
232
                auth = self.auth.apply_auth()
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
233
234
235
236
237
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
238
239
                                            auth=auth,
                                            verify=self.verify)
Aaron Hill's avatar
Aaron Hill committed
240
241
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
Josh Roesslein's avatar
Josh Roesslein committed
242
243
                        break
                    error_counter += 1
skrew's avatar
skrew committed
244
                    if resp.status_code == 420:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
245
246
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
247
                    sleep(self.retry_time)
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
248
249
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
250
251
                else:
                    error_counter = 0
252
                    self.retry_time = self.retry_time_start
253
                    self.snooze_time = self.snooze_time_step
254
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
255
                    self._read_loop(resp)
256
            except (Timeout, ssl.SSLError) as exc:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
257
258
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
259
                # If it's not time out treat it like any other exception
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
260
261
262
263
264
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exception = exc
                        break
                if self.listener.on_timeout() is False:
Josh Roesslein's avatar
Josh Roesslein committed
265
266
267
268
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
269
270
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
Mark Smith's avatar
Mark Smith committed
271
272
            except Exception as exc:
                exception = exc
Josh Roesslein's avatar
Josh Roesslein committed
273
274
275
276
277
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
Aaron Hill's avatar
Aaron Hill committed
278
279
        if resp:
            resp.close()
Josh Roesslein's avatar
Josh Roesslein committed
280

281
        self.new_session()
282

283
        if exception:
284
285
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
286
            raise exception
287

288
    def _data(self, data):
289
        if self.listener.on_data(data) is False:
290
            self.running = False
291

Josh Roesslein's avatar
Josh Roesslein committed
292
    def _read_loop(self, resp):
Timo Ewalds's avatar
Timo Ewalds committed
293
        buf = ReadBuffer(resp.raw, self.chunk_size)
294

Michael Brooks's avatar
Michael Brooks committed
295
        while self.running and not resp.raw.closed:
296
            length = 0
Michael Brooks's avatar
Michael Brooks committed
297
            while not resp.raw.closed:
298
299
                line = buf.read_line().strip()
                if not line:
Paul van der Linden's avatar
Paul van der Linden committed
300
                    self.listener.keep_alive()  # keep-alive new lines are expected
301
302
303
304
305
                elif line.isdigit():
                    length = int(line)
                    break
                else:
                    raise TweepError('Expecting length, unexpected value found')
306

307
308
309
            next_status_obj = buf.read_len(length)
            if self.running:
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
310

Mark Smith's avatar
Mark Smith committed
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
            # # Note: keep-alive newlines might be inserted before each length value.
            # # read until we get a digit...
            # c = b'\n'
            # for c in resp.iter_content(decode_unicode=True):
            #     if c == b'\n':
            #         continue
            #     break
            #
            # delimited_string = c
            #
            # # read rest of delimiter length..
            # d = b''
            # for d in resp.iter_content(decode_unicode=True):
            #     if d != b'\n':
            #         delimited_string += d
            #         continue
            #     break
            #
            # # read the next twitter status object
            # if delimited_string.decode('utf-8').strip().isdigit():
            #     status_id = int(delimited_string)
            #     next_status_obj = resp.raw.read(status_id)
            #     if self.running:
            #         self._data(next_status_obj.decode('utf-8'))


Michael Brooks's avatar
Michael Brooks committed
337
        if resp.raw.closed:
338
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
339

340
341
342
    def _start(self, async):
        self.running = True
        if async:
Constantine's avatar
Constantine committed
343
344
            self._thread = Thread(target=self._run)
            self._thread.start()
345
346
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
347

348
349
350
351
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
352
353
354
355
356
357
358
359
    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   async=False,
                   encoding='utf8'):
360
        self.session.params = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
361
362
        if self.running:
            raise TweepError('Stream object already connected!')
363
        self.url = '/%s/user.json' % STREAM_VERSION
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
364
        self.host = 'userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
365
        if stall_warnings:
366
            self.session.params['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
367
        if _with:
368
            self.session.params['with'] = _with
Aaron Hill's avatar
Aaron Hill committed
369
        if replies:
370
            self.session.params['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
371
        if locations and len(locations) > 0:
372
373
374
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
375
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
Aaron Hill's avatar
Aaron Hill committed
376
        if track:
Mark Smith's avatar
Mark Smith committed
377
            self.session.params['track'] = u','.join(track).encode(encoding)
Aaron Hill's avatar
Aaron Hill committed
378

AlanBell's avatar
AlanBell committed
379
        self._start(async)
380
381

    def firehose(self, count=None, async=False):
Aaron Hill's avatar
Aaron Hill committed
382
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
383
384
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
385
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
386
387
        if count:
            self.url += '&count=%s' % count
388
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
389

390
    def retweet(self, async=False):
Aaron Hill's avatar
Aaron Hill committed
391
        self.session.params = {'delimited': 'length'}
392
393
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
394
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
395
        self._start(async)
396

397
398
    def sample(self, async=False, languages=None):
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
399
400
        if self.running:
            raise TweepError('Stream object already connected!')
401
402
403
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
404
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
405

406
    def filter(self, follow=None, track=None, async=False, locations=None,
407
               stall_warnings=False, languages=None, encoding='utf8'):
408
        self.body = {}
Aaron Hill's avatar
Aaron Hill committed
409
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
410
411
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
412
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
413
        if follow:
414
            self.body['follow'] = u','.join(follow).encode(encoding)
Josh Roesslein's avatar
Josh Roesslein committed
415
        if track:
416
            self.body['track'] = u','.join(track).encode(encoding)
417
        if locations and len(locations) > 0:
418
419
420
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
421
            self.body['locations'] = u','.join(['%.4f' % l for l in locations])
422
        if stall_warnings:
423
            self.body['stall_warnings'] = stall_warnings
424
        if languages:
425
            self.body['language'] = u','.join(map(str, languages))
elanting's avatar
elanting committed
426
        self.session.params = {'delimited': 'length'}
Aaron Hill's avatar
Aaron Hill committed
427
        self.host = 'stream.twitter.com'
428
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
429

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
430
431
    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, async=False):
432
        self.body = {}
Aaron Hill's avatar
Aaron Hill committed
433
434
435
        if self.running:
            raise TweepError('Stream object already connected!')
        self.url = '/%s/site.json' % STREAM_VERSION
436
437
        self.body['follow'] = u','.join(map(six.text_type, follow))
        self.body['delimited'] = 'length'
Aaron Hill's avatar
Aaron Hill committed
438
        if stall_warnings:
439
            self.body['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
440
        if with_:
441
            self.body['with'] = with_
Aaron Hill's avatar
Aaron Hill committed
442
        if replies:
443
            self.body['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
444
445
        self._start(async)

Josh Roesslein's avatar
Josh Roesslein committed
446
    def disconnect(self):
447
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
448
449
            return
        self.running = False