首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >螺纹安全管端头

螺纹安全管端头
EN

Stack Overflow用户
提问于 2018-08-25 07:41:51
回答 1查看 251关注 0票数 0

(开始前请注意:虽然我的问题是一般性的,但我的代码需要使用遗留的VisualStudio2008MFC应用程序进行编译,并且必须使用MFC或win32同步,请避免使用ie或c++ 11回答)

我试图实现一个线程安全管道(一个只有一个读取器和一个作者的队列),我做了以下工作:

代码语言:javascript
复制
template<class T>
class CMultiThreadPipe { 

private:
    HANDLE hSemaphore, hTerminateEvent1, hTerminateEvent2;
    CRITICAL_SECTION listMutex; 
    CList<T*, T*> list;

public:
    CMultiThreadPipe() { 
        InitializeCriticalSection(&listMutex);
        hSemaphore = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
        hTerminateEvent1 = ::CreateEvent(NULL, TRUE, FALSE, NULL); 
        hTerminateEvent2 = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    // pdata must be allocated with new. The dequeueing thread will delete it
    void Enqueue(T* pdata) { 
        EnterCriticalSection(&listMutex);
        list.AddHead(pdata);
        LeaveCriticalSection(&listMutex);
        ReleaseSemaphore(hSemaphore, 1, NULL);
    }

    // if Dequeue returns null it means the pipe was destroyed and no further queue method calls are legal
    // Dequeue caller is responsible to delete the returned instance
    T* Dequeue()
    {
        HANDLE handles[] = { hTerminateEvent1, hSemaphore };
        DWORD waitRes = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
        if (waitRes==WAIT_OBJECT_0) {
            SetEvent(hTerminateEvent2);
            return NULL; // terminated
        }
        EnterCriticalSection(&listMutex);
        T* elem = list.RemoveTail(); 
        LeaveCriticalSection(&listMutex);
        return elem; // handler must delete item
    }

    void Destroy() {
        SetEvent(hTerminateEvent1);
        WaitForSingleObject(hTerminateEvent2, INFINITE);
        EnterCriticalSection(&listMutex);
        POSITION pos = list.GetHeadPosition(); 
        for (int i = 0; i < list.GetCount(); i++) delete list.GetNext(pos); 
        LeaveCriticalSection(&listMutex);
        DeleteCriticalSection(&listMutex);
        CloseHandle(hSemaphore);
    }

    ~CMultiThreadPipe() { 
        Destroy();
    }
};

代码的使用方式如下:

代码语言:javascript
复制
class QueueData {
    public:
        QueueData(int i) : m_data(i) {};
        int m_data;
};

UINT DequeueThreadProc(LPVOID dummy);

CMultiThreadedPipe<QueueData>* pPipe = NULL;

void main() {
    pPipe = new CMultiThreadedPipe<QueueData>();
    start new thread running DequeueThreadProc

    int counter=0;
    for (int counter=0; counter<10; counter++)
    {
        pPipe->Enqueue(new QueueData(counter));
        Sleep(300);
    }
    delete pPipe;
}

UINT DequeueThreadProc(LPVOID ignore)
{
    QueueData* queueData;
    while ((queueData = pPipe->Dequeue()) != NULL) {
        delete queueData;
        Sleep(1000);
    };
    return 0;
}

我遇到的问题是,在上面的实现中,当管道被销毁(总是被enqueing线程破坏)时,它正在等待测试线程知道它在删除队列之前终止。它必须这样做,以防止这样的情况下,测试线程试图排排后,管道被破坏。

如果去队列线程没有一直在调用排队列,那么第一个线程将挂在析构函数中,如果在调用之间等待很长时间来排队列,那么第一个线程的析构函数就会相应地停留在那里。

我读过很多关于它的帖子,没有提到安全的毁灭。任何帮助都很感激!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-08-25 12:41:45

对于从多个线程访问的安全销毁对象,需要使用参考计数。在将对象指针传递到新线程之前--增加对对象的引用。当线程不再使用对象时,或者如果创建线程失败,则减少引用计数。当对象上一次引用发布时,您可以安全地调用对象的析构函数。你不需要在这里等任何线索。

另外,为了实现这种队列中的窗口,在用户空间(在称为KQUEUE的内核空间中)存在特殊的对象名为KQUEUE。有了这个对象--实现将更高效、更简单--您不需要管理自列表(代码中的CList),同步访问它--所有这些都将在内核空间为您完成(PostQueuedCompletionStatus,->,KeInsertQueueGetQueuedCompletionStatus )。您只需要创建iocp (kqueue)对象。

代码语言:javascript
复制
class CMultiThreadPipe {

public:

