streaming.py 14.8 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
6
# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets

7
import logging
Aaron Hill's avatar
Aaron Hill committed
8
9
import requests
from requests.exceptions import Timeout
10
from threading import Thread
11
from time import sleep
12
import ssl
13

14
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
15
16
from tweepy.api import API
from tweepy.error import TweepError
17

18
from tweepy.utils import import_simplejson, urlencode_noplus
19
# Module object returned by tweepy.utils.import_simplejson(); it is used
# below as ``json.loads`` to decode raw stream payloads.
json = import_simplejson()
20

Joshua Roesslein's avatar
Joshua Roesslein committed
21
# Version segment interpolated into every streaming endpoint path built in
# this module (e.g. '/%s/statuses/filter.json' % STREAM_VERSION).
STREAM_VERSION = '1.1'
22

Josh Roesslein's avatar
Josh Roesslein committed
23

24
25
class StreamListener(object):
    """Receive messages from a streaming connection and route them to hooks.

    Subclass this and override the ``on_*`` hooks you care about. Every
    message hook invoked from ``on_data`` may return ``False`` to make
    ``on_data`` itself return ``False``; any other return value (including
    ``None``) lets ``on_data`` fall through and return ``None``.
    """

    def __init__(self, api=None):
        """Store the API object handed to ``Status.parse``.

        :param api: API instance to use; when omitted (or falsy) a default
            ``API()`` is created.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        The default implementation decodes ``raw_data`` as JSON and picks a
        hook based on the first of these keys found in the decoded object,
        checked in this order: ``in_reply_to_status_id``, ``delete``,
        ``event``, ``direct_message``, ``friends``, ``limit``,
        ``disconnect``, ``warning``. Anything else is logged as an error.

        :param raw_data: one JSON-encoded message as text.
        :return: ``False`` if the selected hook returned ``False``,
            otherwise ``None``.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
        else:
            # Pass the payload as a logging argument instead of building the
            # message with str(): on Python 2, str() of a unicode payload
            # holding non-ASCII text raises UnicodeEncodeError, and the
            # formatting is skipped entirely when the record is filtered out.
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives.

        :param status: result of ``Status.parse`` for the message.
        """
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs.

        :param exception: the exception instance that ended the stream.
        """
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status.

        :param status_id: ``id`` field of the deleted status notice.
        :param user_id: ``user_id`` field of the deleted status notice.
        """
        return

    def on_event(self, status):
        """Called when a new event arrives.

        :param status: result of ``Status.parse`` for the event message.
        """
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives.

        :param status: result of ``Status.parse`` for the message.
        """
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives.

        :param track: the ``track`` value taken from the ``limit`` message.
        """
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        The default returns ``False``.

        :param status_code: HTTP status code of the response.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out."""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect

        :param notice: value of the ``disconnect`` key of the message.
        """
        return

    def on_warning(self, notice):
        """Called when a disconnection warning message arrives.

        :param notice: value of the ``warning`` key of the message.
        """
        return
127

128

