# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

import logging
6
import httplib
7
from socket import timeout
8
from threading import Thread
9
from time import sleep
10
import ssl
11

12
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
13
14
from tweepy.api import API
from tweepy.error import TweepError
15

16
from tweepy.utils import import_simplejson, urlencode_noplus
17
json = import_simplejson()
18

Joshua Roesslein's avatar
Joshua Roesslein committed
19
STREAM_VERSION = '1.1'
20

Josh Roesslein's avatar
Josh Roesslein committed
21

22
23
class StreamListener(object):
    """Receive callbacks for messages read from a streaming connection.

    Subclass this and override the ``on_*`` hooks of interest.  For the
    hooks reached through :meth:`on_data`, returning ``False`` makes
    :meth:`on_data` return ``False`` as well, which asks the stream to stop.
    """

    def __init__(self, api=None):
        """Remember the API object handed to ``Status.parse``.

        A default ``API()`` is created when *api* is omitted or falsy.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        The payload is decoded as JSON and routed to a hook according to
        the first of these keys found in it: ``in_reply_to_status_id``
        (on_status), ``delete`` (on_delete), ``event`` (on_event),
        ``direct_message`` (on_direct_message), ``limit`` (on_limit),
        ``disconnect`` (on_disconnect).  Anything else is logged as an
        unknown message type.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            outcome = self.on_status(Status.parse(self.api, data))
        elif 'delete' in data:
            deleted = data['delete']['status']
            outcome = self.on_delete(deleted['id'], deleted['user_id'])
        elif 'event' in data:
            outcome = self.on_event(Status.parse(self.api, data))
        elif 'direct_message' in data:
            outcome = self.on_direct_message(Status.parse(self.api, data))
        elif 'limit' in data:
            outcome = self.on_limit(data['limit']['track'])
        elif 'disconnect' in data:
            outcome = self.on_disconnect(data['disconnect'])
        else:
            # Pass the payload as a logging argument instead of building the
            # message with str(): str() on a non-ASCII unicode payload raises
            # UnicodeEncodeError on Python 2.
            logging.error("Unknown message type: %s", raw_data)
            return None

        # Only an explicit False stops the stream; None (the default of
        # every hook) keeps it running.
        if outcome is False:
            return False

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return
108

109

110
111
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
112
113
    host = 'stream.twitter.com'

114
    def __init__(self, auth, listener, **options):
115
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
116
        self.listener = listener
117
        self.running = False
118
        self.timeout = options.get("timeout", 300.0)
119
        self.retry_count = options.get("retry_count")
120
121
122
123
        self.retry_time_start = options.get("retry_time", 10.0)
        self.retry_time_cap = options.get("retry_time_cap", 240.0)
        self.snooze_time_start = options.get("snooze_time",  0.25)
        self.snooze_time_cap = options.get("snooze_time_cap",  16)
124
        self.buffer_size = options.get("buffer_size",  1500)
125
        if options.get("secure", True):
126
127
128
129
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
130
        self.api = API()
131
        self.headers = options.get("headers") or {}
132
        self.parameters = None
133
        self.body = None
134
135
        self.retry_time = self.retry_time_start
        self.snooze_time = self.snooze_time_start
Josh Roesslein's avatar
Josh Roesslein committed
136
137

    def _run(self):
138
        # Authenticate
139
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
140

141
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
142
143
        error_counter = 0
        conn = None
144
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
145
        while self.running:
146
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
147
148
149
                # quit if error count greater than retry count
                break
            try:
150
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
151
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
152
                else:
Timo Ewalds's avatar
Timo Ewalds committed
153
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
154
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
155
                conn.connect()
156
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
157
158
159
160
161
162
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
163
                    self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
Josh Roesslein's avatar
Josh Roesslein committed
164
165
                else:
                    error_counter = 0
166
167
                    self.retry_time = self.retry_time_start
                    self.snooze_time = self.snooze_time_start
168
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
169
                    self._read_loop(resp)
170
171
172
173
174
175
            except (timeout, ssl.SSLError), exc:
                # If it's not time out treat it like any other exception
                if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
                    exception = exc
                    break

Josh Roesslein's avatar
Josh Roesslein committed
176
177
178
179
180
181
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
182
                self.snooze_time = min(self.snooze_time+0.25, self.snooze_time_cap)
183
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
184
185
186
187
188
189
190
191
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

192
        if exception:
193
194
            # call a handler first so that the exception can be logged.
            self.listener.on_exception(exception)
195
            raise
196

197
    def _data(self, data):
198
        if self.listener.on_data(data) is False:
199
            self.running = False
200

Josh Roesslein's avatar
Josh Roesslein committed
201
    def _read_loop(self, resp):
202

203
        while self.running and not resp.isclosed():
204
205
206

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
207
208
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
209
210
211
212
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
213
            d = ''
214
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
215
216
                d = resp.read(1)
                delimited_string += d
217
218

            # read the next twitter status object
219
            if delimited_string.strip().isdigit():
220
221
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
222

223
224
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
225

226
227
228
229
230
231
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
232

233
234
235
236
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
237
    def userstream(self, count=None, async=False, secure=True):
238
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
239
240
        if self.running:
            raise TweepError('Stream object already connected!')
241
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
242
243
        self.host='userstream.twitter.com'
        self._start(async)
244
245

    def firehose(self, count=None, async=False):
246
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
247
248
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
249
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
250
251
        if count:
            self.url += '&count=%s' % count
252
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
253

254
    def retweet(self, async=False):
255
        self.parameters = {'delimited': 'length'}
256
257
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
258
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
259
        self._start(async)
260

261
    def sample(self, count=None, async=False):
262
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
263
264
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
265
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
266
267
        if count:
            self.url += '&count=%s' % count
268
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
269

270
    def filter(self, follow=None, track=None, async=False, locations=None, 
271
        count = None, stall_warnings=False, languages=None):
272
        self.parameters = {}
273
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
274
275
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
276
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
277
        if follow:
278
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
279
        if track:
280
            self.parameters['track'] = ','.join(map(str, track))
281
282
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
283
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
284
285
        if count:
            self.parameters['count'] = count
286
287
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
288
289
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
290
        self.body = urlencode_noplus(self.parameters)
291
        self.parameters['delimited'] = 'length'
292
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
293
294

    def disconnect(self):
295
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
296
297
298
            return
        self.running = False