線程池淺析及C++代碼實現
(1)什么是線程池
線程池是一種多線程處理技術。線程池先創建好若干線程,并管理這些線程。當有新的任務到來時,將任務添加到一個已創建的空閑線程中執行。線程池所創建的線程優先級都是一樣的,所以需要使用特定線程優先級的任務不宜使用線程池。
(2)線程池的優點和應用
線程池統一管理線程的方式減少了頻繁創建和銷毀線程的系統調度開銷,很大程度上提高了服務器處理并發任務的性能。
線程池適用于頻繁的任務調度,如處理HTTP請求,任務多,并且任務周期小。
(3)C++代碼實現
#include "stdafx.h"
#include "stdafx.h"
// The header names of the following includes were lost when this listing was
// extracted; they are restored from what the code visibly uses.
#include <windows.h> // HANDLE, DWORD, CRITICAL_SECTION, CreateThread, completion ports
#include <cassert>   // assert
#include <map>       // std::map (ThreadInfoMap)
// NOTE(review): the original listing had a fourth #include whose header name
// could not be recovered - restore it if the rest of the file needs it.
using namespace std;
// Interface for a unit of work that can be handed to CThreadPool::AddTask().
// Implementations put the actual job into ProcessTask().
class ITask
{
public:
    // Virtual destructor: tasks are handled through ITask pointers, so deleting
    // one through the base pointer must run the derived class's destructor.
    virtual ~ITask() {}

    // Performs the work.
    // pUser: opaque, caller-defined context pointer (presumably the pUser value
    //        given to CThreadPool::AddTask - confirm against WorkerProc).
    virtual void ProcessTask(void* pUser) = 0;
};
//線程池
class CThreadPool
{
public:
class ThreadInfo
{
public:
ThreadInfo() { m_hThread=0;m_bBusyWorking=false;}
ThreadInfo(HANDLEhandle, boolbBusy) { m_hThread=handle; m_bBusyWorking=bBusy; }
ThreadInfo(const ThreadInfo& info){ m_hThread=info.m_hThread; m_bBusyWorking=info.m_bBusyWorking;}
HANDLE m_hThread;
bool m_bBusyWorking;
};
typedef mapThreadInfoMap;
typedef ThreadInfoMap::iterator Iterator_ThreadInfoMap;
enum ThreadPoolStatus{ STATUS_BUSY, STATUS_IDLE,STATUS_NORMAL };
public:
CThreadPool()
{
InitializeCriticalSection(&m_CS);
}
// Destroys the critical section that the constructor initialized.
// NOTE(review): Stop() is not called here - presumably the owner must stop the
// pool before destroying it; confirm against callers.
virtual ~CThreadPool()
{
    ::DeleteCriticalSection(&m_CS);
}
bool Start(unsigned short nStatic, unsigned short nMax)
{
if(nMax
{
assert(0);
return false;
}
HANDLE hThread;
DWORD nThreadId;
m_nNumberOfStaticThreads=nStatic;
m_nNumberOfTotalThreads=nMax;
//lock the resource
EnterCriticalSection(&m_CS);
//create an IO port
m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE,NULL, 0, 0);
hThread = CreateThread(
NULL, // SD
0, // initial stack size
(LPTHREAD_START_ROUTINE)ManagerProc, // threadfunction
(LPVOID)this, // thread argument
0, // creationoption
&nThreadId ); // thread identifier
m_hMgrThread = hThread;
//now we start these worker threads
m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE,NULL, 0, 0);
for(long n = 0; n < nStatic; n++)
{
hThread = CreateThread(
NULL, // SD
0, // initial stack size
(LPTHREAD_START_ROUTINE)WorkerProc, // threadfunction
(LPVOID)this, //thread argument
0, //creation option
&nThreadId );
m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false)));
}
LeaveCriticalSection(&m_CS);
return true;
}
void Stop(bool bHash = false)
{
EnterCriticalSection(&m_CS);
?。海篜ostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
WaitForSingleObject(m_hMgrThread,INFINITE);
CloseHandle(m_hMgrThread);
CloseHandle(m_hMgrIoPort);
//shut down all the worker threads
UINT nCount=m_threadMap.size();
HANDLE* pThread= new HANDLE[nCount];
long n=0;
ThreadInfo info;
Iterator_ThreadInfoMap i=m_threadMap.begin();
while(i!=m_threadMap.end())
{
?。海篜ostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
info=i->second;
pThread[n++]=info.m_hThread;
i++;
}
DWORD rc=WaitForMultipleObjects(nCount,pThread, TRUE,30000);//wait for 0.5 minutes, then start to killthreads
CloseHandle(m_hWorkerIoPort);
if(rc>=WAIT_OBJECT_0 && rc
{
for(unsigned int n=0;n
{
CloseHandle(pThread[n]);
}
}
else if(rc==WAIT_TIMEOUT&&bHash)
{
//some threadsnot terminated, we have to stop them.
DWORD exitCode;
for(unsigned int i=0; i
{
if (::GetExitCodeThread(pThread[i],&exitCode)==STILL_ACTIVE)
{
TerminateThread(pThread[i], 99);
}
CloseHandle(pThread[i]);
}
}
delete[] pThread;
LeaveCriticalSection(&m_CS);
}
void AddTask(void* pUser, ITask* pWorker)const
{
::PostQueuedCompletionStatus(m_hWorkerIoPort, \
reinterpret_cast(pWorker), \
reinterpret_cast(pUser),\
NULL);
}
protected:
// Returns the manager completion-port handle (m_hMgrIoPort, set in Start()).
HANDLE GetMgrIoPort() const
{
    return m_hMgrIoPort;
}
// Returns the fixed wait time of 1000.
// NOTE(review): presumably milliseconds, used as the manager thread's polling
// timeout - confirm against ManagerProc.
UINT GetMgrWaitTime() const { return 1000; }
// Returns the worker completion-port handle (m_hWorkerIoPort, set in Start());
// WorkerProc dequeues its packets from this port.
HANDLE GetWorkerIoPort() const
{
    return m_hWorkerIoPort;
}
private:
static DWORD WINAPI WorkerProc(void* p)
{
//convert the parameter to the server pointer.
CThreadPool* pServer=(CThreadPool*)p;
HANDLE IoPort = pServer->GetWorkerIoPort();
unsigned long pN1,pN2;
OVERLAPPED* pOverLapped;
DWORD threadId=::GetCurrentThreadId();
while(::GetQueuedCompletionStatus(IoPort, &pN1,&pN2,
&pOverLapped, INFINITE))
{
if(pOverLapped ==(OVERLAPPED*)0xFFFFFFFE)
{
pServer->RemoveThread(threadId);
break;
}