
feat: Add workspace isolation support for pipeline status

BukeLy opened this issue 3 weeks ago · 9 comments

Summary

This PR implements workspace isolation for pipeline status to enable concurrent document processing across multiple tenants without blocking each other.

Problem

In multi-tenant scenarios, different workspaces currently share a single global pipeline_status namespace, causing pipelines from different tenants to block each other. This severely impacts concurrent processing performance.

Solution

Core Changes

  1. Extended get_namespace_data() in shared_storage.py

    • Now recognizes workspace-specific pipeline namespaces
    • Pattern: "{workspace}:pipeline" (following GraphDB pattern)
    • Maintains backward compatibility with "pipeline_status" (see the sketch after this list)
  2. Added workspace parameter to initialize_pipeline_status()

    • Supports per-tenant isolated pipeline namespaces
    • Empty workspace defaults to global namespace for backward compatibility
  3. Updated 5 call sites

    • 2 in lightrag.py: process_document_queue(), aremove_document()
    • 3 in document_routes.py: background_delete_documents(), clear_documents(), cancel_pipeline()
    • All now use workspace-aware locks via get_storage_keyed_lock()
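
A minimal sketch of the namespace resolution described above (resolve_pipeline_namespace is an illustrative helper name, not the actual code):

def resolve_pipeline_namespace(workspace: str = "") -> str:
    # Workspace-scoped pipelines use the "{workspace}:pipeline" pattern;
    # an empty workspace falls back to the legacy global namespace.
    return f"{workspace}:pipeline" if workspace else "pipeline_status"

# resolve_pipeline_namespace("tenant_a") -> "tenant_a:pipeline"
# resolve_pipeline_namespace("")         -> "pipeline_status"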

Key Features

  • Concurrent Processing: Different workspaces can process documents in parallel
  • Backward Compatible: Empty workspace uses global "pipeline_status"
  • Fail-Fast: Uninitialized pipeline access raises a clear PipelineNotInitializedError (sketched after this list)
  • Performance: Expected N× improvement for N concurrent tenants
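
A hedged sketch of the fail-fast behavior (PipelineNotInitializedError is named by this PR; the _namespaces dict and the exact check shown here are assumptions for illustration):

_namespaces: dict[str, dict] = {}

class PipelineNotInitializedError(RuntimeError):
    """Raised when a pipeline namespace is accessed before initialization."""

async def get_namespace_data(namespace: str) -> dict:
    # Fail fast instead of silently creating an empty namespace.
    if namespace not in _namespaces:
        raise PipelineNotInitializedError(
            f"Namespace '{namespace}' is not initialized; "
            "call initialize_pipeline_status() first."
        )
    return _namespaces[namespace]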

Bug Fixes

Fixed an AttributeError caused by accessing the non-existent self.global_config (illustrated below):

  • Used self.workspace in lightrag.py (2 locations)
  • Used rag.workspace in document_routes.py (3 locations)
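
An illustrative before/after of the fix (surrounding code elided):

# Before: LightRAG instances expose no global_config attribute here,
# so this line raised AttributeError at runtime:
# workspace = self.global_config.get("workspace", "")

# After: read the workspace directly from the instance:
workspace = self.workspace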

Testing

All syntax checks passed and comprehensive testing completed:

  1. ✅ Global pipeline initialization (backward compatibility)
  2. ✅ Workspace-specific pipeline initialization
  3. ✅ Proper isolation between workspaces
  4. ✅ Uninitialized pipeline error detection
  5. ✅ Lock functionality with workspaces

Impact

Performance

Before: 3 tenants × 0.2s each = 0.6s (serial)
After:  3 tenants × 0.2s each = ~0.2s (parallel)
Result: 3× performance improvement
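
The arithmetic above can be reproduced with a toy script (tenant names and the 0.2s figure are illustrative):

import asyncio
import time

async def process(tenant: str) -> None:
    # Stand-in for one tenant's 0.2s pipeline run.
    await asyncio.sleep(0.2)

async def main() -> None:
    start = time.perf_counter()
    await asyncio.gather(*(process(t) for t in ("tenant_a", "tenant_b", "tenant_c")))
    print(f"3 tenants in parallel: {time.perf_counter() - start:.2f}s")  # ~0.2s, not 0.6s

asyncio.run(main())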

Code Changes

4 files changed, 121 insertions(+), 17 deletions(-)

BukeLy · Nov 13 '25 14:11

@codex review

danielaskdd · Nov 13 '25 14:11

Fixed Issues

  1. Pipeline status endpoint (document_routes.py:2306)

    • ✅ Now uses workspace-specific namespace f"{workspace}:pipeline"
    • ✅ Each tenant sees their own pipeline status instead of global state
  2. Delete documents busy check (document_routes.py:2513)

    • ✅ Now checks workspace-specific busy flag
    • ✅ Uses proper keyed lock for workspace isolation
    • ✅ Prevents concurrent deletions within the same workspace

Both endpoints now follow the same pattern as the other updated handlers (sketched after this list):

  • Get workspace from rag.workspace
  • Construct namespace: f"{workspace}:pipeline" or "pipeline_status" (backward compatible)
  • Initialize pipeline status before use
  • Use workspace-aware locks via get_storage_keyed_lock()
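
A hedged sketch of that shared pattern (the handler body is condensed, and the exact signatures of get_storage_keyed_lock and initialize_pipeline_status are assumptions based on this PR's description):

from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_storage_keyed_lock,
    initialize_pipeline_status,
)

async def pipeline_status_handler(rag):
    workspace = rag.workspace
    namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
    await initialize_pipeline_status(workspace)
    # The keyed lock scopes contention to this workspace only.
    async with get_storage_keyed_lock(namespace):
        return dict(await get_namespace_data(namespace))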

CI checks should pass now.

BukeLy · Nov 13 '25 14:11

@codex review

danielaskdd · Nov 13 '25 15:11

Codex Review: Didn't find any major issues. Bravo.

The current codebase still needs to address compatibility with legacy code. User code built on LightRAG Core has always initialized the pipeline via initialize_pipeline_status() without passing a workspace. Therefore, when called without arguments, initialize_pipeline_status should automatically fall back to the default workspace. To make that possible, rag.initialize_storages needs to record the default workspace value in shared_storage so that initialize_pipeline_status can retrieve and use it.

danielaskdd · Nov 15 '25 00:11

Another compatibility issue: the /health health check endpoint in lightrag_server.py, implemented by the get_status function, must return the pipeline status to the frontend. The frontend UI relies on this status to update the pipeline display and drive various UI decisions. Two problems need to be solved:

  1. The current frontend has no concept of a workspace, so it calls /health without a workspace parameter. This can be resolved with the approach from the previous comment: have get_status return the pipeline status of the default workspace.

  2. The frontend must be able to proactively retrieve pipeline status for different workspaces. To keep all APIs compatible with multi-workspace operation, we recommend introducing a custom HTTP request header, LIGHTRAG-WORKSPACE. The /health endpoint will inspect this header to determine which workspace's status the client wants to access. Going forward, all API endpoints will use the LIGHTRAG-WORKSPACE header to identify the target workspace of each request.

danielaskdd · Nov 15 '25 01:11

Compatibility Issues Fixed

Hi @danielaskdd! I have fixed both compatibility issues you mentioned:


🔧 Issue 1: Default Workspace Support for initialize_pipeline_status()

Problem:

  • lightrag_server.py calls initialize_pipeline_status() without a workspace parameter
  • This causes the pipeline to always initialize into the global "pipeline_status" namespace instead of the rag object's workspace

Solution:

  1. Added a default workspace mechanism in shared_storage.py (the function bodies below are sketched for illustration):

    _default_workspace: Optional[str] = None

    def set_default_workspace(workspace: str) -> None:
        global _default_workspace
        _default_workspace = workspace

    def get_default_workspace() -> str:
        return _default_workspace or ""

  2. Automatically set the default workspace in LightRAG.initialize_storages():

    from lightrag.kg.shared_storage import set_default_workspace
    set_default_workspace(self.workspace)

  3. Modified initialize_pipeline_status() to use the default workspace when called without parameters:

    async def initialize_pipeline_status(workspace: str = ""):
        if not workspace:
            workspace = get_default_workspace()  # Backward compatibility
        ...
    

Result:

  • ✅ Old code works without modification
  • ✅ The pipeline in lightrag_server.py now correctly initializes into the rag object's workspace
  • ✅ Fully backward compatible

🔧 Issue 2: /health Endpoint Support for the LIGHTRAG-WORKSPACE Header

问题描述 / Problem:

  • /health 端点硬编码使用 "pipeline_status",无法返回 workspace-specific 状态
  • 前端 UI 无法指定要查询哪个 workspace 的状态
  • /health endpoint hardcoded to use "pipeline_status", cannot return workspace-specific status
  • Frontend UI cannot specify which workspace's status to query

Solution:

  1. Added a workspace extraction helper function:

    def get_workspace_from_request(request: Request) -> str:
        workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
        if not workspace:
            workspace = args.workspace  # Fallback to the server default
        return workspace

  2. Modified the /health endpoint to support the custom header:

    @app.get("/health")
    async def get_status(request: Request):
        workspace = get_workspace_from_request(request)
        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
        pipeline_status = await get_namespace_data(namespace)
        ...

  3. Return complete workspace information:

    return {
        "configuration": {
            "workspace": workspace,              # Actually used for this request
            "default_workspace": args.workspace, # Server default
            ...
        }
    }
    

Usage:

# No header: use the default workspace
curl http://localhost:9621/health

# With header: query a specific workspace
curl -H "LIGHTRAG-WORKSPACE: tenant_a" http://localhost:9621/health
curl -H "LIGHTRAG-WORKSPACE: tenant_b" http://localhost:9621/health

Result:

  • ✅ The frontend can query the pipeline status of any workspace
  • ✅ The default workspace is used automatically when no header is provided
  • ✅ Provides a unified workspace extraction mechanism for all future API endpoints

📦 Commit Details

5f153582 fix: Add default workspace support for backward compatibility

Changes:
- lightrag/kg/shared_storage.py: +33 lines
- lightrag/lightrag.py: +8 lines  
- lightrag/api/lightrag_server.py: +41 lines

Total: 3 files changed, 82 insertions(+), 5 deletions(-)

✅ Testing

Passed the following tests:

  1. ✅ Default workspace mechanism works correctly
  2. ✅ LightRAG integration tests pass
  3. ✅ Backward compatibility verified
  4. ✅ Multi-workspace isolation verified

🎯 Design Notes

Why does the pipeline need special handling?

Other storage locks (e.g. Redis, vector DB) obtain the workspace via storage.global_config.get("workspace") because their functions receive storage objects as parameters:

# Storage lock example
async def _merge_nodes_then_upsert(entity_name: str, entities_vdb, ...):
    workspace = entities_vdb.global_config.get("workspace", "")  # ✓ Can get it
    ...

But initialize_pipeline_status() is a standalone initialization function with no storage object to draw from, so it requires the default workspace mechanism:

# Pipeline initialization
await rag.initialize_storages()       # Sets the default workspace
await initialize_pipeline_status()    # Uses the default workspace

Please review and confirm whether these changes meet your expectations. Feel free to let me know if you have any questions or suggestions!

BukeLy · Nov 15 '25 04:11

The workspace isolation for Pipeline Status is largely resolved; however, workspace isolation for the In-Memory JSON KV Storage remains unaddressed. Since the changes touch many parts of shared storage, I'll take ownership of the implementation myself. Please help verify that the updated code works correctly. The latest code is on the dev-workspace-isolation branch; see PR #2366 for details. If you find problems, please fix them directly and submit a PR; the merge target branch is dev-workspace-isolation.

danielaskdd · Nov 17 '25 01:11