+ -
当前位置:首页 → 问答吧 → 分享一下我写的异步文件I/O的框架(改进版)欢迎大家拍砖

分享一下我写的异步文件I/O的框架(改进版)欢迎大家拍砖

时间:2010-07-21

来源:互联网

本帖最后由 cookis 于 2010-07-21 00:32 编辑

这是我做CDN系统写的一个独立的框架,不是项目,所以仅做参考,

前一阵翻了一下glibc 中的aio实现,竟然是用 pthread_create 为每一个aio_cb 创建一个dispatched线程,而且有好多条件变量和互斥量,
都是些用户层的调用,感觉不是很高效,所以自己尝试写了一个,经过稳定性测试,效果还不错。

特性
1. 不同于glibc 中 aio的频繁创建销毁线程,而是采用固定的线程池(也可在运行时动态增加)
2. 每个文件句柄有一个优先级,每个句柄关联的任务也有一个优先级,可按优先级进行排序处理
3. 设计原则本着使用者对同一文件访问多次时只打开一次,共享使用一个句柄,这样框架按照句柄归类,线程可按句柄提取任务,不必进行互斥
4. 相同优先级的同一个文件句柄的不同任务按照 I/O的偏移量来排序,减少磁头不必要的移动,更充分地利用vfs的预读功能
5. 采用数组+双向链表+单向链表结构,能够快速定位,快速插入、删除。
6. 内置一个小型的对象池,避免频繁分配小内存。

asynch_file_io.rar (4.1 KB)
下载次数: 8
2010-07-21 00:14


cpp的代码直接贴出来吧

