Commit c2a04373 authored by Timo Ewalds

Add a read buffer so that tweepy does fewer socket.read calls, which are expensive on GAE.

parent b64527ac
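
The new value is read from Stream's options (options.get("chunk_size", 512) in the diff below), so a caller would opt in by passing it as a keyword argument. The following is a hypothetical usage sketch, not part of this commit; the listener class, credentials, and track terms are placeholders:

    from tweepy import OAuthHandler, Stream
    from tweepy.streaming import StreamListener

    class PrintListener(StreamListener):
        def on_data(self, data):
            print(data)
            return True

    auth = OAuthHandler("consumer-key", "consumer-secret")
    auth.set_access_token("access-token", "access-token-secret")

    # chunk_size lands in options.get("chunk_size", 512) in Stream.__init__;
    # a larger value means fewer socket.read calls at the cost of per-tweet latency.
    stream = Stream(auth, PrintListener(), chunk_size=1024)
    stream.filter(track=["python"])
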
@@ -2,6 +2,8 @@
 # Copyright 2009-2010 Joshua Roesslein
 # See LICENSE for details.
+# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
 import logging
 import requests
 from requests.exceptions import Timeout
@@ -135,7 +137,13 @@ class Stream(object):
         self.retry_time_cap = options.get("retry_time_cap", 320.0)
         self.snooze_time_step = options.get("snooze_time", 0.25)
         self.snooze_time_cap = options.get("snooze_time_cap", 16)
         self.buffer_size = options.get("buffer_size", 1500)
+        # The default socket.read size. Default to less than half the size of
+        # a tweet so that it reads tweets with the minimal latency of 2 reads
+        # per tweet. Values higher than ~1kb will increase latency by waiting
+        # for more data to arrive but may also increase throughput by doing
+        # fewer socket read calls.
+        self.chunk_size = options.get("chunk_size", 512)
         self.api = API()
         self.session = requests.Session()
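
To make the trade-off in the comment above concrete: with the default 512-byte chunk_size, a roughly 3kb tweet typically costs two socket reads, since the first read(512) blocks only until the short length line (plus the start of the payload) has arrived, and read_len then fetches the remaining ~2.5kb in a single call. A much larger chunk_size, say 8kb, would block until ~8kb had arrived, which on a quiet stream may mean waiting for several tweets before anything is delivered. These figures are illustrative, based on the ~3kb average tweet size quoted in the ReadBuffer docstring below.
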
@@ -218,33 +226,63 @@ class Stream(object):
         self.running = False
     def _read_loop(self, resp):
+        class ReadBuffer(object):
+            """Buffer data from the response in a smarter way than httplib/requests can.
+            Tweets are roughly in the 2-12kb range, averaging around 3kb.
+            Requests/urllib3/httplib/socket all use socket.read, which blocks
+            until enough data is returned. On some systems (e.g. Google App Engine), socket
+            reads are quite slow. To combat this latency we can read big chunks,
+            but the blocking part means we won't get results until enough tweets
+            have arrived. That may not be a big deal for high throughput systems.
+            For low throughput systems we don't want to sacrifice latency, so we
+            use small chunks so it can read the length and the tweet in 2 read calls.
+            """
+            def __init__(self, resp, chunk_size):
+                self._resp = resp
+                self._buffer = ""
+                self._chunk_size = chunk_size
+            def read_len(self, length):
+                while True:
+                    if len(self._buffer) >= length:
+                        return self._pop(length)
+                    read_len = max(self._chunk_size, length - len(self._buffer))
+                    self._buffer += self._resp.raw.read(read_len)
+            def read_line(self, sep='\n'):
+                start = 0
+                while True:
+                    loc = self._buffer.find(sep, start)
+                    if loc >= 0:
+                        return self._pop(loc + len(sep))
+                    else:
+                        start = len(self._buffer)
+                    self._buffer += self._resp.raw.read(self._chunk_size)
+            def _pop(self, length):
+                r = self._buffer[:length]
+                self._buffer = self._buffer[length:]
+                return r
+        buf = ReadBuffer(resp, self.chunk_size)
         while self.running:
+            length = 0
+            while True:
+                line = buf.read_line().strip()
+                if not line:
+                    pass  # keep-alive new lines are expected
+                elif line.isdigit():
+                    length = int(line)
+                    break
+                else:
+                    raise TweepError('Expecting length, unexpected value found')
-            # Note: keep-alive newlines might be inserted
-            # before each length value.
-            # read until we get a digit...
-            c = '\n'
-            for c in resp.iter_content():
-                if c == '\n':
-                    continue
-                break
-            delimited_string = c
-            # read rest of delimiter length..
-            d = ''
-            for d in resp.iter_content():
-                if d != '\n':
-                    delimited_string += d
-                    continue
-                break
-            # read the next twitter status object
-            if delimited_string.strip().isdigit():
-                next_status_obj = resp.raw.read(int(delimited_string))
-                if self.running:
-                    self._data(next_status_obj)
+            next_status_obj = buf.read_len(length)
+            if self.running:
+                self._data(next_status_obj)
         if resp.raw._fp.isclosed():
             self.on_closed(resp)
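
To make the wire format concrete, here is a small standalone sketch (not part of the commit) of the buffer consuming the length-delimited framing the loop above parses (Twitter's delimited=length mode), where each JSON payload is preceded by its byte length on its own line and keep-alive newlines may appear in between. FakeRaw and FakeResponse are stand-ins for the requests response object, invented here purely for illustration; the ReadBuffer definition is copied from the diff above (docstring shortened) so the snippet runs on its own, and it keeps the original's str-based buffering, which works unchanged here because everything below is str.

    class ReadBuffer(object):
        """Copied from the diff above so this sketch is self-contained."""
        def __init__(self, resp, chunk_size):
            self._resp = resp
            self._buffer = ""
            self._chunk_size = chunk_size

        def read_len(self, length):
            while True:
                if len(self._buffer) >= length:
                    return self._pop(length)
                read_len = max(self._chunk_size, length - len(self._buffer))
                self._buffer += self._resp.raw.read(read_len)

        def read_line(self, sep='\n'):
            start = 0
            while True:
                loc = self._buffer.find(sep, start)
                if loc >= 0:
                    return self._pop(loc + len(sep))
                else:
                    start = len(self._buffer)
                self._buffer += self._resp.raw.read(self._chunk_size)

        def _pop(self, length):
            r = self._buffer[:length]
            self._buffer = self._buffer[length:]
            return r

    class FakeRaw(object):
        """Stand-in for resp.raw: returns at most `amt` characters per read call."""
        def __init__(self, data):
            self._data, self._pos = data, 0

        def read(self, amt):
            chunk = self._data[self._pos:self._pos + amt]
            self._pos += amt
            return chunk

    class FakeResponse(object):
        def __init__(self, data):
            self.raw = FakeRaw(data)

    payload = '{"id": 1, "text": "hello"}'
    data = "\n\n%d\n%s" % (len(payload), payload)  # keep-alives, length line, tweet

    buf = ReadBuffer(FakeResponse(data), chunk_size=512)
    line = buf.read_line().strip()
    while not line:                  # skip keep-alive newlines, as the read loop does
        line = buf.read_line().strip()
    print(buf.read_len(int(line)))   # prints the JSON payload
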