streaming.py 14.6 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
6
# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets

7
import logging
Aaron Hill's avatar
Aaron Hill committed
8
9
import requests
from requests.exceptions import Timeout
10
from threading import Thread
11
from time import sleep
12
import ssl
13

14
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
15
16
from tweepy.api import API
from tweepy.error import TweepError
17

18
from tweepy.utils import import_simplejson, urlencode_noplus
19
json = import_simplejson()
20

Joshua Roesslein's avatar
Joshua Roesslein committed
21
STREAM_VERSION = '1.1'
22

Josh Roesslein's avatar
Josh Roesslein committed
23

24
25
class StreamListener(object):

26
27
    def __init__(self, api=None):
        self.api = api or API()
28

29
30
31
32
33
34
35
36
37
    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

38
    def on_data(self, raw_data):
39
40
41
42
43
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
44
        data = json.loads(raw_data)
45
46

        if 'in_reply_to_status_id' in data:
47
            status = Status.parse(self.api, data)
48
49
50
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
51
            delete = data['delete']['status']
52
53
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
Tetsuya Shinone's avatar
Tetsuya Shinone committed
54
55
56
57
58
59
60
61
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
62
63
64
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
65
        elif 'limit' in data:
66
            if self.on_limit(data['limit']['track']) is False:
67
                return False
68
69
70
71
72
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        else:
            logging.error("Unknown message type: " + str(raw_data))
73

Josh Roesslein's avatar
Josh Roesslein committed
74
75
76
77
    def on_status(self, status):
        """Called when a new status arrives"""
        return

78
79
80
81
    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

Josh Roesslein's avatar
Josh Roesslein committed
82
83
84
    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return
85

Tetsuya Shinone's avatar
Tetsuya Shinone committed
86
87
88
89
90
91
92
93
    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

94
95
96
97
98
99
100
    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

Josh Roesslein's avatar
Josh Roesslein committed
101
102
103
    def on_limit(self, track):
        """Called when a limitation notice arrvies"""
        return
104

Josh Roesslein's avatar
Josh Roesslein committed
105
106
107
    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        return False
108

Josh Roesslein's avatar
Josh Roesslein committed
109
110
111
    def on_timeout(self):
        """Called when stream connection times out"""
        return
112

113
114
115
    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

116
        Disconnect codes are listed here:
117
118
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
119
        return
120

121

Timo Ewalds's avatar
Timo Ewalds committed
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrafice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size):
        self._stream = stream
        self._buffer = ""
        self._chunk_size = chunk_size

    def read_len(self, length):
        while True:
            if len(self._buffer) >= length:
                return self._pop(length)
            read_len = max(self._chunk_size, length - len(self._buffer))
            self._buffer += self._stream.read(read_len)

    def read_line(self, sep='\n'):
        start = 0
        while True:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            else:
                start = len(self._buffer)
            self._buffer += self._stream.read(self._chunk_size)

    def _pop(self, length):
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r


163
164
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
165
166
    host = 'stream.twitter.com'

167
    def __init__(self, auth, listener, **options):
168
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
169
        self.listener = listener
170
        self.running = False
171
        self.timeout = options.get("timeout", 300.0)
172
        self.retry_count = options.get("retry_count")
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
173
174
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
175
176
177
178
179
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
180
181
182
183
184
185
186

        # The default socket.read size. Default to less than half the size of
        # a tweet so that it reads tweets with the minimal latency of 2 reads
        # per tweet. Values higher than ~1kb will increase latency by waiting
        # for more data to arrive but may also increase throughput by doing
        # fewer socket read calls.
        self.chunk_size = options.get("chunk_size",  512)
187

Josh Roesslein's avatar
Josh Roesslein committed
188
        self.api = API()
Aaron Hill's avatar
Aaron Hill committed
189
190
191
        self.session = requests.Session()
        self.session.headers = options.get("headers") or {}
        self.session.params = None
192
        self.body = None
193
        self.retry_time = self.retry_time_start
194
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
195
196

    def _run(self):
197
        # Authenticate
Joshua Roesslein's avatar
Joshua Roesslein committed
198
        url = "https://%s%s" % (self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
199

200
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
201
        error_counter = 0
Aaron Hill's avatar
Aaron Hill committed
202
        resp = None
203
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
204
        while self.running:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
205
206
207
208
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
Josh Roesslein's avatar
Josh Roesslein committed
209
            try:
Aaron Hill's avatar
Aaron Hill committed
210
                auth = self.auth.apply_auth()
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
211
212
213
214
215
216
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth)
Aaron Hill's avatar
Aaron Hill committed
217
218
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
Josh Roesslein's avatar
Josh Roesslein committed
219
220
                        break
                    error_counter += 1
skrew's avatar
skrew committed
221
                    if resp.status_code == 420:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
222
223
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
224
                    sleep(self.retry_time)
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
225
226
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
227
228
                else:
                    error_counter = 0
229
                    self.retry_time = self.retry_time_start
230
                    self.snooze_time = self.snooze_time_step
231
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
232
                    self._read_loop(resp)
233
            except (Timeout, ssl.SSLError, requests.compat.IncompleteRead) as exc:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
234
235
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
236
                # If it's not time out treat it like any other exception
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
237
238
239
240
241
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exception = exc
                        break
                if self.listener.on_timeout() is False:
