streaming.py 12.9 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
import logging
Aaron Hill's avatar
Aaron Hill committed
6
7
import requests
from requests.exceptions import Timeout
8
from threading import Thread
9
from time import sleep
Aaron Hill's avatar
Aaron Hill committed
10
from HTMLParser import HTMLParser
11
import ssl
12

13
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
14
15
from tweepy.api import API
from tweepy.error import TweepError
16

17
from tweepy.utils import import_simplejson, urlencode_noplus
18
json = import_simplejson()
19

Joshua Roesslein's avatar
Joshua Roesslein committed
20
STREAM_VERSION = '1.1'
21

Josh Roesslein's avatar
Josh Roesslein committed
22

23
24
class StreamListener(object):
    """Callback interface for messages delivered by a ``Stream``.

    Subclass this and override the ``on_*`` hooks you need. ``on_data``
    parses each raw message and dispatches it to the matching hook; when
    that hook returns ``False``, ``on_data`` returns ``False`` too, which
    tells the stream to stop.
    """

    def __init__(self, api=None):
        """:param api: ``API`` instance passed to ``Status.parse``; a
        default ``API()`` is created when omitted."""
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Parses *raw_data* as JSON and dispatches it to the hook matching
        the message type. Override this method if you wish to manually
        handle the stream data. Return False to stop stream and close
        connection.
        """
        # Parse the payload untouched: HTML-unescaping the whole document
        # before parsing can turn an entity inside a string value (for
        # example ``&quot;``) into a bare quote and produce invalid JSON.
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        friends is a list that contains user_id
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returning False (the default) stops the stream; any other value
        makes the stream back off and reconnect.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out.

        Returning False stops the stream; otherwise it reconnects.
        """
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

120

121
122
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
123
124
    host = 'stream.twitter.com'

125
    def __init__(self, auth, listener, **options):
126
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
127
        self.listener = listener
128
        self.running = False
129
        self.timeout = options.get("timeout", 300.0)
130
        self.retry_count = options.get("retry_count")
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
131
132
        # values according to
        # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
133
134
135
136
137
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
138
        self.buffer_size = options.get("buffer_size",  1500)
139

Josh Roesslein's avatar
Josh Roesslein committed
140
        self.api = API()
Aaron Hill's avatar
Aaron Hill committed
141
142
143
        self.session = requests.Session()
        self.session.headers = options.get("headers") or {}
        self.session.params = None
144
        self.body = None
145
        self.retry_time = self.retry_time_start
146
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
147
148

    def _run(self):
149
        # Authenticate
