# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets

from __future__ import absolute_import, print_function

import logging
import ssl
import sys
from threading import Thread
from time import sleep

import requests
from requests.exceptions import Timeout
import six

from tweepy.api import API
from tweepy.error import TweepError
from tweepy.models import Status
from tweepy.utils import import_simplejson, urlencode_noplus

# JSON module as returned by tweepy.utils.import_simplejson(); used below to
# decode raw stream messages.
json = import_simplejson()

# Version segment interpolated into every streaming endpoint path.
STREAM_VERSION = '1.1'


class StreamListener(object):
    """Receiver of the messages read from a stream connection.

    Subclass it and override the ``on_*`` hooks of interest. ``on_data``
    decodes each raw message and forwards it to exactly one hook; when that
    hook returns ``False``, ``on_data`` returns ``False`` too. Every hook
    except ``on_error`` returns ``None`` by default.
    """

    def __init__(self, api=None):
        """Remember the API object handed to ``Status.parse``.

        :param api: API instance to use; a default ``API()`` is created when
            the argument is omitted or falsy.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        :param raw_data: one JSON-encoded message as text.
        :return: ``False`` when the selected hook returned ``False``,
            otherwise ``None``.
        """
        data = json.loads(raw_data)

        # The first matching key wins, so the order of these checks is part
        # of the behaviour and must not be rearranged.
        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
        else:
            # Lazy %-formatting renders the same text as the previous
            # string concatenation, but only if the record is emitted.
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returns ``False`` by default, unlike the other hooks.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a disconnection warning message arrives"""
        return


class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.

    Once the underlying stream is exhausted (``read`` returns no data) the
    read methods stop waiting and return whatever is buffered, which may be
    shorter than requested or empty.
    """

    def __init__(self, stream, chunk_size):
        """
        :param stream: object whose ``read(n)`` returns bytes.
        :param chunk_size: minimum number of bytes requested per ``read``.
        """
        self._stream = stream
        self._buffer = u""
        self._chunk_size = chunk_size

    def read_len(self, length):
        """Return the next ``length`` characters of the stream.

        :param length: number of characters wanted.
        :return: text of exactly ``length`` characters, or fewer (possibly
            an empty string) when the stream ended first.
        """
        while len(self._buffer) < length:
            wanted = max(self._chunk_size, length - len(self._buffer))
            if not self._fill(wanted):
                # End of stream: hand back what we have instead of looping
                # forever on empty reads.
                break
        return self._pop(length)

    def read_line(self, sep='\n'):
        """Return buffered text up to and including the next ``sep``.

        :param sep: separator to look for.
        :return: text ending in ``sep``; when the stream ends before a
            separator is seen, the remaining text without a separator
            (an empty string if nothing is left).
        """
        start = 0
        while True:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            # Resume the search just before the end of the data already
            # scanned so that a multi-character separator split across two
            # chunks is still found.
            start = max(0, len(self._buffer) - len(sep) + 1)
            if not self._fill(self._chunk_size):
                return self._pop(len(self._buffer))

    def _fill(self, size):
        """Read up to ``size`` bytes into the buffer; False at end of stream."""
        chunk = self._stream.read(size)
        if not chunk:
            return False
        # ASCII decoding keeps one character per byte, so lengths measured
        # on the decoded buffer match the byte counts requested from read().
        self._buffer += chunk.decode("ascii")
        return True

    def _pop(self, length):
        """Remove and return the first ``length`` characters of the buffer."""
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r


