# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

import logging
import ssl
from threading import Thread
from time import sleep

import requests
from requests.exceptions import Timeout

from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError
from tweepy.utils import import_simplejson, urlencode_noplus
# JSON module chosen by tweepy.utils.import_simplejson(); StreamListener.on_data
# decodes every stream message with its loads().
json = import_simplejson()

# Streaming API version prefixed to every endpoint path built by Stream.
STREAM_VERSION = '1.1'


class StreamListener(object):
    """Receives the messages read by a :class:`Stream`.

    Subclass it and override the ``on_*`` hooks you need.  Returning
    ``False`` from a hook makes :meth:`on_data` return ``False``, which asks
    the stream to stop.
    """

    def __init__(self, api=None):
        # Status objects are parsed against this API instance.
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Decodes *raw_data* as JSON and dispatches it to the ``on_*`` hook
        matching the key the message carries.  Override this method if you
        wish to manually handle the stream data.

        :param raw_data: one JSON-encoded message read from the stream.
        :returns: ``False`` (stop the stream and close the connection) when
            the dispatched hook returned ``False``, otherwise ``None``.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            # Warning messages (e.g. stall warnings requested through the
            # stall_warnings option) used to be logged as unknown messages.
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives.

        :param track: the ``track`` value of the limit notice.
        """
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        The default returns ``False``, which makes the stream give up.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a warning message (such as a stall warning) arrives.

        :param notice: the ``warning`` object of the message.
        """
        return