Timo Ewalds's avatar
Timo Ewalds committed
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.

    End of stream: an empty result from ``stream.read`` is treated as EOF.
    Both read methods then return whatever is still buffered (possibly an
    empty string) instead of reading again.
    """

    def __init__(self, stream, chunk_size):
        """
        :param stream: object exposing ``read(size)``.
        :param chunk_size: minimum number of characters requested per read.
        """
        self._stream = stream
        self._buffer = ""
        self._chunk_size = chunk_size

    def read_len(self, length):
        """Return the next ``length`` characters of the stream.

        :param length: number of characters wanted.
        :return: exactly ``length`` characters, or fewer (down to an empty
            string) if the stream ended first.
        """
        while len(self._buffer) < length:
            wanted = max(self._chunk_size, length - len(self._buffer))
            if not self._fill(wanted):
                # EOF: hand back what we have rather than spinning forever.
                break
        return self._pop(length)

    def read_line(self, sep='\n'):
        """Return data up to and including the next ``sep``.

        :param sep: separator to read until.
        :return: the data ending with ``sep``; if the stream ends before a
            separator is seen, the remaining buffered data (an empty string
            when nothing is left).
        """
        start = 0
        while True:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            # Resume the search just before the end of what was already
            # scanned so a multi-character separator split across two reads
            # is still found.
            start = max(0, len(self._buffer) - len(sep) + 1)
            if not self._fill(self._chunk_size):
                return self._pop(len(self._buffer))

    def _fill(self, size):
        """Append up to ``size`` characters to the buffer; False means EOF."""
        chunk = self._stream.read(size)
        if not chunk:
            return False
        self._buffer += chunk
        return True

    def _pop(self, length):
        """Remove and return the first ``length`` characters of the buffer."""
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r


170
171
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
172
173
    host = 'stream.twitter.com'

174
    def __init__(self, auth, listener, **options):
175
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
176
        self.listener = listener
177
        self.running = False
178
        self.timeout = options.get("timeout", 300.0)
179
        self.retry_count = options.get("retry_count")
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
180
181
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
182
183
184
185
186
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
187
188
189
190
191
192
193

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size",  512)
194

Josh Roesslein's avatar
Josh Roesslein committed
195
        self.api = API()
Aaron Hill's avatar
Aaron Hill committed
196
197
198
        self.session = requests.Session()
        self.session.headers = options.get("headers") or {}
        self.session.params = None
199
        self.body = None
200
        self.retry_time = self.retry_time_start
201
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
202
203

    def _run(self):
204
        # Authenticate
Joshua Roesslein's avatar
Joshua Roesslein committed
205
        url = "https://%s%s" % (self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
206

207
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
208
        error_counter = 0
Aaron Hill's avatar
Aaron Hill committed
209
        resp = None
210
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
211
        while self.running:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
212
213
214
215
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
Josh Roesslein's avatar
Josh Roesslein committed
216
            try:
Aaron Hill's avatar
Aaron Hill committed
217
                auth = self.auth.apply_auth()
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
218
219
220
221
222
223
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth)
Aaron Hill's avatar
Aaron Hill committed
224
225
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
Josh Roesslein's avatar
Josh Roesslein committed
226
227
                        break
                    error_counter += 1
skrew's avatar
skrew committed
228
                    if resp.status_code == 420:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
229
230
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
231
                    sleep(self.retry_time)
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
232
233
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
234
235
                else:
                    error_counter = 0
236
                    self.retry_time = self.retry_time_start
237
                    self.snooze_time = self.snooze_time_step
238
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
239
                    self._read_loop(resp)
240
            except (Timeout, ssl.SSLError) as exc:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
241
242
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
243
                # If it's not time out treat it like any other exception
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
244
245
246
247
248
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exception = exc
                        break
                if self.listener.on_timeout() is False:
Josh Roesslein's avatar
Josh Roesslein committed
249
250
251
252
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
253
254
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
255
            except Exception as exception:
Josh Roesslein's avatar
Josh Roesslein committed
256
257
258
259
260
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
Aaron Hill's avatar
Aaron Hill committed
261
262
        if resp:
            resp.close()
Josh Roesslein's avatar
Josh Roesslein committed
263

264
265
        self.session = requests.Session()

266
        if exception:
267
268
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
269
            raise
270

271
    def _data(self, data):
272
        if self.listener.on_data(data) is False:
273
            self.running = False
274

Josh Roesslein's avatar
Josh Roesslein committed
275
    def _read_loop(self, resp):
Timo Ewalds's avatar
Timo Ewalds committed
276
        buf = ReadBuffer(resp.raw, self.chunk_size)
277

Aaron Hill's avatar
Aaron Hill committed
278
        while self.running:
279
280
281
282
283
284
285
286
287
288
            length = 0
            while True:
                line = buf.read_line().strip()
                if not line:
                    pass  # keep-alive new lines are expected
                elif line.isdigit():
                    length = int(line)
                    break
                else:
                    raise TweepError('Expecting length, unexpected value found')
289

290
291
292
            next_status_obj = buf.read_len(length)
            if self.running:
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
293

Aaron Hill's avatar
Aaron Hill committed
294
        if resp.raw._fp.isclosed():
295
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
296

297
298
299
    def _start(self, async):
        self.running = True
        if async:
Constantine's avatar
Constantine committed
300
301
            self._thread = Thread(target=self._run)
            self._thread.start()
302
303
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
304

305
306
307
308
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
309
310
311
312
313
314
315
316
    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   async=False,
                   encoding='utf8'):
317
        self.session.params = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
318
319
        if self.running:
            raise TweepError('Stream object already connected!')
320
        self.url = '/%s/user.json' % STREAM_VERSION
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
321
        self.host = 'userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
322
        if stall_warnings:
323
            self.session.params['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
324
        if _with:
325
            self.session.params['with'] = _with
Aaron Hill's avatar
Aaron Hill committed
326
        if replies:
327
            self.session.params['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
328
        if locations and len(locations) > 0:
329
330
331
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
332
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
Aaron Hill's avatar
Aaron Hill committed
333
334
        if track:
            encoded_track = [s.encode(encoding) for s in track]
335
            self.session.params['track'] = ','.join(encoded_track)
Aaron Hill's avatar
Aaron Hill committed
336

AlanBell's avatar
AlanBell committed
337
        self._start(async)
338
339

    def firehose(self, count=None, async=False):
Aaron Hill's avatar
Aaron Hill committed
340
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
341
342
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
343
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
344
345
        if count:
            self.url += '&count=%s' % count
346
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
347

348
    def retweet(self, async=False):
Aaron Hill's avatar
Aaron Hill committed
349
        self.session.params = {'delimited': 'length'}
350
351
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
352
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
353
        self._start(async)
354

355
356
    def sample(self, async=False, languages=None):
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
357
358
        if self.running:
            raise TweepError('Stream object already connected!')
359
360
361
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
362
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
363

364
    def filter(self, follow=None, track=None, async=False, locations=None,
365
               stall_warnings=False, languages=None, encoding='utf8'):
Aaron Hill's avatar
Aaron Hill committed
366
        self.session.params = {}
Aaron Hill's avatar
Aaron Hill committed
367
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
368
369
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
370
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
371
        if follow:
372
            encoded_follow = [s.encode(encoding) for s in follow]
Aaron Hill's avatar
Aaron Hill committed
373
            self.session.params['follow'] = ','.join(encoded_follow)
Josh Roesslein's avatar
Josh Roesslein committed
374
        if track:
Aaron Hill's avatar
Aaron Hill committed
375
376
            encoded_track = [s.encode(encoding) for s in track]
            self.session.params['track'] = ','.join(encoded_track)
377
        if locations and len(locations) > 0:
378
379
380
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
Aaron Hill's avatar
Aaron Hill committed
381
            self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
382
        if stall_warnings:
Aaron Hill's avatar
Aaron Hill committed
383
            self.session.params['stall_warnings'] = stall_warnings
384
        if languages:
Aaron Hill's avatar
Aaron Hill committed
385
386
387
388
            self.session.params['language'] = ','.join(map(str, languages))
        self.body = urlencode_noplus(self.session.params)
        self.session.params['delimited'] = 'length'
        self.host = 'stream.twitter.com'
389
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
390

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
391
392
    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, async=False):
Aaron Hill's avatar
Aaron Hill committed
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
        self.parameters = {}
        if self.running:
            raise TweepError('Stream object already connected!')
        self.url = '/%s/site.json' % STREAM_VERSION
        self.parameters['follow'] = ','.join(map(str, follow))
        self.parameters['delimited'] = 'length'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if with_:
            self.parameters['with'] = with_
        if replies:
            self.parameters['replies'] = replies
        self.body = urlencode_noplus(self.parameters)
        self._start(async)

Josh Roesslein's avatar
Josh Roesslein committed
408
    def disconnect(self):
409
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
410
411
            return
        self.running = False