Featured image of post P4示例程序-10 负载均衡与带内遥测

P4示例程序-10 负载均衡与带内遥测

交换机不仅基于ECMP实现负载均衡,还能根据运行时的队列拥塞清空主动规避拥塞路径,并将队列拥塞清空通报控制器。

例程介绍

基于ECMP的智能负载均衡和自适应拥塞探测方案:

  1. 基础功能:基于ECMP实现负载均衡;
  2. 带内遥测:收集队列的排队信息,将信息封装在数据包里传递,并返回回馈包;
  3. 动态绕路:当检测到路径上出现严重拥塞,就通知上图中的S1改变哈希的种子值,将流量通过其它路径传输。

流程如下:

  1. egress阶段检测到出端口的排队深度太大:克隆出一个反馈包,并recirculate到ingress阶段;
  2. ingress接收到反馈包:反馈包作为正常数据包进行转发;
  3. 首个交换机(例如上图中的交换机S1)接收到反馈包:调整ECMP哈希函数的种子值。

交换机程序

main函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#include <core.p4>
#include <v1model.p4>

V1Switch(
    MyParser(),
    MyVerifyChecksum(),
    MyIngress(),
    MyEgress(),
    MyComputeChecksum(),
    MyDeparser()
) main;

定义头部

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
const bit<16> TYPE_IPV4 = 0x800;
// 标识是一个遥测包
const bit<16> TYPE_TELEMETRY = 0x7777;
// 标识是一个反馈包
const bit<16> TYPE_FEEDBACK = 0x7778;

// 标识下一个节点是主机
const bit<4>  TYPE_EGRESS_HOST = 1;
// 标识下一个节点是交换机
const bit<4>  TYPE_EGRESS_SWITCH = 2;

typedef bit<9>  egressSpec_t;
typedef bit<48> macAddr_t;
typedef bit<32> ip4Addr_t;

// 定义以太网头部字段
header ethernet_t {
    macAddr_t dstAddr;
    macAddr_t srcAddr;
    bit<16>   etherType;
}

// 定义遥测字段
header telemetry_t {
    // 记录沿路各出端口中最大的排队深度
    bit<16> enq_qdepth;
    // 备份原始数据包中以太网帧的etherType字段
    bit<16> nextHeaderType;
}

// 定义IPv4头部字段
header ipv4_t {
    bit<4>    version;
    bit<4>    ihl;
    bit<6>    dscp;
    bit<2>    ecn;
    bit<16>   totalLen;
    bit<16>   identification;
    bit<3>    flags;
    bit<13>   fragOffset;
    bit<8>    ttl;
    bit<8>    protocol;
    bit<16>   hdrChecksum;
    ip4Addr_t srcAddr;
    ip4Addr_t dstAddr;
}

// 定义TCP头部字段
header tcp_t{
    bit<16> srcPort;
    bit<16> dstPort;
    bit<32> seqNo;
    bit<32> ackNo;
    bit<4>  dataOffset;
    bit<4>  res;
    bit<1>  cwr;
    bit<1>  ece;
    bit<1>  urg;
    bit<1>  ack;
    bit<1>  psh;
    bit<1>  rst;
    bit<1>  syn;
    bit<1>  fin;
    bit<16> window;
    bit<16> checksum;
    bit<16> urgentPtr;
}

// 反馈包信息存储
struct feedback_t {
    // 猜测原作者想要在反馈包中封装出端口队列信息
    // 但没有开发实现
}

// 自定义元数据
struct metadata {
    bit<14> ecmp_hash;
    bit<14> ecmp_group_id;
    // 存储下一跳节点的类型
    bit<4>  egress_type;
    // 存储距离上一次发送探测包的时间戳
    bit<48> feedback_ts;
    // 存储在寄存器数组中索引的下标
    bit<12> feedback_register_index;
    // 当recirculate时,如果使用recirculate_preserving_field_list(0)
    // 就会把metadata中标记0的这个feedback保留
    @field_list(0) feedback_t feedback;
}

// 实例化数据包头部
struct headers {
    ethernet_t   ethernet;
    telemetry_t  telemetry;
    ipv4_t       ipv4;
    tcp_t        tcp;
}

