streaming.py 12 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
import logging
Aaron Hill's avatar
Aaron Hill committed
6
7
import requests
from requests.exceptions import Timeout
8
from threading import Thread
9
from time import sleep
Aaron Hill's avatar
Aaron Hill committed
10
from HTMLParser import HTMLParser
11
import ssl
12

13
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
14
15
from tweepy.api import API
from tweepy.error import TweepError
16

17
from tweepy.utils import import_simplejson, urlencode_noplus
18
# JSON codec returned by tweepy.utils.import_simplejson(); used by on_data below.
json = import_simplejson()

# Streaming API version interpolated into every endpoint path in this module.
STREAM_VERSION = "1.1"

class StreamListener(object):
    """Receives callbacks for messages read by a :class:`Stream`.

    Subclass this and override the ``on_*`` hooks you need. ``on_data``
    decodes each raw message and dispatches it to the matching hook; when a
    hook returns ``False``, ``on_data`` returns ``False`` as well, which asks
    the stream to stop.
    """

    def __init__(self, api=None):
        """Create a listener.

        :param api: API object passed to ``Status.parse`` when building
            models; a new ``API()`` is created when none (or a falsy value)
            is given.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        HTML entities in ``raw_data`` are unescaped, the result is decoded
        as JSON and dispatched on its top-level keys to ``on_status``,
        ``on_delete``, ``on_event``, ``on_direct_message``, ``on_limit``,
        ``on_disconnect`` or ``on_warning``. Anything else is logged as an
        error.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        data = json.loads(HTMLParser().unescape(raw_data))

        if 'in_reply_to_status_id' in data:
            # Statuses are recognised by this key (presumably only present on
            # tweet objects), so it is checked before the envelope types.
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            # Sent when the stream was opened with stall_warnings; previously
            # these fell through and were logged as unknown errors.
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returning False (the default) stops the stream; any other return
        value makes the stream retry with back-off.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a ``warning`` message arrives.

        :param notice: the value of the message's ``warning`` key (e.g. a
            stall warning when the stream was opened with
            ``stall_warnings=True``). Return False to stop the stream.
        """
        return

class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
113
114
    host = 'stream.twitter.com'

115
    def __init__(self, auth, listener, **options):
116
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
117
        self.listener = listener
118
        self.running = False
119
        self.timeout = options.get("timeout", 300.0)
120
        self.retry_count = options.get("retry_count")
121
122
123
124
125
126
        # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
127
        self.buffer_size = options.get("buffer_size",  1500)
128

Josh Roesslein's avatar
Josh Roesslein committed
129
        self.api = API()
Aaron Hill's avatar
Aaron Hill committed
130
131
132
        self.session = requests.Session()
        self.session.headers = options.get("headers") or {}
        self.session.params = None
133
        self.body = None
134
        self.retry_time = self.retry_time_start
135
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
136
137

    def _run(self):
138
        # Authenticate