class Stream(object):
    """Connection to one of Twitter's streaming endpoints.

    Each endpoint method (:meth:`userstream`, :meth:`firehose`,
    :meth:`retweet`, :meth:`sample`, :meth:`filter`) configures the request
    and starts reading; every message read is passed to
    ``listener.on_data``.  Pass ``is_async=True`` to read in a background
    thread.  ``is_async`` used to be called ``async``, which is a reserved
    word since Python 3.7; the old name is still accepted as a keyword.
    """

    host = 'stream.twitter.com'

    def __init__(self, auth, listener, **options):
        """
        :param auth: object whose ``apply_auth()`` result is passed as the
            ``auth`` argument of every connection attempt.
        :param listener: a :class:`StreamListener` (or compatible object).
        :param options: optional settings: ``timeout``, ``retry_count``,
            ``retry_time``, ``retry_420``, ``retry_time_cap``,
            ``snooze_time``, ``snooze_time_cap``, ``buffer_size``,
            ``secure`` (default ``True`` -> https) and ``headers``.
        """
        self.auth = auth
        self.listener = listener
        self.running = False
        self.timeout = options.get("timeout", 300.0)
        # None means HTTP errors are retried without limit.
        self.retry_count = options.get("retry_count")
        # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
        self.buffer_size = options.get("buffer_size", 1500)
        self.scheme = "https" if options.get("secure", True) else "http"

        self.api = API()
        # Kept on the instance so sessions recreated after a run ends carry
        # the same headers as the first one.
        self.headers = options.get("headers") or {}
        self._new_session()
        self.body = None
        self.retry_time = self.retry_time_start
        self.snooze_time = self.snooze_time_step

    def _new_session(self):
        """Install a fresh HTTP session carrying ``self.headers``."""
        self.session = requests.Session()
        self.session.headers = self.headers
        self.session.params = None

    @staticmethod
    def _async_flag(is_async, extra):
        """Return the effective async flag, honouring the legacy ``async`` keyword.

        :param is_async: value of the ``is_async`` parameter.
        :param extra: the caller's ``**kwargs``; only ``async`` is accepted.
        :raises TypeError: for any other unexpected keyword argument.
        """
        if 'async' in extra:
            is_async = extra.pop('async')
        if extra:
            raise TypeError("unexpected keyword argument(s): %s"
                            % ", ".join(sorted(extra)))
        return is_async

    def _run(self):
        """Connect to the configured endpoint and process it until stopped.

        * A non-200 response is reported to ``listener.on_error``; unless that
          returns ``False`` the request is retried after ``retry_time``
          seconds, doubling up to ``retry_time_cap`` (HTTP 420 starts at
          ``retry_420``).  More than ``retry_count`` consecutive HTTP errors
          end the loop.
        * A timeout is reported to ``listener.on_timeout``; unless that
          returns ``False`` the request is retried after a snooze that grows
          by ``snooze_time`` up to ``snooze_time_cap``.
        * Any other exception stops the stream; it is passed to
          ``listener.on_exception`` and then re-raised.
        """
        url = "%s://%s%s" % (self.scheme, self.host, self.url)

        error_counter = 0
        resp = None
        exception = None
        while self.running:
            if self.retry_count is not None and error_counter > self.retry_count:
                # quit if error count greater than retry count
                break
            try:
                auth = self.auth.apply_auth()
                resp = self.session.request('POST', url, data=self.body,
                                            timeout=self.timeout, stream=True,
                                            auth=auth)
                if resp.status_code != 200:
                    if self.listener.on_error(resp.status_code) is False:
                        break
                    error_counter += 1
                    # HTTP 420 gets the longer retry_420 back-off.
                    if resp.status_code == 420:
                        self.retry_time = max(self.retry_420_start, self.retry_time)
                    # Release the failed response before backing off.
                    resp.close()
                    sleep(self.retry_time)
                    self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
                else:
                    error_counter = 0
                    self.retry_time = self.retry_time_start
                    self.snooze_time = self.snooze_time_step
                    self.listener.on_connect()
                    self._read_loop(resp)
            except (Timeout, ssl.SSLError) as exc:
                # This is still necessary, as a SSLError can actually be
                # thrown when using Requests.
                # If it's not a time out, treat it like any other exception.
                if isinstance(exc, ssl.SSLError) and not (
                        exc.args and 'timed out' in str(exc.args[0])):
                    exception = exc
                    break
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                sleep(self.snooze_time)
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
            except Exception as exc:
                # Any other exception is fatal, so kill the loop.  Stored under
                # a separate name: Python 3 unbinds the ``as`` target when the
                # except block ends.
                exception = exc
                break

        # cleanup
        self.running = False
        # Not ``if resp:`` -- a requests Response is falsy for 4xx/5xx
        # statuses, which would leave error responses unclosed.
        if resp is not None:
            resp.close()

        self._new_session()

        if exception is not None:
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
            raise exception

    def _data(self, data):
        """Pass one message to the listener; stop when it returns ``False``."""
        if self.listener.on_data(data) is False:
            self.running = False

    def _read_loop(self, resp):
        """Read length-delimited messages from *resp* and dispatch them.

        Each message is preceded by a line holding its length; lines that
        are not a number (e.g. keep-alive newlines) are skipped.  Returns when
        the stream is stopped, the response is closed, or the response yields
        no more data.
        """
        # iter_content yields bytes on Python 3 and str on Python 2.
        newlines = ('\n', b'\n')
        while self.running and not resp.raw._fp.isclosed():
            # Note: keep-alive newlines might be inserted before each length
            # value; skip them until the first character of the length.
            for ch in resp.iter_content():
                if ch not in newlines:
                    break
            else:
                # Nothing but newlines (or nothing at all) left: the server
                # stopped sending, so don't spin on an exhausted stream.
                break

            # read rest of delimiter length..
            delimited_string = ch
            for ch in resp.iter_content():
                if ch in newlines:
                    break
                delimited_string += ch

            # read the next twitter status object
            if delimited_string.strip().isdigit():
                next_status_obj = resp.raw.read(int(delimited_string))
                if self.running:
                    self._data(next_status_obj)

        if resp.raw._fp.isclosed():
            self.on_closed(resp)

    def _start(self, is_async):
        """Mark the stream running and run :meth:`_run`, in a new thread if *is_async*."""
        self.running = True
        if is_async:
            Thread(target=self._run).start()
        else:
            self._run()

    def on_closed(self, resp):
        """Called after the read loop ends if the underlying response is closed."""
        pass

    def userstream(self, stall_warnings=False, _with=None, replies=None,
                   track=None, locations=None, is_async=False,
                   encoding='utf8', **kwargs):
        """Connect to the user stream on ``userstream.twitter.com``.

        The parameters are sent both in the URL query string and as the POST
        body.

        :raises TweepError: if the stream is already running, or if
            *locations* does not hold a multiple of 4 values.
        """
        is_async = self._async_flag(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.parameters = {'delimited': 'length'}
        self.url = '/%s/user.json' % STREAM_VERSION
        self.host = 'userstream.twitter.com'
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if _with:
            self.parameters['with'] = _with
        if replies:
            self.parameters['replies'] = replies
        if locations:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
        if track:
            encoded_track = [s.encode(encoding) for s in track]
            self.parameters['track'] = ','.join(encoded_track)

        self.body = urlencode_noplus(self.parameters)
        self.url = self.url + '?' + self.body

        self._start(is_async)

    def firehose(self, count=None, is_async=False, **kwargs):
        """Connect to ``statuses/firehose``; *count* is sent as a query parameter.

        :raises TweepError: if the stream is already running.
        """
        is_async = self._async_flag(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/firehose.json' % STREAM_VERSION
        if count:
            self.session.params['count'] = count
        self._start(is_async)

    def retweet(self, is_async=False, **kwargs):
        """Connect to ``statuses/retweet``.

        :raises TweepError: if the stream is already running.
        """
        is_async = self._async_flag(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/retweet.json' % STREAM_VERSION
        self._start(is_async)

    def sample(self, is_async=False, **kwargs):
        """Connect to ``statuses/sample``.

        :raises TweepError: if the stream is already running.
        """
        is_async = self._async_flag(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        # delimited=length travels in the session params only; it used to be
        # repeated in the URL as well.
        self.session.params = {'delimited': 'length'}
        self.url = '/%s/statuses/sample.json' % STREAM_VERSION
        self._start(is_async)

    def filter(self, follow=None, track=None, is_async=False, locations=None,
               stall_warnings=False, languages=None, encoding='utf8',
               **kwargs):
        """Connect to ``statuses/filter``.

        The filter parameters are form-encoded into the POST body and are
        also set as session (query) parameters, together with
        ``delimited=length``.

        :raises TweepError: if the stream is already running, or if
            *locations* does not hold a multiple of 4 values.
        """
        is_async = self._async_flag(is_async, kwargs)
        if self.running:
            raise TweepError('Stream object already connected!')
        self.session.params = {}
        self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
        self.url = '/%s/statuses/filter.json' % STREAM_VERSION
        if follow:
            encoded_follow = [s.encode(encoding) for s in follow]
            self.session.params['follow'] = ','.join(encoded_follow)
        if track:
            encoded_track = [s.encode(encoding) for s in track]
            self.session.params['track'] = ','.join(encoded_track)
        if locations:
            if len(locations) % 4 != 0:
                raise TweepError("Wrong number of locations points, "
                                 "it has to be a multiple of 4")
            self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
        if stall_warnings:
            self.session.params['stall_warnings'] = stall_warnings
        if languages:
            self.session.params['language'] = ','.join(map(str, languages))
        self.body = urlencode_noplus(self.session.params)
        self.session.params['delimited'] = 'length'
        self.host = 'stream.twitter.com'
        self._start(is_async)

    def disconnect(self):
        """Ask the stream to stop; the read loop exits when it next checks ``running``."""
        if self.running is False:
            return
        self.running = False