首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ConcurrentHashMap的<String,ConcurrentLinkedQueue>对不一致注册加法

ConcurrentHashMap的<String,ConcurrentLinkedQueue>对不一致注册加法
EN

Stack Overflow用户
提问于 2022-08-20 08:57:01
回答 1查看 114关注 0票数 1

背景

我有一个程序,通过使用Java代理和ASM,它添加了一个包含ConcurrentLinkedQueues of Object数组的静态Object(在下面的代码段中称为ThreadMarkers )。映射的键是线程in,它的值是ThreadMarker的队列。每个队列被添加到给定的.class文件中遇到的每一个新的.class字节码指令,在给定的.class文件中,遇到该行的线程使用它的id属性来标识要添加到映射中的队列。

Code (注意,QueueMapMediator的映射队列不必是ConcurrentLinkedQueues --它们每个只由一个线程访问)。

代码语言:javascript
复制
public class QueueMapMediator {

    private static final ConcurrentHashMap<String, ConcurrentLinkedQueue<ThreadMarker>> queueMap;

    private static final int CAPACITY;

    private static final ConcurrentStack<ConcurrentLinkedQueue<ThreadMarker>> queueStack;

    static {
        CAPACITY = 16; // arbitrary limit - may be assigned any power of 2
        queueMap = new ConcurrentHashMap<>(CAPACITY << 4, 0.75f, CAPACITY << 4);
        queueStack = new ConcurrentStack<>();
        for (int i = 0; i < CAPACITY; i++) {
            queueStack.push(new ConcurrentLinkedQueue<>());
        }
    }
    /**
     * Instantiation disallowed
     */
    private QueueMapMediator() {
    }

    private static ConcurrentLinkedQueue<ThreadMarker> newQueue(String id) {
        ConcurrentLinkedQueue<ThreadMarker> q = queueStack.pop();
        queueMap.put(id, q);
        return q;
    }

    /**
     * Used by java agent to get map entry pertaining to
     * currently executing thread.
     *
     * @param id    Thread id stored as a key in queueMap
     * @return      value associated with key of value id
     */
    public static ConcurrentLinkedQueue<ThreadMarker> getByThreadId(String id) {
        ConcurrentLinkedQueue<ThreadMarker> q = queueMap.get(id);
        return q != null ? q : newQueue(id);
    }

    public static String[][] output() {
        return queueMap.entrySet().stream()
                .map(m -> m.getValue().stream().map(e -> new String[]{
                                String.valueOf(e.getElements()[0]),
                                String.valueOf(e.getElements()[1]),
                                String.valueOf(e.getElements()[2]),
                                m.getKey()})
                        .toArray(String[][]::new))
                .flatMap(Arrays::stream)
                .sorted(QueueMapMediator::compareThreadNanos)
                .toArray(String[][]::new);
    }

    /**
     * @param curr      A String array containing ThreadMarker information
     * @param other     A String array containing ThreadMarker information
     * @return          int result of comparison
     */
    private static int compareThreadNanos(String[] curr, String[] other) {
        long time1 = Long.parseLong(curr[0]);
        long time2 = Long.parseLong(other[0]);
        return Long.compare(time1, time2);
    }

    public static void printOutput() {
        for (String[] arr : output()) {
            System.out.println(Arrays.toString(arr));
        }
    }

}
代码语言:javascript
复制
public class Agent {

    static final long START_TIME = System.nanoTime();

