Featured image of post P4示例程序-5 L2自学习交换机

P4示例程序-5 L2自学习交换机

在L2交换机的基础上,引入控制平面使得交换机能够自学习到转发表、而无需预先配置,类似于实现经典SDN中的Packet-In消息与Packet-Out消息。

背景描述

在先前的L2交换机中,转发表项需要预先手动配置。为了使得交换机具有自主学习转发表的能力,需要引入控制平面。当有未知数据包到达交换机时,交换机需要将信息传递给控制平面,由控制平面计算、添加转发表。因此本文的核心是P4与控制器的通信,常见的方式有Clone Packets与Digest:

  1. Clone Packets:在数据平面流水线里,遇到特定条件时,用clone动作把原始数据包本身(或经过修改的数据包)复制一份,发送到控制器,控制器收到的是一个完整的数据包;
  2. Packets与Digest:在数据平面里,用digest动作可以把自定义的少量元数据信息打包、发送给控制器,因此控制器收到的是一个摘要(例如ingress_port、src_mac等)。

拓扑描述文件

JSON格式

在此前版本的基础上为交换机s1增加cpu_port属性,填写true使得交换机增加用于与控制器通信的端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
  "p4_src": "p4src/l2_learning.p4",
  "cli": true,
  "pcap_dump": true,
  "enable_log": true,
  "topology": {
    "assignment_strategy": "l2",
    "default":{
      "auto_arp_tables": false
    },    
    "links": [["h1", "s1"], ["h2", "s1"], ["h3", "s1"], ["h4","s1"]],
    "hosts": {
      "h1": { },
      "h2": { },
      "h3": { },
      "h4": { }
    },
    "switches": {
      "s1": {
        "cpu_port" : true
      }
    }
  }
}

Python脚本

在此前版本的基础上添加enableCpuPortAll函数即可为全部交换机增加用于与控制器通信的端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from p4utils.mininetlib.network_API import NetworkAPI

net = NetworkAPI()

# Network general options
net.setLogLevel('info')
net.disableArpTables()

# Network definition
net.addP4Switch('s1')
net.setP4Source('s1','./p4src/l2_learning.p4')
net.addHost('h1')
net.addHost('h2')
net.addHost('h3')
net.addHost('h4')
net.addLink('s1', 'h1')
net.addLink('s1', 'h2')
net.addLink('s1', 'h3')
net.addLink('s1', 'h4')

# Assignment strategy
net.l2()

# Nodes general options
net.enableCpuPortAll()
net.enablePcapDumpAll()
net.enableLogAll()
net.enableCli()
net.startNetwork()

Clone Packets

交换机程序·定义头部

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const bit<16> L2_LEARN_ETHER_TYPE = 0x1234;

typedef bit<48> macAddr_t;

header ethernet_t {
    macAddr_t dstAddr;
    macAddr_t srcAddr;
    bit<16>   etherType;
}

// 为使得控制器能够获取源MAC地址与入端口信息,定义头部cpu_t
header cpu_t {
    bit<48> srcAddr;
    bit<16> ingress_port;
}

// 由于克隆出的数据包会失去全部标准元数据,因此使用自定义的元数据储存后续将会使用的信息
struct metadata {
    bit<9> ingress_port;
}

struct headers {
    ethernet_t   ethernet;
    cpu_t        cpu;
}

交换机程序·入队列侧处理

交换机解析、提取数据包的源MAC地址,并在匹配表smac中查找是否存在该MAC地址,若未命中则需要告知控制器该信息,让控制器学习到该信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {
    action drop() {
        mark_to_drop(standard_metadata);
    }

    action mac_learn() {
        // 由于克隆出的数据包会失去全部标准元数据,因此将入端口号备份进自定义的元数据中
        meta.ingress_port = standard_metadata.ingress_port;
        // CloneType.I2E表示克隆数据包后,直接送到Egree部分处理
        // 100为Session ID,例如控制器连在交换机的CPU Port 7上
        // 需要使用mirroring_add 100 7命令将Session 100与CPU Port 7绑定
        // 克隆的数据包将被因此发至CPU Port 7
        // meta是要一起克隆过去的自定义元数据
        clone3(CloneType.I2E, 100, meta);
    }

    // 该匹配表用于判断数据包的源MAC地址是否已被记录
    // 如果数据包的源MAC在匹配表中,则无需操作
    // 如果不在匹配表中,则需要克隆数据包并发送至控制器
    table smac {
        key = {
            hdr.ethernet.srcAddr: exact;
        }

        actions = {
            mac_learn;
            NoAction;
        }
        size = 256;
        // 默认动作为mac_learn,表示匹配失败后,克隆数据包并发送至控制器
        default_action = mac_learn;
    }

    action forward(bit<9> egress_port) {
        standard_metadata.egress_spec = egress_port;
    }

    table dmac {
        key = {
            hdr.ethernet.dstAddr: exact;
        }

        actions = {
            forward;
            NoAction;
        }
        size = 256;
        default_action = NoAction;
    }

    action set_mcast_grp(bit<16> mcast_grp) {
        standard_metadata.mcast_grp = mcast_grp;
    }

    table broadcast {
        key = {
            standard_metadata.ingress_port: exact;
        }

        actions = {
            set_mcast_grp;
            NoAction;
        }
        size = 256;
        default_action = NoAction;
    }

    apply {
        // 判断是否需要克隆数据包并发送至控制器
        smac.apply();
        // 执行数据包的查表转发
        if (dmac.apply().hit){
            
        }
        else {
            // 若无匹配的转发表项,则泛洪出去
            broadcast.apply();
        }
    }
}

