diff --git a/umu/umu_run.py b/umu/umu_run.py index f690c6a02..b7e7dca96 100755 --- a/umu/umu_run.py +++ b/umu/umu_run.py @@ -27,7 +27,9 @@ ): sys.path.append(os.environ["UMU_CLIENT_RTPATH"]) -from Xlib import Xatom, display +from Xlib import X, Xatom, display +from Xlib.protocol.event import AnyEvent +from Xlib.xobject.drawable import Window from umu.umu_consts import ( DEBUG_FORMAT, @@ -439,57 +441,39 @@ def build_command( return command -def get_window_client_ids() -> list[str]: - """Get the list of client windows.""" - d = display.Display(":1") +def get_window_client_ids(d: display.Display) -> list[str]: + """Get the list of new client windows under the root window.""" try: - root = d.screen().root - - max_wait_time = 30 # Maximum wait time in seconds - wait_interval = 1 # Interval between checks in seconds - elapsed_time = 0 - window_ids: list[str] = [] - - while elapsed_time < max_wait_time: - children = root.query_tree().children - if children and len(children) > 1: - for child in children: - log.debug("Window ID: %s", child.id) - log.debug("Window Name: %s", child.get_wm_name()) - log.debug("Window Class: %s", child.get_wm_class()) - log.debug("Window Geometry: %s", child.get_geometry()) - log.debug("Window Attributes: %s", child.get_attributes()) - # if "steam_app" in str(child.get_wm_class()): - window_ids.append(child.id) - return window_ids - time.sleep(wait_interval) - elapsed_time += wait_interval - return [] - finally: - d.close() + event: AnyEvent = d.next_event() + + if event.type == X.CreateNotify: + log.debug("Found new child windows") + return [ + child.id for child in d.screen().root.query_tree().children + ] + except Exception as e: + log.exception(e) + + return [] -def set_steam_game_property( # noqa: D103 - window_ids: list[str], steam_assigned_layer_id: int +def set_steam_game_property( + d: display.Display, window_ids: list[str], steam_assigned_layer_id: int ) -> None: - d = display.Display(":1") + """Set Steam's assigned layer ID on a list of windows.""" try: - root = d.screen().root - log.debug("Root: %s", root) - + log.debug("steam_layer: %s", steam_assigned_layer_id) for window_id in window_ids: log.debug("window_id: %s", window_id) - log.debug("steam_layer: %s", steam_assigned_layer_id) try: - window = d.create_resource_object("window", int(window_id)) - window.get_full_property( - d.intern_atom("STEAM_GAME"), Xatom.CARDINAL + window: Window = d.create_resource_object( + "window", int(window_id) ) window.change_property( - d.intern_atom("STEAM_GAME"), + d.get_atom("STEAM_GAME"), Xatom.CARDINAL, 32, - [int(steam_assigned_layer_id)], + [steam_assigned_layer_id], ) log.debug( "Successfully set STEAM_GAME property for window ID: %s", @@ -503,20 +487,16 @@ def set_steam_game_property( # noqa: D103 log.exception(e) except Exception as e: log.exception(e) - finally: - d.close() -def get_gamescope_baselayer_order() -> list[int] | None: # noqa: D103 - d = display.Display(":0") +def get_gamescope_baselayer_order(d: display.Display) -> list[int] | None: + """Get the gamescope base layer seq on the primary root window.""" try: - root = d.screen().root - # Intern the atom for GAMESCOPECTRL_BASELAYER_APPID - atom = d.intern_atom("GAMESCOPECTRL_BASELAYER_APPID") + atom = d.get_atom("GAMESCOPECTRL_BASELAYER_APPID") # Get the property value - prop = root.get_full_property(atom, Xatom.CARDINAL) + prop = d.screen().root.get_full_property(atom, Xatom.CARDINAL) if prop: # Extract and return the value @@ -525,15 +505,14 @@ def get_gamescope_baselayer_order() -> list[int] | None: # noqa: D103 except Exception as e: log.error("Error getting GAMESCOPECTRL_BASELAYER_APPID property") log.exception(e) - finally: - d.close() return None -def rearrange_gamescope_baselayer_order( # noqa +def rearrange_gamescope_baselayer_order( sequence: list[int], ) -> tuple[list[int], int]: + """Rearrange a gamescope base layer sequence retrieved from a window.""" # Ensure there are exactly 4 numbers if len(sequence) != 4: err = "Unexpected number of elements in sequence" @@ -541,21 +520,23 @@ def rearrange_gamescope_baselayer_order( # noqa # Rearrange the sequence rearranged = [sequence[0], sequence[3], sequence[1], sequence[2]] + log.debug("Rearranging base layer sequence") + log.debug("'%s' -> '%s'", sequence, rearranged) # Return the rearranged sequence and the second element return rearranged, rearranged[1] -def set_gamescope_baselayer_order(rearranged: list[int]) -> None: # noqa +def set_gamescope_baselayer_order( + d: display.Display, rearranged: list[int] +) -> None: + """Set a new gamescope base layer seq on the primary root window.""" try: - d = display.Display(":0") - root = d.screen().root - # Intern the atom for GAMESCOPECTRL_BASELAYER_APPID - atom = d.intern_atom("GAMESCOPECTRL_BASELAYER_APPID") + atom = d.get_atom("GAMESCOPECTRL_BASELAYER_APPID") # Set the property value - root.change_property(atom, Xatom.CARDINAL, 32, rearranged) + d.screen().root.change_property(atom, Xatom.CARDINAL, 32, rearranged) log.debug( "Successfully set GAMESCOPECTRL_BASELAYER_APPID property: %s", ", ".join(map(str, rearranged)), @@ -563,39 +544,79 @@ def set_gamescope_baselayer_order(rearranged: list[int]) -> None: # noqa except Exception as e: log.error("Error setting GAMESCOPECTRL_BASELAYER_APPID property") log.exception(e) - finally: - d.close() -def window_setup(gamescope_baselayer_sequence: list[int]) -> None: # noqa +def window_setup( # noqa + d_primary: display.Display, + d_secondary: display.Display, + gamescope_baselayer_sequence: list[int], + game_window_ids: list[str], +) -> None: if gamescope_baselayer_sequence: # Rearrange the sequence rearranged_sequence, steam_assigned_layer_id = ( rearrange_gamescope_baselayer_order(gamescope_baselayer_sequence) ) + # Assign our window a STEAM_GAME id - game_window_ids = get_window_client_ids() - if game_window_ids: - set_steam_game_property(game_window_ids, steam_assigned_layer_id) + set_steam_game_property( + d_secondary, game_window_ids, steam_assigned_layer_id + ) - set_gamescope_baselayer_order(rearranged_sequence) + set_gamescope_baselayer_order(d_primary, rearranged_sequence) -def monitor_layers( # noqa - gamescope_baselayer_sequence: list[int], window_client_list: list[str] +def monitor_baselayer( + d_primary: display.Display, + gamescope_baselayer_sequence: list[int], ) -> None: + """Monitor for broken gamescope baselayer sequences.""" + root_primary: Window = d_primary.screen().root + atom = d_primary.get_atom("GAMESCOPECTRL_BASELAYER_APPID") + root_primary.change_attributes(event_mask=X.PropertyChangeMask) + + log.debug("Monitoring base layers") + while True: - # Check if the window sequence has changed: - current_window_list = get_window_client_ids() - if current_window_list != window_client_list: - window_setup(gamescope_baselayer_sequence) + event: AnyEvent = d_primary.next_event() + + # Check if the layer sequence has changed to the broken one + if event.type == X.PropertyNotify and event.atom == atom: + prop = root_primary.get_full_property(atom, Xatom.CARDINAL) - # Check if the layer sequence has changed - current_sequence = get_gamescope_baselayer_order() - if current_sequence == gamescope_baselayer_sequence: - window_setup(gamescope_baselayer_sequence) + log.debug("Property value for atom '%s': %s", atom, prop.value) + if prop.value == gamescope_baselayer_sequence: + log.debug("Broken base layer sequence detected") + rearranged, _ = rearrange_gamescope_baselayer_order(prop.value) + log.debug("'%s' -> '%s'", prop.value, rearranged) + set_gamescope_baselayer_order(d_primary, rearranged) + continue - time.sleep(5) # Check every 5 seconds + time.sleep(0.1) + + +def monitor_windows( + d_secondary: display.Display, + gamescope_baselayer_sequence: list[int], + window_client_list: list[str], +) -> None: + """Monitor for new windows and assign them Steam's layer ID.""" + steam_assigned_layer_id: int = gamescope_baselayer_sequence[-1] + + log.debug("Monitoring windows") + + while True: + # Check if the window sequence has changed + current_window_list = get_window_client_ids(d_secondary) + + if not current_window_list: + continue + + if current_window_list != window_client_list: + log.debug("New windows detected") + set_steam_game_property( + d_secondary, current_window_list, steam_assigned_layer_id + ) def run_command(command: list[AnyPath]) -> int: @@ -605,6 +626,11 @@ def run_command(command: list[AnyPath]) -> int: proc: Popen ret: int = 0 libc: str = get_libc() + # Primary display of the focusable app under the gamescope session + d_primary: display.Display | None = None + # Display of the client application under the gamescope session + d_secondary: display.Display | None = None + # GAMESCOPECTRL_BASELAYER_APPID value on the primary's window gamescope_baselayer_sequence: list[int] | None = None if not command: @@ -641,23 +667,62 @@ def run_command(command: list[AnyPath]) -> int: ) if os.environ.get("XDG_CURRENT_DESKTOP") == "gamescope": - gamescope_baselayer_sequence = get_gamescope_baselayer_order() + # :0 is where the primary xwayland server is on the Steam Deck + d_primary = display.Display(":0") + gamescope_baselayer_sequence = get_gamescope_baselayer_order(d_primary) # Dont do window fuckery if we're not inside gamescope if gamescope_baselayer_sequence and not os.environ.get("EXE", "").endswith( "winetricks" ): - window_client_list = get_window_client_ids() - window_setup(gamescope_baselayer_sequence) - monitor_thread = threading.Thread( - target=monitor_layers, - args=(gamescope_baselayer_sequence, window_client_list), + d_secondary = display.Display(":1") + d_secondary.screen().root.change_attributes( + event_mask=X.SubstructureNotifyMask + ) + window_client_list: list[str] = [] + + # Get new windows under the client display's window + while not window_client_list: + window_client_list = get_window_client_ids(d_secondary) + + # Setup the windows + window_setup( + d_primary, + d_secondary, + gamescope_baselayer_sequence, + window_client_list, ) - monitor_thread.daemon = True - monitor_thread.start() - ret = proc.wait() - log.debug("Child %s exited with wait status: %s", proc.pid, ret) + # Monitor for new windows + window_thread = threading.Thread( + target=monitor_windows, + args=( + d_secondary, + gamescope_baselayer_sequence, + window_client_list, + ), + ) + window_thread.daemon = True + window_thread.start() + + # Monitor for broken baselayers + baselayer_thread = threading.Thread( + target=monitor_baselayer, + args=(d_primary, gamescope_baselayer_sequence), + ) + baselayer_thread.daemon = True + baselayer_thread.start() + + try: + ret = proc.wait() + log.debug("Child %s exited with wait status: %s", proc.pid, ret) + except KeyboardInterrupt: + raise + finally: + if d_primary: + d_primary.close() + if d_secondary: + d_secondary.close() return ret