数据包解析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
parser MyParser(packet_in packet,
                out headers hdr,
                inout metadata meta,
                inout standard_metadata_t standard_metadata) {
    state start {
        transition parse_ethernet;
    }

    state parse_ethernet {
        packet.extract(hdr.ethernet);
        transition select(hdr.ethernet.etherType){
            TYPE_IPV4: parse_ipv4;
            // 如果是遥测包,需要解析遥测头部的各字段
            TYPE_TELEMETRY: parse_telemetry;
            // 如果是反馈包,当普通IPv4包处理
            TYPE_FEEDBACK: parse_ipv4;
            default: accept;
        }
    }

    state parse_telemetry {
        packet.extract(hdr.telemetry);
        transition select(hdr.telemetry.nextHeaderType){
            TYPE_IPV4: parse_ipv4;
            default: accept;
        }
    }
    
    state parse_ipv4 {
        packet.extract(hdr.ipv4);
        transition select(hdr.ipv4.protocol){
            6 : parse_tcp;
            default: accept;
        }
    }

    state parse_tcp {
        packet.extract(hdr.tcp);
        transition accept;
    }
}

检查校验和

1
2
3
control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
    apply { }
}

入队列侧处理

在“P4示例程序-8 ECMP”中的代码上进行修改:

  1. 仍然对每个流进行哈希,但增加了Flowlet编号参与哈希;
  2. 每当检测到包和包之间有比较大的空隙,就会更新Flowlet编号,因此会导致哈希结果发生变化。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#define REGISTER_SIZE 1024
#define REGISTER_WIDTH 32

#define PKT_INSTANCE_TYPE_NORMAL 0
#define PKT_INSTANCE_TYPE_INGRESS_CLONE 1
#define PKT_INSTANCE_TYPE_EGRESS_CLONE 2
#define PKT_INSTANCE_TYPE_COALESCED 3
#define PKT_INSTANCE_TYPE_INGRESS_RECIRC 4
#define PKT_INSTANCE_TYPE_REPLICATION 5
#define PKT_INSTANCE_TYPE_RESUBMIT 6

