cache.py 12.6 KB
Newer Older
1
# Tweepy
2
3
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
4

Mark Smith's avatar
Mark Smith committed
5
6
from __future__ import print_function

7
import time
8
import datetime
Hugo's avatar
Hugo committed
9
import hashlib
10
import threading
Josh Roesslein's avatar
Josh Roesslein committed
11
import os
12
import logging
13
14
15
16
17

try:
    import cPickle as pickle
except ImportError:
    import pickle
18

19
20
21
22
23
24
25
try:
    import fcntl
except ImportError:
    # Probably on a windows system
    # TODO: use win32file
    pass

26
log = logging.getLogger('tweepy.cache')
Josh Roesslein's avatar
Josh Roesslein committed
27

28
class Cache(object):
    """Abstract cache interface

    Concrete backends subclass this and override every method except
    the constructor; the base versions only raise NotImplementedError.
    """

    def __init__(self, timeout=60):
        """Set up the cache
            timeout: lifetime of a cached entry, in seconds
        """
        self.timeout = timeout

    def store(self, key, value):
        """Put a record into the cache
            key: identifier of the entry
            value: payload to keep under that key
        """
        raise NotImplementedError()

    def get(self, key, timeout=None):
        """Fetch a cached entry, provided it exists and has not expired
            key: identifier of the wanted entry
            timeout: per-call replacement for the timeout [optional]
        """
        raise NotImplementedError()

    def count(self):
        """Report how many entries the cache currently holds"""
        raise NotImplementedError()

    def cleanup(self):
        """Remove every entry that has expired."""
        raise NotImplementedError()

    def flush(self):
        """Remove every entry, expired or not"""
        raise NotImplementedError()

63
64

class MemoryCache(Cache):
    """In-memory cache

    Entries are kept in a dict mapping key -> (stored_at, value), where
    stored_at is the time.time() reading taken when the entry was stored.
    Every access that mutates the dict is serialized through self.lock.
    """

    def __init__(self, timeout=60):
        """Initialize the cache
            timeout: number of seconds to keep a cached entry
        """
        Cache.__init__(self, timeout)
        self._entries = {}
        self.lock = threading.Lock()

    def __getstate__(self):
        # pickle: the lock is left out of the state and recreated on load
        return {'entries': self._entries, 'timeout': self.timeout}

    def __setstate__(self, state):
        # unpickle
        self.lock = threading.Lock()
        self._entries = state['entries']
        self.timeout = state['timeout']

    def _is_expired(self, entry, timeout):
        """Tell whether entry, a (stored_at, value) tuple, is too old

        A timeout of zero or less means entries never expire.
        """
        return timeout > 0 and (time.time() - entry[0]) >= timeout

    def store(self, key, value):
        """Add new record to cache, replacing any previous one
            key: entry key (must be hashable)
            value: data of entry
        """
        # "with" guarantees the lock is released even if the assignment
        # raises (e.g. TypeError for an unhashable key); otherwise every
        # later call on this cache would block forever.
        with self.lock:
            self._entries[key] = (time.time(), value)

    def get(self, key, timeout=None):
        """Get cached entry if exists and not expired
            key: which entry to get
            timeout: override timeout with this value [optional]

        Returns the stored value, or None on a miss. An expired entry
        is deleted as a side effect.
        """
        with self.lock:
            # check to see if we have this key
            entry = self._entries.get(key)
            if not entry:
                # no hit, return nothing
                return None

            # use provided timeout in arguments if provided
            # otherwise use the one provided during init.
            if timeout is None:
                timeout = self.timeout

            # make sure entry is not expired
            if self._is_expired(entry, timeout):
                # entry expired, delete and return nothing
                del self._entries[key]
                return None

            # entry found and not expired, return it
            return entry[1]

    def count(self):
        """Get count of entries currently stored, expired ones included"""
        return len(self._entries)

    def cleanup(self):
        """Delete any entries that are expired under self.timeout."""
        with self.lock:
            # iterate over a snapshot so entries can be deleted in the loop
            for k, v in list(self._entries.items()):
                if self._is_expired(v, self.timeout):
                    del self._entries[k]

    def flush(self):
        """Delete all cached entries"""
        with self.lock:
            self._entries.clear()
131

132