Joshua Roesslein's avatar
Joshua Roesslein committed
139
        url = "https://%s%s" % (self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
140

141
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
142
        error_counter = 0
Aaron Hill's avatar
Aaron Hill committed
143
        resp = None
144
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
145
        while self.running:
146
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
147
148
149
                # quit if error count greater than retry count
                break
            try:
Aaron Hill's avatar
Aaron Hill committed
150
151
152
                auth = self.auth.apply_auth()
                resp = self.session.request('POST', url, data=self.body,
                        timeout=self.timeout, stream=True, auth=auth)
Aaron Hill's avatar
Aaron Hill committed
153
154
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
Josh Roesslein's avatar
Josh Roesslein committed
155
156
                        break
                    error_counter += 1
skrew's avatar
skrew committed
157
                    if resp.status_code == 420:
158
                        self.retry_time = max(self.retry_420_start, self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
159
                    sleep(self.retry_time)
160
                    self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
161
162
                else:
                    error_counter = 0
163
                    self.retry_time = self.retry_time_start
164
                    self.snooze_time = self.snooze_time_step
165
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
166
                    self._read_loop(resp)
167
168
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be thrown when using Requests
169
170
171
172
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
                    exception = exc
                    break
Josh Roesslein's avatar
Josh Roesslein committed
173
174
175
176
177
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
178
179
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
180
            except Exception as exception:
Josh Roesslein's avatar
Josh Roesslein committed
181
182
183
184
185
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
Aaron Hill's avatar
Aaron Hill committed
186
187
        if resp:
            resp.close()
Josh Roesslein's avatar
Josh Roesslein committed
188

189
190
        self.session = requests.Session()

191
        if exception:
192
193
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
194
            raise
195

196
    def _data(self, data):
197
        if self.listener.on_data(data) is False:
198
            self.running = False
199

Josh Roesslein's avatar
Josh Roesslein committed
200
    def _read_loop(self, resp):
201

Aaron Hill's avatar
Aaron Hill committed
202
        while self.running:
203
204
205

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
206
            c = '\n'
Aaron Hill's avatar
Aaron Hill committed
207
208
209
210
211
            for c in resp.iter_content():
                if c == '\n':
                    continue
                break

212
213
214
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
215
            d = ''
Aaron Hill's avatar
Aaron Hill committed
216
217
218
219
220
            for d in resp.iter_content():
                if d != '\n':
                    delimited_string += d
                    continue
                break
221
222

            # read the next twitter status object
223
            if delimited_string.strip().isdigit():
Aaron Hill's avatar
Aaron Hill committed
224
                next_status_obj = resp.raw.read( int(delimited_string) )
225
226
                if self.running:
                    self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
227

Aaron Hill's avatar
Aaron Hill committed
228
        if resp.raw._fp.isclosed():
229
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
230

231
232
233
    def _start(self, async):
        self.running = True
        if async:
Constantine's avatar
Constantine committed
234
235
            self._thread = Thread(target=self._run)
            self._thread.start()
236
237
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
238

239
240
241
242
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Aaron Hill's avatar
Aaron Hill committed
243
244
    def userstream(self, stall_warnings=False, _with=None, replies=None,
            track=None, locations=None, async=False, encoding='utf8'):
245
        self.session.params = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
246
247
        if self.running:
            raise TweepError('Stream object already connected!')
248
        self.url = '/%s/user.json' % STREAM_VERSION
AlanBell's avatar
AlanBell committed
249
        self.host='userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
250
        if stall_warnings:
251
            self.session.params['stall_warnings'] = stall_warnings
Aaron Hill's avatar
Aaron Hill committed
252
        if _with:
253
            self.session.params['with'] = _with
Aaron Hill's avatar
Aaron Hill committed
254
        if replies:
255
            self.session.params['replies'] = replies
Aaron Hill's avatar
Aaron Hill committed
256
        if locations and len(locations) > 0:
257
258
259
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
260
            self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
Aaron Hill's avatar
Aaron Hill committed
261
262
        if track:
            encoded_track = [s.encode(encoding) for s in track]
263
            self.session.params['track'] = ','.join(encoded_track)
Aaron Hill's avatar
Aaron Hill committed
264

AlanBell's avatar
AlanBell committed
265
        self._start(async)
266
267

    def firehose(self, count=None, async=False):
Aaron Hill's avatar
Aaron Hill committed
268
        self.session.params = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
269
270
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
271
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
272
273
        if count:
            self.url += '&count=%s' % count
274
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
275

276
    def retweet(self, async=False):
Aaron Hill's avatar
Aaron Hill committed
277
        self.session.params = {'delimited': 'length'}
278
279
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
280
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
281
        self._start(async)
282

283
    def sample(self, async=False):
Josh Roesslein's avatar
Josh Roesslein committed
284
285
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
286
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
287
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
288

289
    def filter(self, follow=None, track=None, async=False, locations=None,
290
               stall_warnings=False, languages=None, encoding='utf8'):
Aaron Hill's avatar
Aaron Hill committed
291
        self.session.params = {}
Aaron Hill's avatar
Aaron Hill committed
292
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
293
294
        if self.running:
            raise TweepError('Stream object already connected!')
Aaron Hill's avatar
Aaron Hill committed
295
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
296
        if follow:
297
            encoded_follow = [s.encode(encoding) for s in follow]
Aaron Hill's avatar
Aaron Hill committed
298
            self.session.params['follow'] = ','.join(encoded_follow)
Josh Roesslein's avatar
Josh Roesslein committed
299
        if track:
Aaron Hill's avatar
Aaron Hill committed
300
301
            encoded_track = [s.encode(encoding) for s in track]
            self.session.params['track'] = ','.join(encoded_track)
302
        if locations and len(locations) > 0:
303
304
305
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
Aaron Hill's avatar
Aaron Hill committed
306
            self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
307
        if stall_warnings:
Aaron Hill's avatar
Aaron Hill committed
308
            self.session.params['stall_warnings'] = stall_warnings
309
        if languages:
Aaron Hill's avatar
Aaron Hill committed
310
311
312
313
            self.session.params['language'] = ','.join(map(str, languages))
        self.body = urlencode_noplus(self.session.params)
        self.session.params['delimited'] = 'length'
        self.host = 'stream.twitter.com'
314
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
315

Aaron Hill's avatar
Aaron Hill committed
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
    def sitestream(self, follow, stall_warnings=False, with_='user', replies=False, async=False):
        self.parameters = {}
        if self.running:
            raise TweepError('Stream object already connected!')
        self.url = '/%s/site.json' % STREAM_VERSION
        self.parameters['follow'] = ','.join(map(str, follow))
        self.parameters['delimited'] = 'length'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if with_:
            self.parameters['with'] = with_
        if replies:
            self.parameters['replies'] = replies
        self.body = urlencode_noplus(self.parameters)
        self._start(async)

Josh Roesslein's avatar
Josh Roesslein committed
332
    def disconnect(self):
333
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
334
335
336
            return
        self.running = False