交换机程序·出队列侧处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
control MyEgress(inout headers hdr,
                 inout metadata meta,
                 inout standard_metadata_t standard_metadata) {
    apply {
        // 当标准元数据的instance_type为1时,表示为克隆数据包(CloneType.I2E)
        if (standard_metadata.instance_type == 1){
            // 只有cpu头部被标记为valid时,该cpu头部在封装时才会被封装进数据包
            hdr.cpu.setValid();
            // 填写源MAC地址信息
            hdr.cpu.srcAddr = hdr.ethernet.srcAddr;
            // 填写数据包的入端口信息,注意此处需要类型转换,因为P4不会自动做隐式类型转换
            // meta.ingress_port的类型是bit<9>,而cpu头部为了字节对齐定义ingress_port为bit<16>
            hdr.cpu.ingress_port = (bit<16>)meta.ingress_port;
            // 修改以太网帧类型字段,使得控制器能够识别出该数据包为学习包
            hdr.ethernet.etherType = L2_LEARN_ETHER_TYPE;
            // 截断数据包,由于只需链路层和cpu头部,无需负载部分
            // P4流水线默认是保留原始包所有payload
            // 在流水线上的操作只是修改头部部分
            // 除非显式告诉硬件/软件丢弃payload的部分,P4会把原始payload封装起来
            truncate((bit<32>)22);
        }
    }
}

交换机程序·封装数据包

1
2
3
4
5
6
7
control MyDeparser(packet_out packet, in headers hdr) {
    apply {
        packet.emit(hdr.ethernet);
        // 只有进行了hdr.cpu.setValid()操作,才会把cpu头部封装进数据包
        packet.emit(hdr.cpu);
    }
}

控制器·初始化交换机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from p4utils.utils.helper import load_topo
from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI

def __init__(self, sw_name):
    # 网络启动后将自动生成topology.json文件,其中包含网络与节点信息
    self.topo = load_topo('topology.json')
    # 交换机名称(例如s1),根据名称进而可在topology.json文件中查找相关属性
    self.sw_name = sw_name
    # 获取其Thrift服务端端口号
    self.thrift_port = self.topo.get_thrift_port(sw_name)
    # 获取交换机用于向控制平面发送数据的CPU Port号
    self.cpu_port = self.topo.get_cpu_port_index(self.sw_name)
    # 实例化Thrift通信接口,进而可以控制交换机
    self.controller = SimpleSwitchThriftAPI(self.thrift_port)
    self.init()

def init(self):
    # 初始化交换机的状态,包括清空所有匹配表里的表项、清空所有计数器和寄存器的值、重置表默认动作为默认值等
    self.controller.reset_state()
    # 添加组播相关配置
    self.add_boadcast_groups()
    # 添加与克隆数据包至控制器相关的命令
    self.add_mirror()
    # 添加基于目的MAC的转发表
    self.fill_table_test()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def add_boadcast_groups(self):
    # 获取当前交换机(如s1)的接口名(如s1-eth0)到端口号(如1)的映射
    # 使用copy函数返回一个拷贝,防止后面修改影响原始数据
    interfaces_to_port = self.topo.get_node_intfs(fields=['port'])[self.sw_name].copy()
    # 删除掉loopback接口,因为泛洪无需泛洪到loopback接口
    interfaces_to_port.pop('lo', None)
    # 删除掉CPU Port,因为泛洪无需泛洪到控制器
    # get_cpu_port_intf函数用于获取当前交换机(如s1)的CPU Port名称(如s1-cpu)
    interfaces_to_port.pop(self.topo.get_cpu_port_intf(self.sw_name), None)

    mc_grp_id = 1
    rid = 0
    # 遍历当前交换机的全部接口的端口号
    for ingress_port in interfaces_to_port.values():
        # 由于泛洪需要泛洪到其它端口,需要排除当前端口号
        port_list = list(interfaces_to_port.values())
        del (port_list[port_list.index(ingress_port)])
        # 添加组播组mc_grp_id
        self.controller.mc_mgrp_create(mc_grp_id)
        # 添加多播节点组
        handle = self.controller.mc_node_create(rid, port_list)
        # 将多播组与多播节点组关联
        self.controller.mc_node_associate(mc_grp_id, handle)
        # 为broadcast匹配表添加表项,即添加入端口号ingress_port与动作set_mcast_grp、组播组编号mc_grp_id的映射
        self.controller.table_add("broadcast", "set_mcast_grp", [str(ingress_port)], [str(mc_grp_id)])
        # 下一个组播组的编号、下一个多播节点组的编号
        mc_grp_id += 1
        rid += 1
