我一直在尝试使用 lru_cache 为我的项目实现一个 LRU 缓存。作为参考,我使用了这份参考资料。下面是该参考资料中使用的代码。
def timed_lru_cache(maxsize, seconds):
    """Build a decorator that memoizes a function with ``lru_cache`` and a TTL.

    The whole cache shares a single expiration time: once ``seconds`` have
    elapsed since the cache was created or last reset, the next call clears
    every cached entry (not only the old ones) and starts a new lifetime.

    Args:
        maxsize: Forwarded to ``functools.lru_cache`` as ``maxsize``.
        seconds: Lifetime of the cache, in seconds.

    Returns:
        A decorator. The decorated function additionally exposes
        ``cache_info()`` and ``cache_clear()`` of the underlying ``lru_cache``.
    """
    # Local import so the block does not rely on a file-level import of
    # ``timezone``; ``datetime.utcnow()`` is deprecated since Python 3.12.
    from datetime import timezone

    def wrapper_cache(func):
        func = lru_cache(maxsize=maxsize)(func)
        func.lifetime = timedelta(seconds=seconds)
        func.expiration = datetime.now(timezone.utc) + func.lifetime

        @wraps(func)
        def wrapped_func(*args, **kwargs):
            # Read the clock once so the check and the new expiration agree.
            now = datetime.now(timezone.utc)
            if now >= func.expiration:
                func.cache_clear()
                func.expiration = now + func.lifetime
            return func(*args, **kwargs)

        # ``wraps`` copies ``__dict__`` only; the lru_cache helpers are
        # methods of the wrapper object, so re-export them explicitly.
        wrapped_func.cache_info = func.cache_info
        wrapped_func.cache_clear = func.cache_clear
        return wrapped_func

    return wrapper_cache
@timed_lru_cache(maxsize=config.cache_size, seconds=config.ttl)
def load_into_cache(id):
return object

在包装函数(wrapped_func)中,func.cache_clear() 会把整个缓存连同其中的所有条目一起清空。我需要的是:只删除那些自插入起已超过其过期时间的元素。有什么变通办法吗?
发布于 2021-06-02 14:53:37
我认为改造现有的 lru_cache 并不容易,而且我觉得链接中的方法也不是很清晰。
因此,我从零开始实现了一个带过期时间的 LRU 缓存。用法请参阅类顶部的 docstring。
它根据传入的 args 和 kwargs 生成键(key),并维护两种结构:
- 一个 key => (expiry, result) 的映射
- 一个“最近使用”的键列表

每次尝试获取一个条目时,都会在“最近使用”列表中查找该键。如果不在其中,就把它添加到列表和映射中。如果在其中,则检查它是否已过期:如果已过期,就重新计算结果并更新;否则直接返回映射中保存的结果。
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Dict, List, Optional, Tuple
class TimedLRUCache:
    """Cache call results with a per-entry expiry time and LRU eviction.

    Entries are dropped first when they expire and then, if ``max_size`` is
    set, when more than ``max_size`` entries are stored (least recently used
    entries go first).

    The cache can be used through :meth:`get` or as a decorator. Only the
    call arguments form the cache key, not the function itself, so do not
    share one cache between different functions.

    >>> cache = TimedLRUCache(5)
    >>> def foo(i):
    ...     return i + 1
    >>> cache.get(foo, (1,))  # runs foo
    2
    >>> cache.get(foo, (1,))  # returns the previously calculated result
    2

    As a decorator:

    >>> @TimedLRUCache(5)
    ... def bar(i):
    ...     return i + 1
    >>> bar(1)  # runs bar
    2
    >>> bar(1)  # returns the previously calculated result
    2

    Keeping a reference to the cache allows fine-grained control:

    >>> five_second_cache = TimedLRUCache(5)
    >>> @five_second_cache
    ... def baz(i):
    ...     return i + 1
    >>> five_second_cache.clear_cache()  # drop every entry
    >>> five_second_cache.prune()  # drop only expired / surplus entries
    """

    # key -> (expiry or None when entries never expire, cached result)
    _items: Dict[Any, Tuple[Optional[datetime], Any]]
    # keys ordered from least recently used (index 0) to most recently used
    _recently_added: List[Any]
    delta: Optional[timedelta]
    max_size: Optional[int]

    def __init__(self, seconds: Optional[int] = None, max_size: Optional[int] = None):
        """Create an empty cache.

        Args:
            seconds: Lifetime of each entry in seconds; ``None`` disables
                time-based expiry.
            max_size: Maximum number of entries kept; ``None`` (or ``0``)
                disables size-based eviction.
        """
        self.delta = timedelta(seconds=seconds) if seconds is not None else None
        self.max_size = max_size
        self._items = {}
        self._recently_added = []

    def __call__(self, func):
        """Decorate ``func`` so that its calls go through this cache."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.get(func, args, kwargs)
        return wrapper

    @staticmethod
    def _get_key(args, kwargs):
        """Build the lookup key from positional and keyword arguments.

        The tuple itself is the key (rather than its hash) so that two
        different argument sets with colliding hashes never share an entry.

        Raises:
            TypeError: If any argument is unhashable (raised when the key is
                used for a lookup).
        """
        return (args, tuple(sorted(kwargs.items())))

    def _mark_recent(self, key) -> None:
        """Move ``key`` to the most-recently-used end of the recency list."""
        if key in self._recently_added:
            self._recently_added.remove(key)
        self._recently_added.append(key)

    def _update(self, key, item: Any) -> None:
        """Store ``item`` under ``key`` as the most recent entry, then prune."""
        self._mark_recent(key)
        expiry = None if self.delta is None else datetime.now() + self.delta
        self._items[key] = (expiry, item)
        # the cache just changed, so this is a good moment to tidy it up
        self.prune()

    def prune(self):
        """Remove everything that no longer belongs in the cache.

        Expired entries are deleted first; then, if ``max_size`` is set, the
        least recently used entries beyond ``max_size`` are deleted.
        """
        current_time = datetime.now()
        # Collect first: deleting from a dict while iterating it raises
        # RuntimeError.
        expired = [
            key
            for key, (expiry, _) in self._items.items()
            if expiry is not None and expiry < current_time
        ]
        for key in expired:
            del self._items[key]
            if key in self._recently_added:
                self._recently_added.remove(key)
        if self.max_size:
            # Everything before the last ``max_size`` keys is surplus; drop
            # it from both structures so the mapping cannot grow unbounded.
            surplus = self._recently_added[:-self.max_size]
            del self._recently_added[:-self.max_size]
            for key in surplus:
                self._items.pop(key, None)

    def clear_cache(self):
        """Remove every entry from the cache."""
        self._items = {}
        self._recently_added = []

    def get(self, func, args=(), kwargs=None):
        """Return ``func(*args, **kwargs)``, using the cache when possible.

        Args:
            func: Callable used to compute the value on a miss.
            args: Positional arguments for ``func`` (a tuple).
            kwargs: Keyword arguments for ``func`` (a dict), or ``None``.

        Returns:
            The cached result if an unexpired entry exists for these
            arguments; otherwise the freshly computed result, which is
            stored before being returned.
        """
        if kwargs is None:
            kwargs = {}
        key = self._get_key(args, kwargs)
        if key in self._items:
            expiry, item = self._items[key]
            if expiry is None or expiry >= datetime.now():
                # a hit counts as a use, so refresh the entry's recency
                self._mark_recent(key)
                return item
        # missing or expired: compute, store, and return the new value
        new_item = func(*args, **kwargs)
        self._update(key, new_item)
        return new_item


# Source: https://stackoverflow.com/questions/67803200
复制相似问题