vim set sw=2 ts=2 cinoptions={0,:0,g0,l1,t0,(0
  1. #include "ndk/asynch_file_io.h"

  2. #include <limits.h>
  3. #include <cassert>
  4. #include <unistd.h>

  5. namespace ndk
  6. {
  7. void aio_opt_t::reset()
  8. {
  9.   this->handle_       = NDK_INVALID_HANDLE;
  10.   this->opcode_       = -1;
  11.   this->errcode_      = 0;
  12.   this->fd_prio_      = 0;
  13.   this->io_prio_      = 0;
  14.   this->id_           = -1;
  15.   this->i_nbytes_     = 0;
  16.   this->o_nbytes_     = 0;
  17.   this->offset_       = 0;
  18.   this->buffer_       = 0;
  19.   this->aio_handler_  = 0;
  20.   this->prev_         = 0;
  21.   this->next_         = 0;
  22.   this->ptr_          = 0;
  23.   this->header_       = 0;
  24. }
  25. int asynch_file_io::open(size_t thr_num)
  26. {
  27.   int size = ndk::max_handles();
  28.   if (size <= 0) return -1;

  29.   this->fd_pool_size_ = size;

  30.   this->fd_pool_ = new aio_opt_t*[this->fd_pool_size_];
  31.   for (int i = 0; i < this->fd_pool_size_; ++i)
  32.     this->fd_pool_[i] = 0;

  33.   return this->activate(ndk::thread::thr_join, thr_num);
  34. }
  35. int asynch_file_io::start_aio(ndk::ndk_handle handle,
  36.                               int *aio_id,
  37.                               size_t nbytes,
  38.                               uint64_t offset,
  39.                               char *buff,
  40.                               asynch_handler *handler,
  41.                               int optcode,
  42.                               int fd_priority,
  43.                               int io_priority)
  44. {
  45.   assert(nbytes > 0);
  46.   assert(handle != NDK_INVALID_HANDLE);
  47.   assert(handler != 0);
  48.   assert(optcode == AIO_READ || optcode == AIO_WRITE);
  49.   assert(handle >= 0 && handle < this->fd_pool_size_);
  50.   assert(aio_id != 0);

  51.   aio_opt_t *aioopt = this->alloc_aio_opt();
  52.   if (aioopt == NULL) return -1;

  53.   // guard
  54.   ndk::guard<ndk::thread_mutex> g(this->queue_list_mtx_);

  55.   int id = ++this->id_itor_;
  56.   if (this->id_itor_ == INT_MAX)
  57.     this->id_itor_ = 0;

  58.   // assign data struct.
  59.   aioopt->aio_handler_  = handler;
  60.   aioopt->handle_       = handle;
  61.   aioopt->i_nbytes_     = nbytes;
  62.   aioopt->buffer_       = buff;
  63.   aioopt->offset_       = offset;
  64.   aioopt->opcode_       = optcode;
  65.   aioopt->fd_prio_      = fd_priority;
  66.   aioopt->io_prio_      = io_priority;
  67.   aioopt->id_           = id;
  68.   //
  69.   *aio_id = id;

  70.   aio_opt_t *top_node = 0;
  71.   if (this->fd_pool_[handle] == 0)
  72.   {
  73.     // slot is empty.
  74.     this->fd_pool_[handle] = aioopt;
  75.     top_node = aioopt;

  76.     if (this->queue_list_ == 0)
  77.     {
  78.       this->queue_list_ = top_node;
  79.     }else
  80.     {
  81.       // keep order.
  82.       if (top_node->fd_prio_ > this->queue_list_->fd_prio_)
  83.       {
  84.         top_node->next_ = this->queue_list_;
  85.         this->queue_list_->prev_ = top_node;

  86.         this->queue_list_ = top_node;
  87.         this->queue_list_->prev_ = 0;
  88.       }else
  89.       {
  90.         aio_opt_t *itor = this->queue_list_;
  91.         while (itor->next_
  92.                && (top_node->fd_prio_ <= itor->next_->fd_prio_))
  93.           itor = itor->next_;

  94.         top_node->next_ = itor->next_;
  95.         top_node->prev_ = itor;
  96.         if (itor->next_)
  97.           itor->next_->prev_ = top_node;
  98.         itor->next_ = top_node;
  99.       }
  100.     } // end of `if (this->queue_list_ == 0)'
  101.   }else  // end of `if (this->fd_pool_[handle] == 0)'
  102.     // link to sub queue
  103.   {
  104.     assert(this->queue_list_ != 0);
  105.     top_node = this->fd_pool_[handle];
  106.     if (aioopt->io_prio_ > top_node->io_prio_)
  107.     {
  108.       // replace old top node.
  109.       aioopt->fd_prio_ = top_node->fd_prio_; // in order to keep fd sequence.
  110.       this->fd_pool_[handle] = aioopt;
  111.       aioopt->header_ = top_node;

  112.       // reset header.
  113.       if (this->queue_list_ == top_node)
  114.         this->queue_list_ = aioopt;

  115.       // insert new node.
  116.       aioopt->next_ = top_node->next_;
  117.       if (top_node->next_)
  118.         top_node->next_->prev_ = aioopt;
  119.       aioopt->prev_ = top_node->prev_;
  120.       if (top_node->prev_)
  121.         top_node->prev_->next_ = aioopt;

  122.       // handle sub queue.
  123.       top_node->next_ = top_node->header_;
  124.       top_node->header_ = 0;
  125.     }else  // append to sub queue
  126.     {
  127.       if (top_node->header_ == 0)
  128.       {
  129.         top_node->header_ = aioopt;
  130.       }else
  131.       {
  132.         if (aioopt->io_prio_ > top_node->header_->io_prio_)
  133.         {
  134.           aioopt->next_ = top_node->header_;
  135.           top_node->header_ = aioopt;
  136.         }else
  137.         {
  138.           aio_opt_t *itor = top_node->header_;
  139. #if 0
  140.           while (itor->next_
  141.                  && aioopt->io_prio_ <= itor->next_->io_prio_)
  142.             itor = itor->next_;
  143. #else
  144.           while (itor->next_)
  145.           {
  146.             if (aioopt->io_prio_ > itor->next_->io_prio_)
  147.               break;
  148.             if (aioopt->io_prio_ == itor->next_->io_prio_
  149.                 && aioopt->offset_ < itor->next_->offset_)
  150.               break;
  151.             itor = itor->next_;
  152.           }
  153. #endif
  154.           aioopt->next_ = itor->next_;
  155.           itor->next_ = aioopt;
  156.         }
  157.       }
  158.     }
  159.   }

  160.   this->not_empty_cond_.signal(); //

  161.   return id;
  162. }
  163. int asynch_file_io::cancel_aio(ndk::ndk_handle handle, int id)
  164. {
  165.   assert(handle != NDK_INVALID_HANDLE
  166.          && (handle > 0 && handle < this->fd_pool_size_)
  167.          && id > 0);

  168.   ndk::guard<ndk::thread_mutex> g(this->queue_list_mtx_);
  169.   if (this->fd_pool_[handle] == 0)
  170.   {
  171.     return this->find_in_running_list(id) == 0
  172.       ? AIO_CANCELED : AIO_NOTCANCELED;
  173.   }

  174.   //
  175.   aio_opt_t *top_node = this->fd_pool_[handle];

  176.   if (top_node->id_ == id)
  177.   {
  178.     if (top_node->header_ == 0)
  179.     {
  180.       if (top_node->prev_)
  181.         top_node->prev_->next_ = top_node->next_;
  182.       if (top_node->next_)
  183.         top_node->next_->prev_ = top_node->prev_;

  184.       if (this->queue_list_ == top_node)
  185.         this->queue_list_ = this->queue_list_->next_;

  186.       this->fd_pool_[handle] = 0;
  187.     }else // has sub queue
  188.     {
  189.       top_node->header_->fd_prio_ = top_node->fd_prio_;

  190.       this->fd_pool_[handle] = top_node->header_;
  191.       this->fd_pool_[handle]->header_ = top_node->header_->next_;

  192.       // insert new node.
  193.       if (top_node->prev_)
  194.         top_node->prev_->next_ = this->fd_pool_[handle];
  195.       this->fd_pool_[handle]->prev_ = top_node->prev_;
  196.       if (top_node->next_)
  197.         top_node->next_->prev_ = this->fd_pool_[handle];
  198.       this->fd_pool_[handle]->next_ = top_node->next_;

  199.       //
  200.       if (this->queue_list_ == top_node)
  201.         this->queue_list_ = top_node->header_;
  202.     }

  203.     // release.
  204.     this->free_aio_opt(top_node);
  205.   }else if (top_node->header_ != 0)
  206.   {
  207.     if (top_node->header_->id_ == id)
  208.     {
  209.       aio_opt_t *item = top_node->header_;
  210.       top_node->header_ = item->next_;
  211.       this->free_aio_opt(item);
  212.     }else
  213.     {
  214.       aio_opt_t *itor = top_node->header_;
  215.       for (; itor->next_ != 0; itor = itor->next_)
  216.       {
  217.         if (itor->next_->id_ == id)
  218.         {
  219.           aio_opt_t *item = itor->next_;
  220.           itor->next_ = item->next_;
  221.           this->free_aio_opt(item);
  222.           break;
  223.         }
  224.       }
  225.     }
  226.   }

  227.   return this->find_in_running_list(id) == 0 ?
  228.     AIO_CANCELED : AIO_NOTCANCELED;
  229. }
  230. int asynch_file_io::svc()
  231. {
  232.   while (1)
  233.   {
  234.     aio_opt_t *running_list = 0;
  235.     {
  236.       ndk::guard<ndk::thread_mutex> g(this->queue_list_mtx_);
  237.       while (this->queue_list_ == 0)
  238.         this->not_empty_cond_.wait(0);

  239.       running_list = this->pop_some_request(running_list, 5);

  240.       this->enqueue_to_running_list(running_list);
  241.     }
  242.     this->handle_aio_requests(running_list);
  243.   }
  244.   return 0;
  245. }
  246. aio_opt_t *asynch_file_io::pop_some_request(aio_opt_t *pop_list, int num)
  247. {
  248.   /**
  249.    * Pop out from the front of the queue some requests.
  250.    * If the slot of <fd> has a sub-queue, then pop all of this sub-queue.
  251.    */
  252.   aio_opt_t *pop_list_itor = 0;
  253.   aio_opt_t *top_node = this->queue_list_;

  254.   for (int i = 0; i < num && top_node != 0; ++i)
  255.   {
  256.     this->fd_pool_[top_node->handle_] = 0;
  257.     // link top node.
  258.     if (pop_list == 0)
  259.     {
  260.       pop_list = top_node;
  261.       pop_list_itor = pop_list;
  262.     }else
  263.     {
  264.       pop_list_itor->next_ = top_node;
  265.       pop_list_itor = pop_list_itor->next_;
  266.     }

  267.     // link sub queue.
  268.     aio_opt_t *node = top_node->header_;

  269.     /**
  270.      * Important !!!.
  271.      * because pop_list_itor point to top node and
  272.      * pop_list_itor->next_ will be repoint, so we must
  273.      * store next top node.
  274.      */
  275.     top_node = top_node->next_;

  276.     for (; node != 0; ++i, node = node->next_)
  277.     {
  278.       pop_list_itor->next_ = node;
  279.       pop_list_itor = pop_list_itor->next_;
  280.     }
  281.   }
  282.   this->queue_list_ = top_node;
  283.   if (this->queue_list_ != 0)
  284.     this->queue_list_->prev_ = 0;
  285.   if (pop_list_itor)
  286.     pop_list_itor->next_ = 0;

  287.   return pop_list;
  288. }
  289. void asynch_file_io::handle_aio_requests(aio_opt_t *running_list)
  290. {
  291.   for (aio_opt_t *aioopt = running_list; aioopt != 0; aioopt = aioopt->next_)
  292.   {
  293.     int result = 0;
  294.     void (asynch_handler::*callback)(const aio_opt_t *) = 0;
  295.     switch(aioopt->opcode_)
  296.     {
  297.     case AIO_READ:
  298.       do
  299.       {
  300.         result = ::pread(aioopt->handle_,
  301.                          (void*)aioopt->buffer_,
  302.                          aioopt->i_nbytes_,
  303.                          aioopt->offset_);
  304.       }while (result == -1 && errno == EINTR);
  305.       callback = &asynch_handler::handle_aio_read;
  306.       break;
  307.     case AIO_WRITE:
  308.       do
  309.       {
  310.         result = ::pwrite(aioopt->handle_,
  311.                           (const void*)aioopt->buffer_,
  312.                           aioopt->i_nbytes_,
  313.                           aioopt->offset_);
  314.       }while (result == -1 && errno == EINTR);
  315.       callback = &asynch_handler::handle_aio_write;
  316.       break;
  317.     default:
  318.       assert(0);
  319.     }
  320.     if (result >= 0)
  321.     {
  322.       aioopt->errcode_  = 0;
  323.       aioopt->o_nbytes_ = result;
  324.     }else
  325.     {
  326.       aioopt->errcode_  = errno;
  327.     }
  328.     (aioopt->aio_handler_->*callback)(aioopt);
  329.     this->dequeue_from_running_list(aioopt);
  330.   }
  331.   this->free_aio_opt_n(running_list);
  332. }
  333. void asynch_file_io::enqueue_to_running_list(aio_opt_t *running_list)
  334. {
  335.   assert(running_list != 0);
  336.   ndk::guard<ndk::thread_mutex> g(this->running_list_mtx_);

  337.   aio_opt_t *aioopt = 0;
  338.   if (this->running_list_ == 0)
  339.   {
  340.     aioopt = this->alloc_aio_opt_i(this->free_running_list_);
  341.     aioopt->ptr_ = running_list;
  342.     this->running_list_tail_ = aioopt;
  343.     this->running_list_ = aioopt;
  344.     running_list = running_list->next_;
  345.   }

  346.   aio_opt_t *itor = running_list;
  347.   for (; itor != 0; itor = itor->next_)
  348.   {
  349.     aioopt = this->alloc_aio_opt_i(this->free_running_list_);
  350.     aioopt->ptr_ = itor;
  351.     this->running_list_tail_->next_ = aioopt;
  352.     this->running_list_tail_ = aioopt;
  353.   }
  354. }
  355. void asynch_file_io::dequeue_from_running_list(aio_opt_t *aioopt)
  356. {
  357.   ndk::guard<ndk::thread_mutex> g(this->running_list_mtx_);
  358.   assert(this->running_list_ != 0);

  359.   aio_opt_t *itor = this->running_list_;
  360.   aio_opt_t *free_ptr = 0;
  361.   if (itor->ptr_ == aioopt)
  362.   {
  363.     free_ptr = itor;
  364.     this->running_list_ = itor->next_;
  365.   }else
  366.   {
  367.     for (; itor->next_ != 0; itor = itor->next_)
  368.     {
  369.       if (itor->next_->ptr_ == aioopt)
  370.       {
  371.         if (itor->next_ == this->running_list_tail_)
  372.           this->running_list_tail_ = itor;
  373.         free_ptr = itor->next_;
  374.         itor->next_ = itor->next_->next_;
  375.         break;
  376.       }
  377.     }
  378.   }
  379.   assert(free_ptr != 0);
  380.   free_ptr->next_ = 0;
  381.   this->free_aio_opt_i(free_ptr, this->free_running_list_);

  382.   if (this->running_list_ == 0
  383.       || this->running_list_->next_ == 0)
  384.     this->running_list_tail_ = this->running_list_;
  385. }
  386. int asynch_file_io::find_in_running_list(int id)
  387. {
  388.   ndk::guard<ndk::thread_mutex> g(this->running_list_mtx_);
  389.   if (this->running_list_ == 0)
  390.     return 0;

  391.   aio_opt_t *itor = this->running_list_;
  392.   for (; itor != 0; itor = itor->next_)
  393.     if (itor->ptr_->id_ == id) return -1;
  394.   return 0;
  395. }
  396. void asynch_file_io::free_aio_opt_n(aio_opt_t *p)
  397. {
  398.   ndk::guard<ndk::thread_mutex> g(this->free_list_mtx_);
  399.   aio_opt_t *tail = p;
  400.   --this->queue_list_size_;
  401.   while (tail->next_)
  402.   {
  403.     tail = tail->next_;
  404.     --this->queue_list_size_;
  405.   }
  406.   tail->next_ = this->free_list_;
  407.   this->free_list_ = p;
  408. }
  409. #ifdef NDK_DUMP
  410. void asynch_file_io::dump()
  411. {
  412. }
  413. #endif
  414. } // namespace ndk
复制代码

作者: cookis   发布时间: 2010-07-21

貌似源代码不全吧?

作者: rain_fish   发布时间: 2010-07-21

回复 rain_fish


    对,只是一个框架,并不是一个完整的项目,有些依赖的代码,但我想会linux编程的应该都能看懂

作者: cookis   发布时间: 2010-07-21



QUOTE:
回复  rain_fish


    对,只是一个框架,并不是一个完整的项目,有些依赖的代码,但我想会linux编程的 ...
cookis 发表于 2010-07-21 09:48




    lz不厚道。。。

作者: rain_fish   发布时间: 2010-07-21

回复 rain_fish


    好吧,看这里 http://code.google.com/p/cute-ndk/

作者: cookis   发布时间: 2010-07-21



QUOTE:
回复  rain_fish


    好吧,看这里
cookis 发表于 2010-07-21 10:39




    呵呵,

作者: rain_fish   发布时间: 2010-07-21



QUOTE:
回复  rain_fish


    好吧,看这里
cookis 发表于 2010-07-21 10:39




    lz攒的宝贝还真不少啊。。。

作者: rain_fish   发布时间: 2010-07-21

顶下菊花。。

作者: peidright   发布时间: 2010-07-21