From 0708302464a6c280098e23a921ee7cd195cb7e0f Mon Sep 17 00:00:00 2001
From: "Brian C. Lane"
Date: Wed, 10 Jul 2019 17:30:10 -0700
Subject: [PATCH] Use binary mode to tail the file

In python 3 f.seek() on text doesn't work like it does in py2/C because
text is now unicode. So change read_tail to use byte mode and take
unicode into account. Also add tests for it.
---
 src/pylorax/sysutils.py        | 37 +++++++++++------
 tests/pylorax/test_sysutils.py | 72 ++++++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 11 deletions(-)

diff --git a/src/pylorax/sysutils.py b/src/pylorax/sysutils.py
index c0bc4421..c445fae1 100644
--- a/src/pylorax/sysutils.py
+++ b/src/pylorax/sysutils.py
@@ -1,7 +1,7 @@
 #
 # sysutils.py
 #
-# Copyright (C) 2009-2015 Red Hat, Inc.
+# Copyright (C) 2009-2019 Red Hat, Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -133,13 +133,28 @@ def flatconfig(filename):
 
 def read_tail(path, size):
     """Read up to `size` kibibytes from the end of a file"""
-    with open(path, "r") as f:
-        f.seek(0, 2)
-        end = f.tell()
-        if end < 1024 * size:
-            f.seek(0, 0)
-        else:
-            f.seek(end - (1024 * size))
-        # Find the start of the next line and return the rest
-        f.readline()
-        return f.read()
+
+    # NOTE: In py3 text files are unicode, not bytes so we have to open it as bytes
+    with open(path, "rb") as f:
+        return _read_file_end(f, size)
+
+def _read_file_end(f, size):
+    """Read the end of a file
+
+    This skips to the next line to avoid starting in the middle of a unicode character.
+    And returns "" in the case of a UnicodeDecodeError
+    """
+    f.seek(0, 2)
+    end = f.tell()
+    if end < 1024 * size:
+        f.seek(0, 0)
+    else:
+        f.seek(end - (1024 * size))
+    data = f.read()
+    try:
+        # Find the first newline in the block
+        newline = min(1+data.find(b'\n'), len(data))
+        text = data[newline:].decode("UTF-8")
+    except UnicodeDecodeError:
+        return ""
+    return text
diff --git a/tests/pylorax/test_sysutils.py b/tests/pylorax/test_sysutils.py
index 8b33e7a0..818709d9 100644
--- a/tests/pylorax/test_sysutils.py
+++ b/tests/pylorax/test_sysutils.py
@@ -14,11 +14,13 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
+import io
 import unittest
 import tempfile
 import os
 
 from pylorax.sysutils import joinpaths, touch, replace, chown_, chmod_, remove, linktree
+from pylorax.sysutils import _read_file_end
 
 class SysUtilsTest(unittest.TestCase):
     def joinpaths_test(self):
@@ -72,3 +74,73 @@ class SysUtilsTest(unittest.TestCase):
 
             linktree(os.path.join(tdname, "one"), os.path.join(tdname, "copy"))
             self.assertTrue(os.path.exists(os.path.join(tdname, "copy", "two", "three", "lorax-link-test-file")))
+
+    def _generate_lines(self, unicode=False):
+        # helper to generate several KiB of lines of text
+        bio = io.BytesIO()
+        for i in range(0,1024):
+            if not unicode:
+                bio.write(b"Here is another line to test. It is line #%d\n" % i)
+            else:
+                bio.write(b"Here is \xc3\xa0n\xc3\xb2ther line t\xc3\xb2 test. It is line #%d\n" % i)
+        bio.seek(0)
+        return bio
+
+    def read_file_end_test(self):
+        """Test reading from the end of a file"""
+        self.maxDiff = None
+
+        # file of just lines
+        f = self._generate_lines()
+
+        # Grab the end of the 'file' to compare with, starting at the next line (hard-coded)
+        f.seek(-987, 2)
+        result = f.read().decode("utf-8")
+        f.seek(0)
+        self.assertEqual(_read_file_end(f, 1), result)
+
+        # file of lines with no final \n, chop off the trailing \n
+        f.seek(-1,2)
+        f.truncate()
+        f.seek(0)
+        self.assertEqual(_read_file_end(f, 1), result[:-1])
+
+        # short file, truncate it at 1023 characters
+        f.seek(1023)
+        f.truncate()
+        # Grab the end of the file, starting at the next line (hard-coded)
+        f.seek(44)
+        result = f.read().decode("utf-8")
+        f.seek(0)
+        self.assertEqual(_read_file_end(f, 1), result)
+
+        # short file with no line endings
+        f.seek(43)
+        f.truncate()
+        # Grab the whole file
+        f.seek(0)
+        result = f.read().decode("utf-8")
+        f.seek(0)
+        self.assertEqual(_read_file_end(f, 1), result)
+
+        # file with unicode in it
+        f = self._generate_lines(unicode=True)
+
+        # Grab the end of the 'file' to compare with, starting at the next line (hard-coded)
+        f.seek(-1000, 2)
+        result = f.read().decode("utf-8")
+        f.seek(0)
+        self.assertEqual(_read_file_end(f, 1), result)
+
+        # file with unicode right on block boundary, so that a decode of it would fail if it didn't
+        # move to the next line.
+        f.seek(-1000, 2)
+        result = f.read().decode("utf-8")
+        f.seek(-1025, 2)
+        f.write(b"\xc3\xb2")
+        f.seek(0)
+        self.assertEqual(_read_file_end(f, 1), result)
+
+        # Test for UnicodeDecodeError returning an empty string
+        f = io.BytesIO(b"\xff\xff\xffHere is a string with invalid unicode in it.")
+        self.assertEqual(_read_file_end(f, 1), "")