环境初始化
rte_eal_init()解析并处理与DPDK 相关的参数(如 CPU 核绑定、大页内存等),然后初始化内存管理、多核调度等底层资源。返回值ret表示已被rte_eal_init()消耗的参数数量,如果返回值小于 0,表示初始化失败(参数无效或系统资源不足等问题),需使用rte_exit()打印错误信息并退出程序(类似exit(),但带有DPDK的环境清理功能)。
DPDK的命令行命令参数包括:EAL参数、程序自身的参数,两个部分使用“–”隔开,例如./l2fwd -c 0x3 -n 4 – -p 3 -q 1,rte_eal_init()函数处理到“–”时就会自动停止继续解析。argc -= ret
将主函数的argc减去已被EAL消耗的参数数量,argv += ret
将参数数组指针向后移动 ret
个位置,这两句的功能在于跳过已经被EAL处理的DPDK参数,这样argc和argv就只保留了开发者自己关心的参数。
1
2
3
4
5
6
7
8
|
// Initialize the Environment Abstraction Layer (EAL)
int ret = rte_eal_init(argc, argv);
if (ret < 0)
{
rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
}
argc -= ret;
argv += ret;
|
信号处理机制
信号处理机制的实现,通常用于捕捉用户中断(如 Ctrl+C)或系统终止信号,让程序在被终止时能执行开发者自定义的资源释放逻辑,而不是直接被操作系统强制终止。
volatile bool force_quit = false
定义一个全局变量force_quit,用于标记是否收到退出信号。使用volatile是因为这个变量可能会被异步的信号处理函数signal handler修改,告诉编译器不要优化对这个变量的访问。具体业务代码中的主循环通常会不断检查这个变量是否为true,以决定是否退出。
static void signal_handler(int signum)
定义一个静态的信号处理函数,用于处理来自系统的信号。signum是接收到的信号编号,比如:SIGINT为中断信号,通常由用户按下Ctrl+C触发;SIGTERM为终止信号,系统或其他进程请求终止本程序时发出。当程序收到SIGINT、SIGTERM这两种信号时,将标记force_quit,以提醒其他业务代码停止任务、清理释放资源。
使用标准库函数signal()注册信号处理函数signal_handler(),当程序收到SIGINT或SIGTERM信号时,signal_handler()会被自动调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// Signal handler for exiting
volatile bool force_quit = false;
static void signal_handler(int signum)
{
if (signum == SIGINT || signum == SIGTERM)
{
printf("\n\nSignal %d received, preparing to exit...\n", signum);
force_quit = true;
}
}
// Register signal handlers
force_quit = false;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
|
具体业务代码中的主循环示例,当force_quit被标记为true时,主循环停止,并调用rte_eal_cleanup()函数执行DPDK环境的清理,以安全退出程序。
1
2
3
4
5
6
7
8
|
while (!force_quit)
{
// 程序主逻辑
}
// Clean up the EAL
rte_eal_cleanup();
printf("Bye...\n");
|
内存池初始化
检测系统中可用的以太网端口数量,如果没有可用网口则终止程序。
1
2
3
4
5
6
7
|
// Get the number of available ports
uint16_t nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0)
{
rte_exit(EXIT_FAILURE, "[DEV_INIT] No Ethernet ports - bye\n");
}
printf("[DEV_INIT] Number of available Ethernet ports: %d\n", nb_ports);
|
内存池是 DPDK 中重要的内存管理机制,它预先分配一大块内存,并将其分割成许多小的内存块(通常是固定大小的对象),然后将这些小块组织起来。这可以提高了内存分配的效率,避免了传统的动态内存分配带来的性能瓶颈。通过内存池管理网络数据包的缓冲区(如mbuf),可以在高并发环境下有效地提高网络处理性能。
rte_pktmbuf_pool_create()是DPDK提供的API,用于快速创建一个用于mbufs的标准内存池,包括以下参数:
- “mbuf_pool”:给内存池命名为"mbuf_pool",根据名字可以借助rte_mempool_lookup()函数获取到对应的内存池对象(内存池之间是以类似链表的形式组织,查找时即遍历各内存池对象,比较名字是否一致,若一致就将内存池对象返回);
- NUM_MBUFS * nb_ports:设置内存池中要创建的mbuf数量,NUM_MBUFS是每个网口预分配的mbuf数量,用于缓存从该端口接收的或准备发送的数据包,乘以nb_ports就是总共的mbuf数量。NUM_MBUFS设置为8191(2的幂次方减1)是为了利用内存对齐、避免内存碎片的优势,优化内存池的使用效率;
- MBUF_CACHE_SIZE:每个线程本地缓存的mbuf数量。在多线程环境下,为了提高性能并减少对共享资源的访问,DPDK提供了本地缓存(Thread-local Cache)机制,每个线程会有自己的缓存区,用于存储一部分 mbuf,以减少对全局内存池的频繁访问,从而避免竞争和锁开销;
- 0:用户私有数据区的大小,用户私有数据区指的是在内存池中为每个元素(例如mbuf)预留的额外空间,它可以用于存储每个内存块(如mbuf)的私有数据(与该数据包相关的标识符、统计数据、指向其他结构的指针等)。一般设置为 0,表示不需要额外的私有空间;
- RTE_MBUF_DEFAULT_BUF_SIZE:每个mbuf能容纳的数据包大小(以字节为单位),一般是默认的 2048 字节,足够容纳普通以太网帧;
- rte_socket_id():该参数用于指示在哪个Socket上分配内存,注意Socket不是网络中的套接字,而是NUMA架构的Socket。在NUMA架构中,每个Socket上有数个Node,每个Node又包括数个Core,每个Socket有自己的内存,每个Socket里的处理器访问自己内存的速度最快,因此在创建缓冲区的时候就需要考虑到内存位置对性能的影响。rte_socket_id()函数获取获取当前线程所在的socket,进而指定在哪个NUMA节点上分配内存。
随后检查内存池是否创建成功,如果失败,则调用rte_exit()输出错误信息并终止程序,原因可能是内存不足、巨页配置问题等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
// Creates a new mempool in memory to hold the mbufs
struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create(
"mbuf_pool",
NUM_MBUFS * nb_ports,
MBUF_CACHE_SIZE,
0,
RTE_MBUF_DEFAULT_BUF_SIZE,
rte_socket_id()
);
if (mbuf_pool == NULL)
{
rte_exit(EXIT_FAILURE, "[DEV_INIT] Cannot init mbuf pool\n");
}
|
端口初始化
1
2
3
4
5
6
7
8
9
|
// Initializing all ports
uint16_t portid;
RTE_ETH_FOREACH_DEV(portid)
{
if (port_init(portid, mbuf_pool) == false)
{
rte_exit(EXIT_FAILURE, "[DEV_INIT] Cannot init port %d\n", portid);
}
}
|
端口初始化过程主要包括:
- 端口有效性检查:调用rte_eth_dev_is_valid_port()检查端口是否有效;
- 获取端口设备信息:使用rte_eth_dev_info_get()获取端口设备的信息;
- 若端口支持,则启用高速内存释放;
- 配置端口的接收和发送队列:调用rte_eth_dev_configure()配置端口的接收和发送队列;
- 配置接收队列:使用rte_eth_rx_queue_setup()为每个接收队列分配内存,并设置队列参数;
- 配置发送队列:使用默认发送队列配置设置发送队列的配置;
- 启动端口:调用rte_eth_dev_start()启动端口。
通过rte_eth_dev_is_valid_port()函数判断portid是否是一个有效的、存在的端口,若端口号非法则打印错误信息、退出程序。
1
2
3
4
5
6
|
// Check whether it is a valid and available port
if (!rte_eth_dev_is_valid_port(portid))
{
printf("[DEV_INIT] Inalid port %u\n", portid);
return false;
}
|
结构体rte_eth_conf是DPDK中用于配置端口设备参数的,包括接收参数、发送参数、offload功能、中断配置、多队列模式等。使用memset将结构体清零,以完成配置信息的初始化(即不启用任何高级功能)。
1
2
|
struct rte_eth_conf port_conf;
memset(&port_conf, 0, sizeof(struct rte_eth_conf));
|
随后查询指定端口的设备信息,并输出其驱动名称,为后续配置网卡参数做准备。需要注意的是,rte_eth_dev_info()函数是查询网卡能力信息的,支持查询网卡支持的功能、最大支持的接收队列数量、最大支持的发送队列数量、默认的接收队列配置、默认的发送队列配置等。rte_eth_conf()函数则是用于配置网卡队列的。
1
2
3
4
5
6
7
8
9
|
struct rte_eth_dev_info dev_info;
// Try to get the divce according to the given number of port
ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0)
{
printf("[DEV_INIT] Error during getting device (port %u) info: %s\n", portid, strerror(-ret));
return false;
}
printf("[DEV_INIT] Port %u, Driver name: %s\n", portid, dev_info.driver_name);
|
根据获取的端口设备信息dev_info,判断其是否支持MBUF_FAST_FREE特性,如果支持该特性,则启用TX Offload功能(将功能写入port_conf配置结构体中,稍后进行配置)。TX Offload技术在数据包成功发送后,立即快速释放数据缓冲区(mbuf)回内存池,可以提升性能、资源使用效率。
1
2
3
4
5
6
|
// Turn on DEV_TX_OFFLOAD_MBUF_FAST_FREE if the device support
// MBUF_FAST_FREE enables MBUF to be quickly released after packet is sent successfully
if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
{
port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
}
|
在端口设备配置参数构造完成后(上述仅开启了TX Offload功能),即可使用rte_eth_dev_configure()函数将端口安装指定的配置参数port_conf进行初始化。与此同时还将设置端口的接收队列数和发送队列数,简单场景下为每个端口设置1个接收队列和1个发送队列即足够,在多线程/多核收发包的场景下,在多线程或多核处理场景中,可以配置多个接收和发送队列,使得多个线程或多个CPU核能够并行地从同一个端口接收和发送数据,从而充分发挥多核系统的性能优势,提高数据包的处理吞吐量和系统的整体效率。
1
2
3
4
5
6
7
8
|
// Configure the number of queues for a port
const uint16_t rx_rings = 1, tx_rings = 1;
ret = rte_eth_dev_configure(portid, rx_rings, tx_rings, &port_conf);
if (ret != 0)
{
printf("[DEV_INIT] Cannot configure device: err=%d, port=%u\n", ret, portid);
return false;
}
|
描述符(Descriptor)是在网络设备和DPDK中用于管理数据包缓冲区的一种关键数据结构。它本质上是一个控制块,存储着数据包所在内存缓冲区的地址、长度以及状态信息,如数据是否准备好发送或已成功接收。
描述符是驱动程序在内存中通过环形队列(Ring Buffer)维护的,网卡通过DMA机制访问这块内存,读取和写入描述符中的信息,从而知道数据包缓冲区的位置,进行数据的收发:接收描述符告诉网卡可用的空缓冲区位置,供接收数据包使用;发送描述符则指示网卡从哪个缓冲区取出数据进行发送。
描述符的数量决定了网卡在任意时刻能够缓存和处理的最大数据包数量,直接影响数据传输的效率和系统吞吐量。对于小流量且对延迟敏感的应用,采用较少的描述符可以缩短队列长度,减少数据包在队列中的等待时间,从而降低整体处理延迟并节省内存资源。相反,在大流量和高吞吐场景下,适当增加描述符数量能够提升队列容量,更好地应对突发流量,避免因队列溢出导致的数据包丢失,从而保证网络性能的稳定性和可靠性。
用户首先定义了接收队列描述符数量RX_RING_SIZE和发送队列描述符数量TX_RING_SIZE为1024,随后使用rte_eth_dev_adjust_nb_rx_tx_desc()函数判断对于端口portid采用这样的描述符数量是否合适,函数会根据端口的硬件限制调整传入的描述符数量,使其不超过设备支持的最大值。
1
2
3
4
5
6
7
8
9
10
11
12
|
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
// Check whether the size of the TX and RX ring exceeds the limit of the network card
// If the value exceeds, function adjust the size to the boundary max value
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
{
printf("[DEV_INIT] Cannot adjust number of descriptors: err=%d, port=%u\n", ret, portid);
return false;
}
|
针对端口的每一个接收队列(本程序只有1个接收队列)进行配置,rte_eth_rx_queue_setup()函数的参数如下:
- portid:要配置的端口编号;
- q:当前正在配置的接收队列索引;
- nb_rxd:每个接收队列的描述符数量,决定了接收队列大小;
- rte_eth_dev_socket_id(portid):获取端口对应的CPU Socket编号,便于内存亲和性优化;
- NULL:此处使用默认的接收队列配置;
- mbuf_pool:接收队列使用的内存池,用于存放接收到的数据包。
1
2
3
4
5
6
7
8
9
10
|
// Allocate and set up RX queue per Ethernet port
for (uint16_t q = 0; q < rx_rings; q++)
{
ret = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (ret < 0)
{
return false;
}
}
|
同样针对端口的每一个发送队列(本程序只有1个接收队列)进行配置。首先,从端口设备信息dev_info中提取默认的发送队列配置结构体,作为初始化的基础;随后,将先前配置的端口卸载能力(如MBUF_FAST_FREE)应用到发送队列;最后通过rte_eth_tx_queue_setup()函数进行配置,函数的参数如下:
- portid:要配置的端口编号;
- q:当前正在配置的发送队列索引;
- nb_txd:每个发送队列的描述符数量,决定了发送队列大小;
- rte_eth_dev_socket_id(portid):获取端口对应的CPU Socket编号,便于内存亲和性优化;
- &txconf:配置的发送队列配置参数。
需要注意rte_eth_tx_queue_setup()函数不需要指定mempool,这是因为应用程序在发送数据时,已经完成了mbuf的创建和填充,驱动只负责把它发送出去。也正是因为这样,mempool的指定是在应用程序构造mbuf时完成的,例如使用rte_pktmbuf_alloc()函数构造mbuf时需要指定mempool。与之对应,接收时需要驱动提前知道数据包的存放位置,所以在配置接收队列时需要指定mempool,用于为接收到的数据分配缓存空间。
1
2
3
4
5
6
7
8
9
10
11
12
|
// Allocate and set up TX queue per Ethernet port
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
for (uint16_t q = 0; q < tx_rings; q++)
{
ret = rte_eth_tx_queue_setup(portid, q, nb_txd,
rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
{
return false;
}
}
|
调用rte_eth_dev_start()函数启动端口,正式使其具备收发数据的能力,失败则立即中止初始化。该函数的具体工作包括:
- 启动接收队列和发送队列:为每个已配置的接收队列和发送队列分配硬件资源,随后将这些队列标记为活跃状态,并执行相应的初始化操作;
- 启用链路状态监控:启动网卡的链路检测逻辑,读取包括通断状态、速率、工作模式在内的物理层状态;
- 清空缓冲、设置标志位:初始化收发路径的内部状态,可能启用某些offload功能,并标识状态为活跃状态,通知驱动准备接收和发送数据。
1
2
3
4
5
6
7
|
// Starting Ethernet port
ret = rte_eth_dev_start(portid);
if (ret < 0)
{
printf("[DEV_INIT] rte_eth_dev_start:err=%d, port=%u\n", ret, portid);
return false;
}
|
为便于调试,可以打印出端口的MAC地址。rte_ether_addr是DPDK提供的用于表示一个MAC地址的结构体,其是一个包含6字节的数组uint8_t addr_bytes[RTE_ETHER_ADDR_LEN];
。通过rte_eth_macaddr_get()函数可将指定端口portid的MAC地址写入mac_addr。
为了打印MAC地址,可采用DPDK提供的格式宏RTE_ETHER_ADDR_PRT_FMT,其实际上就是"%02X:%02X:%02X:%02X:%02X:%02X",可供printf辅助完成MAC地址的打印。
1
2
3
4
5
6
7
8
9
|
// Display the port MAC address
printf("[DEV_INIT] Done > Starting Ethernet port: ");
struct rte_ether_addr mac_addr;
ret = rte_eth_macaddr_get(portid, &mac_addr);
if (ret != 0)
{
return false;
}
printf("Port %u, MAC address: " RTE_ETHER_ADDR_PRT_FMT "\n", portid, RTE_ETHER_ADDR_BYTES(&mac_addr));
|
启动接收主线程
本示例的lcore_recv_main()是在主函数的主线程里被直接调用的,即同步运行在主线程上,直到函数运行完成退出后才继续向下执行。
1
2
|
// Call lcore_main on the main core only. Called on single lcore
lcore_recv_main();
|
检查每个端口与当前线程是否位于同一个NUMA Socket,跨NUMA节点访问会带来更高的内存访问延迟,影响性能:
- rte_eth_dev_socket_id()函数返回端口portid所属NUMA Socket编号:如果返回值是非负数,则表示该端口确实绑定到了一个具体的Socket;如果返回值是负数,则表示该端口没有绑定到特定的 Socket,可能是不支持NUMA或其他情况,此时无需判断跨节点访问性能影响;
- rte_socket_id()返回当前线程所在Socket的编号,若与端口位于不同的Socket,则提示会影响性能。
1
2
3
4
5
6
7
8
9
10
|
// Check that the port is on the same NUMA node as the polling thread for best performance
RTE_ETH_FOREACH_DEV(portid)
{
if (rte_eth_dev_socket_id(portid) >= 0 && rte_eth_dev_socket_id(portid) != (int)rte_socket_id())
{
printf("[TASK_RECV], port %u is on remote NUMA node to polling thread.\n", portid);
printf("Performance will not be optimal.\n");
}
printf("Core %u receiving packets. [Ctrl+C to quit]\n", rte_lcore_id());
}
|
接收任务主循环中的核心逻辑与框架如下:程序持续运行,直到收到退出信号(如 Ctrl+C)将force_qui 置为 true,才会停止循环退出,而在每一次循环中,遍历系统中所有可用的以太网端口,执行接收数据包或其他的的操作。
1
2
3
4
5
6
7
8
9
|
// Main work of application loop
uint16_t portid;
while (!force_quit)
{
RTE_ETH_FOREACH_DEV(portid)
{
// Receive packets on every port
}
}
|
在每一次循环中,rte_eth_rx_burst()负责从指定端口portid的接收队列0中批量接收数据包,一次性最多接收BURST_SIZE个,而接收到的数据包指针放入bufs,然后程序就可以通过bufs访问、处理这些数据包。rte_eth_rx_burst()的返回值标识本次接收到的数据包数量。
if (unlikely(nb_rx == 0))
中的unlikely是性能优化标识,告诉编译器该条件很少发生,如果本次没有接收到任何数据包,则跳过本次循环,继续下一轮接收。
1
2
3
4
5
6
7
8
|
// Get burst of RX packets
struct rte_mbuf *bufs[BURST_SIZE];
const uint16_t nb_rx = rte_eth_rx_burst(portid, 0, bufs, BURST_SIZE);
if (unlikely(nb_rx == 0))
{
continue;
}
|
遍历刚刚接收到的nb_rx个数据包,对于每一个数据包调用parse_and_print_udp_packet()函数进行进一步的处理,处理完成后调用rte_pktmbuf_free()将mbuf归还给内存池,释放占用的内存缓存区。
1
2
3
4
5
6
7
8
|
for (int i = 0; i < nb_rx; i++)
{
struct rte_mbuf *mbuf = bufs[i];
// Call parsing function
parse_and_print_udp_packet(mbuf);
// Free the mbuf after processing
rte_pktmbuf_free(mbuf);
}
|
数据包解析
rte_pktmbuf_mtod()函数用于将mbuf的数据指针转换为指定类型的结构体指针,这里将mbuf的首地址强制转换为rte_ether_hdr结构体指针类型,所以eth_hdr现在指向该数据包的以太网帧头。进而读取以太网类型字段ether_type,判断其是否携带的IPv4,若不是IPv4,就直接返回,不再继续处理该数据包。
需要注意的是,ether_type在数据包中是网络序,因此需要将匹配值RTE_ETHER_TYPE_IPV4(0x0800)通过rte_cpu_to_be_16()函数(16表示是2字节的转换)将主机字节序转换为网络字节序。类似的还有rte_cpu_to_be_32()函数、rte_cpu_to_be_64()函数分别将32位整数、64位整数从主机序转为网络序,rte_be_to_cpu_16()函数、rte_be_to_cpu_32()函数、rte_be_to_cpu_64()函数分别将16位整数、32位整数、64位整数从网络序转为主机序。
1
2
3
4
5
6
7
8
|
// Extract Ethernet header
struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
// Check if it's an IPv4 packet
if (eth_hdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4))
{
return;
}
|
eth_hdr + 1
表示跳过以太网头部,并同样通过指针类型强制转换,将指针ipv4_hdr指向mbuf中的IPv4头部地址。进而读取表示IPv4数据包中指示上层协议类型的next_proto_id字段,若不为IPPROTO_UDP(值为17),则表示不是UDP包,就跳过处理。
由于这里的next_proto_id是单字节字段,不涉及大小端问题,因此无需进行字节序的转换。
1
2
3
4
5
6
7
8
|
// Extract IPv4 header
struct rte_ipv4_hdr *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
// Check if it's a UDP packet
if (ipv4_hdr->next_proto_id != IPPROTO_UDP)
{
return;
}
|
同样的跳过IPv4头部,将指针udp_hdr指向UDP头部的起始位置。需要注意的是IPv4的选项字段是可变的,因此在实际的场景中,这种简单的指针移动方式是不安全的。
1
2
|
// Extract UDP header
struct rte_udp_hdr *udp_hdr = (struct rte_udp_hdr *)((uint8_t *)ipv4_hdr + sizeof(struct rte_ipv4_hdr));
|
读取源IP地址、目的IP地址、源UDP端口号、目的UDP端口号,同样根据字段的大小使用对应的字节序转换函数进行转换。
1
2
3
4
5
|
// Convert addresses and ports to host byte order
uint32_t src_ip = rte_be_to_cpu_32(ipv4_hdr->src_addr);
uint32_t dst_ip = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
uint16_t src_port = rte_be_to_cpu_16(udp_hdr->src_port);
uint16_t dst_port = rte_be_to_cpu_16(udp_hdr->dst_port);
|
UDP头部后携带的即为实际负载,该负载的长度可通过UDP报文的总长度字段dgram_len减去UDP头的固定长度得到。
1
2
3
|
// Calculate UDP payload start and length
char *payload = (char *)(udp_hdr + 1);
uint16_t udp_payload_len = rte_be_to_cpu_16(udp_hdr->dgram_len) - sizeof(struct rte_udp_hdr);
|
上述解析得到的IP地址是采用数值形式存储的,为了方便打印出来,可利用表示IPv4地址的标准结构体in_addr,将IP地址赋值给其中的s_addr变量,进而通过inet_ntoa()函数将IPv4地址从数值形式转换成点分十进制字符串。随后即可打印出数据包的具体信息。
需要注意的是,打印负载字符串使用的是fwrite()函数,而非printf()函数,这是因为printf()在读取到“\0”会提前终止,导致输出结果不完整甚至错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Convert IP addresses to readable format
struct in_addr src_in_addr = {.s_addr = src_ip};
struct in_addr dst_in_addr = {.s_addr = dst_ip};
// Print parsed information
printf("Received UDP packet:\n");
printf(" Source IP: %s\n", inet_ntoa(src_in_addr));
printf(" Destination IP: %s\n", inet_ntoa(dst_in_addr));
printf(" Source Port: %u\n", src_port);
printf(" Destination Port: %u\n", dst_port);
printf(" Payload (%u bytes): ", udp_payload_len);
fwrite(payload, 1, udp_payload_len, stdout);
printf("\n");
|
程序测试
接收端编译运行该程序。
1
2
3
4
|
sudo make clean
sudo make
cd build
sudo ./recv
|
发送端采用Scapy构造数据包发送,发送程序参见附录的完整代码send_udp_packet.py,需要预先安装Scapy,随后运行脚本。
1
2
|
sudo pip install -i https://pypi.tuna.tsinghua.edu.cn/simple scapy
sudo python3 send_udp_packet.py
|


