# streaming.py
# Tweepy
# Copyright 2009 Joshua Roesslein
# See LICENSE

import httplib
6
from socket import timeout
7
from threading import Thread
8
from time import sleep
9
10

from . auth import BasicAuthHandler
11
12
from . parsers import parse_status
from . api import API
13
from . error import TweepError
14
15
16
17
18
19

try:
  import json
except ImportError:
  import simplejson as json

# API version segment interpolated into every streaming endpoint path that
# Stream builds, e.g. '/1/statuses/sample.json'.
STREAM_VERSION = 1

class StreamListener(object):
  """Default (no-op) callbacks invoked by Stream.

  Subclass and override the callbacks you need. Returning False from any
  callback stops the stream. Note that on_error returns False by default,
  so an un-overridden listener stops on the first non-200 response.
  """

  def on_status(self, status):
    """Called when a new status arrives.

    status is the object produced by parse_status() for the message.
    Return False to stop the stream.
    """
    return

  def on_delete(self, status_id, user_id):
    """Called when a delete notice arrives for a status.

    Receives the deleted status's id and the id of its author.
    Return False to stop the stream.
    """
    return

  def on_limit(self, track):
    """Called when a limitation notice arrives.

    track is the 'track' value carried by the limit notice.
    Return False to stop the stream.
    """
    return

  def on_error(self, status_code):
    """Called when the server answers with a non-200 HTTP status.

    Return False (the default) to stop the stream; any other value makes
    Stream wait retry_time seconds and reconnect.
    """
    return False

  def on_timeout(self):
    """Called when the stream connection times out.

    Return False to stop the stream; any other value makes Stream close the
    connection, wait snooze_time seconds and reconnect.
    """
    return
class Stream(object):
  """Client for the length-delimited Twitter streaming API.

  One of firehose(), sample() or filter() starts a background thread running
  _run(), which connects to ``host`` and feeds every received message to
  ``listener``. disconnect() asks that thread to stop.
  """

  host = 'stream.twitter.com'

  def __init__(self, username, password, listener, timeout=5.0, retry_count=None,
                retry_time=10.0, snooze_time=5.0, buffer_size=1500):
    """Create a stream object; no connection is made until a start method.

    username, password: credentials handed to BasicAuthHandler.
    listener: object providing the StreamListener callbacks.
    timeout: socket timeout, in seconds, for the streaming connection.
    retry_count: number of reconnects allowed after consecutive non-200
      responses before giving up; None means retry forever, 0 means never.
    retry_time: seconds to wait after a non-200 response before retrying.
    snooze_time: seconds to wait after a socket timeout before reconnecting.
    buffer_size: stored on the instance; not read by any method of this class.
    """
    self.auth = BasicAuthHandler(username, password)
    self.running = False
    self.timeout = timeout
    self.retry_count = retry_count
    self.retry_time = retry_time
    self.snooze_time = snooze_time
    self.buffer_size = buffer_size
    self.listener = listener
    self.api = API()

  def _run(self):
    """Connection loop executed on the background thread.

    Connects, POSTs to self.url and hands a 200 response to _read_loop().
    Non-200 responses and socket timeouts are reported to the listener and
    retried until the listener returns False, retry_count is exceeded or
    disconnect() clears self.running. Any other exception ends the loop.
    On exit self.running is reset to False and the connection is closed.
    """
    # Auth headers are computed once and reused for every (re)connection.
    headers = {}
    self.auth.apply_auth(None, None, headers, None)

    error_counter = 0
    conn = None
    while self.running:
      # Compare against None so retry_count=0 means "never retry" rather
      # than being mistaken for "no limit".
      if self.retry_count is not None and error_counter > self.retry_count:
        break
      try:
        conn = httplib.HTTPConnection(self.host)
        conn.connect()
        conn.sock.settimeout(self.timeout)
        conn.request('POST', self.url, headers=headers)
        resp = conn.getresponse()
        if resp.status != 200:
          if self.listener.on_error(resp.status) is False:
            break
          error_counter += 1
          # Release the failed connection instead of leaking it when the
          # next iteration opens a new one.
          conn.close()
          sleep(self.retry_time)
        else:
          error_counter = 0
          self._read_loop(resp)
          # The stream ended (EOF, closed, or stop requested); close it
          # before any reconnect opens another connection.
          conn.close()
      except timeout:
        if self.listener.on_timeout() == False:
          break
        if self.running is False:
          break
        conn.close()
        sleep(self.snooze_time)
      except Exception:
        # Any other exception is treated as fatal and ends the thread.
        break

    # cleanup
    self.running = False
    if conn is not None:
      conn.close()

  def _read_loop(self, resp):
    """Read length-prefixed messages from resp and dispatch them.

    Each message is preceded by its length in decimal digits on a line of
    its own; lines without a numeric length (such as blank lines) are
    skipped. Returns when self.running becomes False, resp reports it is
    closed, or the server reaches end of stream.
    """
    while self.running:
      if resp.isclosed():
        break

      # Read the length line one byte at a time, up to the newline.
      length = ''
      while True:
        c = resp.read(1)
        if not c:
          # End of stream: read() keeps returning '' from here on, so
          # without this check the loop would spin forever.
          return
        if c == '\n':
          break
        length += c
      length = length.strip()
      if not length.isdigit():
        continue

      data = resp.read(int(length))

      # Dispatch by message type; a False return from the listener stops
      # the stream.
      if 'in_reply_to_status_id' in data:
        status = parse_status(data, self.api)
        if self.listener.on_status(status) == False:
          self.running = False
      elif 'delete' in data:
        delete = json.loads(data)['delete']['status']
        if self.listener.on_delete(delete['id'], delete['user_id']) == False:
          self.running = False
      elif 'limit' in data:
        if self.listener.on_limit(json.loads(data)['limit']['track']) == False:
          self.running = False

  def _start(self, url):
    """Launch _run() on a new thread for the endpoint path url.

    Raises TweepError if this stream is already running.
    """
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = url
    self.running = True
    Thread(target=self._run).start()

  def firehose(self, count=None):
    """Start streaming statuses/firehose in a background thread.

    count, when truthy, is appended as the 'count' query parameter.
    Raises TweepError if the stream is already running.
    """
    url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
    if count:
      url += '&count=%s' % count
    self._start(url)

  def sample(self, count=None):
    """Start streaming statuses/sample in a background thread.

    count, when truthy, is appended as the 'count' query parameter.
    Raises TweepError if the stream is already running.
    """
    url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
    if count:
      url += '&count=%s' % count
    self._start(url)

  def filter(self, follow=None, track=None):
    """Start streaming statuses/filter in a background thread.

    follow: iterable of user ids; each is passed through str(), so ints
      are accepted as well as strings.
    track: iterable of keyword strings, joined with ','. They are inserted
      into the URL as-is (not URL-encoded).
    Raises TweepError if the stream is already running.
    """
    url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
    if follow:
      url += '&follow=%s' % ','.join(map(str, follow))
    if track:
      url += '&track=%s' % ','.join(track)
    self._start(url)

  def disconnect(self):
    """Ask the background thread to stop.

    Only clears self.running. The thread notices at its next check; a read
    that is already blocked returns only on data or socket timeout.
    """
    if self.running is False:
      return
    self.running = False