# Tweepy
# Copyright 2009 Joshua Roesslein
# See LICENSE

import httplib
6
from socket import timeout
7
from threading import Thread
8
from time import sleep
9
10

from . auth import BasicAuthHandler
11
12
from . parsers import parse_status
from . api import API
13
from . error import TweepError
14
15
16
17
18
19

try:
  import json
except ImportError:
  import simplejson as json

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Version number interpolated as the first path segment of every streaming
# endpoint URL built by Stream (e.g. '/1/statuses/sample.json').
STREAM_VERSION = 1

class StreamListener(object):
  """Base class for receiving events from a Stream.

  Subclass and override the callbacks of interest. The defaults ignore
  every event, except that on_error returns False.
  """

  def on_status(self, status):
    """Handle a newly arrived status; the default ignores it."""
    return None

  def on_delete(self, status_id, user_id):
    """Handle a delete notice for a status; the default ignores it."""
    return None

  def on_limit(self, track):
    """Handle a limitation notice; the default ignores it."""
    return None

  def on_error(self, status_code):
    """Handle a non-200 HTTP status code; the default returns False."""
    return False

40
41
class Stream(object):
  """Background client for the Twitter streaming endpoints.

  Call firehose(), sample() or filter() to open a stream. Each of them
  starts a worker thread that connects to `host`, reads length-delimited
  messages and dispatches them to the listener callbacks (on_status,
  on_delete, on_limit, on_error). Call disconnect() to ask the worker
  to stop.
  """

  host = 'stream.twitter.com'

  def __init__(self, username, password, listener, timeout=5.0, retry_count=None,
               retry_time=10.0, snooze_time=5.0, buffer_size=1500):
    """Store the connection settings; no network activity happens here.

    username, password -- credentials handed to BasicAuthHandler.
    listener           -- object providing the on_* callbacks.
    timeout            -- socket timeout (seconds) set on each connection.
    retry_count        -- maximum number of consecutive non-200 responses
                          tolerated; None or 0 means retry without limit.
    retry_time         -- seconds to sleep after a non-200 response.
    snooze_time        -- seconds to sleep after a socket timeout.
    buffer_size        -- stored on the instance; not used by this class.
    """
    self.auth = BasicAuthHandler(username, password)
    self.running = False
    self.timeout = timeout
    self.retry_count = retry_count
    self.retry_time = retry_time
    self.snooze_time = snooze_time
    self.buffer_size = buffer_size
    self.listener = listener
    self.api = API()

  def _run(self):
    """Worker loop: connect, read the stream, reconnect until stopped.

    Runs in the thread started by _start(). The loop ends when
    self.running is cleared, the listener's on_error returns False, the
    retry budget is exhausted, or an unexpected exception occurs.
    """
    # setup
    headers = {}
    self.auth.apply_auth(None, None, headers, None)

    error_counter = 0
    conn = None
    try:
      while self.running:
        if self.retry_count and error_counter > self.retry_count:
          # quit if error count greater than retry count
          break
        try:
          conn = httplib.HTTPConnection(self.host)
          conn.connect()
          conn.sock.settimeout(self.timeout)
          conn.request('POST', self.url, headers=headers)
          resp = conn.getresponse()
          if resp.status != 200:
            if self.listener.on_error(resp.status) is False:
              break
            error_counter += 1
            # release the socket before waiting; a fresh connection is
            # opened on the next iteration
            conn.close()
            sleep(self.retry_time)
          else:
            error_counter = 0
            self._read_loop(resp)
            # the read loop returned (stream closed or stop requested), so
            # this connection is finished either way
            conn.close()
        except timeout:
          if self.running is False:
            break
          if conn is not None:
            conn.close()
          sleep(self.snooze_time)
        except Exception:
          # any other exception is fatal, so kill loop
          break
    finally:
      # cleanup: always leave the object in a restartable state
      self.running = False
      if conn is not None:
        conn.close()

  def _read_loop(self, resp):
    """Read length-delimited messages from resp and dispatch them.

    Each message is preceded by a line holding its length in bytes; lines
    that are not a number (e.g. blank lines) are skipped. Returns when
    the response is closed or when a listener callback returns False (in
    which case self.running is cleared as well).
    """
    while self.running and not resp.isclosed():
      # read length; stop at end of stream instead of spinning on the
      # empty strings that read() returns once the peer has closed
      length = ''
      while not resp.isclosed():
        c = resp.read(1)
        if c == '\n':
          break
        length += c
      else:
        # stream ended before the length line was complete
        break
      length = length.strip()
      if not length.isdigit():
        continue

      # read data
      data = resp.read(int(length))

      # turn json data into status object
      if 'in_reply_to_status_id' in data:
        status = parse_status(data, self.api)
        if self.listener.on_status(status) == False:
          self.running = False
      elif 'delete' in data:
        delete = json.loads(data)['delete']['status']
        if self.listener.on_delete(delete['id'], delete['user_id']) == False:
          self.running = False
      elif 'limit' in data:
        if self.listener.on_limit(json.loads(data)['limit']['track']) == False:
          self.running = False

  def _start(self, endpoint, params):
    """Build self.url for `endpoint` and launch the worker thread.

    params is a sequence of (name, value) pairs; pairs with a false value
    are left out of the query string. Raises TweepError if a stream is
    already running.
    """
    if self.running:
      raise TweepError('Stream object already connected!')
    url = '/%i/statuses/%s.json?delimited=length' % (STREAM_VERSION, endpoint)
    for name, value in params:
      if value:
        url += '&%s=%s' % (name, value)
    self.url = url
    self.running = True
    Thread(target=self._run).start()

  def firehose(self, count=None):
    """Start streaming statuses/firehose, optionally passing `count`."""
    self._start('firehose', [('count', count)])

  def sample(self, count=None):
    """Start streaming statuses/sample, optionally passing `count`."""
    self._start('sample', [('count', count)])

  def filter(self, follow=None, track=None):
    """Start streaming statuses/filter.

    follow and track are sequences of strings; each is sent comma-joined.
    """
    params = []
    if follow:
      params.append(('follow', ','.join(follow)))
    if track:
      params.append(('track', ','.join(track)))
    self._start('filter', params)

  def disconnect(self):
    """Ask the worker thread to stop; a no-op when nothing is running."""
    if self.running is False:
      return
    self.running = False