ipset/tests/sanity-tests/ipset-tests.py

229 lines
7.5 KiB
Python
Raw Normal View History

Adds tests according to the CI justification Adds tests according to the CI wiki specifically the standard test interface in the spec. The playbook includes Tier1 level test cases that have been tested in the following contexts and are passing reliably: Classic. Test logs are stored in the artifacts directory. The following steps are used to execute the tests using the standard test interface: Test environment Make sure you have installed packages from the spec ``` ansible-2.4.1.0-2.fc28.noarch python2-dnf-2.7.5-1.fc28.noarch libselinux-python-2.7-2.fc28.x86_64 standard-test-roles-2.5-1.fc28.noarch Run tests for Classic ``` Snip of the example test run for Classic tests: ``` test_ipset_add_bitmap_ip (__main__.IpsetTests) ... 192.168.11.12 is in set testnet. 192.168.11.13 is in set testnet. 192.168.11.14 is in set testnet. 192.168.11.15 is in set testnet. ok test_ipset_bitmap_ip_netfilter (__main__.IpsetTests) ... 192.168.225.32 is in set testnetiperf. 192.168.225.33 is in set testnetiperf. Connecting to host 192.168.225.32, port 55555 [ 5] local 192.168.225.32 port 54652 connected to 192.168.225.32 port 55555 [ ID] Interval Transfer Bitrate Retr Cwnd [ 5] 0.00-1.00 sec 7.79 GBytes 66.9 Gbits/sec 0 895 KBytes [ 5] 1.00-2.00 sec 7.99 GBytes 68.6 Gbits/sec 0 895 KBytes [ 5] 2.00-3.00 sec 7.99 GBytes 68.6 Gbits/sec 0 1.75 MBytes [ 5] 3.00-4.00 sec 7.79 GBytes 66.9 Gbits/sec 0 1.75 MBytes [ 5] 4.00-5.00 sec 8.03 GBytes 69.0 Gbits/sec 0 1.75 MBytes [ 5] 5.00-6.00 sec 7.94 GBytes 68.2 Gbits/sec 0 2.25 MBytes [ 5] 6.00-7.00 sec 7.88 GBytes 67.7 Gbits/sec 0 2.25 MBytes [ 5] 7.00-8.00 sec 8.06 GBytes 69.2 Gbits/sec 0 2.25 MBytes [ 5] 8.00-9.00 sec 7.89 GBytes 67.8 Gbits/sec 0 2.25 MBytes [ 5] 9.00-10.00 sec 7.85 GBytes 67.4 Gbits/sec 0 2.25 MBytes - - - - - - - - - - - - - - - - - - - - - - - - - [ ID] Interval Transfer Bitrate Retr [ 5] 0.00-10.00 sec 79.2 GBytes 68.0 Gbits/sec 0 sender [ 5] 0.00-10.04 sec 79.2 GBytes 67.8 Gbits/sec receiver iperf Done. 
iperf3: error - unable to connect to server: Connection timed out ok test_ipset_delete_bitmap_ip (__main__.IpsetTests) ... 192.168.11.12 is in set testnet. 192.168.11.13 is in set testnet. 192.168.11.12 is NOT in set testnet. ok test_ipset_hash_bitmap_ipport (__main__.IpsetTests) ... 192.168.1.1,udp:53 is in set testipport. 192.168.1.1,tcp:5555 is in set testipport. 192.168.1.1,tcp:5555 is NOT in set testipport. ok test_ipset_hash_bitmap_ipportip (__main__.IpsetTests) ... 192.168.1.1,tcp:80,10.0.0.1 is in set testipportip. 192.168.1.1,tcp:80,10.0.0.1 is in set testipportip. 192.168.1.1,tcp:80,10.0.0.1 is NOT in set testipportip. ok 192.168.1.1,tcp:80,10.0.0.1 is NOT in set testipportip. ok test_ipset_hash_bitmap_mac (__main__.IpsetTests) ... 02:01:02:03:04:09 is in set testmac. 02:01:02:03:04:09 is NOT in set testmac. ok test_ipset_hash_bitmap_netiface (__main__.IpsetTests) ... 192.168.0.0/24,veth-test is in set testnetiface. 192.167.0.0/24,veth-peer is in set testnetiface. 192.168.0.0/24,veth-test is NOT in set testnetiface. ok ---------------------------------------------------------------------- Ran 7 tests in 15.545s OK :: [ 16:37:57 ] :: [ PASS ] :: Command '/usr/bin/python3 /usr/bin/ipset-tests.py' (Expected 0, got 0) :: [ 16:37:57 ] :: [ PASS ] :: Command '/usr/bin/python3 /usr/bin/ipset-tests.py' (Expected 0, got 0) :::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: :::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: :: Duration: 15s :: Duration: 15s :: Assertions: 1 good, 0 bad :: Assertions: 1 good, 0 bad :: RESULT: PASS :: RESULT: PASS ```
2018-06-24 11:08:43 +00:00
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1+
# ~~~
# Description: Tests for ipset
#
# Author: Susant Sahani <susant@redhat.com>
# Copyright (c) 2018 Red Hat, Inc.
# ~~~
import errno
import os
import sys
import time
import unittest
import subprocess
import signal
import shutil
import socket
from pyroute2 import IPRoute
def setUpModule():
if shutil.which('ipset') is None:
raise OSError(errno.ENOENT, 'ipset not found')
if shutil.which('iperf3') is None:
raise OSError(errno.ENOENT, 'iperf3 not found')
def tearDownModule():
    """Module-level teardown hook; there is nothing to clean up here."""
    return None
