Commit 45bb5a15 authored by Josh Roesslein's avatar Josh Roesslein
Browse files

Added unit tests for caches. Added count() method to cache interface.

parent a436a4c4
......@@ -4,6 +4,7 @@
import unittest
import random
from time import sleep
from tweepy import *
......@@ -99,6 +100,7 @@ class TweepyAPITests(unittest.TestCase):
self.assert_(isinstance(source, Friendship))
self.assert_(isinstance(target, Friendship))
# Authentication tests
class TweepyAuthTests(unittest.TestCase):
consumer_key = 'ZbzSsdQj7t68VYlqIFvdcA'
......@@ -127,6 +129,46 @@ class TweepyAuthTests(unittest.TestCase):
api = API(auth)
api.update_status('test %i' % random.randint(1,1000))
# Cache tests
class TweepyCacheTests(unittest.TestCase):
timeout = 2.0
def _run_tests(self):
# test store and get
self.cache.store('testkey', 'testvalue')
self.assertEqual(self.cache.count(), 1, 'Count is wrong')
self.assertEqual(self.cache.get('testkey'), 'testvalue', 'Stored value does not match retrieved value')
# test timeout
sleep(self.timeout)
self.assertEqual(self.cache.get('testkey'), None, 'Cache entry should have expired')
# test cleanup
self.cache.store('testkey', 'testvalue')
sleep(self.timeout)
self.cache.cleanup()
self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed')
# test flush
for i in range(0,10):
self.cache.store('testkey%i' % i, 'testvalue')
self.cache.flush()
self.assertEqual(self.cache.count(), 0, 'Cache failed to flush')
def testmemorycache(self):
self.cache = MemoryCache(timeout=self.timeout)
self._run_tests()
def testfilecache(self):
os.mkdir('cache_test_dir')
self.cache = FileCache('cache_test_dir', self.timeout)
self._run_tests()
self.cache.flush()
os.rmdir('cache_test_dir')
if __name__ == '__main__':
unittest.main()
......@@ -37,6 +37,10 @@ class Cache(object):
"""
raise NotImplementedError
def count(self):
"""Get count of entries currently stored in cache"""
raise NotImplementedError
def cleanup(self):
"""Delete any expired entries in cache."""
raise NotImplementedError
......@@ -91,6 +95,9 @@ class MemoryCache(Cache):
# entry found and not expired, return it
return entry[1]
def count(self):
return len(self._entries)
def cleanup(self):
with self.lock:
for k,v in self._entries.items():
......@@ -184,6 +191,13 @@ class FileCache(Cache):
f_lock.close()
return value
def count(self):
c = 0
for entry in os.listdir(self.cache_dir):
if entry.endswith('.lock'): continue
c += 1
return c
def cleanup(self):
for entry in os.listdir(self.cache_dir):
if entry.endswith('.lock'): continue
......@@ -219,6 +233,10 @@ class MemCache(Cache):
return value
def count(self):
# TODO: implement
raise NotImplementedError
def cleanup(self):
# not implemented for this cache
return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment