# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

import httplib
import ssl
from socket import timeout
from threading import Thread
from time import sleep

from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError
from tweepy.utils import import_simplejson, urlencode_noplus

# JSON implementation returned by tweepy.utils.import_simplejson
# (presumably simplejson when installed, else the stdlib json -- confirm).
json = import_simplejson()

# API version segment interpolated into the statuses/* endpoint paths.
STREAM_VERSION = '1.1'


class StreamListener(object):
    """Receives messages from a :class:`Stream`.

    Subclass this and override the ``on_*`` hooks you need.  Hooks that
    return ``False`` ask the stream to stop.
    """

    def __init__(self, api=None):
        # API instance handed to Status.parse; a default API() otherwise.
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        The payload is decoded as JSON once and dispatched on its
        top-level keys: ``in_reply_to_status_id`` -> :meth:`on_status`,
        ``delete`` -> :meth:`on_delete`, ``limit`` -> :meth:`on_limit`.
        Empty/whitespace-only payloads and other messages are ignored.
        """
        if not data.strip():
            return None

        message = json.loads(data)
        if not isinstance(message, dict):
            # Only JSON objects carry the keys dispatched on below.
            return None

        # Check keys rather than substrings of the raw text, so a nested
        # tweet (e.g. inside an event) is not mistaken for a status.
        if 'in_reply_to_status_id' in message:
            status = Status.parse(self.api, message)
            if self.on_status(status) is False:
                return False
        elif 'delete' in message:
            delete = message['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'limit' in message:
            if self.on_limit(message['limit']['track']) is False:
                return False

    def on_status(self, status):
        """Called when a new status arrives. Return False to stop."""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status.

        Return False to stop.
        """
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives. Return False to stop."""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returning False (the default) stops the stream; any other value
        lets it retry.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out.

        Return False to stop; otherwise the stream reconnects.
        """
        return


class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
76
77
    host = 'stream.twitter.com'

78
    def __init__(self, auth, listener, **options):
79
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
80
        self.listener = listener
81
        self.running = False
82
        self.timeout = options.get("timeout", 300.0)
83
        self.retry_count = options.get("retry_count")
84
85
86
        self.retry_time = options.get("retry_time", 10.0)
        self.snooze_time = options.get("snooze_time",  5.0)
        self.buffer_size = options.get("buffer_size",  1500)
87
        if options.get("secure", True):
88
89
90
91
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
92
        self.api = API()
93
        self.headers = options.get("headers") or {}
94
        self.parameters = None
95
        self.body = None
Josh Roesslein's avatar
Josh Roesslein committed
96
97

    def _run(self):
98
        # Authenticate
99
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
100

101
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
102
103
        error_counter = 0
        conn = None
104
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
105
        while self.running:
106
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
107
108
109
                # quit if error count greater than retry count
                break
            try:
110
111
112
113
                if self.scheme == "http":
                    conn = httplib.HTTPConnection(self.host)
                else:
                    conn = httplib.HTTPSConnection(self.host)
114
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
115
116
                conn.connect()
                conn.sock.settimeout(self.timeout)
117
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
118
119
120
121
122
123
124
125
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
                else:
                    error_counter = 0
126
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
127
128
129
130
131
132
133
134
                    self._read_loop(resp)
            except timeout:
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
135
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
136
137
138
139
140
141
142
143
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

144
        if exception:
145
            raise
146

147
    def _data(self, data):
148
        if self.listener.on_data(data) is False:
149
            self.running = False
150

Josh Roesslein's avatar
Josh Roesslein committed
151
    def _read_loop(self, resp):
152

153
        while self.running and not resp.isclosed():
154
155
156

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
157
158
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
159
160
161
162
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
163
            d = ''
164
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
165
166
                d = resp.read(1)
                delimited_string += d
167
168

            # read the next twitter status object
169
            if delimited_string.strip().isdigit():
170
171
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
172

173
174
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
175

176
177
178
179
180
181
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
182

183
184
185
186
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
187
    def userstream(self, count=None, async=False, secure=True):
188
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
189
190
        if self.running:
            raise TweepError('Stream object already connected!')
191
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
192
193
        self.host='userstream.twitter.com'
        self._start(async)
194
195

    def firehose(self, count=None, async=False):
196
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
197
198
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
199
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
200
201
        if count:
            self.url += '&count=%s' % count
202
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
203

204
    def retweet(self, async=False):
205
        self.parameters = {'delimited': 'length'}
206
207
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
208
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
209
        self._start(async)
210

211
    def sample(self, count=None, async=False):
212
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
213
214
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
215
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
216
217
        if count:
            self.url += '&count=%s' % count
218
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
219

220
    def filter(self, follow=None, track=None, async=False, locations=None, 
221
        count = None, stall_warnings=False, languages=None):
222
        self.parameters = {}
223
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
224
225
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
226
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
227
        if follow:
228
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
229
        if track:
230
            self.parameters['track'] = ','.join(map(str, track))
231
232
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
233
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
234
235
        if count:
            self.parameters['count'] = count
236
237
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
238
239
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
240
        self.body = urlencode_noplus(self.parameters)
241
        self.parameters['delimited'] = 'length'
242
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
243
244

    def disconnect(self):
245
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
246
247
248
            return
        self.running = False