分享

intel e1000 网卡 napi分析 — Windows Live

 jijo 2009-05-05


 
 内核如何从网卡接收数据,传统的过程:
1.数据到达网卡;
2.网卡产生一个中断给内核;
3.内核使用I/O指令,从网卡I/O区域中去读取数据;
 
我们在许多网卡驱动中(很老那些),都可以在网卡的中断函数中见到这一过程。
 
但是,这一种方法,有一种重要的问题,就是大流量的数据来到,网卡会产生大量的中断,内核在中断上下文中,会浪费大量的资源来处理中断本身。所以,就有一个问题,“可不可以不使用中断”,这就是轮询技术,所谓NAPI技术,说来也不神秘,就是说,内核屏蔽中断,然后隔一会儿就去问网卡,“你有没有数据啊?”……
 
从这个描述本身可以看到,如果数据量少,轮询同样占用大量的不必要的CPU资源,大家各有所长吧
 
OK,另一个问题,就是从网卡的I/O区域,包括I/O寄存器或I/O内存中去读取数据,这都要CPU去读,也要占用CPU资源,“CPU从I/O区域读,然后把它放到内存(这个内存指的是系统本身的物理内存,跟外设的内存不相干,也叫主内存)中”。于是自然地,就想到了DMA技术——让网卡直接从主内存之间读写它们的I/O数据,CPU,这儿不干你事,自己找乐子去:
1.首先,内核在主内存中为收发数据建立一个环形的缓冲队列(通常叫DMA环形缓冲区)。
2.内核将这个缓冲区通过DMA映射,把这个队列交给网卡;
3.网卡收到数据,就直接放进这个环形缓冲区了——也就是直接放进主内存了;然后,向系统产生一个中断;
4.内核收到这个中断,就取消DMA映射,这样,内核就直接从主内存中读取数据;
 
——呵呵,这一个过程比传统的过程少了不少工作,因为设备直接把数据放进了主内存,不需要CPU的干预,效率是不是提高不少?
 
对应以上4步,来看它的具体实现:
1)分配环形DMA缓冲区
Linux内核中,用skb来描述一个缓存,所谓分配,就是建立一定数量的skb,然后用e1000_rx_ring 环形缓冲区队列描述符连接起来
2)建立DMA映射
内核通过调用
dma_map_single(struct device *dev,void *buffer,size_t size,enum dma_data_direction direction)
建立映射关系。
struct device *dev  描述一个设备;
buffer:把哪个地址映射给设备;也就是某一个skb——要映射全部,当然是做一个双向链表的循环即可;
size:缓存大小;
direction:映射方向——谁传给谁:一般来说,是“双向”映射,数据在设备和内存之间双向流动;
对于PCI设备而言(网卡一般是PCI的),通过另一个包裹函数pci_map_single,这样,就把buffer交给设备了!设备可以直接从里边读/取数据。
3)这一步由硬件完成;
4)取消映射
dma_unmap_single,对PCI而言,大多调用它的包裹函数pci_unmap_single,不取消的话,缓存控制权还在设备手里,要调用它,把主动权掌握在CPU手里——因为我们已经接收到数据了,应该由CPU把数据交给上层网络栈;当然,不取消之前,通常要读一些状态位信息,诸如此类,一般是调用dma_sync_single_for_cpu()让CPU在取消映射前,就可以访问DMA缓冲区中的内容。 
 
 
原代码分析
基于linux v2.6.26
 
//e1000_probe 网卡初始化 (重点关注两部分 1注册poll函数 2设置接收缓冲的大小)
static int __devinit e1000_probe(struct pci_dev *pdev,const struct pci_device_id *ent){
      struct net_device *netdev;
      struct e1000_adapter *adapter;
      ....
      err=pci_enable_device(pdev);   
      ...
      err=pci_set_dma_mask(pdev,DMA_64BIT_MASK);    //设置pci设备的dma掩码
      ...
      netdev = alloc_etherdev(sizeof(struct e1000_adapter)); //为e1000网卡对应的net_device结构分配内存
      ...
      pci_set_drvdata(pdev,netdev);
      adapter=netdev_priv(netdev);
      adapter->netdev=netdev;
      adapter->pdev=pdev;
      ...
      mmio_start = pci_resource_start(pdev,0);
      mmio_len = pci_resource_len(pdev,0);
      ....
      /*将e1000网卡驱动的相应函数注册到net_device中*/
      netdev->open = &e1000_open;
      netdev->stop = &e1000_close;
      ...
      netif_napi_add(netdev,&adapter->napi,e1000_clean,64);    // 注册poll函数为e1000_clean, weight为64
      ...
      netdev->mem_start = mmio_start;
      netdev->mem_end = mmio_start+mmio_len;
      ....
      if(e1000e_read_mac_addr(&adapter->hw))   ndev_err(...);   //从网卡设备的EEPROM中读取mac地址
      memcpy(netdev->dev_addr, adapter->hw.mac.addr, netdev->addr_len);
      memcpy(netdev->perm_addr, adapter->hw.mac.addr, netdev->addr_len);
      ....
      adapter->rx_ring->count = 256;  //设置接收环型缓冲区队列的缺省大小
      ...
      e1000_reset(adapter);
      ...
      strcpy(netdev->name,"eth%d");
      err= register_netdev(netdev); //将当前网络设备注册到系统的dev_base[]设备数组当中
      ....
      return 0;
 }
e1000_open 各种数据结构初始化 (环形缓冲区队列的初始化)
static int e1000_open(struct net_device *netdev){
        struct e1000_adapter *adapter = netdev_priv(netdev);
        ....
        err = e1000_setup_all_rx_resoures(adapter)   //预先分配缓冲区资源
        ....
        err = e1000_request_irq(adapter); //分配irq中断
        ....
}
int e1000_setup_all_rx_resources(struct e1000_adapter *adapter){
        int i,err=0;
        for(i=0 ; i<adapter->num_rx_queues ; i++){
             err = e1000_setup_rx_resources(adapter,&adapter->rx_ring[i]);
             if(err){
                 ...
             }
        }
        return err;
}
e1000_rx_ring 环形缓冲区队列(接收缓冲队列由多个描述符组成,每个描述符中都包含一个缓冲区buffer,该buffer以dma方式存放数据包,整个缓冲队列以环形排列 每个描述符都有一个状态变量以表示该缓冲区buffer是否可以被新到的数据包覆盖)
struct e1000_rx_ring{
        void *desc;    //指向该环形缓冲区
        dma_addr_t dma;      //dma物理地址
        unsigned int size;       
        unsigned int count;     //环形队列由多少个描述符组成,这个在probe中定义了
        unsigned int next_to_use;    //下一个可使用的描述符号
        unsigned int next_to_clean;  //该描述符状态(是否正在使用,是否脏)
       struct e1000_buffer *buffer_info;   //缓冲区buffer
       ...
}
struct e1000_buffer{
        struct sk_buff *skb;     
        ....
}
static int e1000_setup_rx_resources(struct e1000_adapt *adapter, struct e1000_rx_ring *rxdr){
        struct pci_dev  *pdev = adapter->pdev;
        int size,desc_len;
        size = sizeof(struct e1000_buffer) * rxdr->count;
        rxdr->buffer_info = vmalloc(size);
        memset(rxdr->buffer_info,0,size);        //分配buffer所使用的内存
        ....
        if(adapter->hw.mac_type <= e1000_82547_rec_2)
                 desc_len = sizeof(struct e1000_rx_desc);
        else ....
       
        rxdr->size = rxdr->count * desc_len;
        rxdr->size = ALIGN(rxdr->size,4096);
        rxdr->desc = pci_alloc_consistent(pdev,rxdr->size,&rxdr->dma); 
        ...
        memset(rxdr->desc,0,rxdr->size);   //分配缓冲队列所使用的内存
        rxdr->next_to_clean =0;
        rxdr->next_to_use =0;
        return 0;
}
e1000_up 启动网卡函数  调用alloc_rx_buf来建立环形缓冲队列
int e1000_up(struct e1000_adapter *adapter){
        e1000_configure(adatper);
         ....
}
static void e1000_configure(struct e1000_adapter *adapter){
         struct net_device *netdev = adapter->netdev;
         int i;
         ...
          e1000_configure_rx(adapter);
         ...
         for (i=0;i<adapter->num_rx_queues;i++){
                 struct e1000_rx_ring *ring = &adapter ->rx_ring[i];   
                 adapter->alloc_rx_buf(adapter,ring,E1000_DESC_UNUSED(ring));   //从这里就可以看出 环形缓冲区并不是一开始就完全建好的,建了部分
         }
          ...
}
static void e1000_configure_rx(struct e1000_adapter *adapter){
         ....
         adapter->clean_rx = e1000_clean_rx_irq;     //后面会提到的poll()
          adapter->alloc_rx_buf = e1000_alloc_rx_irq      //建立环形缓冲队列函数 这里实际调用的是e1000_alloc_rx_buffers
}
e1000_alloc_rx_buffers ----因为其中有些参数要看完下面的才能理解,所以这个函数最后再写

