223 lines
7.8 KiB
Python
Executable File
223 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: LGPL-2.1+
|
|
# ~~~
|
|
# Description: Tests for dhcpcd - a DHCP client
|
|
#
|
|
# Author: Susant Sahani <susant@redhat.com>
|
|
# Copyright (c) 2018 Red Hat, Inc.
|
|
# ~~~
|
|
|
|
import errno
|
|
import os
|
|
import sys
|
|
import time
|
|
import unittest
|
|
import subprocess
|
|
import signal
|
|
import shutil
|
|
import psutil
|
|
import socket
|
|
from pyroute2 import IPRoute
|
|
|
|
# Working directory that holds the daemon configs, logs and pid files.
DHCPCD_CI_DIR = "/var/run/dhcpcd-ci"

# Base name of the dhcpcd log; the config name is appended per run.
DHCPCD_LOG_FILE = os.path.join(DHCPCD_CI_DIR, 'dhcpcd-test.log')

# Capture file read back with tcpdump by the packet-level checks.
DHCPCD_TCP_DUMP_FILE = '/tmp/dhcpcd-tcp-dump.pcap'

# dnsmasq pid file and the base name of its log (config name is appended).
DNSMASQ_PID_FILE = os.path.join(DHCPCD_CI_DIR, 'test-dnsmasq.pid')
DNSMASQ_LOG_FILE = os.path.join(DHCPCD_CI_DIR, 'dnsmasq.log')
|
|
|
|
def setUpModule():
|
|
"""Initialize the environment, and perform sanity checks on it."""
|
|
|
|
if shutil.which('dhcpcd') is None:
|
|
raise OSError(errno.ENOENT, 'dhcpcd not found')
|
|
|
|
if shutil.which('dnsmasq') is None:
|
|
raise OSError(errno.ENOENT, 'dnsmasq not found')
|
|
|
|
|
|
def tearDownModule():
    """Module-level teardown hook; it performs no work."""
    return None
