分享

Linux内核数据包处理流程-数据包接收

 mrjbydd 2010-12-15
Linux内核数据包处理流程-数据包接收
2010-04-01 23:49

CODE:
/*
*    网络模块的核心处理模块.
*/
static int __init net_dev_init(void)
{
int i, rc = -ENOMEM;

BUG_ON(!dev_boot_phase);

net_random_init();

if (dev_proc_init()) /*初始化proc文件系统*/
goto out;

if (netdev_sysfs_init()) /*初始化sysfs文件系统*/
goto out;

/*ptype_all和ptype_base是重点,后面会详细分析,它们都是
struct list_head类型变量,这里初始化链表成员*/
INIT_LIST_HEAD(&ptype_all);
for (i = 0; i < 16; i++)
INIT_LIST_HEAD(&ptype_base[i]);

for (i = 0; i < ARRAY_SIZE(dev_name_head); i++)
INIT_HLIST_HEAD(&dev_name_head[i]);

for (i = 0; i < ARRAY_SIZE(dev_index_head); i++)
INIT_HLIST_HEAD(&dev_index_head[i]);

/*
    * 初始化包接收队列,这里我们的重点了.
    */

/*遍历每一个CPU,取得它的softnet_data,我们说过,它是一个struct softnet_data的Per-CPU变量*/
for (i = 0; i < NR_CPUS; i++) {
struct softnet_data *queue;

/*取得第i个CPU的softnet_data,因为队列是包含在它里边的,所以,我会直接说,“取得队列”*/
queue = &per_cpu(softnet_data, i);
/*初始化队列头*/
skb_queue_head_init(&queue->input_pkt_queue);
queue->throttle = 0;
queue->cng_level = 0;
queue->avg_blog = 10; /* arbitrary non-zero */
queue->completion_queue = NULL;
INIT_LIST_HEAD(&queue->poll_list);
set_bit(__LINK_STATE_START, &queue->backlog_dev.state);
queue->backlog_dev.weight = weight_p;
/*这里,队列中backlog_dev设备,它是一个伪网络设备,不对应任何物理设备,它的poll函数,指向了
process_backlog,后面我们会详细分析*/
queue->backlog_dev.poll = process_backlog;
atomic_set(&queue->backlog_dev.refcnt, 1);
}

#ifdef OFFLINE_SAMPLE
samp_timer.expires = jiffies + (10 * HZ);
add_timer(&samp_timer);
#endif

dev_boot_phase = 0;

/*注册收/发软中断*/
open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);
open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

hotcpu_notifier(dev_cpu_callback, 0);
dst_init();
dev_mcast_init();
rc = 0;
out:
return rc;
}

这样,初始化完成后,在驱动程序中,在中断处理函数中,会调用netif_rx将数据交上来,这与采用轮询技术,有本质的不同:


CODE:
int netif_rx(struct sk_buff *skb)
{
int this_cpu;
struct softnet_data *queue;
unsigned long flags;

/* if netpoll wants it, pretend we never saw it */
if (netpoll_rx(skb))
return NET_RX_DROP;

/*接收时间戳未设置,设置之*/
if (!skb->stamp.tv_sec)
net_timestamp(&skb->stamp);

/*
    * 这里准备将数据包放入接收队列,需要禁止本地中断,在入队操作完成后,再打开中断.
    */
local_irq_save(flags);
/*获取当前CPU对应的softnet_data变量*/
this_cpu = smp_processor_id();
queue = &__get_cpu_var(softnet_data);

/*接收计数器累加*/
__get_cpu_var(netdev_rx_stat).total++;

/*接收队列是否已满*/
if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
if (queue->input_pkt_queue.qlen) {
if (queue->throttle) /*拥塞发生了,丢弃数据包*/
goto drop;

/*数据包入队操作*/
enqueue:
dev_hold(skb->dev); /*累加设备引入计数器*/
__skb_queue_tail(&queue->input_pkt_queue, skb); /*将数据包加入接收队列*/
#ifndef OFFLINE_SAMPLE
get_sample_stats(this_cpu);
#endif
local_irq_restore(flags);
return queue->cng_level;
}

/*
    * 驱动程序不断地调用net_rx函数,实现接收数据包的入队操作,当queue->input_pkt_queue.qlen == 0时(?什么情况下设置)
    * 则进入这段代码,这里,如果已经被设置拥塞标志的话,则清除它,因为这里将要调用软中断,开始将数据包交给
    * 上层了,即上层协议的接收函数将执行出队操作,拥塞自然而然也就不存在了。
    */
if (queue->throttle)
queue->throttle = 0;

/*
    * netif_rx_schedule函数完成两件重要的工作:
    * 1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;
    * 2、触发软中断函数,进行数据包接收处理;
    */
netif_rx_schedule(&queue->backlog_dev);
goto enqueue;
}

/*前面判断了队列是否已满,如果已满而标志未设置,设置之,并累加拥塞计数器*/
if (!queue->throttle) {
queue->throttle = 1;
__get_cpu_var(netdev_rx_stat).throttled++;
}

/*拥塞发生,累加丢包计数器,释放数据包*/
drop:
__get_cpu_var(netdev_rx_stat).dropped++;
local_irq_restore(flags);

kfree_skb(skb);
return NET_RX_DROP;
}