e1000_intr  e1000的中断处理函数
static irqreturn_t e1000_intr(int irq,void *data){
        struct net_device *netdev = data;
        struct e1000_adapter *adapter = netdev_priv(netdev);
        ..
        u32 icr =  E1000_READ_REG(hw,ICR);
    #ifdef CONFIG_E1000_NAPI
        int i;
    #endif 
         ...
    #ifdef CONFIG_E1000_NAPI           //进入轮询模式
         if(unlikely(hw->mac_type<e1000_82571)){
               E1000_WRITE_REG(hw,IMC,~0);   //关闭中断
               E1000_WRITE_FLUSH(hw);
         }
         if (likely(netif_rx_schedule_prep(netdev,&adapter->napi))){     //确定该设备处于运行状态, 而且还未被添加到网络层的poll队列中
                ...
                __netif_rx_schedule(netdev,&adapter->napi);  //将当前设备netdevice加到与cpu相关的softnet_data的轮旬设备列表poll_list中并触发NET_RX_SOFTIRQ软中断
         }
     #else     //进入中断模式
        {      ...
               for(i=0;i<E1000_MAX_INTR;i++){
                      if (unlikely(!adapter->clean_rx(adapter, adapter->rx_ring) &....    //执行clean_rx()中关于中断模式的代码  不走napi路径
                           break;
                     ...
               }
        }
         ....
        return IRQ_HANDLED;   
}
static inline int netif_rx_schedule_prep(struct net_device *dev,struct napi_struct *napi){
        return napi_schedule_prep(napi);
}
static inline int napi_schedule_prep(struct napi_struct *n){
       return !napi_disable_pending(n) &&
                   !test_and_set_bit(NAPI_STATE_SCHED,  &n->state);     //测试该设备是否已被被加到poll队列
}
static inline int napi_disable_pending (struct napi_struct *n){
        return test_bit(NAPI_STATE_DISABLE,&n->state);    //测试该设备是否停止运行
}
static inline void __netif_rx_schedule(struct net_device *dev,struct napi_struct *napi){
        __napi_schedule(napi);   
}
void __napi_schedule(struct napi_struct *n){
       unsigned long flags;
       local_irq_save(flags);
       list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);
       __raise_softirq_irqoff(NET_RX_SOFTIRQ);    触发软中断
      local_irq_restore(flags);
}
#define __raise_softirq_irqoff(nr)  do {or_softirq_pending(iUL<<(nr)); } while(0)
用到的数据结构napi_struct
struct napi_struct{
        struct list_head poll_list;   //poll_list链表
        unsigned long state  //设备状态信息 往上看看
        int weight;   //设备预处理数据配额,作用是轮询时更公平的对待各个设备
        int  (*poll) (strcut napi_struct *,int);  
        .....
}
接下来就是软中断处理函数net_rx_action() 
static void net_rx_action(struct softirq_action *h){
        struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
        unsigned long start_time = jiffies;
        int budget = netdev_budget;   //处理限额,一次最多只能处理这么多数据包
        ....
        local_irq_disable();
        while (!list_empty(list)){  
              struct napi_struct *n;
              int work,
                   weight;  
              if (unlikely(budget < 0 || jiffies != start_time))  //如果实际工作量work  超过限额,或处理时间超过1秒,则出于系统响应考虑立即从软中断处理函数中跳出来,  work是poll的返回值  限额budget每次都会根据返回的work值重新计算 ,配额weight和work配合来实现轮询算法,具体算法要看完e1000_clean(),e1000_rx_irq()才能清楚
                          goto softnet_break;
              local_irq_enalbe();
              n = list_entry(list->next,struct napi_struct,poll_list);
              weight = n->weight; 
              work 0;
              if (test_bit(NAPI_STATE_SCHED,&n->state))
                            work = n->poll(n,weight);        //调用设备的poll函数e1000_clean()
              ....
              budget -= work;      //更新限额
              local_irq_disable();
              if (unlikely(work == weight)){     //处理量大于配额
                      if(unlikely(napi_disable_pending(n)))
                               __napi_complete(n);
                      else
                               list_move_tail(&n->poll_list,list);    //该设备还有要接收的数据没被处理,因为轮询算法 被移动到poll_llst尾部等待处理
              }
             ...
        }
out:
           local_irq_enable();
           ...
           return;
softnet_break:
            __raise_softirq_irqoff(NET_RX_SOFTIRQ);
           goto out;
}
e1000网卡poll函数  e1000_clean()
static int e1000_clean(struct napi_struct *napi,int budget){    //此处的budget实际上是传过来的weight,不要和上面的budget弄混了
        struct e1000_adapter *adapter = container_of(napi,struct e1000_adapter,napi);
        struct net_device *poll_dev = adapter->netdev;
        int work_done = 0;
        adapter = poll_dev->priv;
        .....
        adapter ->clean_rx(adapter,&adapter ->rx_ring[0],&work_done,budget);     //实际调用的是clean_rx_irq()
        ...
        if(work_done<budget){       //如果完成的工作量(已处理了的接收数据)小于weight(budget=weight), 则说明处理完成了该设备所有要接收的数据包, 之后调用netif_rx_complete()将该设备从poll_list链表中删除,并打开中断退出轮询模式    
                 ...
                 netif_rx_complete(poll_dev,napi);   //__napi_complete()的包装函数
                 e1000_irq_enable(adapter);     //开中断
        }
         return work_done;
}
static inline void netif_rx_complete(struct net_device *dev,struct napi_struct *napi){
        unsigned long flags;
        local_irq_save(flags);
        __netif_rx_complete(dev,napi);
        local_irq_restore(flags);
}
static inline void __netif_rx_complete(struct net_device *dev,struct napi_struct *napi){
        __napi_complete(napi);
}
static inline void __napi_complete(struct napi_struct *n){
        ....
        list_del(&n->poll_list);     
        clear_bit(NAPI_STATE_SCHED,&n->state);
}
设备轮询接收机制中最重要的函数e1000_clean_rx_irq()
#ifdef CONFIG_E1000_NAPI
e1000_clean_rx_irq(struct e1000_adapter *adapter,struct e1000_rx_ring *rx_ring,int *work_done,int work_to_do)  //work_to_do实际上是传过来的配额weight
.....
{
         struct net_device *netdev = adapter->netdev;
         struct pci_dev *pdev = adapter->pdev;
         struct e1000_rx_desc *rx_desc,*next_rxd;
         struct e1000_buffer *buffer_info, *next_buffer;
         ...
         unsigned int i;
         int cleaned_count = 0;
         ....
         i = rx_ring->next_to_clean;        //next_to_clean是下一个可以被清除的描述符索引,上面讲过环形缓冲队列由多个描述符组成,每个描述符都有一个用于存放接收数据包的缓冲区buffer,这里所说的“可以被清除”并不是将其删除,而是标记这个缓冲区的数据已经处理(可能正在处理),但是否处理完了要看rx_desc->status&E1000_RXD_STAT_DD,当有新数据需要使用缓冲区时,只是将已处理的缓冲区覆盖而已,   这里的i可以理解为可以被新数据覆盖的缓冲区序号
         rx_desc = E1000_RX_DESC(*rx_ring,i);       //得到相应的描述符
         buffer_info = &rx_ring->buffer_info[i];           
         while(rx_desc->status & E1000_RXD_STAT_DD){         //测试其状态是否为已删除
                 struct sk_buff  *skb;
                 u8 status;
  #ifdef CONFIG_E1000_NAPI
                 if (*wrok_done>=work_to_do)     //如果所完成的工作>配额则直接退出
                              break;
                 (*work_done) ++
  #endif
                 status = rx_desc->status;
                 skb = buffer_info->skb;                    //得到缓冲区中的数据
                 buffer_info->skb = NULL;
                 prefetch(skb->data-NET_IP_ALIGN);
                 if(++i == rx_ring->count)                           //处理环形缓冲区达到队列末尾的情况,因为是环形的,所以到达末尾的下一个就是队列头,这样整个队列就不断地循环处理。然后获取下一格描述符的状态,看看是不是处理删除状态。如果处于就会将新到达的数据覆盖旧的缓冲区,如果不处于则跳出循环,并将当前缓冲区索引号置为下一次查询的目标
                           i = 0;
                 next_rxd = E1000_RX_DESC(*rx_ring,i);
                 next_buffer = &rx_ring->buffer_info[i];
                 cleaned = true ;
                 cleaned_count ++;  
                 pci_unmap_single(pdev,buffer_info->dma,buffer_info->length,PCI_DMA_FROMDEVICE);    //* 取消映射,因为通过DMA,网卡已经把数据放在了主内存中,这里一取消,也就意味着,CPU可以处理主内存中的数据了 */
                 ....
                //checksum
                 ...
   #ifdef CONFIG_E1000_NAPI
                   netif_receive_skb(skb);      //交由上层协议处理 , 如果数据包比较大,处理时间会相对较长
   #else
                   netif_rx(skb);            //进入中断模式 将数据包插入接收队列中,等待软中断处理    中断模式不用环形接收缓冲队列 
    #endif
                    netdev->last_rx = jiffies;
        
next_desc:
                 rx_desc->status =0;
                 if(unlikely(cleaned_count >= E1000_RX_BUFFER_WRITE)){    
                        adapter->alloc_rx_buf(adapter,rx_ring,cleaned_count);  //在e1000_up中已经调用了这个函数为环形缓冲区队列中的每个缓冲区分配了sk_buff内存,但是如果接收到数据以后,调用netif_receive_skb(skb)向上提交数据以后,这段内存将始终被这个skb占用(直到上层处理完以后才会调用_kfree_skb释放),换句话说,就是当前缓冲区必须重新申请分配sk_buff内存,为下一个数据作准备
                        cleaned_count = 0;
                 }
                 rx_desc = next_rxd;
                 buffer_info = next_buffer;
          }
          rx_ring->next_to_clean = i;
          cleaned_count = E1000_DESC_UNUSED(rx_ring);
          if(cleaned_count)
                  adapter->alloc_rx_buf(adapter,rx_ring,cleaned_count);
          ...
          return cleaned;     
}
static void e1000_alloc_rx_buffers(struct e1000_adapter *adapter,struct e1000_rx_ring *rx_ring,int cleaned_count){  
        struct net_device *netdev = adapter->netdev;
        struct pci_dev *pdev = adapter->pdev;
        struct e1000_rx_desc *rx_desc;
        struct e1000_buffer *buffer_info;
        struct sk_buff *skb;
        unsigned int i;
        unsigned int bufsz = adapter->rx_buffer_len+NET_IP_ALIGN;
        i=rx_ring->next_to_use;
        buffer_info = &rx_ring->buffer_info[i];
        while (cleaned_count--){
                skb = buffer_info ->skb;
                if(skb){
                    ....
                 }
                 skb = netdev_alloc_skb(netdev,bufsz);   //skb缓存的分配
                 if(unlikely(!skb)){
                         adapter->alloc_rx_buff_failed++;
                         break;
                 }
                 skb_reserve(skb,NET_IP_ALIGN);
                 buffer_info->skb = skb;
                 buffer_info->length = adapter ->rx_buffer_len;
map_skb:
                  buffer_info->dma = pci_map_single(pdev,skb->data, adapter->rx_buffer_len,PCI_DMA_FROMDEVICE); //建立DMA映射,把每一个缓冲区skb->data都映射给了设备,缓存区描述符利用dma保存了每一次映射的地址
                  ....
                  rx_desc = E1000_RX_DESC(*rx_ring, i);
                  rx_desc->buffer_addr = cpu_to_le64(buffer_info->dma);   
                  if (unlikely(++i == rx_ring->count))    //达到环形缓冲区末尾 
                            i =0 ;
                   buffer_info = &rx_ring->buffer_info[i];
         }
         if(likely(rx_ring->netx_to_use!=i)){
                  rx_ring->next_to_use = i;
                  if (unlikely(i-- == 0))
                          i = (rx_ring->count - 1);
                 ...
        }
}
简要流程
1
 
1
 
1
 
1
1
 
15:16:23 | 写入日志

评论

若要添加评论,请使用您的 Windows Live ID 登录(如果您使用过 Hotmail、Messenger 或 Xbox LIVE,您就拥有 Windows Live ID)。登录


还没有 Windows Live ID 吗?请注册

引用通告

此日志的引用通告 URL 是:
http://sh-neo.spaces./blog/cns!1E3CA285E5F9E122!526.trak
引用此项的网络日志

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多