Commit 247b02f6 authored by Joshua Roesslein's avatar Joshua Roesslein
Browse files

Add streaming test suite.

 - test_userstream
parent fef0f30c
---
script: nosetests -v tests.test_api
script: nosetests -v tests.test_api tests.test_streaming
language: python
env:
global:
......
import os
from tweepy.auth import OAuthHandler
username = os.environ.get('TWITTER_USERNAME', '')
oauth_consumer_key = os.environ.get('CONSUMER_KEY', '')
oauth_consumer_secret = os.environ.get('CONSUMER_SECRET', '')
oauth_token = os.environ.get('ACCESS_KEY', '')
oauth_token_secret = os.environ.get('ACCESS_SECRET', '')
def create_auth():
auth = OAuthHandler(oauth_consumer_key, oauth_consumer_secret)
auth.set_access_token(oauth_token, oauth_token_secret)
return auth
import random
import string
def mock_tweet():
"""Generate some random tweet text."""
count = random.randint(70, 140)
return ''.join([random.choice(string.letters) for i in xrange(count)])
from time import sleep
import unittest
from tweepy.api import API
from tweepy.streaming import Stream, StreamListener
from config import create_auth
from mock import mock_tweet
class MockStreamListener(StreamListener):
def __init__(self):
super(MockStreamListener, self).__init__()
self.status_count = 0
def on_status(self, status):
self.status_count += 1
return False
class TweepyStreamTests(unittest.TestCase):
def setUp(self):
self.auth = create_auth()
self.listener = MockStreamListener()
self.stream = Stream(self.auth, self.listener)
def tearDown(self):
self.stream.disconnect()
def test_userstream(self):
self.stream.userstream(async=True)
# Generate random tweet which should show up in the stream.
# Wait a bit of time for it to arrive before asserting.
API(self.auth).update_status(mock_tweet())
sleep(1)
self.assertEqual(self.listener.status_count, 1)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment