Feature/scene item transitions
Adds support for per-scene-item show and hide transitions via new WebSocket RPC requests
Key Changes:
- Introduced four new non-breaking RPC requests:
GetSceneItemShowTransition: Retrieves the current show transition configuration (name, UUID, kind, duration) for a specified scene item.SetSceneItemShowTransition: Sets or updates the show transition for a scene item, supporting parameters liketransitionName(null to disable) andtransitionDuration(in milliseconds).GetSceneItemHideTransition: Retrieves the hide transition details, similar to the show variant.SetSceneItemHideTransition: Configures the hide transition, with analogous parameters.
- These requests enable granular control over individual scene item visibility transitions, compatible with scenes and scene groups.
- Added
#include <optional>for full compatibility with OBS Studio v32, ensuring safe handling of nullable parameters.
This implementation extends the API to allow remote management of source appearances, such as fading in/out specific items without affecting the entire scene.
Motivation: The change addresses a long-standing feature gap where per-source show/hide transitions, available in OBS Studio's UI, were not exposed via WebSocket. This limits automation and dynamic streaming setups, as clients couldn't gracefully introduce or modify sources remotely.
Closes #906 by providing dedicated RPC endpoints for these settings, aligning with the request for integration into source management workflows.
Tested with: A comprehensive Python test suite using asyncio and websockets library. The script:
- Connects to OBS WebSocket (v5.6.3) at ws://localhost:4455.
- Sets up a test scene ("Test_Transitions_Scene") with a color source.
- Validates
Get/SetSceneItemShowTransitionandGet/SetSceneItemHideTransitionthrough cycles of enabling (e.g., Fade at 500ms/750ms), updating durations/names, disabling via null, and verifying responses. - Tests visibility toggles with
SetSceneItemEnabledto confirm transitions apply during show/hide. - Covers edge cases: invalid scene names, item IDs, and transition names, ensuring proper error handling (e.g., "No transition was found"). All tests passed, with outcomes confirming correct UUIDs, kinds (e.g., "fade_transition"), and durations.
Tested OS(s): Linux (Ubuntu-based development environment with OBS Studio v32+).
-
New request/event (non-breaking)
-
[x] I have read the Contributing Guidelines.
-
[x] All commit messages are properly formatted and commits squashed where appropriate.
-
[x] My code is not on
masteror arelease/*branch. -
[x] The code has been tested.
-
[x] I have included updates to all appropriate documentation.
Commits:
- First commit (538dfa8616): Core feature addition, closing #906 partially.
- Second commit (29c7a4e454): Completes closure with
<optional>include.
Test cases for my commit:
#!/usr/bin/env python3
"""
Test set for OBS WebSocket scene item transition functions.
This script tests the functionality for handling remote show/hide transitions
on scene items via the OBS WebSocket API.
AUTHOR: Aidan A. Bradley
DATE: November 4th, 2025
"""
import asyncio
import json
import time
from typing import Optional, Dict, Any
import websockets
import argparse
import base64
import hashlib
class OBSWebSocketTest:
def __init__(self, host: str = "localhost", port: int = 4455, password: Optional[str] = None):
"""
Initialize the OBS WebSocket test client.
:param host: Hostname or IP of the OBS WebSocket server (default: 'localhost')
:param port: Port of the OBS WebSocket server (default: 4455)
:param password: Password for authentication if required (default: None)
"""
self.host = host
self.port = port
self.password = password
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.request_id: int = 0
async def connect(self) -> None:
"""
Establish a connection to the OBS WebSocket server, handle authentication if required,
and complete the identification process.
"""
uri = f"ws://{self.host}:{self.port}"
print(f"Connecting to {uri}...")
self.ws = await websockets.connect(uri)
# Receive the initial Hello message from the server
hello = json.loads(await self.ws.recv())
print(f"Received Hello: {hello['op']} - OBS WebSocket version {hello['d']['obsWebSocketVersion']}")
identify: Dict[str, Any] = {
"op": 1,
"d": {
"rpcVersion": 1,
"eventSubscriptions": 0
}
}
# Handle authentication if required
if 'authentication' in hello['d'] and hello['d']['authentication'] is not None:
if not self.password:
raise ValueError("Password required but not provided")
print("Authentication required")
salt = hello['d']['authentication']['salt']
challenge = hello['d']['authentication']['challenge']
# Compute the authentication response using SHA256 and base64
passhash = hashlib.sha256((self.password + salt).encode('utf-8')).digest()
secret = base64.b64encode(passhash)
response_hash = hashlib.sha256(secret + challenge.encode('utf-8')).digest()
auth_response = base64.b64encode(response_hash).decode('utf-8')
identify["d"]["authentication"] = auth_response
else:
print("No authentication required")
# Send the Identify message
await self.ws.send(json.dumps(identify))
# Receive the Identified message confirming connection
identified = json.loads(await self.ws.recv())
print(f"Connected! Negotiated RPC version: {identified['d']['negotiatedRpcVersion']}")
async def request(self, request_type: str, request_data: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Send a request to the OBS WebSocket server and await the response.
:param request_type: The type of request (e.g., 'GetSceneItemList')
:param request_data: Optional data payload for the request
:return: The response data from the server
"""
self.request_id += 1
request: Dict[str, Any] = {
"op": 6,
"d": {
"requestType": request_type,
"requestId": str(self.request_id),
"requestData": request_data or {}
}
}
await self.ws.send(json.dumps(request))
# Loop until the matching response is received
while True:
response = json.loads(await self.ws.recv())
if response['op'] == 7 and response['d']['requestId'] == str(self.request_id):
return response['d']
async def close(self) -> None:
"""
Close the WebSocket connection if it is open.
"""
if self.ws:
await self.ws.close()
print("Connection closed")
async def setup_test_scene(self) -> tuple[str, int]:
"""
Set up a test scene and add a color source for testing transitions.
:return: Tuple containing the scene name and scene item ID
:raises Exception: If unable to create or retrieve the test scene item
"""
print("\n=== Setting up test environment ===")
# Define test scene and source names
scene_name = "Test_Transitions_Scene"
print(f"Creating scene: {scene_name}")
try:
await self.request("CreateScene", {"sceneName": scene_name})
except Exception:
print(f"Scene {scene_name} might already exist, continuing...")
source_name = "Test_Color_Source"
print(f"Creating color source: {source_name}")
try:
await self.request("CreateInput", {
"sceneName": scene_name,
"inputName": source_name,
"inputKind": "color_source_v3",
"inputSettings": {
"color": 4278190335 # ARGB value for blue
}
})
except Exception as e:
print(f"Source might already exist: {str(e)}")
# Retrieve the list of scene items to get the item ID
response = await self.request("GetSceneItemList", {"sceneName": scene_name})
if response['requestStatus']['result']:
scene_items = response['responseData']['sceneItems']
if scene_items:
scene_item_id = scene_items[0]['sceneItemId']
print(f"Found scene item ID: {scene_item_id}")
return scene_name, scene_item_id
raise Exception("Failed to create test scene item")
async def test_get_scene_item_show_transition(self, scene_name: str, scene_item_id: int) -> Dict[str, Any]:
"""
Test the GetSceneItemShowTransition request to retrieve show transition details.
:param scene_name: Name of the scene
:param scene_item_id: ID of the scene item
:return: The response from the request
"""
print("\n=== Testing GetSceneItemShowTransition ===")
response = await self.request("GetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id
})
if response['requestStatus']['result']:
data = response.get('responseData', {})
is_set = data.get('transitionName') is not None
print(f"✓ Show transition set: {is_set}")
print(f" Transition name: {data.get('transitionName')}")
print(f" Transition UUID: {data.get('transitionUuid')}")
print(f" Transition kind: {data.get('transitionKind')}")
print(f" Transition duration: {data.get('transitionDuration')} ms")
else:
print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")
return response
async def get_available_transitions(self) -> Optional[str]:
"""
Retrieve the list of available transitions in OBS and return the first one.
:return: Name of the first available transition or None if none found
"""
print("\n=== Getting available transitions ===")
response = await self.request("GetSceneTransitionList", {})
if response['requestStatus']['result']:
transitions = response.get('responseData', {}).get('transitions', [])
if transitions:
transition_names = [t.get('transitionName') for t in transitions]
print(f"Available transitions: {transition_names}")
return transition_names[0] # Return the first available transition
else:
print("✗ No transitions available")
return None
else:
print(f"✗ Failed to get transitions: {response['requestStatus'].get('comment', 'No error')}")
return None
async def test_set_scene_item_show_transition(self, scene_name: str, scene_item_id: int) -> None:
"""
Test the SetSceneItemShowTransition request, including enabling, updating, and disabling.
:param scene_name: Name of the scene
:param scene_item_id: ID of the scene item
"""
print("\n=== Testing SetSceneItemShowTransition ===")
# Retrieve a valid transition name
transition_name = await self.get_available_transitions()
if not transition_name:
print("✗ Cannot test without an available transition")
return
# Test 1: Enable the show transition with a duration
print(f"\nTest 1: Enable with {transition_name} transition, 500ms duration")
response = await self.request("SetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": transition_name,
"transitionDuration": 500
})
if response['requestStatus']['result']:
print("✓ Successfully set show transition")
else:
print(f"✗ Failed: {response['requestStatus']['comment']}")
# Verify the change by getting the transition details
await asyncio.sleep(0.1)
await self.test_get_scene_item_show_transition(scene_name, scene_item_id)
# Test 2: Update only the duration
print("\nTest 2: Change duration to 1000ms")
response = await self.request("SetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionDuration": 1000
})
if response['requestStatus']['result']:
print("✓ Successfully updated duration")
else:
print(f"✗ Failed: {response['requestStatus']['comment']}")
# Test 3: Disable the show transition by setting name to None
print("\nTest 3: Disable show transition")
response = await self.request("SetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": None
})
if response['requestStatus']['result']:
print("✓ Successfully disabled show transition")
else:
print(f"✗ Failed: {response['requestStatus']['comment']}")
async def test_get_scene_item_hide_transition(self, scene_name: str, scene_item_id: int) -> Dict[str, Any]:
"""
Test the GetSceneItemHideTransition request to retrieve hide transition details.
:param scene_name: Name of the scene
:param scene_item_id: ID of the scene item
:return: The response from the request
"""
print("\n=== Testing GetSceneItemHideTransition ===")
response = await self.request("GetSceneItemHideTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id
})
if response['requestStatus']['result']:
data = response.get('responseData', {})
print(f"✓ Response data: {data}")
print(f" Transition name: {data.get('transitionName')}")
print(f" Transition duration: {data.get('transitionDuration')} ms")
else:
print(f"✗ Failed: {response['requestStatus'].get('comment', 'No error message')}")
return response
async def test_set_scene_item_hide_transition(self, scene_name: str, scene_item_id: int) -> None:
"""
Test the SetSceneItemHideTransition request, including enabling, updating, and disabling.
:param scene_name: Name of the scene
:param scene_item_id: ID of the scene item
"""
print("\n=== Testing SetSceneItemHideTransition ===")
# Retrieve a valid transition name
transition_name = await self.get_available_transitions()
if not transition_name:
print("✗ Cannot test without an available transition")
return
# Test 1: Enable the hide transition with a duration
print(f"\nTest 1: Enable with {transition_name} transition, 750ms duration")
response = await self.request("SetSceneItemHideTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": transition_name,
"transitionDuration": 750
})
if response['requestStatus']['result']:
print("✓ Successfully set hide transition")
else:
print(f"✗ Failed: {response['requestStatus']['comment']}")
# Verify the change by getting the transition details
await asyncio.sleep(0.1)
await self.test_get_scene_item_hide_transition(scene_name, scene_item_id)
# Test 2: Update only the transition name (using the same for simplicity)
print(f"\nTest 2: Change transition name to {transition_name}")
response = await self.request("SetSceneItemHideTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": transition_name
})
if response['requestStatus']['result']:
print("✓ Successfully updated transition name")
else:
print(f"✗ Failed: {response['requestStatus']['comment']}")
# Test 3: Disable the hide transition by setting name to None
print("\nTest 3: Disable hide transition")
response = await self.request("SetSceneItemHideTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": None
})
if response['requestStatus']['result']:
print("✓ Successfully disabled hide transition")
else:
print(f"✗ Failed: {response['requestStatus']['comment']}")
async def test_visibility_transitions(self, scene_name: str, scene_item_id: int) -> None:
"""
Test visibility changes (show/hide) with transitions enabled to ensure they apply correctly.
:param scene_name: Name of the scene
:param scene_item_id: ID of the scene item
"""
print("\n=== Testing Visibility Changes with Transitions ===")
# Retrieve a valid transition name
transition_name = await self.get_available_transitions()
if not transition_name:
print("✗ Cannot test without an available transition")
return
# Set up transitions for both show and hide
print(f"Setting up {transition_name} transitions for visibility test...")
await self.request("SetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": transition_name,
"transitionDuration": 300
})
await self.request("SetSceneItemHideTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": transition_name,
"transitionDuration": 300
})
# Test hiding the item
print("\nHiding scene item (should use hide transition)...")
await self.request("SetSceneItemEnabled", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"sceneItemEnabled": False
})
await asyncio.sleep(0.5) # Wait for the transition to complete
print("✓ Hide complete")
# Test showing the item
print("\nShowing scene item (should use show transition)...")
await self.request("SetSceneItemEnabled", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"sceneItemEnabled": True
})
await asyncio.sleep(0.5) # Wait for the transition to complete
print("✓ Show complete")
async def test_edge_cases(self, scene_name: str, scene_item_id: int) -> None:
"""
Test edge cases for error handling, such as invalid scene names, item IDs, and transition names.
:param scene_name: Name of the scene
:param scene_item_id: ID of the scene item
"""
print("\n=== Testing Edge Cases ===")
# Test 1: Invalid scene name
print("\nTest 1: Invalid scene name")
response = await self.request("GetSceneItemShowTransition", {
"sceneName": "NonExistentScene",
"sceneItemId": scene_item_id
})
if not response['requestStatus']['result']:
print(f"✓ Correctly failed: {response['requestStatus']['comment']}")
else:
print("✗ Should have failed with invalid scene")
# Test 2: Invalid scene item ID
print("\nTest 2: Invalid scene item ID")
response = await self.request("GetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": 99999
})
if not response['requestStatus']['result']:
print(f"✓ Correctly failed: {response['requestStatus']['comment']}")
else:
print("✗ Should have failed with invalid item ID")
# Test 3: Invalid transition name
print("\nTest 3: Invalid transition name")
response = await self.request("SetSceneItemShowTransition", {
"sceneName": scene_name,
"sceneItemId": scene_item_id,
"transitionName": "nonexistent_transition_12345"
})
print(f" Result: {response['requestStatus']['result']}")
print(f" Comment: {response['requestStatus'].get('comment', 'No comment')}")
async def main() -> None:
"""
Main entry point for the script. Parses arguments, connects to OBS, runs tests, and cleans up.
"""
parser = argparse.ArgumentParser(description='Test OBS WebSocket Scene Item Transitions')
parser.add_argument('--host', default='localhost', help='OBS WebSocket host')
parser.add_argument('--port', type=int, default=4455, help='OBS WebSocket port')
parser.add_argument('--password', help='OBS WebSocket password')
args = parser.parse_args()
test = OBSWebSocketTest(host=args.host, port=args.port, password=args.password)
try:
await test.connect()
scene_name, scene_item_id = await test.setup_test_scene()
# Execute the test suite
await test.test_get_scene_item_show_transition(scene_name, scene_item_id)
await test.test_set_scene_item_show_transition(scene_name, scene_item_id)
await test.test_get_scene_item_hide_transition(scene_name, scene_item_id)
await test.test_set_scene_item_hide_transition(scene_name, scene_item_id)
await test.test_visibility_transitions(scene_name, scene_item_id)
await test.test_edge_cases(scene_name, scene_item_id)
print("\n" + "=" * 50)
print("All tests completed!")
print("=" * 50)
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
await test.close()
if __name__ == '__main__':
asyncio.run(main())
I added the test case because I want to show what I tried. I am not sure if this covers everything, as I have kind of learned the protocol over time and only recently got into the source code. So if I am missing anything, documentation or otherwise, please say so. I have been programming a while but not with any group nor to open-source, so I am very new.