Joshua Roesslein's avatar
Joshua Roesslein committed
150
        url = "https://%s%s" % (self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
151

152
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
153
        error_counter = 0
Aaron Hill's avatar
Aaron Hill committed
154
        resp = None
155
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
156
        while self.running:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
157
158
159
160
            if self.retry_count is not None:
                if error_counter > self.retry_count:
                    # quit if error count greater than retry count
                    break
Josh Roesslein's avatar
Josh Roesslein committed
161
            try:
Aaron Hill's avatar
Aaron Hill committed
162
                auth = self.auth.apply_auth()
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
163
164
165
166
167
168
                resp = self.session.request('POST',
                                            url,
                                            data=self.body,
                                            timeout=self.timeout,
                                            stream=True,
                                            auth=auth)
Aaron Hill's avatar
Aaron Hill committed
169
170
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
Josh Roesslein's avatar
Josh Roesslein committed
171
172
                        break
                    error_counter += 1
skrew's avatar
skrew committed
173
                    if resp.status_code == 420:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
174
175
                        self.retry_time = max(self.retry_420_start,
                                              self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
176
                    sleep(self.retry_time)
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
177
178
                    self.retry_time = min(self.retry_time * 2,
                                          self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
179
180
                else:
                    error_counter = 0
181
                    self.retry_time = self.retry_time_start
182
                    self.snooze_time = self.snooze_time_step
183
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
184
                    self._read_loop(resp)
185
            except (Timeout, ssl.SSLError) as exc:
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
186
187
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests
188
                # If it's not time out treat it like any other exception
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
189
190
191
192
193
                if isinstance(exc, ssl.SSLError):
                    if not (exc.args and 'timed out' in str(exc.args[0])):
                        exception = exc
                        break
                if self.listener.on_timeout() is False:
Josh Roesslein's avatar
Josh Roesslein committed
194
195
196
197
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
198
199
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
200
            except Exception as exception:
Josh Roesslein's avatar
Josh Roesslein committed
201
202
203
204
205
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
Aaron Hill's avatar
Aaron Hill committed
206
207
        if resp:
            resp.close()
Josh Roesslein's avatar
Josh Roesslein committed
208

209
210
        self.session = requests.Session()

211
        if exception:
212
213
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
214
            raise
215

216
    def _data(self, data):
217
        if self.listener.on_data(data) is False:
218
            self.running = False
219

Josh Roesslein's avatar
Josh Roesslein committed
220
    def _read_loop(self, resp):
221

Aaron Hill's avatar
Aaron Hill committed
222
        while self.running:
223

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
224
225
            # Note: keep-alive newlines might be inserted
            # before each length value.
226
            # read until we get a digit...
227
            c = '\n'
Aaron Hill's avatar
Aaron Hill committed
228
229
230
231
232
            for c in resp.iter_content():
                if c == '\n':
                    continue
                break

233
234
235
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
236
            d = ''
Aaron Hill's avatar
Aaron Hill committed
237
238
239
240
241
            for d in resp.iter_content():
                if d != '\n':
                    delimited_string += d
                    continue
                break
242
243

            # read the next twitter status object
244
            if delimited_string.strip().isdigit():
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
245
                next_status_obj = resp.raw.read(int(delimited_string))
246
247
                if self.running:
                    self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
248

Aaron Hill's avatar
Aaron Hill committed
249
        if resp.raw._fp.isclosed():
250
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
251

252
253
254
    def _start(self, async):
        self.running = True
        if async:
Constantine's avatar
Constantine committed
255
256
            self._thread = Thread(target=self._run)
            self._thread.start()
257
258
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
259

260
261
262
263
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
264
265
266
267
268
269
270
271
    def userstream(self,
                   stall_warnings=False,
                   _with=None,
                   replies=None,
                   track=None,
                   locations=None,
                   async=False,
                   encoding='utf8'):
272
        self.session.params = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
273
274
        if self.running:
            raise TweepError('Stream object already connected!')
275
        self.url = '/%s/user.json' % STREAM_VERSION
Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
276
        self.host = 'userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
277
        if stall_warnings:
278
            self.session.params['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
279
        if _with:
280
            self.session.params['with'] = _with
Aaron Hill's avatar
Aaron Hill committed
281
        if replies:
282
            self.session.params['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
283
        if locations and len(locations) > 0:
284
285
286
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
287
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
Aaron Hill's avatar
Aaron Hill committed
288
289
        if track:
            encoded_track = [s.encode(encoding) for s in track]
290
            self.session.params['track'] = ','.join(encoded_track)
Aaron Hill's avatar
Aaron Hill committed
291

AlanBell's avatar
AlanBell committed
292
        self._start(async)
293
294

    def firehose(self, count=None, async=False):
Aaron Hill's avatar
Aaron Hill committed
295
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
296
297
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
298
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
299
300
        if count:
            self.url += '&count=%s' % count
301
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
302

303
    def retweet(self, async=False):
Aaron Hill's avatar
Aaron Hill committed
304
        self.session.params = {'delimited': 'length'}
305
306
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
307
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
308
        self._start(async)
309

310
    def sample(self, async=False, language=None):
Josh Roesslein's avatar
Josh Roesslein committed
311
312
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
313
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
314
315
316
        if language:
            self.url += '&language=%s' % language
            self.parameters['language'] = language
317
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
318

319
    def filter(self, follow=None, track=None, async=False, locations=None,
320
               stall_warnings=False, languages=None, encoding='utf8'):
Aaron Hill's avatar
Aaron Hill committed
321
        self.session.params = {}
Aaron Hill's avatar
Aaron Hill committed
322
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
323
324
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
325
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
326
        if follow:
327
            encoded_follow = [s.encode(encoding) for s in follow]
Aaron Hill's avatar
Aaron Hill committed
328
            self.session.params['follow'] = ','.join(encoded_follow)
Josh Roesslein's avatar
Josh Roesslein committed
329
        if track:
Aaron Hill's avatar
Aaron Hill committed
330
331
            encoded_track = [s.encode(encoding) for s in track]
            self.session.params['track'] = ','.join(encoded_track)
332
        if locations and len(locations) > 0:
333
334
335
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
Aaron Hill's avatar
Aaron Hill committed
336
            self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
337
        if stall_warnings:
Aaron Hill's avatar
Aaron Hill committed
338
            self.session.params['stall_warnings'] = stall_warnings
339
        if languages:
Aaron Hill's avatar
Aaron Hill committed
340
341
342
343
            self.session.params['language'] = ','.join(map(str, languages))
        self.body = urlencode_noplus(self.session.params)
        self.session.params['delimited'] = 'length'
        self.host = 'stream.twitter.com'
344
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
345

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
346
347
    def sitestream(self, follow, stall_warnings=False,
                   with_='user', replies=False, async=False):
Aaron Hill's avatar
Aaron Hill committed
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
        self.parameters = {}
        if self.running:
            raise TweepError('Stream object already connected!')
        self.url = '/%s/site.json' % STREAM_VERSION
        self.parameters['follow'] = ','.join(map(str, follow))
        self.parameters['delimited'] = 'length'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if with_:
            self.parameters['with'] = with_
        if replies:
            self.parameters['replies'] = replies
        self.body = urlencode_noplus(self.parameters)
        self._start(async)

Josh Roesslein's avatar
Josh Roesslein committed
363
    def disconnect(self):
364
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
365
366
            return
        self.running = False