1
2
3
4
5
def add_mirror(self):
    if self.cpu_port:
        # 此前在交换机程序中设置克隆出数据包的Session为100
        # 将Session 100与交换机上用于连接控制器的CPU Port绑定
        self.controller.mirroring_add(100, self.cpu_port)
1
2
3
4
5
6
def fill_table_test(self):
    # 为dmac匹配表添加表项,即添加目的MAC地址与动作forward、出端口编号的映射
    self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:01'], ['1'])
    self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:02'], ['2'])
    self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:03'], ['3'])
    self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:04'], ['4'])

控制器·监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from scapy.all import Ether, sniff, Packet, BitField, raw

# 定义cpu头部字段
class CpuHeader(Packet):
    name = 'CpuPacket'
    # 该头部包括48位的目的MAC地址、16位的入端口编号
    fields_desc = [BitField('macAddr', 0, 48), BitField('ingress_port', 0, 16)]
    
def recv_msg_cpu(self, pkt):
    # 解析接收到的数据包
    packet = Ether(raw(pkt))
    # 链路层类型为0x1234的为交换机上传的克隆数据包
    if packet.type == 0x1234:
        # 继续解析以太网帧其后的cpu头部
        cpu_header = CpuHeader(bytes(packet.load))
        # 提取获取到的目的MAC地址macAddr、数据包入端口ingress_port,随后将学习到的信息进行处理
        self.learn([(cpu_header.macAddr, cpu_header.ingress_port)])

def run_cpu_port_loop(self):
    # 获取指定交换机暴露的CPU Port(如s1-cpu)
    cpu_port_intf = str(self.topo.get_cpu_port_intf(self.sw_name).replace("eth0", "eth1"))
    # 监听CPU Port,接收发自交换机的数据包
    sniff(iface=cpu_port_intf, prn=self.recv_msg_cpu)

控制器·学习MAC地址

1
2
3
4
5
6
7
8
9
def learn(self, learning_data):
    for mac_addr, ingress_port in learning_data:
        # 打印出学习到的信息
        print("mac: %012X ingress_port: %s " % (mac_addr, ingress_port))
        # 在smac匹配表中添加表项,设置匹配到mac_addr的动作为NoAction
        # 表明该控制器已经学习、处理该信息,之后无需再次上报
        self.controller.table_add("smac", "NoAction", [str(mac_addr)])
        # 在dmac匹配表中添加表项,设置匹配到mac_addr的动作为forward,并从ingress_port端口转发出去
        self.controller.table_add("dmac", "forward", [str(mac_addr)], [str(ingress_port)])

Digest

digest与clone都是P4数据面与控制面交互的机制:digest只发送选定的少量字段信息给控制器、不传输完整数据包,但需要额外封装成控制消息且每条消息都要由控制器回复ACK确认,以避免丢包;clone则直接复制完整数据包到CPU Port,不需要ACK,也不封装成控制消息,更简单直接。因此digest在高频情况下由于ACK与消息处理开销,不一定比clone更快。

交换机程序

digest方式实现起来非常简单,只需在MyIngress中构造、发送消息,无需在MyEgress、MyDeparser中进行额外的处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef bit<48> macAddr_t;

header ethernet_t {
    macAddr_t dstAddr;
    macAddr_t srcAddr;
    bit<16>   etherType;
}

// 定义digest消息的数据负载部分
struct learn_t {
    bit<48> srcAddr;
    bit<9>  ingress_port;
}

// 自定义元数据,实例化learn_t
struct metadata {
    learn_t learn;
}