control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {
    // 为各流存储ECMP时的哈希种子,在链路拥塞时修改种子值,即可改变哈希结果
    register <bit<REGISTER_WIDTH>>(REGISTER_SIZE) loadbalance_seed;

    action drop() {
        mark_to_drop(standard_metadata);
    }
    
    // 设置下一跳类型是主机还是交换机
    action set_egress_type (bit<4> egress_type){
        // 将下一跳类型存入自定义元数据中
        meta.egress_type = egress_type;
    }

    // 匹配表存储各出端口对应的下一跳节点类型
    table egress_type {
        key = {
            standard_metadata.egress_spec: exact;
        }
        actions = {
            set_egress_type;
            NoAction;
        }
        size=64;
        default_action = NoAction;
    }
    
    // 在链路拥塞时触发调整种子值,改变ECMP哈希结果
    action update_flow_seed(){
        // 生成值介于0~1234567的随机数值作为新的种子
        bit<32> seed;
        random(seed, (bit<32>)0, (bit<32>)1234567);
        
        // 计算该流在寄存器数组中的位置
        bit<12> register_index;
        hash(register_index,
             HashAlgorithm.crc16,
             (bit<1>)0,
             { hdr.ipv4.dstAddr,
              hdr.ipv4.srcAddr,
              hdr.tcp.srcPort,
              hdr.tcp.dstPort,
              hdr.ipv4.protocol},
             (bit<12>)REGISTER_SIZE);
        
        // 在寄存器的正确位置写入新的种子值
        loadbalance_seed.write((bit<32>)register_index, seed);
    }

    action ecmp_group(bit<14> ecmp_group_id, bit<16> num_nhops) {
        // 计算该流在寄存器数组中的位置
        bit<12> register_index;
        hash(register_index,
             HashAlgorithm.crc16,
             (bit<1>)0,
             { hdr.ipv4.srcAddr,
              hdr.ipv4.dstAddr,
              hdr.tcp.srcPort,
              hdr.tcp.dstPort,
              hdr.ipv4.protocol},
             (bit<12>)REGISTER_SIZE);
        
        // 读取新的种子值
        bit<32> seed;
        loadbalance_seed.read(seed, (bit<32>)register_index);
        
        // 通过五元组+种子值计算哈希值,存储到自定义元数据中
        hash(meta.ecmp_hash,
             HashAlgorithm.crc16,
             (bit<1>)0,
             { hdr.ipv4.srcAddr,
              hdr.ipv4.dstAddr,
              hdr.tcp.srcPort,
              hdr.tcp.dstPort,
              hdr.ipv4.protocol,
              seed},
             num_nhops);
        // 存储ECMP组合到自定义元数据中
	    meta.ecmp_group_id = ecmp_group_id;
    }

    action set_nhop(macAddr_t dstAddr, egressSpec_t port) {
        hdr.ethernet.srcAddr = hdr.ethernet.dstAddr;
        hdr.ethernet.dstAddr = dstAddr;
        hdr.ipv4.ttl = hdr.ipv4.ttl - 1;
        
        standard_metadata.egress_spec = port;  
    }

    table ecmp_group_to_nhop {
        key = {
            meta.ecmp_group_id: exact;
            meta.ecmp_hash: exact;
        }
        actions = {
            drop;
            set_nhop;
        }
        size = 1024;
    }

    table ipv4_lpm {
        key = {
            hdr.ipv4.dstAddr: lpm;
        }
        actions = {
            set_nhop;
            ecmp_group;
            drop;
        }
        size = 1024;
        default_action = drop;
    }

    apply {
        // 从标准元数据中查看数据包类型,如果是recirculate过来的包,则为需要发送的反馈包
        if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_INGRESS_RECIRC){
            // 反馈包要发送回源节点,因此交换源IP地址与目的IP地址
            bit<32> src_ip = hdr.ipv4.srcAddr;
            hdr.ipv4.srcAddr = hdr.ipv4.dstAddr;
            hdr.ipv4.dstAddr = src_ip;
            // 标记该数据包为反馈包
            hdr.ethernet.etherType = TYPE_FEEDBACK;
        }

        // ECMP相关的路由转发
        if (hdr.ipv4.isValid() && hdr.ipv4.ttl > 1){
            switch (ipv4_lpm.apply().action_run){
                ecmp_group: {
                    ecmp_group_to_nhop.apply();
                }
            }
        }
        
        // 在匹配表中查询下一跳节点的类型
        egress_type.apply();

        // 反馈包沿着路径反向传播
        // 反馈包最后要作用到路径上的第一台交换机,因为ECMP选路是第一台交换机进行的
        // 第一个条件:反馈包是正常数据包,不是egress克隆回来的数据包
        // 第二个条件:该数据包是一个反馈包
        // 第三个条件:下一跳节点是主机,表示是到达了路径上的第一台交换机
        if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_NORMAL && hdr.ethernet.etherType == TYPE_FEEDBACK && meta.egress_type == TYPE_EGRESS_HOST){
            // 更新其ECMP哈希函数种子值
            update_flow_seed();
            // 完成作用,将该反馈包丢包
            drop();
        }
        
        // 克隆出来的反馈包类型为PKT_INSTANCE_TYPE_EGRESS_CLONE
        // 在ingress阶段不做处理,将自动到达随后的egress阶段
        // 其实克隆出来的数据包直接在ingress阶段即可直接处理、发送
        // 猜测原作者是想展示clone和recircle的用法
    }
}

