From 6160d340c444e0bc8fbe772d31046f071a6f5605 Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Thu, 11 Nov 2021 15:31:23 -0800 Subject: [PATCH] Fix monitor problem with split UTF8 characters Sometimes, depending on the buffering, or length of data being received, the end of the data could be the 1st byte or so of a UTF8 character. This would cause a crash when trying to decode the raw data buffer. This switches it to only decode once a full line has been found. It also adds tests for the LogMonitor class. --- src/pylorax/monitor.py | 33 +++++++---------- tests/pylorax/test_monitor.py | 69 +++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 19 deletions(-) create mode 100644 tests/pylorax/test_monitor.py diff --git a/src/pylorax/monitor.py b/src/pylorax/monitor.py index e0d2ed1a..8665d250 100644 --- a/src/pylorax/monitor.py +++ b/src/pylorax/monitor.py @@ -74,30 +74,25 @@ class LogRequestHandler(socketserver.BaseRequestHandler): Loops until self.server.kill is True """ log.info("Processing logs from %s", self.client_address) - line = "" + data = b"" while True: if self.server.kill: break try: - data = str(self.request.recv(4096), "utf8") - if self.fp: - self.fp.write(data) - self.fp.flush() - - # check the data for errors and set error flag - # need to assemble it into lines so we can test for the error - # string. - while data: - more = data.split("\n", 1) - line += more[0] - if len(more) > 1: - self.iserror(line) - line = "" - data = more[1] + data += self.request.recv(4096) + for line in data.splitlines(keepends=True): + if line.endswith(b"\n"): + # Ignore invalid UTF8 inside lines + self.iserror(str(line[:-1], "utf8", "ignore")) + if self.fp: + self.fp.write(str(line, "utf8", "ignore")) + self.fp.flush() + data = b"" else: - data = None - + # Not the end of the line, keep for later + data = line + break except socket.timeout: pass except Exception as e: # pylint: disable=broad-except @@ -106,7 +101,6 @@ class LogRequestHandler(socketserver.BaseRequestHandler): def finish(self): log.info("Shutting down log processing") - self.request.close() if self.fp: self.fp.close() @@ -202,3 +196,4 @@ class LogMonitor(object): """Force shutdown of the monitoring thread""" self.server.kill = True self.server_thread.join() + self.server.server_close() diff --git a/tests/pylorax/test_monitor.py b/tests/pylorax/test_monitor.py new file mode 100644 index 00000000..0e22caf5 --- /dev/null +++ b/tests/pylorax/test_monitor.py @@ -0,0 +1,69 @@ +import socket +import time +import unittest + +from pylorax.monitor import LogMonitor + +class LogMonitorTest(unittest.TestCase): + def test_monitor(self): + monitor = LogMonitor(timeout=1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + s.sendall("\nAnother line\nTraceback (Not a real traceback)\n".encode("utf8")) + time.sleep(1) + self.assertTrue(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "Traceback (Not a real traceback)") + finally: + monitor.shutdown() + + def test_monitor_IGNORED(self): + monitor = LogMonitor(timeout=1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + s.sendall("\nAnother line\nIGNORED: Traceback (Not a real traceback)\n".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "") + finally: + monitor.shutdown() + + def test_monitor_timeout(self): + # Timeout is in minutes so to shorten the test we pass 0.1 + monitor = LogMonitor(timeout=0.1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + s.sendall("Just a test string\nwith two and a half\nlines in it".encode("utf8")) + time.sleep(1) + self.assertFalse(monitor.server.log_check()) + time.sleep(7) + self.assertTrue(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "") + finally: + monitor.shutdown() + + + def test_monitor_utf8(self): + ## If a utf8 character spans the end of the 4096 byte buffer it will fail to + ## decode. Test to make sure it is reassembled correctly. + monitor = LogMonitor(timeout=1) + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((monitor.host, monitor.port)) + + # Simulate a UTF8 character that gets broken into parts by buffering, etc. + data = "Just a test string\nTraceback (Not a real traceback)\nWith A" + s.sendall(data.encode("utf8") + b"\xc3") + time.sleep(1) + self.assertTrue(monitor.server.log_check()) + self.assertEqual(monitor.server.error_line, "Traceback (Not a real traceback)") + finally: + monitor.shutdown()