博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
epoll 多线程 服务器
阅读量:5265 次
发布时间:2019-06-14

本文共 19107 字,大约阅读时间需要 63 分钟。

ExpandedBlockStart.gif
服务器
  1 
//
 
  2 
//
 a simple agi server using epoll in linux
  3 
//
 
  4 
//
 2010-12-20
  5 
//
 by nsy
  6 
//
  7 
#include 
<
sys
/
socket.h
>
  8 
#include 
<
sys
/
epoll.h
>
  9 
#include 
<
netinet
/
in
.h
>
 10 
#include 
<
arpa
/
inet.h
>
 11 
#include 
<
fcntl.h
>
 12 
#include 
<
unistd.h
>
 13 
#include 
<
stdio.h
>
 14 
#include 
<
stdlib.h
>
 15 
#include 
<
errno.h
>
 16 
#include 
<
string
.h
>
 17 
#include 
"
CallSvr.h
"
 18 
#include 
<
pthread.h
>
 19 
#include 
"
epoll.h
"
 20 
 21 
//
test
 22 
#include 
"
msg.h
"
 23 
 24 
//
 set event
 25 
void
 EventSet(
struct
 myevent_s 
*
ev, 
int
 fd,
int
 status)
 26 
{
 27 
  ev
->
fd 
=
 fd;
 28 
  ev
->
status 
=
 status;
 29 
  ev
->
last_active 
=
 time(NULL);
 30 
  fprintf(stderr,
"
function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,fd);
 31 
}
 32 
//
 add/mod an event to epoll
 33 
