#!/usr/bin/python3 import argparse import pam import pwd import os import signal import sys import gi gi.require_version('AccountsService', '1.0') from gi.repository import AccountsService, GLib def run_desktop_in_new_session(pam_environment, user, session_desktop, tty_input, tty_output): keyfile = GLib.KeyFile() keyfile.load_from_data_dirs(f'wayland-sessions/{session_desktop}.desktop', GLib.KeyFileFlags.NONE) try: can_run_headless = keyfile.get_boolean(GLib.KEY_FILE_DESKTOP_GROUP, 'X-GDM-CanRunHeadless') except GLib.GError: raise Exception(f"Session {session_desktop} can't run headlessly") if not can_run_headless: raise Exception(f"Session {session_desktop} can't run headlessly") executable = keyfile.get_string(GLib.KEY_FILE_DESKTOP_GROUP, GLib.KEY_FILE_DESKTOP_KEY_TRY_EXEC) if GLib.find_program_in_path(executable) is None: raise Exception(f"Invalid session {session_desktop}") command = keyfile.get_string(GLib.KEY_FILE_DESKTOP_GROUP, GLib.KEY_FILE_DESKTOP_KEY_EXEC) [success, args] = GLib.shell_parse_argv(command) pam_handle = pam.pam() for key, value in pam_environment.items(): pam_handle.putenv(f'{key}={value}') if not pam_handle.authenticate(user, '', service='gdm-autologin', call_end=False): raise Exception("Authentication failed") for key, value in pam_environment.items(): pam_handle.putenv(f'{key}={value}') if pam_handle.open_session() != pam.PAM_SUCCESS: raise Exception("Failed to open PAM session") session_environment = os.environ.copy() session_environment.update(pam_handle.getenvlist()) user_info = pwd.getpwnam(user) uid = user_info.pw_uid gid = user_info.pw_gid old_tty_output = os.fdopen(os.dup(2), 'w') pid = os.fork() if pid == 0: try: os.setsid() except OSError as e: print(f"Could not create new pid session: {e}", file=old_tty_output) try: os.dup2(tty_input.fileno(), 0) os.dup2(tty_output.fileno(), 1) os.dup2(tty_output.fileno(), 2) except OSError as e: print(f"Could not set up standard i/o: {e}", file=old_tty_output) try: os.initgroups(user, gid) os.setgid(gid) os.setuid(uid); except OSError as e: print(f"Could not become user {user} (uid={uid}): {e}", file=old_tty_output) try: os.execvpe(args[0], args, session_environment) except OSError as e: print(f"Could not run program \"{' '.join(arguments)}\": {e}", file=old_tty_output) os._exit(1) def signal_handler(sig, frame): os.kill(pid, sig) signal.signal(signal.SIGTERM, signal_handler) try: (_, exit_code) = os.waitpid(pid, 0); except KeyboardInterrupt: os.kill(pid, signal.SIGTERM) except OSError as e: print(f"Could not wait for program to finish: {e}", file=old_tty_output) if os.WIFEXITED(exit_code): exit_code = os.WEXITSTATUS(exit_code) else: os.kill(os.getpid(), os.WTERMSIG(exit_code)) old_tty_output.close() if pam_handle.close_session() != pam.PAM_SUCCESS: raise Exception("Failed to close PAM session") pam_handle.end() return exit_code def wait_for_user_data(user): main_context = GLib.MainContext.default() while not user.is_loaded(): main_context.iteration(True) def main(): parser = argparse.ArgumentParser(description='Run a desktop session in a PAM session as a specified user.') parser.add_argument('--user', help='Username for which to run the session') args = parser.parse_args() if args.user is None: parser.print_usage() sys.exit(1) try: tty_path = '/dev/null' tty_input = open(tty_path, 'r') tty_output = open(tty_path, 'w') except OSError as e: raise Exception(f"Error opening /dev/null as tty associated with VT {vt}: {e}") user_manager = AccountsService.UserManager().get_default() user = user_manager.get_user(args.user) wait_for_user_data(user) session_desktop = user.get_session() if not session_desktop: session_desktop = 'gnome' pam_environment = {} pam_environment['XDG_SESSION_TYPE'] = 'wayland' pam_environment['XDG_SESSION_CLASS'] = 'user' pam_environment['XDG_SESSION_DESKTOP'] = session_desktop try: result = run_desktop_in_new_session(pam_environment, args.user, session_desktop, tty_input, tty_output) except Exception as e: raise Exception(f"Error running desktop session \"{session_desktop}\": {e}") tty_input.close() tty_output.close() sys.exit(result) if __name__ == '__main__': main()