a1810f2a01
The content of this branch was automatically imported from Fedora ELN with the following as its source: https://src.fedoraproject.org/rpms/net-snmp#be26e76db31e227dcc75fe872ad69f39629713ad
176 lines
6.6 KiB
Python
Executable File
176 lines
6.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: LGPL-2.1+
|
|
# ~~~
|
|
# Description: Tests for snmpd
|
|
#
|
|
# Author: Susant Sahani <susant@redhat.com>
|
|
# Copyright (c) 2018 Red Hat, Inc.
|
|
# ~~~
|
|
|
|
import errno
|
|
import os
|
|
import sys
|
|
import time
|
|
import unittest
|
|
import subprocess
|
|
import signal
|
|
import shutil
|
|
import psutil
|
|
import socket
|
|
import platform
|
|
import re
|
|
from pyroute2 import IPRoute
|
|
from psutil import virtual_memory
|
|
from collections import OrderedDict
|
|
|
|
# IPv4 address that every snmpwalk query in this file is sent to.
HOST = '192.168.111.50'
|
|
|
|
def setUpModule():
|
|
"""Initialize the environment, and perform sanity checks on it."""
|
|
|
|
if shutil.which('snmpd') is None:
|
|
raise OSError(errno.ENOENT, 'snmpd not found')
|
|
|
|
if shutil.which('snmpwalk') is None:
|
|
raise OSError(errno.ENOENT, 'snmpwalk not found')
|
|
|
|
def tearDownModule():
    """Module-level teardown hook; it performs no work."""
    return None
|
|
|
|
class GenericUtilities():
    """Helpers for controlling snmpd and the veth pair used by the tests.

    The class is used as a mixin by the test case in this file; none of its
    methods store anything on the instance.
    """

    def StartSnmpd(self):
        """Start the snmpd service through systemctl.

        Raises:
            subprocess.CalledProcessError: if systemctl exits non-zero.
        """
        subprocess.check_output(['systemctl', 'start', 'snmpd'])

    def StopSnmpd(self):
        """Stop the snmpd service through systemctl.

        Raises:
            subprocess.CalledProcessError: if systemctl exits non-zero.
        """
        subprocess.check_output(['systemctl', 'stop', 'snmpd'])

    def SetupVethInterface(self):
        """Create the veth-test/veth-peer pair, bring it up and address it.

        veth-test is given HOST (the address the snmpwalk queries target)
        and veth-peer is given 192.168.111.51. Both ends also get a fixed
        MAC address.
        """
        ip = IPRoute()
        try:
            ip.link('add', ifname='veth-test', peer='veth-peer', kind='veth')
            idx_veth_test = ip.link_lookup(ifname='veth-test')[0]
            idx_veth_peer = ip.link_lookup(ifname='veth-peer')[0]

            ip.link('set', index=idx_veth_test, address='12:11:12:13:14:18')
            ip.link('set', index=idx_veth_peer, address='22:21:22:23:24:29')
            ip.link('set', index=idx_veth_test, state='up')
            ip.link('set', index=idx_veth_peer, state='up')
            # Use the shared constant so the interface address cannot drift
            # away from the address the tests query.
            ip.addr('add', index=idx_veth_test, address=HOST)
            ip.addr('add', index=idx_veth_peer, address='192.168.111.51')
        finally:
            # Release the netlink socket even when one of the calls above
            # raises.
            ip.close()

    def TearDownVethInterface(self):
        """Delete the veth-test link if it exists.

        Only veth-test is deleted explicitly.
        NOTE(review): the veth-peer end is expected to disappear together
        with it -- confirm if that assumption matters.
        """
        ip = IPRoute()
        try:
            indices = ip.link_lookup(ifname='veth-test')
            # An empty lookup means there is nothing to remove; skipping the
            # delete keeps teardown from raising IndexError and hiding the
            # error that prevented the link from being created.
            if indices:
                ip.link('del', index=indices[0])
        finally:
            ip.close()
