1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
import nnpy
import struct
from p4utils.utils.helper import load_topo
from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI
from scapy.all import Ether, sniff, Packet, BitField, raw
# 定义CPU Header头部字段
class CpuHeader(Packet):
name = 'CpuPacket'
fields_desc = [BitField('macAddr',0,48), BitField('ingress_port', 0, 16)]
class L2Controller(object):
def __init__(self, sw_name):
# mininet启动后 将生成topology.json文件 其中是网络配置信息
self.topo = load_topo('topology.json')
self.sw_name = sw_name
self.thrift_port = self.topo.get_thrift_port(sw_name)
# 从网络配置信息中获取交换机上与控制器通信的端口
self.cpu_port = self.topo.get_cpu_port_index(self.sw_name)
# 控制器交互API接口
self.controller = SimpleSwitchThriftAPI(self.thrift_port)
self.init()
def init(self):
# 初始化交换机的状态
self.controller.reset_state()
self.add_boadcast_groups()
self.add_mirror()
#self.fill_table_test()
def add_mirror(self):
if self.cpu_port:
# 将Session 100与交换机上连接控制器的端口绑定
self.controller.mirroring_add(100, self.cpu_port)
def add_boadcast_groups(self):
interfaces_to_port = self.topo.get_node_intfs(fields=['port'])[self.sw_name].copy()
# Filter lo and cpu port
interfaces_to_port.pop('lo', None)
interfaces_to_port.pop(self.topo.get_cpu_port_intf(self.sw_name), None)
mc_grp_id = 1
rid = 0
for ingress_port in interfaces_to_port.values():
port_list = list(interfaces_to_port.values())
del(port_list[port_list.index(ingress_port)])
# 添加组播组
self.controller.mc_mgrp_create(mc_grp_id)
# 添加多播组
handle = self.controller.mc_node_create(rid, port_list)
# 将多播组与组播组关联
self.controller.mc_node_associate(mc_grp_id, handle)
# 将关系写入表broadcast中
self.controller.table_add("broadcast", "set_mcast_grp", [str(ingress_port)], [str(mc_grp_id)])
mc_grp_id +=1
rid +=1
def fill_table_test(self):
# 添加L2转发表
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:01'], ['1'])
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:02'], ['2'])
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:03'], ['3'])
self.controller.table_add("dmac", "forward", ['00:00:0a:00:00:04'], ['4'])
def learn(self, learning_data):
# 进行下发 分别写入smac与dmac表
for mac_addr, ingress_port in learning_data:
print("mac: %012X ingress_port: %s " % (mac_addr, ingress_port))
self.controller.table_add("smac", "NoAction", [str(mac_addr)])
self.controller.table_add("dmac", "forward", [str(mac_addr)], [str(ingress_port)])
def unpack_digest(self, msg, num_samples):
digest = []
starting_index = 32
for sample in range(num_samples):
mac0, mac1, ingress_port = struct.unpack(">LHH", msg[starting_index:starting_index+8])
starting_index +=8
mac_addr = (mac0 << 16) + mac1
digest.append((mac_addr, ingress_port))
return digest
def recv_msg_digest(self, msg):
topic, device_id, ctx_id, list_id, buffer_id, num = struct.unpack("<iQiiQi",
msg[:32])
digest = self.unpack_digest(msg, num)
self.learn(digest)
#Acknowledge digest
self.controller.client.bm_learning_ack_buffer(ctx_id, list_id, buffer_id)
def run_digest_loop(self):
sub = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
notifications_socket = self.controller.client.bm_mgmt_get_info().notifications_socket
sub.connect(notifications_socket)
sub.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
while True:
msg = sub.recv()
self.recv_msg_digest(msg)
def recv_msg_cpu(self, pkt):
# 解析接收到的数据包
packet = Ether(raw(pkt))
# 链路层类型需为0x1234
if packet.type == 0x1234:
# 继续解析其后的cpu_header
cpu_header = CpuHeader(bytes(packet.load))
# 将学习到的进行下发
self.learn([(cpu_header.macAddr, cpu_header.ingress_port)])
def run_cpu_port_loop(self):
# 监听端口 获取接收到的数据包
cpu_port_intf = str(self.topo.get_cpu_port_intf(self.sw_name).replace("eth0", "eth1"))
sniff(iface=cpu_port_intf, prn=self.recv_msg_cpu)
if __name__ == "__main__":
import sys
sw_name = sys.argv[1]
receive_from = sys.argv[2]
if receive_from == "digest":
controller = L2Controller(sw_name).run_digest_loop()
elif receive_from == "cpu":
controller = L2Controller(sw_name).run_cpu_port_loop()
|