# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

# --- standard library -------------------------------------------------------
import logging
import httplib
from socket import timeout
from threading import Thread
from time import sleep

# --- tweepy -----------------------------------------------------------------
from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError
from tweepy.utils import (
    import_simplejson,
    urlencode_noplus,
)

# JSON module returned by tweepy.utils.import_simplejson(); StreamListener
# decodes every streamed message with json.loads.
json = import_simplejson()

# API version segment used in Stream's endpoint URLs.
STREAM_VERSION = '1.1'


class StreamListener(object):
    """Receives the messages read by a Stream and dispatches them to hooks.

    Subclass this and override the ``on_*`` hooks you are interested in.
    A hook that returns False makes ``on_data`` return False, which asks
    the owning Stream to stop.
    """

    def __init__(self, api=None):
        # API instance handed to Status.parse when building status models.
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Decodes one JSON message and routes it to the matching ``on_*``
        hook based on the keys it contains. Override this method if you
        wish to manually handle the stream data. Return False to stop
        stream and close connection.

        :param raw_data: one JSON-encoded message read from the stream
        :returns: False if the hook that handled the message returned
            False, otherwise None
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            # Messages with this key are treated as statuses.
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        # The branches below are checked only after the ones above, so any
        # message matched before keeps being routed exactly as it was.
        elif 'warning' in data:
            if self.on_warning(data['warning']) is False:
                return False
        elif 'event' in data:
            if self.on_event(data) is False:
                return False
        elif 'direct_message' in data:
            if self.on_direct_message(data['direct_message']) is False:
                return False
        elif 'friends' in data:
            if self.on_friends(data['friends']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_warning(self, notice):
        """Called with the ``warning`` payload of a message.

        These are the stall warnings Twitter can send when a stream is
        opened with ``Stream.filter(stall_warnings=True)``.
        """
        return

    def on_event(self, event):
        """Called for a message containing an ``event`` key.

        Receives the whole decoded message (a dict).
        """
        return

    def on_direct_message(self, direct_message):
        """Called with the ``direct_message`` payload of a message."""
        return

    def on_friends(self, friends):
        """Called with the ``friends`` payload of a message, passed through as-is."""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Return False (the default) to stop the stream; any other value
        lets the Stream back off and reconnect.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out.

        Return False to stop the stream; otherwise it reconnects.
        """
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        Disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect
        """
        return


class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
91
92
    host = 'stream.twitter.com'

93
    def __init__(self, auth, listener, **options):
94
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
95
        self.listener = listener
96
        self.running = False
97
        self.timeout = options.get("timeout", 300.0)
98
        self.retry_count = options.get("retry_count")
99
100
101
        self.retry_time = options.get("retry_time", 10.0)
        self.snooze_time = options.get("snooze_time",  5.0)
        self.buffer_size = options.get("buffer_size",  1500)
102
        if options.get("secure", True):
103
104
105
106
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
107
        self.api = API()
108
        self.headers = options.get("headers") or {}
109
        self.parameters = None
110
        self.body = None
Josh Roesslein's avatar
Josh Roesslein committed
111
112

    def _run(self):
113
        # Authenticate
114
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
115

116
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
117
118
        error_counter = 0
        conn = None
119
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
120
        while self.running:
121
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
122
123
124
                # quit if error count greater than retry count
                break
            try:
125
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
126
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
127
                else:
Timo Ewalds's avatar
Timo Ewalds committed
128
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
129
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
130
                conn.connect()
131
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
132
133
134
135
136
137
138
139
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
                else:
                    error_counter = 0
140
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
141
142
143
144
145
146
147
148
                    self._read_loop(resp)
            except timeout:
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
149
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
150
151
152
153
154
155
156
157
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

158
        if exception:
159
            raise
160

161
    def _data(self, data):
162
        if self.listener.on_data(data) is False:
163
            self.running = False
164

Josh Roesslein's avatar
Josh Roesslein committed
165
    def _read_loop(self, resp):
166

167
        while self.running and not resp.isclosed():
168
169
170

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
171
172
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
173
174
175
176
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
177
            d = ''
178
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
179
180
                d = resp.read(1)
                delimited_string += d
181
182

            # read the next twitter status object
183
            if delimited_string.strip().isdigit():
184
185
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
186

187
188
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
189

190
191
192
193
194
195
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
196

197
198
199
200
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
201
    def userstream(self, count=None, async=False, secure=True):
202
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
203
204
        if self.running:
            raise TweepError('Stream object already connected!')
205
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
206
207
        self.host='userstream.twitter.com'
        self._start(async)
208
209

    def firehose(self, count=None, async=False):
210
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
211
212
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
213
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
214
215
        if count:
            self.url += '&count=%s' % count
216
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
217

218
    def retweet(self, async=False):
219
        self.parameters = {'delimited': 'length'}
220
221
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
222
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
223
        self._start(async)
224

225
    def sample(self, count=None, async=False):
226
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
227
228
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
229
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
230
231
        if count:
            self.url += '&count=%s' % count
232
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
233

234
    def filter(self, follow=None, track=None, async=False, locations=None, 
235
        count = None, stall_warnings=False, languages=None):
236
        self.parameters = {}
237
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
238
239
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
240
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
241
        if follow:
242
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
243
        if track:
244
            self.parameters['track'] = ','.join(map(str, track))
245
246
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
247
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
248
249
        if count:
            self.parameters['count'] = count
250
251
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
252
253
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
254
        self.body = urlencode_noplus(self.parameters)
255
        self.parameters['delimited'] = 'length'
256
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
257
258

    def disconnect(self):
259
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
260
261
262
            return
        self.running = False