|
|
|
|
class GenericUtilities:
    """Helpers to start/stop daemons, manage the veth pair and check logs.

    Meant to be mixed into a ``unittest.TestCase``: the log and capture
    checkers call ``self.assertRegex``.
    """

    def StartDnsMasq(self, conf):
        """Start dnsmasq on ``veth-peer``.

        Args:
            conf: File name of the dnsmasq config inside ``DHCPCD_CI_DIR``.
                It is also appended to ``DNSMASQ_LOG_FILE`` to form the
                path passed to dnsmasq's ``-8`` option.

        Raises:
            subprocess.CalledProcessError: if dnsmasq exits non-zero.
        """
        conf_file = os.path.join(DHCPCD_CI_DIR, conf)
        log_file = DNSMASQ_LOG_FILE + '.' + conf

        # Use the shared constant so the pid file written here is the one
        # callers later hand to StopDaemon.
        subprocess.check_output(['dnsmasq', '-8', log_file, '--log-dhcp',
                                 '--pid-file=' + DNSMASQ_PID_FILE,
                                 '-C', conf_file, '-i', 'veth-peer', '-R', '-z'])

    def StartDhcpcd(self, conf):
        """Start dhcpcd on ``veth-test``.

        Args:
            conf: File name of the dhcpcd config inside ``DHCPCD_CI_DIR``.
                It is also appended to ``DHCPCD_LOG_FILE`` to form the
                ``--logfile`` path.

        Raises:
            subprocess.CalledProcessError: if dhcpcd exits non-zero.
        """
        conf_file = os.path.join(DHCPCD_CI_DIR, conf)
        log_file = DHCPCD_LOG_FILE + '.' + conf

        subprocess.check_output(['dhcpcd', '-4', '-M', '-d', '--logfile', log_file,
                                 '-f', conf_file, 'veth-test'])

    def StopDhcpcd(self, conf=None):
        """Run ``dhcpcd -x veth-test`` to stop dhcpcd on the test interface.

        Args:
            conf: Unused. Kept (now optional) so existing callers that pass
                a config name keep working.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
        """
        subprocess.check_output(['dhcpcd', '-x', 'veth-test'])

    def StopDaemon(self, pid_file):
        """Send SIGTERM to the process recorded in ``pid_file`` and delete the file.

        A process that has already exited, or a pid file that the daemon
        deletes itself while shutting down, is not treated as an error:
        either way the daemon is stopped and the file is gone afterwards.

        Args:
            pid_file: Path of a file containing the daemon's PID.

        Raises:
            FileNotFoundError: if ``pid_file`` does not exist on entry.
            ValueError: if the file content is not an integer.
            PermissionError: if the signal may not be sent; the pid file is
                left in place in that case.
        """
        with open(pid_file, 'r') as f:
            pid = int(f.read().rstrip(' \t\r\n\0'))

        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # Already gone; only the stale pid file is left to clean up.
            pass

        try:
            os.remove(pid_file)
        except FileNotFoundError:
            # The daemon removed its own pid file before we got to it.
            pass

    def findTextInDaemonLogs(self, log_file, **kwargs):
        """Assert that every given regex matches somewhere in ``log_file``.

        Args:
            log_file: Path of the daemon log to read.
            **kwargs: ``label=regex`` pairs. The label only names the check
                in the failure message.

        Raises:
            AssertionError: (via ``assertRegex``) if a pattern is not found.
            OSError: if the log file cannot be read.
        """
        with open(log_file, 'rt') as in_file:
            contents = in_file.read()

        for label, pattern in kwargs.items():
            self.assertRegex(contents, pattern,
                             '{} not found in {}'.format(label, log_file))

    def FindProtocolFieldsinTCPDump(self, **kwargs):
        """Assert that every given regex matches the decoded packet capture.

        The capture in ``DHCPCD_TCP_DUMP_FILE`` is rendered with
        ``tcpdump -vv -r`` and searched as text.

        Args:
            **kwargs: ``label=regex`` pairs. The label only names the check
                in the failure message.

        Raises:
            AssertionError: (via ``assertRegex``) if a pattern is not found.
            subprocess.CalledProcessError: if tcpdump exits non-zero.
        """
        raw = subprocess.check_output(['tcpdump', '-vv', '-r', DHCPCD_TCP_DUMP_FILE])
        contents = raw.rstrip().decode('utf-8')

        for label, pattern in kwargs.items():
            self.assertRegex(contents, pattern,
                             '{} not found in {}'.format(label, DHCPCD_TCP_DUMP_FILE))

    def SetupVethInterface(self):
        """Create the ``veth-test``/``veth-peer`` pair and bring both ends up.

        Both ends get fixed MAC addresses and ``veth-peer`` is assigned
        192.168.111.50.
        """
        ip = IPRoute()
        try:
            ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth')
            idx_veth_test = ip.link_lookup(ifname='veth-test')[0]
            idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0]

            ip.link('set', index=idx_veth_test, address='02:01:02:03:04:08')
            ip.link('set', index=idx_veth_peer, address='02:01:02:03:04:09')
            ip.link('set', index=idx_veth_test, state='up')
            ip.link('set', index=idx_veth_peer, state='up')
            ip.addr('add', index=idx_veth_peer, address='192.168.111.50')
        finally:
            # Release the netlink socket even when one of the calls fails.
            ip.close()

    def TearDownVethInterface(self):
        """Delete ``veth-test`` if it exists; do nothing otherwise."""
        ip = IPRoute()
        try:
            # link_lookup returns an empty list for an unknown interface, so
            # calling this after a failed setup is harmless.
            for index in ip.link_lookup(ifname='veth-test'):
                ip.link('del', index=index)
        finally:
            ip.close()

    def StartCaptureBOOTPPackets(self):
        """Restart ``tcpdumpd.service`` to begin a packet capture.

        Raises:
            subprocess.CalledProcessError: if systemctl exits non-zero.
        """
        subprocess.check_output(['systemctl', 'restart', 'tcpdumpd.service'])

    def StopCapturingPackets(self):
        """Stop ``tcpdumpd.service`` and wait three seconds.

        Raises:
            subprocess.CalledProcessError: if systemctl exits non-zero.
        """
        subprocess.check_output(['systemctl', 'stop', 'tcpdumpd.service'])
        # NOTE(review): presumably gives tcpdump time to finish writing the
        # capture file before it is read - confirm.
        time.sleep(3)
