# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

5
import logging
Aaron Hill's avatar
Aaron Hill committed
6
7
import requests
from requests.exceptions import Timeout
8
from threading import Thread
9
from time import sleep
Aaron Hill's avatar
Aaron Hill committed
10
from HTMLParser import HTMLParser
11
import ssl
12

13
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
14
15
from tweepy.api import API
from tweepy.error import TweepError
16

17
from tweepy.utils import import_simplejson, urlencode_noplus
18
# JSON module returned by tweepy.utils.import_simplejson().
json = import_simplejson()

# Streaming API version segment used in every endpoint path below.
STREAM_VERSION = '1.1'


class StreamListener(object):
    """Receives messages from a Stream.

    Override the ``on_*`` hooks you are interested in. When a hook returns
    ``False``, ``on_data`` returns ``False`` as well, which asks the stream
    to stop and close its connection.
    """

    # String types whose values on_data unescapes (unicode and str on
    # Python 2; both entries are simply str on Python 3).
    _STRING_TYPES = (type(u''), str)

    def __init__(self, api=None):
        """:param api: API instance handed to ``Status.parse``; a new
        ``API()`` is created when omitted (or falsy)."""
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        The message is decoded as JSON first, and HTML entities are then
        unescaped inside its string values. Unescaping the raw text instead
        can turn an entity such as ``&quot;`` inside a value into a bare
        quote and make the document invalid JSON.

        :param raw_data: one JSON-encoded message read from the stream.
        :returns: False when the dispatched hook returned False, else None.
        """
        data = self._unescape_strings(json.loads(raw_data),
                                      HTMLParser().unescape)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    @classmethod
    def _unescape_strings(cls, value, unescape):
        """Return a copy of decoded JSON *value* with *unescape* applied to
        every string value inside it (dict keys are left untouched)."""
        if isinstance(value, dict):
            return {key: cls._unescape_strings(item, unescape)
                    for key, item in value.items()}
        if isinstance(value, list):
            return [cls._unescape_strings(item, unescape) for item in value]
        if isinstance(value, cls._STRING_TYPES):
            return unescape(value)
        return value

    def on_status(self, status):
        """Called when a new status arrives.

        :param status: the message parsed with ``Status.parse``.
        """
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs.

        The stream re-raises the exception after this hook returns.
        """
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status.

        :param status_id: ``id`` of the deleted status.
        :param user_id: ``user_id`` of the status author.
        """
        return

    def on_event(self, status):
        """Called when a new event arrives (parsed with ``Status.parse``)."""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives (parsed with ``Status.parse``)."""
        return

    def on_friends(self, friends):
        """Called when a friends list arrives.

        :param friends: list of user IDs.
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives.

        :param track: the ``track`` value of the ``limit`` message.
        """
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Return False (the default) to stop the stream; any other value makes
        the stream back off and retry.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out.

        Return False to stop the stream; otherwise it reconnects.
        """
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a ``warning`` message arrives (for example the stall
        warnings requested with ``stall_warnings=True``).

        :param notice: the ``warning`` object of the message.
        """
        return


class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
123
124
    host = 'stream.twitter.com'

125
    def __init__(self, auth, listener, **options):
126
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
127
        self.listener = listener
128
        self.running = False
129
        self.timeout = options.get("timeout", 300.0)
130
        self.retry_count = options.get("retry_count")
131
132
133
134
135
136
        # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
137
        self.buffer_size = options.get("buffer_size",  1500)
138

Josh Roesslein's avatar
Josh Roesslein committed
139
        self.api = API()
Aaron Hill's avatar
Aaron Hill committed
140
141
142
        self.session = requests.Session()
        self.session.headers = options.get("headers") or {}
        self.session.params = None
143
        self.body = None
144
        self.retry_time = self.retry_time_start
145
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
146
147

    def _run(self):
148
        # Authenticate