    class __declspec(novtable) QueueData {
    public:

        virtual void ProcessItem() = 0;

        virtual ~QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }

        QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }
    };

private:
    HANDLE _hIOCP;
    LONG _dwRef;
    ULONG _nThreads;

    void DequeueThreadProc()
    {
        ULONG NumberOfBytesTransferred;
        QueueData* pData;
        OVERLAPPED* pOverlapped;

        while (GetQueuedCompletionStatus(_hIOCP, 
            &NumberOfBytesTransferred, 
            (ULONG_PTR*)&pData, 
            &pOverlapped, INFINITE))
        {
            if (pData)
            {
                pData->ProcessItem();
            }
            else
            {
                break;
            }
        }

        Release();
    }

    __declspec(noreturn) static DWORD CALLBACK _DequeueThreadProc(PVOID pThis)
    {
        reinterpret_cast<CMultiThreadPipe*>(pThis)->DequeueThreadProc();
        FreeLibraryAndExitThread((HMODULE)&__ImageBase, 0);
    }

    ~CMultiThreadPipe()
    {
        if (_hIOCP)
        {
            CloseHandle(_hIOCP);
        }
    }

public:

    CMultiThreadPipe() : _dwRef(1), _hIOCP(0)
    {
    }

    void AddRef()
    {
        InterlockedIncrement(&_dwRef);
    }

    void Release()
    {
        if (!InterlockedDecrement(&_dwRef))
        {
            delete this;
        }
    }

    ULONG Create(DWORD NumberOfDequeueThreads)
    {
        if (_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, NumberOfDequeueThreads))
        {
            ULONG n = 0;
            do 
            {
                HMODULE hModule;
                if (GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS, (PCWSTR)_DequeueThreadProc, &hModule))
                {
                    AddRef();

                    if (HANDLE hThread = CreateThread(0, 0, _DequeueThreadProc, this, 0, 0))
                    {
                        CloseHandle(hThread);
                        n++;
                    }
                    else
                    {
                        Release();
                        FreeLibrary(hModule);
                    }
                }

            } while (--NumberOfDequeueThreads);

            _nThreads = n;

            return n ? NOERROR : ERROR_GEN_FAILURE;
        }

        return GetLastError();
    }

    ULONG Enqueue(QueueData* pData)
    {
        return PostQueuedCompletionStatus(_hIOCP, 0, (ULONG_PTR)pData, 0) ? NOERROR : GetLastError();
    }

    void Destroy()
    {
        if (ULONG n = _nThreads)
        {
            do 
            {
                PostQueuedCompletionStatus(_hIOCP, 0, 0, 0);
            } while (--n);
        }
    }
};

和用法:

代码语言:javascript
复制
class QueueData : public CMultiThreadPipe::QueueData
{
    int m_data; 

    virtual void ProcessItem()
    {
        DbgPrint("%x: %s<%p>(%u)\n", GetCurrentThreadId(), __FUNCTION__, this, m_data);
        delete this;
    }
public:
    QueueData(int i) : m_data(i) {};
};

void testQueue()
{
    if (CMultiThreadPipe* pPipe = new CMultiThreadPipe)
    {
        if (pPipe->Create(8) == NOERROR)
        {
            int n = 64;

            do 
            {
                if (QueueData* pData = new QueueData(n))
                {
                    if (pPipe->Enqueue(pData))
                    {
                        delete pData;
                    }
                }
            } while (--n);

            pPipe->Destroy();
        }
        pPipe->Release();
    }
}

注意,使用这样的CMultiThreadPipe实现--当工作线程退出时,您不需要等待。即使您在dll中的代码和卸载dll -您不需要等待。每个线程对对象和模块都有自己的引用。并在出口处释放

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52015122

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档