|
|
|
|
class DhcpcdTests(unittest.TestCase, GenericUtilities):
    """Run dhcpcd on ``veth-test`` against dnsmasq serving on ``veth-peer``."""

    # Dots are escaped and the last octet is bounded so that only the
    # intended addresses satisfy the checks.
    _ADDRESS_RE = r'inet 192\.168\.111\.\d+'
    _DEFAULT_ROUTE_RE = r'default via 192\.168\.1\.1\b'

    def setUp(self):
        """Create the veth pair and look up dhcpcd's pid file path."""
        self.SetupVethInterface()
        # Registered as a cleanup so the pair is removed even when the rest
        # of setUp, the test or tearDown raises; a leftover veth-test would
        # make every later setUp fail.
        self.addCleanup(self.TearDownVethInterface)
        self.pid_file = self._output(['dhcpcd', '--printpidfile'])

    def tearDown(self):
        """Stop dhcpcd and dnsmasq.

        dnsmasq is stopped even if stopping dhcpcd fails. The veth pair is
        removed afterwards by the cleanup registered in setUp.
        """
        try:
            self._stop_daemon(self.pid_file)
        finally:
            self._stop_daemon(DNSMASQ_PID_FILE)

    def _stop_daemon(self, pid_file):
        """Stop one daemon, tolerating a pid file that does not exist."""
        try:
            self.StopDaemon(pid_file)
        except FileNotFoundError:
            # The test failed before starting this daemon, or the daemon
            # already removed its own pid file.
            pass

    def _output(self, argv, **kwargs):
        """Run ``argv`` and return its output, right-stripped and decoded as UTF-8."""
        return subprocess.check_output(argv, **kwargs).rstrip().decode('utf-8')

    def test_dhcpcd_ipv4(self):
        """ dhcpcd gets address """

        self.StartDnsMasq('dnsmasq-ipv4.conf')
        time.sleep(1)
        self.StartDhcpcd('dhcpcd-domain-dns.conf')

        time.sleep(5)
        output = self._output(['ip', 'address', 'show', 'veth-test'])

        # Address prefix
        self.assertRegex(output, self._ADDRESS_RE)

        # Default route
        output = self._output(['ip', 'route', 'show', 'dev', 'veth-test'])
        self.assertRegex(output, self._DEFAULT_ROUTE_RE)

    def test_dhcpcd_dns_domain(self):
        """ dhcpcd request DNS and domain name """

        self.StartDnsMasq('dnsmasq-ipv4.conf')
        time.sleep(1)
        self.StartDhcpcd('dhcpcd-domain-dns.conf')

        # NOTE(review): unlike the sibling tests there is no settle delay
        # after starting dhcpcd here - confirm that is intentional.
        output = self._output(['ip', 'address', 'show', 'veth-test'])

        # Address prefix
        self.assertRegex(output, self._ADDRESS_RE)

        # Default route
        output = self._output(['ip', 'route', 'show', 'dev', 'veth-test'])
        self.assertRegex(output, self._DEFAULT_ROUTE_RE)

        # Dump the lease file
        output = self._output(['dhcpcd', '-U', '-4', 'veth-test'], stderr=subprocess.STDOUT)
        self.assertRegex(output, r'domain_name=example-test\.com')
        self.assertRegex(output, r"domain_name_servers='?8\.8\.8\.8 8\.8\.4\.4'?")
        self.assertRegex(output, r'routers=192\.168\.1\.1\b')

    def test_dhcpcd_mtu(self):
        """ dhcpcd gets MTU 1492 """

        self.StartDnsMasq('dnsmasq-mtu.conf')
        time.sleep(1)

        self.StartDhcpcd('dhcpcd-mtu.conf')
        time.sleep(5)
        output = self._output(['ip', 'address', 'show', 'veth-test'])

        # Address prefix
        self.assertRegex(output, self._ADDRESS_RE)

        # Dump the lease file
        output = self._output(['dhcpcd', '-U', '-4', 'veth-test'], stderr=subprocess.STDOUT)
        self.assertRegex(output, 'interface_mtu=1492')

    @unittest.skip("Known to be failing")
    def test_dhcpcd_clientid_vendorclassid_userclass(self):
        """ verify dhcpcd sends custom clientid vendor class id and userclass """

        dnsmasq_conf = 'dnsmasq-vendorclass.conf'
        dhcpcd_conf = 'dhcpcd-vendorclass.conf'

        self.StartDnsMasq(dnsmasq_conf)
        time.sleep(1)

        self.StartCaptureBOOTPPackets()

        self.StartDhcpcd(dhcpcd_conf)
        time.sleep(10)
        self.StopCapturingPackets()

        output = self._output(['ip', 'address', 'show', 'veth-test'])
        # Address prefix
        self.assertRegex(output, self._ADDRESS_RE)

        # The start helpers append '.<conf>' to the log base names, so the
        # same suffix is needed to open the files they actually wrote.
        self.findTextInDaemonLogs(DNSMASQ_LOG_FILE + '.' + dnsmasq_conf,
                                  vendor_class='vendor class: Zeus_dhcpcd_vendorclass_id',
                                  user_class='user class: AAAA BBBB CCCC DDDD',
                                  host_name='client provides name: Zeus')
        self.findTextInDaemonLogs(DHCPCD_LOG_FILE + '.' + dhcpcd_conf,
                                  client_id='using ClientID 00:11:11:12:12:13:13:14:14:15:15:16:16')
        self.FindProtocolFieldsinTCPDump(vendor='Vendor-Option Option 43, length 11: 104.101.108.108.111.32.119.111.114.108.100',
                                         user_class='instance#1:.*AAAA BBBB CCCC DDDD", length 19',
                                         vendor_class='Vendor-Class Option 60, length 26:.*Zeus_dhcpcd_vendorclass_id',
                                         host_name='Hostname.*Zeus')
        os.remove(DHCPCD_TCP_DUMP_FILE)
|
|
|
|
if __name__ == '__main__':
    # Report on stdout at the most verbose level when run as a script.
    stdout_runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=3)
    unittest.main(testRunner=stdout_runner)
|