基本上每一种编程语言都会支持多线程,这里我们只讲C/C++,C/C++是真正的native language。
- Posix Threads(pthread): 对不同platform的支持很好,可以非常稳定可靠地使用;如果想让你的代码具有很强的可移植性,推荐使用它
- std::thread: 从C++11开始引入,在不同平台上的表现各异,比如在一些Android和Win64平台下不被支持,或者存在性能瓶颈;当然,如果你的代码只考虑Linux/gcc平台的话,它是个不错的选择,对C++11的类尤其友好
- boost::thread: 对
这里,我们还是采用基于Posix Threads的实现
1 2 3 4 5 6 7 8 9
// Public member declarations of ThreadManager (the enclosing class body is not shown here).

// Creates and initializes a manager; on success *ppInstance receives the new instance.
static Result create(ThreadManager** ppInstance, const char* pName);
void destroy();

// Registers a job function and starts its worker thread; on success *phJob receives the handle.
Result registerJobFamily(JobFunc jobFuncAddr, const char* pJobFuncName, JobHandle* phJob);
// Stops the worker thread behind hJob and releases its slot. The handle is taken by value,
// which is how the definition and the destructor's call site use it.
Result unregisterJobFamily(JobFunc jobFuncAddr, const char* pJobFuncName, JobHandle hJob);

// Queues pData for the job's worker thread and wakes that thread.
Result postJob(JobHandle hJob, void* pData, uint64_t requestId);
// Removes queued jobs whose payload pointer equals pData.
Result removeJob(JobHandle hJob, void* pData);
Result getAllPostedJobs(JobHandle hJob, std::vector<void*>& rPostedJobs);
// Requests a flush and blocks until the worker has passed over its queue.
Result flushJob(JobHandle hJob, bool forceFlush);
bool isJobAvailable(JobHandle hJob);
// Opaque 64-bit job handle: the upper 32 bits carry the registration's unique counter,
// the lower 32 bits carry its slot index (see packJobHandleFromRegisteredJob).
typedef uint64_t JobHandle;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| struct RegisteredJob { JobFunc funcAddr; char name[MaxNameLength]; bool isUsed; uint64_t hRegister; uint32_t slot; uint32_t uniqueCounter; OSThreadHandle hRegisteredThread; JobFlushStatus flushStatus; };
// Signature of a job's work function: receives the posted payload pointer.
using JobFunc = void* (*)(void* pArg);

// Native thread handle (POSIX threads).
using OSThreadHandle = pthread_t;
// Flush states of a registered job.
static const JobFlushStatus Noflush        = 0;  // no flush in progress; submitted jobs are dispatched
static const JobFlushStatus FlushRequested = 1;  // flush pending; submitted jobs are dropped instead of run
static const JobFlushStatus Flushed        = 2;
- funcAddr: 是注册的Job的工作函数
- name: 是注册的Job的名称
- isUsed: 该Job是否已经被使用
- hRegister: 注册的Job的handle值,通过slot和count组合而成
- slot: 注册的Job的slot
- uniqueCounter: 注册的Job的id号
- hRegisteredThread: 注册的线程的句柄
- flushStatus: flush的状态值
1 2 3 4 5 6 7 8 9 10 11 12 13
// Lifecycle state of a queued job.
// Submitted must remain the first enumerator (value 0): postJob zero-fills a new
// RuntimeJob and relies on that to mean "Submitted".
enum class JobStatus {
    Submitted,
    Ready,
    OnHold,
    Stopped,
    Invalid,
};

// One queued unit of work. Declared after JobStatus because it stores one by value.
struct RuntimeJob {
    uint64_t  hJob;       // handle of the registered job this work item belongs to
    void*     pData;      // payload passed to the job's work function
    uint64_t  requestId;  // request id supplied by the poster
    JobStatus status;     // current lifecycle state
};
- hJob: Job的handle唯一标识
- pData: 工作函数携带的私有数据
- requestId: 请求id号,可以通过requestId号来重排PendingQueue中任务的优先级
- status: 工作状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| struct ThreadControl { NativeMutex* pThreadLock; NativeMutex* pFlushJobSubmitLock; NativeMutex* pQueueLock; NativeMutex* pFlushLock; NativeCondition* pReadOK; NativeCondition* pFlushOK; CoreStatus status; volatile bool jobPending; volatile uint32_t blockingStatus; bool isAvailable; }; static const CoreStatus Error = 0; static const CoreStatus Initialized = 1; static const CoreStatus Stopped = 2;
- pThreadLock: 确保线程依次运行,不被抢占
- pFlushJobSubmitLock: 保护flushStatus
- pQueueLock: 保护PendingQueue数据
- pFlushLock: 保护flush状态
- pReadOK: 等待线程被触发信号
- pFlushOK: 等待线程被释放信号
- status: 线程的状态值
- jobPending: 任务队列是否空闲
- blockingStatus: 确保线程释放完成
- isAvailable: 线程是否空闲
1 2 3 4 5 6 7 8 9 10 11 12 13
| struct ThreadData { std::deque<RuntimeJob*> pq; };
// Everything the manager keeps for one worker thread.
struct ThreadConfig
{
    uint32_t       threadId;        // logical thread id
    OSThreadHandle hWorkThread;     // native thread handle
    JobFunc        workThreadFunc;  // thread entry point
    void*          pContext;        // owning ThreadManager, passed back to the entry point
    ThreadData     data;            // pending-job queue attached to this thread
    ThreadControl  ctrl;            // locks, conditions and status flags
    bool           isUsed;          // true while this entry is occupied
};
- threadId: 线程ID号
- hWorkThread: 线程句柄
- workThreadFunc: 线程回调函数
- pContext: 上下文句柄,通常为ThreadManager指针
- data: 挂载在该线程上的等待队列
- ctrl: 线程控制类
- isUsed: 该线程是否被使用
1 2 3 4 5
| char m_name[MaxNameLength]; uint32_t m_totalNumOfRegisteredJob; RegisteredJob m_registeredJobs[MaxRegisteredJobs]; ThreadConfig m_threadWorker[MaxNumThread]; NativeMutex* m_pRegisteredJobLock;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| Result ThreadManager::create(ThreadManager** ppInstance, const char* pName) { Result result = -EFailed; ThreadManager* pLocalInstance = NULL;
pLocalInstance = new ThreadManager(); if(NULL != pLocalInstance) { result = pLocalInstance->initialize(pName); if(Ok != result) { delete pLocalInstance; pLocalInstance = NULL; LOG(ERROR) << "Failed to create threadmanager" << endl; } } if(Ok == result) { *ppInstance = pLocalInstance; } return result; }
// Sets the manager name, clears the job and worker tables and creates the registration lock.
//
// @param pName  Manager name; NULL selects "ThreadManager". Truncated to fit m_name.
// @return Ok on success, -EFailed if the registration lock cannot be created.
Result ThreadManager::initialize(const char* pName) {
    Result result = Ok;

    if(NULL == pName) {
        pName = "ThreadManager";
    }
    // strncpy does not terminate the destination when the source fills it completely.
    ::strncpy(m_name, pName, sizeof(m_name) - 1);
    m_name[sizeof(m_name) - 1] = '\0';

    // Clear the tables before anything can fail, so the object is in a defined
    // state even when it is destroyed right after a failed initialize().
    ::memset(&m_registeredJobs, 0x00, sizeof(m_registeredJobs));
    for(uint32_t i = 0; i < MaxNumThread; i++) {
        m_threadWorker[i].threadId = 0;
        m_threadWorker[i].workThreadFunc = NULL;
        m_threadWorker[i].pContext = NULL;
        ::memset(&m_threadWorker[i].ctrl, 0x00, sizeof(ThreadControl));
        m_threadWorker[i].isUsed = false;
    }

    m_pRegisteredJobLock = NativeMutex::create();
    if(NULL == m_pRegisteredJobLock) {
        LOG(ERROR) << "Failed to initialize " << pName << endl;
        result = -EFailed;
    }

    return result;
}
// Unregisters every job that is still registered, then destroys the registration lock.
ThreadManager::~ThreadManager() {
    // create() deletes the instance when initialize() could not create the lock.
    // In that case there is no lock to take or destroy, and nothing was registered.
    if(NULL == m_pRegisteredJobLock) {
        return;
    }

    if(m_totalNumOfRegisteredJob > 0) {
        for(uint32_t i = 0; i < MaxRegisteredJobs; i++) {
            RegisteredJob* pRegisteredJob = &m_registeredJobs[i];
            if(0 != pRegisteredJob->hRegister) {
                Result result = unregisterJobFamily(NULL, pRegisteredJob->name, pRegisteredJob->hRegister);
                if(Ok != result) {
                    LOG(ERROR) << "Failed to process unregisterJobFamily slot " << pRegisteredJob->slot << endl;
                }
            }
        }
    }

    m_pRegisteredJobLock->destroy();
    m_pRegisteredJobLock = NULL;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
| Result ThreadManager::registerJobFamily(JobFunc jobFuncAddr, const char* pJobFuncName, JobHandle* phJob) { Result result = Ok; uint32_t slot = 0; uint32_t count = 0; RegisteredJob* pRegisteredJob = NULL;
bool jobAlreadyRegistered = isJobAlreadyRegistered(phJob); bool freeSlotsAvailable = isFreeSlotAvailable(&slot, &count);
if((true == jobAlreadyRegistered) || (false == freeSlotsAvailable)) { LOG(ERROR) << "Failed to registerJobFamily, jobAlreadyRegistered: " << jobAlreadyRegistered << ", freeSlotsAvailable: " << freeSlotsAvailable << endl; result = -EFailed; } if(Ok == result) { pRegisteredJob = &m_registeredJobs[slot]; ::memset(pRegisteredJob, 0x00, sizeof(RegisteredJob)); pRegisteredJob->funcAddr = jobFuncAddr; pRegisteredJob->slot = slot; pRegisteredJob->uniqueCounter = count; pRegisteredJob->isUsed = true;
::strncpy(pRegisteredJob->name, pJobFuncName, MaxNameLength); *phJob = packJobHandleFromRegisteredJob(pRegisteredJob); pRegisteredJob->hRegister = *phJob; result = startThreads(slot, phJob); if(Ok == result) { if(0 != pRegisteredJob->hRegisteredThread) { setFlushStatus(*phJob, Noflush); } else { LOG(ERROR) << "Failed to get thread handle" << endl; result = -EFailed; } } } return result; }
// Builds a job handle from a registration record:
// the unique counter goes into the upper 32 bits, the slot index into the lower 32 bits.
JobHandle packJobHandleFromRegisteredJob(RegisteredJob* pRegisteredJob) {
    JobHandle packed = pRegisteredJob->uniqueCounter;
    packed <<= 32;
    packed = packed | pRegisteredJob->slot;
    return packed;
}
// Creates the synchronisation objects for a slot and starts its worker thread.
//
// @param slot   Index into m_threadWorker; must be within the array.
// @param phJob  Handle of the job that was just registered in that slot.
// @return Ok on success. -EFailed if the arguments are invalid, the slot is already in use,
//         a lock/condition cannot be created, or the thread cannot be created. On failure
//         every object created here is destroyed again and the slot is left unused.
Result ThreadManager::startThreads(uint32_t slot, JobHandle* phJob) {
    if((NULL == phJob) || (slot >= (sizeof(m_threadWorker) / sizeof(m_threadWorker[0])))) {
        return -EFailed;
    }

    RegisteredJob* pRegisteredJob = getJobByHandle(*phJob);
    ThreadConfig* pCfg = &m_threadWorker[slot];

    if((NULL == pRegisteredJob) || (pCfg->isUsed == true)) {
        return -EFailed;
    }

    Result result = Ok;

    pCfg->threadId = pRegisteredJob->uniqueCounter;
    pCfg->workThreadFunc = workerThreadBody;
    pCfg->ctrl.pReadOK = NativeCondition::create();
    pCfg->ctrl.pThreadLock = NativeMutex::create();
    pCfg->ctrl.pFlushJobSubmitLock = NativeMutex::create();
    pCfg->ctrl.pQueueLock = NativeMutex::create();
    pCfg->ctrl.pFlushOK = NativeCondition::create();
    pCfg->ctrl.pFlushLock = NativeMutex::create();
    pCfg->ctrl.isAvailable = true;
    pCfg->pContext = reinterpret_cast<void*>(this);
    pCfg->isUsed = true;

    // All six objects are used by the worker and by flushJob, so all six must exist.
    if((NULL == pCfg->ctrl.pReadOK) || (NULL == pCfg->ctrl.pThreadLock) ||
       (NULL == pCfg->ctrl.pFlushJobSubmitLock) || (NULL == pCfg->ctrl.pQueueLock) ||
       (NULL == pCfg->ctrl.pFlushOK) || (NULL == pCfg->ctrl.pFlushLock)) {
        LOG(ERROR) << "Couldn't read lock resource. slot " << slot << " m_totalNumOfRegisteredJob " << m_totalNumOfRegisteredJob << endl;
        result = -EFailed;
    }

    if(Ok == result) {
        result = OsUtils::threadCreate(pCfg->workThreadFunc, pCfg, &pCfg->hWorkThread);
        if(Ok == result) {
            pRegisteredJob->hRegisteredThread = pCfg->hWorkThread;
            pCfg->ctrl.pThreadLock->lock();
            setStatus(&pCfg->ctrl, Initialized);
            pCfg->ctrl.pThreadLock->unlock();
        } else {
            LOG(ERROR) << "Couldn't create worker thread, logical threadId " << pCfg->threadId << endl;
            result = -EFailed;
        }
    }

    if(Ok != result) {
        // No thread is running at this point: release whatever was created above.
        if(NULL != pCfg->ctrl.pReadOK) { pCfg->ctrl.pReadOK->destroy(); pCfg->ctrl.pReadOK = NULL; }
        if(NULL != pCfg->ctrl.pThreadLock) { pCfg->ctrl.pThreadLock->destroy(); pCfg->ctrl.pThreadLock = NULL; }
        if(NULL != pCfg->ctrl.pFlushJobSubmitLock) { pCfg->ctrl.pFlushJobSubmitLock->destroy(); pCfg->ctrl.pFlushJobSubmitLock = NULL; }
        if(NULL != pCfg->ctrl.pQueueLock) { pCfg->ctrl.pQueueLock->destroy(); pCfg->ctrl.pQueueLock = NULL; }
        if(NULL != pCfg->ctrl.pFlushOK) { pCfg->ctrl.pFlushOK->destroy(); pCfg->ctrl.pFlushOK = NULL; }
        if(NULL != pCfg->ctrl.pFlushLock) { pCfg->ctrl.pFlushLock->destroy(); pCfg->ctrl.pFlushLock = NULL; }
        pCfg->isUsed = false;
    }
    return result;
}
void* ThreadManager::workerThreadBody(void* pArg) { ThreadConfig* pConfig = NULL; pConfig = reinterpret_cast<ThreadConfig*>(pArg); if(NULL != pConfig) { ThreadManager* pThreadManager = reinterpret_cast<ThreadManager*>(pConfig->pContext); pThreadManager->doWork(pArg); }
return NULL; }
void* ThreadManager::doWork(void* pArg) { Result result = Ok; ThreadConfig* pConfig = NULL; ThreadControl* pCtrl = NULL;
pConfig = reinterpret_cast<ThreadConfig*>(pArg); pCtrl = &pConfig->ctrl;
if(NULL == pCtrl) { LOG(ERROR) << "Failed to start" << endl; } else { pCtrl->pThreadLock->lock(); while(Stopped != getStatus(pCtrl)) { while((Stopped != getStatus(pCtrl)) && (false == pCtrl->jobPending)) { pCtrl->pReadOK->wait(pCtrl->pThreadLock->getNativeHandle()); } if(Stopped != getStatus(pCtrl)) { pCtrl->jobPending = false; pCtrl->pThreadLock->unlock(); } result = processJobQueue(pConfig); if(true == getFlushBlockStatus(pCtrl)) { pCtrl->pFlushLock->lock(); setFlushBlockStatus(pCtrl, false); pCtrl->pFlushOK->signal(); pCtrl->pFlushLock->unlock(); } pCtrl->pThreadLock->lock(); if(Ok != result) { LOG(ERROR) << "ProcessJobQueue failed with result" << endl; } } pCtrl->pThreadLock->unlock(); } return NULL; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
| Result ThreadManager::postJob(JobHandle hJob, void* pData, uint64_t requestId) { RuntimeJob* pRuntimeJob = NULL; Result result = Ok; ThreadControl* pCtrl = NULL; pCtrl = getThreadControlByHandle(hJob); if(NULL == pCtrl) { LOG(ERROR) << "pCtrl is null" << endl; result = -EFailed; } else { if(Initialized != getStatus(pCtrl)) { LOG(ERROR) << "Thread is not initialized threadId" << endl; result = -EFailed; } else { void* vbuf = malloc(sizeof(RuntimeJob)); ::memset(vbuf, 0x00, sizeof(RuntimeJob)); pRuntimeJob = reinterpret_cast<RuntimeJob*>(vbuf); if(NULL != pRuntimeJob) { pRuntimeJob->hJob = hJob; pRuntimeJob->requestId = requestId; pRuntimeJob->pData = pData;
pCtrl->pFlushJobSubmitLock->lock(); result = addToPriorityQueue(pRuntimeJob); pCtrl->pFlushJobSubmitLock->unlock();
if(Ok != result) { LOG(ERROR) << "Couldn't add job to Priority Queue" << endl; } else { result = trigger(hJob); if(Ok != result) { LOG(ERROR) << "Filed to trigger" << endl; } } } else { LOG(ERROR) << "Failed to create runtimejob out of memory" << endl; result = -ENoMemory; } } } if(result == Ok) { pCtrl->isAvailable = false; } return result; }
// Marks a job as pending for the worker behind hJob and signals its wake-up condition.
// Returns -EFailed when the handle has no thread control, Ok otherwise.
Result ThreadManager::trigger(JobHandle hJob) {
    ThreadControl* const pCtrl = getThreadControlByHandle(hJob);

    if(NULL == pCtrl) {
        LOG(ERROR) << "Failed to get threadctrl " << pCtrl << endl;
        return -EFailed;
    }

    // jobPending is the predicate the worker checks under pThreadLock.
    pCtrl->pThreadLock->lock();
    pCtrl->jobPending = true;
    pCtrl->pThreadLock->unlock();

    pCtrl->pReadOK->signal();
    return Ok;
}
// Drains the pending-job queue of one worker thread.
//
// Each job is popped under pQueueLock. A Submitted job becomes Ready when the job's flush
// status is Noflush, or Stopped when a flush is requested. Ready jobs are dispatched outside
// the lock, and every popped job is freed. isAvailable is set once the queue has been
// drained by this call.
//
// @param pCfg  The ThreadConfig of the worker (passed as void*).
// @return Always Ok.
Result ThreadManager::processJobQueue(void* pCfg) {
    ThreadConfig* pConfig = reinterpret_cast<ThreadConfig*>(pCfg);
    ThreadControl* pCtrl = &pConfig->ctrl;
    ThreadData* pData = &pConfig->data;
    bool sawJob = false;

    for(;;) {
        RuntimeJob* pJob = NULL;

        pCtrl->pQueueLock->lock();
        // Test emptiness under the same lock as front(): removeJob() may have
        // emptied the queue since the previous iteration.
        if(true == pData->pq.empty()) {
            if(true == sawJob) {
                pCtrl->isAvailable = true;
            }
            pCtrl->pQueueLock->unlock();
            break;
        }
        sawJob = true;

        // Always pop, even a NULL entry, so the loop makes progress and the lock
        // is released on every path.
        pJob = pData->pq.front();
        pData->pq.pop_front();

        if((NULL != pJob) && (JobStatus::Submitted == pJob->status)) {
            const auto flushStatus = getFlushStatus(pJob->hJob);
            if(FlushRequested == flushStatus) {
                pJob->status = JobStatus::Stopped;
            } else if(Noflush == flushStatus) {
                pJob->status = JobStatus::Ready;
            }
        }
        pCtrl->pQueueLock->unlock();

        if(NULL != pJob) {
            if(pJob->status == JobStatus::Ready) {
                dispatchJob(pJob);
            }
            ::free(pJob);
        }
    }
    return Ok;
}
// Runs the work function registered for pJob->hJob with the job's payload.
// Does nothing when no function is found for the handle.
void ThreadManager::dispatchJob(RuntimeJob* pJob) {
    const JobFunc entry = getJobFunc(pJob->hJob);
    if(NULL == entry) {
        return;
    }
    entry(pJob->pData);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Result ThreadManager::removeJob(JobHandle hJob, void* pRemoveJobData) { Result result = Ok; ThreadData* pData = NULL; ThreadControl* pCtrl = NULL;
pData = getThreadDataByHandle(hJob); pCtrl = getThreadControlByHandle(hJob);
if((NULL == pData) || (NULL == pCtrl)) { LOG(ERROR) << "Failed to get threaddata or threadcontrol" << endl; result = -EFailed; } else { pCtrl->pQueueLock->lock(); for(uint32_t i = 0; i < pData->pq.size(); ++i) { RuntimeJob* pJob = pData->pq[i]; if(pRemoveJobData == pJob->pData) { pData->pq.erase(pData->pq.begin() + i); } } pCtrl->pQueueLock->unlock(); } return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| Result ThreadManager::flushJob(JobHandle hJob, bool forceFlush) { Result result = Ok; ThreadControl* pCtrl = getThreadControlByHandle(hJob);
if(NULL == pCtrl) { result = -EFailed; } else { if(Initialized != getStatus(pCtrl)) { LOG(ERROR) << "Thread is not initialized" << endl; result = -EFailed; } else { if(Noflush == getFlushStatus(hJob)) { pCtrl->pFlushJobSubmitLock->lock(); setFlushStatus(hJob, FlushRequested); pCtrl->pFlushJobSubmitLock->unlock(); setFlushBlockStatus(pCtrl, true); result = trigger(hJob); if(Ok == result) { pCtrl->pFlushLock->lock(); if(true == getFlushBlockStatus(pCtrl)) { pCtrl->pFlushOK->wait(pCtrl->pFlushLock->getNativeHandle()); } pCtrl->pFlushLock->unlock(); } else { LOG(ERROR) << "Failed to trigger" << endl; } if(true == forceFlush) { pCtrl->pFlushJobSubmitLock->lock(); setFlushStatus(hJob, Noflush); pCtrl->pFlushJobSubmitLock->unlock(); } } } }
return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| Result ThreadManager::unregisterJobFamily(JobFunc jobFuncAddr, const char* pJobFuncName, JobHandle hJob) { Result result = -EFailed; RegisteredJob* pRegisteredJob = NULL;
m_pRegisteredJobLock->lock(); pRegisteredJob = getJobByHandle(hJob); if(true != isJobAlreadyRegistered(&hJob)) { LOG(ERROR) << "Failed to unregister job " << pJobFuncName << " FuncAddr " << jobFuncAddr << " result " << result << endl;; } else if(NULL != pRegisteredJob) { uint32_t slot = pRegisteredJob->slot; result = stopThreads(hJob); if(Ok == result) { m_threadWorker[slot].data.pq.clear(); m_threadWorker[slot].data.pq.shrink_to_fit(); ::memset(&m_registeredJobs[slot], 0x00, sizeof(RegisteredJob)); m_threadWorker[slot].threadId = 0; m_threadWorker[slot].workThreadFunc = NULL; m_threadWorker[slot].pContext = NULL; ::memset(&m_threadWorker[slot].ctrl, 0x00, sizeof(ThreadControl)); m_threadWorker[slot].isUsed = false; } else { LOG(ERROR) << "Failed to stop thread" << endl; } } m_pRegisteredJobLock->unlock();
return result; }
// Flushes the job, tells its worker thread to stop, terminates the thread and destroys
// the thread's locks and conditions.
//
// @param hJob  Handle of the registered job whose worker is stopped.
// @return The result of flushJob; -EFailed if the handle cannot be resolved.
//         The teardown is carried out regardless of the flush result.
Result ThreadManager::stopThreads(JobHandle hJob) {
    ThreadConfig* pCfg = getThreadConfigByHandle(hJob);
    ThreadData* pData = getThreadDataByHandle(hJob);
    ThreadControl* pCtrl = getThreadControlByHandle(hJob);

    if((NULL == pCfg) || (NULL == pData) || (NULL == pCtrl)) {
        LOG(ERROR) << "Failed to get threadconfig/threaddata/threadcontrol" << endl;
        return -EFailed;
    }

    Result result = flushJob(hJob, false);

    pCtrl->pFlushJobSubmitLock->lock();
    setFlushStatus(hJob, Noflush);
    pCtrl->pFlushJobSubmitLock->unlock();

    // Publish the stop request under the lock the worker waits with, then wake it.
    pCtrl->pThreadLock->lock();
    setStatus(pCtrl, Stopped);
    pCtrl->pReadOK->signal();
    pCtrl->pThreadLock->unlock();

    OsUtils::threadTerminate(pCfg->hWorkThread);

    pCtrl->pReadOK->destroy();
    pCtrl->pReadOK = NULL;

    pCtrl->pThreadLock->destroy();
    pCtrl->pThreadLock = NULL;

    pCtrl->pFlushJobSubmitLock->destroy();
    pCtrl->pFlushJobSubmitLock = NULL;

    pCtrl->pQueueLock->destroy();
    pCtrl->pQueueLock = NULL;

    pCtrl->pFlushLock->destroy();
    pCtrl->pFlushLock = NULL;

    // Clear pFlushOK itself; nulling pFlushLock again would leave this pointer dangling.
    pCtrl->pFlushOK->destroy();
    pCtrl->pFlushOK = NULL;

    return result;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| ThreadManager* threadManager;
static void* testJobFunc(void* ptr) { StopWatch stopWatch("job"); int32_t elapsedMillis = ns2ms(stopWatch.elapsedTime()); LOG(INFO) << "ElapsedMillis: " << elapsedMillis << endl; return NULL; }
int main(int argc, char** argv) { google::InstallFailureSignalHandler(); google::InstallFailureWriter(&signalHandle);
ThreadManager::create(&threadManager, "TestThreadManager"); for(int i = 0; i < TEST_MAX_COUNT; i ++) { jobs[i] = InvalidJobHandle; threadManager->registerJobFamily(testJobFunc, "testJobFunc", &jobs[i]); string serviceName = StringUtils::stringFormat("service-%04d", i); services[i] = hub->getService(serviceName.c_str()); }
****for(int i = 0; i < TEST_MAX_COUNT; i ++) { counts[i] = i; threadManager->postJob(jobs[i], &counts[i], 0); }
return 0; }