lldpd/tests/miscellaneous-tests/lldpd-tests.py

150 lines
5.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1+
# ~~~
# lldpd-test.py integration test
# Description: Test for lldpd: implementation of IEEE 802.1ab (LLDP)
#
# Author: Susant Sahani <susant@redhat.com>
# Copyright (c) 2018 Red Hat, Inc.
#~~~
import errno
import os
import sys
import time
import unittest
import subprocess
import signal
import shutil
import re
import socket
LLDPD_TCP_DUMP_FILE='/tmp/lldpd-tcp-dump.pcap'
LLDPD_PID_FILE='/var/run/lldpd.pid'
SERVICE_UNITDIR = '/run/systemd/system'
NETWORK_UNITDIR = '/run/systemd/network'
def setUpModule():
"""Initialize the environment, and perform sanity checks on it."""
if shutil.which('lldpd') is None:
raise OSError(errno.ENOENT, 'lldpd not found')
# Ensure the unit directory exists so tests can dump files into it.
os.makedirs(NETWORK_UNITDIR, exist_ok=True)
class lldpdUtilities():
"""Provide a set of utility functions start stop lldpd ."""
def Startlldpd(self):
"""Start lldpd interface lldpd-peer """
subprocess.check_output(['/usr/sbin/lldpd', '-cfse', '-D', '-C', 'lldpd-peer', '-I', 'lldpd-peer', '-S', 'lldpd-system-name','-m', '192.168.50.6'])
def Stoplldpd(self):
try:
with open(LLDPD_PID_FILE, 'r') as f:
pid = f.read().rstrip(' \t\r\n\0')
os.kill(int(pid), signal.SIGTERM)
os.remove(LLDPD_PID_FILE)
except IOError:
pass
def StartCaptureLLDPPackets(self):
"""Start tcpdump to capture packets"""
self.WriteServiceFile('tcpdump.service', '''\
[Unit]
Description=TCPDumpd
After=multi-user.target network.target
[Service]
Type=simple
ExecStart=/usr/sbin/tcpdump -pnnli lldpd ether proto 0x88cc -vvv -w "/tmp/lldpd-tcp-dump.pcap"
[Install]
WantedBy=multi-user.target
''')
subprocess.check_output(['systemctl','daemon-reload'])
subprocess.check_output(['systemctl','restart', 'tcpdump.service'])
def StopCapturingPackets(self):
subprocess.check_output(['systemctl', 'stop', 'tcpdump.service'])
time.sleep(3);
def SetupVethInterface(self):
"""Setup veth interface"""
subprocess.check_output(['ip', 'link', 'add', 'lldpd', 'type', 'veth', 'peer', 'name', 'lldpd-peer'])
subprocess.check_output(['ip', 'link', 'set', 'lldpd', 'address', '02:01:02:03:04:08'])
subprocess.check_output(['ip', 'link', 'set', 'lldpd-peer', 'address', '02:01:02:03:04:09'])
subprocess.check_output(['ip', 'link', 'set', 'lldpd', 'up'])
subprocess.check_output(['ip', 'link', 'set', 'lldpd-peer', 'up'])
time.sleep(3);
self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', 'lldpd'])
def WriteServiceFile(self, unit_name, contents):
"""Write a tcpdump unit file, and queue it to be removed."""
unit_path = os.path.join(SERVICE_UNITDIR, unit_name)
with open(unit_path, 'w') as unit:
unit.write(contents)
self.addCleanup(os.remove, unit_path)
def FindProtocolFieldsinTCPDump(self, **kwargs):
"""Look attributes in lldpd logs."""
contents = subprocess.check_output(['tcpdump', '-v', '-r', LLDPD_TCP_DUMP_FILE]).rstrip().decode('utf-8')
if kwargs is not None:
for key in kwargs:
self.assertRegex(contents, kwargs[key])
class lldpdTests(unittest.TestCase, lldpdUtilities):
def setUp(self):
""" Setup """
self.SetupVethInterface()
def tearDown(self):
self.Stoplldpd()
os.remove(LLDPD_TCP_DUMP_FILE)
def test_lldpd_trasmitted_lldp_attributes(self):
""" verify at the other end of veth received LLDP packets that contains attibutes (link address, hostname, TTL, system desc). tcpdump """
self.StartCaptureLLDPPackets()
time.sleep(5)
self.Startlldpd()
""" capture for 10 seconds """
time.sleep(10)
self.StopCapturingPackets()
self.FindProtocolFieldsinTCPDump(Chassis='Subtype MAC address \(4\): 02:01:02:03:04:09',
Port='Subtype MAC address \(3\): 02:01:02:03:04:09',
PortDesc='lldpd-peer',
TTL='TTL.*120s',
HostName=socket.gethostname() ,
System_Description='lldpd-system-name',
ManagementAddress='192.168.50.6')
def test_lldpd_trasmitted_lldp_packets(self):
""" verify at the other end of veth ifname lldpd has received LLDP packets. tcpdump """
self.StartCaptureLLDPPackets()
time.sleep(5)
self.Startlldpd()
""" capture for 10 seconds """
time.sleep(10)
self.StopCapturingPackets()
self.FindProtocolFieldsinTCPDump(MAC='02:01:02:03:04:09',
TTL='TTL 120s')
if __name__ == '__main__':
unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
verbosity=2))