# Tweepy
# Copyright 2009 Joshua Roesslein
# See LICENSE

import httplib
from socket import timeout
from threading import Thread
from time import sleep

from . auth import BasicAuthHandler
from . parsers import parse_status
from . api import API
from . error import TweepError

try:
  import json
except ImportError:
  import simplejson as json

class Stream(object):
  """Background reader for a length-delimited HTTP stream of statuses.

  Each endpoint method (firehose, gardenhose, birddog, shadow, spritzer,
  follow, track) stores the request URL, marks the stream as running and
  starts a thread executing _run(). Messages whose raw text contains
  'in_reply_to_status_id' are parsed with parse_status() and handed to
  the callback given to the constructor. Call disconnect() to stop.
  """

  def __init__(self, username, password, callback, host='stream.twitter.com', timeout=2.0, buffer_size=1500):
    """Store connection settings; no network activity happens here.

    username, password -- credentials wrapped in a BasicAuthHandler
    callback           -- called with each parsed status object
    host               -- host name the streaming connection is opened to
    timeout            -- timeout passed to httplib.HTTPConnection
    buffer_size        -- stored on the instance; not read by this class
    """
    self.host = host
    self.auth = BasicAuthHandler(username, password)
    self.running = False
    self.timeout = timeout
    self.buffer_size = buffer_size
    self.callback = callback
    self.api = API()

  def _run(self):
    """Thread body: connect, read, and reconnect until self.running is False.

    A socket timeout just triggers a reconnect. Any other exception is
    treated as fatal and stops the stream. The connection is closed on
    every path, including non-200 replies.
    """
    # setup
    headers = {}
    self.auth.apply_auth(None, None, headers, None)

    # enter loop
    while self.running:
      # conn starts as None so cleanup is safe even if construction fails.
      conn = None
      backoff = False
      try:
        conn = httplib.HTTPConnection(self.host, timeout=self.timeout)
        conn.request('POST', self.url, headers=headers)
        resp = conn.getresponse()
        if resp.status == 200:
          self._read_loop(resp)
        # Reaching this line means either a non-200 reply or the server
        # ended the stream; pause before reconnecting so we never spin.
        # TODO: better handle failures
        backoff = True
      except timeout:
        # no data within self.timeout; fall through, close and reconnect
        pass
      except Exception:
        # any other exception is fatal, so kill loop
        self.running = False
      finally:
        # cleanup
        if conn is not None:
          conn.close()

      if backoff and self.running:
        sleep(5.0)

  def _read_loop(self, resp):
    """Read "<length>\\n<payload>" records from resp and dispatch statuses.

    Returns when self.running is cleared, when resp reports closed, or
    when a read returns no data (end of stream). Lines that are not a
    pure decimal length (for example blank lines) are skipped.
    """
    while self.running and not resp.isclosed():
      # read length
      digits = []
      while True:
        c = resp.read(1)
        if not c:
          # End of stream: read() keeps returning '' from here on, so
          # waiting for a newline would never terminate.
          return
        if c == '\n':
          break
        digits.append(c)
      length = ''.join(digits).strip()
      if not length.isdigit():
        continue

      # read data
      data = resp.read(int(length))

      # turn json data into status object
      if 'in_reply_to_status_id' in data:
        self.callback(parse_status(data, self.api))

      # TODO: we should probably also parse delete/track messages
      # and pass to a callback

  @staticmethod
  def _csv_param(values):
    """Return values formatted as a comma-separated query parameter.

    Lists, tuples and other iterables are joined item by item, so string
    items are not wrapped in quotes. Strings and non-iterable scalars keep
    the historical formatting (surrounding brackets and all spaces removed).
    """
    if not isinstance(values, (str, type(u''))):
      try:
        items = iter(values)
      except TypeError:
        items = None
      if items is not None:
        return ','.join(str(item).replace(' ', '') for item in items)
    return str(values).strip('[]').replace(' ', '')

  def _start(self, url):
    """Record url and launch the reader thread.

    Raises TweepError if the stream is already running.
    """
    if self.running:
      raise TweepError('Stream object already connected!')
    self.url = url
    self.running = True
    Thread(target=self._run).start()

  def firehose(self, count=None):
    """Start streaming from /firehose.json, optionally passing count."""
    url = '/firehose.json?delimited=length'
    if count:
      url += '&count=%s' % count
    self._start(url)

  def gardenhose(self):
    """Start streaming from /gardenhose.json."""
    self._start('/gardenhose.json?delimited=length')

  def birddog(self, follow, count=None):
    """Start streaming from /birddog.json for the given follow values."""
    url = '/birddog.json?delimited=length&follow=%s' % self._csv_param(follow)
    if count:
      url += '&count=%s' % count
    self._start(url)

  def shadow(self, follow, count=None):
    """Start streaming from /shadow.json for the given follow values."""
    url = '/shadow.json?delimited=length&follow=%s' % self._csv_param(follow)
    if count:
      url += '&count=%s' % count
    self._start(url)

  def spritzer(self):
    """Start streaming from /spritzer.json."""
    self._start('/spritzer.json?delimited=length')

  def follow(self, follow=None):
    """Start streaming from /follow.json, optionally with follow values."""
    url = '/follow.json?delimited=length'
    if follow:
      url += '&follow=%s' % self._csv_param(follow)
    self._start(url)

  def track(self, track=None):
    """Start streaming from /track.json, optionally with track values."""
    url = '/track.json?delimited=length'
    if track:
      url += '&track=%s' % self._csv_param(track)
    self._start(url)

  def disconnect(self):
    """Ask the reader thread to stop; a no-op when not running."""
    self.running = False