完整源码
main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
// Include and define
#include <rte_eal.h>
#include <signal.h>
#include <rte_cycles.h>
#include <rte_lcore.h>
#include "device_init.h"
#include "task_recv.h"
// Signal handler for exiting
volatile bool force_quit = false;
static void signal_handler(int signum)
{
if (signum == SIGINT || signum == SIGTERM)
{
printf("\n\nSignal %d received, preparing to exit...\n", signum);
force_quit = true;
}
}
// The main function, which does initialization and calls the per-lcore functions
int main(int argc, char *argv[])
{
// Initialize the Environment Abstraction Layer (EAL)
int ret = rte_eal_init(argc, argv);
if (ret < 0)
{
rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");
}
argc -= ret;
argv += ret;
// Register signal handlers
force_quit = false;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// Initialize device and ports
device_init();
printf("**************************************************************************************\n");
// Call lcore_main on the main core only. Called on single lcore
lcore_recv_main();
// Clean up the EAL
rte_eal_cleanup();
printf("Bye...\n");
// Return 0 to indicate successful execution
return 0;
}
|
device_init.h
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#ifndef _DEVICE_INIT_H
#define _DEVICE_INIT_H
#include <rte_cycles.h>
#include <rte_mempool.h>
#include <rte_malloc.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
// Init device and ports
void device_init();
#endif /* _DEVICE_INIT_H */
|
device_init.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
#include "device_init.h"
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
// Main functional part of port initialization
bool port_init(uint16_t portid, struct rte_mempool *mbuf_pool)
{
struct rte_eth_conf port_conf;
struct rte_eth_txconf txconf;
struct rte_eth_dev_info dev_info;
int ret;
// Initializing port
printf("[DEV_INIT] Initializing port %u...\n", portid);
// Check whether it is a valid and available port
if (!rte_eth_dev_is_valid_port(portid))
{
printf("[DEV_INIT] Inalid port %u\n", portid);
return false;
}
memset(&port_conf, 0, sizeof(struct rte_eth_conf));
// Try to get the divce according to the given number of port
ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0)
{
printf("[DEV_INIT] Error during getting device (port %u) info: %s\n", portid, strerror(-ret));
return false;
}
printf("[DEV_INIT] Port %u, Driver name: %s\n", portid, dev_info.driver_name);
// Turn on DEV_TX_OFFLOAD_MBUF_FAST_FREE if the device support
// MBUF_FAST_FREE enables MBUF to be quickly released after packet is sent successfully
if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
{
port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
}
// Configure the number of queues for a port
const uint16_t rx_rings = 1, tx_rings = 1;
ret = rte_eth_dev_configure(portid, rx_rings, tx_rings, &port_conf);
if (ret != 0)
{
printf("[DEV_INIT] Cannot configure device: err=%d, port=%u\n", ret, portid);
return false;
}
// Check whether the size of the TX and RX ring exceeds the limit of the network card
// If the value exceeds, function adjust the size to the boundary max value
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, &nb_txd);
if (ret != 0)
{
printf("[DEV_INIT] Cannot adjust number of descriptors: err=%d, port=%u\n", ret, portid);
return false;
}
// Allocate and set up RX queue per Ethernet port
for (uint16_t q = 0; q < rx_rings; q++)
{
ret = rte_eth_rx_queue_setup(portid, q, nb_rxd,
rte_eth_dev_socket_id(portid), NULL, mbuf_pool);
if (ret < 0)
{
return false;
}
}
// Allocate and set up TX queue per Ethernet port
txconf = dev_info.default_txconf;
txconf.offloads = port_conf.txmode.offloads;
for (uint16_t q = 0; q < tx_rings; q++)
{
ret = rte_eth_tx_queue_setup(portid, q, nb_txd,
rte_eth_dev_socket_id(portid), &txconf);
if (ret < 0)
{
return false;
}
}
// Starting Ethernet port
ret = rte_eth_dev_start(portid);
if (ret < 0)
{
printf("[DEV_INIT] rte_eth_dev_start:err=%d, port=%u\n", ret, portid);
return false;
}
// Display the port MAC address
printf("[DEV_INIT] Done > Starting Ethernet port: ");
struct rte_ether_addr mac_addr;
ret = rte_eth_macaddr_get(portid, &mac_addr);
if (ret != 0)
{
return false;
}
printf("Port %u, MAC address: " RTE_ETHER_ADDR_PRT_FMT "\n", portid, RTE_ETHER_ADDR_BYTES(&mac_addr));
return true;
}
// Init device and ports
void device_init()
{
// Get the number of available ports
uint16_t nb_ports = rte_eth_dev_count_avail();
if (nb_ports == 0)
{
rte_exit(EXIT_FAILURE, "[DEV_INIT] No Ethernet ports - bye\n");
}
printf("[DEV_INIT] Number of available Ethernet ports: %d\n", nb_ports);
// Creates a new mempool in memory to hold the mbufs
struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", NUM_MBUFS * nb_ports,
MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
rte_socket_id());
if (mbuf_pool == NULL)
{
rte_exit(EXIT_FAILURE, "[DEV_INIT] Cannot init mbuf pool\n");
}
// Initializing all ports
uint16_t portid;
RTE_ETH_FOREACH_DEV(portid)
{
if (port_init(portid, mbuf_pool) == false)
{
rte_exit(EXIT_FAILURE, "[DEV_INIT] Cannot init port %d\n", portid);
}
}
}
|
task_recv.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#ifndef _TASK_RECV_H
#define _TASK_RECV_H
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <rte_common.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include <rte_ip.h>
#include <rte_udp.h>
#include <arpa/inet.h>
void lcore_recv_main(void);
#endif /* _TASK_RECV_H */
|
task_recv.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
#include "task_recv.h"
#define BURST_SIZE 32
extern volatile bool force_quit;
void parse_and_print_udp_packet(struct rte_mbuf *mbuf)
{
// Extract Ethernet header
struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
// Check if it's an IPv4 packet
if (eth_hdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4))
{
return;
}
// Extract IPv4 header
struct rte_ipv4_hdr *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
// Check if it's a UDP packet
if (ipv4_hdr->next_proto_id != IPPROTO_UDP)
{
return;
}
// Extract UDP header
struct rte_udp_hdr *udp_hdr = (struct rte_udp_hdr *)((uint8_t *)ipv4_hdr + sizeof(struct rte_ipv4_hdr));
// Convert addresses and ports to host byte order
uint32_t src_ip = rte_be_to_cpu_32(ipv4_hdr->src_addr);
uint32_t dst_ip = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
uint16_t src_port = rte_be_to_cpu_16(udp_hdr->src_port);
uint16_t dst_port = rte_be_to_cpu_16(udp_hdr->dst_port);
// Calculate UDP payload start and length
char *payload = (char *)(udp_hdr + 1);
uint16_t udp_payload_len = rte_be_to_cpu_16(udp_hdr->dgram_len) - sizeof(struct rte_udp_hdr);
// Convert IP addresses to readable format
struct in_addr src_in_addr = {.s_addr = src_ip};
struct in_addr dst_in_addr = {.s_addr = dst_ip};
// Print parsed information
printf("Received UDP packet:\n");
printf(" Source IP: %s\n", inet_ntoa(src_in_addr));
printf(" Destination IP: %s\n", inet_ntoa(dst_in_addr));
printf(" Source Port: %u\n", src_port);
printf(" Destination Port: %u\n", dst_port);
printf(" Payload (%u bytes): ", udp_payload_len);
fwrite(payload, 1, udp_payload_len, stdout);
printf("\n");
}
// Basic forwarding application lcore
void lcore_recv_main(void)
{
uint16_t portid;
// Check that the port is on the same NUMA node as the polling thread for best performance
RTE_ETH_FOREACH_DEV(portid)
{
if (rte_eth_dev_socket_id(portid) >= 0 && rte_eth_dev_socket_id(portid) != (int)rte_socket_id())
{
printf("[TASK_RECV], port %u is on remote NUMA node to polling thread.\n", portid);
printf("Performance will not be optimal.\n");
}
printf("Core %u receiving packets. [Ctrl+C to quit]\n", rte_lcore_id());
}
// Main work of application loop
while (!force_quit)
{
// Receive packets on every port
RTE_ETH_FOREACH_DEV(portid)
{
// Get burst of RX packets
struct rte_mbuf *bufs[BURST_SIZE];
const uint16_t nb_rx = rte_eth_rx_burst(portid, 0, bufs, BURST_SIZE);
if (unlikely(nb_rx == 0))
{
continue;
}
for (int i = 0; i < nb_rx; i++)
{
struct rte_mbuf *mbuf = bufs[i];
// Call parsing function
parse_and_print_udp_packet(mbuf);
// Free the mbuf after processing
rte_pktmbuf_free(mbuf);
}
}
}
}
|
makefile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation
# binary name
APP = recv
# all source are stored in SRCS-y
SRCS-y := main.c device_init.c task_recv.c
PKGCONF ?= pkg-config
# Build using pkg-config variables if possible
ifneq ($(shell $(PKGCONF) --exists libdpdk && echo 0),0)
$(error "no installation of DPDK found")
endif
all: shared
.PHONY: shared static
shared: build/$(APP)-shared
ln -sf $(APP)-shared build/$(APP)
static: build/$(APP)-static
ln -sf $(APP)-static build/$(APP)
PC_FILE := $(shell $(PKGCONF) --path libdpdk 2>/dev/null)
CFLAGS += -O3 $(shell $(PKGCONF) --cflags libdpdk)
LDFLAGS_SHARED = $(shell $(PKGCONF) --libs libdpdk)
LDFLAGS_STATIC = $(shell $(PKGCONF) --static --libs libdpdk)
ifeq ($(MAKECMDGOALS),static)
# check for broken pkg-config
ifeq ($(shell echo $(LDFLAGS_STATIC) | grep 'whole-archive.*l:lib.*no-whole-archive'),)
$(warning "pkg-config output list does not contain drivers between 'whole-archive'/'no-whole-archive' flags.")
$(error "Cannot generate statically-linked binaries with this version of pkg-config")
endif
endif
CFLAGS += -DALLOW_EXPERIMENTAL_API
build/$(APP)-shared: $(SRCS-y) Makefile $(PC_FILE) | build
$(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
build/$(APP)-static: $(SRCS-y) Makefile $(PC_FILE) | build
$(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC)
build:
@mkdir -p $@
.PHONY: clean
clean:
rm -f build/$(APP) build/$(APP)-static build/$(APP)-shared
test -d build && rmdir -p build || true
|
send_udp_packet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
from scapy.all import IP, UDP, Ether, sendp
import argparse
import time
def send_udp_loop(dst_ip, dst_port, src_ip, src_port, iface, base_payload):
count = 0
while True:
# Build dynamic payload
payload = f"{base_payload} {count}"
# Construct packet
pkt = Ether() / IP(src=src_ip, dst=dst_ip) / UDP(sport=src_port, dport=dst_port) / payload
# Send the packet
sendp(pkt, iface=iface, verbose=False)
print(f"[{count}] Packet sent: {src_ip}:{src_port} -> {dst_ip}:{dst_port}, Payload: '{payload}'")
count += 1
# Sleep 1 second between sends
time.sleep(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Continuously send UDP packets using Scapy")
parser.add_argument("--dst-ip", default="192.168.226.129", help="Destination IP address")
parser.add_argument("--dst-port", type=int, default=9000, help="Destination UDP port")
parser.add_argument("--src-ip", default="192.168.226.128", help="Source IP address")
parser.add_argument("--src-port", type=int, default=12345, help="Source UDP port")
parser.add_argument("--iface", default="ens37", help="Network interface to send on")
parser.add_argument("--payload", default="Hello from Scapy", help="Base payload string")
args = parser.parse_args()
send_udp_loop(
dst_ip=args.dst_ip,
dst_port=args.dst_port,
src_ip=args.src_ip,
src_port=args.src_port,
iface=args.iface,
base_payload=args.payload
)
|
源码打包下载
源码打包下载:dpdk_simple_recv.zip