# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

import httplib
import logging
from socket import timeout
from threading import Thread
from time import sleep

from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError
from tweepy.utils import import_simplejson, urlencode_noplus

json = import_simplejson()
# Streaming API version; interpolated into the firehose, retweet, sample
# and filter endpoint paths (the user stream uses its own '/2/' path).
STREAM_VERSION = '1.1'


class StreamListener(object):
    """Receives messages from a Stream; subclass and override the on_* hooks.

    Returning ``False`` from ``on_data`` stops the stream. Through the
    default ``on_data``, so does returning ``False`` from ``on_status``,
    ``on_delete``, ``on_limit`` or ``on_disconnect``. The stream also stops
    when ``on_error`` returns ``False`` (the default) or ``on_timeout``
    returns a value equal to ``False``.
    """

    def __init__(self, api=None):
        # API instance handed to Status.parse when building status objects.
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Parses ``raw_data`` as JSON and dispatches it by message type:
        statuses go to ``on_status``, ``delete`` notices to ``on_delete``,
        ``limit`` notices to ``on_limit`` and ``disconnect`` notices to
        ``on_disconnect``. Any other message is logged as an error.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            # The presence of this key is what marks a message as a status.
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        else:
            # Lazy %-formatting: no str() call, so unicode payloads cannot
            # raise UnicodeEncodeError while being logged.
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives.

        ``track`` is the ``limit.track`` field of the notice.
        """
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returning False (the default) stops the stream; any other value
        makes the stream wait and retry.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out.

        Returning False stops the stream; otherwise it reconnects.
        """
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return


class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
90
91
    host = 'stream.twitter.com'

92
    def __init__(self, auth, listener, **options):
93
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
94
        self.listener = listener
95
        self.running = False
96
        self.timeout = options.get("timeout", 300.0)
97
        self.retry_count = options.get("retry_count")
98
99
100
        self.retry_time = options.get("retry_time", 10.0)
        self.snooze_time = options.get("snooze_time",  5.0)
        self.buffer_size = options.get("buffer_size",  1500)
101
        if options.get("secure", True):
102
103
104
105
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
106
        self.api = API()
107
        self.headers = options.get("headers") or {}
108
        self.parameters = None
109
        self.body = None
Josh Roesslein's avatar
Josh Roesslein committed
110
111

    def _run(self):
112
        # Authenticate
113
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
114

115
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
116
117
        error_counter = 0
        conn = None
118
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
119
        while self.running:
120
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
121
122
123
                # quit if error count greater than retry count
                break
            try:
124
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
125
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
126
                else:
Timo Ewalds's avatar
Timo Ewalds committed
127
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
128
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
129
                conn.connect()
130
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
131
132
133
134
135
136
137
138
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
                else:
                    error_counter = 0
139
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
140
141
142
143
144
145
146
147
                    self._read_loop(resp)
            except timeout:
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
148
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
149
150
151
152
153
154
155
156
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

157
        if exception:
158
            raise
159

160
    def _data(self, data):
161
        if self.listener.on_data(data) is False:
162
            self.running = False
163

Josh Roesslein's avatar
Josh Roesslein committed
164
    def _read_loop(self, resp):
165

166
        while self.running and not resp.isclosed():
167
168
169

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
170
171
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
172
173
174
175
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
176
            d = ''
177
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
178
179
                d = resp.read(1)
                delimited_string += d
180
181

            # read the next twitter status object
182
            if delimited_string.strip().isdigit():
183
184
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
185

186
187
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
188

189
190
191
192
193
194
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
195

196
197
198
199
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
200
    def userstream(self, count=None, async=False, secure=True):
201
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
202
203
        if self.running:
            raise TweepError('Stream object already connected!')
204
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
205
206
        self.host='userstream.twitter.com'
        self._start(async)
207
208

    def firehose(self, count=None, async=False):
209
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
210
211
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
212
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
213
214
        if count:
            self.url += '&count=%s' % count
215
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
216

217
    def retweet(self, async=False):
218
        self.parameters = {'delimited': 'length'}
219
220
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
221
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
222
        self._start(async)
223

224
    def sample(self, count=None, async=False):
225
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
226
227
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
228
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
229
230
        if count:
            self.url += '&count=%s' % count
231
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
232

233
    def filter(self, follow=None, track=None, async=False, locations=None, 
234
        count = None, stall_warnings=False, languages=None):
235
        self.parameters = {}
236
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
237
238
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
239
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
240
        if follow:
241
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
242
        if track:
243
            self.parameters['track'] = ','.join(map(str, track))
244
245
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
246
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
247
248
        if count:
            self.parameters['count'] = count
249
250
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
251
252
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
253
        self.body = urlencode_noplus(self.parameters)
254
        self.parameters['delimited'] = 'length'
255
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
256
257

    def disconnect(self):
258
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
259
260
261
            return
        self.running = False