From c6c9716d9267db5c5c6293744d2cd6df5e00690c Mon Sep 17 00:00:00 2001 From: "Miss Islington (bot)" <31488909+miss-islington@users.noreply.github.com> Date: Wed, 25 Jun 2025 13:07:29 +0200 Subject: [PATCH 1/4] bpo-12800: tarfile: Restore fix from 011525ee9 (GH-21409) Restore fix from 011525ee92eb1c13ad1a62d28725a840e28f8160. Backported to Python 3.6 from 9d2c2a8e3b8fe18ee1568bfa4a419847b3e78575 Co-authored-by: Julien Palard --- Lib/tarfile.py | 3 +++ Lib/test/test_tarfile.py | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 85adf90..41cb4d5 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -2535,6 +2535,9 @@ class TarFile(object): try: # For systems that support symbolic and hard links. if tarinfo.issym(): + if os.path.lexists(targetpath): + # Avoid FileExistsError on following os.symlink. + os.unlink(targetpath) os.symlink(tarinfo.linkname, targetpath) else: if os.path.exists(tarinfo._link_target): diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 173aefd..d2d5bba 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -1364,11 +1364,11 @@ class WriteTest(WriteTestBase, unittest.TestCase): f.write('something\n') os.symlink(source_file, target_file) tar = tarfile.open(temparchive,'w') - tar.add(source_file) - tar.add(target_file) + tar.add(source_file, arcname="source") + tar.add(target_file, arcname="symlink") tar.close() # Let's extract it to the location which contains the symlink - tar = tarfile.open(temparchive,'r') + tar = tarfile.open(temparchive,'r', errorlevel=2) # this should not raise OSError: [Errno 17] File exists try: tar.extractall(path=tempdir) -- 2.49.0 From 3dc72bdd66f3151c7566f2f595921f37cf940561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Langa?= Date: Tue, 24 Jun 2025 13:21:45 +0200 Subject: [PATCH 2/4] bpo-43757: Make pathlib use os.path.realpath() to resolve symlinks in a path (GH-25264) (GH-135035) Also adds a new "strict" argument 
to realpath() to avoid changing the default behaviour of pathlib while sharing the implementation. Backported to 3.6 from commit 00af9794dd118f7b835dd844b2b609a503ad951e Co-authored-by: Barney Gale --- Lib/pathlib.py | 140 ++++++++++--------------------------- Lib/posixpath.py | 26 +++++-- Lib/test/test_posixpath.py | 57 ++++++++++++++- 3 files changed, 112 insertions(+), 111 deletions(-) diff --git a/Lib/pathlib.py b/Lib/pathlib.py index 5e13011..361f2d1 100644 --- a/Lib/pathlib.py +++ b/Lib/pathlib.py @@ -15,15 +15,6 @@ from urllib.parse import quote_from_bytes as urlquote_from_bytes supports_symlinks = True -if os.name == 'nt': - import nt - if sys.getwindowsversion()[:2] >= (6, 0): - from nt import _getfinalpathname - else: - supports_symlinks = False - _getfinalpathname = None -else: - nt = None __all__ = [ @@ -35,6 +26,11 @@ __all__ = [ # Internals # +_WINERROR_NOT_READY = 21 # drive exists but is not accessible +_WINERROR_INVALID_NAME = 123 # fix for bpo-35306 +_WINERROR_CANT_RESOLVE_FILENAME = 1921 # broken symlink pointing to itself + + def _is_wildcard_pattern(pat): # Whether this pattern needs actual matching using fnmatch, or can # be looked up directly as a file. 
@@ -178,30 +174,6 @@ class _WindowsFlavour(_Flavour): def casefold_parts(self, parts): return [p.lower() for p in parts] - def resolve(self, path, strict=False): - s = str(path) - if not s: - return os.getcwd() - previous_s = None - if _getfinalpathname is not None: - if strict: - return self._ext_to_normal(_getfinalpathname(s)) - else: - tail_parts = [] # End of the path after the first one not found - while True: - try: - s = self._ext_to_normal(_getfinalpathname(s)) - except FileNotFoundError: - previous_s = s - s, tail = os.path.split(s) - tail_parts.append(tail) - if previous_s == s: - return path - else: - return os.path.join(s, *reversed(tail_parts)) - # Means fallback on absolute - return None - def _split_extended_path(self, s, ext_prefix=ext_namespace_prefix): prefix = '' if s.startswith(ext_prefix): @@ -212,10 +184,6 @@ class _WindowsFlavour(_Flavour): s = '\\' + s[3:] return prefix, s - def _ext_to_normal(self, s): - # Turn back an extended path into a normal DOS-like path - return self._split_extended_path(s)[1] - def is_reserved(self, parts): # NOTE: the rules for reserved names seem somewhat complicated # (e.g. r"..\NUL" is reserved but not r"foo\NUL"). @@ -300,51 +268,6 @@ class _PosixFlavour(_Flavour): def casefold_parts(self, parts): return parts - def resolve(self, path, strict=False): - sep = self.sep - accessor = path._accessor - seen = {} - def _resolve(path, rest): - if rest.startswith(sep): - path = '' - - for name in rest.split(sep): - if not name or name == '.': - # current dir - continue - if name == '..': - # parent dir - path, _, _ = path.rpartition(sep) - continue - newpath = path + sep + name - if newpath in seen: - # Already seen this path - path = seen[newpath] - if path is not None: - # use cached value - continue - # The symlink is not resolved, so we must have a symlink loop. 
- raise RuntimeError("Symlink loop from %r" % newpath) - # Resolve the symbolic link - try: - target = accessor.readlink(newpath) - except OSError as e: - if e.errno != EINVAL and strict: - raise - # Not a symlink, or non-strict mode. We just leave the path - # untouched. - path = newpath - else: - seen[newpath] = None # not resolved symlink - path = _resolve(path, target) - seen[newpath] = path # resolved symlink - - return path - # NOTE: according to POSIX, getcwd() cannot contain path components - # which are symlinks. - base = '' if path.is_absolute() else os.getcwd() - return _resolve(base, str(path)) or sep - def is_reserved(self, parts): return False @@ -421,17 +344,12 @@ class _NormalAccessor(_Accessor): replace = _wrap_binary_strfunc(os.replace) - if nt: - if supports_symlinks: - symlink = _wrap_binary_strfunc(os.symlink) - else: - def symlink(a, b, target_is_directory): - raise NotImplementedError("symlink() not available on this system") + if hasattr(os, "symlink"): + symlink = os.symlink else: - # Under POSIX, os.symlink() takes two args - @staticmethod - def symlink(a, b, target_is_directory): - return os.symlink(str(a), str(b)) + def symlink(self, src, dst, target_is_directory=False): + raise NotImplementedError("os.symlink() not available on this system") + utime = _wrap_strfunc(os.utime) @@ -439,6 +357,12 @@ class _NormalAccessor(_Accessor): def readlink(self, path): return os.readlink(path) + getcwd = os.getcwd + + expanduser = staticmethod(os.path.expanduser) + + realpath = staticmethod(os.path.realpath) + _normal_accessor = _NormalAccessor() @@ -1138,17 +1062,27 @@ class Path(PurePath): """ if self._closed: self._raise_closed() - s = self._flavour.resolve(self, strict=strict) - if s is None: - # No symlink resolution => for consistency, raise an error if - # the path doesn't exist or is forbidden - self.stat() - s = str(self.absolute()) - # Now we have no symlinks in the path, it's safe to normalize it. 
- normed = self._flavour.pathmod.normpath(s) - obj = self._from_parts((normed,), init=False) - obj._init(template=self) - return obj + + def check_eloop(e): + winerror = getattr(e, 'winerror', 0) + if e.errno == ELOOP or winerror == _WINERROR_CANT_RESOLVE_FILENAME: + raise RuntimeError("Symlink loop from %r" % e.filename) + + try: + s = self._accessor.realpath(self, strict=strict) + except OSError as e: + check_eloop(e) + raise + p = self._from_parts((s,)) + + # In non-strict mode, realpath() doesn't raise on symlink loops. + # Ensure we get an exception by calling stat() + if not strict: + try: + p.stat() + except OSError as e: + check_eloop(e) + return p def stat(self): """ diff --git a/Lib/posixpath.py b/Lib/posixpath.py index ca578a5..a941d94 100644 --- a/Lib/posixpath.py +++ b/Lib/posixpath.py @@ -388,16 +388,16 @@ def abspath(path): # Return a canonical path (i.e. the absolute location of a file on the # filesystem). -def realpath(filename): +def realpath(filename, *, strict=False): """Return the canonical path of the specified filename, eliminating any symbolic links encountered in the path.""" filename = os.fspath(filename) - path, ok = _joinrealpath(filename[:0], filename, {}) + path, ok = _joinrealpath(filename[:0], filename, strict, {}) return abspath(path) # Join two paths, normalizing and eliminating any symbolic links # encountered in the second path. -def _joinrealpath(path, rest, seen): +def _joinrealpath(path, rest, strict, seen): if isinstance(path, bytes): sep = b'/' curdir = b'.' 
@@ -426,7 +426,15 @@ def _joinrealpath(path, rest, seen): path = pardir continue newpath = join(path, name) - if not islink(newpath): + try: + st = os.lstat(newpath) + except OSError: + if strict: + raise + is_link = False + else: + is_link = stat.S_ISLNK(st.st_mode) + if not is_link: path = newpath continue # Resolve the symbolic link @@ -437,10 +445,14 @@ def _joinrealpath(path, rest, seen): # use cached value continue # The symlink is not resolved, so we must have a symlink loop. - # Return already resolved part + rest of the path unchanged. - return join(newpath, rest), False + if strict: + # Raise OSError(errno.ELOOP) + os.stat(newpath) + else: + # Return already resolved part + rest of the path unchanged. + return join(newpath, rest), False seen[newpath] = None # not resolved symlink - path, ok = _joinrealpath(path, os.readlink(newpath), seen) + path, ok = _joinrealpath(path, os.readlink(newpath), strict, seen) if not ok: return join(path, rest), False seen[newpath] = path # resolved symlink diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py index e73b31c..d55a78f 100644 --- a/Lib/test/test_posixpath.py +++ b/Lib/test/test_posixpath.py @@ -348,12 +348,25 @@ class PosixPathTest(unittest.TestCase): finally: support.unlink(ABSTFN) + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") + @skip_if_ABSTFN_contains_backslash + def test_realpath_strict(self): + # Bug #43757: raise FileNotFoundError in strict mode if we encounter + # a path that does not exist. + try: + os.symlink(ABSTFN+"1", ABSTFN) + self.assertRaises(FileNotFoundError, realpath, ABSTFN, strict=True) + self.assertRaises(FileNotFoundError, realpath, ABSTFN + "2", strict=True) + finally: + support.unlink(ABSTFN) + @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash def test_realpath_symlink_loops(self): # Bug #930024, return the path unchanged if we get into an infinite - # symlink loop. 
+ # symlink loop in non-strict mode (default). try: os.symlink(ABSTFN, ABSTFN) self.assertEqual(realpath(ABSTFN), ABSTFN) @@ -390,6 +403,48 @@ class PosixPathTest(unittest.TestCase): support.unlink(ABSTFN+"c") support.unlink(ABSTFN+"a") + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") + @skip_if_ABSTFN_contains_backslash + def test_realpath_symlink_loops_strict(self): + # Bug #43757, raise OSError if we get into an infinite symlink loop in + # strict mode. + try: + os.symlink(ABSTFN, ABSTFN) + self.assertRaises(OSError, realpath, ABSTFN, strict=True) + + os.symlink(ABSTFN+"1", ABSTFN+"2") + os.symlink(ABSTFN+"2", ABSTFN+"1") + self.assertRaises(OSError, realpath, ABSTFN+"1", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"2", strict=True) + + self.assertRaises(OSError, realpath, ABSTFN+"1/x", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"1/..", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"1/../x", strict=True) + os.symlink(ABSTFN+"x", ABSTFN+"y") + self.assertRaises(OSError, realpath, + ABSTFN+"1/../" + basename(ABSTFN) + "y", strict=True) + self.assertRaises(OSError, realpath, + ABSTFN+"1/../" + basename(ABSTFN) + "1", strict=True) + + os.symlink(basename(ABSTFN) + "a/b", ABSTFN+"a") + self.assertRaises(OSError, realpath, ABSTFN+"a", strict=True) + + os.symlink("../" + basename(dirname(ABSTFN)) + "/" + + basename(ABSTFN) + "c", ABSTFN+"c") + self.assertRaises(OSError, realpath, ABSTFN+"c", strict=True) + + # Test using relative path as well. 
+ with support.change_cwd(dirname(ABSTFN)): + self.assertRaises(OSError, realpath, basename(ABSTFN), strict=True) + finally: + support.unlink(ABSTFN) + support.unlink(ABSTFN+"1") + support.unlink(ABSTFN+"2") + support.unlink(ABSTFN+"y") + support.unlink(ABSTFN+"c") + support.unlink(ABSTFN+"a") + @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash -- 2.49.0 From 1daf53d8c03e981ec29b0e217aee8da8a159fea3 Mon Sep 17 00:00:00 2001 From: "T. Wouters" Date: Wed, 25 Jun 2025 13:13:34 +0200 Subject: [PATCH 3/4] gh-135034: Normalize link targets in tarfile, add `os.path.realpath(strict='allow_missing')` (GH-135037) (GH-135084) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses CVEs 2024-12718, 2025-4138, 2025-4330, and 2025-4517. (cherry picked from commit 3612d8f51741b11f36f8fb0494d79086bac9390a) Co-authored-by: Łukasz Langa Co-authored-by: Petr Viktorin Co-authored-by: Seth Michael Larson Co-authored-by: Adam Turner <9087854+AA-Turner@users.noreply.github.com> Co-authored-by: Serhiy Storchaka --- Lib/genericpath.py | 12 +- Lib/posixpath.py | 15 +- Lib/tarfile.py | 161 +++++++++++++++---- Lib/test/test_ntpath.py | 2 + Lib/test/test_posixpath.py | 281 +++++++++++++++++++++++++++------ Lib/test/test_tarfile.py | 314 ++++++++++++++++++++++++++++++++++--- 6 files changed, 683 insertions(+), 102 deletions(-) diff --git a/Lib/genericpath.py b/Lib/genericpath.py index 303b3b3..4a80461 100644 --- a/Lib/genericpath.py +++ b/Lib/genericpath.py @@ -8,7 +8,7 @@ import stat __all__ = ['commonprefix', 'exists', 'getatime', 'getctime', 'getmtime', 'getsize', 'isdir', 'isfile', 'samefile', 'sameopenfile', - 'samestat'] + 'samestat', 'ALLOW_MISSING'] # Does a path exist? 
@@ -149,3 +149,13 @@ def _check_arg_types(funcname, *args): (funcname, s.__class__.__name__)) from None if hasstr and hasbytes: raise TypeError("Can't mix strings and bytes in path components") from None + + +# A singleton with a true boolean value. +@object.__new__ +class ALLOW_MISSING: + """Special value for use in realpath().""" + def __repr__(self): + return 'os.path.ALLOW_MISSING' + def __reduce__(self): + return self.__class__.__name__ diff --git a/Lib/posixpath.py b/Lib/posixpath.py index a941d94..bbc6fa4 100644 --- a/Lib/posixpath.py +++ b/Lib/posixpath.py @@ -35,7 +35,7 @@ __all__ = ["normcase","isabs","join","splitdrive","split","splitext", "samefile","sameopenfile","samestat", "curdir","pardir","sep","pathsep","defpath","altsep","extsep", "devnull","realpath","supports_unicode_filenames","relpath", - "commonpath"] + "commonpath", "ALLOW_MISSING"] def _get_sep(path): @@ -406,6 +406,15 @@ def _joinrealpath(path, rest, strict, seen): sep = '/' curdir = '.' pardir = '..' + getcwd = os.getcwd + if strict is ALLOW_MISSING: + ignored_error = FileNotFoundError + elif strict: + ignored_error = () + else: + ignored_error = OSError + + maxlinks = None if isabs(rest): rest = rest[1:] @@ -428,9 +437,7 @@ def _joinrealpath(path, rest, strict, seen): newpath = join(path, name) try: st = os.lstat(newpath) - except OSError: - if strict: - raise + except ignored_error: is_link = False else: is_link = stat.S_ISLNK(st.st_mode) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 41cb4d5..7d94b5c 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -769,10 +769,22 @@ class LinkOutsideDestinationError(FilterError): super().__init__(f'{tarinfo.name!r} would link to {path!r}, ' + 'which is outside the destination') +class LinkFallbackError(FilterError): + def __init__(self, tarinfo, path): + self.tarinfo = tarinfo + self._path = path + super().__init__(f'link {tarinfo.name!r} would be extracted as a ' + + f'copy of {path!r}, which was rejected') + +# Errors caused by filters -- 
both "fatal" and "non-fatal" -- that +# we consider to be issues with the argument, rather than a bug in the +# filter function +_FILTER_ERRORS = (FilterError, OSError, ExtractError) + def _get_filtered_attrs(member, dest_path, for_data=True): new_attrs = {} name = member.name - dest_path = os.path.realpath(dest_path) + dest_path = os.path.realpath(dest_path, strict=os.path.ALLOW_MISSING) # Strip leading / (tar's directory separator) from filenames. # Include os.sep (target OS directory separator) as well. if name.startswith(('/', os.sep)): @@ -782,7 +794,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True): # For example, 'C:/foo' on Windows. raise AbsolutePathError(member) # Ensure we stay in the destination - target_path = os.path.realpath(os.path.join(dest_path, name)) + target_path = os.path.realpath(os.path.join(dest_path, name), + strict=os.path.ALLOW_MISSING) if os.path.commonpath([target_path, dest_path]) != dest_path: raise OutsideDestinationError(member, target_path) # Limit permissions (no high bits, and go-w) @@ -820,6 +833,9 @@ def _get_filtered_attrs(member, dest_path, for_data=True): if member.islnk() or member.issym(): if os.path.isabs(member.linkname): raise AbsoluteLinkError(member) + normalized = os.path.normpath(member.linkname) + if normalized != member.linkname: + new_attrs['linkname'] = normalized if member.issym(): target_path = os.path.join(dest_path, os.path.dirname(name), @@ -827,7 +843,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True): else: target_path = os.path.join(dest_path, member.linkname) - target_path = os.path.realpath(target_path) + target_path = os.path.realpath(target_path, + strict=os.path.ALLOW_MISSING) if os.path.commonpath([target_path, dest_path]) != dest_path: raise LinkOutsideDestinationError(member, target_path) return new_attrs @@ -2285,30 +2302,58 @@ class TarFile(object): members = self for member in members: - tarinfo = self._get_extract_tarinfo(member, filter_function, path) + tarinfo, 
unfiltered = self._get_extract_tarinfo( + member, filter_function, path) if tarinfo is None: continue if tarinfo.isdir(): # For directories, delay setting attributes until later, # since permissions can interfere with extraction and # extracting contents can reset mtime. - directories.append(tarinfo) + directories.append(unfiltered) self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(), - numeric_owner=numeric_owner) + numeric_owner=numeric_owner, + filter_function=filter_function) # Reverse sort directories. directories.sort(key=lambda a: a.name, reverse=True) + # Set correct owner, mtime and filemode on directories. - for tarinfo in directories: - dirpath = os.path.join(path, tarinfo.name) + for unfiltered in directories: try: + # Need to re-apply any filter, to take the *current* filesystem + # state into account. + try: + tarinfo = filter_function(unfiltered, path) + except _FILTER_ERRORS as exc: + self._log_no_directory_fixup(unfiltered, repr(exc)) + continue + if tarinfo is None: + self._log_no_directory_fixup(unfiltered, + 'excluded by filter') + continue + dirpath = os.path.join(path, tarinfo.name) + try: + lstat = os.lstat(dirpath) + except FileNotFoundError: + self._log_no_directory_fixup(tarinfo, 'missing') + continue + if not stat.S_ISDIR(lstat.st_mode): + # This is no longer a directory; presumably a later + # member overwrote the entry. 
+ self._log_no_directory_fixup(tarinfo, 'not a directory') + continue self.chown(tarinfo, dirpath, numeric_owner=numeric_owner) self.utime(tarinfo, dirpath) self.chmod(tarinfo, dirpath) except ExtractError as e: self._handle_nonfatal_error(e) + def _log_no_directory_fixup(self, member, reason): + self._dbg(2, "tarfile: Not fixing up directory %r (%s)" % + (member.name, reason)) + def extract(self, member, path="", set_attrs=True, *, numeric_owner=False, filter=None): """Extract a member from the archive to the current working directory, @@ -2324,41 +2369,56 @@ class TarFile(object): String names of common filters are accepted. """ filter_function = self._get_filter_function(filter) - tarinfo = self._get_extract_tarinfo(member, filter_function, path) + tarinfo, unfiltered = self._get_extract_tarinfo( + member, filter_function, path) if tarinfo is not None: self._extract_one(tarinfo, path, set_attrs, numeric_owner) def _get_extract_tarinfo(self, member, filter_function, path): - """Get filtered TarInfo (or None) from member, which might be a str""" + """Get (filtered, unfiltered) TarInfos from *member* + + *member* might be a string. + + Return (None, None) if not found. + """ + if isinstance(member, str): - tarinfo = self.getmember(member) + unfiltered = self.getmember(member) else: - tarinfo = member + unfiltered = member - unfiltered = tarinfo + filtered = None try: - tarinfo = filter_function(tarinfo, path) + filtered = filter_function(unfiltered, path) except (OSError, FilterError) as e: self._handle_fatal_error(e) except ExtractError as e: self._handle_nonfatal_error(e) - if tarinfo is None: + if filtered is None: self._dbg(2, "tarfile: Excluded %r" % unfiltered.name) - return None + return None, None + # Prepare the link target for makelink(). 
- if tarinfo.islnk(): - tarinfo = copy.copy(tarinfo) - tarinfo._link_target = os.path.join(path, tarinfo.linkname) - return tarinfo + if filtered.islnk(): + filtered = copy.copy(filtered) + filtered._link_target = os.path.join(path, filtered.linkname) + return filtered, unfiltered - def _extract_one(self, tarinfo, path, set_attrs, numeric_owner): - """Extract from filtered tarinfo to disk""" + def _extract_one(self, tarinfo, path, set_attrs, numeric_owner, + filter_function=None): + """Extract from filtered tarinfo to disk. + + filter_function is only used when extracting a *different* + member (e.g. as fallback to creating a symlink) + """ self._check("r") try: self._extract_member(tarinfo, os.path.join(path, tarinfo.name), set_attrs=set_attrs, - numeric_owner=numeric_owner) + numeric_owner=numeric_owner, + filter_function=filter_function, + extraction_root=path) except OSError as e: self._handle_fatal_error(e) except ExtractError as e: @@ -2415,9 +2475,13 @@ class TarFile(object): return None def _extract_member(self, tarinfo, targetpath, set_attrs=True, - numeric_owner=False): - """Extract the TarInfo object tarinfo to a physical + numeric_owner=False, *, filter_function=None, + extraction_root=None): + """Extract the filtered TarInfo object tarinfo to a physical file called targetpath. + + filter_function is only used when extracting a *different* + member (e.g. 
as fallback to creating a symlink) """ # Fetch the TarInfo object for the given name # and build the destination pathname, replacing @@ -2446,7 +2510,10 @@ class TarFile(object): elif tarinfo.ischr() or tarinfo.isblk(): self.makedev(tarinfo, targetpath) elif tarinfo.islnk() or tarinfo.issym(): - self.makelink(tarinfo, targetpath) + self.makelink_with_filter( + tarinfo, targetpath, + filter_function=filter_function, + extraction_root=extraction_root) elif tarinfo.type not in SUPPORTED_TYPES: self.makeunknown(tarinfo, targetpath) else: @@ -2528,10 +2595,18 @@ class TarFile(object): os.makedev(tarinfo.devmajor, tarinfo.devminor)) def makelink(self, tarinfo, targetpath): + return self.makelink_with_filter(tarinfo, targetpath, None, None) + + def makelink_with_filter(self, tarinfo, targetpath, + filter_function, extraction_root): """Make a (symbolic) link called targetpath. If it cannot be created (platform limitation), we try to make a copy of the referenced file instead of a link. + + filter_function is only used when extracting a *different* + member (e.g. as fallback to creating a link). """ + keyerror_to_extracterror = False try: # For systems that support symbolic and hard links. if tarinfo.issym(): @@ -2539,18 +2614,38 @@ class TarFile(object): # Avoid FileExistsError on following os.symlink. 
os.unlink(targetpath) os.symlink(tarinfo.linkname, targetpath) + return else: if os.path.exists(tarinfo._link_target): os.link(tarinfo._link_target, targetpath) - else: - self._extract_member(self._find_link_target(tarinfo), - targetpath) + return except symlink_exception: + keyerror_to_extracterror = True + + try: + unfiltered = self._find_link_target(tarinfo) + except KeyError: + if keyerror_to_extracterror: + raise ExtractError( + "unable to resolve link inside archive") + else: + raise + + if filter_function is None: + filtered = unfiltered + else: + if extraction_root is None: + raise ExtractError( + "makelink_with_filter: if filter_function is not None, " + + "extraction_root must also not be None") try: - self._extract_member(self._find_link_target(tarinfo), - targetpath) - except KeyError: - raise ExtractError("unable to resolve link inside archive") + filtered = filter_function(unfiltered, extraction_root) + except _FILTER_ERRORS as cause: + raise LinkFallbackError(tarinfo, unfiltered.name) from cause + if filtered is not None: + self._extract_member(filtered, targetpath, + filter_function=filter_function, + extraction_root=extraction_root) def chown(self, tarinfo, targetpath, numeric_owner): """Set owner of targetpath according to tarinfo. 
If numeric_owner diff --git a/Lib/test/test_ntpath.py b/Lib/test/test_ntpath.py index bba1712..6666cea 100644 --- a/Lib/test/test_ntpath.py +++ b/Lib/test/test_ntpath.py @@ -1,8 +1,10 @@ import ntpath import os +import subprocess import sys import unittest import warnings +from ntpath import ALLOW_MISSING from test.support import TestFailed, FakePath from test import support, test_genericpath from tempfile import TemporaryFile diff --git a/Lib/test/test_posixpath.py b/Lib/test/test_posixpath.py index d55a78f..f41d032 100644 --- a/Lib/test/test_posixpath.py +++ b/Lib/test/test_posixpath.py @@ -1,8 +1,10 @@ import os +import sys import posixpath import unittest import warnings -from posixpath import realpath, abspath, dirname, basename +from functools import partial +from posixpath import realpath, abspath, dirname, basename, ALLOW_MISSING from test import support, test_genericpath from test.support import FakePath from unittest import mock @@ -33,6 +35,26 @@ def safe_rmdir(dirname): except OSError: pass +def _parameterize(*parameters): + """Simplistic decorator to parametrize a test + + Runs the decorated test multiple times in subTest, with a value from + 'parameters' passed as an extra positional argument. + Does *not* call doCleanups() after each run. + + Not for general use. Intended to avoid indenting for easier backports. + + See https://discuss.python.org/t/91827 for discussing generalizations. 
+ """ + def _parametrize_decorator(func): + def _parameterized(self, *args, **kwargs): + for parameter in parameters: + with self.subTest(parameter): + func(self, *args, parameter, **kwargs) + return _parameterized + return _parametrize_decorator + + class PosixPathTest(unittest.TestCase): def setUp(self): @@ -308,43 +330,47 @@ class PosixPathTest(unittest.TestCase): b"/foo/bar") @skip_if_ABSTFN_contains_backslash - def test_realpath_curdir(self): - self.assertEqual(realpath('.'), os.getcwd()) - self.assertEqual(realpath('./.'), os.getcwd()) - self.assertEqual(realpath('/'.join(['.'] * 100)), os.getcwd()) + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_curdir(self, kwargs): + self.assertEqual(realpath('.', **kwargs), os.getcwd()) + self.assertEqual(realpath('./.', **kwargs), os.getcwd()) + self.assertEqual(realpath('/'.join(['.'] * 100), **kwargs), os.getcwd()) - self.assertEqual(realpath(b'.'), os.getcwdb()) - self.assertEqual(realpath(b'./.'), os.getcwdb()) - self.assertEqual(realpath(b'/'.join([b'.'] * 100)), os.getcwdb()) + self.assertEqual(realpath(b'.', **kwargs), os.getcwdb()) + self.assertEqual(realpath(b'./.', **kwargs), os.getcwdb()) + self.assertEqual(realpath(b'/'.join([b'.'] * 100), **kwargs), os.getcwdb()) @skip_if_ABSTFN_contains_backslash - def test_realpath_pardir(self): - self.assertEqual(realpath('..'), dirname(os.getcwd())) - self.assertEqual(realpath('../..'), dirname(dirname(os.getcwd()))) - self.assertEqual(realpath('/'.join(['..'] * 100)), '/') + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_pardir(self, kwargs): + self.assertEqual(realpath('..', **kwargs), dirname(os.getcwd())) + self.assertEqual(realpath('../..', **kwargs), dirname(dirname(os.getcwd()))) + self.assertEqual(realpath('/'.join(['..'] * 100), **kwargs), '/') - self.assertEqual(realpath(b'..'), dirname(os.getcwdb())) - self.assertEqual(realpath(b'../..'), dirname(dirname(os.getcwdb()))) - 
self.assertEqual(realpath(b'/'.join([b'..'] * 100)), b'/') + self.assertEqual(realpath(b'..', **kwargs), dirname(os.getcwdb())) + self.assertEqual(realpath(b'../..', **kwargs), dirname(dirname(os.getcwdb()))) + self.assertEqual(realpath(b'/'.join([b'..'] * 100), **kwargs), b'/') @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_basic(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_basic(self, kwargs): # Basic operation. try: os.symlink(ABSTFN+"1", ABSTFN) - self.assertEqual(realpath(ABSTFN), ABSTFN+"1") + self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1") finally: support.unlink(ABSTFN) @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_relative(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_relative(self, kwargs): try: os.symlink(posixpath.relpath(ABSTFN+"1"), ABSTFN) - self.assertEqual(realpath(ABSTFN), ABSTFN+"1") + self.assertEqual(realpath(ABSTFN, **kwargs), ABSTFN+"1") finally: support.unlink(ABSTFN) @@ -361,9 +387,107 @@ class PosixPathTest(unittest.TestCase): finally: support.unlink(ABSTFN) + def test_realpath_invalid_paths(self): + path = '/\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = b'/\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = '/nonexistent/x\x00' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = b'/nonexistent/x\x00' + self.assertRaises(ValueError, 
realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = '/\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = b'/\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(ValueError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + + path = '/nonexistent/x\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + path = b'/nonexistent/x\x00/..' + self.assertRaises(ValueError, realpath, path, strict=False) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertRaises(ValueError, realpath, path, strict=ALLOW_MISSING) + + path = '/\udfff' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), path) + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=True) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) + path = '/nonexistent/\udfff' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), path) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), path) + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + path = '/\udfff/..' 
+ if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), '/') + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/') + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=True) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) + path = '/nonexistent/\udfff/..' + if sys.platform == 'win32': + self.assertEqual(realpath(path, strict=False), '/nonexistent') + self.assertEqual(realpath(path, strict=ALLOW_MISSING), '/nonexistent') + else: + self.assertRaises(UnicodeEncodeError, realpath, path, strict=False) + self.assertRaises(UnicodeEncodeError, realpath, path, strict=ALLOW_MISSING) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + + path = b'/\xff' + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=True) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING) + else: + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + self.assertEqual(realpath(path, strict=ALLOW_MISSING), path) + path = b'/nonexistent/\xff' + if sys.platform == 'win32': + self.assertRaises(UnicodeDecodeError, realpath, path, strict=False) + self.assertRaises(UnicodeDecodeError, realpath, path, strict=ALLOW_MISSING) + else: + self.assertEqual(realpath(path, strict=False), path) + self.assertRaises(FileNotFoundError, realpath, path, strict=True) + @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_missing_pardir(self, kwargs): + try: + os.symlink(support.TESTFN + "1", support.TESTFN) + self.assertEqual( + realpath("nonexistent/../" + 
support.TESTFN, **kwargs), ABSTFN + "1") + finally: + support.unlink(support.TESTFN) + + @support.skip_unless_symlink + @skip_if_ABSTFN_contains_backslash def test_realpath_symlink_loops(self): # Bug #930024, return the path unchanged if we get into an infinite # symlink loop in non-strict mode (default). @@ -406,37 +530,38 @@ class PosixPathTest(unittest.TestCase): @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_symlink_loops_strict(self): + @_parameterize({'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_symlink_loops_strict(self, kwargs): # Bug #43757, raise OSError if we get into an infinite symlink loop in - # strict mode. + # the strict modes. try: os.symlink(ABSTFN, ABSTFN) - self.assertRaises(OSError, realpath, ABSTFN, strict=True) + self.assertRaises(OSError, realpath, ABSTFN, **kwargs) os.symlink(ABSTFN+"1", ABSTFN+"2") os.symlink(ABSTFN+"2", ABSTFN+"1") - self.assertRaises(OSError, realpath, ABSTFN+"1", strict=True) - self.assertRaises(OSError, realpath, ABSTFN+"2", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"1", **kwargs) + self.assertRaises(OSError, realpath, ABSTFN+"2", **kwargs) - self.assertRaises(OSError, realpath, ABSTFN+"1/x", strict=True) - self.assertRaises(OSError, realpath, ABSTFN+"1/..", strict=True) - self.assertRaises(OSError, realpath, ABSTFN+"1/../x", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"1/x", **kwargs) + self.assertRaises(OSError, realpath, ABSTFN+"1/..", **kwargs) + self.assertRaises(OSError, realpath, ABSTFN+"1/../x", **kwargs) os.symlink(ABSTFN+"x", ABSTFN+"y") self.assertRaises(OSError, realpath, - ABSTFN+"1/../" + basename(ABSTFN) + "y", strict=True) + ABSTFN+"1/../" + basename(ABSTFN) + "y", **kwargs) self.assertRaises(OSError, realpath, - ABSTFN+"1/../" + basename(ABSTFN) + "1", strict=True) + ABSTFN+"1/../" + basename(ABSTFN) + "1", **kwargs) os.symlink(basename(ABSTFN) + "a/b", ABSTFN+"a") 
- self.assertRaises(OSError, realpath, ABSTFN+"a", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"a", **kwargs) os.symlink("../" + basename(dirname(ABSTFN)) + "/" + basename(ABSTFN) + "c", ABSTFN+"c") - self.assertRaises(OSError, realpath, ABSTFN+"c", strict=True) + self.assertRaises(OSError, realpath, ABSTFN+"c", **kwargs) # Test using relative path as well. with support.change_cwd(dirname(ABSTFN)): - self.assertRaises(OSError, realpath, basename(ABSTFN), strict=True) + self.assertRaises(OSError, realpath, basename(ABSTFN), **kwargs) finally: support.unlink(ABSTFN) support.unlink(ABSTFN+"1") @@ -448,13 +573,14 @@ class PosixPathTest(unittest.TestCase): @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_repeated_indirect_symlinks(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_repeated_indirect_symlinks(self, kwargs): # Issue #6975. try: os.mkdir(ABSTFN) os.symlink('../' + basename(ABSTFN), ABSTFN + '/self') os.symlink('self/self/self', ABSTFN + '/link') - self.assertEqual(realpath(ABSTFN + '/link'), ABSTFN) + self.assertEqual(realpath(ABSTFN + '/link', **kwargs), ABSTFN) finally: support.unlink(ABSTFN + '/self') support.unlink(ABSTFN + '/link') @@ -463,14 +589,15 @@ class PosixPathTest(unittest.TestCase): @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_deep_recursion(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_deep_recursion(self, kwargs): depth = 10 try: os.mkdir(ABSTFN) for i in range(depth): os.symlink('/'.join(['%d' % i] * 10), ABSTFN + '/%d' % (i + 1)) os.symlink('.', ABSTFN + '/0') - self.assertEqual(realpath(ABSTFN + '/%d' % depth), ABSTFN) + self.assertEqual(realpath(ABSTFN + '/%d' % depth, **kwargs), ABSTFN) # Test using relative path as well. 
with support.change_cwd(ABSTFN): @@ -483,7 +610,8 @@ class PosixPathTest(unittest.TestCase): @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_resolve_parents(self): + @_parameterize({}, {'strict': ALLOW_MISSING}) + def test_realpath_resolve_parents(self, kwargs): # We also need to resolve any symlinks in the parents of a relative # path passed to realpath. E.g.: current working directory is # /usr/doc with 'doc' being a symlink to /usr/share/doc. We call @@ -494,7 +622,8 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN + "/y", ABSTFN + "/k") with support.change_cwd(ABSTFN + "/k"): - self.assertEqual(realpath("a"), ABSTFN + "/y/a") + self.assertEqual(realpath("a", **kwargs), + ABSTFN + "/y/a") finally: support.unlink(ABSTFN + "/k") safe_rmdir(ABSTFN + "/y") @@ -503,7 +632,8 @@ class PosixPathTest(unittest.TestCase): @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_resolve_before_normalizing(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_resolve_before_normalizing(self, kwargs): # Bug #990669: Symbolic links should be resolved before we # normalize the path. E.g.: if we have directories 'a', 'k' and 'y' # in the following hierarchy: @@ -518,10 +648,10 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y") # Absolute path. - self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k") + self.assertEqual(realpath(ABSTFN + "/link-y/..", **kwargs), ABSTFN + "/k") # Relative path. 
with support.change_cwd(dirname(ABSTFN)): - self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."), + self.assertEqual(realpath(basename(ABSTFN) + "/link-y/..", **kwargs), ABSTFN + "/k") finally: support.unlink(ABSTFN + "/link-y") @@ -532,7 +662,8 @@ class PosixPathTest(unittest.TestCase): @unittest.skipUnless(hasattr(os, "symlink"), "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash - def test_realpath_resolve_first(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_resolve_first(self, kwargs): # Bug #1213894: The first component of the path, if not absolute, # must be resolved too. @@ -542,13 +673,70 @@ class PosixPathTest(unittest.TestCase): os.symlink(ABSTFN, ABSTFN + "link") with support.change_cwd(dirname(ABSTFN)): base = basename(ABSTFN) - self.assertEqual(realpath(base + "link"), ABSTFN) - self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k") + self.assertEqual(realpath(base + "link", **kwargs), ABSTFN) + self.assertEqual(realpath(base + "link/k", **kwargs), ABSTFN + "/k") finally: support.unlink(ABSTFN + "link") safe_rmdir(ABSTFN + "/k") safe_rmdir(ABSTFN) + @support.skip_unless_symlink + @skip_if_ABSTFN_contains_backslash + @unittest.skipIf(os.chmod not in os.supports_follow_symlinks, "Can't set symlink permissions") + @unittest.skipIf(sys.platform != "darwin", "only macOS requires read permission to readlink()") + @_parameterize({'strict': True}, {'strict': ALLOW_MISSING}) + def test_realpath_unreadable_symlink_strict(self, kwargs): + try: + os.symlink(ABSTFN+"1", ABSTFN) + os.chmod(ABSTFN, 0o000, follow_symlinks=False) + with self.assertRaises(PermissionError): + realpath(ABSTFN, **kwargs) + with self.assertRaises(PermissionError): + realpath(ABSTFN + '/foo', **kwargs), + with self.assertRaises(PermissionError): + realpath(ABSTFN + '/../foo', **kwargs) + with self.assertRaises(PermissionError): + realpath(ABSTFN + '/foo/..', **kwargs) + finally: + os.chmod(ABSTFN, 0o755, 
follow_symlinks=False) + os.unlink(ABSTFN) + + @skip_if_ABSTFN_contains_backslash + @support.skip_unless_symlink + def test_realpath_unreadable_directory(self): + try: + os.mkdir(ABSTFN) + os.mkdir(ABSTFN + '/k') + os.chmod(ABSTFN, 0o000) + self.assertEqual(realpath(ABSTFN, strict=False), ABSTFN) + self.assertEqual(realpath(ABSTFN, strict=True), ABSTFN) + self.assertEqual(realpath(ABSTFN, strict=ALLOW_MISSING), ABSTFN) + + try: + os.stat(ABSTFN) + except PermissionError: + pass + else: + self.skipTest('Cannot block permissions') + + self.assertEqual(realpath(ABSTFN + '/k', strict=False), + ABSTFN + '/k') + self.assertRaises(PermissionError, realpath, ABSTFN + '/k', + strict=True) + self.assertRaises(PermissionError, realpath, ABSTFN + '/k', + strict=ALLOW_MISSING) + + self.assertEqual(realpath(ABSTFN + '/missing', strict=False), + ABSTFN + '/missing') + self.assertRaises(PermissionError, realpath, ABSTFN + '/missing', + strict=True) + self.assertRaises(PermissionError, realpath, ABSTFN + '/missing', + strict=ALLOW_MISSING) + finally: + os.chmod(ABSTFN, 0o755) + safe_rmdir(ABSTFN + '/k') + safe_rmdir(ABSTFN) + def test_relpath(self): (real_getcwd, os.getcwd) = (os.getcwd, lambda: r"/home/user/bar") try: @@ -725,9 +913,12 @@ class PathLikeTests(unittest.TestCase): def test_path_abspath(self): self.assertPathEqual(self.path.abspath) - def test_path_realpath(self): + @_parameterize({}, {'strict': True}, {'strict': ALLOW_MISSING}) + def test_path_realpath(self, kwargs): self.assertPathEqual(self.path.realpath) + self.assertPathEqual(partial(self.path.realpath, **kwargs)) + def test_path_relpath(self): self.assertPathEqual(self.path.relpath) diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index d2d5bba..a58994c 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -2246,9 +2246,35 @@ class MiscTest(unittest.TestCase): 'tar_filter', 'FilterError', 'AbsoluteLinkError', 'OutsideDestinationError', 'SpecialFileError', 'AbsolutePathError', 
'LinkOutsideDestinationError', + 'LinkFallbackError', } support.check__all__(self, tarfile, blacklist=blacklist) + @unittest.skipUnless(support.can_symlink(), 'requires symlink support') + @unittest.skipUnless(hasattr(os, 'chmod'), "missing os.chmod") + @unittest.mock.patch('os.chmod') + def test_deferred_directory_attributes_update(self, mock_chmod): + # Regression test for gh-127987: setting attributes on arbitrary files + tempdir = os.path.join(TEMPDIR, 'test127987') + def mock_chmod_side_effect(path, mode, **kwargs): + target_path = os.path.realpath(path) + if os.path.commonpath([target_path, tempdir]) != tempdir: + raise Exception("should not try to chmod anything outside the destination", target_path) + mock_chmod.side_effect = mock_chmod_side_effect + + outside_tree_dir = os.path.join(TEMPDIR, 'outside_tree_dir') + with ArchiveMaker() as arc: + arc.add('x', symlink_to='.') + arc.add('x', type=tarfile.DIRTYPE, mode='?rwsrwsrwt') + arc.add('x', symlink_to=outside_tree_dir) + + os.makedirs(outside_tree_dir) + try: + arc.open().extractall(path=tempdir, filter='tar') + finally: + support.rmtree(outside_tree_dir) + support.rmtree(tempdir) + class CommandLineTest(unittest.TestCase): @@ -2785,6 +2811,10 @@ class NoneInfoExtractTests(ReadTest): got_paths = set( p.relative_to(directory) for p in pathlib.Path(directory).glob('**/*')) + if self.extraction_filter == 'data': + # The 'data' filter is expected to reject special files + for path in 'ustar/fifotype', 'ustar/blktype', 'ustar/chrtype': + got_paths.discard(pathlib.Path(path)) self.assertEqual(self.control_paths, got_paths) @contextmanager @@ -3011,12 +3041,28 @@ class ArchiveMaker: self.bio = None def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, - mode=None, size=None, **kwargs): - """Add a member to the test archive. Call within `with`.""" + mode=None, size=None, content=None, **kwargs): + """Add a member to the test archive. Call within `with`. 
+ + Provides many shortcuts: + - default `type` is based on symlink_to, hardlink_to, and trailing `/` + in name (which is stripped) + - size & content defaults are based on each other + - content can be str or bytes + - mode should be textual ('-rwxrwxrwx') + + (add more! this is unstable internal test-only API) + """ name = str(name) tarinfo = tarfile.TarInfo(name).replace(**kwargs) + if content is not None: + if isinstance(content, str): + content = content.encode() + size = len(content) if size is not None: tarinfo.size = size + if content is None: + content = bytes(tarinfo.size) if mode: tarinfo.mode = _filemode_to_int(mode) if symlink_to is not None: @@ -3030,7 +3076,7 @@ class ArchiveMaker: if type is not None: tarinfo.type = type if tarinfo.isreg(): - fileobj = io.BytesIO(bytes(tarinfo.size)) + fileobj = io.BytesIO(content) else: fileobj = None self.tar_w.addfile(tarinfo, fileobj) @@ -3052,7 +3098,7 @@ class TestExtractionFilters(unittest.TestCase): destdir = outerdir / 'dest' @contextmanager - def check_context(self, tar, filter): + def check_context(self, tar, filter, *, check_flag=True, ignored_trees=()): """Extracts `tar` to `self.destdir` and allows checking the result If an error occurs, it must be checked using `expect_exception` @@ -3061,27 +3107,46 @@ class TestExtractionFilters(unittest.TestCase): except the destination directory itself and parent directories of other files. When checking directories, do so before their contents. + + A file called 'flag' is made in outerdir (i.e. outside destdir) + before extraction; it should not be altered nor should its contents + be read/copied. + + *ignored_trees* is a set of directories to remove (including their + contents) right after the archive is extracted. It is a workaround + for Path.glob() failing to get all files in Python 3.10 and below. 
""" with support.temp_dir(self.outerdir): + flag_path = self.outerdir / 'flag' + flag_path.write_text('capture me') try: tar.extractall(self.destdir, filter=filter) except Exception as exc: self.raised_exception = exc + self.reraise_exception = True self.expected_paths = set() else: + for ignored_tree in ignored_trees: + support.rmtree((self.destdir / ignored_tree).resolve()) self.raised_exception = None + self.reraise_exception = False self.expected_paths = set(self.outerdir.glob('**/*')) self.expected_paths.discard(self.destdir) + self.expected_paths.discard(flag_path) try: - yield + yield self finally: tar.close() - if self.raised_exception: + if self.reraise_exception: raise self.raised_exception self.assertEqual(self.expected_paths, set()) + if check_flag: + self.assertEqual(flag_path.read_text(), 'capture me') + else: + assert filter == 'fully_trusted' def expect_file(self, name, type=None, symlink_to=None, mode=None, - size=None): + size=None, content=None): """Check a single file. See check_context.""" if self.raised_exception: raise self.raised_exception @@ -3091,7 +3156,7 @@ class TestExtractionFilters(unittest.TestCase): self.expected_paths.remove(path) # When checking mode, ignore Windows (which can only set user read and - # user write bits). Newer versions of Python use `os_helper.can_chmod()` + # user write bits). Newer versions of Python use `support.can_chmod()` # instead of hardcoding Windows. if mode is not None and sys.platform != 'win32': got = stat.filemode(stat.S_IMODE(path.stat().st_mode)) @@ -3105,26 +3170,45 @@ class TestExtractionFilters(unittest.TestCase): # The symlink might be the same (textually) as what we expect, # but some systems change the link to an equivalent path, so # we fall back to samefile(). 
- if expected != got: - self.assertTrue(got.samefile(expected)) + try: + if expected != got: + self.assertTrue(got.samefile(expected)) + except Exception as e: + # attach a note, so it's shown even if `samefile` fails + e.add_note(f'{expected=}, {got=}') + raise elif type == tarfile.REGTYPE or type is None: self.assertTrue(path.is_file()) elif type == tarfile.DIRTYPE: self.assertTrue(path.is_dir()) elif type == tarfile.FIFOTYPE: self.assertTrue(path.is_fifo()) + elif type == tarfile.SYMTYPE: + self.assertTrue(path.is_symlink()) else: raise NotImplementedError(type) if size is not None: self.assertEqual(path.stat().st_size, size) + if content is not None: + self.assertEqual(path.read_text(), content) for parent in path.parents: self.expected_paths.discard(parent) + def expect_any_tree(self, name): + """Check a directory; forget about its contents.""" + tree_path = (self.destdir / name).resolve() + self.expect_file(tree_path, type=tarfile.DIRTYPE) + self.expected_paths = { + p for p in self.expected_paths + if tree_path not in p.parents + } + def expect_exception(self, exc_type, message_re='.'): with self.assertRaisesRegex(exc_type, message_re): if self.raised_exception is not None: raise self.raised_exception - self.raised_exception = None + self.reraise_exception = False + return self.raised_exception def test_benign_file(self): with ArchiveMaker() as arc: @@ -3201,6 +3285,78 @@ class TestExtractionFilters(unittest.TestCase): with self.check_context(arc.open(), 'data'): self.expect_file('parent/evil') + @support.skip_unless_symlink + def test_realpath_limit_attack(self): + # (CVE-2025-4517) + + with ArchiveMaker() as arc: + # populate the symlinks and dirs that expand in os.path.realpath() + # The component length is chosen so that in common cases, the unexpanded + # path fits in PATH_MAX, but it overflows when the final symlink + # is expanded + steps = "abcdefghijklmnop" + if sys.platform == 'win32': + component = 'd' * 25 + elif 'PC_PATH_MAX' in 
os.pathconf_names: + max_path_len = os.pathconf(self.outerdir.parent, "PC_PATH_MAX") + path_sep_len = 1 + dest_len = len(str(self.destdir)) + path_sep_len + component_len = (max_path_len - dest_len) // (len(steps) + path_sep_len) + component = 'd' * component_len + else: + raise NotImplementedError("Need to guess component length for {sys.platform}") + path = "" + step_path = "" + for i in steps: + arc.add(os.path.join(path, component), type=tarfile.DIRTYPE, + mode='drwxrwxrwx') + arc.add(os.path.join(path, i), symlink_to=component) + path = os.path.join(path, component) + step_path = os.path.join(step_path, i) + # create the final symlink that exceeds PATH_MAX and simply points + # to the top dir. + # this link will never be expanded by + # os.path.realpath(strict=False), nor anything after it. + linkpath = os.path.join(*steps, "l"*254) + parent_segments = [".."] * len(steps) + arc.add(linkpath, symlink_to=os.path.join(*parent_segments)) + # make a symlink outside to keep the tar command happy + arc.add("escape", symlink_to=os.path.join(linkpath, "..")) + # use the symlinks above, that are not checked, to create a hardlink + # to a file outside of the destination path + arc.add("flaglink", hardlink_to=os.path.join("escape", "flag")) + # now that we have the hardlink we can overwrite the file + arc.add("flaglink", content='overwrite') + # we can also create new files as well! + arc.add("escape/newfile", content='new') + + with (self.subTest('fully_trusted'), + self.check_context(arc.open(), filter='fully_trusted', + check_flag=False, ignored_trees={component})): + if sys.platform == 'win32': + self.expect_exception((FileNotFoundError, FileExistsError)) + elif self.raised_exception: + # Cannot symlink/hardlink: tarfile falls back to getmember() + self.expect_exception(KeyError) + # Otherwise, this block should never enter. 
+ else: + self.expect_file('flaglink', content='overwrite') + self.expect_file('../newfile', content='new') + self.expect_file('escape', type=tarfile.SYMTYPE) + self.expect_file('a', symlink_to=component) + + for filter in 'tar', 'data': + with self.subTest(filter), self.check_context(arc.open(), filter=filter): + exc = self.expect_exception((OSError, KeyError)) + if isinstance(exc, OSError): + if sys.platform == 'win32': + # 3: ERROR_PATH_NOT_FOUND + # 5: ERROR_ACCESS_DENIED + # 206: ERROR_FILENAME_EXCED_RANGE + self.assertIn(exc.winerror, (3, 5, 206)) + else: + self.assertEqual(exc.errno, errno.ENAMETOOLONG) + def test_parent_symlink2(self): # Test interplaying symlinks # Inspired by 'dirsymlink2b' in jwilk/traversal-archives @@ -3319,8 +3475,8 @@ class TestExtractionFilters(unittest.TestCase): def test_deep_symlink(self): with ArchiveMaker() as arc: arc.add('targetdir/target', size=3) - arc.add('linkdir/hardlink', hardlink_to='targetdir/target') - arc.add('linkdir/symlink', symlink_to='../targetdir/target') + arc.add('linkdir/hardlink', hardlink_to=os.path.join('targetdir', 'target')) + arc.add('linkdir/symlink', symlink_to=os.path.join('..', 'targetdir', 'target')) for filter in 'tar', 'data', 'fully_trusted': with self.check_context(arc.open(), filter): @@ -3332,6 +3488,126 @@ class TestExtractionFilters(unittest.TestCase): else: self.expect_file('linkdir/symlink', size=3) + def test_sneaky_hardlink_fallback(self): + # (CVE-2025-4330) + # Test that when hardlink extraction falls back to extracting members + # from the archive, the extracted member is (re-)filtered. 
+ with ArchiveMaker() as arc: + # Create a directory structure so the c/escape symlink stays + # inside the path + arc.add("a/t/dummy") + # Create b/ directory + arc.add("b/") + # Point "c" to the bottom of the tree in "a" + arc.add("c", symlink_to=os.path.join("a", "t")) + # link to non-existent location under "a" + arc.add("c/escape", symlink_to=os.path.join("..", "..", + "link_here")) + # Move "c" to point to "b" ("c/escape" no longer exists) + arc.add("c", symlink_to="b") + # Attempt to create a hard link to "c/escape". Since it doesn't + # exist it will attempt to extract "c/escape" but at "boom". + arc.add("boom", hardlink_to=os.path.join("c", "escape")) + + with self.check_context(arc.open(), 'data'): + if not support.can_symlink(): + # When 'c/escape' is extracted, 'c' is a regular + # directory, and 'c/escape' *would* point outside + # the destination if symlinks were allowed. + self.expect_exception( + tarfile.LinkOutsideDestinationError) + elif sys.platform == "win32": + # On Windows, 'c/escape' points outside the destination + self.expect_exception(tarfile.LinkOutsideDestinationError) + else: + e = self.expect_exception( + tarfile.LinkFallbackError, + "link 'boom' would be extracted as a copy of " + "'c/escape', which was rejected") + self.assertIsInstance(e.__cause__, + tarfile.LinkOutsideDestinationError) + for filter in 'tar', 'fully_trusted': + with self.subTest(filter), self.check_context(arc.open(), filter): + if not support.can_symlink(): + self.expect_file("a/t/dummy") + self.expect_file("b/") + self.expect_file("c/") + else: + self.expect_file("a/t/dummy") + self.expect_file("b/") + self.expect_file("a/t/escape", symlink_to='../../link_here') + self.expect_file("boom", symlink_to='../../link_here') + self.expect_file("c", symlink_to='b') + + def test_exfiltration_via_symlink(self): + # (CVE-2025-4138) + # Test changing symlinks that result in a symlink pointing outside + # the extraction directory, unless prevented by 'data' filter's + #
normalization. + with ArchiveMaker() as arc: + arc.add("escape", symlink_to=os.path.join('link', 'link', '..', '..', 'link-here')) + arc.add("link", symlink_to='./') + + for filter in 'tar', 'data', 'fully_trusted': + with self.check_context(arc.open(), filter): + if support.can_symlink(): + self.expect_file("link", symlink_to='./') + if filter == 'data': + self.expect_file("escape", symlink_to='link-here') + else: + self.expect_file("escape", + symlink_to='link/link/../../link-here') + else: + # Nothing is extracted. + pass + + def test_chmod_outside_dir(self): + # (CVE-2024-12718) + # Test that members used for delayed updates of directory metadata + # are (re-)filtered. + with ArchiveMaker() as arc: + # "pwn" is a veeeery innocent symlink: + arc.add("a/pwn", symlink_to='.') + # But now "pwn" is also a directory, so it's scheduled to have its + # metadata updated later: + arc.add("a/pwn/", mode='drwxrwxrwx') + # Oops, "pwn" is not so innocent any more: + arc.add("a/pwn", symlink_to='x/../') + # Newly created symlink points to the dest dir, + # so it's OK for the "data" filter. 
+ arc.add('a/x', symlink_to=('../')) + # But now "pwn" points outside the dest dir + + for filter in 'tar', 'data', 'fully_trusted': + with self.check_context(arc.open(), filter) as cc: + if not support.can_symlink(): + self.expect_file("a/pwn/") + elif filter == 'data': + self.expect_file("a/x", symlink_to='../') + self.expect_file("a/pwn", symlink_to='.') + else: + self.expect_file("a/x", symlink_to='../') + self.expect_file("a/pwn", symlink_to='x/../') + if sys.platform != "win32": + st_mode = cc.outerdir.stat().st_mode + self.assertNotEqual(st_mode & 0o777, 0o777) + + def test_link_fallback_normalizes(self): + # Make sure hardlink fallbacks work for non-normalized paths for all + # filters + with ArchiveMaker() as arc: + arc.add("dir/") + arc.add("dir/../afile") + arc.add("link1", hardlink_to='dir/../afile') + arc.add("link2", hardlink_to='dir/../dir/../afile') + + for filter in 'tar', 'data', 'fully_trusted': + with self.check_context(arc.open(), filter) as cc: + self.expect_file("dir/") + self.expect_file("afile") + self.expect_file("link1") + self.expect_file("link2") + def test_modes(self): # Test how file modes are extracted # (Note that the modes are ignored on platforms without working chmod) @@ -3433,7 +3709,7 @@ class TestExtractionFilters(unittest.TestCase): # The 'tar' filter returns TarInfo objects with the same name/type. # (It can also fail for particularly "evil" input, but we don't have # that in the test archive.) - with tarfile.TarFile.open(tarname) as tar: + with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar: for tarinfo in tar.getmembers(): filtered = tarfile.tar_filter(tarinfo, '') self.assertIs(filtered.name, tarinfo.name) @@ -3442,7 +3718,7 @@ class TestExtractionFilters(unittest.TestCase): def test_data_filter(self): # The 'data' filter either raises, or returns TarInfo with the same # name/type. 
- with tarfile.TarFile.open(tarname) as tar: + with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar: for tarinfo in tar.getmembers(): try: filtered = tarfile.data_filter(tarinfo, '') @@ -3572,13 +3848,13 @@ class TestExtractionFilters(unittest.TestCase): # If errorlevel is 0, errors affected by errorlevel are ignored with self.check_context(arc.open(errorlevel=0), extracterror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=0), filtererror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=0), oserror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=0), tarerror_filter): self.expect_exception(tarfile.TarError) @@ -3589,7 +3865,7 @@ class TestExtractionFilters(unittest.TestCase): # If 1, all fatal errors are raised with self.check_context(arc.open(errorlevel=1), extracterror_filter): - self.expect_file('file') + pass with self.check_context(arc.open(errorlevel=1), filtererror_filter): self.expect_exception(tarfile.FilterError) -- 2.49.0 From f60e23abc887f7409a31c30f3ae2eeb5c75cfaae Mon Sep 17 00:00:00 2001 From: Lumir Balhar Date: Wed, 25 Jun 2025 13:17:00 +0200 Subject: [PATCH 4/4] Code and compatibility fixes for Python 3.6 --- Lib/pathlib.py | 2 +- Lib/test/test_tarfile.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Lib/pathlib.py b/Lib/pathlib.py index 361f2d1..0f0fc38 100644 --- a/Lib/pathlib.py +++ b/Lib/pathlib.py @@ -8,7 +8,7 @@ import re import sys from collections import Sequence from contextlib import contextmanager -from errno import EINVAL, ENOENT, ENOTDIR +from errno import EINVAL, ENOENT, ENOTDIR, ELOOP from operator import attrgetter from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO from urllib.parse import quote_from_bytes as urlquote_from_bytes diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index a58994c..01da819 100644 --- 
a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -1,3 +1,4 @@ +import errno import sys import os import io @@ -3175,7 +3176,9 @@ class TestExtractionFilters(unittest.TestCase): self.assertTrue(got.samefile(expected)) except Exception as e: # attach a note, so it's shown even if `samefile` fails - e.add_note(f'{expected=}, {got=}') + # add_note not supported on Python < 3.11 + # new style formatting with = not supported in 3.6 as well + # e.add_note(f'{expected=}, {got=}') raise elif type == tarfile.REGTYPE or type is None: self.assertTrue(path.is_file()) @@ -3330,9 +3333,9 @@ class TestExtractionFilters(unittest.TestCase): # we can also create new files as well! arc.add("escape/newfile", content='new') - with (self.subTest('fully_trusted'), + with self.subTest('fully_trusted'), \ self.check_context(arc.open(), filter='fully_trusted', - check_flag=False, ignored_trees={component})): + check_flag=False, ignored_trees={component}): if sys.platform == 'win32': self.expect_exception((FileNotFoundError, FileExistsError)) elif self.raised_exception: -- 2.49.0