背景描述
在先前的L2交换机中,转发表项需要预先手动配置。为了使得交换机具有自主学习转发表的能力,需要引入控制平面。当有未知数据包到达交换机时,交换机需要将信息传递给控制平面,由控制平面计算、添加转发表。因此本文的核心是P4与控制器的通信,常见的方式有Clone Packets与Digest:
- Clone Packets:在数据平面流水线里,遇到特定条件时,用clone动作把原始数据包本身(或经过修改的数据包)复制一份,发送到控制器,控制器收到的是一个完整的数据包;
- Packets与Digest:在数据平面里,用digest动作可以把自定义的少量元数据信息打包、发送给控制器,因此控制器收到的是一个摘要(例如ingress_port、src_mac等)。
拓扑描述文件
JSON格式
在此前版本的基础上为交换机s1增加cpu_port属性,填写true使得交换机增加用于与控制器通信的端口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
{
"p4_src": "p4src/l2_learning.p4",
"cli": true,
"pcap_dump": true,
"enable_log": true,
"topology": {
"assignment_strategy": "l2",
"default":{
"auto_arp_tables": false
},
"links": [["h1", "s1"], ["h2", "s1"], ["h3", "s1"], ["h4","s1"]],
"hosts": {
"h1": { },
"h2": { },
"h3": { },
"h4": { }
},
"switches": {
"s1": {
"cpu_port" : true
}
}
}
}
|
Python脚本
在此前版本的基础上添加enableCpuPortAll函数即可为全部交换机增加用于与控制器通信的端口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
from p4utils.mininetlib.network_API import NetworkAPI
net = NetworkAPI()
# Network general options
net.setLogLevel('info')
net.disableArpTables()
# Network definition
net.addP4Switch('s1')
net.setP4Source('s1','./p4src/l2_learning.p4')
net.addHost('h1')
net.addHost('h2')
net.addHost('h3')
net.addHost('h4')
net.addLink('s1', 'h1')
net.addLink('s1', 'h2')
net.addLink('s1', 'h3')
net.addLink('s1', 'h4')
# Assignment strategy
net.l2()
# Nodes general options
net.enableCpuPortAll()
net.enablePcapDumpAll()
net.enableLogAll()
net.enableCli()
net.startNetwork()
|
Clone Packets
交换机程序·定义头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
const bit<16> L2_LEARN_ETHER_TYPE = 0x1234;
typedef bit<48> macAddr_t;
header ethernet_t {
macAddr_t dstAddr;
macAddr_t srcAddr;
bit<16> etherType;
}
// 为使得控制器能够获取源MAC地址与入端口信息,定义头部cpu_t
header cpu_t {
bit<48> srcAddr;
bit<16> ingress_port;
}
// 由于克隆出的数据包会失去全部标准元数据,因此使用自定义的元数据储存后续将会使用的信息
struct metadata {
bit<9> ingress_port;
}
struct headers {
ethernet_t ethernet;
cpu_t cpu;
}
|
交换机程序·入队列侧处理
交换机解析、提取数据包的源MAC地址,并在匹配表smac中查找是否存在该MAC地址,若未命中则需要告知控制器该信息,让控制器学习到该信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
control MyIngress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
action drop() {
mark_to_drop(standard_metadata);
}
action mac_learn() {
// 由于克隆出的数据包会失去全部标准元数据,因此将入端口号备份进自定义的元数据中
meta.ingress_port = standard_metadata.ingress_port;
// CloneType.I2E表示克隆数据包后,直接送到Egree部分处理
// 100为Session ID,例如控制器连在交换机的CPU Port 7上
// 需要使用mirroring_add 100 7命令将Session 100与CPU Port 7绑定
// 克隆的数据包将被因此发至CPU Port 7
// meta是要一起克隆过去的自定义元数据
clone3(CloneType.I2E, 100, meta);
}
// 该匹配表用于判断数据包的源MAC地址是否已被记录
// 如果数据包的源MAC在匹配表中,则无需操作
// 如果不在匹配表中,则需要克隆数据包并发送至控制器
table smac {
key = {
hdr.ethernet.srcAddr: exact;
}
actions = {
mac_learn;
NoAction;
}
size = 256;
// 默认动作为mac_learn,表示匹配失败后,克隆数据包并发送至控制器
default_action = mac_learn;
}
action forward(bit<9> egress_port) {
standard_metadata.egress_spec = egress_port;
}
table dmac {
key = {
hdr.ethernet.dstAddr: exact;
}
actions = {
forward;
NoAction;
}
size = 256;
default_action = NoAction;
}
action set_mcast_grp(bit<16> mcast_grp) {
standard_metadata.mcast_grp = mcast_grp;
}
table broadcast {
key = {
standard_metadata.ingress_port: exact;
}
actions = {
set_mcast_grp;
NoAction;
}
size = 256;
default_action = NoAction;
}
apply {
// 判断是否需要克隆数据包并发送至控制器
smac.apply();
// 执行数据包的查表转发
if (dmac.apply().hit){
}
else {
// 若无匹配的转发表项,则泛洪出去
broadcast.apply();
}
}
}
|
交换机程序·出队列侧处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
control MyEgress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
apply {
// 当标准元数据的instance_type为1时,表示为克隆数据包(CloneType.I2E)
if (standard_metadata.instance_type == 1){
// 只有cpu头部被标记为valid时,该cpu头部在封装时才会被封装进数据包
hdr.cpu.setValid();
// 填写源MAC地址信息
hdr.cpu.srcAddr = hdr.ethernet.srcAddr;
// 填写数据包的入端口信息,注意此处需要类型转换,因为P4不会自动做隐式类型转换
// meta.ingress_port的类型是bit<9>,而cpu头部为了字节对齐定义ingress_port为bit<16>
hdr.cpu.ingress_port = (bit<16>)meta.ingress_port;
// 修改以太网帧类型字段,使得控制器能够识别出该数据包为学习包
hdr.ethernet.etherType = L2_LEARN_ETHER_TYPE;
// 截断数据包,由于只需链路层和cpu头部,无需负载部分
// P4流水线默认是保留原始包所有payload
// 在流水线上的操作只是修改头部部分
// 除非显式告诉硬件/软件丢弃payload的部分,P4会把原始payload封装起来
truncate((bit<32>)22);
}
}
}
|
交换机程序·封装数据包
1
2
3
4
5
6
7
|
control MyDeparser(packet_out packet, in headers hdr) {
apply {
packet.emit(hdr.ethernet);
// 只有进行了hdr.cpu.setValid()操作,才会把cpu头部封装进数据包
packet.emit(hdr.cpu);
}
}
|
控制器·初始化交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from p4utils.utils.helper import load_topo
from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI
def __init__(self, sw_name):
# 网络启动后将自动生成topology.json文件,其中包含网络与节点信息
self.topo = load_topo('topology.json')
# 交换机名称(例如s1),根据名称进而可在topology.json文件中查找相关属性
self.sw_name = sw_name
# 获取其Thrift服务端端口号
self.thrift_port = self.topo.get_thrift_port(sw_name)
# 获取交换机用于向控制平面发送数据的CPU Port号
self.cpu_port = self.topo.get_cpu_port_index(self.sw_name)
# 实例化Thrift通信接口,进而可以控制交换机
self.controller = SimpleSwitchThriftAPI(self.thrift_port)
self.init()
def init(self):
# 初始化交换机的状态,包括清空所有匹配表里的表项、清空所有计数器和寄存器的值、重置表默认动作为默认值等
self.controller.reset_state()
# 添加组播相关配置
self.add_boadcast_groups()
# 添加与克隆数据包至控制器相关的命令
self.add_mirror()
# 添加基于目的MAC的转发表
self.fill_table_test()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
def add_boadcast_groups(self):
# 获取当前交换机(如s1)的接口名(如s1-eth0)到端口号(如1)的映射
# 使用copy函数返回一个拷贝,防止后面修改影响原始数据
interfaces_to_port = self.topo.get_node_intfs(fields=['port'])[self.sw_name].copy()
# 删除掉loopback接口,因为泛洪无需泛洪到loopback接口
interfaces_to_port.pop('lo', None)
# 删除掉CPU Port,因为泛洪无需泛洪到控制器
# get_cpu_port_intf函数用于获取当前交换机(如s1)的CPU Port名称(如s1-cpu)
interfaces_to_port.pop(self.topo.get_cpu_port_intf(self.sw_name), None)
mc_grp_id = 1
rid = 0
# 遍历当前交换机的全部接口的端口号
for ingress_port in interfaces_to_port.values():
# 由于泛洪需要泛洪到其它端口,需要排除当前端口号
port_list = list(interfaces_to_port.values())
del (port_list[port_list.index(ingress_port)])
# 添加组播组mc_grp_id
self.controller.mc_mgrp_create(mc_grp_id)
# 添加多播节点组
handle = self.controller.mc_node_create(rid, port_list)
# 将多播组与多播节点组关联
self.controller.mc_node_associate(mc_grp_id, handle)
# 为broadcast匹配表添加表项,即添加入端口号ingress_port与动作set_mcast_grp、组播组编号mc_grp_id的映射
self.controller.table_add("broadcast", "set_mcast_grp", [str(ingress_port)], [str(mc_grp_id)])
# 下一个组播组的编号、下一个多播节点组的编号
mc_grp_id += 1
rid += 1
|
1
2
3
4
5
|
def add_mirror(self):
if self.cpu_port:
# 此前在交换机程序中设置克隆出数据包的Session为100
# 将Session 100与交换机上用于连接控制器的CPU Port绑定
self.controller.mirroring_add(100, self.cpu_port)
|
1
2
3
4
5
6
|
def fill_table_test(self):
# 为dmac匹配表添加表项,即添加目的MAC地址与动作forward、出端口编号的映射
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:01'], ['1'])
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:02'], ['2'])
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:03'], ['3'])
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:04'], ['4'])
|
控制器·监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
from scapy.all import Ether, sniff, Packet, BitField, raw
# 定义cpu头部字段
class CpuHeader(Packet):
name = 'CpuPacket'
# 该头部包括48位的目的MAC地址、16位的入端口编号
fields_desc = [BitField('macAddr', 0, 48), BitField('ingress_port', 0, 16)]
def recv_msg_cpu(self, pkt):
# 解析接收到的数据包
packet = Ether(raw(pkt))
# 链路层类型为0x1234的为交换机上传的克隆数据包
if packet.type == 0x1234:
# 继续解析以太网帧其后的cpu头部
cpu_header = CpuHeader(bytes(packet.load))
# 提取获取到的目的MAC地址macAddr、数据包入端口ingress_port,随后将学习到的信息进行处理
self.learn([(cpu_header.macAddr, cpu_header.ingress_port)])
def run_cpu_port_loop(self):
# 获取指定交换机暴露的CPU Port(如s1-cpu)
cpu_port_intf = str(self.topo.get_cpu_port_intf(self.sw_name).replace("eth0", "eth1"))
# 监听CPU Port,接收发自交换机的数据包
sniff(iface=cpu_port_intf, prn=self.recv_msg_cpu)
|
控制器·学习MAC地址
1
2
3
4
5
6
7
8
9
|
def learn(self, learning_data):
for mac_addr, ingress_port in learning_data:
# 打印出学习到的信息
print("mac: %012X ingress_port: %s " % (mac_addr, ingress_port))
# 在smac匹配表中添加表项,设置匹配到mac_addr的动作为NoAction
# 表明该控制器已经学习、处理该信息,之后无需再次上报
self.controller.table_add("smac", "NoAction", [str(mac_addr)])
# 在dmac匹配表中添加表项,设置匹配到mac_addr的动作为forward,并从ingress_port端口转发出去
self.controller.table_add("dmac", "forward", [str(mac_addr)], [str(ingress_port)])
|
Digest
digest与clone都是P4数据面与控制面交互的机制:digest只发送选定的少量字段信息给控制器、不传输完整数据包,但需要额外封装成控制消息且每条消息都要由控制器回复ACK确认,以避免丢包;clone则直接复制完整数据包到CPU Port,不需要ACK,也不封装成控制消息,更简单直接。因此digest在高频情况下由于ACK与消息处理开销,不一定比clone更快。
交换机程序
digest方式实现起来非常简单,只需在MyIngress中构造、发送消息,无需在MyEgress、MyDeparser中进行额外的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
typedef bit<48> macAddr_t;
header ethernet_t {
macAddr_t dstAddr;
macAddr_t srcAddr;
bit<16> etherType;
}
// 定义digest消息的数据负载部分
struct learn_t {
bit<48> srcAddr;
bit<9> ingress_port;
}
// 自定义元数据,实例化learn_t
struct metadata {
learn_t learn;
}
struct headers {
ethernet_t ethernet;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
control MyIngress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
action drop() {
mark_to_drop(standard_metadata);
}
action mac_learn(){
// 填充digest的负载部分
meta.learn.srcAddr = hdr.ethernet.srcAddr;
meta.learn.ingress_port = standard_metadata.ingress_port;
// 调用digest将信息直接发送至控制器
// 第一个参数是digest ID(类似clone中的Session ID),用于标识不同的digest消息类型
// 第二个参数是digest的负载,注意这里不能直接填写learn,因为learn是定义、不是实例化的变量
digest<learn_t>(1, meta.learn);
}
table smac {
key = {
hdr.ethernet.srcAddr: exact;
}
actions = {
mac_learn;
NoAction;
}
size = 256;
default_action = mac_learn;
}
action forward(bit<9> egress_port) {
standard_metadata.egress_spec = egress_port;
}
table dmac {
key = {
hdr.ethernet.dstAddr: exact;
}
actions = {
forward;
NoAction;
}
size = 256;
default_action = NoAction;
}
action set_mcast_grp(bit<16> mcast_grp) {
standard_metadata.mcast_grp = mcast_grp;
}
table broadcast {
key = {
standard_metadata.ingress_port: exact;
}
actions = {
set_mcast_grp;
NoAction;
}
size = 256;
default_action = NoAction;
}
apply {
smac.apply();
if (dmac.apply().hit){
}
else {
broadcast.apply();
}
}
}
|
1
2
3
4
5
|
control MyEgress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
apply { }
}
|
1
2
3
4
5
|
control MyDeparser(packet_out packet, in headers hdr) {
apply {
packet.emit(hdr.ethernet);
}
}
|
控制器·监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
```python
# BMv2的digest消息是通过nanomsg socket向外推送的
# nnpy库是Python对底层nanomsg消息传输库的封装
import nnpy
def unpack_digest(self, msg, num_samples):
digest = []
# 从msg的第32个字节开始读取,通常前32字节是消息头或其它元数据,需要跳过
starting_index = 32
# 解析消息中的每一个样本
for sample in range(num_samples):
# 从消息中取8个字节,使用struct.unpack按格式">LHH"解析
# >表示大端字节序,即网络序
# L是4字节无符号长整型,对应MAC地址的一部分
# H是2字节无符号短整型,对应MAC地址的另一部分
# H是2字节无符号短整型,对应ingress_port
mac0, mac1, ingress_port = struct.unpack(">LHH", msg[starting_index:starting_index+8])
# 读取指针向后偏移8各字节
starting_index +=8
# 拼接出完整的MAC地址
mac_addr = (mac0 << 16) + mac1
# 当前样本解析完成
digest.append((mac_addr, ingress_port))
return digest
def recv_msg_digest(self, msg):
# 根据BMv2定义的消息结构进行解析digest消息的头部
# topic为消息类型标识
# device_id为交换机设备编号
# ctx_id为上下文编号,交换机多流水线时区分
# list_id为用于区分不同类型的digest
# buffer_id为本digest次消息的唯一标识
# num为本次消息里的包含的样本数量
topic, device_id, ctx_id, list_id, buffer_id, num = struct.unpack("<iQiiQi", msg[:32])
# 从消息中提取出num条digest样本条目
digest = self.unpack_digest(msg, num)
# 调用函数对digest样本列表进行进一步的消息处理
self.learn(digest)
# 控制器收到后必须显式ACK确认,需传递上下文编号、digest类型编号、digest消息编号以唯一标识消息
self.controller.client.bm_learning_ack_buffer(ctx_id, list_id, buffer_id)
def run_digest_loop(self):
# 创建一个订阅套接字(SUB),准备订阅消息
sub = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
# 调用BMv2提供的管理接口,获取通知套接字地址
notifications_socket = self.controller.client.bm_mgmt_get_info().notifications_socket
# 连接到刚刚获取的通知套接字地址
sub.connect(notifications_socket)
# 设置订阅选项,''表示订阅所有消息(相当于不做过滤)
sub.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
# 不断从订阅的套接字上接收消息
while True:
msg = sub.recv()
self.recv_msg_digest(msg)
|