出队列侧处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
control MyEgress(inout headers hdr,
                 inout metadata meta,
                 inout standard_metadata_t standard_metadata) {
    // 寄存器数组记录每个流上一次发送反馈包的时间,避免其频繁反馈
    register <bit<48>>(REGISTER_SIZE) feedback_ts;

    // 读取上一次发送反馈包的时间戳
    action read_feedback_ts(){
        // 确定读取寄存器数组的索引
        hash(meta.feedback_register_index,
             HashAlgorithm.crc16,
             (bit<1>)0,
             { hdr.ipv4.srcAddr,
              hdr.ipv4.dstAddr,
              hdr.tcp.srcPort,
              hdr.tcp.dstPort,
              hdr.ipv4.protocol},
             (bit<12>)REGISTER_SIZE);
        // 读取寄存器数组索引位置的值
        feedback_ts.read(meta.feedback_ts, (bit<32>)meta.feedback_register_index);
    }

    apply {
        // 克隆出来的反馈包到了egress阶段,需要送回ingress阶段处理
        if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_EGRESS_CLONE) {
            // 送回ingress阶段处理,元数据中保持有@field_list(0)标记的字段不变
            recirculate_preserving_field_list(0);
        }
        // 对于正常传输的数据包
        else if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_NORMAL && hdr.ethernet.etherType != TYPE_FEEDBACK) {
            if (hdr.tcp.isValid()){
                // 如果该正常的数据包具有遥测字段
                if (hdr.telemetry.isValid()) {
                    // 如果下一跳节点是交换机,且当前的出队列排队深度更大,则需要更新遥测字段
                    if (hdr.telemetry.enq_qdepth < (bit<16>)standard_metadata.enq_qdepth && meta.egress_type == TYPE_EGRESS_SWITCH) {
                        // 更新遥测字段为当前出队列的排队深度,因此该字段仅记录最大排队深度
                        hdr.telemetry.enq_qdepth = (bit<16>)standard_metadata.enq_qdepth;
                    }
                    // 如果下一跳节点是主机,则恢复为原始数据包类型
                    else if (meta.egress_type == TYPE_EGRESS_HOST){
                        // 恢复以太网帧类型为IPv4
                        hdr.ethernet.etherType = TYPE_IPV4;
                        // 移除遥测字段
                        hdr.telemetry.setInvalid();

                        // 如果记录的排队深度超过了设置的阈值
                        if (hdr.telemetry.enq_qdepth > 50) {
                            // 读取上一次发送反馈包的时间戳
                            read_feedback_ts();
                            // 随机生成一个值,用于确定是否需要发送反馈包
                            bit<48> backoff;
                            random(backoff, 48w500000, 48w1000000);
                            // 如果时间间隔超过了该随机值,则需要发送反馈包
                            if ((standard_metadata.ingress_global_timestamp - meta.feedback_ts) > backoff) {
                                // 更新发送反馈包的时间戳
                                feedback_ts.write((bit<32>)meta.feedback_register_index, standard_metadata.ingress_global_timestamp);
                                // 使用随机数判断是否真的需要发送反馈包,概率为1/4
                                bit<8> probability;
                                random(probability, 8w0, 8w3);
                                if (probability == 0) {
                                    // 在ingress阶段克隆出一个新的数据包
                                    clone(CloneType.E2E, 100);
                                }
                             }
                        }
                    }
                }
                else {
                    // 如果该正常的数据包没有遥测字段
                    if (meta.egress_type == TYPE_EGRESS_SWITCH){
                        // 为其启用遥测字段
                        hdr.telemetry.setValid();
                        // 填写当前出队列的数据包堆积深度
                        hdr.telemetry.enq_qdepth = (bit<16>)standard_metadata.enq_qdepth;
                        // 修改以太网帧类型为遥测类型
                        hdr.ethernet.etherType = TYPE_TELEMETRY;
                        // 备份原始数据包的网络层类型
                        hdr.telemetry.nextHeaderType = TYPE_IPV4;
                    }
                }
            }
        }
    }
}

计算校验值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
control MyComputeChecksum(inout headers  hdr, inout metadata meta) {
    apply {
        // 由于修改了IPv4中的字段(包括TTL),因此需要重新计算校验和
        update_checksum(
                hdr.ipv4.isValid(),
                { hdr.ipv4.version,
                hdr.ipv4.ihl,
                hdr.ipv4.diffserv,
                hdr.ipv4.totalLen,
                hdr.ipv4.identification,
                hdr.ipv4.flags,
                hdr.ipv4.fragOffset,
                hdr.ipv4.ttl,
                hdr.ipv4.protocol,
                hdr.ipv4.srcAddr,
                hdr.ipv4.dstAddr },
                hdr.ipv4.hdrChecksum,
                HashAlgorithm.csum16);
    }
}

封装数据包

1
2
3
4
5
6
7
8
control MyDeparser(packet_out packet, in headers hdr) {
    apply {
        packet.emit(hdr.ethernet);
        packet.emit(hdr.telemetry);
        packet.emit(hdr.ipv4);
        packet.emit(hdr.tcp);
    }
}
Licensed under CC BY-NC-SA 4.0
皖ICP备2025083746号-1
公安备案 陕公网安备61019002003315号



使用 Hugo 构建
主题 StackJimmy 设计