    public static void premain(String args, Instrumentation instrumentation) {
        instrumentation.addTransformer((new ClassFileTransformer() {
            @Override
            public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
                if (loader == null) { // bootstrap loader caught by this check - will not load user's code
                    return null;
                }

                if (className.startsWith("application/")) {
                    return classfileBuffer;
                }

                ClassNode cn = new ClassNode(ASM9);
                ClassReader cr1 = new ClassReader(classfileBuffer);
                cr1.accept(cn, 0);

                for (MethodNode mn : cn.methods) {
                    InsnList insns = mn.instructions;

                    if (insns.size() == 0) {
                        continue;
                    }

                    int lineNum = -1;
                    int l1 = -1;
                    int l2 = -1;
                    AbstractInsnNode node;
                    int numAdded;
                    for (int i = 0; i < insns.size(); i++) {
                        node = insns.get(i);
                        if (node instanceof LineNumberNode) {
                            lineNum = ((LineNumberNode) node).line;
                        } else if (node instanceof LabelNode) {
                            if (l1 == -1) {
                                l1 = i;
                            } else {
                                l2 = i;
                            }
                        } else if (node instanceof FrameNode) {
                            l1 = i;
                        }

                        if (lineNum > -1 && l1 < l2) {
                            InsnList addedInsns = new InsnList();
                            addedInsns.add(new MethodInsnNode(INVOKESTATIC, "java/lang/Thread",
                                    "currentThread", "()Ljava/lang/Thread;", false));
                            addedInsns.add(new MethodInsnNode(INVOKEVIRTUAL, "java/lang/Thread",
                                    "getId", "()J", false));
                            addedInsns.add(new MethodInsnNode(INVOKESTATIC, "java/lang/String",
                                    "valueOf", "(J)Ljava/lang/String;", false));
                            addedInsns.add(new MethodInsnNode(INVOKESTATIC, "application/QueueMapMediator",
                                    "getByThreadId",
                                    "(Ljava/lang/String;)Ljava/util/concurrent/ConcurrentLinkedQueue;",
                                    false));
                            addedInsns.add(new TypeInsnNode(NEW, "application/ThreadMarker"));
                            addedInsns.add(new InsnNode(DUP));
                            addedInsns.add(new MethodInsnNode(INVOKESTATIC, "java/lang/System",
                                    "nanoTime", "()J", false));
                            addedInsns.add(new LdcInsnNode(START_TIME));
                            addedInsns.add(new InsnNode(LSUB));
                            addedInsns.add(new IntInsnNode(BIPUSH, lineNum));
                            addedInsns.add(new LdcInsnNode(className));
                            addedInsns.add(new MethodInsnNode(INVOKESPECIAL, "application/ThreadMarker",
                                    "<init>", "(JILjava/lang/String;)V"));
                            addedInsns.add(new MethodInsnNode(INVOKEVIRTUAL,
                                    "java/util/concurrent/ConcurrentLinkedQueue",
                                    "add", "(Ljava/lang/Object;)Z", false));
                            addedInsns.add(new InsnNode(POP));

                            numAdded = addedInsns.size();
                            insns.insert(insns.get(l1), addedInsns);
                            lineNum = -1;

                            i += numAdded - 1; // -1 to counteract i incrementing with next iteration
                            l1 = -1;
                            l2 = -1;
                        }

                    }

                }

                ClassWriter cw1 = new ClassWriter(ClassWriter.COMPUTE_MAXS);
                cn.accept(cw1);
                return cw1.toByteArray();

            }

        }));

    }

}
代码语言:javascript
复制
/**
 * @param <E>   type contained in Stack
 * @author      Brian Goetz and Tim Peierls
 */
public class ConcurrentStack <E> {
    AtomicReference<Node<E>> top = new AtomicReference<>();
    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }
    private static class Node <E> {
        public final E item;
        public Node<E> next;
        public Node(E item) {
            this.item = item;
        }
    }
}

问题

我通常得到预期的输出(即每个线程应该有多少个条目,考虑到它们分别遇到的源行的数量),但是偶尔--通常是在一段时间之后第一次运行--一些条目丢失了。

示例

如果我有以下简单的多线程程序:

代码语言:javascript
复制
1  package test.usercode;
2 
3  import application.QueueMapMediator;
4
5  public class BasicMultithreadedPrinting {
6
7      public static void main(String[] args) throws InterruptedException {
8          for (int i = 0; i < 3; i++) {
9              Thread th = new Thread(() -> {
10                 System.out.println("Hello");
11                 System.out.println("Hello again");
12                 System.out.println("Hello once more");
13             });
14             th.start();
15         }
16         QueueMapMediator.printOutput(); // included to view result of instrumentation
17     }
18 }

如果我使用上面的代理来测试它,我可能会得到以下意想不到的输出:

代码语言:javascript
复制
Hello
Hello
Hello
Hello again
Hello again
Hello once more
Hello again
Hello once more
Hello once more
[44959100, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45050000, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45701600, 14, test/usercode/BasicMultithreadedPrinting, 1]
[45762500, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45767300, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45776400, 14, test/usercode/BasicMultithreadedPrinting, 1]
[45813400, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45817400, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45825400, 14, test/usercode/BasicMultithreadedPrinting, 1]
[45862800, 8, test/usercode/BasicMultithreadedPrinting, 1]
[45866700, 9, test/usercode/BasicMultithreadedPrinting, 1]
[45868500, 16, test/usercode/BasicMultithreadedPrinting, 1]
[45892200, 10, test/usercode/BasicMultithreadedPrinting, 23]
[45938500, 11, test/usercode/BasicMultithreadedPrinting, 23]
[46329900, 11, test/usercode/BasicMultithreadedPrinting, 25]
[46500300, 12, test/usercode/BasicMultithreadedPrinting, 23]
[46656400, 12, test/usercode/BasicMultithreadedPrinting, 25]

...where生成的线程之一根本不注册,另一个线程使用id 25注册,只注册了它遇到的3行中的2行。此后,我尝试添加战略性的print行,以确定故障发生的位置,但是,我还没有能够重现这个问题。

预先感谢任何人,谁能通过这个相当长的问题。我已经包含了相当数量的代码,希望它能产生更有意义的建议。我已经将javaagents标记包含在可能的情况下,代理与所描述的问题有关,我没有看到它。

EN

回答 1

Stack Overflow用户

发布于 2022-08-20 14:46:12

如果不同线程调用getByThreadId,重叠get-put的使用将失败,并丢失其中一个值。最好用原子操作来替换,以确保每个条目都有一个队列:

代码语言:javascript
复制
queueMap.computeIfAbsent(id, k -> new ConcurrentLinkedQueue<>() /* or queueStack.pop() */);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73425360

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档