Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Zahra Rajabi
tweepy
Commits
416d0d68
Commit
416d0d68
authored
Aug 16, 2013
by
Aaron Hill
Browse files
Use Requests's OAuth
parent
3acb3b6c
Changes
4
Expand all
Hide whitespace changes
Inline
Side-by-side
tweepy/auth.py
View file @
416d0d68
# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
#Embedded file name: /home/aaron/repos/tweepy/tweepy/auth.py
from
urllib2
import
Request
,
urlopen
import
urllib
import
base64
...
...
@@ -10,7 +7,9 @@ import json
from
tweepy
import
oauth
from
tweepy.error
import
TweepError
from
tweepy.api
import
API
import
requests
from
requests_oauthlib
import
OAuth1Session
,
OAuth1
from
requests.auth
import
AuthBase
class
AuthHandler
(
object
):
...
...
@@ -25,7 +24,6 @@ class AuthHandler(object):
class
OAuthHandler
(
AuthHandler
):
"""OAuth authentication handler"""
OAUTH_HOST
=
'api.twitter.com'
OAUTH_ROOT
=
'/oauth/'
...
...
@@ -43,77 +41,62 @@ class OAuthHandler(AuthHandler):
self
.
callback
=
callback
self
.
username
=
None
self
.
secure
=
secure
self
.
oauth
=
OAuth1Session
(
consumer_key
,
client_secret
=
consumer_secret
,
callback_uri
=
self
.
callback
)
def
_get_oauth_url
(
self
,
endpoint
,
secure
=
True
):
if
self
.
secure
or
secure
:
prefix
=
'https://'
else
:
prefix
=
'http://'
return
prefix
+
self
.
OAUTH_HOST
+
self
.
OAUTH_ROOT
+
endpoint
def
apply_auth
(
self
,
url
,
method
,
headers
,
parameters
):
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
self
.
_consumer
,
http_url
=
url
,
http_method
=
method
,
token
=
self
.
access_token
,
parameters
=
parameters
)
request
.
sign_request
(
self
.
_sigmethod
,
self
.
_consumer
,
self
.
access_token
)
headers
.
update
(
request
.
to_header
())
def
apply_auth
(
self
):
return
OAuth1
(
self
.
consumer_key
,
client_secret
=
self
.
consumer_secret
,
resource_owner_key
=
self
.
access_token
,
resource_owner_secret
=
self
.
access_token_secret
)
def
_get_request_token
(
self
):
try
:
url
=
self
.
_get_oauth_url
(
'request_token'
)
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
self
.
_consumer
,
http_url
=
url
,
callback
=
self
.
callback
)
return
self
.
oauth
.
fetch_request_token
(
url
)
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
self
.
_consumer
,
http_url
=
url
,
callback
=
self
.
callback
)
request
.
sign_request
(
self
.
_sigmethod
,
self
.
_consumer
,
None
)
resp
=
urlopen
(
Request
(
url
,
headers
=
request
.
to_header
()))
return
oauth
.
OAuthToken
.
from_string
(
resp
.
read
())
except
Exception
as
e
:
raise
TweepError
(
e
)
def
set_request_token
(
self
,
key
,
secret
):
self
.
request_token
=
oauth
.
OAuthToken
(
key
,
secret
)
def
set_access_token
(
self
,
key
,
secret
):
self
.
access_token
=
oauth
.
OAuthToken
(
key
,
secret
)
self
.
access_token
=
key
self
.
access_token_secret
=
secret
def
get_authorization_url
(
self
,
signin_with_twitter
=
False
):
def
get_authorization_url
(
self
,
signin_with_twitter
=
False
):
"""Get the authorization URL to redirect the user"""
try
:
# get the request token
self
.
request_token
=
self
.
_get_request_token
()
# build auth request and return as url
if
signin_with_twitter
:
url
=
self
.
_get_oauth_url
(
'authenticate'
)
else
:
url
=
self
.
_get_oauth_url
(
'authorize'
)
request
=
oauth
.
OAuthR
equest
.
from
_token
_and_callback
(
token
=
self
.
request_token
,
http
_url
=
url
)
self
.
request
_token
=
self
.
_get_r
equest_token
(
)
return
self
.
oauth
.
authorization
_url
(
url
)
token
=
oauth
.
fetch_request_token
(
url
)
request
=
oauth
.
OAuthRequest
.
from_token_and_callback
(
token
=
self
.
request_token
,
http_url
=
url
)
return
request
.
to_url
()
except
Exception
as
e
:
raise
TweepError
(
e
)
def
get_access_token
(
self
,
verifier
=
None
):
def
get_access_token
(
self
,
verifier
=
None
):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try
:
url
=
self
.
_get_oauth_url
(
'access_token'
)
# build request
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
self
.
_consumer
,
token
=
self
.
request_token
,
http_url
=
url
,
verifier
=
str
(
verifier
)
)
self
.
oauth
=
OAuth1Session
(
self
.
consumer_key
,
client_secret
=
self
.
consumer_secret
,
resource_owner_key
=
self
.
request_token
[
'oauth_token'
],
resource_owner_secret
=
self
.
request_token
[
'oauth_token_secret'
],
verifier
=
verifier
,
callback_uri
=
self
.
callback
)
resp
=
self
.
oauth
.
fetch_access_token
(
url
)
self
.
access_token
=
resp
[
'oauth_token'
]
self
.
access_token_secret
=
resp
[
'oauth_token_secret'
]
return
(
self
.
access_token
,
self
.
access_token_secret
)
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
self
.
_consumer
,
token
=
self
.
request_token
,
http_url
=
url
,
verifier
=
str
(
verifier
))
request
.
sign_request
(
self
.
_sigmethod
,
self
.
_consumer
,
self
.
request_token
)
# send request
resp
=
urlopen
(
Request
(
url
,
headers
=
request
.
to_header
()))
self
.
access_token
=
oauth
.
OAuthToken
.
from_string
(
resp
.
read
())
return
self
.
access_token
...
...
@@ -128,18 +111,11 @@ class OAuthHandler(AuthHandler):
and request activation of xAuth for it.
"""
try
:
url
=
self
.
_get_oauth_url
(
'access_token'
,
secure
=
True
)
# must use HTTPS
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
oauth_consumer
=
self
.
_consumer
,
http_method
=
'POST'
,
http_url
=
url
,
parameters
=
{
'x_auth_mode'
:
'client_auth'
,
'x_auth_username'
:
username
,
'x_auth_password'
:
password
}
)
url
=
self
.
_get_oauth_url
(
'access_token'
,
secure
=
True
)
request
=
oauth
.
OAuthRequest
.
from_consumer_and_token
(
oauth_consumer
=
self
.
_consumer
,
http_method
=
'POST'
,
http_url
=
url
,
parameters
=
{
'x_auth_mode'
:
'client_auth'
,
'x_auth_username'
:
username
,
'x_auth_password'
:
password
})
request
.
sign_request
(
self
.
_sigmethod
,
self
.
_consumer
,
None
)
resp
=
urlopen
(
Request
(
url
,
data
=
request
.
to_postdata
()))
self
.
access_token
=
oauth
.
OAuthToken
.
from_string
(
resp
.
read
())
return
self
.
access_token
...
...
@@ -153,10 +129,19 @@ class OAuthHandler(AuthHandler):
if
user
:
self
.
username
=
user
.
screen_name
else
:
raise
TweepError
(
"
Unable to get username, invalid oauth token!
"
)
raise
TweepError
(
'
Unable to get username, invalid oauth token!
'
)
return
self
.
username
class
OAuth2Bearer
(
AuthBase
):
def
__init__
(
self
,
url
,
bearer_token
):
self
.
url
=
url
self
.
bearer_token
=
bearer_token
def
__call__
(
self
,
request
):
request
.
headers
[
'Authorization'
]
=
'Bearer '
+
self
.
bearer_token
return
request
class
AppAuthHandler
(
AuthHandler
):
"""Application-only authentication handler"""
...
...
@@ -166,19 +151,17 @@ class AppAuthHandler(AuthHandler):
def
__init__
(
self
,
consumer_key
,
consumer_secret
,
callback
=
None
,
secure
=
True
):
self
.
callback
=
callback
self
.
secure
=
secure
self
.
_bearer_token
=
''
token_credential
=
urllib
.
quote
(
consumer_key
)
+
':'
+
urllib
.
quote
(
consumer_secret
)
credential
=
base64
.
b64encode
(
token_credential
)
resp
=
requests
.
post
(
self
.
url
,
auth
=
(
self
.
consumer_key
,
self
.
consumer_secret
),
data
=
{
'grant_type'
:
'client_credentials'
})
data
=
resp
.
json
()
if
data
.
get
(
'token_type'
)
!=
'bearer'
:
raise
TweepError
(
'Expected token_type to equal "bearer", but got %s
\
instead'
%
data
.
get
(
'token_type'
))
value
=
{
'grant_type'
:
'client_credentials'
}
data
=
urllib
.
urlencode
(
value
)
req
=
Request
(
self
.
_get_oauth_url
(
'token'
))
req
.
add_header
(
'Authorization'
,
'Basic '
+
credential
)
req
.
add_header
(
'Content-Type'
,
'application/x-www-form-urlencoded;charset=UTF-8'
)
response
=
urlopen
(
req
,
data
)
json_response
=
json
.
loads
(
response
.
read
())
self
.
_access_token
=
json_response
[
'access_token'
]
self
.
_bearer_token
=
json_response
[
'access_token'
]
def
_get_oauth_url
(
self
,
endpoint
,
secure
=
True
):
...
...
@@ -190,5 +173,5 @@ class AppAuthHandler(AuthHandler):
return
prefix
+
self
.
OAUTH_HOST
+
self
.
OAUTH_ROOT
+
endpoint
def
apply_auth
(
self
,
url
,
method
,
headers
,
parameters
):
h
ea
d
er
s
[
'Authorization'
]
=
'Bearer '
+
self
.
_
access
_token
def
apply_auth
(
self
):
return
OAuth2B
ea
r
er
(
self
.
_get_oauth_url
(
'token'
),
self
.
_
bearer
_token
)
tweepy/binder.py
View file @
416d0d68
...
...
@@ -149,10 +149,7 @@ def bind_api(**config):
# Apply authentication
if
self
.
api
.
auth
:
self
.
api
.
auth
.
apply_auth
(
full_url
,
self
.
method
,
self
.
session
.
headers
,
self
.
session
.
params
)
auth
=
self
.
api
.
auth
.
apply_auth
()
# Request compression if configured
if
self
.
api
.
compression
:
...
...
@@ -160,7 +157,9 @@ def bind_api(**config):
# Execute request
try
:
resp
=
self
.
session
.
request
(
self
.
method
,
full_url
,
data
=
self
.
post_data
,
timeout
=
self
.
api
.
timeout
)
resp
=
self
.
session
.
request
(
self
.
method
,
full_url
,
data
=
self
.
post_data
,
timeout
=
self
.
api
.
timeout
,
auth
=
auth
)
except
Exception
,
e
:
raise
TweepError
(
'Failed to send request: %s'
%
e
)
rem_calls
=
resp
.
getheader
(
'x-rate-limit-remaining'
)
...
...
tweepy/oauth.py
deleted
100644 → 0
View file @
3acb3b6c
This diff is collapsed.
Click to expand it.
tweepy/streaming.py
View file @
416d0d68
...
...
@@ -150,8 +150,9 @@ class Stream(object):
# quit if error count greater than retry count
break
try
:
self
.
auth
.
apply_auth
(
url
,
'POST'
,
self
.
session
.
headers
,
self
.
session
.
params
)
resp
=
self
.
session
.
request
(
'POST'
,
url
,
data
=
self
.
body
,
timeout
=
self
.
timeout
,
stream
=
True
)
auth
=
self
.
auth
.
apply_auth
()
resp
=
self
.
session
.
request
(
'POST'
,
url
,
data
=
self
.
body
,
timeout
=
self
.
timeout
,
stream
=
True
,
auth
=
auth
)
if
resp
.
status_code
!=
200
:
if
self
.
listener
.
on_error
(
resp
.
status_code
)
is
False
:
break
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment