機電之家資源網(wǎng)
單片機首頁|單片機基礎|單片機應用|單片機開發(fā)|單片機文案|軟件資料下載|音響制作|電路圖下載 |嵌入式開發(fā)
培訓信息
贊助商
linux線程池(zt)
linux線程池(zt)
 更新時間:2009-8-12 17:22:13  點擊數(shù):0
【字體: 字體顏色

http://blog.csdn.net/phus/archive/2005/06/09/390745.aspx
thrmgr.h文件

/*
* Copyright (C) 2004 Trog
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#ifndef __THRMGR_H__
#define __THRMGR_H__

#include
#include

typedef struct work_item_tag {
struct work_item_tag *next;
void *data;
struct timeval time_queued;
} work_item_t;

typedef struct work_queue_tag {
work_item_t *head;
work_item_t *tail;
int item_count;
} work_queue_t;

typedef enum {
POOL_INVALID,
POOL_VALID,
POOL_EXIT,
} pool_state_t;

typedef struct threadpool_tag {
pthread_mutex_t pool_mutex;
pthread_cond_t pool_cond;
pthread_attr_t pool_attr;

pool_state_t state;
int thr_max;
int thr_alive;
int thr_idle;
int idle_timeout;

void (*handler)(void *);

work_queue_t *queue;
} threadpool_t;

threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *));
void thrmgr_destroy(threadpool_t *threadpool);
int thrmgr_dispatch(threadpool_t *threadpool, void *user_data);

#endif


thrmgr.c文件

/*
* Copyright (C) 2004 Trog
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#include
#include
#include

#include "thrmgr.h"

#include "others.h"
#include "memory.h"
#include "output.h"

#define FALSE (0)
#define TRUE (1)

work_queue_t *work_queue_new()
{
work_queue_t *work_q;

work_q = (work_queue_t *) mmalloc(sizeof(work_queue_t));

work_q->head = work_q->tail = NULL;
work_q->item_count = 0;
return work_q;
}

void work_queue_add(work_queue_t *work_q, void *data)
{
work_item_t *work_item;

if (!work_q) {
return;
}
work_item = (work_item_t *) mmalloc(sizeof(work_item_t));
work_item->next = NULL;
work_item->data = data;
gettimeofday(&(work_item->time_queued), NULL);

if (work_q->head == NULL) {
work_q->head = work_q->tail = work_item;
work_q->item_count = 1;
} else {
work_q->tail->next = work_item;
work_q->tail = work_item;
work_q->item_count++;
}
return;
}

void *work_queue_pop(work_queue_t *work_q)
{
work_item_t *work_item;
void *data;

if (!work_q || !work_q->head) {
return NULL;
}
work_item = work_q->head;
data = work_item->data;
work_q->head = work_item->next;
if (work_q->head == NULL) {
work_q->tail = NULL;
}
free(work_item);
return data;
}

void thrmgr_destroy(threadpool_t *threadpool)
{
if (!threadpool || (threadpool->state != POOL_VALID)) {
return;
}
if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
logg("!Mutex lock failed\n");
exit(-1);
}
threadpool->state = POOL_EXIT;

/* wait for threads to exit */
if (threadpool->thr_alive > 0) {

/*通知兄弟們收工*/
if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
pthread_mutex_unlock(&threadpool->pool_mutex);
return;
}
}
while (threadpool->thr_alive > 0) {

/*原來是這位老兄負責等最后一名兄弟的信號啊*/
if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
pthread_mutex_unlock(&threadpool->pool_mutex);
return;
}
}
if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
logg("!Mutex unlock failed\n");
exit(-1);
}

pthread_mutex_destroy(&(threadpool->pool_mutex));
pthread_cond_destroy(&(threadpool->pool_cond));
pthread_attr_destroy(&(threadpool->pool_attr));
free(threadpool);
return;
}

threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
{
threadpool_t *threadpool;

if (max_threads <= 0) {
return NULL;
}

threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));

threadpool->queue = work_queue_new();
if (!threadpool->queue) {
free(threadpool);
return NULL;
}
threadpool->thr_max = max_threads;
threadpool->thr_alive = 0;
threadpool->thr_idle = 0;
threadpool->idle_timeout = idle_timeout;
threadpool->handler = handler;

pthread_mutex_init(&(threadpool->pool_mutex), NULL);
if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
free(threadpool);
return NULL;
}

if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
free(threadpool);
return NULL;
}

if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
free(threadpool);
return NULL;
}
threadpool->state = POOL_VALID;

return threadpool;
}

/*工作線程.該工作線程遍歷工作鏈表,如果有活干就干,沒活干就等活干,難怪叫民工

*/