Josh Roesslein's avatar
Josh Roesslein committed
242
243
244
245
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
246
247
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
248
            except Exception as exception:
Josh Roesslein's avatar
Josh Roesslein committed
249
250
251
252
253
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
Aaron Hill's avatar
Aaron Hill committed
254
255
        if resp:
            resp.close()
Josh Roesslein's avatar
Josh Roesslein committed
256

257
258
        self.session = requests.Session()

259
        if exception:
260
261
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
262
            raise
263

264
    def _data(self, data):
265
        if self.listener.on_data(data) is False:
266
            self.running = False
267

Josh Roesslein's avatar
Josh Roesslein committed
268
    def _read_loop(self, resp):
Timo Ewalds's avatar
Timo Ewalds committed
269
        buf = ReadBuffer(resp.raw, self.chunk_size)
270

Aaron Hill's avatar
Aaron Hill committed
271
        while self.running:
272
273
274
275
276
277
278
279
280
281
            length = 0
            while True:
                line = buf.read_line().strip()
                if not line:
                    pass  # keep-alive new lines are expected
                elif line.isdigit():
                    length = int(line)
                    break
                else:
                    raise TweepError('Expecting length, unexpected value found')
282

283
284
285
            next_status_obj = buf.read_len(length)
            if self.running:
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
286

Aaron Hill's avatar
Aaron Hill committed
287
        if resp.raw._fp.isclosed():
288
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
289

290
291
292
    def _start(self, async):
        self.running = True
        if async:
Constantine's avatar
Constantine committed
293
294
            self._thread = Thread(target=self._run)
            self._thread.start()
295
296
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
297

298
299
300
301
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
302
303
304
305
306
307
308
309
    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   async=False,
                   encoding='utf8'):
310
        self.session.params = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
311
312
        if self.running:
            raise TweepError('Stream object already connected!')
313
        self.url = '/%s/user.json' % STREAM_VERSION
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
314
        self.host = 'userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
315
        if stall_warnings:
316
            self.session.params['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
317
        if _with:
318
            self.session.params['with'] = _with
Aaron Hill's avatar
Aaron Hill committed
319
        if replies:
320
            self.session.params['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
321
        if locations and len(locations) > 0:
322
323
324
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
325
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
Aaron Hill's avatar
Aaron Hill committed
326
327
        if track:
            encoded_track = [s.encode(encoding) for s in track]
328
            self.session.params['track'] = ','.join(encoded_track)
Aaron Hill's avatar
Aaron Hill committed
329

AlanBell's avatar
AlanBell committed
330
        self._start(async)
331
332

    def firehose(self, count=None, async=False):
Aaron Hill's avatar
Aaron Hill committed
333
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
334
335
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
336
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
337
338
        if count:
            self.url += '&count=%s' % count
339
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
340

341
    def retweet(self, async=False):
Aaron Hill's avatar
Aaron Hill committed
342
        self.session.params = {'delimited': 'length'}
343
344
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
345
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
346
        self._start(async)
347

348
349
    def sample(self, async=False, languages=None):
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
350
351
        if self.running:
            raise TweepError('Stream object already connected!')
352
353
354
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
355
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
356

357
    def filter(self, follow=None, track=None, async=False, locations=None,
358
               stall_warnings=False, languages=None, encoding='utf8'):
Aaron Hill's avatar
Aaron Hill committed
359
        self.session.params = {}
Aaron Hill's avatar
Aaron Hill committed
360
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
361
362
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
363
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
364
        if follow:
365
            encoded_follow = [s.encode(encoding) for s in follow]
Aaron Hill's avatar
Aaron Hill committed
366
            self.session.params['follow'] = ','.join(encoded_follow)
Josh Roesslein's avatar
Josh Roesslein committed
367
        if track:
Aaron Hill's avatar
Aaron Hill committed
368
369
            encoded_track = [s.encode(encoding) for s in track]
            self.session.params['track'] = ','.join(encoded_track)
370
        if locations and len(locations) > 0:
371
372
373
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
Aaron Hill's avatar
Aaron Hill committed
374
            self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
375
        if stall_warnings:
Aaron Hill's avatar
Aaron Hill committed
376
            self.session.params['stall_warnings'] = stall_warnings
377
        if languages:
Aaron Hill's avatar
Aaron Hill committed
378
379
380
381
            self.session.params['language'] = ','.join(map(str, languages))
        self.body = urlencode_noplus(self.session.params)
        self.session.params['delimited'] = 'length'
        self.host = 'stream.twitter.com'
382
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
383

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
384
385
    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, async=False):
Aaron Hill's avatar
Aaron Hill committed
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
        self.parameters = {}
        if self.running:
            raise TweepError('Stream object already connected!')
        self.url = '/%s/site.json' % STREAM_VERSION
        self.parameters['follow'] = ','.join(map(str, follow))
        self.parameters['delimited'] = 'length'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if with_:
            self.parameters['with'] = with_
        if replies:
            self.parameters['replies'] = replies
        self.body = urlencode_noplus(self.parameters)
        self._start(async)

Josh Roesslein's avatar
Josh Roesslein committed
401
    def disconnect(self):
402
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
403
404
            return
        self.running = False