从这段代码的分析中,我们可以看到,当数据被接收后,netif_rx的工作,就是取得当前CPU的队列,然后入队,然后返回,然后中断函数
现调用它,它再把数据包入队……
当队列接收完成后,netif_rx就调用netif_rx_schedule进一步处理数据包,我们注意到:
1、前面讨论过,采用轮询技术时,同样地,也是调用netif_rx_schedule,把设备自己传递了过去;
2、这里,采用中断方式,传递的是队列中的一个“伪设备”,并且,这个伪设备的poll函数指针,指向了一个叫做process_backlog的函数;

netif_rx_schedule函数完成两件重要的工作:
1、将bakclog_dev设备加入“处理数据包的设备”的链表当中;
2、触发软中断函数,进行数据包接收处理;

这样,我们可以猜想,在软中断函数中,不论是伪设备bakclog_dev,还是真实的设备(如前面讨论过的e100),都会被软中断函数以:
dev->poll()
的形式调用,对于e100来说,poll函数的接收过程已经分析了,而对于其它所有没有采用轮询技术的网络设备来说,它们将统统调用
process_backlog函数(我觉得把它改名为pseudo-poll是否更合适一些^o^)。

OK,我想分析到这里,关于中断处理与轮询技术的差异,已经基本分析开了……

继续来看,netif_rx_schedule进一步调用__netif_rx_schedule:

CODE:
/* Try to reschedule poll. Called by irq handler. */

static inline void netif_rx_schedule(struct net_device *dev)
{
if (netif_rx_schedule_prep(dev))
__netif_rx_schedule(dev);
}

CODE:
/* Add interface to tail of rx poll list. This assumes that _prep has
* already been called and returned 1.
*/

static inline void __netif_rx_schedule(struct net_device *dev)
{
unsigned long flags;

local_irq_save(flags);
dev_hold(dev);
/*伪设备也好,真实的设备也罢,都被加入了队列层的设备列表*/
list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);
if (dev->quota < 0)
dev->quota += dev->weight;
else
dev->quota = dev->weight;
/*触发软中断*/
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
local_irq_restore(flags);
}

软中断被触发,注册的net_rx_action函数将被调用:

CODE:
/*接收的软中断处理函数*/
static void net_rx_action(struct softirq_action *h)
{
struct softnet_data *queue = &__get_cpu_var(softnet_data);
unsigned long start_time = jiffies;
int budget = netdev_max_backlog;


local_irq_disable();

/*
    * 遍历队列的设备链表,如前所述,__netif_rx_schedule已经执行了
    * list_add_tail(&dev->poll_list, &__get_cpu_var(softnet_data).poll_list);
    * 设备bakclog_dev已经被添加进来了
    */
while (!list_empty(&queue->poll_list)) {
struct net_device *dev;

if (budget <= 0 || jiffies - start_time > 1)
goto softnet_break;

local_irq_enable();

/*取得链表中的设备*/
dev = list_entry(queue->poll_list.next,
    struct net_device, poll_list);
netpoll_poll_lock(dev);

/*调用设备的poll函数,处理接收数据包,这样,采用轮询技术的网卡,它的真实的poll函数将被调用,
这就回到我们上一节讨论的e100_poll函数去了,而对于采用传统中断处理的设备,它们调用的,都将是
bakclog_dev的process_backlog函数*/
if (dev->quota <= 0 || dev->poll(dev, &budget)) {
netpoll_poll_unlock(dev);

/*处理完成后,把设备从设备链表中删除,又重置于末尾*/
local_irq_disable();
list_del(&dev->poll_list);
list_add_tail(&dev->poll_list, &queue->poll_list);
if (dev->quota < 0)
dev->quota += dev->weight;
else
dev->quota = dev->weight;
} else {
netpoll_poll_unlock(dev);
dev_put(dev);
local_irq_disable();
}
}
out:
local_irq_enable();
return;

softnet_break:
__get_cpu_var(netdev_rx_stat).time_squeeze++;
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
}

对于dev->poll(dev, &budget)的调用,一个真实的poll函数的例子,我们已经分析过了,现在来看process_backlog,

CODE:
static int process_backlog(struct net_device *backlog_dev, int *budget)
{
int work = 0;
int quota = min(backlog_dev->quota, *budget);
struct softnet_data *queue = &__get_cpu_var(softnet_data);
unsigned long start_time = jiffies;

backlog_dev->weight = weight_p;

/*在这个循环中,执行出队操作,把数据从队列中取出来,交给netif_receive_skb,直至队列为空*/
for (;;) {
struct sk_buff *skb;
struct net_device *dev;

local_irq_disable();
skb = __skb_dequeue(&queue->input_pkt_queue);
if (!skb)
goto job_done;
local_irq_enable();

dev = skb->dev;

netif_receive_skb(skb);

dev_put(dev);

work++;

if (work >= quota || jiffies - start_time > 1)
break;

}

backlog_dev->quota -= work;
*budget -= work;
return -1;

/*当队列中的数据包被全部处理后,将执行到这里*/
job_done:
backlog_dev->quota -= work;
*budget -= work;

list_del(&backlog_dev->poll_list);
smp_mb__before_clear_bit();
netif_poll_enable(backlog_dev);

if (queue->throttle)
queue->throttle = 0;
local_irq_enable();
return 0;
}


这个函数重要的工作,就是出队,然后调用netif_receive_skb()将数据包交给上层,这与上一节讨论的poll是一样的。这也是为什么,
在网卡驱动的编写中,采用中断技术,要调用netif_rx,而采用轮询技术,要调用netif_receive_skb啦!

到了这里,就处理完数据包与设备相关的部分了,数据包将进入上层协议栈……

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多