|
|
|
|
class SnmpdTests(unittest.TestCase, GenericUtilities):
    """Query a freshly started snmpd with snmpwalk and check its answers.

    Every test runs with the veth pair created and snmpd started by setUp,
    and both are removed/stopped again by tearDown.
    """

    def _snmpwalk(self, *target):
        """Run ``snmpwalk -v2c -c public HOST [target]`` and return its output.

        Args:
            *target: optional OID or MIB object name appended to the command.

        Returns:
            The command's stdout, decoded as UTF-8 with trailing whitespace
            removed.

        Raises:
            subprocess.CalledProcessError: if snmpwalk exits non-zero.
        """
        command = ['snmpwalk', '-v2c', '-c', 'public', HOST]
        command.extend(target)
        return subprocess.check_output(command).rstrip().decode('utf-8')

    @staticmethod
    def _snmp_mac(address):
        """Return a colon-separated MAC with each octet's zero padding removed.

        snmpwalk is expected to print ifPhysAddress octets without a leading
        zero (for example ``0:c:29:5b:a1:b2``), so ``00:0c:29:5b:a1:b2`` is
        converted to that form. Every octet is handled, including ones whose
        second character is a hex letter such as ``0a``.
        """
        return ':'.join(format(int(octet, 16), 'x')
                        for octet in address.split(':'))

    def setUp(self):
        """Create the veth pair, wait one second, then start snmpd."""
        self.SetupVethInterface()
        try:
            time.sleep(1)
            self.StartSnmpd()
        except Exception:
            # unittest does not call tearDown when setUp fails, so remove
            # the interface here; otherwise the next test's link creation
            # would collide with the leftover veth pair.
            self.TearDownVethInterface()
            raise

    def tearDown(self):
        """Stop snmpd and remove the veth pair."""
        try:
            self.StopSnmpd()
        finally:
            # Remove the interface even if stopping the service failed.
            self.TearDownVethInterface()

    def test_UCD_SNMP_MIB_memory(self):
        """ UCD-SNMP-MIB::memory """

        self._snmpwalk('UCD-SNMP-MIB::memory')

        meminfo = {}
        with open('/proc/meminfo') as f:
            for line in f:
                key, _, value = line.partition(':')
                meminfo[key] = value.strip()

        output = self._snmpwalk('UCD-SNMP-MIB::memTotalReal.0')
        # Literal comparison: the /proc/meminfo value is data, not a regex.
        self.assertIn(meminfo['MemTotal'], output)

    def test_SNMP_hrSWRunPath(self):
        """ process id """
        output = self._snmpwalk('HOST-RESOURCES-MIB::hrSWRunPath.1')
        self.assertIn('systemd', output)

    def test_SNMP_IF_MIB_network_interface(self):
        """ verify network interface (1.3.6.1.2.1.2.2.1) SNMP variables """

        ip = IPRoute()
        try:
            self._snmpwalk('1.3.6.1.2.1.2.2.1')

            # 1.3.6.1.2.1.2.2.1.1 IF-MIB::ifIndex
            output = self._snmpwalk('1.3.6.1.2.1.2.2.1.1')
            self.assertIn('IF-MIB::ifIndex.1 = INTEGER: 1', output)

            # 1.3.6.1.2.1.2.2.1.2 IF-MIB::ifDescr
            output = self._snmpwalk('1.3.6.1.2.1.2.2.1.2')
            for link in ip.get_links():
                self.assertIn(link.get_attr('IFLA_IFNAME'), output)

            # IP-MIB::ipAdEntAddr 1.3.6.1.2.1.4.20.1.1
            output = self._snmpwalk('1.3.6.1.2.1.4.20.1.1')
            for addr in ip.get_addr():
                address = addr.get_attr('IFA_ADDRESS')
                # The address family is a header field of the message, not
                # a netlink attribute; only IPv4 entries are compared.
                if address and addr['family'] == socket.AF_INET:
                    self.assertIn(address, output)

            # IF-MIB::ifPhysAddress. 1.3.6.1.2.1.2.2.1.6
            output = self._snmpwalk('1.3.6.1.2.1.2.2.1.6')
            for link in ip.get_links():
                mac = link.get_attr('IFLA_ADDRESS')
                if mac and mac != '00:00:00:00:00:00':
                    self.assertIn(self._snmp_mac(mac), output)
        finally:
            # Close the netlink socket even when an assertion fails.
            ip.close()

    def test_SNMP_MIB_2_System(self):
        """ verify RFC 1213 System (1.3.6.1.2.1.1) SNMP variables"""

        self._snmpwalk('1.3.6.1.2.1.1')

        # 1.3.6.1.2.1.1.1 - sysDescr
        output = self._snmpwalk('1.3.6.1.2.1.1.1')
        self.assertIn(platform.machine(), output)
        self.assertIn(platform.node(), output)
        self.assertIn(platform.processor(), output)
        self.assertIn(platform.release(), output)
        self.assertIn(platform.version(), output)

        # 1.3.6.1.2.1.1.2 - sysObjectID
        self._snmpwalk('1.3.6.1.2.1.1.2')

        # 1.3.6.1.2.1.1.3 - sysUpTime
        self._snmpwalk('1.3.6.1.2.1.1.3')

        # 1.3.6.1.2.1.1.4 - sysContact
        output = self._snmpwalk('1.3.6.1.2.1.1.4')
        self.assertIn('fedora-ci <fedoraci@fedoraproject.org>', output)

        # 1.3.6.1.2.1.1.5 - sysName
        output = self._snmpwalk('1.3.6.1.2.1.1.5')
        self.assertIn(socket.gethostname(), output)

        # 1.3.6.1.2.1.1.6 - sysLocation
        output = self._snmpwalk('1.3.6.1.2.1.1.6')
        self.assertIn('Pune, IN', output)

    def test_basic_snmpwalk(self):
        """ verify that a full snmpwalk -v2c -c public against HOST succeeds """

        self._snmpwalk()
|
|
|
|
|
|
if __name__ == '__main__':
    # Send the test report to stdout at verbosity level 3.
    stdout_runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=3)
    unittest.main(testRunner=stdout_runner)
|