Josh Roesslein's avatar
Josh Roesslein committed
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class FileCache(Cache):
    """File-based cache

    Each entry is pickled as a (created_time, value) tuple into a file
    inside cache_dir whose name is the MD5 hex digest of the UTF-8
    encoded key. On posix a companion "<file>.lock" file is locked with
    fcntl while the data file is read or written.

    NOTE: entries are read back with pickle.load, so the cache directory
    must not be writable by untrusted parties.
    """

    # locks used to make cache thread-safe
    cache_locks = {}

    def __init__(self, cache_dir, timeout=60):
        """Initialize the cache
            cache_dir: directory holding the cache files; created with
                       os.mkdir if it does not exist yet
            timeout: number of seconds to keep a cached entry
        """
        Cache.__init__(self, timeout)
        if not os.path.exists(cache_dir):
            os.mkdir(cache_dir)
        self.cache_dir = cache_dir

        # every FileCache pointing at the same directory shares one lock
        if cache_dir in FileCache.cache_locks:
            self.lock = FileCache.cache_locks[cache_dir]
        else:
            self.lock = threading.Lock()
            FileCache.cache_locks[cache_dir] = self.lock

        # pick the file locking implementation for this platform
        if os.name == 'posix':
            self._lock_file = self._lock_file_posix
            self._unlock_file = self._unlock_file_posix
        elif os.name == 'nt':
            self._lock_file = self._lock_file_win32
            self._unlock_file = self._unlock_file_win32
        else:
            log.warning('FileCache locking not supported on this system!')
            self._lock_file = self._lock_file_dummy
            self._unlock_file = self._unlock_file_dummy

    def _get_path(self, key):
        """Map a key to the path of its data file inside cache_dir"""
        md5 = hashlib.md5()
        md5.update(key.encode('utf-8'))
        return os.path.join(self.cache_dir, md5.hexdigest())

    def _lock_file_dummy(self, path, exclusive=True):
        return None

    def _unlock_file_dummy(self, lock):
        return

    def _lock_file_posix(self, path, exclusive=True):
        """Lock the "<path>.lock" file with fcntl.lockf

        Takes LOCK_EX when exclusive is True, LOCK_SH otherwise.
        Returns the open lock file object, or None when the lock file
        no longer exists once the lock has been obtained.
        """
        lock_path = path + '.lock'
        if exclusive is True:
            f_lock = open(lock_path, 'w')
            fcntl.lockf(f_lock, fcntl.LOCK_EX)
        else:
            f_lock = open(lock_path, 'r')
            fcntl.lockf(f_lock, fcntl.LOCK_SH)
        if os.path.exists(lock_path) is False:
            f_lock.close()
            return None
        return f_lock

    def _unlock_file_posix(self, lock):
        """Release a lock returned by _lock_file_posix"""
        # _lock_file_posix may hand back None; nothing to close then
        if lock is not None:
            lock.close()

    def _lock_file_win32(self, path, exclusive=True):
        # TODO: implement
        return None

    def _unlock_file_win32(self, lock):
        # TODO: implement
        return

    def _delete_file(self, path):
        """Remove a data file and, if present, its lock file"""
        os.remove(path)
        if os.path.exists(path + '.lock'):
            os.remove(path + '.lock')

    def store(self, key, value):
        """Add new record to cache
            key: entry key
            value: data of entry (must be picklable)
        """
        path = self._get_path(key)
        with self.lock:
            # acquire the file lock, then write the data file
            f_lock = self._lock_file(path)
            try:
                with open(path, 'wb') as datafile:
                    pickle.dump((time.time(), value), datafile)
            finally:
                # always release, even when opening or pickling fails
                self._unlock_file(f_lock)

    def get(self, key, timeout=None):
        """Get cached entry if exists and not expired
            key: which entry to get
            timeout: override timeout with this value [optional]
        """
        return self._get(self._get_path(key), timeout)

    def _get(self, path, timeout):
        """Load the entry stored at path

        Returns the cached value, or None when there is no such file or
        the entry is expired. Expired entries are deleted from disk.
        A timeout of None falls back to self.timeout; zero or less
        disables expiry.
        """
        with self.lock:
            # Checked under the lock so another thread of this process
            # cannot delete the entry between the check and the open.
            if not os.path.exists(path):
                # no record
                return None

            # acquire the file lock, then read the data file
            f_lock = self._lock_file(path, False)
            try:
                with open(path, 'rb') as datafile:
                    # read pickled object
                    created_time, value = pickle.load(datafile)

                # check if value is expired
                if timeout is None:
                    timeout = self.timeout
                if timeout > 0 and (time.time() - created_time) >= timeout:
                    # expired! delete from cache
                    value = None
                    self._delete_file(path)

                return value
            finally:
                # always release, even when reading or unpickling fails
                self._unlock_file(f_lock)

    def count(self):
        """Get count of data files in cache_dir (lock files excluded)"""
        c = 0
        for entry in os.listdir(self.cache_dir):
            if entry.endswith('.lock'):
                continue
            c += 1
        return c

    def cleanup(self):
        """Delete any expired entries in cache."""
        for entry in os.listdir(self.cache_dir):
            if entry.endswith('.lock'):
                continue
            # _get removes the entry as a side effect when it is expired
            self._get(os.path.join(self.cache_dir, entry), None)

    def flush(self):
        """Delete all cached entries"""
        for entry in os.listdir(self.cache_dir):
            if entry.endswith('.lock'):
                continue
            self._delete_file(os.path.join(self.cache_dir, entry))

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
270

