# streaming.py
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
import logging
6
import httplib
7
from socket import timeout
8
from threading import Thread
9
from time import sleep
10

11
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
12
13
from tweepy.api import API
from tweepy.error import TweepError
14

15
from tweepy.utils import import_simplejson, urlencode_noplus
16
# JSON codec selected by tweepy.utils.import_simplejson; StreamListener.on_data
# uses it to decode each stream message.
json = import_simplejson()

# Version segment interpolated into the streaming endpoint paths used by Stream.
STREAM_VERSION = '1.1'
19

Josh Roesslein's avatar
Josh Roesslein committed
20

21
22
class StreamListener(object):
    """Receives callbacks for the messages read by a ``Stream``.

    Subclass this and override the ``on_*`` hooks you need. When a hook
    dispatched from ``on_data`` returns ``False``, ``on_data`` returns
    ``False`` too, which makes the owning ``Stream`` stop reading.
    """

    def __init__(self, api=None):
        """Create a listener.

        :param api: ``API`` instance passed to ``Status.parse`` when building
            status objects; a fresh ``API()`` is created when omitted.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Decodes ``raw_data`` as JSON and dispatches it to one ``on_*`` hook,
        chosen by which key the decoded message contains. Override this
        method if you wish to manually handle the stream data.

        :param raw_data: the text of one message read from the stream.
        :return: ``False`` (stop stream and close connection) when the
            dispatched hook returned ``False``; otherwise ``None``.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            # Stall warnings, which Stream.filter(stall_warnings=True) asks
            # for; previously these fell through to "Unknown message type".
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: " + str(raw_data))

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_event(self, status):
        """Called when a new event arrives"""
        return

    def on_direct_message(self, status):
        """Called when a new direct message arrives"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        The default returns ``False``, which makes ``Stream`` stop instead of
        retrying the connection.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out.

        Return ``False`` to stop; otherwise ``Stream`` reconnects.
        """
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return

    def on_warning(self, notice):
        """Called when a stall warning message arrives.

        :param notice: the value of the message's ``warning`` key.
        The default logs the notice and keeps the stream running.
        """
        logging.warning("Stream warning: %s", notice)
        return
103

104

105
106
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
107
108
    host = 'stream.twitter.com'

109
    def __init__(self, auth, listener, **options):
110
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
111
        self.listener = listener
112
        self.running = False
113
        self.timeout = options.get("timeout", 300.0)
114
        self.retry_count = options.get("retry_count")
115
116
117
        self.retry_time = options.get("retry_time", 10.0)
        self.snooze_time = options.get("snooze_time",  5.0)
        self.buffer_size = options.get("buffer_size",  1500)
118
        if options.get("secure", True):
119
120
121
122
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
123
        self.api = API()
124
        self.headers = options.get("headers") or {}
125
        self.parameters = None
126
        self.body = None
Josh Roesslein's avatar
Josh Roesslein committed
127
128

    def _run(self):
129
        # Authenticate
130
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
131

132
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
133
134
        error_counter = 0
        conn = None
135
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
136
        while self.running:
137
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
138
139
140
                # quit if error count greater than retry count
                break
            try:
141
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
142
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
143
                else:
Timo Ewalds's avatar
Timo Ewalds committed
144
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
145
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
146
                conn.connect()
147
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
148
149
150
151
152
153
154
155
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
                else:
                    error_counter = 0
156
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
157
158
159
160
161
162
163
164
                    self._read_loop(resp)
            except timeout:
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
165
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
166
167
168
169
170
171
172
173
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

174
        if exception:
175
            raise
176

177
    def _data(self, data):
178
        if self.listener.on_data(data) is False:
179
            self.running = False
180

Josh Roesslein's avatar
Josh Roesslein committed
181
    def _read_loop(self, resp):
182

183
        while self.running and not resp.isclosed():
184
185
186

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
187
188
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
189
190
191
192
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
193
            d = ''
194
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
195
196
                d = resp.read(1)
                delimited_string += d
197
198

            # read the next twitter status object
199
            if delimited_string.strip().isdigit():
200
201
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
202

203
204
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
205

206
207
208
209
210
211
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
212

213
214
215
216
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
217
    def userstream(self, count=None, async=False, secure=True):
218
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
219
220
        if self.running:
            raise TweepError('Stream object already connected!')
221
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
222
223
        self.host='userstream.twitter.com'
        self._start(async)
224
225

    def firehose(self, count=None, async=False):
226
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
227
228
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
229
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
230
231
        if count:
            self.url += '&count=%s' % count
232
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
233

234
    def retweet(self, async=False):
235
        self.parameters = {'delimited': 'length'}
236
237
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
238
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
239
        self._start(async)
240

241
    def sample(self, count=None, async=False):
242
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
243
244
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
245
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
246
247
        if count:
            self.url += '&count=%s' % count
248
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
249

250
    def filter(self, follow=None, track=None, async=False, locations=None, 
251
        count = None, stall_warnings=False, languages=None):
252
        self.parameters = {}
253
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
254
255
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
256
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
257
        if follow:
258
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
259
        if track:
260
            self.parameters['track'] = ','.join(map(str, track))
261
262
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
263
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
264
265
        if count:
            self.parameters['count'] = count
266
267
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
268
269
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
270
        self.body = urlencode_noplus(self.parameters)
271
        self.parameters['delimited'] = 'length'
272
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
273
274

    def disconnect(self):
275
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
276
277
278
            return
        self.running = False