线程池源代码

//threadpool.h

/************************************************************************/
/* 线程池源码                                                            */
/*作者:双刃剑                                                            */
/*时间:2010-10-28                                                        */
/************************************************************************/

#pragma once
#include <Windows.h>
#include "uxilog.h"
//用户使用定义的工作线程回调
typedef unsigned int (__stdcall *PThrdFunc)(void*);

enum ThreadRunStatus
{
    SUCCESS_RETURN,//正常返回(初始化的)
    PARMNULL_RETURN,//无参数返回
    EXECEPTION_RETURN//异常返回,表示线程还在执行
};

typedef struct _THREADINFO
{
    DWORD                        dwThrdID;        //线程ID
    HANDLE                        hThrd;            //线程句柄
    PThrdFunc                    pCallBack;        //线程回调
    void*                        pParam;            //线程参数
    BOOL                        bRunStatus;        //执行状态(ThreadRunStatus)
    BOOL                        bActive;        //线程是否存活(0,1)
    struct _THREADINFO*            pNext;            //链表队列下一个
}THREADINFO,*PTHREADINFO;

class ThreadPool
{
public:
    explicit ThreadPool(long nMaxCount = 1 );
    virtual ~ThreadPool();
    //提供一个启动和停止函数,便于控制
    BOOL                 start();
    BOOL                 stop();

    //======================================================================
    //创建新线程
    //参数:pCallBack回调工作函数,pParam工作函数的参数,bRun是否是立即执行
    //返回:void*线程地址
    //======================================================================
    void*                 CreateThread( PThrdFunc pCallBack,void* pParam );

    //======================================================================
    //终止线程
    //参数:pThrd线程指针
    //返回:
    //======================================================================
    void                EndThread(void* pThrd);

    __inline long        GetMaxThreadCount()
    {
        return nMaxThrdCount;
    }
    __inline long        GetCurThreadCount()
    {
        return nCurThrdCount;
    }

protected:
private:
    BOOL                bRun;
    long                nMaxThrdCount;//最大线程数
    long                nCurThrdCount;//当前线程数
    PTHREADINFO         pListHead ;//队列的头
    PTHREADINFO         pListTail ;//队列的尾
    long                nListSize;//可用线程链表节点数
    CRITICAL_SECTION    csListHead;//队列头临界区
    CRITICAL_SECTION    csListTail;//队列尾临界区
    static    ThreadPool* pThis;
    PTHREADINFO*        ThrdArray;

private:
    void                clearlistqueue();
    static unsigned int __stdcall    work(void* p);
    //归还线程
    void                revertthread(void* p);

    __inline PTHREADINFO         getfront()
    {
        return pListHead;
    }

    __inline PTHREADINFO         getback()
    {
        return pListTail;
    }

    __inline PTHREADINFO         pop()
    {
        PTHREADINFO pRet = NULL;
        lockH();
        pRet = pListHead;
        if (pListHead != pListTail)
            pListHead = pListHead->pNext;
        unlockH();
        return pRet;
       
    }

    __inline void                push(PTHREADINFO p)
    {
        lockT();
        if (p)
        {
            p->pNext = NULL;
            pListTail->pNext = p;
            pListTail = p;
        }
        unlockT();
    }
   
   

    __inline    void    lockH()
    {
        EnterCriticalSection(&csListHead);
    }
    __inline    void    unlockH()
    {
        LeaveCriticalSection(&csListHead);
    }
    __inline    void    lockT()
    {
        EnterCriticalSection(&csListTail);
    }
    __inline    void    unlockT()
    {
        LeaveCriticalSection(&csListTail);
    }

};

//threadpool.cpp

/************************************************************************/
/* 线程池源码                                                            */
/*作者:双刃剑                                                            */
/*时间:2010-10-28                                                        */
/************************************************************************/

#pragma once
#include "stdafx.h"
#include <process.h>
#include "threadpool.h"

ThreadPool* ThreadPool::pThis = NULL;

ThreadPool:: ThreadPool(long nMaxCount)
{   
    if (nMaxCount < 0)
        exit(0);
    bRun            = FALSE;
    nMaxThrdCount    = nMaxCount;//最大线程数
    nCurThrdCount    = 0;            //当前线程数
    pListHead        = NULL;            //队列的头
    pListTail        = NULL ;        //队列的尾
    nListSize        = nMaxCount;//可用线程链表节点数
    InitializeCriticalSection(&csListHead);
    InitializeCriticalSection(&csListTail);
    pThis = this;
    ThrdArray = new PTHREADINFO[nMaxCount];   
    memset(ThrdArray,0,sizeof(PTHREADINFO)*nMaxThrdCount);
}

ThreadPool:: ~ThreadPool()
{
    if (bRun)
    {
        stop();
    }
    if (ThrdArray)
    {
        delete[] ThrdArray;
        ThrdArray = NULL;
    }
    bRun            = FALSE;
    nMaxThrdCount    = 0;            //最大线程数
    nCurThrdCount    = 0;            //当前线程数
    pListHead        = NULL;            //队列的头
    pListTail        = NULL ;        //队列的尾
    nListSize        = 0;            //可用线程链表节点数
    DeleteCriticalSection(&csListHead);
    DeleteCriticalSection(&csListTail);
}

