对于调试来说,判断特定函数是否位于调用堆栈的更高位置通常是很有用的。例如,我们通常只希望在某个函数调用我们时才运行调试代码。
一种解决方案是在更高的层次上检查所有堆栈条目,但这是在堆栈中的一个函数中,并且反复调用,这会导致过多的开销。问题是找到一种方法,允许我们以一种合理有效的方式确定某个函数是否在调用堆栈中处于较高的位置。
相似
发布于 2009-09-10 05:16:11
除非你想要的函数做一些非常特别的事情来标记“我的一个实例在堆栈上是活跃的”(IOW:如果函数是原始的、不可触摸的,并且不可能知道你的这种特殊的需求),那么在你到达你感兴趣的函数的顶部(而且函数不在那里)或堆栈框架之前,没有其他办法可以替代你一帧一帧地在堆栈上行走。正如对这个问题的一些评论所表明的,是否值得努力优化这个问题是非常值得怀疑的。但是,假设为了证明这是值得的.:
编辑:最初的答案(由OP编写)有许多缺陷,但有些已经修复,所以我正在编辑,以反映当前的情况和为什么某些方面是重要的。
首先,在装饰器中使用try/except或with是至关重要的,这样就可以正确地考虑被监视函数的退出,而不仅仅是普通的退出(正如OP自己的答案的最初版本所做的那样)。
第二,每个装饰师都应该确保它保持装饰函数的__name__和__doc__完整--这就是functools.wraps的作用(还有其他方法,但wraps使其变得最简单)。
第三,与第一点一样重要的是,set (最初由OP选择的数据结构)是错误的选择:函数可以多次出现在堆栈上(直接或间接递归)。我们显然需要一个“多套”(也称为“包”),一套类似结构的跟踪“多少次”每个项目是存在的。在Python中,多集的自然实现是将键映射为计数,而计数作为collections.defaultdict(int)最容易实现。
第四,一般的方法应该是threadsafe (至少可以轻松完成;-)。幸运的是,如果适用的话,threading.local使它变得微不足道--在这里,它肯定是这样的(每个堆栈都有自己独立的调用线程)。
第五,在一些评论中提出了一个有趣的问题(注意到提供的装饰师在某些答案中与其他装饰师玩得有多糟糕:监视装饰师似乎是最后(最外面)的一个,否则检查就中断了。这是自然但不幸的选择,使用函数对象本身作为监视dict的关键。
我建议通过一个不同的键选择来解决这个问题:让装饰器使用一个(字符串,比如说) identifier参数,该参数必须是唯一的(在每个给定的线程中),并将标识符作为密钥使用到监视dict中。当然,检查堆栈的代码必须知道标识符并使用它。
在装饰时,装饰者可以检查唯一性属性(通过使用单独的集合)。标识符可以默认为函数名(因此,只需要显式地保持在同一个命名空间中监视同名函数的灵活性);当为了监视目的而将多个受监视的函数视为“相同的”时,可以显式放弃唯一性属性(如果给定的def语句要在稍微不同的上下文中多次执行,以便使程序员为了监视目的要考虑“相同功能”的几个函数对象),则可能会出现这种情况。最后,对于已知不可能进行进一步修饰的罕见情况(因为在这些情况下,函数对象可能是保证唯一性的最简便的方法),应该可以选择性地恢复到"function作为标识符“。
因此,将这些考虑因素结合在一起,我们可以得到(包括可能已经在工具箱模块中的threadlocal_var实用程序函数;-)如下所示.:
import collections
import functools
import threading
threadlocal = threading.local()
def threadlocal_var(varname, factory, *a, **k):
v = getattr(threadlocal, varname, None)
if v is None:
v = factory(*a, **k)
setattr(threadlocal, varname, v)
return v
def monitoring(identifier=None, unique=True, use_function=False):
def inner(f):
assert (not use_function) or (identifier is None)
if identifier is None:
if use_function:
identifier = f
else:
identifier = f.__name__
if unique:
monitored = threadlocal_var('uniques', set)
if identifier in monitored:
raise ValueError('Duplicate monitoring identifier %r' % identifier)
monitored.add(identifier)
counts = threadlocal_var('counts', collections.defaultdict, int)
@functools.wraps(f)
def wrapper(*a, **k):
counts[identifier] += 1
try:
return f(*a, **k)
finally:
counts[identifier] -= 1
return wrapper
return inner我还没有测试这段代码,所以它可能包含一些错误或类似的内容,但是我提供它是因为我希望它涵盖了我前面解释的所有重要的技术要点。
一切都值得吗?可能不是,正如前面所解释的。然而,我认为“如果它是值得做的,那么它是值得做正确的”;-)。
发布于 2009-09-10 06:13:49
我不太喜欢这种方法,但下面是你正在做的事情的一个修正版本:
from collections import defaultdict
import threading
functions_on_stack = threading.local()
def record_function_on_stack(f):
def wrapped(*args, **kwargs):
if not getattr(functions_on_stack, "stacks", None):
functions_on_stack.stacks = defaultdict(int)
functions_on_stack.stacks[wrapped] += 1
try:
result = f(*args, **kwargs)
finally:
functions_on_stack.stacks[wrapped] -= 1
if functions_on_stack.stacks[wrapped] == 0:
del functions_on_stack.stacks[wrapped]
return result
wrapped.orig_func = f
return wrapped
def function_is_on_stack(f):
return f in functions_on_stack.stacks
def nested():
if function_is_on_stack(test):
print "nested"
@record_function_on_stack
def test():
nested()
test()这将处理递归、线程和异常。
我不喜欢这种做法有两个原因:
如果函数被进一步修饰,
更好的方法是直接检查堆栈(可能是作为速度的本机扩展),如果可能的话,可以找到一种方法来缓存堆栈帧生存期的结果。(不过,如果不修改Python核心,我不确定这是否可能。)
https://stackoverflow.com/questions/1403471
复制相似问题