gilles's avatar
gilles committed
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class MemCacheCache(Cache):
    """Cache backed by a memcache client

    Storing and fetching are delegated to the client object; count,
    cleanup and flush are not supported and raise NotImplementedError.
    """

    def __init__(self, client, timeout=60):
        """Initialize the cache
            client: The memcache client
            timeout: number of seconds to keep a cached entry
        """
        Cache.__init__(self, timeout)
        self.client = client

    def store(self, key, value):
        """Add new record to cache
            key: entry key
            value: data of entry

        self.timeout is handed to the client as the "time" argument of
        set(), so expiry is left to the client/server.
        """
        self.client.set(key, value, time=self.timeout)

    def get(self, key, timeout=None):
        """Get cached entry if exists and not expired
            key: which entry to get
            timeout: accepted for interface compatibility only; it is
                     ignored by this backend

        Returns whatever the client's get() returns for the key.
        """
        return self.client.get(key)

    def count(self):
        """Not supported by this backend; raises NotImplementedError"""
        raise NotImplementedError

    def cleanup(self):
        """Not supported by this backend; raises NotImplementedError"""
        raise NotImplementedError

    def flush(self):
        """Not supported by this backend; raises NotImplementedError"""
        raise NotImplementedError

Omer Murat Yildirim's avatar
Omer Murat Yildirim committed
309