void *thrmgr_worker(void *arg)
{
threadpool_t *threadpool = (threadpool_t *) arg;
void *job_data;
int retval, must_exit = FALSE;
struct timespec timeout;

/* loop looking for work */
for (;;) {
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex lock failed\n");
exit(-2);
}
timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
timeout.tv_nsec = 0;
threadpool->thr_idle++;
while (((job_data=work_queue_pop(threadpool->queue)) == NULL)
&& (threadpool->state != POOL_EXIT)) {
/* Sleep, awaiting wakeup ,注意,民工等一段時間,如果沒有活干就結(jié)束該線程*/
retval = pthread_cond_timedwait(&(threadpool->pool_cond),
&(threadpool->pool_mutex), &timeout);
if (retval == ETIMEDOUT) {
must_exit = TRUE;
break;
}
}
threadpool->thr_idle--;//要干活了,閑著的民工少了一位
if (threadpool->state == POOL_EXIT) {
must_exit = TRUE;
}

if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex unlock failed\n");
exit(-2);
}
if (job_data) {
threadpool->handler(job_data);
} else if (must_exit) {

/*如果沒有等到活或者要結(jié)束整個線程池時,該線程收工*/
break;
}
}
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex lock failed\n");
exit(-2);
}
threadpool->thr_alive--;//活干完了,該走人了(人又少了一個)
if (threadpool->thr_alive == 0) {
/* signal that all threads are finished */
pthread_cond_broadcast(&threadpool->pool_cond);//人都跑光了,誰還聽得到這個信號?多次一舉嗎?
}
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
/* Fatal error */
logg("!Fatal: mutex unlock failed\n");
exit(-2);
}
return NULL;
}

/*創(chuàng)建一個工作線程,如果目前有等待條件信號的工作線程,則喚醒該工作線程處理數(shù)據(jù)

*/

int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
{
pthread_t thr_id;

if (!threadpool) {
return FALSE;
}

/* Lock the threadpool */
if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
logg("!Mutex lock failed\n");
return FALSE;
}

if (threadpool->state != POOL_VALID) {
if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
logg("!Mutex unlock failed\n");
return FALSE;
}
return FALSE;
}
work_queue_add(threadpool->queue, user_data);

/*只有當目前沒有線程idle且目前生成的線程數(shù)小于最大線程要求時

*創(chuàng)建新的線程

*/

if ((threadpool->thr_idle == 0) &&
(threadpool->thr_alive < threadpool->thr_max)) {
/* Start a new thread */
if (pthread_create(&thr_id, &(threadpool->pool_attr),
thrmgr_worker, threadpool) != 0) {
logg("!pthread_create failed\n");
} else {
threadpool->thr_alive++;
}
}

/*釋放條件信號,如果有正在等待該信號的線程,則該線程運行*/
pthread_cond_signal(&(threadpool->pool_cond));

if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
logg("!Mutex unlock failed\n");
return FALSE;
}
return TRUE;
}

/*
使用方法,以一個tcp服務器為例,簡單列出,可能有問題,請牛人指正.

1, thrmgr_new初始話

2, while(1)

{

accept(......);

//構(gòu)建輸入?yún)?shù)

thrmgr_dispach(...);

}

thrmgr_destory(...);

*/

Posted by rui at August 21, 2005 0
  • 上一篇: Linux嵌入式系統(tǒng)與硬件平臺的關系-
  • 下一篇: 沒有了
  • 發(fā)表評論   告訴好友   打印此文  收藏此頁  關閉窗口  返回頂部
    熱點文章
     
    推薦文章
     
    相關文章
    網(wǎng)友評論:(只顯示最新5條。)
    關于我們 | 聯(lián)系我們 | 廣告合作 | 付款方式 | 使用幫助 | 機電之家 | 會員助手 | 免費鏈接

    點擊這里給我發(fā)消息66821730(技術支持)點擊這里給我發(fā)消息66821730(廣告投放) 點擊這里給我發(fā)消息41031197(編輯) 點擊這里給我發(fā)消息58733127(審核)
    本站提供的機電設備,機電供求等信息由機電企業(yè)自行提供,該企業(yè)負責信息內(nèi)容的真實性、準確性和合法性。
    機電之家對此不承擔任何保證責任,有侵犯您利益的地方請聯(lián)系機電之家,機電之家將及時作出處理。
    Copyright 2007 機電之家 Inc All Rights Reserved.機電之家-由機電一體化網(wǎng)更名-聲明
    電話:0571-87774297 傳真:0571-87774298
    杭州濱興科技有限公司提供技術支持

    主辦:杭州市高新區(qū)(濱江)機電一體化學會
    中國行業(yè)電子商務100強網(wǎng)站

    網(wǎng)站經(jīng)營許可證:浙B2-20080178-1