from __future__ import print_function import unittest, sys, time, threading, struct, logging, os import vpp_papi from subprocess import Popen, PIPE import time from ipaddress import * scriptdir = os.path.dirname(os.path.realpath(__file__)) papi_event = threading.Event() print(vpp_papi.vpe.VL_API_SW_INTERFACE_SET_FLAGS) def papi_event_handler(result): if result.vl_msg_id == vpp_papi.vpe.VL_API_SW_INTERFACE_SET_FLAGS: return if result.vl_msg_id == vpp_papi.vpe.VL_API_VNET_INTERFACE_COUNTERS: print('Interface counters', result) return if result.vl_msg_id == vpp_papi.vpe.VL_API_VNET_IP6_FIB_COUNTERS: print('IPv6 FIB counters', result) papi_event.set() return print('Unknown message id:', result.vl_msg_id) class ShellSession: def __init__(self, name): self.description = "Interactive shell session" self.name = name self.fname = "/tmp/session-" + name + "-output.txt" self.fw = open(self.fname, "wb") self.fr = open(self.fname, "r") self.p = Popen("/bin/bash", stdin = PIPE, stdout = self.fw, stderr = self.fw, bufsize = 1) def write(self, data): self.p.stdin.write(data) def read(self): return self.fr.read() def close(self): self.fr.close() self.fw.close() def connect_with(self, other): this_end = self.name + "_" + other.name other_end = other.name + "_" + self.name self.write("ip link add name " + this_end + " type veth peer name " + other_end + "\n") self.write("ip link set dev " + this_end + " up promisc on\n") other.write("echo $$\n") time.sleep(0.5) thepid = int(other.read()) self.write("ip link set dev " + other_end + " up promisc on netns /proc/"+str(thepid)+"/ns/net\n") time.sleep(0.3) def connect_tap_interface(osname): r = "" if osname in vpp_papi.bag["taps"]: r = vpp_papi.bag["taps"][osname] else: r = vpp_papi.tap_connect(True, osname, "AAAAAA", False, False) print("TAP ", osname, " reply ", r) vpp_papi.bag["taps"][osname] = r return r def get_session(sname): r = "" if sname in vpp_papi.bag["sessions"]: r = vpp_papi.bag["sessions"][sname] else: r = ShellSession(sname) vpp_papi.bag["sessions"][sname] = r return r def connect_afpacket_interface(osname): r = "" if osname in vpp_papi.bag["afpackets"]: r = vpp_papi.bag["afpackets"][osname] else: r = vpp_papi.af_packet_create(osname, "AAAAAA", True) print("AFPACKET ", osname, " reply ", r) vpp_papi.bag["afpackets"][osname] = r return r def ifup(sw_if_index, up=True): vpp_papi.sw_interface_set_flags(sw_if_index, up, False, False) def bridge_add(bd_id): vpp_papi.bridge_domain_add_del(bd_id, True, True, True, True, 0, True) def bridge_add_interface(bd_id, sw_if_index): print("Adding ", sw_if_index, " to bridge ", bd_id) vpp_papi.sw_interface_set_l2_bridge(sw_if_index, bd_id, False, False, True) def cli(cmd): print("Running " + cmd) reply = vpp_papi.cli_inband(len(cmd), cmd) print("Reply: ", reply) return reply def shell_session_open(name): if name in vpp_papi.bag["sessions"]: return vpp_papi.bag["sessions"][name] else: vpp_papi.bag["sessions"][name] = ShellSession(name) return vpp_papi.bag["sessions"][name] import glob, subprocess class TestPAPI(unittest.TestCase): @classmethod def setUpClass(cls): vpp_papi.bag = {} vpp_papi.bag["taps"] = {} vpp_papi.bag["afpackets"] = {} vpp_papi.bag["sessions"] = {} if True: return 1 # # Start main VPP process cls.vpp_bin = glob.glob(scriptdir+'/../../../build-root/install-vpp*-native/vpp/bin/vpp')[0] print("VPP BIN:", cls.vpp_bin) cls.vpp = subprocess.Popen([cls.vpp_bin, "unix", "nodaemon"], stderr=subprocess.PIPE) print('Started VPP') # For some reason unless we let VPP start up the API cannot connect. time.sleep(0.3) @classmethod def tearDownClass(cls): if True: return 1 cls.vpp.terminate() def setUp(self): print("Connecting API") r = vpp_papi.connect("test_papi") self.assertEqual(r, 0) get_session("s1").write("unshare -n /bin/bash\n") get_session("s2").write("unshare -n /bin/bash\n") get_session("s0").connect_with(get_session("s1")) get_session("s0").connect_with(get_session("s2")) ifup(connect_tap_interface("tap-0").sw_if_index) ifup(connect_tap_interface("tap-1").sw_if_index) ifup(connect_afpacket_interface("s0_s1").sw_if_index) ifup(connect_afpacket_interface("s0_s2").sw_if_index) bridge_add(42) bridge_add_interface(42, connect_tap_interface("tap-0").sw_if_index) bridge_add_interface(42, connect_tap_interface("tap-1").sw_if_index) bridge_add_interface(42, connect_afpacket_interface("s0_s1").sw_if_index) bridge_add_interface(42, connect_afpacket_interface("s0_s2").sw_if_index) get_session("s1").write("ip -6 addr add dev s1_s0 2001:db8:1::1/64\n") get_session("s1").write("ip link set dev s1_s0 up promisc on\n") get_session("s2").write("ip -6 addr add dev s2_s0 2001:db8:1::2/64\n") get_session("s2").write("ip -6 addr add dev s2_s0 2001:db8:1::3/64\n") get_session("s2").write("ip link set dev s2_s0 up promisc on\n") time.sleep(1) get_session("s1").write("ip addr\n") get_session("s2").write("ip addr\n") time.sleep(1) print("S1:", get_session("s1").read()) print("S2:", get_session("s2").read()) cli("classify table mask l3 ip6 dst buckets 64") cli("show classify tables") cli("classify session hit-next 0 table-index 0 match l3 ip6 dst 2001:db8:1::2 opaque-index 42") cli("set interface l2 input classify intfc host-s0_s1 ip6-table 0") raw_input('About to start testing. Press enter to continue: ') get_session("s1").write("ping6 -c 3 2001:db8:1::3\n") get_session("s1").write("ping6 -c 3 2001:db8:1::2\n") print("Pinging") time.sleep(10) print("S1:", get_session("s1").read()) raw_input('Press enter to continue: ') print("Session result:", get_session("s1").read()) def tearDown(self): r = vpp_papi.disconnect() self.assertEqual(r, 0) # # The tests themselves # # # Basic request / reply # def test_show_version(self): t = vpp_papi.show_version() print('T', t); program = t.program.decode().rstrip('\x00') self.assertEqual('vpe', program) # # Details / Dump # def xtest_details_dump(self): t = vpp_papi.sw_interface_dump(0, b'') print('Dump/details T', t) # # Arrays # def xtest_arrays(self): t = vpp_papi.vnet_get_summary_stats() print('Summary stats', t) print('Packets:', t.total_pkts[0]) print('Packets:', t.total_pkts[1]) # # Variable sized arrays and counters # #@unittest.skip("stats") def xtest_want_stats(self): pid = 123 vpp_papi.register_event_callback(papi_event_handler) papi_event.clear() # Need to configure IPv6 to get som IPv6 FIB stats t = vpp_papi.create_loopback('') print(t) self.assertEqual(t.retval, 0) ifindex = t.sw_if_index addr = str(IPv6Address(u'1::1').packed) t = vpp_papi.sw_interface_add_del_address(ifindex, 1, 1, 0, 16, addr) print(t) self.assertEqual(t.retval, 0) # Check if interface is up # XXX: Add new API to query interface state based on ifindex, instead of dump all. t = vpp_papi.sw_interface_set_flags(ifindex, 1, 1, 0) self.assertEqual(t.retval, 0) t = vpp_papi.want_stats(True, pid) print (t) # # Wait for some stats # self.assertEqual(papi_event.wait(15), True) t = vpp_papi.want_stats(False, pid) print (t) # # Plugins? # if __name__ == '__main__': #logging.basicConfig(level=logging.DEBUG) unittest.main() def test_papi(): print('test')