class GenericUtilities():
    """Mixin with helpers for the ipset tests.

    Provides veth pair setup/teardown through pyroute2's ``IPRoute`` and
    thin wrappers around the ``ipset`` command line tool.
    """

    def SetupVethInterface(self):
        """Create the ``veth-test``/``veth-peer`` pair and bring both ends up.

        Assigns fixed MAC addresses and one /24 IPv4 address to each end.
        """
        ip = IPRoute()
        try:
            ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth')

            idx_veth_test = ip.link_lookup(ifname='veth-test')[0]
            idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0]

            ip.link('set', index=idx_veth_test, address='02:01:02:03:04:08')
            ip.link('set', index=idx_veth_peer, address='02:01:02:03:04:09')

            ip.addr('add', index=idx_veth_test, address='192.168.225.32', mask=24)
            ip.addr('add', index=idx_veth_peer, address='192.168.225.33', mask=24)

            ip.link('set', index=idx_veth_test, state='up')
            ip.link('set', index=idx_veth_peer, state='up')
        finally:
            # Release the netlink socket even if one of the calls above raised.
            ip.close()

    def TearDownVethInterface(self):
        """Delete ``veth-test``.

        Only ``veth-test`` is deleted explicitly; presumably the kernel
        removes the peer end together with it.
        """
        ip = IPRoute()
        try:
            ip.link('del', index=ip.link_lookup(ifname='veth-test')[0])
        finally:
            ip.close()

    def AddAddress(self, interface, address, mask=24):
        """Add IPv4 ``address`` with prefix length ``mask`` to ``interface``.

        ``mask`` defaults to 24, the prefix length used for the addresses
        configured in ``SetupVethInterface``.

        Raises:
            IndexError: if ``interface`` does not exist.
        """
        ip = IPRoute()
        try:
            idx = ip.link_lookup(ifname=interface)[0]
            ip.addr('add', index=idx, address=address, mask=mask)
        finally:
            ip.close()

    def IPSetAdd(self, hashset, address):
        """Run ``ipset add <hashset> <address>``; raises CalledProcessError on failure."""
        subprocess.check_output(['ipset', 'add', hashset, address])

    def IPSetRemove(self, hashset, address):
        """Run ``ipset del <hashset> <address>``; raises CalledProcessError on failure."""
        subprocess.check_output(['ipset', 'del', hashset, address])

    def IPSetCreateHashSet(self, hashset, hashtype):
        """Run ``ipset create <hashset> <hashtype>``; raises CalledProcessError on failure."""
        subprocess.check_output(['ipset', 'create', hashset, hashtype])

    def IPSetDestroyHashSet(self, hashset):
        """Run ``ipset destroy <hashset>``; raises CalledProcessError on failure."""
        subprocess.check_output(['ipset', 'destroy', hashset])
