diff --git a/src/pylorax/monitor.py b/src/pylorax/monitor.py index e0d2ed1a..8665d250 100644 --- a/src/pylorax/monitor.py +++ b/src/pylorax/monitor.py @@ -74,30 +74,25 @@ class LogRequestHandler(socketserver.BaseRequestHandler): Loops until self.server.kill is True """ log.info("Processing logs from %s", self.client_address) - line = "" + data = b"" while True: if self.server.kill: break try: - data = str(self.request.recv(4096), "utf8") - if self.fp: - self.fp.write(data) - self.fp.flush() - - # check the data for errors and set error flag - # need to assemble it into lines so we can test for the error - # string. - while data: - more = data.split("\n", 1) - line += more[0] - if len(more) > 1: - self.iserror(line) - line = "" - data = more[1] + data += self.request.recv(4096) + for line in data.splitlines(keepends=True): + if line.endswith(b"\n"): + # Ignore invalid UTF8 inside lines + self.iserror(str(line[:-1], "utf8", "ignore")) + if self.fp: + self.fp.write(str(line, "utf8", "ignore")) + self.fp.flush() + data = b"" else: - data = None - + # Not the end of the line, keep for later + data = line + break except socket.timeout: pass except Exception as e: # pylint: disable=broad-except @@ -106,7 +101,6 @@ class LogRequestHandler(socketserver.BaseRequestHandler): def finish(self): log.info("Shutting down log processing") - self.request.close() if self.fp: self.fp.close() @@ -202,3 +196,4 @@ class LogMonitor(object): """Force shutdown of the monitoring thread""" self.server.kill = True self.server_thread.join() + self.server.server_close() diff --git a/tests/pylorax/test_monitor.py b/tests/pylorax/test_monitor.py new file mode 100644 index 00000000..0e22caf5 --- /dev/null +++ b/tests/pylorax/test_monitor.py @@ -0,0 +1,69 @@ +import socket +import time +import unittest + +from pylorax.monitor import LogMonitor + +class LogMonitorTest(unittest.TestCase): + def test_monitor(self): + monitor = LogMonitor(timeout=1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + s.sendall("\nAnother line\nTraceback (Not a real traceback)\n".encode("utf8")) + time.sleep(1) + self.assertTrue(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "Traceback (Not a real traceback)") + finally: + monitor.shutdown() + + def test_monitor_IGNORED(self): + monitor = LogMonitor(timeout=1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + s.sendall("\nAnother line\nIGNORED: Traceback (Not a real traceback)\n".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "") + finally: + monitor.shutdown() + + def test_monitor_timeout(self): + # Timeout is in minutes so to shorten the test we pass 0.1 + monitor = LogMonitor(timeout=0.1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + time.sleep(7) + self.assertTrue(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "") + finally: + monitor.shutdown() + + + def test_monitor_utf8(self): + ## If a utf8 character spans the end of the 4096 byte buffer it will fail to + ## decode. Test to make sure it is reassembled correctly. + monitor = LogMonitor(timeout=1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + + # Simulate a UTF8 character that gets broken into parts by buffering, etc. + data = "Just a test string\nTraceback (Not a real traceback)\nWith A" + s.sendall(data.encode("utf8") + b"\xc3") + time.sleep(1) + self.assertTrue(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "Traceback (Not a real traceback)") + finally: + monitor.shutdown()