rogelio's avatar
rogelio committed
310
class RedisCache(Cache):
    """Cache running in a redis server

    Each entry is stored under pre_identifier + key as a pickled
    (stored_at, value) tuple. The full (prefixed) names of all stored
    keys are also tracked in the redis set named by keys_container.

    NOTE: values are read back with pickle.loads, so the redis server
    must only contain data written by trusted parties.
    """

    def __init__(self, client,
                 timeout=60,
                 keys_container='tweepy:keys',
                 pre_identifier='tweepy:'):
        """Initialize the cache
            client: redis client used for every command
            timeout: number of seconds to keep a cached entry
            keys_container: name of the redis set tracking stored keys
            pre_identifier: prefix put in front of every key
        """
        Cache.__init__(self, timeout)
        self.client = client
        self.keys_container = keys_container
        self.pre_identifier = pre_identifier

    def _is_expired(self, entry, timeout):
        # Returns true if the entry has expired;
        # a timeout of zero or less means entries never expire
        return timeout > 0 and (time.time() - entry[0]) >= timeout

    def store(self, key, value):
        """Store the key, value pair in our redis server"""
        # Prepend tweepy to our key,
        # this makes it easier to identify tweepy keys in our redis server
        key = self.pre_identifier + key
        # Get a pipe (to execute several redis commands in one step)
        pipe = self.client.pipeline()
        # Store the pickled (stored_at, value) tuple under the key
        pipe.set(key, pickle.dumps((time.time(), value)))
        # Set the expiration. Skipped for timeout <= 0, which
        # _is_expired treats as "never expires": EXPIRE with a
        # non-positive TTL would make redis drop the key right away.
        if self.timeout > 0:
            pipe.expire(key, self.timeout)
        # Add the key to a set containing all the keys
        pipe.sadd(self.keys_container, key)
        # Execute the instructions in the redis server
        pipe.execute()

    def get(self, key, timeout=None):
        """Given a key, returns an element from the redis table

        Returns None when the key is missing or expired; an expired
        entry is deleted as a side effect.
            timeout: override timeout with this value [optional]
        """
        key = self.pre_identifier + key
        # Check to see if we have this key
        pickled_entry = self.client.get(key)
        if not pickled_entry:
            # No hit, return nothing
            return None

        entry = pickle.loads(pickled_entry)
        # Use provided timeout in arguments if provided
        # otherwise use the one provided during init.
        if timeout is None:
            timeout = self.timeout

        # Make sure entry is not expired
        if self._is_expired(entry, timeout):
            # entry expired, delete and return nothing
            self.delete_entry(key)
            return None
        # entry found and not expired, return it
        return entry[1]

    def count(self):
        """Number of keys tracked in the keys container

        Keys that redis has already expired stay tracked (and counted)
        until cleanup() or flush() is run.
        """
        # SCARD gives the set size without transferring its members
        return self.client.scard(self.keys_container)

    def delete_entry(self, key):
        """Delete an object from the redis table
            key: the full key as stored in redis, prefix included
        """
        pipe = self.client.pipeline()
        pipe.srem(self.keys_container, key)
        pipe.delete(key)
        pipe.execute()

    def cleanup(self):
        """Cleanup all the expired keys"""
        keys = self.client.smembers(self.keys_container)
        for key in keys:
            entry = self.client.get(key)
            if not entry:
                # The value is already gone from redis (e.g. its TTL
                # elapsed) but the key is still tracked; untrack it so
                # the keys container does not grow without bound.
                self.delete_entry(key)
                continue
            if self._is_expired(pickle.loads(entry), self.timeout):
                self.delete_entry(key)

    def flush(self):
        """Delete all entries from the cache"""
        keys = self.client.smembers(self.keys_container)
        for key in keys:
            self.delete_entry(key)
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432


class MongodbCache(Cache):
    """A simple pickle-based MongoDB cache system.

    Each entry is one document {'_id': key, 'created': datetime,
    'value': pickled blob}. Expiry is delegated to an index on
    'created' built with expireAfterSeconds.

    NOTE: values are read back with pickle.loads, so the collection
    must only contain data written by trusted parties.
    """

    def __init__(self, db, timeout=3600, collection='tweepy_cache'):
        """Should receive a "database" cursor from pymongo.
            db: object indexed with the collection name to obtain the
                collection
            timeout: number of seconds to keep a cached entry
            collection: name of the collection holding the entries
        """
        Cache.__init__(self, timeout)
        self.col = db[collection]
        self.col.create_index('created', expireAfterSeconds=timeout)

    def store(self, key, value):
        """Add new record to cache, replacing any previous one
            key: entry key, used as the document _id
            value: data of entry (must be picklable)
        """
        from bson.binary import Binary

        now = datetime.datetime.utcnow()
        blob = Binary(pickle.dumps(value))

        # Upsert rather than insert: inserting an _id that already
        # exists fails with a duplicate key error, so a key could not
        # be refreshed while its previous entry was still stored.
        self.col.update({'_id': key},
                        {'created': now, '_id': key, 'value': blob},
                        upsert=True)

    def get(self, key, timeout=None):
        """Get cached entry if it exists
            key: which entry to get
            timeout: per-call override is not supported; any truthy
                     value raises NotImplementedError

        Returns the unpickled value, or None when no document matches.
        """
        if timeout:
            raise NotImplementedError
        obj = self.col.find_one({'_id': key})
        if obj:
            return pickle.loads(obj['value'])
        return None

    def count(self):
        """Get count of documents currently in the collection"""
        return self.col.find({}).count()

    def delete_entry(self, key):
        """Remove the document stored under key"""
        return self.col.remove({'_id': key})

    def cleanup(self):
        """MongoDB will automatically clear expired keys."""

    def flush(self):
        """Delete all cached entries by dropping the collection"""
        self.col.drop()
        # dropping removed the index too, so build it again
        self.col.create_index('created', expireAfterSeconds=self.timeout)