class IpsetTests(unittest.TestCase, GenericUtilities):
    """Sanity tests for the ``ipset`` command line tool.

    Each test runs with a fresh veth pair (see ``setUp``/``tearDown``) and
    creates its own set. Set destruction, iptables rule removal and the
    iperf3 service stop are registered with ``addCleanup`` so that a failing
    assertion cannot leak state into the following tests.
    """

    def setUp(self):
        self.SetupVethInterface()

    def tearDown(self):
        self.TearDownVethInterface()

    def _create_set(self, name, settype):
        """Create set ``name`` of type ``settype`` and schedule its destruction."""
        self.IPSetCreateHashSet(name, settype)
        self.addCleanup(self.IPSetDestroyHashSet, name)

    def _assert_member(self, name, entry):
        """Require ``ipset test`` to succeed; raises CalledProcessError otherwise."""
        subprocess.check_output(['ipset', 'test', name, entry])

    def _assert_not_member(self, name, entry):
        """Require ``ipset test`` to exit with status 1 for ``entry``."""
        self.assertEqual(subprocess.call(['ipset', 'test', name, entry]), 1)

    def test_ipset_bitmap_ip_netfilter(self):
        """A DROP rule matching the set must block a previously working iperf3 run."""
        iperf_client = ['iperf3', '-c', '192.168.225.32', '-p', '55555',
                        '--connect-timeout', '5']
        drop_rule = ['INPUT', '-m', 'set', '--match-set', 'testnetiperf',
                     'src', '-j', 'DROP']

        self._create_set('testnetiperf', 'hash:ip')
        for address in ('192.168.225.32', '192.168.225.33'):
            self.IPSetAdd('testnetiperf', address)
        for address in ('192.168.225.32', '192.168.225.33'):
            self._assert_member('testnetiperf', address)

        subprocess.check_output(['systemctl', 'start', 'iperf3d.service'])
        self.addCleanup(subprocess.check_output,
                        ['systemctl', 'stop', 'iperf3d.service'])
        # Presumably gives the iperf3 server time to start listening.
        time.sleep(5)

        # Without the rule the client must be able to connect.
        self.assertEqual(subprocess.call(iperf_client), 0)

        self.assertEqual(subprocess.call(['iptables', '-I'] + drop_rule), 0)
        # Registered last so it runs first: the rule has to be gone before
        # the set it references is destroyed.
        self.addCleanup(subprocess.call, ['iptables', '--delete'] + drop_rule)

        # With the rule in place the connection attempt must fail.
        self.assertNotEqual(subprocess.call(iperf_client), 0)

    def test_ipset_add_bitmap_ip(self):
        """Entries added to a hash:ip set are reported as members."""
        addresses = ('192.168.11.12', '192.168.11.13',
                     '192.168.11.14', '192.168.11.15')

        self._create_set('testnet', 'hash:ip')
        for address in addresses:
            self.IPSetAdd('testnet', address)
        for address in addresses:
            self._assert_member('testnet', address)

    def test_ipset_delete_bitmap_ip(self):
        """An entry deleted from a hash:ip set is no longer a member."""
        self._create_set('testnet', 'hash:ip')
        self.IPSetAdd('testnet', '192.168.11.12')
        self.IPSetAdd('testnet', '192.168.11.13')

        self._assert_member('testnet', '192.168.11.12')
        self._assert_member('testnet', '192.168.11.13')

        self.IPSetRemove('testnet', '192.168.11.12')
        self._assert_not_member('testnet', '192.168.11.12')

    def test_ipset_hash_bitmap_mac(self):
        """Add, test and delete an entry in a hash:mac set."""
        self._create_set('testmac', 'hash:mac')
        self.IPSetAdd('testmac', '02:01:02:03:04:09')

        self._assert_member('testmac', '02:01:02:03:04:09')

        self.IPSetRemove('testmac', '02:01:02:03:04:09')
        self._assert_not_member('testmac', '02:01:02:03:04:09')

    # This test used to share the name test_ipset_hash_bitmap_ipport with the
    # hash:ip,port test below, so it was shadowed and never executed.
    def test_ipset_hash_bitmap_ipmac(self):
        """Add, test and delete an entry in a hash:ip,mac set."""
        self._create_set('testipport', 'hash:ip,mac')
        self.IPSetAdd('testipport', '1.1.1.1,02:01:02:03:04:09')

        self._assert_member('testipport', '1.1.1.1,02:01:02:03:04:09')

        self.IPSetRemove('testipport', '1.1.1.1,02:01:02:03:04:09')
        self._assert_not_member('testipport', '1.1.1.1,02:01:02:03:04:09')

    def test_ipset_hash_bitmap_ipport(self):
        """Add, test and delete entries in a hash:ip,port set."""
        self._create_set('testipport', 'hash:ip,port')
        self.IPSetAdd('testipport', '192.168.1.1,udp:53')
        self.IPSetAdd('testipport', '192.168.1.1,5555')

        self._assert_member('testipport', '192.168.1.1,udp:53')
        self._assert_member('testipport', '192.168.1.1,5555')

        self.IPSetRemove('testipport', '192.168.1.1,5555')
        self._assert_not_member('testipport', '192.168.1.1,5555')

    def test_ipset_hash_bitmap_ipportip(self):
        """Add, test and delete entries in a hash:ip,port,ip set."""
        self._create_set('testipportip', 'hash:ip,port,ip')
        self.IPSetAdd('testipportip', '192.168.1.1,80,10.0.0.1')
        self.IPSetAdd('testipportip', '192.168.1.2,80,10.0.0.2')

        # Check both added entries, not the first one twice.
        self._assert_member('testipportip', '192.168.1.1,80,10.0.0.1')
        self._assert_member('testipportip', '192.168.1.2,80,10.0.0.2')

        self.IPSetRemove('testipportip', '192.168.1.1,80,10.0.0.1')
        self._assert_not_member('testipportip', '192.168.1.1,80,10.0.0.1')

    def test_ipset_hash_bitmap_netiface(self):
        """Add, test and delete entries in a hash:net,iface set."""
        self._create_set('testnetiface', 'hash:net,iface')
        self.IPSetAdd('testnetiface', '192.168.0/24,veth-test')
        self.IPSetAdd('testnetiface', '192.167.0/24,veth-peer')

        self._assert_member('testnetiface', '192.168.0/24,veth-test')
        self._assert_member('testnetiface', '192.167.0/24,veth-peer')

        self.IPSetRemove('testnetiface', '192.168.0/24,veth-test')
        self._assert_not_member('testnetiface', '192.168.0/24,veth-test')
# Script entry point: run every test verbosely, reporting on stdout.
if __name__ == '__main__':
    stdout_runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=3)
    unittest.main(testRunner=stdout_runner)