175
176
class Stream(object):
    """Connection to a streaming endpoint that feeds a listener.

    The endpoint methods (``userstream``, ``firehose``, ``retweet``,
    ``sample``, ``filter``, ``sitestream``) configure the request and then
    start the read loop, either in the calling thread or, with
    ``is_async=True``, in a new ``Thread``. For callers written before
    ``async`` became a reserved word, the flag is still accepted under its
    old keyword name ``async`` (e.g. ``stream.filter(**{'async': True})``).
    """

    host = 'stream.twitter.com'

    def __init__(self, auth, listener, **options):
        """
        :param auth: object whose ``apply_auth()`` result is passed as the
            ``auth`` argument of each request.
        :param listener: object receiving the ``on_*`` callbacks.
        :param options: optional settings: ``timeout``, ``retry_count``,
            ``retry_time``, ``retry_420``, ``retry_time_cap``,
            ``snooze_time``, ``snooze_time_cap``, ``chunk_size``,
            ``verify`` and ``headers``.
        """
        self.auth = auth
        self.listener = listener
        self.running = False
        self.timeout = options.get("timeout", 300.0)
        self.retry_count = options.get("retry_count")
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size", 512)

        self.verify = options.get("verify", True)

        self.api = API()
        self.headers = options.get("headers") or {}
        self.new_session()
        self.body = None
        self.retry_time = self.retry_time_start
        self.snooze_time = self.snooze_time_step

    def new_session(self):
        """Replace ``self.session`` with a fresh ``requests.Session``."""
        self.session = requests.Session()
        self.session.headers = self.headers
        self.session.params = None

    @staticmethod
    def _resolve_async(is_async, legacy_kwargs):
        """Return the threading flag, honouring the legacy ``async`` keyword.

        :param is_async: value of the ``is_async`` parameter.
        :param legacy_kwargs: the ``**kwargs`` dict of an endpoint method.
        :raises TypeError: if any keyword other than ``async`` was passed.
        """
        if 'async' in legacy_kwargs:
            is_async = legacy_kwargs.pop('async')
        if legacy_kwargs:
            raise TypeError('unexpected keyword argument(s): %s'
                            % ', '.join(sorted(legacy_kwargs)))
        return is_async

    def _run(self):
        """Connect, read and reconnect until stopped or a fatal error occurs.

        Non-200 responses are reported through ``listener.on_error`` and
        retried with exponential back-off; timeouts are reported through
        ``listener.on_timeout`` and retried after a linearly growing snooze.
        Any other exception is passed to ``listener.on_exception`` and then
        re-raised after cleanup.
        """
        # Authenticate
        url = "https://%s%s" % (self.host, self.url)

        # Connect and process the stream
        error_counter = 0
        resp = None
        exc_info = None
        while self.running:
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
            try:
                auth = self.auth.apply_auth()
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth,
                                            verify=self.verify)
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
                        break
                    error_counter += 1
                    if resp.status_code == 420:
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
                    sleep(self.retry_time)
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
                else:
                    error_counter = 0
                    self.retry_time = self.retry_time_start
                    self.snooze_time = self.snooze_time_step
                    self.listener.on_connect()
                    self._read_loop(resp)
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exc_info = sys.exc_info()
                        break
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
                self.snooze_time = min(
                    self.snooze_time + self.snooze_time_step,
                    self.snooze_time_cap)
            except Exception:
                # any other exception is fatal, so kill loop
                exc_info = sys.exc_info()
                break

        # cleanup
        self.running = False
        # Compare against None: a Response is falsy for error status codes,
        # and those responses must be closed as well.
        if resp is not None:
            resp.close()

        self.new_session()

        if exc_info:
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exc_info[1])
            # We are outside the except block here, so a bare ``raise`` has
            # nothing to re-raise; re-raise the captured exception with its
            # original traceback instead.
            six.reraise(*exc_info)

    def _data(self, data):
        """Pass one message to the listener; stop when it returns False."""
        if self.listener.on_data(data) is False:
            self.running = False

    def _read_length(self, buf):
        """Return the next message length, or None when the stream ended.

        Blank lines are skipped because keep-alive new lines are expected.

        :raises TweepError: if a non-blank line is not a decimal number.
        """
        while True:
            line = buf.read_line()
            if not line:
                # Not even a separator came back: nothing more to read.
                return None
            line = line.strip()
            if not line:
                continue
            if line.isdigit():
                return int(line)
            raise TweepError('Expecting length, unexpected value found')

    def _read_loop(self, resp):
        """Read length-delimited messages from ``resp`` and dispatch them.

        Returns when ``self.running`` becomes false or when the buffer
        reports the end of the stream (empty line or truncated message).
        """
        buf = ReadBuffer(resp.raw, self.chunk_size)

        while self.running:
            length = self._read_length(buf)
            if length is None:
                break
            next_status_obj = buf.read_len(length)
            if len(next_status_obj) < length:
                # Stream ended in the middle of a message; drop the fragment.
                break
            if self.running:
                self._data(next_status_obj)

        if resp.raw.closed:
            self.on_closed(resp)

    def _start(self, is_async):
        """Mark the stream as running and run it, optionally in a thread."""
        self.running = True
        if is_async:
            self._thread = Thread(target=self._run)
            self._thread.start()
        else:
            self._run()

    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   is_async=False,
                   encoding='utf8',
                   **kwargs):
        """Connect to ``user.json`` on ``userstream.twitter.com``.

        Truthy ``stall_warnings``, ``_with`` (sent as ``with``), ``replies``,
        ``track`` and ``locations`` are sent as query parameters.

        :param locations: flat sequence of numbers whose length must be a
            multiple of 4.
        :param is_async: run the stream in a new thread when true.
        :param encoding: encoding applied to the joined ``track`` terms.
        :raises TweepError: if the stream is already running or
            ``locations`` has the wrong length.
        """
        is_async = self._resolve_async(is_async, kwargs)
        # Check first so that a failed call cannot disturb a live session.
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.body = None  # do not resend the form body of an earlier call
        self.url = '/%s/user.json' % STREAM_VERSION
        self.host = 'userstream.twitter.com'
        if stall_warnings:
            self.session.params['stall_warnings'] = stall_warnings
        if _with:
            self.session.params['with'] = _with
        if replies:
            self.session.params['replies'] = replies
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.session.params['locations'] = ','.join(
                ['%.2f' % l for l in locations])
        if track:
            self.session.params['track'] = u','.join(track).encode(encoding)

        self._start(is_async)

    def firehose(self, count=None, is_async=False, **kwargs):
        """Connect to ``statuses/firehose.json``.

        :param count: sent as the ``count`` query parameter when truthy.
        :param is_async: run the stream in a new thread when true.
        :raises TweepError: if the stream is already running.
        """
        is_async = self._resolve_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.body = None  # do not resend the form body of an earlier call
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
        if count:
            # Sent as a query parameter; appending "&count=" to the path
            # would have produced a malformed URL.
            self.session.params['count'] = count
        self._start(is_async)

    def retweet(self, is_async=False, **kwargs):
        """Connect to ``statuses/retweet.json``.

        :param is_async: run the stream in a new thread when true.
        :raises TweepError: if the stream is already running.
        """
        is_async = self._resolve_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.body = None  # do not resend the form body of an earlier call
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
        self._start(is_async)

    def sample(self, is_async=False, languages=None, **kwargs):
        """Connect to ``statuses/sample.json``.

        :param is_async: run the stream in a new thread when true.
        :param languages: iterable joined with commas into the ``language``
            query parameter when truthy.
        :raises TweepError: if the stream is already running.
        """
        is_async = self._resolve_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.body = None  # do not resend the form body of an earlier call
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
        self._start(is_async)

    def filter(self, follow=None, track=None, is_async=False, locations=None,
               stall_warnings=False, languages=None, encoding='utf8',
               **kwargs):
        """Connect to ``statuses/filter.json`` on ``stream.twitter.com``.

        The filter terms are sent form-encoded in the request body; only
        ``delimited=length`` goes into the query string.

        :param follow: iterable of strings, comma-joined and encoded.
        :param track: iterable of strings, comma-joined and encoded.
        :param is_async: run the stream in a new thread when true.
        :param locations: flat sequence of numbers whose length must be a
            multiple of 4.
        :param stall_warnings: sent as ``stall_warnings`` when truthy.
        :param languages: iterable joined with commas into ``language``.
        :param encoding: encoding applied to ``follow`` and ``track``.
        :raises TweepError: if the stream is already running or
            ``locations`` has the wrong length.
        """
        is_async = self._resolve_async(is_async, kwargs)
        # Check first so that a failed call cannot disturb a live session.
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.headers['Content-type'] = \
            "application/x-www-form-urlencoded"
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
        params = {}
        if follow:
            params['follow'] = u','.join(follow).encode(encoding)
        if track:
            params['track'] = u','.join(track).encode(encoding)
        if locations and len(locations) > 0:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            params['locations'] = u','.join(['%.4f' % l for l in locations])
        if stall_warnings:
            params['stall_warnings'] = stall_warnings
        if languages:
            params['language'] = u','.join(map(str, languages))
        self.body = urlencode_noplus(params)
        self.session.params = {'delimited': 'length'}
        self.host = 'stream.twitter.com'
        self._start(is_async)

    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, is_async=False, **kwargs):
        """Connect to ``site.json`` with all parameters in the request body.

        :param follow: iterable of ids, converted to text and comma-joined.
        :param stall_warnings: sent as ``stall_warnings`` when truthy.
        :param with_: sent as ``with`` when truthy.
        :param replies: sent as ``replies`` when truthy.
        :param is_async: run the stream in a new thread when true.
        :raises TweepError: if the stream is already running.
        """
        is_async = self._resolve_async(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.parameters = {}
        self.url = '/%s/site.json' % STREAM_VERSION
        self.parameters['follow'] = u','.join(map(six.text_type, follow))
        self.parameters['delimited'] = 'length'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if with_:
            self.parameters['with'] = with_
        if replies:
            self.parameters['replies'] = replies
        self.body = urlencode_noplus(self.parameters)
        self._start(is_async)

    def disconnect(self):
        """Ask the read loop to stop; does nothing if it is not running."""
        if self.running is False:
            return
        self.running = False