
Feature/scene item transitions

Open LinuxMainframe opened this issue 1 month ago • 2 comments

Adds support for per-scene-item show and hide transitions via new WebSocket RPC requests.

Key Changes:

  • Introduced four new non-breaking RPC requests:
    • GetSceneItemShowTransition: Retrieves the current show transition configuration (name, UUID, kind, duration) for a specified scene item.
    • SetSceneItemShowTransition: Sets or updates the show transition for a scene item, supporting parameters like transitionName (null to disable) and transitionDuration (in milliseconds).
    • GetSceneItemHideTransition: Retrieves the hide transition details, similar to the show variant.
    • SetSceneItemHideTransition: Configures the hide transition, with analogous parameters.
  • These requests enable granular control over individual scene item visibility transitions, compatible with scenes and scene groups.
  • Added #include <optional> for full compatibility with OBS Studio v32, ensuring safe handling of nullable parameters.

This implementation extends the API to allow remote management of source appearances, such as fading in/out specific items without affecting the entire scene.
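As a rough illustration of the request shapes described above, the sketch below builds the JSON messages a client might send. The field names (`sceneName`, `sceneItemId`, `transitionName`, `transitionDuration`) and the op code 6 envelope are the ones used by the test script later in this thread; the helper `build_transition_request`, the `_OMIT` sentinel, and the scene item ID `1` are hypothetical and exist only for this example.

```python
import json
from typing import Any, Dict, Optional

# Sentinel that distinguishes "leave transitionName out" from an explicit null.
_OMIT = object()


def build_transition_request(
    request_type: str,
    scene_name: str,
    scene_item_id: int,
    request_id: str,
    transition_name: Any = _OMIT,
    transition_duration: Optional[int] = None,
) -> Dict[str, Any]:
    """Build an op-6 request message for one of the four transition requests."""
    data: Dict[str, Any] = {"sceneName": scene_name, "sceneItemId": scene_item_id}
    if transition_name is not _OMIT:
        # None is serialized as JSON null, which the PR describes as "disable".
        data["transitionName"] = transition_name
    if transition_duration is not None:
        data["transitionDuration"] = transition_duration  # milliseconds
    return {
        "op": 6,
        "d": {"requestType": request_type, "requestId": request_id, "requestData": data},
    }


# Enable a 500 ms show transition (assumes a transition named "Fade" exists).
enable = build_transition_request(
    "SetSceneItemShowTransition", "Test_Transitions_Scene", 1, "1",
    transition_name="Fade", transition_duration=500,
)
# Disable the hide transition by sending an explicit null name.
disable = build_transition_request(
    "SetSceneItemHideTransition", "Test_Transitions_Scene", 1, "2",
    transition_name=None,
)
# Query the current show transition; no transition fields are sent.
query = build_transition_request(
    "GetSceneItemShowTransition", "Test_Transitions_Scene", 1, "3",
)

print(json.dumps(disable["d"]["requestData"]))
```

The sentinel matters because omitting `transitionName` (as in the duration-only update) and sending it as null (disable) are different requests, and a plain `None` default could not tell them apart.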

Motivation: This change addresses a long-standing feature gap: per-source show/hide transitions, available in OBS Studio's UI, were not exposed via WebSocket. That gap limits automation and dynamic streaming setups, because clients cannot gracefully introduce or modify sources remotely.

Closes #906 by providing dedicated RPC endpoints for these settings, aligning with the request for integration into source management workflows.

Tested with: A Python test suite built on asyncio and the websockets library. The script:

  • Connects to OBS WebSocket (v5.6.3) at ws://localhost:4455.
  • Sets up a test scene ("Test_Transitions_Scene") with a color source.
  • Validates Get/SetSceneItemShowTransition and Get/SetSceneItemHideTransition through cycles of enabling (e.g., Fade at 500ms/750ms), updating durations/names, disabling via null, and verifying responses.
  • Tests visibility toggles with SetSceneItemEnabled to confirm transitions apply during show/hide.
  • Covers edge cases: invalid scene names, item IDs, and transition names, ensuring proper error handling (e.g., "No transition was found").

All tests passed, with outcomes confirming correct UUIDs, kinds (e.g., "fade_transition"), and durations.
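The script in this thread reports results by printing them. If automated pass/fail checks are wanted, a Get response could be compared against expected values along the lines of the sketch below. The helper `transition_mismatches` is hypothetical, and the sample responses are hand-written for illustration (not captured from OBS); only the field names and the error comment text are taken from this PR.

```python
from typing import Any, Dict, List, Optional


def transition_mismatches(
    response: Dict[str, Any],
    expected_name: Optional[str],
    expected_duration: Optional[int] = None,
) -> List[str]:
    """Return a list of human-readable problems; an empty list means a match."""
    status = response.get("requestStatus", {})
    if not status.get("result"):
        return [f"request failed: {status.get('comment', 'no comment')}"]

    data = response.get("responseData") or {}
    problems: List[str] = []
    if data.get("transitionName") != expected_name:
        problems.append(
            f"transitionName is {data.get('transitionName')!r}, expected {expected_name!r}"
        )
    if expected_duration is not None and data.get("transitionDuration") != expected_duration:
        problems.append(
            f"transitionDuration is {data.get('transitionDuration')!r}, "
            f"expected {expected_duration!r}"
        )
    return problems


# Hand-written sample responses, for illustration only.
ok_response = {
    "requestStatus": {"result": True},
    "responseData": {
        "transitionName": "Fade",
        "transitionKind": "fade_transition",
        "transitionDuration": 500,
    },
}
failed_response = {
    "requestStatus": {"result": False, "comment": "No transition was found"},
}

print(transition_mismatches(ok_response, "Fade", 500))
print(transition_mismatches(failed_response, "Fade"))
```

Returning a list of problems rather than raising on the first mismatch keeps every discrepancy visible in a single run.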

Tested OS(s): Linux (Ubuntu-based development environment with OBS Studio v32+).

  • New request/event (non-breaking)

  • [x] I have read the Contributing Guidelines.

  • [x] All commit messages are properly formatted and commits squashed where appropriate.

  • [x] My code is not on master or a release/* branch.

  • [x] The code has been tested.

  • [x] I have included updates to all appropriate documentation.

Commits:

  • First commit (538dfa8616): Core feature addition, closing #906 partially.
  • Second commit (29c7a4e454): Completes closure with <optional> include.

LinuxMainframe avatar Nov 06 '25 00:11 LinuxMainframe

Test cases for my commit:

#!/usr/bin/env python3
"""
Test set for OBS WebSocket scene item transition functions.
This script tests the functionality for handling remote show/hide transitions
on scene items via the OBS WebSocket API.

AUTHOR: Aidan A. Bradley
DATE: November 4th, 2025
"""

import argparse
import asyncio
import base64
import hashlib
import json
from typing import Any, Dict, Optional

import websockets


class OBSWebSocketTest:
    def __init__(self, host: str = "localhost", port: int = 4455, password: Optional[str] = None):
        """
        Initialize the OBS WebSocket test client.

        :param host: Hostname or IP of the OBS WebSocket server (default: 'localhost')
        :param port: Port of the OBS WebSocket server (default: 4455)
        :param password: Password for authentication if required (default: None)
        """
        self.host = host
        self.port = port
        self.password = password
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.request_id: int = 0

    async def connect(self) -> None:
        """
        Establish a connection to the OBS WebSocket server, handle authentication if required,
        and complete the identification process.
        """
        uri = f"ws://{self.host}:{self.port}"
        print(f"Connecting to {uri}...")
        self.ws = await websockets.connect(uri)

        # Receive the initial Hello message from the server
        hello = json.loads(await self.ws.recv())
        print(f"Received Hello: {hello['op']} - OBS WebSocket version {hello['d']['obsWebSocketVersion']}")

        identify: Dict[str, Any] = {
            "op": 1,
            "d": {
                "rpcVersion": 1,
                "eventSubscriptions": 0
            }
        }

        # Handle authentication if required
        if 'authentication' in hello['d'] and hello['d']['authentication'] is not None:
            if not self.password:
                raise ValueError("Password required but not provided")
            print("Authentication required")
            salt = hello['d']['authentication']['salt']
            challenge = hello['d']['authentication']['challenge']
            # Compute the authentication response using SHA256 and base64
            passhash = hashlib.sha256((self.password + salt).encode('utf-8')).digest()
            secret = base64.b64encode(passhash)
            response_hash = hashlib.sha256(secret + challenge.encode('utf-8')).digest()
            auth_response = base64.b64encode(response_hash).decode('utf-8')
            identify["d"]["authentication"] = auth_response
        else:
            print("No authentication required")

        # Send the Identify message
        await self.ws.send(json.dumps(identify))

        # Receive the Identified message confirming connection
        identified = json.loads(await self.ws.recv())
        print(f"Connected! Negotiated RPC version: {identified['d']['negotiatedRpcVersion']}")

    async def request(self, request_type: str, request_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Send a request to the OBS WebSocket server and await the response.

        :param request_type: The type of request (e.g., 'GetSceneItemList')
        :param request_data: Optional data payload for the request
        :return: The response data from the server
        """
        self.request_id += 1
        request: Dict[str, Any] = {
            "op": 6,
            "d": {
                "requestType": request_type,
                "requestId": str(self.request_id),
                "requestData": request_data or {}
            }
        }

        await self.ws.send(json.dumps(request))

        # Loop until the matching response is received
        while True:
            response = json.loads(await self.ws.recv())
            if response['op'] == 7 and response['d']['requestId'] == str(self.request_id):
                return response['d']

    async def close(self) -> None:
        """
        Close the WebSocket connection if it is open.
        """
        if self.ws:
            await self.ws.close()
            print("Connection closed")

    async def setup_test_scene(self) -> tuple[str, int]:
        """
        Set up a test scene and add a color source for testing transitions.

        :return: Tuple containing the scene name and scene item ID
        :raises Exception: If unable to create or retrieve the test scene item
        """
        print("\n=== Setting up test environment ===")

        # Define test scene and source names
        scene_name = "Test_Transitions_Scene"
        print(f"Creating scene: {scene_name}")
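        # request() reports failures in requestStatus rather than raising,
        # so check the status instead of catching an exception
        response = await self.request("CreateScene", {"sceneName": scene_name})
        if not response['requestStatus']['result']:
            print(f"Scene {scene_name} might already exist, continuing...")

        source_name = "Test_Color_Source"
        print(f"Creating color source: {source_name}")
        response = await self.request("CreateInput", {
            "sceneName": scene_name,
            "inputName": source_name,
            "inputKind": "color_source_v3",
            "inputSettings": {
                # 0xFF0000FF; OBS reads color integers as ABGR, so this is opaque red
                "color": 4278190335
            }
        })
        if not response['requestStatus']['result']:
            print(f"Source might already exist: {response['requestStatus'].get('comment', 'No error message')}")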

        # Retrieve the list of scene items to get the item ID
        response = await self.request("GetSceneItemList", {"sceneName": scene_name})
        if response['requestStatus']['result']:
            scene_items = response['responseData']['sceneItems']
            if scene_items:
                scene_item_id = scene_items[0]['sceneItemId']
                print(f"Found scene item ID: {scene_item_id}")
                return scene_name, scene_item_id

        raise Exception("Failed to create test scene item")

    async def test_get_scene_item_show_transition(self, scene_name: str, scene_item_id: int) -> Dict[str, Any]:
        """
        Test the GetSceneItemShowTransition request to retrieve show transition details.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        :return: The response from the request
        """
        print("\n=== Testing GetSceneItemShowTransition ===")

        response = await self.request("GetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id
        })

        if response['requestStatus']['result']:
            data = response.get('responseData', {})
            is_set = data.get('transitionName') is not None
            print(f"✓ Show transition set: {is_set}")
            print(f"  Transition name: {data.get('transitionName')}")
            print(f"  Transition UUID: {data.get('transitionUuid')}")
            print(f"  Transition kind: {data.get('transitionKind')}")
            print(f"  Transition duration: {data.get('transitionDuration')} ms")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        return response

    async def get_available_transitions(self) -> Optional[str]:
        """
        Retrieve the list of available transitions in OBS and return the first one.

        :return: Name of the first available transition or None if none found
        """
        print("\n=== Getting available transitions ===")
        response = await self.request("GetSceneTransitionList", {})

        if response['requestStatus']['result']:
            transitions = response.get('responseData', {}).get('transitions', [])
            if transitions:
                transition_names = [t.get('transitionName') for t in transitions]
                print(f"Available transitions: {transition_names}")
                return transition_names[0]  # Return the first available transition
            else:
                print("✗ No transitions available")
                return None
        else:
            print(f"✗ Failed to get transitions: {response['requestStatus'].get('comment', 'No error')}")

        return None

    async def test_set_scene_item_show_transition(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test the SetSceneItemShowTransition request, including enabling, updating, and disabling.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing SetSceneItemShowTransition ===")

        # Retrieve a valid transition name
        transition_name = await self.get_available_transitions()
        if not transition_name:
            print("✗ Cannot test without an available transition")
            return

        # Test 1: Enable the show transition with a duration
        print(f"\nTest 1: Enable with {transition_name} transition, 500ms duration")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 500
        })

        if response['requestStatus']['result']:
            print("✓ Successfully set show transition")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        # Verify the change by getting the transition details
        await asyncio.sleep(0.1)
        await self.test_get_scene_item_show_transition(scene_name, scene_item_id)

        # Test 2: Update only the duration
        print("\nTest 2: Change duration to 1000ms")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionDuration": 1000
        })

        if response['requestStatus']['result']:
            print("✓ Successfully updated duration")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        # Test 3: Disable the show transition by setting name to None
        print("\nTest 3: Disable show transition")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": None
        })

        if response['requestStatus']['result']:
            print("✓ Successfully disabled show transition")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

    async def test_get_scene_item_hide_transition(self, scene_name: str, scene_item_id: int) -> Dict[str, Any]:
        """
        Test the GetSceneItemHideTransition request to retrieve hide transition details.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        :return: The response from the request
        """
        print("\n=== Testing GetSceneItemHideTransition ===")

        response = await self.request("GetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id
        })

        if response['requestStatus']['result']:
            data = response.get('responseData', {})
            print(f"✓ Response data: {data}")
            print(f"  Transition name: {data.get('transitionName')}")
            print(f"  Transition duration: {data.get('transitionDuration')} ms")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        return response

    async def test_set_scene_item_hide_transition(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test the SetSceneItemHideTransition request, including enabling, updating, and disabling.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing SetSceneItemHideTransition ===")

        # Retrieve a valid transition name
        transition_name = await self.get_available_transitions()
        if not transition_name:
            print("✗ Cannot test without an available transition")
            return

        # Test 1: Enable the hide transition with a duration
        print(f"\nTest 1: Enable with {transition_name} transition, 750ms duration")
        response = await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 750
        })

        if response['requestStatus']['result']:
            print("✓ Successfully set hide transition")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        # Verify the change by getting the transition details
        await asyncio.sleep(0.1)
        await self.test_get_scene_item_hide_transition(scene_name, scene_item_id)

        # Test 2: Update only the transition name (using the same for simplicity)
        print(f"\nTest 2: Change transition name to {transition_name}")
        response = await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name
        })

        if response['requestStatus']['result']:
            print("✓ Successfully updated transition name")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

        # Test 3: Disable the hide transition by setting name to None
        print("\nTest 3: Disable hide transition")
        response = await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": None
        })

        if response['requestStatus']['result']:
            print("✓ Successfully disabled hide transition")
        else:
            print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")

    async def test_visibility_transitions(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test visibility changes (show/hide) with transitions enabled to ensure they apply correctly.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing Visibility Changes with Transitions ===")

        # Retrieve a valid transition name
        transition_name = await self.get_available_transitions()
        if not transition_name:
            print("✗ Cannot test without an available transition")
            return

        # Set up transitions for both show and hide
        print(f"Setting up {transition_name} transitions for visibility test...")
        await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 300
        })

        await self.request("SetSceneItemHideTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": transition_name,
            "transitionDuration": 300
        })

        # Test hiding the item
        print("\nHiding scene item (should use hide transition)...")
        await self.request("SetSceneItemEnabled", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "sceneItemEnabled": False
        })
        await asyncio.sleep(0.5)  # Wait for the transition to complete
        print("✓ Hide complete")

        # Test showing the item
        print("\nShowing scene item (should use show transition)...")
        await self.request("SetSceneItemEnabled", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "sceneItemEnabled": True
        })
        await asyncio.sleep(0.5)  # Wait for the transition to complete
        print("✓ Show complete")

    async def test_edge_cases(self, scene_name: str, scene_item_id: int) -> None:
        """
        Test edge cases for error handling, such as invalid scene names, item IDs, and transition names.

        :param scene_name: Name of the scene
        :param scene_item_id: ID of the scene item
        """
        print("\n=== Testing Edge Cases ===")

        # Test 1: Invalid scene name
        print("\nTest 1: Invalid scene name")
        response = await self.request("GetSceneItemShowTransition", {
            "sceneName": "NonExistentScene",
            "sceneItemId": scene_item_id
        })
        if not response['requestStatus']['result']:
            print(f"✓ Correctly failed: {response['requestStatus'].get('comment', 'No error message')}")
        else:
            print("✗ Should have failed with invalid scene")

        # Test 2: Invalid scene item ID
        print("\nTest 2: Invalid scene item ID")
        response = await self.request("GetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": 99999
        })
        if not response['requestStatus']['result']:
            print(f"✓ Correctly failed: {response['requestStatus'].get('comment', 'No error message')}")
        else:
            print("✗ Should have failed with invalid item ID")

        # Test 3: Invalid transition name
        print("\nTest 3: Invalid transition name")
        response = await self.request("SetSceneItemShowTransition", {
            "sceneName": scene_name,
            "sceneItemId": scene_item_id,
            "transitionName": "nonexistent_transition_12345"
        })
        print(f"  Result: {response['requestStatus']['result']}")
        print(f"  Comment: {response['requestStatus'].get('comment', 'No comment')}")


async def main() -> None:
    """
    Main entry point for the script. Parses arguments, connects to OBS, runs tests, and cleans up.
    """
    parser = argparse.ArgumentParser(description='Test OBS WebSocket Scene Item Transitions')
    parser.add_argument('--host', default='localhost', help='OBS WebSocket host')
    parser.add_argument('--port', type=int, default=4455, help='OBS WebSocket port')
    parser.add_argument('--password', help='OBS WebSocket password')
    args = parser.parse_args()

    test = OBSWebSocketTest(host=args.host, port=args.port, password=args.password)

    try:
        await test.connect()
        scene_name, scene_item_id = await test.setup_test_scene()

        # Execute the test suite
        await test.test_get_scene_item_show_transition(scene_name, scene_item_id)
        await test.test_set_scene_item_show_transition(scene_name, scene_item_id)

        await test.test_get_scene_item_hide_transition(scene_name, scene_item_id)
        await test.test_set_scene_item_hide_transition(scene_name, scene_item_id)

        await test.test_visibility_transitions(scene_name, scene_item_id)

        await test.test_edge_cases(scene_name, scene_item_id)

        print("\n" + "=" * 50)
        print("All tests completed!")
        print("=" * 50)

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        await test.close()


if __name__ == '__main__':
    asyncio.run(main())

LinuxMainframe avatar Nov 06 '25 00:11 LinuxMainframe

I added the test case to show what I tried. I am not sure it covers everything, as I have learned the protocol gradually and only recently started reading the source code. If I am missing anything, documentation or otherwise, please say so. I have been programming for a while, but never in a group or for an open-source project, so I am very new to this.

LinuxMainframe avatar Nov 06 '25 02:11 LinuxMainframe