# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

import httplib
from socket import timeout
from threading import Thread
from time import sleep

from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError
from tweepy.utils import import_simplejson, urlencode_noplus

# JSON module selected by tweepy.utils.import_simplejson(); StreamListener
# uses its ``loads`` to decode stream messages.
json = import_simplejson()

# Version segment interpolated into the statuses/* endpoint paths built by
# Stream (firehose, retweet, sample, filter).
STREAM_VERSION = '1.1'

class StreamListener(object):
    """Receives messages delivered by a Stream.

    Subclass it and override the ``on_*`` hooks you need.  Returning False
    from on_status, on_delete or on_limit makes on_data return False, which
    asks the stream to stop; on_error returning False (the default) also
    stops the stream.
    """

    def __init__(self, api=None):
        """Create a listener.

        :param api: API instance handed to ``Status.parse``; a new ``API()``
            is created when none is given.
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, data):
        """Called when raw data is received from connection.

        The payload is decoded as JSON once and dispatched on its top-level
        keys:

        * ``in_reply_to_status_id`` -> on_status(Status)
        * ``delete``                -> on_delete(status_id, user_id)
        * ``limit``                 -> on_limit(track)

        Payloads that are not a JSON object, or that match none of the keys
        above, are ignored.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        try:
            message = json.loads(data)
        except ValueError:
            # Undecodable payload: ignore it like any unrecognised message.
            return
        if not isinstance(message, dict):
            return

        # Dispatch on top-level keys, not substrings of the raw text: a
        # message whose text merely mentions "delete" or "limit" would
        # otherwise hit a KeyError, and any message embedding a tweet would
        # be mistaken for a status.
        if 'in_reply_to_status_id' in message:
            status = Status.parse(self.api, message)
            if self.on_status(status) is False:
                return False
        elif 'delete' in message:
            delete = message['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'limit' in message:
            if self.on_limit(message['limit']['track']) is False:
                return False

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returning False (the default) stops the stream; any other value
        makes the stream retry.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
76
77
    host = 'stream.twitter.com'

78
    def __init__(self, auth, listener, **options):
79
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
80
        self.listener = listener
81
        self.running = False
82
        self.timeout = options.get("timeout", 300.0)
83
        self.retry_count = options.get("retry_count")
84
85
86
        self.retry_time = options.get("retry_time", 10.0)
        self.snooze_time = options.get("snooze_time",  5.0)
        self.buffer_size = options.get("buffer_size",  1500)
87
        if options.get("secure", True):
88
89
90
91
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
92
        self.api = API()
93
        self.headers = options.get("headers") or {}
94
        self.parameters = None
95
        self.body = None
Josh Roesslein's avatar
Josh Roesslein committed
96
97

    def _run(self):
98
        # Authenticate
99
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
100

101
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
102
103
        error_counter = 0
        conn = None
104
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
105
        while self.running:
106
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
107
108
109
                # quit if error count greater than retry count
                break
            try:
110
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
111
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
112
                else:
Timo Ewalds's avatar
Timo Ewalds committed
113
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
114
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
115
                conn.connect()
116
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
117
118
119
120
121
122
123
124
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
                else:
                    error_counter = 0
125
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
126
127
128
129
130
131
132
133
                    self._read_loop(resp)
            except timeout:
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
134
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
135
136
137
138
139
140
141
142
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

143
        if exception:
144
            raise
145

146
    def _data(self, data):
147
        if self.listener.on_data(data) is False:
148
            self.running = False
149

Josh Roesslein's avatar
Josh Roesslein committed
150
    def _read_loop(self, resp):
151

152
        while self.running and not resp.isclosed():
153
154
155

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
156
157
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
158
159
160
161
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
162
            d = ''
163
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
164
165
                d = resp.read(1)
                delimited_string += d
166
167

            # read the next twitter status object
168
            if delimited_string.strip().isdigit():
169
170
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
171

172
173
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
174

175
176
177
178
179
180
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
181

182
183
184
185
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
186
    def userstream(self, count=None, async=False, secure=True):
187
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
188
189
        if self.running:
            raise TweepError('Stream object already connected!')
190
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
191
192
        self.host='userstream.twitter.com'
        self._start(async)
193
194

    def firehose(self, count=None, async=False):
195
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
196
197
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
198
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
199
200
        if count:
            self.url += '&count=%s' % count
201
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
202

203
    def retweet(self, async=False):
204
        self.parameters = {'delimited': 'length'}
205
206
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
207
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
208
        self._start(async)
209

210
    def sample(self, count=None, async=False):
211
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
212
213
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
214
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
215
216
        if count:
            self.url += '&count=%s' % count
217
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
218

219
    def filter(self, follow=None, track=None, async=False, locations=None, 
220
        count = None, stall_warnings=False, languages=None):
221
        self.parameters = {}
222
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
223
224
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
225
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
226
        if follow:
227
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
228
        if track:
229
            self.parameters['track'] = ','.join(map(str, track))
230
231
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
232
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
233
234
        if count:
            self.parameters['count'] = count
235
236
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
237
238
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
239
        self.body = urlencode_noplus(self.parameters)
240
        self.parameters['delimited'] = 'length'
241
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
242
243

    def disconnect(self):
244
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
245
246
247
            return
        self.running = False