void
 EventAdd(
int
 epollFd, 
int
 events,
struct
 myevent_s 
*
ev)
 34 
{
 35 
  
struct
 epoll_event epv 
=
 {
0
, {
0
}};
 36 
  
int
 op;
 37 
  epv.data.ptr 
=
 ev;
 38 
  epv.events 
=
 events;
 39 
  
if
(ev
->
status 
==
 
1
){
 40 
    op 
=
 EPOLL_CTL_MOD;
 41 
    fprintf(stderr,
"
mod:function=%s,line=%d,fd=%d,status=%d\n
"
,__func__,__LINE__,ev
->
fd,ev
->
status);
 42 
  }
 43 
  
else
{
 44 
    op 
=
 EPOLL_CTL_ADD;
 45 
    ev
->
status 
=
 
1
;
 46 
    fprintf(stderr,
"
add:function=%s,line=%d,fd=%d,status=%d\n
"
,__func__,__LINE__,ev
->
fd,ev
->
status);
 47 
  }
 48 
  
if
(epoll_ctl(epollFd, op, ev
->
fd, 
&
epv) 
<
 
0
)
 49 
    {
 50 
      fprintf(stderr,
"
failed:function=%s,line=%d,fd=%d:errno=%d\n
"
,__func__,__LINE__,ev
->
fd,errno);
 51 
    }
 52 
  
else
 53 
    {
 54 
      fprintf(stderr,
"
success:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
 55 
    }
 56 
}
 57 
//
 delete an event from epoll
 58 
void
 EventDel(
int
 epollFd,
struct
 myevent_s 
*
ev)
 59 
{
 60 
  
struct
 epoll_event epv 
=
 {
0
, {
0
}};
 61 
  
if
(ev
->
status 
!=
 
1
return
;
 62 
  epv.data.ptr 
=
 ev;
 63 
  ev
->
status 
=
 
0
;
 64 
  epoll_ctl(epollFd, EPOLL_CTL_DEL, ev
->
fd, 
&
epv);
 65 
  fprintf(stderr,
"
function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
 66 
}
 67 
 68 
//
 receive data
 69 
void
 RecvData(
struct
 myevent_s 
*
ev)
 70 
{
 71 
  msg_header header;
 72 
  
int
 recvbytes;
 73 
  
if
 ((recvbytes
=
recv(ev
->
fd, 
&
header, 
sizeof
(msg_header), 
0
)) 
==-
1
 74 
    {
 75 
      fprintf(stderr,
"
RecvHeaderErr:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
 76 
      
goto
 errret;
 77 
    }
 78 
  
if
(recvbytes 
==
 
sizeof
(msg_header))
 79 
    {
 80 
      
switch
(header.msg_type)
 81 
    {
 82 
    
case
 msg_lost:
 83 
      {
 84 
        rq_lost rq;
 85 
        
if
 ((recvbytes
=
recv(ev
->
fd, ((
char
*
)(
&
rq))
+
sizeof
(msg_header), 
sizeof
(rq_lost)
-
sizeof
(msg_header), 
0
)) 
==-
1
)
 86 
          {
 87 
        fprintf(stderr,
"
RecvAfter:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
 88 
        
goto
 errret;
 89 
          }
 90 
        
else
 
if
(recvbytes 
>
 
0
)
 91 
          {                
 92 
        printf(
"
recv sucess%d,hope to recv %d\n
"
,recvbytes,
sizeof
(rq_lost)
-
sizeof
(msg_header));
 93 
        printf(
"
cardno is '%s'\n
"
,rq.cardno);
 94 
        printf(
"
password is '%s'\n
"
,rq.password);
 95 
    
 96 
        
//
init ev recv
 97 
        memcpy(
&
ev
->
header,
&
header,
sizeof
(msg_header));
 98 
        memcpy(
&
ev
->
recv_buff,
&
rq,
sizeof
(rq_lost));
 99 
        ev
->
recv_len 
=
 recvbytes;
100 
        fprintf(stderr,
"
addtologicqueue:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
101 
        
//
add to logic queue
102 
        sem_wait(
&
bin_sem_logic_data_produce);
103 
        
struct
 QUEUE_LOGIC_DATA_ITEM 
*
item;
104 
        item 
=
 malloc(
sizeof
(
struct
 QUEUE_LOGIC_DATA_ITEM));
105 
        item
->
ev 
=
 ev;
106 
        pthread_mutex_lock(
&
queue_logic_data_mutex);
107 
        TAILQ_INSERT_TAIL(
&
queue_logic_data_header,item,logic_data_entries);
108 
        pthread_mutex_unlock(
&
queue_logic_data_mutex);
109 
        sem_post(
&
bin_sem_logic_data_consume);
110 
        fprintf(stderr,
"
addtologicqueue--end:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
111 
          }
else
112 
          {
113 
        fprintf(stderr,
"
RecvAfter:function=%s,line=%d,fd=%d:errno=%d\n
"
,__func__,__LINE__,ev
->
fd,errno);
114 
        
goto
 errret;
115 
          }
116 
        
break
;
117 
      }
118 
    }
119 
      
return
;
//
switch end function end
120 
    }
else
121 
    {
122 
      fprintf(stderr,
"
function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
123 
      
goto
 errret;
124 
    }
125 
 errret:
126 
  EventDel(g_epollFd, ev);
127 
  close(ev
->
fd);
128 
}
129 
//
 send data
130 
void
 SendData(
struct
 myevent_s 
*
ev)
131 
{
132 
  fprintf(stderr,
"
JustIn:function=%s,line=%d,fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
133 
  
int
 len;
134 
  
//
 send data
135 
  len 
=
 send(ev
->
fd, ev
->
send_buff, ev
->
send_len, 
0
);
136 
  ev
->
send_len 
=
 
0
;
137 
  fprintf(stderr,
"
sendlen=%d:function=%s,line=%d,fd=%d\n
"
,len,__func__,__LINE__,ev
->
fd);
138 
  
if
(len 
<
 
0
)   
139 
    {
140 
      close(ev
->
fd);
141 
      fprintf(stderr,
"
err=%d:function=%s,line=%d,fd=%d\n
"
,errno,__func__,__LINE__,ev
->
fd);
142 
    }
else
143 
    {
144 
      
//
let system known can recv
145 
      EventAdd(g_epollFd, EPOLLIN
|
EPOLLET,ev);
146 
    }
147 
}
148 
149 
void
 
*
accept_thread_work(
void
 
*
arg)
150 
{
151 
  
while
(
1
)
152 
    {
153 
      
int
*
 plistenFd 
=
 (
int
*
)arg;
154 
      fprintf(stderr,
"
function=%s,line=%d,listenfd=%d\n
"
,__func__,__LINE__,
*
plistenFd);
155 
      
struct
 sockaddr_in sin;
156 
      socklen_t len 
=
 
sizeof
(
struct
 sockaddr_in);
157 
      
int
 nfd, i;
158 
      
//
 accept
159 
      
if
((nfd 
=
 accept(
*
plistenFd, (
struct
 sockaddr
*
)
&
sin, 
&
len)) 
==
 
-
1
)
160 
    {
161 
      
if
(errno 
!=
 EAGAIN 
&&
 errno 
!=
 EINTR)
162 
        {
163 
          fprintf(stderr,
"
%s: bad accept
"
, __func__);
164 
        }
165 
      
continue
;
166 
    }
167 
      
do
168 
    {
169 
      
for
(i 
=
 
0
; i 
<
 MAX_EVENTS; i
++
)
170 
        {
171 
          
if
(g_Events[i].status 
==
 
0
)
172 
        {
173 
          fprintf(stderr,
"
function=%s,line=%d,listenfd=%d,currentindex=%d\n
"
,__func__,__LINE__,
*
plistenFd,i);
174 
          
break
;
175 
        }
176 
        }
177 
      
if
(i 
==
 MAX_EVENTS)
178 
        {
179 
          fprintf(stderr,
"
max events:function=%s,line=%d,listenFd=%d\n
"
,__func__,__LINE__,
*
plistenFd);
180 
          
break
;
181 
        }
182 
      
//
 set nonblocking
183 
      fprintf(stderr,
"
set nonblocking:function=%s,line=%d,listenfd=%d\n
"
,__func__,__LINE__,
*
plistenFd);
184 
      
if
(fcntl(nfd, F_SETFL, O_NONBLOCK) 
<
 
0
break
;
185 
      
//
 add a read event for receive data
186 
      EventSet(
&
g_Events[i], nfd,
0
);
187 
      EventAdd(g_epollFd, EPOLLIN
|
EPOLLET, 
&
g_Events[i]);
188 
      fprintf(stderr,
"
new conn[%s:%d][time:%d]\n
"
, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),(
int
) g_Events[i].last_active);
189 
    }
while
(
0
);
190 
    }
191 
  
return
 NULL;
192 
}
193 
194 
void
 
*
epoll_wait_thread_work(
void
 
*
arg)
195 
{
196 
  fprintf(stderr,
"
justin:function=%s,line=%d\n
"
,__func__,__LINE__);
197 
  
//
 event loop
198 
  
struct
 epoll_event events[MAX_EVENTS];
199 
200 
  
int
 checkPos 
=
 
0
;
201 
  
while
(
1
){
202 
    
//
 a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
203 
    
long
 now 
=
 time(NULL);
204 
    
int
 i;
205 
    
for
(i 
=
 
0
; i 
<
 
100
; i
++
, checkPos
++
//
 doesn't check listen fd
206 
      {
207 
    
if
(checkPos 
==
 MAX_EVENTS) checkPos 
=
 
0
//
 recycle
208 
    
if
(g_Events[checkPos].status 
!=
 
1
continue
;
209 
    
long
 duration 
=
 now 
-
 g_Events[checkPos].last_active;
210 
    
if
(duration 
>=
 
60
//
 60s timeout
211 
      {
212 
        close(g_Events[checkPos].fd);
213 
        fprintf(stderr,
"
[fd=%d] timeout[%d--%d].\n
"
,(
int
) g_Events[checkPos].fd,(
int
) g_Events[checkPos].last_active, (
int
)now);
214 
        EventDel(g_epollFd, 
&
g_Events[checkPos]);
215 
      }
216 
      }
217 
    
//
 wait for events to happen
218 
    
int
 fds 
=
 epoll_wait(g_epollFd, events, MAX_EVENTS, 
1000
);
219 
    
if
(fds 
<
 
0
){
220 
      fprintf(stderr,
"
epoll_wait error, exit\n
"
);
221 
      
break
;
222 
    }
223 
    
for
(i 
=
 
0
; i 
<
 fds; i
++
){
224 
      
struct
 myevent_s 
*
ev 
=
 (
struct
 myevent_s
*
)events[i].data.ptr;
225 
      
if
(events[i].events
&
EPOLLIN) 
//
 read event
226 
    {
227 
      sem_wait(
&
bin_sem_recv_fd_produce);
228 
      fprintf(stderr,
"
readEvent:function=%s,line=%d:fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
229 
      
//
ev->call_back(ev->fd, events[i].events, ev->arg);
230 
      
struct
 QUEUE_RECV_FD_ITEM 
*
item;
231 
      item 
=
 malloc(
sizeof
(
struct
 QUEUE_RECV_FD_ITEM));
232 
      item
->
ev 
=
 ev;
233 
      pthread_mutex_lock(
&
queue_recv_fd_mutex);
234 
      TAILQ_INSERT_TAIL(
&
queue_recv_fd_header,item,recv_fd_entries);
235 
      pthread_mutex_unlock(
&
queue_recv_fd_mutex);
236 
      sem_post(
&
bin_sem_recv_fd_consume);
237 
    }
else
 
if
(events[i].events
&
EPOLLOUT) 
//
 write event
238 
    {
239 
      sem_post(
&
bin_sem_send_fd_consume);
240 
      fprintf(stderr,
"
post send fd consume:function=%s,line=%d:fd=%d\n
"
,__func__,__LINE__,ev
->
fd);
241 
    }
242 
    }
243 
  }
244 
  
return
 NULL;
245 
}
246 
247 
void
 
*
recv_data_thread_work(
void
 
*
arg)
248 
{
249 
  
while
(
1
)
250 
    {
251 
      sem_wait(
&
bin_sem_recv_fd_consume);
252 
      fprintf(stderr,
"
justin:function=%s,line=%d\n
"
,__func__,__LINE__);
253 
      
int
 index 
=
 (
int
)arg;
254 
      fprintf(stderr,
"
recv thread id is %d\n
"
,index);
255 
      pthread_mutex_lock(
&
queue_recv_fd_mutex);
256 
      
struct
 QUEUE_RECV_FD_ITEM 
*
item;
257 
      item 
=
 TAILQ_FIRST(
&
queue_recv_fd_header);
258 
      TAILQ_REMOVE(
&
queue_recv_fd_header,item,recv_fd_entries); 
259 
      pthread_mutex_unlock(
&
queue_recv_fd_mutex);
260 
      RecvData(item
->
ev);
261 
    }
262 
  
return
 NULL;
263 
}
264 
265 
void
 
*
send_data_thread_work(
void
 
*
arg)
266 
{
267 
  
while
(
1
)
268 
    {  
269 
      sem_wait(
&
bin_sem_send_fd_consume);
270 
      fprintf(stderr,
"
justin:function=%s,line=%d\n
"
,__func__,__LINE__);
271 
      pthread_mutex_lock(
&
queue_send_fd_mutex);
272 
      
struct
 QUEUE_SEND_FD_ITEM 
*
item;
273 
      item 
=
 TAILQ_FIRST(
&
queue_send_fd_header);
274 
      TAILQ_REMOVE(
&
queue_send_fd_header,item,send_fd_entries); 
275 
      pthread_mutex_unlock(
&
queue_send_fd_mutex);
276 
      SendData(item
->
ev);
277 
    }
278 
  
return
 NULL;
279 
}
280 
281 
void
 
*
logic_data_thread_work(
void
 
*
arg)
282 
{
283 
  
while
(
1
)
284 
    {
285 
      
//
remove logic queue
286 
      sem_wait(
&
bin_sem_logic_data_consume);
287 
      
//
for test
288 
      
int
 index 
=
 (
int
)arg;
289 
      fprintf(stderr,
"
logic thread id is %d\n
"
,index);
290 
291 
      pthread_mutex_lock(
&
queue_logic_data_mutex);
292 
      
struct
 QUEUE_LOGIC_DATA_ITEM 
*
item;
293 
      item 
=
 TAILQ_FIRST(
&
queue_logic_data_header);
294 
      TAILQ_REMOVE(
&
queue_logic_data_header,item,logic_data_entries); 
295 
      pthread_mutex_unlock(
&
queue_logic_data_mutex);
296 
      
//
logic header
297 
      
switch
(item
->
ev
->
header.msg_type)
298 
    {
299 
    
case
 msg_lost:
300 
      {
301 
        rq_lost
*
 rq 
=
 (rq_lost
*
)item
->
ev
->
recv_buff;
302 
        
303 
        rs_lost rs;
304 
        rs.header.msg_type 
=
 msg_lost;
305 
        rs.header.size 
=
 
sizeof
(rs_lost);
306 
        rs.header.length 
=
 
0
;
307 
308 
        
if
(strcmp(rq
->
cardno,
"
12345
"
)
==
0
)
309 
          {
310 
        rs.is_ok 
=
 
1
;                        
311 
          }
312 
        
else
313 
          {
314 
        rs.is_ok 
=
 
0
;
315 
          }
316 
        memcpy(
&
item
->
ev
->
header,
&
rs.header,
sizeof
(msg_header));
317 
        item
->
ev
->
send_len 
=
 
sizeof
(rs);
318 
        memcpy(item
->
ev
->
send_buff,
&
rs,
sizeof
(rs));
319 
        
break
;
320 
      }
321 
    }
322 
   
323 
      
//
add to send fd queue
324 
      sem_wait(
&
bin_sem_send_fd_produce);
325 
      fprintf(stderr,
"
after wait send fd produce\n
"
);
326 
      
struct
 QUEUE_SEND_FD_ITEM 
*
sendItem;
327 
      sendItem 
=
 malloc(
sizeof
(
struct
 QUEUE_SEND_FD_ITEM));
328 
      sendItem
->
ev 
=
 item
->
ev;
329 
      pthread_mutex_lock(
&
queue_send_fd_mutex);
330 
      TAILQ_INSERT_TAIL(
&
queue_send_fd_header,sendItem,send_fd_entries);
331 
      pthread_mutex_unlock(
&
queue_send_fd_mutex);
332 
      
//
let system known can send
333 
      EventAdd(g_epollFd, EPOLLOUT
|
EPOLLET, item
->
ev);
334 
    }
335 
  
return
 NULL;
336 
}
337 
338 
int
 main(
int
 argc, 
char
 
**
argv)
339 
{
340 
  
int
 res;
341 
  
//
recv fd queue
342 
  TAILQ_INIT(
&
queue_recv_fd_header);
343 
  res 
=
 sem_init(
&
bin_sem_recv_fd_consume,
0
,
0
);
344 
  
if
(res)
345 
    {
346 
      fprintf(stderr,
"
sem init consume failed\n
"
);
347 
      exit(EXIT_FAILURE);
348 
    }
349 
350 
  res 
=
 sem_init(
&
bin_sem_recv_fd_produce,
0
,MAX_EVENTS);
351 
  
if
(res)
352 
    {
353 
      fprintf(stderr,
"
sem init produce failed\n
"
);
354 
      exit(EXIT_FAILURE);
355 
    }
356 
357 
  res 
=
 pthread_mutex_init(
&
queue_recv_fd_mutex,NULL);
358 
  
if
(res
!=
0
)
359 
    {
360 
      perror(
"
create mutex for queue recv failed\n
"
);
361 
      exit(EXIT_FAILURE);
362 
    }
363 
  
//
logic data queue
364 
  TAILQ_INIT(
&
queue_logic_data_header);
365 
  res 
=
 sem_init(
&
bin_sem_logic_data_consume,
0
,
0
);
366 
  
if
(res)
367 
    {
368 
      fprintf(stderr,
"
sem init logic data consume failed\n
"
);
369 
      exit(EXIT_FAILURE);
370 
    }
371 
372 
  res 
=
 sem_init(
&
bin_sem_logic_data_produce,
0
,MAX_EVENTS);
373 
  
if
(res)
374 
    {
375 
      fprintf(stderr,
"
sem init logic data produce failed\n
"
);
376 
      exit(EXIT_FAILURE);
377 
    }
378 
379 
  res 
=
 pthread_mutex_init(
&
queue_logic_data_mutex,NULL);
380 
  
if
(res
!=
0
)
381 
    {
382 
      perror(
"
create mutex for queue logic data failed\n
"
);
383 
      exit(EXIT_FAILURE);
384 
    }
385 
386 
  
//
send fd queue
387 
  TAILQ_INIT(
&
queue_send_fd_header);
388 
  res 
=
 sem_init(
&
bin_sem_send_fd_consume,
0
,
0
);
389 
  
if
(res)
390 
    {
391 
      fprintf(stderr,
"
sem init send fd consume failed\n
"
);
392 
      exit(EXIT_FAILURE);
393 
    }
394 
395 
  res 
=
 sem_init(
&
bin_sem_send_fd_produce,
0
,MAX_EVENTS);
396 
  
if
(res)
397 
    {
398 
      fprintf(stderr,
"
sem init send fd produce failed\n
"
);
399 
      exit(EXIT_FAILURE);
400 
    }
401 
402 
  res 
=
 pthread_mutex_init(
&
queue_send_fd_mutex,NULL);
403 
  
if
(res
!=
0
)
404 
    {
405 
      perror(
"
create mutex for queue send fd failed\n
"
);
406 
      exit(EXIT_FAILURE);
407 
    }
408 
409 
  
short
 port 
=
 
3342
//
 default port
410 
  
if
(argc 
==
 
2
){
411 
    port 
=
 atoi(argv[
1
]);
412 
  }
413 
  
//
 create epoll
414 
  g_epollFd 
=
 epoll_create(MAX_EVENTS);
415 
  
if
(g_epollFd 
<=
 
0
416 
    {
417 
      fprintf(stderr,
"
create epoll failed:fd=%d:function=%s,line=%d\n
"
, g_epollFd,__func__,__LINE__);
418 
      exit(EXIT_FAILURE);
419 
    }
420 
  
//
 create & bind listen socket
421 
  
int
 listenFd 
=
 socket(AF_INET, SOCK_STREAM, 
0
);
422 
  
//
 bind & listen
423 
  
struct
 sockaddr_in sin;
424 
  bzero(
&
sin, 
sizeof
(sin));
425 
  sin.sin_family 
=
 AF_INET;
426 
  sin.sin_addr.s_addr 
=
 INADDR_ANY;
427 
  sin.sin_port 
=
 htons(port);
428 
  bind(listenFd, (
const
 
struct
 sockaddr
*
)
&
sin, 
sizeof
(sin));
429 
  listen(listenFd, 
5
);
430 
  fprintf(stderr,
"
server running:port[%d]\n
"
, port);
431 
  
//
create accept thread
432 
433 
  
void
 
*
thread_result;
434 
  pthread_t accept_t;  
435 
  res 
=
 pthread_create(
&
accept_t,NULL,accept_thread_work,(
void
 
*
)
&
listenFd);
436 
  
if
(res 
!=
 
0
)
437 
    {
438 
      perror(
"
accept create failed\n
"
);
439 
      exit(EXIT_FAILURE);
440 
    }
441 
442 
  
//
create epoll wait thread
443 
  pthread_t epoll_wait_t;
444 
  res 
=
 pthread_create(
&
epoll_wait_t,NULL,epoll_wait_thread_work,NULL);
445 
  
if
(res 
!=
 
0
)
446 
    {
447 
      perror(
"
create epoll wait thread failed\n
"
);
448 
      exit(EXIT_FAILURE);
449 
    }
450 
  
//
create two recv data thread
451 
  pthread_t recv_data_t;
452 
  res 
=
 pthread_create(
&
recv_data_t,NULL,recv_data_thread_work,(
void
*
)
1
);
453 
  
if
(res
!=
0
)
454 
    {
455 
      perror(
"
create recv data thread failed\n
"
);
456 
      exit(EXIT_FAILURE);
457 
    }
458 
459 
  pthread_t recv_data_t_1;
460 
  res 
=
 pthread_create(
&
recv_data_t_1,NULL,recv_data_thread_work,(
void
*
)
2
);
461 
  
if
(res
!=
0
)
462 
    {
463 
      perror(
"
create recv data thread failed\n
"
);
464 
      exit(EXIT_FAILURE);
465 
    }
466 
  
//
create two send data thread
467 
  pthread_t send_data_t;
468 
  res 
=
 pthread_create(
&
send_data_t,NULL,send_data_thread_work,(
void
*
)
1
);
469 
  
if
(res
!=
0
)
470 
    {
471 
      perror(
"
create send data thread failed\n
"
);
472 
      exit(EXIT_FAILURE);
473 
    }
474 
475 
  pthread_t send_data_t_1;
476 
  res 
=
 pthread_create(
&
send_data_t_1,NULL,send_data_thread_work,(
void
*
)
2
);
477 
  
if
(res
!=
0
)
478 
    {
479 
      perror(
"
create send data thread failed\n
"
);
480 
      exit(EXIT_FAILURE);
481 
    }
482 
483 
  
//
create two logic work thread
484 
  pthread_t logic_work_t;
485 
  res 
=
 pthread_create(
&
logic_work_t,NULL,logic_data_thread_work,(
void
*
)
1
);
486 
  
if
(res
!=
0
)
487 
    {
488 
      perror(
"
create logic work thread failed\n
"
);
489 
      exit(EXIT_FAILURE);
490 
    }
491 
492 
 pthread_t logic_work_t_1;
493 
  res 
=
 pthread_create(
&
logic_work_t_1,NULL,logic_data_thread_work,(
void
*
)
2
);
494 
  
if
(res
!=
0
)
495 
    {
496 
      perror(
"
create logic work thread failed\n
"
);
497 
      exit(EXIT_FAILURE);
498 
    }
499 
500 
  
//
wait child thread
501 
  res 
=
 pthread_join(accept_t,
&
thread_result);
502 
  
if
(res
!=
0
)
503 
    {
504 
      perror(
"
accept thread join failed\n
"
);
505 
      exit(EXIT_FAILURE);
506 
    }
507 
508 
  
//
wait child thread
509 
  res 
=
 pthread_join(epoll_wait_t,
&
thread_result);
510 
  
if
(res
!=
0
)
511 
    {
512 
      perror(
"
epoll wait thread join failed\n
"
);
513 
      exit(EXIT_FAILURE);
514 
    }
515 
516 
  
//
wait child thread
517 
  res 
=
 pthread_join(recv_data_t,
&
thread_result);
518 
  
if
(res
!=
0
)
519 
    {
520 
      perror(
"
recv data thread join failed\n
"
);
521 
      exit(EXIT_FAILURE);      
522 
    }
523 
  
//
wait child thread
524 
  res 
=
 pthread_join(recv_data_t_1,
&
thread_result);
525 
  
if
(res
!=
0
)
526 
    {
527 
      perror(
"
recv data thread join failed\n
"
);
528 
      exit(EXIT_FAILURE);      
529 
    }
530 
531 
  
//
wait child thread
532 
  res 
=
 pthread_join(send_data_t,
&
thread_result);
533 
  
if
(res
!=
0
)
534 
    {
535 
      perror(
"
send data thread join failed\n
"
);
536 
      exit(EXIT_FAILURE);      
537 
    }
538 
  
//
wait child thread
539 
  res 
=
 pthread_join(send_data_t_1,
&
thread_result);
540 
  
if
(res
!=
0
)
541 
    {
542 
      perror(
"
send data thread join failed\n
"
);
543 
      exit(EXIT_FAILURE);      
544 
    }
545 
  
//
wait child thread
546 
  res 
=
 pthread_join(logic_work_t,
&
thread_result);
547 
  
if
(res
!=
0
)
548 
    {
549 
      perror(
"
logic work thread join failed\n
"
);
550 
      exit(EXIT_FAILURE);      
551 
    }
552 
  
//
wait child thread
553 
  res 
=
 pthread_join(logic_work_t_1,
&
thread_result);
554 
  
if
(res
!=
0
)
555 
    {
556 
      perror(
"
logic work thread join failed\n
"
);
557 
      exit(EXIT_FAILURE);      
558 
    }
559 
  
//
 free resource
560 
  close(g_epollFd);
561 
  sem_destroy(
&
bin_sem_recv_fd_consume);
562 
  sem_destroy(
&
bin_sem_recv_fd_produce);
563 
  pthread_mutex_destroy(
&
queue_recv_fd_mutex);
564 
565 
  sem_destroy(
&
bin_sem_logic_data_consume);
566 
  sem_destroy(
&
bin_sem_logic_data_produce);
567 
  pthread_mutex_destroy(
&
queue_logic_data_mutex);
568 
569 
  sem_destroy(
&
bin_sem_send_fd_consume);
570 
  sem_destroy(
&
bin_sem_send_fd_produce);
571 
  pthread_mutex_destroy(
&
queue_send_fd_mutex);
572 
  
return
 
0
;
573
ExpandedBlockStart.gif
服务器头
 1 
#ifndef _epoll_h_
 2 
#define
 _epoll_h_
 3 
 4 
#include 
"
sys/queue.h
"
 5 
#include 
<
semaphore.h
>
 6 
#include 
"
msg.h
"
 7 
 8 
#define
 MAX_EVENTS 500
 9 
10 
int
 g_epollFd;
11 
12 
void
 
*
accept_thread_work(
void
 
*
arg);
13 
void
 
*
epoll_wait_thread_work(
void
 
*
arg);
14 
void
 
*
recv_data_thread_work(
void
 
*
arg);
15 
void
 
*
send_data_thread_work(
void
 
*
arg);
16 
void
 
*
logic_data_thread_work(
void
 
*
arg);
17 
18 
struct
 myevent_s
19 
{
20 
  
int
 fd;
21 
  
int
 status; 
//
 1: in epoll wait list, 0 not in
22 
  msg_header header;
23 
  
char
 recv_buff[
256
]; 
//
 recv data buffer
24 
  
int
 recv_len;
25 
  
char
 send_buff[
256
];
//
send data buffer
26 
  
int
 send_len;
27 
  
long
 last_active; 
//
 last active time
28 
};
29 
30 
struct
 myevent_s g_Events[MAX_EVENTS
+
1
]; 
//
 g_Events[MAX_EVENTS] is used by listen fd
31 
32 
//
recv fd queue
33 
struct
 QUEUE_RECV_FD_ITEM{
34 
  
struct
 myevent_s
*
 ev;
35 
  TAILQ_ENTRY(QUEUE_RECV_FD_ITEM) recv_fd_entries;
36 
};
37 
38 
TAILQ_HEAD(,QUEUE_RECV_FD_ITEM) queue_recv_fd_header;
39 
40 
sem_t bin_sem_recv_fd_produce;
41 
sem_t bin_sem_recv_fd_consume;
42 
43 
pthread_mutex_t queue_recv_fd_mutex;
44 
45 
//
send fd queue
46 
struct
 QUEUE_SEND_FD_ITEM{
47 
  
struct
 myevent_s
*
 ev;
48 
  TAILQ_ENTRY(QUEUE_SEND_FD_ITEM) send_fd_entries;
49 
};
50 
51 
TAILQ_HEAD(,QUEUE_SEND_FD_ITEM) queue_send_fd_header;
52 
53 
sem_t bin_sem_send_fd_produce;
54 
sem_t bin_sem_send_fd_consume;
55 
56 
pthread_mutex_t queue_send_fd_mutex;
57 
58 
//
logic data buff
59 
struct
 QUEUE_LOGIC_DATA_ITEM{
60 
  
struct
 myevent_s
*
 ev;
61 
  TAILQ_ENTRY(QUEUE_LOGIC_DATA_ITEM) logic_data_entries;
62 
};
63 
64 
TAILQ_HEAD(,QUEUE_LOGIC_DATA_ITEM) queue_logic_data_header;
65 
66 
sem_t bin_sem_logic_data_produce;
67 
sem_t bin_sem_logic_data_consume;
68 
69 
pthread_mutex_t queue_logic_data_mutex;
70 
71 
#endif
72 

 

 
}

 

转载于:https://www.cnblogs.com/nanshouyong326/archive/2010/12/21/1912894.html

你可能感兴趣的文章
go:channel(未完)
查看>>
[JS]递归对象或数组
查看>>
LeetCode(17) - Letter Combinations of a Phone Number
查看>>
Linux查找命令对比(find、locate、whereis、which、type、grep)
查看>>
路由器外接硬盘做nas可行吗?
查看>>
python:从迭代器,到生成器,再到协程的示例代码
查看>>
Java多线程系列——原子类的实现(CAS算法)
查看>>
在Ubuntu下配置Apache多域名服务器
查看>>
多线程《三》进程与线程的区别
查看>>
linux sed命令
查看>>
html标签的嵌套规则
查看>>
[Source] Machine Learning Gathering/Surveys
查看>>
HTML <select> 标签
查看>>
类加载机制
查看>>
tju 1782. The jackpot
查看>>
湖南多校对抗赛(2015.03.28) H SG Value
查看>>
hdu1255扫描线计算覆盖两次面积
查看>>
hdu1565 用搜索代替枚举找可能状态或者轮廓线解(较优),参考poj2411
查看>>
bzoj3224 splay板子
查看>>
程序存储问题
查看>>