/* The main device statistics structure */ structrtnl_link_stats64 { __u64 rx_packets; /* total packets received */ __u64 tx_packets; /* total packets transmitted */ __u64 rx_bytes; /* total bytes received */ __u64 tx_bytes; /* total bytes transmitted */ __u64 rx_errors; /* bad packets received */ __u64 tx_errors; /* packet transmit problems */ __u64 rx_dropped; /* no space in linux buffers */ __u64 tx_dropped; /* no space available in linux */ __u64 multicast; /* multicast packets received */ __u64 collisions;
/* * enqueue_to_backlog is called to queue an skb to a per CPU backlog * queue (may be a remote CPU queue). */ staticintenqueue_to_backlog(struct sk_buff *skb, int cpu, unsignedint *qtail) { structsoftnet_data *sd; unsignedlong flags; sd = &per_cpu(softnet_data, cpu); local_irq_save(flags); rps_lock(sd); /* 判断接收队列是否满,队列长度为 netdev_max_backlog */ if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog)="">=> if (skb_queue_len(&sd->input_pkt_queue)) { enqueue: /* 队列如果不会空,将数据包添加到队列尾 */ __skb_queue_tail(&sd->input_pkt_queue, skb); input_queue_tail_incr_save(sd, qtail); rps_unlock(sd); local_irq_restore(flags); return NET_RX_SUCCESS; } /* Schedule NAPI for backlog device * We can use non atomic operation since we own the queue lock */ /* 队列如果为空,回到 ____napi_schedule加入poll_list轮询部分,并重新发起软中断 */ if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) { if (!rps_ipi_queued(sd)) ____napi_schedule(sd, &sd->backlog); } goto enqueue; } /* 队列满则直接丢弃,对应计数器 +1 */ sd->dropped++; rps_unlock(sd); local_irq_restore(flags); atomic_long_inc(&skb->dev->rx_dropped); kfree_skb(skb); return NET_RX_DROP; }
内核会为每个 CPU Core 都实例化一个 softnet_data 对象,这个对象中的 input_pkt_queue 用于管理接收的数据包。
假如所有的中断都由一个 CPU Core 来处理的话,那么所有数据包只能经由这个 CPU 的 input_pkt_queue。
/** * 文件:ixgbe_main.c * ixgbe_request_irq - initialize interrupts * @adapter: board private structure * * Attempts to configure interrupts using the best available * capabilities of the hardware and kernel. **/ staticintixgbe_request_irq(struct ixgbe_adapter *adapter) { structnet_device *netdev = adapter->netdev; int err;
/**
 *	File: include/linux/netdevice.h
 *	napi_schedule - schedule NAPI poll
 *	@n: NAPI context
 *
 * Schedule NAPI poll routine to be called if it is not already
 * running.
 */
static inline void napi_schedule(struct napi_struct *n)
{
	/*
	 * Note: the function called below has TWO leading underscores in
	 * its name (__napi_schedule), unlike this wrapper.
	 */
	if (napi_schedule_prep(n))
		__napi_schedule(n);
}
/** * 文件:net/core/dev.c * __napi_schedule - schedule for receive * @n: entry to schedule * * The entry's receive function will be scheduled to run. * Consider using __napi_schedule_irqoff() if hard irqs are masked. */ void __napi_schedule(struct napi_struct *n) { unsignedlong flags;
/* * 文件:net/core/dev.c * Initialize the DEV module. At boot time this walks the device list and * unhooks any devices that fail to initialise (normally hardware not * present) and leaves us with a valid list of present and active devices. * */
/*
 *	This is called single threaded during boot, so no need
 *	to take the rtnl semaphore.
 *
 *	NOTE: this is an abridged excerpt; only the softirq registration is
 *	shown and the elided parts are marked with "…" comments.
 */
static int __init net_dev_init(void)
{
	/* … (earlier part of the function omitted in this excerpt) */

	/* Register the handlers for the TX and RX softirqs respectively. */
	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);

	/* … (rest of the function omitted in this excerpt) */
}
/* Even though interrupts have been re-enabled, this * access is safe because interrupts can only add new * entries to the tail of this list, and only ->poll() * calls can remove this head entry from the list. */ n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);
have = netpoll_poll_lock(n);
weight = n->weight;
/* This NAPI_STATE_SCHED test is for avoiding a race * with netpoll's poll_napi(). Only the entity which * obtains the lock and sees NAPI_STATE_SCHED set will * actually make the ->poll() call. Therefore we avoid * accidentally calling ->poll() when NAPI is not scheduled. */ work = 0; if (test_bit(NAPI_STATE_SCHED, &n->state)) { work = n->poll(n, weight); trace_napi_poll(n); }
…… }
}
(6) ixgbe_poll 之后的一系列调用就不一一详述了,有兴趣的同学可以自行研究,软中断部分有几个地方会有类似 if (static_key_false(&rps_needed))这样的判断,会进入前文所述有丢包风险的 enqueue_to_backlog 函数。
这里的逻辑为判断是否启用了 RPS 机制,RPS 是早期单队列网卡上将软中断负载均衡到多个 CPU Core 的技术,它对数据流进行 hash 并分配到对应的 CPU Core 上,发挥多核的性能。
这段调用的最后,deliver_skb 会将接收的数据传入一个 IP 层的数据结构中,至此完成二层的全部处理。
/**
 *	netif_receive_skb - process receive buffer from network
 *	@skb: buffer to process
 *
 *	netif_receive_skb() is the main receive data processing function.
 *	It always succeeds. The buffer may be dropped during processing
 *	for congestion control or by the protocol layers.
 *
 *	This function may only be called from softirq context and interrupts
 *	should be enabled.
 *
 *	Return values (usually ignored):
 *	NET_RX_SUCCESS: no congestion
 *	NET_RX_DROP: packet was dropped
 */
int netif_receive_skb(struct sk_buff *skb)
{
	int ret;

	net_timestamp_check(netdev_tstamp_prequeue, skb);

	if (skb_defer_rx_timestamp(skb))
		return NET_RX_SUCCESS;

	rcu_read_lock();

#ifdef CONFIG_RPS
	/* Check whether the RPS mechanism is enabled. */
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		/* Look up the CPU core this skb should be steered to. */
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
			/* A target CPU was chosen: queue onto its backlog. */
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}
TCP/IP 协议栈
TCP/IP 协议栈逐层处理,最终交给用户空间读取,数据包进到 IP 层之后,经过 IP 层、TCP 层处理(校验、解析上层协议,发送给上层协议),放入 socket buffer。
staticintixgbe_acquire_msix_vectors(struct ixgbe_adapter *adapter) { structixgbe_hw *hw = &adapter->hw; int i, vectors, vector_threshold; /* We start by asking for one vector per queue pair with XDP queues * being stacked with TX queues. */ vectors = max(adapter->num_rx_queues, adapter->num_tx_queues); vectors = max(vectors, adapter->num_xdp_queues); /* It is easy to be greedy for MSI-X vectors. However, it really * doesn't do much good if we have a lot more vectors than CPUs. We'll * be somewhat conservative and only ask for (roughly) the same number * of vectors as there are CPUs. */ vectors = min_t(int, vectors, num_online_cpus());
通过加载网卡驱动,获取网卡型号和网卡硬件的队列数;但是在初始化 MSI-X vector 的时候,还会结合系统在线 CPU 的数量,通过 Sum = Min(网卡队列,CPU Core) 来激活相应的网卡队列数量,并申请 Sum 个中断号。
如果 CPU 数量小于 64,会生成 CPU 数量的队列,也就是每个 CPU 会产生一个 external IRQ。