#!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-2.1+ # ~~~ # Description: Tests for dhcpcd - a DHCP client # # Author: Susant Sahani # Copyright (c) 2018 Red Hat, Inc. # ~~~ import errno import os import sys import time import unittest import subprocess import signal import shutil import psutil import socket from pyroute2 import IPRoute DHCPCD_CI_DIR="/var/run/dhcpcd-ci" DHCPCD_LOG_FILE='/var/run/dhcpcd-ci/dhcpcd-test-log' DHCPCD_PID_FILE='/var/run/dhcpcd.pid' DHCPCD_TCP_DUMP_FILE='/tmp/dhcpcd-tcp-dump.pcap' DNSMASQ_PID_FILE='/var/run/dhcpcd-ci/test-dnsmasq.pid' DNSMASQ_LOG_FILE='/var/run/dhcpcd-ci/dnsmasq-log-file' def setUpModule(): """Initialize the environment, and perform sanity checks on it.""" if shutil.which('dhcpcd') is None: raise OSError(errno.ENOENT, 'dhcpcd not found') if shutil.which('dnsmasq') is None: raise OSError(errno.ENOENT, 'dnsmasq not found') def tearDownModule(): pass class GenericUtilities(): """Provide a set of utility functions start stop daemons. write config files etc """ def StartDnsMasq(self, conf): """Start DnsMasq""" conf_file=os.path.join(DHCPCD_CI_DIR, conf) subprocess.check_output(['dnsmasq', '-8', DNSMASQ_LOG_FILE, '--log-dhcp', '--pid-file=/var/run/dhcpcd-ci/test-dnsmasq.pid', '-C', conf_file, '-i', 'veth-peer', '-R']) def StartDhcpcd(self, conf): """ Start dnsmaq """ conf_file=os.path.join(DHCPCD_CI_DIR, conf) subprocess.check_output(['dhcpcd', '-4', '-M', '-d', '--logfile', DHCPCD_LOG_FILE, '-f', conf_file, 'veth-test']) def StopDaemon(self, pid_file): with open(pid_file, 'r') as f: pid = f.read().rstrip(' \t\r\n\0') os.kill(int(pid), signal.SIGTERM) os.remove(pid_file) def findTextInDaemonLogs(self, log_file, **kwargs): """dnsmasq server logs.""" if kwargs is not None: with open (log_file, 'rt') as in_file: contents = in_file.read() for key in kwargs: self.assertRegex(contents, kwargs[key]) def FindProtocolFieldsinTCPDump(self, **kwargs): """Look attributes in tcpdump.""" contents = subprocess.check_output(['tcpdump', '-vv', '-r', DHCPCD_TCP_DUMP_FILE]).rstrip().decode('utf-8') if kwargs is not None: for key in kwargs: self.assertRegex(contents, kwargs[key]) def SetupVethInterface(self): """Setup veth interface""" ip = IPRoute() ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth') idx_veth_test = ip.link_lookup(ifname='veth-test')[0] idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0] ip.link('set', index=idx_veth_test, address='02:01:02:03:04:08') ip.link('set', index=idx_veth_peer, address='02:01:02:03:04:09') ip.link('set', index=idx_veth_test, state='up') ip.link('set', index=idx_veth_peer, state='up') ip.addr('add', index=idx_veth_peer, address='192.168.111.50') ip.close() def TearDownVethInterface(self): ip = IPRoute() ip.link('del', index=ip.link_lookup(ifname='veth-test')[0]) ip.close() def StartCaptureBOOTPPackets(self): """Start tcpdump to capture dhcp packets""" subprocess.check_output(['systemctl','restart', 'tcpdumpd.service']) def StopCapturingPackets(self): subprocess.check_output(['systemctl', 'stop', 'tcpdumpd.service']) time.sleep(3); class DhcpcdTests(unittest.TestCase, GenericUtilities): def setUp(self): """ setup veth and write radvd and dhcpv6configs """ self.SetupVethInterface() def tearDown(self): self.StopDaemon(DHCPCD_PID_FILE) self.StopDaemon(DNSMASQ_PID_FILE) self.TearDownVethInterface() def test_dhcpcd_ipv4(self): """ dhcpcd gets address """ self.StartDnsMasq('dnsmasq-ipv4.conf') time.sleep(1) self.StartDhcpcd('dhcpcd-domain-dns.conf') time.sleep(5) output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8') # Address prefix self.assertRegex(output, "192.168.111.*") # Default route output=subprocess.check_output(['ip','route', 'show', 'dev', 'veth-test']).rstrip().decode('utf-8') self.assertRegex(output, "default via 192.168.1.1*") def test_dhcpcd_dns_domain(self): """ dhcpcd request DNS and domain name """ self.StartDnsMasq('dnsmasq-ipv4.conf') time.sleep(1) self.StartDhcpcd('dhcpcd-domain-dns.conf') output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8') # Address prefix self.assertRegex(output, "192.168.111.*") # Default route output=subprocess.check_output(['ip','route', 'show', 'dev', 'veth-test']).rstrip().decode('utf-8') self.assertRegex(output, "default via 192.168.1.1*") # Dump the lease file output=subprocess.check_output(['dhcpcd','-U', '-4', 'veth-test'], stderr=subprocess.STDOUT).rstrip().decode('utf-8') self.assertRegex(output, 'domain_name=example-test.com') self.assertRegex(output, 'domain_name_servers=\'8.8.8.8 8.8.4.4\'') self.assertRegex(output, 'routers=192.168.1.1') def test_dhcpcd_mtu(self): """ dhcpcd gets MTU 1492 """ self.StartDnsMasq('dnsmasq-mtu.conf') time.sleep(1) self.StartDhcpcd('dhcpcd-mtu.conf') time.sleep(5) output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8') # Address prefix self.assertRegex(output, "192.168.111.*") # Dump the lease file output=subprocess.check_output(['dhcpcd','-U', '-4', 'veth-test'], stderr=subprocess.STDOUT).rstrip().decode('utf-8') self.assertRegex(output, 'interface_mtu=1492') def test_dhcpcd_clientid_vendorclassid_userclass(self): """ verify dhcpcd sends custom clientid vendor class id and userclass """ self.StartDnsMasq('dnsmasq-vendorclass.conf') time.sleep(1) self.StartCaptureBOOTPPackets() self.StartDhcpcd('dhcpcd-vendorclass.conf') time.sleep(5) self.StopCapturingPackets() output=subprocess.check_output(['ip','address', 'show', 'veth-test']).rstrip().decode('utf-8') # Address prefix self.assertRegex(output, "192.168.111.*") self.findTextInDaemonLogs(DNSMASQ_LOG_FILE, vendor_class='vendor class: Zeus_dhcpcd_vendorclass_id', user_class='user class: AAAA BBBB CCCC DDDD', host_name='client provides name: Zeus') self.findTextInDaemonLogs(DHCPCD_LOG_FILE, client_id='using ClientID 00:11:11:12:12:13:13:14:14:15:15:16:16') self.FindProtocolFieldsinTCPDump(vendor='Vendor-Option Option 43, length 11: 104.101.108.108.111.32.119.111.114.108.100', user_class='instance#1:.*AAAA BBBB CCCC DDDD", length 19', vendor_class='Vendor-Class Option 60, length 26:.*Zeus_dhcpcd_vendorclass_id', host_name='Hostname.*Zeus') os.remove(DHCPCD_TCP_DUMP_FILE) if __name__ == '__main__': unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=3))