Joshua Roesslein's avatar
Joshua Roesslein committed
149
        url = "https://%s%s" % (self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
150

151
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
152
        error_counter = 0
Aaron Hill's avatar
Aaron Hill committed
153
        resp = None
154
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
155
        while self.running:
156
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
157
158
159
                # quit if error count greater than retry count
                break
            try:
Aaron Hill's avatar
Aaron Hill committed
160
161
162
                auth = self.auth.apply_auth()
                resp = self.session.request('POST', url, data=self.body,
                        timeout=self.timeout, stream=True, auth=auth)
Aaron Hill's avatar
Aaron Hill committed
163
164
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
Josh Roesslein's avatar
Josh Roesslein committed
165
166
                        break
                    error_counter += 1
skrew's avatar
skrew committed
167
                    if resp.status_code == 420:
168
                        self.retry_time = max(self.retry_420_start, self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
169
                    sleep(self.retry_time)
170
                    self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
171
172
                else:
                    error_counter = 0
173
                    self.retry_time = self.retry_time_start
174
                    self.snooze_time = self.snooze_time_step
175
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
176
                    self._read_loop(resp)
177
178
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be thrown when using Requests
179
180
181
182
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
                    exception = exc
                    break
Josh Roesslein's avatar
Josh Roesslein committed
183
184
185
186
187
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
188
189
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
190
            except Exception as exception:
Josh Roesslein's avatar
Josh Roesslein committed
191
192
193
194
195
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
Aaron Hill's avatar
Aaron Hill committed
196
197
        if resp:
            resp.close()
Josh Roesslein's avatar
Josh Roesslein committed
198

199
200
        self.session = requests.Session()

201
        if exception:
202
203
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
204
            raise
205

206
    def _data(self, data):
207
        if self.listener.on_data(data) is False:
208
            self.running = False
209

Josh Roesslein's avatar
Josh Roesslein committed
210
    def _read_loop(self, resp):
211

Aaron Hill's avatar
Aaron Hill committed
212
        while self.running:
213
214
215

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
216
            c = '\n'
Aaron Hill's avatar
Aaron Hill committed
217
218
219
220
221
            for c in resp.iter_content():
                if c == '\n':
                    continue
                break

222
223
224
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
225
            d = ''
Aaron Hill's avatar
Aaron Hill committed
226
227
228
229
230
            for d in resp.iter_content():
                if d != '\n':
                    delimited_string += d
                    continue
                break
231
232

            # read the next twitter status object
233
            if delimited_string.strip().isdigit():
Aaron Hill's avatar
Aaron Hill committed
234
                next_status_obj = resp.raw.read( int(delimited_string) )
235
236
                if self.running:
                    self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
237

Aaron Hill's avatar
Aaron Hill committed
238
        if resp.raw._fp.isclosed():
239
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
240

241
242
243
    def _start(self, async):
        self.running = True
        if async:
Constantine's avatar
Constantine committed
244
245
            self._thread = Thread(target=self._run)
            self._thread.start()
246
247
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
248

249
250
251
252
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Aaron Hill's avatar
Aaron Hill committed
253
254
    def userstream(self, stall_warnings=False, _with=None, replies=None,
            track=None, locations=None, async=False, encoding='utf8'):
255
        self.session.params = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
256
257
        if self.running:
            raise TweepError('Stream object already connected!')
258
        self.url = '/%s/user.json' % STREAM_VERSION
AlanBell's avatar
AlanBell committed
259
        self.host='userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
260
        if stall_warnings:
261
            self.session.params['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
262
        if _with:
263
            self.session.params['with'] = _with
Aaron Hill's avatar
Aaron Hill committed
264
        if replies:
265
            self.session.params['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
266
        if locations and len(locations) > 0:
267
268
269
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
270
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
Aaron Hill's avatar
Aaron Hill committed
271
272
        if track:
            encoded_track = [s.encode(encoding) for s in track]
273
            self.session.params['track'] = ','.join(encoded_track)
Aaron Hill's avatar
Aaron Hill committed
274

AlanBell's avatar
AlanBell committed
275
        self._start(async)
276
277

    def firehose(self, count=None, async=False):
Aaron Hill's avatar
Aaron Hill committed
278
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
279
280
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
281
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
282
283
        if count:
            self.url += '&count=%s' % count
284
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
285

286
    def retweet(self, async=False):
Aaron Hill's avatar
Aaron Hill committed
287
        self.session.params = {'delimited': 'length'}
288
289
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
290
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
291
        self._start(async)
292

293
    def sample(self, async=False, language=None):
Josh Roesslein's avatar
Josh Roesslein committed
294
295
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
296
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
297
298
299
        if language:
            self.url += '&language=%s' % language
            self.parameters['language'] = language
300
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
301

302
    def filter(self, follow=None, track=None, async=False, locations=None,
303
               stall_warnings=False, languages=None, encoding='utf8'):
Aaron Hill's avatar
Aaron Hill committed
304
        self.session.params = {}
Aaron Hill's avatar
Aaron Hill committed
305
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
306
307
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
308
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
309
        if follow:
310
            encoded_follow = [s.encode(encoding) for s in follow]
Aaron Hill's avatar
Aaron Hill committed
311
            self.session.params['follow'] = ','.join(encoded_follow)
Josh Roesslein's avatar
Josh Roesslein committed
312
        if track:
Aaron Hill's avatar
Aaron Hill committed
313
314
            encoded_track = [s.encode(encoding) for s in track]
            self.session.params['track'] = ','.join(encoded_track)
315
        if locations and len(locations) > 0:
316
317
318
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
Aaron Hill's avatar
Aaron Hill committed
319
            self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
320
        if stall_warnings:
Aaron Hill's avatar
Aaron Hill committed
321
            self.session.params['stall_warnings'] = stall_warnings
322
        if languages:
Aaron Hill's avatar
Aaron Hill committed
323
324
325
326
            self.session.params['language'] = ','.join(map(str, languages))
        self.body = urlencode_noplus(self.session.params)
        self.session.params['delimited'] = 'length'
        self.host = 'stream.twitter.com'
327
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
328

Aaron Hill's avatar
Aaron Hill committed
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
    def sitestream(self, follow, stall_warnings=False, with_='user', replies=False, async=False):
        self.parameters = {}
        if self.running:
            raise TweepError('Stream object already connected!')
        self.url = '/%s/site.json' % STREAM_VERSION
        self.parameters['follow'] = ','.join(map(str, follow))
        self.parameters['delimited'] = 'length'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if with_:
            self.parameters['with'] = with_
        if replies:
            self.parameters['replies'] = replies
        self.body = urlencode_noplus(self.parameters)
        self._start(async)

Josh Roesslein's avatar
Josh Roesslein committed
345
    def disconnect(self):
346
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
347
348
349
            return
        self.running = False