struct headers {
    ethernet_t   ethernet;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
control MyIngress(inout headers hdr,
                  inout metadata meta,
                  inout standard_metadata_t standard_metadata) {
    action drop() {
        mark_to_drop(standard_metadata);
    }

    action mac_learn(){
        // 填充digest的负载部分
        meta.learn.srcAddr = hdr.ethernet.srcAddr;
        meta.learn.ingress_port = standard_metadata.ingress_port;
        // 调用digest将信息直接发送至控制器
        // 第一个参数是digest ID(类似clone中的Session ID),用于标识不同的digest消息类型
        // 第二个参数是digest的负载,注意这里不能直接填写learn,因为learn是定义、不是实例化的变量
        digest<learn_t>(1, meta.learn);
    }

    table smac {
        key = {
            hdr.ethernet.srcAddr: exact;
        }

        actions = {
            mac_learn;
            NoAction;
        }
        size = 256;
        default_action = mac_learn;
    }

    action forward(bit<9> egress_port) {
        standard_metadata.egress_spec = egress_port;
    }

    table dmac {
        key = {
            hdr.ethernet.dstAddr: exact;
        }

        actions = {
            forward;
            NoAction;
        }
        size = 256;
        default_action = NoAction;
    }

    action set_mcast_grp(bit<16> mcast_grp) {
        standard_metadata.mcast_grp = mcast_grp;
    }

    table broadcast {
        key = {
            standard_metadata.ingress_port: exact;
        }

        actions = {
            set_mcast_grp;
            NoAction;
        }
        size = 256;
        default_action = NoAction;
    }

    apply {
        smac.apply();
        if (dmac.apply().hit){
        }
        else {
            broadcast.apply();
        }
    }
}
1
2
3
4
5
control MyEgress(inout headers hdr,
                 inout metadata meta,
                 inout standard_metadata_t standard_metadata) {
    apply { }
}
1
2
3
4
5
control MyDeparser(packet_out packet, in headers hdr) {
    apply {
        packet.emit(hdr.ethernet);
    }
}

控制器·监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
```python
# BMv2的digest消息是通过nanomsg socket向外推送的
# nnpy库是Python对底层nanomsg消息传输库的封装
import nnpy

def unpack_digest(self, msg, num_samples):
    digest = []
    # 从msg的第32个字节开始读取,通常前32字节是消息头或其它元数据,需要跳过
    starting_index = 32
    # 解析消息中的每一个样本
    for sample in range(num_samples):
        # 从消息中取8个字节,使用struct.unpack按格式">LHH"解析
        # >表示大端字节序,即网络序
        # L是4字节无符号长整型,对应MAC地址的一部分
        # H是2字节无符号短整型,对应MAC地址的另一部分
        # H是2字节无符号短整型,对应ingress_port
        mac0, mac1, ingress_port = struct.unpack(">LHH", msg[starting_index:starting_index+8])
        # 读取指针向后偏移8各字节
        starting_index +=8
        # 拼接出完整的MAC地址
        mac_addr = (mac0 << 16) + mac1
        # 当前样本解析完成
        digest.append((mac_addr, ingress_port))
    return digest

def recv_msg_digest(self, msg):
    # 根据BMv2定义的消息结构进行解析digest消息的头部
    # topic为消息类型标识
    # device_id为交换机设备编号
    # ctx_id为上下文编号,交换机多流水线时区分
    # list_id为用于区分不同类型的digest
    # buffer_id为本digest次消息的唯一标识
    # num为本次消息里的包含的样本数量
    topic, device_id, ctx_id, list_id, buffer_id, num = struct.unpack("<iQiiQi", msg[:32])
    # 从消息中提取出num条digest样本条目
    digest = self.unpack_digest(msg, num)
    # 调用函数对digest样本列表进行进一步的消息处理
    self.learn(digest)
    # 控制器收到后必须显式ACK确认,需传递上下文编号、digest类型编号、digest消息编号以唯一标识消息
    self.controller.client.bm_learning_ack_buffer(ctx_id, list_id, buffer_id)
        
def run_digest_loop(self):
    # 创建一个订阅套接字(SUB),准备订阅消息
    sub = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
    # 调用BMv2提供的管理接口,获取通知套接字地址
    notifications_socket = self.controller.client.bm_mgmt_get_info().notifications_socket
    # 连接到刚刚获取的通知套接字地址
    sub.connect(notifications_socket)
    # 设置订阅选项,''表示订阅所有消息(相当于不做过滤)
    sub.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
    # 不断从订阅的套接字上接收消息
    while True:
        msg = sub.recv()
        self.recv_msg_digest(msg)
Licensed under CC BY-NC-SA 4.0
皖ICP备2025083746号-1
公安备案 陕公网安备61019002003315号



使用 Hugo 构建
主题 StackJimmy 设计