streaming.py 4.57 KB
Newer Older
1
2
3
4
5
# Tweepy
# Copyright 2009 Joshua Roesslein
# See LICENSE

import httplib
6
from socket import timeout
7
from threading import Thread
8
from time import sleep
9
10

from . auth import BasicAuthHandler
11
12
from . parsers import parse_status
from . api import API
13
from . error import TweepError
14
15
16
17
18
19
20
21

try:
  import json
except ImportError:
  import simplejson as json

class Stream(object):

22
23
24
  host = 'stream.twitter.com'

  def __init__(self, username, password, callback, timeout=2.0, retry_count = 3, 
Josh Roesslein's avatar
Josh Roesslein committed
25
                retry_time = 3.0, snooze_time = 10.0, buffer_size=1500):
26
27
    self.auth = BasicAuthHandler(username, password)
    self.running = False
28
    self.timeout = timeout
29
30
31
    self.retry_count = retry_count
    self.retry_time = retry_time
    self.snooze_time = snooze_time
32
    self.buffer_size = buffer_size
33
    self.callback = callback
34
    self.api = API()
35
36

  def _run(self):
37
    # setup
38
39
    headers = {}
    self.auth.apply_auth(None, None, headers, None)
40
41

    # enter loop
42
43
    error_counter = 0
    conn = None
44
    while self.running:
45
46
47
      if error_counter > self.retry_count:
        # quit if error count greater than retry count
        break
48
      try:
49
50
51
        conn = httplib.HTTPConnection(self.host)
        conn.connect()
        conn.sock.settimeout(self.timeout)
52
53
54
        conn.request('POST', self.url, headers=headers)
        resp = conn.getresponse()
        if resp.status != 200:
55
56
57
58
59
          error_counter += 1
          sleep(self.retry_time)
        else:
          error_counter = 0
          self._read_loop(resp)
60
      except timeout:
61
62
        if self.running is False:
          break
63
        conn.close()
64
        sleep(self.snooze_time)
65
66
67
      except Exception:
        # any other exception is fatal, so kill loop
        break
68
69

    # cleanup
70
71
72
    self.running = False
    if conn:
      conn.close()
73
74

  def _read_loop(self, resp):
75
76
77
78
79
80
81
    data = ''
    while self.running:
      if resp.isclosed():
        break

      # read length
      length = ''
82
      while True:
83
84
85
86
87
88
89
90
91
92
93
94
        c = resp.read(1)
        if c == '\n':
          break
        length += c
      length = length.strip()
      if length.isdigit():
        length = int(length)
      else:
        continue

      # read data
      data = resp.read(length)
95
96
97

      # turn json data into status object
      if 'in_reply_to_status_id' in data:
98
        status = parse_status(data, self.api)
99
100
101
102
103
        self.callback('status', status)
      elif 'delete' in data:
        self.callback('delete', json.loads(data)['delete']['status'])
      elif 'limit' in data:
        self.callback('limit', json.loads(data)['limit'])
104

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
  def firehose(self, count=None, ):
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = '/firehose.json?delimited=length'
    if count:
      self.url += '&count=%s' % count
    self.running = True
    Thread(target=self._run).start()

  def gardenhose(self):
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = '/gardenhose.json?delimited=length'
    self.running = True
    Thread(target=self._run).start()

  def birddog(self, follow, count=None):
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = '/birddog.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
    if count:
      self.url += '&count=%s' % count
    self.running = True
    Thread(target=self._run).start()

  def shadow(self, follow, count=None):
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = '/shadow.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
    if count:
      self.url += '&count=%s' % count
    self.running = True
    Thread(target=self._run).start()

139
  def spritzer(self):
140
141
    if self.running:
      raise TweepError('Stream object already connected!')
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
    self.url = '/spritzer.json?delimited=length'
    self.running = True
    Thread(target=self._run).start()

  def follow(self, follow=None):
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = '/follow.json?delimited=length'
    if follow:
      self.url += '&follow=%s' % str(follow).strip('[]').replace(' ', '')
    self.running = True
    Thread(target=self._run).start()

  def track(self, track=None):
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = '/track.json?delimited=length'
    if track:
      self.url += '&track=%s' % str(track).strip('[]').replace(' ', '')
161
162
163
164
    self.running = True
    Thread(target=self._run).start()
    
  def disconnect(self):
165
166
    if self.running is False:
      return
167
168
    self.running = False