Commit 8effc5a3 authored by Joshua Roesslein's avatar Joshua Roesslein
Browse files

Merge pull request #496 from tewalds/buffer

Add a read buffer so that tweepy does fewer socket.read calls.
parents 05fe3684 1bbb2f7b
from StringIO import StringIO
from time import sleep
import unittest2 as unittest
from tweepy.api import API
from tweepy.auth import OAuthHandler
from tweepy.models import Status
from tweepy.streaming import Stream, StreamListener
from tweepy.streaming import Stream, StreamListener, ReadBuffer
from config import create_auth
from test_utils import mock_tweet
......@@ -102,6 +103,21 @@ class TweepyStreamTests(unittest.TestCase):
# Should be UTF-8 encoded
self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['follow'])
class TweepyStreamReadBuffer(unittest.TestCase):
    """Exercise ReadBuffer across a range of underlying chunk sizes."""

    # Two length-delimited payloads separated by a keep-alive newline.
    stream = """11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n"""

    def test_read_tweet(self):
        # The parsed framing must come out identically regardless of how
        # small or large each underlying read is.
        for chunk_size in (1, 2, 5, 10, 20, 50):
            buf = ReadBuffer(StringIO(self.stream), chunk_size)
            self.assertEqual('11\n', buf.read_line())
            self.assertEqual('{id:12345}\n', buf.read_len(11))
            self.assertEqual('\n', buf.read_line())
            self.assertEqual('24\n', buf.read_line())
            self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))
class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
#bad auth causes twitter to return 401 errors
......
......@@ -2,6 +2,8 @@
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
import logging
import requests
from requests.exceptions import Timeout
......@@ -117,6 +119,47 @@ class StreamListener(object):
return
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size):
        # stream: any object exposing read(size), e.g. a raw response body.
        # chunk_size: minimum number of characters to request per read call.
        self._stream = stream
        self._buffer = ""
        self._chunk_size = chunk_size

    def read_len(self, length):
        """Return exactly `length` characters, or less if the stream ends.

        Reads at least `chunk_size` characters per underlying read so tiny
        requests do not become many tiny socket reads; surplus data stays
        buffered for the next call.
        """
        while len(self._buffer) < length:
            data = self._stream.read(max(self._chunk_size,
                                         length - len(self._buffer)))
            if not data:
                # End of stream: return what we have instead of spinning
                # forever on empty reads (the original looped endlessly here
                # once the connection closed).
                break
            self._buffer += data
        return self._pop(length)

    def read_line(self, sep='\n'):
        """Return characters up to and including the first `sep`.

        If the stream ends before a separator arrives, return whatever is
        buffered (possibly the empty string).
        """
        start = 0
        while True:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            # Only rescan the freshly-appended portion on the next pass.
            start = len(self._buffer)
            data = self._stream.read(self._chunk_size)
            if not data:
                # End of stream: avoid an infinite read loop on a closed
                # connection; drain the remaining buffer instead.
                return self._pop(len(self._buffer))
            self._buffer += data

    def _pop(self, length):
        """Remove and return the first `length` characters of the buffer."""
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r
class Stream(object):
host = 'stream.twitter.com'
......@@ -134,7 +177,13 @@ class Stream(object):
self.retry_time_cap = options.get("retry_time_cap", 320.0)
self.snooze_time_step = options.get("snooze_time", 0.25)
self.snooze_time_cap = options.get("snooze_time_cap", 16)
self.buffer_size = options.get("buffer_size", 1500)
# The default socket.read size. Default to less than half the size of
# a tweet so that it reads tweets with the minimal latency of 2 reads
# per tweet. Values higher than ~1kb will increase latency by waiting
# for more data to arrive but may also increase throughput by doing
# fewer socket read calls.
self.chunk_size = options.get("chunk_size", 512)
self.api = API()
self.session = requests.Session()
......@@ -217,33 +266,23 @@ class Stream(object):
self.running = False
def _read_loop(self, resp):
    """Consume length-delimited status payloads from the response stream.

    The streaming API frames each status as an ASCII decimal byte count on
    its own line, followed by that many characters of JSON. Keep-alive
    blank lines may be inserted before each length line and are skipped.

    This is the coherent merged implementation: the superseded
    per-character ``resp.iter_content()`` parsing that the diff left
    interleaved here has been removed — it re-read framing the ReadBuffer
    had already consumed and did one read call per character.
    """
    buf = ReadBuffer(resp.raw, self.chunk_size)

    while self.running:
        length = 0
        while True:
            line = buf.read_line().strip()
            if not line:
                pass  # keep-alive new lines are expected
            elif line.isdigit():
                length = int(line)
                break
            else:
                raise TweepError('Expecting length, unexpected value found')

        next_status_obj = buf.read_len(length)
        if self.running:
            self._data(next_status_obj)

    # NOTE(review): _fp is a private attribute of the underlying httplib
    # response — assumes requests exposes the raw urllib3 response here.
    if resp.raw._fp.isclosed():
        self.on_closed(resp)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment