streaming.py 11.3 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
import logging
6
import httplib
7
from socket import timeout
8
from threading import Thread
9
from time import sleep
10
import ssl
11

12
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
13
14
from tweepy.api import API
from tweepy.error import TweepError
15

16
from tweepy.utils import import_simplejson, urlencode_noplus
17
# JSON module object returned by tweepy.utils.import_simplejson(); used below
# (json.loads) to decode the raw payloads received from the stream.
json = import_simplejson()
18

Joshua Roesslein's avatar
Joshua Roesslein committed
19
# API version segment interpolated into every streaming endpoint path built
# in this module (e.g. '/%s/statuses/sample.json' % STREAM_VERSION).
STREAM_VERSION = '1.1'
20

Josh Roesslein's avatar
Josh Roesslein committed
21

22
23
class StreamListener(object):
    """Base class for objects that receive messages from a stream.

    Subclass it and override the ``on_*`` hooks you care about.  ``on_data``
    decodes each raw message and dispatches it to the matching hook; when a
    hook returns ``False`` (the object ``False``, not merely a falsy value),
    ``on_data`` returns ``False`` as well, which asks the stream to stop.
    """

    def __init__(self, api=None):
        """Remember the API object that is handed to ``Status.parse``.

        A default ``API()`` instance is created when ``api`` is omitted
        or falsy.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Decodes ``raw_data`` as JSON and routes it to one of the ``on_*``
        hooks according to which key the decoded object contains.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        Returns ``False`` when the invoked hook returned ``False``;
        otherwise returns ``None``.
        """
        data = json.loads(raw_data)

        # A valid JSON document is not necessarily an object (it may be a
        # number, a string, null, ...).  Membership tests on such values
        # would raise TypeError, so report them as unknown messages instead.
        if not isinstance(data, dict):
            logging.error("Unknown message type: %s", raw_data)
            return

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            # Sent by the server when warnings were requested (see the
            # ``stall_warnings`` option of the stream endpoints).
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned

        The default implementation returns ``False``.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a warning notice arrives

        ``notice`` is the value stored under the ``warning`` key of the
        decoded message.
        """
        return
108

109

110
111
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
112
113
    host = 'stream.twitter.com'

114
    def __init__(self, auth, listener, **options):
115
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
116
        self.listener = listener
117
        self.running = False
118
        self.timeout = options.get("timeout", 300.0)
119
        self.retry_count = options.get("retry_count")
120
121
122
123
124
125
        # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
        self.retry_time_start = options.get("retry_time", 5.0)
        self.retry_420_start = options.get("retry_420", 60.0)
        self.retry_time_cap = options.get("retry_time_cap", 320.0)
        self.snooze_time_step = options.get("snooze_time", 0.25)
        self.snooze_time_cap = options.get("snooze_time_cap", 16)
126
        self.buffer_size = options.get("buffer_size",  1500)
127
        if options.get("secure", True):
128
129
130
131
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
132
        self.api = API()
133
        self.headers = options.get("headers") or {}
134
        self.parameters = None
135
        self.body = None
136
        self.retry_time = self.retry_time_start
137
        self.snooze_time = self.snooze_time_step
Josh Roesslein's avatar
Josh Roesslein committed
138
139

    def _run(self):
140
        # Authenticate
141
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
142

143
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
144
145
        error_counter = 0
        conn = None
146
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
147
        while self.running:
148
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
149
150
151
                # quit if error count greater than retry count
                break
            try:
152
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
153
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
154
                else:
Timo Ewalds's avatar
Timo Ewalds committed
155
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
156
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
157
                conn.connect()
158
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
159
160
161
162
163
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
164
165
                    if resp.status == 420:
                        self.retry_time = max(self.retry_420_start, self.retry_time)
Josh Roesslein's avatar
Josh Roesslein committed
166
                    sleep(self.retry_time)
167
                    self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
168
169
                else:
                    error_counter = 0
170
                    self.retry_time = self.retry_time_start
171
                    self.snooze_time = self.snooze_time_step
172
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
173
                    self._read_loop(resp)
174
            except (timeout, ssl.SSLError) as exc:
175
176
177
178
179
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
                    exception = exc
                    break

Josh Roesslein's avatar
Josh Roesslein committed
180
181
182
183
184
185
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
186
187
                self.snooze_time = min(self.snooze_time + self.snooze_time_step,
                                       self.snooze_time_cap)
188
            except Exception as exception:
Josh Roesslein's avatar
Josh Roesslein committed
189
190
191
192
193
194
195
196
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

197
        if exception:
198
199
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
200
            raise
201

202
    def _data(self, data):
203
        if self.listener.on_data(data) is False:
204
            self.running = False
205

Josh Roesslein's avatar
Josh Roesslein committed
206
    def _read_loop(self, resp):
207

208
        while self.running and not resp.isclosed():
209
210
211

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
212
213
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
214
215
216
217
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
218
            d = ''
219
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
220
221
                d = resp.read(1)
                delimited_string += d
222
223

            # read the next twitter status object
224
            if delimited_string.strip().isdigit():
225
                next_status_obj = resp.read( int(delimited_string) )
226
227
                if self.running:
                    self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
228

229
230
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
231

232
233
234
235
236
237
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
238

239
240
241
242
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

Aaron Hill's avatar
Aaron Hill committed
243
244
    def userstream(self, stall_warnings=False, _with=None, replies=None,
            track=None, locations=None, async=False, encoding='utf8'):
245
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
246
247
        if self.running:
            raise TweepError('Stream object already connected!')
248
        self.url = '/%s/user.json' % STREAM_VERSION
AlanBell's avatar
AlanBell committed
249
        self.host='userstream.twitter.com'
Aaron Hill's avatar
Aaron Hill committed
250
251
252
253
254
255
256
257
258
259
260
261
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
        if _with:
            self.parameters['with'] = _with
        if replies:
            self.parameters['replies'] = replies
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
        if track:
            encoded_track = [s.encode(encoding) for s in track]
            self.parameters['track'] = ','.join(encoded_track)
262

Aaron Hill's avatar
Aaron Hill committed
263
        self.body = urlencode_noplus(self.parameters)
264
        self.url = self.url + '?' + self.body
AlanBell's avatar
AlanBell committed
265
        self._start(async)
266
267

    def firehose(self, count=None, async=False):
268
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
269
270
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
271
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
272
273
        if count:
            self.url += '&count=%s' % count
274
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
275

276
    def retweet(self, async=False):
277
        self.parameters = {'delimited': 'length'}
278
279
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
280
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
281
        self._start(async)
282

283
    def sample(self, async=False):
284
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
285
286
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
287
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
288
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
289

290
    def filter(self, follow=None, track=None, async=False, locations=None,
291
               stall_warnings=False, languages=None, encoding='utf8'):
292
        self.parameters = {}
293
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
294
295
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
296
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
297
        if follow:
298
299
            encoded_follow = [s.encode(encoding) for s in follow]
            self.parameters['follow'] = ','.join(encoded_follow)
Josh Roesslein's avatar
Josh Roesslein committed
300
        if track:
301
302
            encoded_track = [s.encode(encoding) for s in track]
            self.parameters['track'] = ','.join(encoded_track)
303
304
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
NoMoKeTo's avatar
NoMoKeTo committed
305
            self.parameters['locations'] = ','.join(['%.4f' % l for l in locations])
306
307
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
308
309
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
310
        self.body = urlencode_noplus(self.parameters)
311
        self.parameters['delimited'] = 'length'
312
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
313
314

    def disconnect(self):
315
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
316
317
318
            return
        self.running = False