
DPDK Single-Core Packet Sending Example

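Building on the official DPDK examples skeleton and basicfwd, this post writes an application that constructs and sends packets. The application runs on the main thread.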

Initialization

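Environment, memory pool, and port initialization are identical to the "DPDK Single-Core Packet Receiving Example"; see: DPDK Single-Core Packet Receiving Example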

Starting the Sending Main Thread

As in the receiving case, first check whether the network port and the current thread are on the same NUMA socket.

// Check that the port is on the same NUMA node as the polling thread for best performance
if (rte_eth_dev_socket_id(portid) >= 0 && rte_eth_dev_socket_id(portid) != (int)rte_socket_id())
{
    printf("[TASK_SEND], port %u is on remote NUMA node to polling thread.\n", portid);
    printf("Performance will not be optimal.\n");
}
printf("Core %u sending packets. [Ctrl+C to quit]\n", rte_lcore_id());

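Packets constructed later must be placed in a memory pool. The pool created during initialization can be retrieved with rte_mempool_lookup(), which takes the name of the desired pool as its argument. Internally, memory pools are organized in a list-like structure: the lookup walks all registered pools, compares names, and returns the matching pool object on success.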

// Get mempool
struct rte_mempool *mempool = rte_mempool_lookup("mbuf_pool");
if (mempool == NULL)
{
    printf("[TASK_SEND] Mempool is not found!\n");
    return;
}
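
The lookup-by-name idea described above can be sketched in plain C. This is only a toy model under assumptions: `toy_pool`, `toy_pool_register`, and `toy_pool_lookup` are hypothetical names for illustration and are not part of DPDK, whose real registry is more elaborate (locking, shared memory).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy stand-in for a named memory pool; not the real struct rte_mempool. */
struct toy_pool {
    const char *name;
    struct toy_pool *next;
};

/* Head of the (hypothetical) global registry list. */
struct toy_pool *toy_registry = NULL;

/* Register a pool by pushing it onto the front of the list. */
void toy_pool_register(struct toy_pool *pool)
{
    assert(pool != NULL && pool->name != NULL);
    pool->next = toy_registry;
    toy_registry = pool;
}

/* Walk the list and return the first pool whose name matches, or NULL. */
struct toy_pool *toy_pool_lookup(const char *name)
{
    for (struct toy_pool *p = toy_registry; p != NULL; p = p->next) {
        if (strcmp(p->name, name) == 0)
            return p;
    }
    return NULL;
}
```

Because the lookup is a linear scan with string comparisons, it is reasonable to call it once during setup and keep the returned pointer, as the code above does, rather than looking the pool up per packet.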

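Next, prepare the MAC addresses needed for the Ethernet frame. The source MAC address can be obtained with rte_eth_macaddr_get(), which fills an rte_ether_addr structure; the destination MAC address is set by hand.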

// L2 Address
struct rte_ether_addr src_mac;
if (rte_eth_macaddr_get(portid, &src_mac) != 0)
{
    printf("[TASK_SEND] Failed to get source MAC address\n");
    return;
}
struct rte_ether_addr dst_mac = {{0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb}};

Then prepare the IP addresses, port numbers, and payload needed by the network and transport layers.

// L3 Address
uint32_t src_ip = inet_addr("192.168.226.128");
uint32_t dst_ip = inet_addr("192.168.226.129");

// L4 Port and Payload
uint16_t dst_port = 9000;
uint16_t src_port = 12345;
const char *base_payload = "Hello from DPDK";
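
One detail worth checking here: inet_addr() already returns the address in network byte order, which is why the value can later be stored in the IPv4 header without another byte swap. A minimal sketch, where `ipv4_bytes` is a hypothetical helper added only for illustration:

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Copy the four bytes of an IPv4 address (as returned by inet_addr)
 * into out[], in the order they sit in memory. */
void ipv4_bytes(uint32_t addr_be, uint8_t out[4])
{
    assert(out != NULL);
    memcpy(out, &addr_be, 4);
}
```

For "192.168.226.128" the bytes in memory are 192, 168, 226, 128 on both little-endian and big-endian hosts, which is exactly the order they appear on the wire.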

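The main loop runs until the force_quit flag is set to true. Each iteration calls rte_pktmbuf_alloc() to allocate a packet buffer (mbuf) from the memory pool mempool, then calls build_and_send_packet() with the port ID, the buffer, the source and destination MAC addresses, the source and destination IP addresses, the UDP port numbers, and the payload to build and send the packet. The counter is incremented after each send, and rte_delay_us(1000000) delays for 1 second to throttle the send rate.

Note that rte_delay_us() is a busy-wait function: it keeps the CPU occupied to achieve a precise microsecond-level delay, which suits short, low-latency, high-precision waits. The standard sleep() function instead suspends the thread and releases the CPU to other processes, which suits longer delays but offers lower precision and higher wake-up latency.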

// Main work of application loop
int count = 0;
while (!force_quit)
{
    // Generate payload
    char payload[128];
    snprintf(payload, sizeof(payload), "%s %d", base_payload, count);

    // Call building function
    struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mempool);
    if (!mbuf)
    {
        printf("[TASK_SEND] Failed to allocate mbuf\n");
        return;
    }
    build_and_send_packet(portid,
                        mbuf,
                        &src_mac,
                        &dst_mac,
                        src_ip,
                        dst_ip,
                        src_port,
                        dst_port,
                        payload);
    count++;
    rte_delay_us(1000000);
}
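
The difference between busy-waiting and sleeping can be sketched with POSIX clocks. This is an illustration under assumptions, not how DPDK implements rte_delay_us(); `now_ns`, `busy_wait_us`, and `sleeping_wait_us` are hypothetical helper names.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <stdint.h>
#include <time.h>

/* Current monotonic time in nanoseconds. */
uint64_t now_ns(void)
{
    struct timespec ts;
    int rc = clock_gettime(CLOCK_MONOTONIC, &ts);
    assert(rc == 0);
    (void)rc;
    return (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
}

/* Busy-wait: spin on the clock until `us` microseconds have elapsed.
 * The calling core stays fully busy for the whole delay. */
void busy_wait_us(uint64_t us)
{
    uint64_t deadline = now_ns() + us * 1000ULL;
    while (now_ns() < deadline)
        ; /* spin */
}

/* Sleeping wait: ask the kernel to suspend the thread instead,
 * so the core is free for other work until the thread is woken. */
void sleeping_wait_us(uint64_t us)
{
    struct timespec req = {
        .tv_sec = (time_t)(us / 1000000ULL),
        .tv_nsec = (long)((us % 1000000ULL) * 1000ULL),
    };
    nanosleep(&req, NULL);
}
```

For a one-second pause between packets, as in this example, either approach works; the busy-wait version simply keeps the lcore at full utilization during the pause.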

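Packet Construction and Sending

The build_and_send_packet() function takes the port ID, the buffer, the source and destination MAC addresses, the source and destination IP addresses, the UDP port numbers, and the payload, and uses them to build and send the packet.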

void build_and_send_packet(uint16_t portid,
                           struct rte_mbuf *mbuf,
                           struct rte_ether_addr *src_mac,
                           struct rte_ether_addr *dst_mac,
                           uint32_t src_ip, uint32_t dst_ip,
                           uint16_t src_port, uint16_t dst_port,
                           const char *payload)

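First compute the total packet length: the Ethernet header, IPv4 header, UDP header, and payload. Then call rte_pktmbuf_append() to reserve that much space in the already allocated mbuf; it returns a pointer to the data area.

Note that rte_pktmbuf_append() does not allocate new memory. It adjusts the length of the valid data inside the existing mbuf, much like pointer bookkeeping. Specifically, it checks whether the remaining tail space is sufficient; if so, it updates the mbuf's data_len field (the amount of data held in the current mbuf) and pkt_len field (the total length of the packet, since one packet may span several mbufs) and returns a pointer to the start of the newly added area, into which the caller writes data. If the space is insufficient it returns NULL.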

// Calculate the size of the packet
uint16_t payload_len = strlen(payload);
uint16_t pkt_len = sizeof(struct rte_ether_hdr) +
                   sizeof(struct rte_ipv4_hdr) +
                   sizeof(struct rte_udp_hdr) +
                   payload_len;

// Allocate mbuf space
char *pkt_data = rte_pktmbuf_append(mbuf, pkt_len);
if (pkt_data == NULL)
{
    printf("[TASK_SEND] Failed to append data to mbuf\n");
    rte_pktmbuf_free(mbuf);
    return;
}
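
The bookkeeping described above can be modeled with a toy single-segment buffer. This is a simplified sketch under assumptions: `toy_mbuf`, `toy_tailroom`, and `toy_append` are hypothetical names, and the real struct rte_mbuf has many more fields and supports chained segments.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_BUF_SIZE 128

/* Toy single-segment buffer; field names mirror the concepts in the
 * text but this is not the real struct rte_mbuf. */
struct toy_mbuf {
    char buf[TOY_BUF_SIZE];
    uint16_t data_off;  /* where valid data starts inside buf */
    uint16_t data_len;  /* bytes of valid data in this segment */
    uint32_t pkt_len;   /* total bytes in the packet (one segment here) */
};

/* Bytes still free after the current data. */
uint16_t toy_tailroom(const struct toy_mbuf *m)
{
    return (uint16_t)(TOY_BUF_SIZE - m->data_off - m->data_len);
}

/* Reserve `len` bytes at the tail: no allocation, only bookkeeping.
 * Returns a pointer to the start of the newly reserved area, or NULL
 * if the tailroom is too small (in which case nothing changes). */
char *toy_append(struct toy_mbuf *m, uint16_t len)
{
    assert(m != NULL);
    if (len > toy_tailroom(m))
        return NULL;
    char *tail = m->buf + m->data_off + m->data_len;
    m->data_len = (uint16_t)(m->data_len + len);
    m->pkt_len += len;
    return tail;
}
```

In this model a failed append leaves both length fields untouched, so the caller can free or reuse the buffer safely, which mirrors the error path in the code above.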

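Within the contiguous memory region pkt_data, set up separate pointers for the Ethernet, IP, and UDP headers and for the payload, so that each protocol header can be filled in layer by layer.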

// Prepare the pointer in advance
struct rte_ether_hdr *eth_hdr = (struct rte_ether_hdr *)pkt_data;
struct rte_ipv4_hdr *ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
struct rte_udp_hdr *udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1);
char *data_ptr = (char *)(udp_hdr + 1);
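
The `(hdr + 1)` idiom advances a pointer by exactly one header size. A small sketch with toy packed structs makes the resulting offsets explicit; the `toy_*` types are hypothetical stand-ins with the standard wire sizes, and the packed attribute assumes GCC or Clang.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy headers with the standard wire sizes (14, 20 and 8 bytes).
 * They stand in for rte_ether_hdr, rte_ipv4_hdr and rte_udp_hdr. */
struct toy_eth_hdr {
    uint8_t dst[6];
    uint8_t src[6];
    uint16_t type;
} __attribute__((packed));

struct toy_ipv4_hdr {
    uint8_t ver_ihl, tos;
    uint16_t total_len, id, frag;
    uint8_t ttl, proto;
    uint16_t cksum;
    uint32_t src, dst;
} __attribute__((packed));

struct toy_udp_hdr {
    uint16_t sport, dport, len, cksum;
} __attribute__((packed));

/* Byte offset of the payload from the start of the frame, derived with
 * the same "(hdr + 1)" pointer arithmetic used in the text. */
size_t toy_payload_offset(char *frame)
{
    assert(frame != NULL);
    struct toy_eth_hdr *eth = (struct toy_eth_hdr *)frame;
    struct toy_ipv4_hdr *ip = (struct toy_ipv4_hdr *)(eth + 1); /* +14 */
    struct toy_udp_hdr *udp = (struct toy_udp_hdr *)(ip + 1);   /* +20 */
    char *payload = (char *)(udp + 1);                          /* +8  */
    return (size_t)(payload - frame);
}
```

With no IP options, the payload therefore starts 42 bytes into the frame.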

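Fill in the Ethernet header: use rte_ether_addr_copy() to copy the MAC addresses into the corresponding header fields, and set the EtherType field to indicate that the frame carries IPv4. Because this field is 2 bytes wide, the value must be converted to network byte order with rte_cpu_to_be_16().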

// Filled Ethernet Header
rte_ether_addr_copy(src_mac, &eth_hdr->src_addr);
rte_ether_addr_copy(dst_mac, &eth_hdr->dst_addr);
eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);

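Fill in the IPv4 header:

  1. version_ihl is a combined field: the upper 4 bits are the version (4 for IPv4) and the lower 4 bits are the header length in units of 4 bytes (5 here, meaning 20 bytes);
  2. type_of_service is the type-of-service field, usually set to 0 for normal service with no special priority;
  3. total_length is the total length of the IP datagram (IP header + UDP header + payload);
  4. packet_id is the datagram ID, mainly used to identify fragments; in this example it can simply be set to 0;
  5. fragment_offset holds the fragment flags and fragment offset; 0 means no flags are set and the offset is zero, so this datagram is not a fragment;
  6. time_to_live is the TTL, the maximum number of router hops the datagram may traverse, commonly set to 64;
  7. next_proto_id is the protocol field; 17 (IPPROTO_UDP) indicates that the datagram carries UDP;
  8. src_addr and dst_addr are the source and destination IP addresses, given as the 32-bit integers produced by inet_addr();
  9. hdr_checksum is the checksum field, which can be computed with rte_ipv4_cksum(). The field must be zeroed before the computation: the checksum is a one's-complement sum over the entire IP header, and the header contains the checksum field itself, so a stale value left in it would be included in the sum and produce a wrong result.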
// Filled IP Header
ip_hdr->version_ihl = 0x45;
ip_hdr->type_of_service = 0;
ip_hdr->total_length = rte_cpu_to_be_16(sizeof(struct rte_ipv4_hdr) +
                                        sizeof(struct rte_udp_hdr) +
                                        payload_len);
ip_hdr->packet_id = rte_cpu_to_be_16(0);
ip_hdr->fragment_offset = 0;
ip_hdr->time_to_live = 64;
ip_hdr->next_proto_id = IPPROTO_UDP;
ip_hdr->hdr_checksum = 0;
ip_hdr->src_addr = src_ip;
ip_hdr->dst_addr = dst_ip;
ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr);
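
To make the "zero the field first" rule concrete, here is a plain-C sketch of the Internet checksum over a 20-byte IPv4 header. It illustrates the algorithm only and is not DPDK's implementation; `ones_complement_sum` and `ipv4_header_checksum` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* One's-complement sum of `len` bytes (len must be even), read as
 * big-endian 16-bit words and folded back into 16 bits. */
uint16_t ones_complement_sum(const uint8_t *data, size_t len)
{
    assert(len % 2 == 0);
    uint32_t sum = 0;
    for (size_t i = 0; i < len; i += 2)
        sum += (uint32_t)((data[i] << 8) | data[i + 1]);
    while (sum >> 16)
        sum = (sum & 0xffff) + (sum >> 16);
    return (uint16_t)sum;
}

/* Checksum of a 20-byte IPv4 header. The checksum bytes (offsets 10
 * and 11) are expected to be zero when this is called; whatever they
 * contain is summed like any other header word. */
uint16_t ipv4_header_checksum(const uint8_t hdr[20])
{
    return (uint16_t)~ones_complement_sum(hdr, 20);
}
```

A receiver validates the header by summing all ten words including the stored checksum; a correct header sums to 0xffff. If the checksum field holds a stale value while the checksum is being computed, that value is added into the sum and the result no longer validates.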

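Fill in the UDP header: set the source port, the destination port, and the UDP datagram length (UDP header plus payload, in bytes). The UDP checksum can be computed with rte_ipv4_udptcp_cksum(), and its field must likewise be zeroed beforehand. Because this checksum also covers the payload, it can only be computed after the payload has been copied into the buffer, so at this point the field is only cleared.

// Fill the UDP header (the checksum is computed once the payload is in place)
udp_hdr->src_port = rte_cpu_to_be_16(src_port);
udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct rte_udp_hdr) + payload_len);
udp_hdr->dgram_cksum = 0;

Copy the user-defined UDP payload to its position in the packet buffer to complete the UDP data, then compute the UDP checksum over the pseudo-header, the UDP header, and the payload.

// Fill the UDP payload, then compute the checksum that covers it
rte_memcpy(data_ptr, payload, payload_len);
udp_hdr->dgram_cksum = rte_ipv4_udptcp_cksum(ip_hdr, udp_hdr);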
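
The UDP checksum covers a pseudo-header (source IP, destination IP, protocol, UDP length), the UDP header, and the payload, so the payload bytes must already be in the buffer when it is computed. The following plain-C sketch illustrates this under assumptions; the function names are hypothetical and this is not DPDK's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Add `len` bytes to a running sum as big-endian 16-bit words;
 * an odd trailing byte is padded with a zero byte. */
uint32_t sum_words(uint32_t sum, const uint8_t *data, size_t len)
{
    size_t i;
    for (i = 0; i + 1 < len; i += 2)
        sum += (uint32_t)((data[i] << 8) | data[i + 1]);
    if (i < len)
        sum += (uint32_t)(data[i] << 8);
    return sum;
}

/* Folded one's-complement sum over pseudo-header + UDP header + payload.
 * `udp` points at the UDP header; `udp_len` is header plus payload. */
uint16_t udp_folded_sum(const uint8_t src_ip[4], const uint8_t dst_ip[4],
                        const uint8_t *udp, uint16_t udp_len)
{
    assert(udp != NULL && udp_len >= 8);
    uint32_t sum = 0;
    sum = sum_words(sum, src_ip, 4);
    sum = sum_words(sum, dst_ip, 4);
    sum += 17;       /* zero byte + protocol number (UDP) */
    sum += udp_len;  /* UDP length field of the pseudo-header */
    sum = sum_words(sum, udp, udp_len);
    while (sum >> 16)
        sum = (sum & 0xffff) + (sum >> 16);
    return (uint16_t)sum;
}

/* Checksum to store; bytes 6..7 of `udp` must be zero when calling. */
uint16_t udp_checksum(const uint8_t src_ip[4], const uint8_t dst_ip[4],
                      const uint8_t *udp, uint16_t udp_len)
{
    uint16_t ck = (uint16_t)~udp_folded_sum(src_ip, dst_ip, udp, udp_len);
    return ck == 0 ? 0xffff : ck; /* 0 on the wire means "no checksum" */
}

/* A datagram with its checksum filled in sums to 0xffff. */
bool udp_checksum_ok(const uint8_t src_ip[4], const uint8_t dst_ip[4],
                     const uint8_t *udp, uint16_t udp_len)
{
    return udp_folded_sum(src_ip, dst_ip, udp, udp_len) == 0xffff;
}
```

Changing even one payload byte after the checksum has been stored makes verification fail, which is what a receiver would observe if the checksum were computed before the payload was written.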

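rte_eth_tx_burst() transmits packets through the specified port. Its arguments are:

  1. portid: the port to send on;
  2. 0: the transmit queue index;
  3. &mbuf: the address of the array of rte_mbuf pointers to send;
  4. 1: the number of packets to send, one here.

The return value nb_tx is the number of packets actually accepted into the transmit queue, so it tells whether the send succeeded. An accepted mbuf is released by the driver once transmission completes; an mbuf that was not accepted still belongs to the caller and must be freed (or retried).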

// Send the packet
uint16_t nb_tx = rte_eth_tx_burst(portid, 0, &mbuf, 1);
if (nb_tx != 1)
{
    printf("[TASK_SEND] Packet send failed\n");
    rte_pktmbuf_free(mbuf);
}
else
{
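    // inet_ntoa() returns one shared static buffer, so calling it twice in a
    // single printf() prints the same address twice; inet_ntop() avoids that.
    char src_str[INET_ADDRSTRLEN], dst_str[INET_ADDRSTRLEN];
    inet_ntop(AF_INET, &src_ip, src_str, sizeof(src_str));
    inet_ntop(AF_INET, &dst_ip, dst_str, sizeof(dst_str));
    printf("Packet sent on port %u: %s:%u -> %s:%u, payload: '%s'\n",
           portid, src_str, src_port, dst_str, dst_port, payload);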
}
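
When two addresses are printed in one log line, each needs its own string buffer: inet_ntoa() returns a pointer to a single static buffer, so a second call overwrites the result of the first. A minimal sketch using inet_ntop() with caller-provided buffers, where `format_flow` is a hypothetical helper for illustration:

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/socket.h>

/* Format "src -> dst" into out. Each address is converted into its own
 * buffer, so neither conversion can overwrite the other.
 * Both addresses are expected in network byte order.
 * Returns 0 on success, -1 on conversion failure or truncation. */
int format_flow(uint32_t src_be, uint32_t dst_be, char *out, size_t out_len)
{
    char src_str[INET_ADDRSTRLEN];
    char dst_str[INET_ADDRSTRLEN];

    assert(out != NULL);
    if (inet_ntop(AF_INET, &src_be, src_str, sizeof(src_str)) == NULL ||
        inet_ntop(AF_INET, &dst_be, dst_str, sizeof(dst_str)) == NULL)
        return -1;
    int n = snprintf(out, out_len, "%s -> %s", src_str, dst_str);
    return (n < 0 || (size_t)n >= out_len) ? -1 : 0;
}
```

Because the buffers live on the caller's stack, this approach is also safe when several threads log at the same time.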

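Testing

The receiving side uses Scapy to parse packets; the receiver script recv_udp_packet.py is listed in full in the Source Code section. Install Scapy first, then run the script.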

sudo pip install -i https://pypi.tuna.tsinghua.edu.cn/simple scapy
sudo python3 recv_udp_packet.py

On the sending side, build and run the program.

sudo make clean
sudo make
cd build
sudo ./recv

Source Code

task_send.c

#include "task_send.h"

extern volatile bool force_quit;

void build_and_send_packet(uint16_t portid,
                           struct rte_mbuf *mbuf,
                           struct rte_ether_addr *src_mac,
                           struct rte_ether_addr *dst_mac,
                           uint32_t src_ip, uint32_t dst_ip,
                           uint16_t src_port, uint16_t dst_port,
                           const char *payload)
{
    // Calculate the size of the packet
    uint16_t payload_len = strlen(payload);
    uint16_t pkt_len = sizeof(struct rte_ether_hdr) +
                       sizeof(struct rte_ipv4_hdr) +
                       sizeof(struct rte_udp_hdr) +
                       payload_len;

    // Allocate mbuf space
    char *pkt_data = rte_pktmbuf_append(mbuf, pkt_len);
    if (pkt_data == NULL)
    {
        printf("[TASK_SEND] Failed to append data to mbuf\n");
        rte_pktmbuf_free(mbuf);
        return;
    }

    // Prepare the pointer in advance
    struct rte_ether_hdr *eth_hdr = (struct rte_ether_hdr *)pkt_data;
    struct rte_ipv4_hdr *ip_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
    struct rte_udp_hdr *udp_hdr = (struct rte_udp_hdr *)(ip_hdr + 1);
    char *data_ptr = (char *)(udp_hdr + 1);

    // Filled Ethernet Header
    rte_ether_addr_copy(src_mac, &eth_hdr->src_addr);
    rte_ether_addr_copy(dst_mac, &eth_hdr->dst_addr);
    eth_hdr->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);

    // Filled IP Header
    ip_hdr->version_ihl = 0x45;
    ip_hdr->type_of_service = 0;
    ip_hdr->total_length = rte_cpu_to_be_16(sizeof(struct rte_ipv4_hdr) +
                                            sizeof(struct rte_udp_hdr) +
                                            payload_len);
    ip_hdr->packet_id = rte_cpu_to_be_16(0);
    ip_hdr->fragment_offset = 0;
    ip_hdr->time_to_live = 64;
    ip_hdr->next_proto_id = IPPROTO_UDP;
    ip_hdr->hdr_checksum = 0;
    ip_hdr->src_addr = src_ip;
    ip_hdr->dst_addr = dst_ip;
    ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr);

    // Fill the UDP header (the checksum is computed once the payload is in place)
    udp_hdr->src_port = rte_cpu_to_be_16(src_port);
    udp_hdr->dst_port = rte_cpu_to_be_16(dst_port);
    udp_hdr->dgram_len = rte_cpu_to_be_16(sizeof(struct rte_udp_hdr) + payload_len);
    udp_hdr->dgram_cksum = 0;

    // Fill the UDP payload, then compute the checksum that covers it
    rte_memcpy(data_ptr, payload, payload_len);
    udp_hdr->dgram_cksum = rte_ipv4_udptcp_cksum(ip_hdr, udp_hdr);

    // Send the packet
    uint16_t nb_tx = rte_eth_tx_burst(portid, 0, &mbuf, 1);
    if (nb_tx != 1)
    {
        printf("[TASK_SEND] Packet send failed\n");
        rte_pktmbuf_free(mbuf);
    }
    else
    {
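        // inet_ntoa() returns one shared static buffer, so calling it twice in a
        // single printf() prints the same address twice; inet_ntop() avoids that.
        char src_str[INET_ADDRSTRLEN], dst_str[INET_ADDRSTRLEN];
        inet_ntop(AF_INET, &src_ip, src_str, sizeof(src_str));
        inet_ntop(AF_INET, &dst_ip, dst_str, sizeof(dst_str));
        printf("Packet sent on port %u: %s:%u -> %s:%u, payload: '%s'\n",
               portid, src_str, src_port, dst_str, dst_port, payload);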
    }
}

// Packet sending lcore main function
void lcore_send_main(uint16_t portid)
{
    // Check that the port is on the same NUMA node as the polling thread for best performance
    if (rte_eth_dev_socket_id(portid) >= 0 && rte_eth_dev_socket_id(portid) != (int)rte_socket_id())
    {
        printf("[TASK_SEND], port %u is on remote NUMA node to polling thread.\n", portid);
        printf("Performance will not be optimal.\n");
    }
    printf("Core %u sending packets. [Ctrl+C to quit]\n", rte_lcore_id());

    // Get mempool
    struct rte_mempool *mempool = rte_mempool_lookup("mbuf_pool");
    if (mempool == NULL)
    {
        printf("[TASK_SEND] Mempool is not found!\n");
        return;
    }

    // L2 Address
    struct rte_ether_addr src_mac;
    if (rte_eth_macaddr_get(portid, &src_mac) != 0)
    {
        printf("[TASK_SEND] Failed to get source MAC address\n");
        return;
    }
    struct rte_ether_addr dst_mac = {{0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb}};

    // L3 Address
    uint32_t src_ip = inet_addr("192.168.226.128");
    uint32_t dst_ip = inet_addr("192.168.226.129");

    // L4 Port and Payload
    uint16_t dst_port = 9000;
    uint16_t src_port = 12345;
    const char *base_payload = "Hello from DPDK";

    // Main work of application loop
    int count = 0;
    while (!force_quit)
    {
        // Generate payload
        char payload[128];
        snprintf(payload, sizeof(payload), "%s %d", base_payload, count);

        // Call building function
        struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mempool);
        if (!mbuf)
        {
            printf("[TASK_SEND] Failed to allocate mbuf\n");
            return;
        }
        build_and_send_packet(portid, mbuf, &src_mac, &dst_mac, src_ip, dst_ip, src_port, dst_port, payload);
        count++;
        rte_delay_us(1000000);
    }
}

recv_udp_packet.py

from scapy.all import sniff, UDP, IP

def packet_callback(pkt):
    if UDP in pkt and IP in pkt:
        ip_layer = pkt[IP]
        udp_layer = pkt[UDP]
        payload = bytes(udp_layer.payload).decode(errors='ignore')

        print(f"Received UDP packet: {ip_layer.src}:{udp_layer.sport} -> {ip_layer.dst}:{udp_layer.dport}")
        print(f"Payload: {payload}")

if __name__ == "__main__":
    sniff(filter="udp", prn=packet_callback, iface="ens37", store=0)

Source Archive Download

Source archive download: dpdk_simple_send.zip

Licensed under CC BY-NC-SA 4.0