因此,这似乎是一个非常常见的用例,也许我想得太多了,但我在保留来自多个线程的集中化指标时遇到了问题。假设我有多个工作线程,所有线程都在处理记录,我每处理1000条记录,就想输出一些指标。现在,我可以让每个线程记录单独的指标,但为了获得吞吐量数字,我必须手动将它们相加(当然,时间界限不是很精确)。下面是一个简单的例子:
public class Worker implements Runnable {
private static int count = 0;
private static long processingTime = 0;
public void run() {
while (true) {
...get record
count++;
long start = System.currentTimeMillis();
...do work
long end = System.currentTimeMillis();
processingTime += (end-start);
if (count % 1000 == 0) {
... log some metrics
processingTime = 0;
count = 0;
}
}
}
}希望这能说得通。我还知道两个静态变量可能是AtomicInteger和AtomicLong。。。但也许不是。对人们有什么样的想法感兴趣。我曾考虑过使用原子变量和ReeantrantReadWriteLock -但我真的不希望指标停止处理流程(即指标对处理的影响应该非常非常小)。谢谢。
发布于 2011-05-18 23:22:51
将实际处理任务转移给另一个线程可能是个好主意。其思想是封装数据并将其快速传递给处理线程,以便最大限度地减少对正在执行有意义工作的线程的影响。
有一个小的切换争用,但该成本通常比任何其他类型的同步要小得多,因此在许多情况下它应该是一个很好的候选。我认为M.Jessup的解决方案与我的非常接近,但希望下面的代码能清楚地说明这一点。
public class Worker implements Runnable {
private static final Metrics metrics = new Metrics();
public void run() {
while (true) {
...get record
long start = System.currentTimeMillis();
...do work
long end = System.currentTimeMillis();
// process the metric asynchronously
metrics.addMetric(end - start);
}
}
private static final class Metrics {
// a single "background" thread that actually handles
// processing
private final ExecutorService metricThread =
Executors.newSingleThreadExecutor();
// data (no synchronization needed)
private int count = 0;
private long processingTime = 0;
public void addMetric(final long time) {
metricThread.execute(new Runnable() {
public void run() {
count++;
processingTime += time;
if (count % 1000 == 0) {
... log some metrics
processingTime = 0;
count = 0;
}
}
});
}
}
}发布于 2011-05-18 22:20:34
我建议,如果您不希望日志记录干扰处理,您应该有一个单独的日志工作线程,并让您的处理线程简单地提供某种类型的值对象,以供传递。在本例中,我选择了一个LinkedBlockingQueue,因为它能够使用offer()阻塞很短的时间,您可以将阻塞推迟到另一个从队列中拉取值的线程。您可能需要增加MetricProcessor中的逻辑,以根据您的需求对数据等进行排序,但即使这是一个长时间运行的操作,它也不会阻止VM线程调度器同时重新启动实际的处理线程。
public class Worker implements Runnable {
public void run() {
while (true) {
... do some stuff
if (count % 1000 == 0) {
... log some metrics
if(MetricProcessor.getInstance().addMetrics(
new Metrics(processingTime, count, ...)) {
processingTime = 0;
count = 0;
} else {
//the call would have blocked for a more significant
//amount of time, here the results
//could be abandoned or just held and attempted again
//as a larger data set later
}
}
}
}
}
public class WorkerMetrics {
...some interesting data
public WorkerMetrics(... data){
...
}
...getter setters etc
}
public class MetricProcessor implements Runnable {
LinkedBlockingQueue metrics = new LinkedBlockingQueue();
public boolean addMetrics(WorkerMetrics m) {
return metrics.offer(m); //This may block, but not for a significant amount of time.
}
public void run() {
while(true) {
WorkMetrics m = metrics.take(); //wait here for something to come in
//the above call does all the significant blocking without
//interrupting the real processing
...do some actual logging, aggregation, etc of the metrics
}
}
}发布于 2011-05-18 21:56:03
如果您依赖于count的状态和processingTime的状态来同步,那么您将必须使用锁。例如,当++count % 1000 == 0为true时,您希望评估当时processingTime的指标。
在这种情况下,使用ReentrantLock是有意义的。我不会使用RRWL,因为没有真正发生纯读取的实例。它始终是一个读/写集合。但是你需要锁定所有的
count++
processingTime += (end-start);
if (count % 1000 == 0) {
... log some metrics
processingTime = 0;
count = 0;
}无论count++是否会出现在该位置,您都需要锁定该位置。最后,如果您使用的是锁,则不需要AtomicLong和AtomicInteger。它只会增加开销,并且不是更安全的线程。
https://stackoverflow.com/questions/6045736
复制相似问题