# Tweepy
# Copyright 2009 Joshua Roesslein
# See LICENSE

from urllib2 import Request, urlopen
from urllib import quote
import base64

import oauth
from error import TweepError

class AuthHandler(object):
  """Base class for authentication handlers.

  Defines the apply_auth() hook; this base implementation does nothing
  except signal that the hook has not been overridden.
  """

  def apply_auth(self, url, method, headers, parameters):
    """Apply authentication headers to request.

    url        -- URL of the request being authenticated
    method     -- HTTP method of the request
    headers    -- request headers
    parameters -- request parameters

    Raises NotImplementedError: the base class provides no authentication.
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # raising it yields a TypeError rather than the intended error.
    raise NotImplementedError

class BasicAuthHandler(AuthHandler):
  """HTTP Basic authentication handler.

  The base64 form of "username:password" is computed once at construction
  and written into the Authorization header by apply_auth().
  """

  def __init__(self, username, password):
    """Precompute the base64-encoded credentials.

    username, password -- account credentials. Text (unicode) values are
    encoded as UTF-8 before base64 encoding, so non-ASCII characters do
    not trigger an implicit ASCII encode error; byte strings are used
    exactly as given.
    """
    raw = self._as_bytes(username) + b':' + self._as_bytes(password)
    token = base64.b64encode(raw)
    # Keep the stored value a native str so that '%s' formatting in
    # apply_auth() produces a plain header value on every Python version.
    if not isinstance(token, str):
      token = token.decode('ascii')
    self._b64up = token

  @staticmethod
  def _as_bytes(value):
    """Return value as a byte string, encoding text as UTF-8."""
    if not isinstance(value, (bytes, type(u''))):
      # The previous '%s' formatting accepted any object; keep doing so.
      value = str(value)
    if isinstance(value, bytes):
      return value
    return value.encode('utf-8')

  def apply_auth(self, url, method, headers, parameters):
    """Set the Basic Authorization header on the given headers mapping.

    Only headers is modified; url, method and parameters are unused.
    """
    headers['Authorization'] = 'Basic %s' % self._b64up

"""OAuth authentication handler"""
class OAuthHandler(AuthHandler):
  """OAuth authentication handler.

  Usage, as implemented below:
    1. get_authorization_url() fetches a request token and returns the URL
       the user must visit.
    2. get_access_token(verifier) exchanges the request token for an access
       token, which is stored in self.access_token.
    3. apply_auth() signs outgoing requests with the stored access token.
  """

  REQUEST_TOKEN_URL = 'http://twitter.com/oauth/request_token'
  AUTHORIZATION_URL = 'http://twitter.com/oauth/authorize'
  ACCESS_TOKEN_URL = 'http://twitter.com/oauth/access_token'

  def __init__(self, consumer_key, consumer_secrete, callback=None):
    """Create the consumer and signature-method objects.

    consumer_key     -- OAuth consumer key
    consumer_secrete -- OAuth consumer secret (parameter name kept as-is
                        so existing keyword callers keep working)
    callback         -- optional value sent as 'oauth_callback' when the
                        request token is fetched
    """
    self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secrete)
    self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
    self.request_token = None
    self.access_token = None
    self.callback = callback

  def apply_auth(self, url, method, headers, parameters):
    """Sign the request with the access token and merge the resulting
    OAuth header into headers (modified in place)."""
    request = oauth.OAuthRequest.from_consumer_and_token(
        self._consumer, http_url=url, http_method=method,
        token=self.access_token, parameters=parameters)
    request.sign_request(self._sigmethod, self._consumer, self.access_token)
    headers.update(request.to_header())

  def _fetch_token(self, url, request):
    """Send the signed request's header to url and parse the response body
    into an OAuthToken, always closing the HTTP response."""
    resp = urlopen(Request(url, headers=request.to_header()))
    try:
      return oauth.OAuthToken.from_string(resp.read())
    finally:
      # The response was previously left open; release it explicitly.
      resp.close()

  def _get_request_token(self):
    """Fetch a request token from REQUEST_TOKEN_URL.

    Raises TweepError wrapping any exception raised along the way.
    """
    try:
      request = oauth.OAuthRequest.from_consumer_and_token(
          self._consumer, http_url=self.REQUEST_TOKEN_URL)
      if self.callback:
        request.set_parameter('oauth_callback', self.callback)
      # No token exists yet, so the request is signed with the consumer only.
      request.sign_request(self._sigmethod, self._consumer, None)
      return self._fetch_token(self.REQUEST_TOKEN_URL, request)
    except Exception as e:
      raise TweepError(e)

  def get_authorization_url(self):
    """Get the authorization URL to redirect the user.

    Stores the freshly fetched request token in self.request_token.
    Raises TweepError on any failure.
    """
    try:
      # get the request token
      self.request_token = self._get_request_token()

      # build auth request and return as url
      request = oauth.OAuthRequest.from_token_and_callback(
          token=self.request_token, http_url=self.AUTHORIZATION_URL)
      return request.to_url()
    except TweepError:
      # _get_request_token() already raises TweepError; do not wrap it in
      # a second TweepError.
      raise
    except Exception as e:
      raise TweepError(e)

  def get_access_token(self, verifier):
    """After user has authorized the request token, get access token with
    user supplied verifier.

    verifier -- value sent as 'oauth_verifier' (converted with str())

    Stores the result in self.access_token and returns it.
    Raises TweepError on any failure.
    """
    try:
      # build request
      request = oauth.OAuthRequest.from_consumer_and_token(
          self._consumer, token=self.request_token,
          http_url=self.ACCESS_TOKEN_URL)
      request.set_parameter('oauth_verifier', str(verifier))
      request.sign_request(self._sigmethod, self._consumer, self.request_token)

      # send request
      self.access_token = self._fetch_token(self.ACCESS_TOKEN_URL, request)
      return self.access_token
    except Exception as e:
      raise TweepError(e)