服务器
1 // 2 // a simple agi server using epoll in linux 3 // 4 // 2010-12-20 5 // by nsy 6 // 7 #include < sys / socket.h > 8 #include < sys / epoll.h > 9 #include < netinet / in .h > 10 #include < arpa / inet.h > 11 #include < fcntl.h > 12 #include < unistd.h > 13 #include < stdio.h > 14 #include < stdlib.h > 15 #include < errno.h > 16 #include < string .h > 17 #include " CallSvr.h " 18 #include < pthread.h > 19 #include " epoll.h " 20 21 // test 22 #include " msg.h " 23 24 // set event 25 void EventSet( struct myevent_s * ev, int fd, int status) 26 { 27 ev -> fd = fd; 28 ev -> status = status; 29 ev -> last_active = time(NULL); 30 fprintf(stderr, " function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,fd); 31 } 32 // add/mod an event to epoll 33 void EventAdd( int epollFd, int events, struct myevent_s * ev) 34 { 35 struct epoll_event epv = { 0 , { 0 }}; 36 int op; 37 epv.data.ptr = ev; 38 epv.events = events; 39 if (ev -> status == 1 ){ 40 op = EPOLL_CTL_MOD; 41 fprintf(stderr, " mod:function=%s,line=%d,fd=%d,status=%d\n " ,__func__,__LINE__,ev -> fd,ev -> status); 42 } 43 else { 44 op = EPOLL_CTL_ADD; 45 ev -> status = 1 ; 46 fprintf(stderr, " add:function=%s,line=%d,fd=%d,status=%d\n " ,__func__,__LINE__,ev -> fd,ev -> status); 47 } 48 if (epoll_ctl(epollFd, op, ev -> fd, & epv) < 0 ) 49 { 50 fprintf(stderr, " failed:function=%s,line=%d,fd=%d:errno=%d\n " ,__func__,__LINE__,ev -> fd,errno); 51 } 52 else 53 { 54 fprintf(stderr, " success:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 55 } 56 } 57 // delete an event from epoll 58 void EventDel( int epollFd, struct myevent_s * ev) 59 { 60 struct epoll_event epv = { 0 , { 0 }}; 61 if (ev -> status != 1 ) return ; 62 epv.data.ptr = ev; 63 ev -> status = 0 ; 64 epoll_ctl(epollFd, EPOLL_CTL_DEL, ev -> fd, & epv); 65 fprintf(stderr, " function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 66 } 67 68 // receive data 69 void RecvData( struct myevent_s * ev) 70 { 71 msg_header header; 72 int recvbytes; 73 if ((recvbytes = recv(ev -> fd, & header, sizeof (msg_header), 0 )) ==- 1 ) 74 { 75 fprintf(stderr, " RecvHeaderErr:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 76 goto errret; 77 } 78 if (recvbytes == sizeof (msg_header)) 79 { 80 switch (header.msg_type) 81 { 82 case msg_lost: 83 { 84 rq_lost rq; 85 if ((recvbytes = recv(ev -> fd, (( char * )( & rq)) + sizeof (msg_header), sizeof (rq_lost) - sizeof (msg_header), 0 )) ==- 1 ) 86 { 87 fprintf(stderr, " RecvAfter:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 88 goto errret; 89 } 90 else if (recvbytes > 0 ) 91 { 92 printf( " recv sucess%d,hope to recv %d\n " ,recvbytes, sizeof (rq_lost) - sizeof (msg_header)); 93 printf( " cardno is '%s'\n " ,rq.cardno); 94 printf( " password is '%s'\n " ,rq.password); 95 96 // init ev recv 97 memcpy( & ev -> header, & header, sizeof (msg_header)); 98 memcpy( & ev -> recv_buff, & rq, sizeof (rq_lost)); 99 ev -> recv_len = recvbytes; 100 fprintf(stderr, " addtologicqueue:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 101 // add to logic queue 102 sem_wait( & bin_sem_logic_data_produce); 103 struct QUEUE_LOGIC_DATA_ITEM * item; 104 item = malloc( sizeof ( struct QUEUE_LOGIC_DATA_ITEM)); 105 item -> ev = ev; 106 pthread_mutex_lock( & queue_logic_data_mutex); 107 TAILQ_INSERT_TAIL( & queue_logic_data_header,item,logic_data_entries); 108 pthread_mutex_unlock( & queue_logic_data_mutex); 109 sem_post( & bin_sem_logic_data_consume); 110 fprintf(stderr, " addtologicqueue--end:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 111 } else 112 { 113 fprintf(stderr, " RecvAfter:function=%s,line=%d,fd=%d:errno=%d\n " ,__func__,__LINE__,ev -> fd,errno); 114 goto errret; 115 } 116 break ; 117 } 118 } 119 return ; // switch end function end 120 } else 121 { 122 fprintf(stderr, " function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 123 goto errret; 124 } 125 errret: 126 EventDel(g_epollFd, ev); 127 close(ev -> fd); 128 } 129 // send data 130 void SendData( struct myevent_s * ev) 131 { 132 fprintf(stderr, " JustIn:function=%s,line=%d,fd=%d\n " ,__func__,__LINE__,ev -> fd); 133 int len; 134 // send data 135 len = send(ev -> fd, ev -> send_buff, ev -> send_len, 0 ); 136 ev -> send_len = 0 ; 137 fprintf(stderr, " sendlen=%d:function=%s,line=%d,fd=%d\n " ,len,__func__,__LINE__,ev -> fd); 138 if (len < 0 ) 139 { 140 close(ev -> fd); 141 fprintf(stderr, " err=%d:function=%s,line=%d,fd=%d\n " ,errno,__func__,__LINE__,ev -> fd); 142 } else 143 { 144 // let system known can recv 145 EventAdd(g_epollFd, EPOLLIN | EPOLLET,ev); 146 } 147 } 148 149 void * accept_thread_work( void * arg) 150 { 151 while ( 1 ) 152 { 153 int * plistenFd = ( int * )arg; 154 fprintf(stderr, " function=%s,line=%d,listenfd=%d\n " ,__func__,__LINE__, * plistenFd); 155 struct sockaddr_in sin; 156 socklen_t len = sizeof ( struct sockaddr_in); 157 int nfd, i; 158 // accept 159 if ((nfd = accept( * plistenFd, ( struct sockaddr * ) & sin, & len)) == - 1 ) 160 { 161 if (errno != EAGAIN && errno != EINTR) 162 { 163 fprintf(stderr, " %s: bad accept " , __func__); 164 } 165 continue ; 166 } 167 do 168 { 169 for (i = 0 ; i < MAX_EVENTS; i ++ ) 170 { 171 if (g_Events[i].status == 0 ) 172 { 173 fprintf(stderr, " function=%s,line=%d,listenfd=%d,currentindex=%d\n " ,__func__,__LINE__, * plistenFd,i); 174 break ; 175 } 176 } 177 if (i == MAX_EVENTS) 178 { 179 fprintf(stderr, " max events:function=%s,line=%d,listenFd=%d\n " ,__func__,__LINE__, * plistenFd); 180 break ; 181 } 182 // set nonblocking 183 fprintf(stderr, " set nonblocking:function=%s,line=%d,listenfd=%d\n " ,__func__,__LINE__, * plistenFd); 184 if (fcntl(nfd, F_SETFL, O_NONBLOCK) < 0 ) break ; 185 // add a read event for receive data 186 EventSet( & g_Events[i], nfd, 0 ); 187 EventAdd(g_epollFd, EPOLLIN | EPOLLET, & g_Events[i]); 188 fprintf(stderr, " new conn[%s:%d][time:%d]\n " , inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),( int ) g_Events[i].last_active); 189 } while ( 0 ); 190 } 191 return NULL; 192 } 193 194 void * epoll_wait_thread_work( void * arg) 195 { 196 fprintf(stderr, " justin:function=%s,line=%d\n " ,__func__,__LINE__); 197 // event loop 198 struct epoll_event events[MAX_EVENTS]; 199 200 int checkPos = 0 ; 201 while ( 1 ){ 202 // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event 203 long now = time(NULL); 204 int i; 205 for (i = 0 ; i < 100 ; i ++ , checkPos ++ ) // doesn't check listen fd 206 { 207 if (checkPos == MAX_EVENTS) checkPos = 0 ; // recycle 208 if (g_Events[checkPos].status != 1 ) continue ; 209 long duration = now - g_Events[checkPos].last_active; 210 if (duration >= 60 ) // 60s timeout 211 { 212 close(g_Events[checkPos].fd); 213 fprintf(stderr, " [fd=%d] timeout[%d--%d].\n " ,( int ) g_Events[checkPos].fd,( int ) g_Events[checkPos].last_active, ( int )now); 214 EventDel(g_epollFd, & g_Events[checkPos]); 215 } 216 } 217 // wait for events to happen 218 int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000 ); 219 if (fds < 0 ){ 220 fprintf(stderr, " epoll_wait error, exit\n " ); 221 break ; 222 } 223 for (i = 0 ; i < fds; i ++ ){ 224 struct myevent_s * ev = ( struct myevent_s * )events[i].data.ptr; 225 if (events[i].events & EPOLLIN) // read event 226 { 227 sem_wait( & bin_sem_recv_fd_produce); 228 fprintf(stderr, " readEvent:function=%s,line=%d:fd=%d\n " ,__func__,__LINE__,ev -> fd); 229 // ev->call_back(ev->fd, events[i].events, ev->arg); 230 struct QUEUE_RECV_FD_ITEM * item; 231 item = malloc( sizeof ( struct QUEUE_RECV_FD_ITEM)); 232 item -> ev = ev; 233 pthread_mutex_lock( & queue_recv_fd_mutex); 234 TAILQ_INSERT_TAIL( & queue_recv_fd_header,item,recv_fd_entries); 235 pthread_mutex_unlock( & queue_recv_fd_mutex); 236 sem_post( & bin_sem_recv_fd_consume); 237 } else if (events[i].events & EPOLLOUT) // write event 238 { 239 sem_post( & bin_sem_send_fd_consume); 240 fprintf(stderr, " post send fd consume:function=%s,line=%d:fd=%d\n " ,__func__,__LINE__,ev -> fd); 241 } 242 } 243 } 244 return NULL; 245 } 246 247 void * recv_data_thread_work( void * arg) 248 { 249 while ( 1 ) 250 { 251 sem_wait( & bin_sem_recv_fd_consume); 252 fprintf(stderr, " justin:function=%s,line=%d\n " ,__func__,__LINE__); 253 int index = ( int )arg; 254 fprintf(stderr, " recv thread id is %d\n " ,index); 255 pthread_mutex_lock( & queue_recv_fd_mutex); 256 struct QUEUE_RECV_FD_ITEM * item; 257 item = TAILQ_FIRST( & queue_recv_fd_header); 258 TAILQ_REMOVE( & queue_recv_fd_header,item,recv_fd_entries); 259 pthread_mutex_unlock( & queue_recv_fd_mutex); 260 RecvData(item -> ev); 261 } 262 return NULL; 263 } 264 265 void * send_data_thread_work( void * arg) 266 { 267 while ( 1 ) 268 { 269 sem_wait( & bin_sem_send_fd_consume); 270 fprintf(stderr, " justin:function=%s,line=%d\n " ,__func__,__LINE__); 271 pthread_mutex_lock( & queue_send_fd_mutex); 272 struct QUEUE_SEND_FD_ITEM * item; 273 item = TAILQ_FIRST( & queue_send_fd_header); 274 TAILQ_REMOVE( & queue_send_fd_header,item,send_fd_entries); 275 pthread_mutex_unlock( & queue_send_fd_mutex); 276 SendData(item -> ev); 277 } 278 return NULL; 279 } 280 281 void * logic_data_thread_work( void * arg) 282 { 283 while ( 1 ) 284 { 285 // remove logic queue 286 sem_wait( & bin_sem_logic_data_consume); 287 // for test 288 int index = ( int )arg; 289 fprintf(stderr, " logic thread id is %d\n " ,index); 290 291 pthread_mutex_lock( & queue_logic_data_mutex); 292 struct QUEUE_LOGIC_DATA_ITEM * item; 293 item = TAILQ_FIRST( & queue_logic_data_header); 294 TAILQ_REMOVE( & queue_logic_data_header,item,logic_data_entries); 295 pthread_mutex_unlock( & queue_logic_data_mutex); 296 // logic header 297 switch (item -> ev -> header.msg_type) 298 { 299 case msg_lost: 300 { 301 rq_lost * rq = (rq_lost * )item -> ev -> recv_buff; 302 303 rs_lost rs; 304 rs.header.msg_type = msg_lost; 305 rs.header.size = sizeof (rs_lost); 306 rs.header.length = 0 ; 307 308 if (strcmp(rq -> cardno, " 12345 " ) == 0 ) 309 { 310 rs.is_ok = 1 ; 311 } 312 else 313 { 314 rs.is_ok = 0 ; 315 } 316 memcpy( & item -> ev -> header, & rs.header, sizeof (msg_header)); 317 item -> ev -> send_len = sizeof (rs); 318 memcpy(item -> ev -> send_buff, & rs, sizeof (rs)); 319 break ; 320 } 321 } 322 323 // add to send fd queue 324 sem_wait( & bin_sem_send_fd_produce); 325 fprintf(stderr, " after wait send fd produce\n " ); 326 struct QUEUE_SEND_FD_ITEM * sendItem; 327 sendItem = malloc( sizeof ( struct QUEUE_SEND_FD_ITEM)); 328 sendItem -> ev = item -> ev; 329 pthread_mutex_lock( & queue_send_fd_mutex); 330 TAILQ_INSERT_TAIL( & queue_send_fd_header,sendItem,send_fd_entries); 331 pthread_mutex_unlock( & queue_send_fd_mutex); 332 // let system known can send 333 EventAdd(g_epollFd, EPOLLOUT | EPOLLET, item -> ev); 334 } 335 return NULL; 336 } 337 338 int main( int argc, char ** argv) 339 { 340 int res; 341 // recv fd queue 342 TAILQ_INIT( & queue_recv_fd_header); 343 res = sem_init( & bin_sem_recv_fd_consume, 0 , 0 ); 344 if (res) 345 { 346 fprintf(stderr, " sem init consume failed\n " ); 347 exit(EXIT_FAILURE); 348 } 349 350 res = sem_init( & bin_sem_recv_fd_produce, 0 ,MAX_EVENTS); 351 if (res) 352 { 353 fprintf(stderr, " sem init produce failed\n " ); 354 exit(EXIT_FAILURE); 355 } 356 357 res = pthread_mutex_init( & queue_recv_fd_mutex,NULL); 358 if (res != 0 ) 359 { 360 perror( " create mutex for queue recv failed\n " ); 361 exit(EXIT_FAILURE); 362 } 363 // logic data queue 364 TAILQ_INIT( & queue_logic_data_header); 365 res = sem_init( & bin_sem_logic_data_consume, 0 , 0 ); 366 if (res) 367 { 368 fprintf(stderr, " sem init logic data consume failed\n " ); 369 exit(EXIT_FAILURE); 370 } 371 372 res = sem_init( & bin_sem_logic_data_produce, 0 ,MAX_EVENTS); 373 if (res) 374 { 375 fprintf(stderr, " sem init logic data produce failed\n " ); 376 exit(EXIT_FAILURE); 377 } 378 379 res = pthread_mutex_init( & queue_logic_data_mutex,NULL); 380 if (res != 0 ) 381 { 382 perror( " create mutex for queue logic data failed\n " ); 383 exit(EXIT_FAILURE); 384 } 385 386 // send fd queue 387 TAILQ_INIT( & queue_send_fd_header); 388 res = sem_init( & bin_sem_send_fd_consume, 0 , 0 ); 389 if (res) 390 { 391 fprintf(stderr, " sem init send fd consume failed\n " ); 392 exit(EXIT_FAILURE); 393 } 394 395 res = sem_init( & bin_sem_send_fd_produce, 0 ,MAX_EVENTS); 396 if (res) 397 { 398 fprintf(stderr, " sem init send fd produce failed\n " ); 399 exit(EXIT_FAILURE); 400 } 401 402 res = pthread_mutex_init( & queue_send_fd_mutex,NULL); 403 if (res != 0 ) 404 { 405 perror( " create mutex for queue send fd failed\n " ); 406 exit(EXIT_FAILURE); 407 } 408 409 short port = 3342 ; // default port 410 if (argc == 2 ){ 411 port = atoi(argv[ 1 ]); 412 } 413 // create epoll 414 g_epollFd = epoll_create(MAX_EVENTS); 415 if (g_epollFd <= 0 ) 416 { 417 fprintf(stderr, " create epoll failed:fd=%d:function=%s,line=%d\n " , g_epollFd,__func__,__LINE__); 418 exit(EXIT_FAILURE); 419 } 420 // create & bind listen socket 421 int listenFd = socket(AF_INET, SOCK_STREAM, 0 ); 422 // bind & listen 423 struct sockaddr_in sin; 424 bzero( & sin, sizeof (sin)); 425 sin.sin_family = AF_INET; 426 sin.sin_addr.s_addr = INADDR_ANY; 427 sin.sin_port = htons(port); 428 bind(listenFd, ( const struct sockaddr * ) & sin, sizeof (sin)); 429 listen(listenFd, 5 ); 430 fprintf(stderr, " server running:port[%d]\n " , port); 431 // create accept thread 432 433 void * thread_result; 434 pthread_t accept_t; 435 res = pthread_create( & accept_t,NULL,accept_thread_work,( void * ) & listenFd); 436 if (res != 0 ) 437 { 438 perror( " accept create failed\n " ); 439 exit(EXIT_FAILURE); 440 } 441 442 // create epoll wait thread 443 pthread_t epoll_wait_t; 444 res = pthread_create( & epoll_wait_t,NULL,epoll_wait_thread_work,NULL); 445 if (res != 0 ) 446 { 447 perror( " create epoll wait thread failed\n " ); 448 exit(EXIT_FAILURE); 449 } 450 // create two recv data thread 451 pthread_t recv_data_t; 452 res = pthread_create( & recv_data_t,NULL,recv_data_thread_work,( void * ) 1 ); 453 if (res != 0 ) 454 { 455 perror( " create recv data thread failed\n " ); 456 exit(EXIT_FAILURE); 457 } 458 459 pthread_t recv_data_t_1; 460 res = pthread_create( & recv_data_t_1,NULL,recv_data_thread_work,( void * ) 2 ); 461 if (res != 0 ) 462 { 463 perror( " create recv data thread failed\n " ); 464 exit(EXIT_FAILURE); 465 } 466 // create two send data thread 467 pthread_t send_data_t; 468 res = pthread_create( & send_data_t,NULL,send_data_thread_work,( void * ) 1 ); 469 if (res != 0 ) 470 { 471 perror( " create send data thread failed\n " ); 472 exit(EXIT_FAILURE); 473 } 474 475 pthread_t send_data_t_1; 476 res = pthread_create( & send_data_t_1,NULL,send_data_thread_work,( void * ) 2 ); 477 if (res != 0 ) 478 { 479 perror( " create send data thread failed\n " ); 480 exit(EXIT_FAILURE); 481 } 482 483 // create two logic work thread 484 pthread_t logic_work_t; 485 res = pthread_create( & logic_work_t,NULL,logic_data_thread_work,( void * ) 1 ); 486 if (res != 0 ) 487 { 488 perror( " create logic work thread failed\n " ); 489 exit(EXIT_FAILURE); 490 } 491 492 pthread_t logic_work_t_1; 493 res = pthread_create( & logic_work_t_1,NULL,logic_data_thread_work,( void * ) 2 ); 494 if (res != 0 ) 495 { 496 perror( " create logic work thread failed\n " ); 497 exit(EXIT_FAILURE); 498 } 499 500 // wait child thread 501 res = pthread_join(accept_t, & thread_result); 502 if (res != 0 ) 503 { 504 perror( " accept thread join failed\n " ); 505 exit(EXIT_FAILURE); 506 } 507 508 // wait child thread 509 res = pthread_join(epoll_wait_t, & thread_result); 510 if (res != 0 ) 511 { 512 perror( " epoll wait thread join failed\n " ); 513 exit(EXIT_FAILURE); 514 } 515 516 // wait child thread 517 res = pthread_join(recv_data_t, & thread_result); 518 if (res != 0 ) 519 { 520 perror( " recv data thread join failed\n " ); 521 exit(EXIT_FAILURE); 522 } 523 // wait child thread 524 res = pthread_join(recv_data_t_1, & thread_result); 525 if (res != 0 ) 526 { 527 perror( " recv data thread join failed\n " ); 528 exit(EXIT_FAILURE); 529 } 530 531 // wait child thread 532 res = pthread_join(send_data_t, & thread_result); 533 if (res != 0 ) 534 { 535 perror( " send data thread join failed\n " ); 536 exit(EXIT_FAILURE); 537 } 538 // wait child thread 539 res = pthread_join(send_data_t_1, & thread_result); 540 if (res != 0 ) 541 { 542 perror( " send data thread join failed\n " ); 543 exit(EXIT_FAILURE); 544 } 545 // wait child thread 546 res = pthread_join(logic_work_t, & thread_result); 547 if (res != 0 ) 548 { 549 perror( " logic work thread join failed\n " ); 550 exit(EXIT_FAILURE); 551 } 552 // wait child thread 553 res = pthread_join(logic_work_t_1, & thread_result); 554 if (res != 0 ) 555 { 556 perror( " logic work thread join failed\n " ); 557 exit(EXIT_FAILURE); 558 } 559 // free resource 560 close(g_epollFd); 561 sem_destroy( & bin_sem_recv_fd_consume); 562 sem_destroy( & bin_sem_recv_fd_produce); 563 pthread_mutex_destroy( & queue_recv_fd_mutex); 564 565 sem_destroy( & bin_sem_logic_data_consume); 566 sem_destroy( & bin_sem_logic_data_produce); 567 pthread_mutex_destroy( & queue_logic_data_mutex); 568 569 sem_destroy( & bin_sem_send_fd_consume); 570 sem_destroy( & bin_sem_send_fd_produce); 571 pthread_mutex_destroy( & queue_send_fd_mutex); 572 return 0 ; 573
服务器头
1 #ifndef _epoll_h_ 2 #define _epoll_h_ 3 4 #include " sys/queue.h " 5 #include < semaphore.h > 6 #include " msg.h " 7 8 #define MAX_EVENTS 500 9 10 int g_epollFd; 11 12 void * accept_thread_work( void * arg); 13 void * epoll_wait_thread_work( void * arg); 14 void * recv_data_thread_work( void * arg); 15 void * send_data_thread_work( void * arg); 16 void * logic_data_thread_work( void * arg); 17 18 struct myevent_s 19 { 20 int fd; 21 int status; // 1: in epoll wait list, 0 not in 22 msg_header header; 23 char recv_buff[ 256 ]; // recv data buffer 24 int recv_len; 25 char send_buff[ 256 ]; // send data buffer 26 int send_len; 27 long last_active; // last active time 28 }; 29 30 struct myevent_s g_Events[MAX_EVENTS + 1 ]; // g_Events[MAX_EVENTS] is used by listen fd 31 32 // recv fd queue 33 struct QUEUE_RECV_FD_ITEM{ 34 struct myevent_s * ev; 35 TAILQ_ENTRY(QUEUE_RECV_FD_ITEM) recv_fd_entries; 36 }; 37 38 TAILQ_HEAD(,QUEUE_RECV_FD_ITEM) queue_recv_fd_header; 39 40 sem_t bin_sem_recv_fd_produce; 41 sem_t bin_sem_recv_fd_consume; 42 43 pthread_mutex_t queue_recv_fd_mutex; 44 45 // send fd queue 46 struct QUEUE_SEND_FD_ITEM{ 47 struct myevent_s * ev; 48 TAILQ_ENTRY(QUEUE_SEND_FD_ITEM) send_fd_entries; 49 }; 50 51 TAILQ_HEAD(,QUEUE_SEND_FD_ITEM) queue_send_fd_header; 52 53 sem_t bin_sem_send_fd_produce; 54 sem_t bin_sem_send_fd_consume; 55 56 pthread_mutex_t queue_send_fd_mutex; 57 58 // logic data buff 59 struct QUEUE_LOGIC_DATA_ITEM{ 60 struct myevent_s * ev; 61 TAILQ_ENTRY(QUEUE_LOGIC_DATA_ITEM) logic_data_entries; 62 }; 63 64 TAILQ_HEAD(,QUEUE_LOGIC_DATA_ITEM) queue_logic_data_header; 65 66 sem_t bin_sem_logic_data_produce; 67 sem_t bin_sem_logic_data_consume; 68 69 pthread_mutex_t queue_logic_data_mutex; 70 71 #endif 72
}