BOOL                 ThreadPool::start()
{
    if (!bRun)
    {
        PTHREADINFO pThrdInfo = NULL;
        for (int i = 0;i<nMaxThrdCount;++i)
        {
            pThrdInfo = new THREADINFO;
           
            if (NULL == pThrdInfo)
            {
                //清空链队
                clearlistqueue();
                return FALSE;
            }
            memset(pThrdInfo,0,sizeof(THREADINFO));
            pThrdInfo->hThrd = (HANDLE) _beginthreadex(NULL,0,work,pThrdInfo,CREATE_SUSPENDED
                ,(unsigned int *)&pThrdInfo->dwThrdID);
            ThrdArray[i] = pThrdInfo;
            //构造链队
            if (NULL == pListHead   && NULL == pListTail )
            {
                pThrdInfo->pNext = NULL;
                pListHead = pThrdInfo;
                pListTail = pListHead;
                continue;
            }
            else
            {
                pThrdInfo->pNext = pListHead;
                pListHead = pThrdInfo;
                continue;
            }
           
        }
        bRun = TRUE;
        return TRUE;
    }
    return FALSE;
}

BOOL                 ThreadPool::stop()
{
   
    if (bRun)
    {
        int nTerm = 0;
        bRun = FALSE;
        PTHREADINFO pTem ;
        for(int i = 0 ;i<nMaxThrdCount ; ++i)
        {
            pTem = ThrdArray[i];
            if (pTem)
            {
                TerminateThread(pTem->hThrd,0);
            }
        }

        //清空链队
        clearlistqueue();

        return TRUE;

    }

    return FALSE;
}

void                ThreadPool::clearlistqueue()
{
    //清空链队
    pListHead = pListTail = NULL;
    PTHREADINFO pTem;
    for(int i = 0 ;i<nMaxThrdCount ; ++i)
    {
        pTem = ThrdArray[i];
        if (pTem)
            delete pTem;
    }

}

unsigned int __stdcall    ThreadPool::work(void* p)
{
    PTHREADINFO pThrdInfo = (PTHREADINFO)p;
    if (!pThis->bRun)
    {
        pThis->revertthread(pThrdInfo);
    }
    else
    {
        while(pThis->bRun)
        {
            if (pThrdInfo->pCallBack && pThrdInfo->bActive)
            {
                pThrdInfo->pCallBack(pThrdInfo->pParam);
                //过程执行完毕,归还线程
                pThis->revertthread(pThrdInfo);
               
            }
            Sleep(1);
            continue;
        }
    }
   
    return 0;
}

void*                 ThreadPool::CreateThread(PThrdFunc pCallBack,void* pParam)
{
    if (!pCallBack)
        return NULL;
    if (0 == nListSize)
        return NULL;

    PTHREADINFO p = pop();
    if (p)
    {
        p->pCallBack    = pCallBack;
        p->pParam        = pParam;
        p->bActive        = 1;

        p->bRunStatu*ECEPTION_RETURN;//线程正在执行
        ResumeThread(p->hThrd);

        InterlockedIncrement(&nCurThrdCount);
        InterlockedDecrement(&nListSize);
    }
    return p;
}

void                ThreadPool::revertthread(void* pThrd)
{
    PTHREADINFO p = (PTHREADINFO)pThrd;
    if (p)
    {
       
        p->pCallBack = NULL;
        p->pParam    = NULL;
        p->bActive     = 0;
        p->bRunStatus = PARMNULL_RETURN;//线程在挂起执行
        InterlockedIncrement(&nListSize);
        InterlockedDecrement(&nCurThrdCount);
        push(p);
        SuspendThread(p->hThrd);
    }
}
   

void                ThreadPool::EndThread(void* pThrd)
{
    PTHREADINFO p = (PTHREADINFO)pThrd;
    if (p)
    {
        revertthread(p);
    }
}

 

//uxilog.h

#pragma once

//需要打印就定义不需要就不定义
#define DEBUG_LOG

void LogEvenF (char* format,…);
#define LOGEVEN  LogEvenF

 

//uxilog.cpp

 

#pragma once
#include "stdafx.h"
#include <Windows.h>
#include "uxilog.h"
#include <stdarg.h>

void LogEvenF (char* format,…)
{
#ifdef DEBUG_LOG
    char tem[1024]={0};
    va_list vl;
    va_start(vl,format);
    vsprintf(tem,format,vl);
    va_end(vl);
    OutputDebugStringA(tem);
#endif

}

 

 

//demo

// TreadPool_test.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"
#include "threadpool.h"

unsigned int __stdcall func(void* p)
{
    int i = int(p);
    while(1)
    {
        LOGEVEN("%d/n",i);
        Sleep(10);
    }
   
    return 0;
}

int _tmain(int argc, _TCHAR* argv[])
{
    ThreadPool tp(1000);
    tp.start();
    for (int i = 0 ;i<100;i++)
    {
        tp.CreateThread(func,(void*)i);
    }

    Sleep(3000);
    return 1;
}

 

 

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注