streaming.py 9.19 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

5
import logging
6
import httplib
7
from socket import timeout
8
from threading import Thread
9
from time import sleep
10

11
from tweepy.models import Status
Josh Roesslein's avatar
Josh Roesslein committed
12
13
from tweepy.api import API
from tweepy.error import TweepError
14

15
from tweepy.utils import import_simplejson, urlencode_noplus
16
# JSON module resolved by tweepy.utils.import_simplejson (presumably simplejson
# when installed, otherwise the stdlib json -- TODO confirm); StreamListener
# uses its loads() to parse each raw stream message.
json = import_simplejson()
17

Joshua Roesslein's avatar
Joshua Roesslein committed
18
# API version path segment interpolated into the statuses/* stream URLs
# (firehose, retweet, sample, filter); the user stream URL does not use it.
STREAM_VERSION = '1.1'
19

Josh Roesslein's avatar
Josh Roesslein committed
20

21
22
class StreamListener(object):
    """Receives callbacks from a Stream.

    Subclass it and override the ``on_*`` hooks you care about. When
    ``on_data`` (or a hook it dispatches to) returns False, the stream stops
    and closes the connection; ``on_error``/``on_timeout`` returning False
    also stops it. Any other return value keeps the stream running.
    """

    def __init__(self, api=None):
        # API instance handed to Status.parse when building status models.
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, raw_data):
        """Called when raw data is received from connection.

        Parses ``raw_data`` as JSON and dispatches on the message type to
        on_status, on_delete, on_limit, on_disconnect or on_warning.
        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.
        """
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            # Status payloads are recognised by the presence of this key.
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        elif 'warning' in data:
            # Stall warnings, requested via Stream.filter(stall_warnings=True);
            # previously these fell through to "Unknown message type".
            if self.on_warning(data['warning']) is False:
                return False
        else:
            logging.error("Unknown message type: %s", raw_data)

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives.

        ``track`` is the ``limit.track`` value of the notice.
        """
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        Returning False (the default) stops the stream; any other value makes
        the stream sleep and retry.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return

    def on_disconnect(self, notice):
        """Called when twitter sends a disconnect notice

        All disconnect codes are listed here:
        https://dev.twitter.com/docs/streaming-apis/messages#Disconnect_messages_disconnect

        Returns False (stop) for codes that cannot be fixed by reconnecting,
        None (keep running) otherwise.
        """
        if notice['code'] in (
            2, # duplicate stream
            3, # control request, shut down by control stream
            5, # shut down by this client
            6, # token revoked, auth will fail next time
            7, # admin logout, connected elsewhere
            9, # max message limit
            ):
            logging.info("Disconnect notice code %s received, disconnecting", notice['code'])
            return False
        else: # reconnect
            logging.info("Disconnect notice code %s received, reconnecting", notice['code'])
            return

    def on_warning(self, notice):
        """Called when a stall warning arrives.

        ``notice`` is the message's ``warning`` object. The default logs it
        and keeps the stream running; return False to stop the stream.
        """
        logging.warning("Stream warning received: %s", notice)
        return

100

101
102
class Stream(object):

Josh Roesslein's avatar
Josh Roesslein committed
103
104
    host = 'stream.twitter.com'

105
    def __init__(self, auth, listener, **options):
106
        self.auth = auth
Josh Roesslein's avatar
Josh Roesslein committed
107
        self.listener = listener
108
        self.running = False
109
        self.timeout = options.get("timeout", 300.0)
110
        self.retry_count = options.get("retry_count")
111
112
113
        self.retry_time = options.get("retry_time", 10.0)
        self.snooze_time = options.get("snooze_time",  5.0)
        self.buffer_size = options.get("buffer_size",  1500)
114
        if options.get("secure", True):
115
116
117
118
            self.scheme = "https"
        else:
            self.scheme = "http"

Josh Roesslein's avatar
Josh Roesslein committed
119
        self.api = API()
120
        self.headers = options.get("headers") or {}
121
        self.parameters = None
122
        self.body = None
Josh Roesslein's avatar
Josh Roesslein committed
123
124

    def _run(self):
125
        # Authenticate
126
        url = "%s://%s%s" % (self.scheme, self.host, self.url)
Josh Roesslein's avatar
Josh Roesslein committed
127

128
        # Connect and process the stream
Josh Roesslein's avatar
Josh Roesslein committed
129
130
        error_counter = 0
        conn = None
131
        exception = None
Josh Roesslein's avatar
Josh Roesslein committed
132
        while self.running:
133
            if self.retry_count is not None and error_counter > self.retry_count:
Josh Roesslein's avatar
Josh Roesslein committed
134
135
136
                # quit if error count greater than retry count
                break
            try:
137
                if self.scheme == "http":
Timo Ewalds's avatar
Timo Ewalds committed
138
                    conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
139
                else:
Timo Ewalds's avatar
Timo Ewalds committed
140
                    conn = httplib.HTTPSConnection(self.host, timeout=self.timeout)
141
                self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
Josh Roesslein's avatar
Josh Roesslein committed
142
                conn.connect()
143
                conn.request('POST', self.url, self.body, headers=self.headers)
Josh Roesslein's avatar
Josh Roesslein committed
144
145
146
147
148
149
150
151
                resp = conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    error_counter += 1
                    sleep(self.retry_time)
                else:
                    error_counter = 0
152
                    self.listener.on_connect()
Josh Roesslein's avatar
Josh Roesslein committed
153
154
155
156
157
158
159
160
                    self._read_loop(resp)
            except timeout:
                if self.listener.on_timeout() == False:
                    break
                if self.running is False:
                    break
                conn.close()
                sleep(self.snooze_time)
161
            except Exception, exception:
Josh Roesslein's avatar
Josh Roesslein committed
162
163
164
165
166
167
168
169
                # any other exception is fatal, so kill loop
                break

        # cleanup
        self.running = False
        if conn:
            conn.close()

170
        if exception:
171
            raise
172

173
    def _data(self, data):
174
        if self.listener.on_data(data) is False:
175
            self.running = False
176

Josh Roesslein's avatar
Josh Roesslein committed
177
    def _read_loop(self, resp):
178

179
        while self.running and not resp.isclosed():
180
181
182

            # Note: keep-alive newlines might be inserted before each length value.
            # read until we get a digit...
183
184
            c = '\n'
            while c == '\n' and self.running and not resp.isclosed():
185
186
187
188
                c = resp.read(1)
            delimited_string = c

            # read rest of delimiter length..
Steve Jones's avatar
Steve Jones committed
189
            d = ''
190
            while d != '\n' and self.running and not resp.isclosed():
Steve Jones's avatar
Steve Jones committed
191
192
                d = resp.read(1)
                delimited_string += d
193
194

            # read the next twitter status object
195
            if delimited_string.strip().isdigit():
196
197
                next_status_obj = resp.read( int(delimited_string) )
                self._data(next_status_obj)
Josh Roesslein's avatar
Josh Roesslein committed
198

199
200
        if resp.isclosed():
            self.on_closed(resp)
Josh Roesslein's avatar
Josh Roesslein committed
201

202
203
204
205
206
207
    def _start(self, async):
        self.running = True
        if async:
            Thread(target=self._run).start()
        else:
            self._run()
Josh Roesslein's avatar
Josh Roesslein committed
208

209
210
211
212
    def on_closed(self, resp):
        """ Called when the response has been closed by Twitter """
        pass

AlanBell's avatar
AlanBell committed
213
    def userstream(self, count=None, async=False, secure=True):
214
        self.parameters = {'delimited': 'length'}
AlanBell's avatar
AlanBell committed
215
216
        if self.running:
            raise TweepError('Stream object already connected!')
217
        self.url = '/2/user.json?delimited=length'
AlanBell's avatar
AlanBell committed
218
219
        self.host='userstream.twitter.com'
        self._start(async)
220
221

    def firehose(self, count=None, async=False):
222
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
223
224
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
225
        self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
226
227
        if count:
            self.url += '&count=%s' % count
228
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
229

230
    def retweet(self, async=False):
231
        self.parameters = {'delimited': 'length'}
232
233
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
234
        self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
235
        self._start(async)
236

237
    def sample(self, count=None, async=False):
238
        self.parameters = {'delimited': 'length'}
Josh Roesslein's avatar
Josh Roesslein committed
239
240
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
241
        self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
242
243
        if count:
            self.url += '&count=%s' % count
244
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
245

246
    def filter(self, follow=None, track=None, async=False, locations=None, 
247
        count = None, stall_warnings=False, languages=None):
248
        self.parameters = {}
249
        self.headers['Content-type'] = "application/x-www-form-urlencoded"
Josh Roesslein's avatar
Josh Roesslein committed
250
251
        if self.running:
            raise TweepError('Stream object already connected!')
Joshua Roesslein's avatar
Joshua Roesslein committed
252
        self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
Josh Roesslein's avatar
Josh Roesslein committed
253
        if follow:
254
            self.parameters['follow'] = ','.join(map(str, follow))
Josh Roesslein's avatar
Josh Roesslein committed
255
        if track:
256
            self.parameters['track'] = ','.join(map(str, track))
257
258
        if locations and len(locations) > 0:
            assert len(locations) % 4 == 0
259
            self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
260
261
        if count:
            self.parameters['count'] = count
262
263
        if stall_warnings:
            self.parameters['stall_warnings'] = stall_warnings
264
265
        if languages:
            self.parameters['language'] = ','.join(map(str, languages))
266
        self.body = urlencode_noplus(self.parameters)
267
        self.parameters['delimited'] = 'length'
268
        self._start(async)
Josh Roesslein's avatar
Josh Roesslein committed
269
270

    def disconnect(self):
271
        if self.running is False:
Josh Roesslein's avatar
Josh Roesslein committed
272
273
274
            return
        self.running = False