Commit 7a22e2de authored by Josh Roesslein's avatar Josh Roesslein
Browse files

Implemented binder. Reworking api class to use new binder.

parent 61ff268d
import urllib
import httplib
import base64
from misc import TweepError, require_auth, process_param
from models import Status, User
from binder import bind_api
from parsers import *
Twitter API Interface
"""Twitter API"""
class API(object):
def __init__(self, username=None, password=None, host='',
user_agent='tweepy', secure=False,
user_class=User, status_class=Status):
self._Status = status_class
self._User = user_class
self._Status._User = self._User
self._parameters = None
self._post_data = None
# Setup headers
self._headers = {}
self._headers['User-Agent'] = user_agent
def __init__(self, username=None, password=None):
if username and password:
self._auth = True
self._headers['Authorization'] = \
'Basic ' + base64.encodestring('%s:%s' % (username, password))[:-1]
self._auth = False
if secure:
self._http = httplib.HTTPSConnection(host)
self._http = httplib.HTTPConnection(host)
def public_timeline(self):
return parse_list(self._Status, self._fetch('/statuses/public_timeline.json'))
def friends_timeline(self, **kargs):
if self._parameters:
for k,v in self._parameters.items():
print k,v
#return parse_list(self._Status, self._fetch('/statuses/friends_timeline.json'))
def _fetch(self, url, method='GET'):
# Build the url
if self._parameters:
_url = '%s?%s' % (url, urllib.urlencode(parameters))
_url = url
# Encode post data
post = None
if self._post_data:
post = urllib.encode(post_data)
# Send request
self._http.request(method, _url, body=post, headers=self._headers)
resp = self._http.getresponse()
if resp.status != 200:
raise TweepError(parse_error(
self._b64up = base64.encode('%s:%s' % (username, password))
"""Twitter API endpoint bindings"""
Returns the 20 most recent statuses from non-protected users who have
set a custom icon. The public timeline is cached for 60 seconds
so requesting it more often than that is a waste of resources.
Requires Authentication: false
API Rate limited: true
Response: list of statuses
public_timeline = bind_api(
path = '/statuses/public_timeline.json',
parser = parse_test,
allowed_param = [])
from parsers import parse_error
def bind_api(path, parser, allowed_param=None, method='GET'):
def _call(api, **kargs):
# Filter out unallowed parameters
if len(kargs) == 0:
parameters = None
elif allowed_param:
parameters = dict((k,v) for k,v in kargs.items() if k in allowed_param)
parameters = kargs
# Open connection
conn = httplib.HTTPSConnection(
conn = httplib.HTTPConnection(
# Build url with parameters
if parameters:
url = '%s?%s' % (path, urllib.urlencode(parameters))
url = path
# Assemble headers
headers = {
'User-Agent': 'tweepy'
if api.username and api.b64pass:
headers['Authorization'] = 'Basic %s' % api.b64pass
# Build request
conn.request(method, url, headers=headers)
# Get response
resp = conn.getresponse()
# If an error was returned, throw an exception
if resp.status != 200:
raise TweepError(parse_error(
# Pass returned body into parser and return parser output
return parser(
return _call
......@@ -7,6 +7,10 @@ def parse_error(data):
return json.loads(data)['error']
def parse_test(data):
return data
def _parse_item(type, jsondata):
t = type()
for k,v in jsondata.items():
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment