How to Stop DocumentStoreServer?
I am trying to implement a session state-aware feature using Pathway, but I need to restart the DocumentStoreServer whenever the session state changes (a new session, deletion of a previous session, or an idle session).
However, I cannot figure out how to stop Pathway's DocumentStoreServer. Kindly help?
Hey @udbhav-44 !
Thank you for your question. A DocumentStoreServer is not designed to be stopped and restarted on demand.
We would need more information about your use case to help you. Can you explain the process you are trying to implement and what you mean by "session state-aware feature"?
Description
When attempting to dynamically reload DocumentStoreServer to handle session-specific document updates, the server fails to properly reinitialize with new session state. This prevents implementation of multi-tenant document isolation where each session requires its own document set.
Use Case Context
We need to support dynamic session management where:
- Each user session has isolated document storage
- Session configurations change via `.current_session` file updates
- Server should hot-reload without downtime to serve new session documents
Expected Behavior
- Update `.current_session` path with new document location
- DocumentStoreServer gracefully reloads configuration
- New queries reflect documents from updated session path
- Existing connections complete before old instances terminate
Hey, that's an interesting use case. Could you explain the error in a bit more detail? What approach are you following to re-initialize with a state? Could you share snippets if possible?
We don't have a way to persist and re-load state for the Document store. However, I can offer a way to achieve what you want.
If the parsers and embedders are shared among all users and the only difference is the data sources/files, then you can modify the metadata of each source to include a `user_id` that you can use to filter the documents. This way, the Document store will keep all documents from all users in the index, and you can run user-specific searches by filtering on the metadata. For instance:
```
<<other filters, file names, ...>> && user_id == `user1`
```
To achieve this, you can use the following:
```python
import pathway as pw

schema = pw.schema_from_types(data=bytes, _metadata=dict)

dummy_input = pw.debug.table_from_rows(  # this should be your connector, i.e. pw.io.fs.read(...)
    schema=schema,
    rows=[
        ("foo", {}),
        ("bar", {}),
        ("baz", {}),
    ],
)

@pw.udf
def add_userid(metadata: pw.Json, id: str) -> dict:
    meta = metadata.as_dict()
    meta["user_id"] = id
    return meta

dummy_input = dummy_input.with_columns(_metadata=add_userid(pw.this._metadata, "user1"))
# pw.debug.compute_and_print(dummy_input)  # visualize if you need
# put dummy_input into your DocumentStore
```
Then, when sending a query to the DocumentStore, you can add a `metadata_filter` key to the request JSON with the value `user_id == "user1"`.
Keep in mind that `pw.debug` is only for demonstration; it will not work with a regular DocumentStore instance. You should apply the `add_userid` UDF to your connector tables.
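As a sketch of what such a filtered request could look like in plain Python (the host, port, and `/v1/retrieve` path are assumptions here; check the endpoints your DocumentStoreServer version actually exposes):

```python
import json
from urllib import request

# Assumed address and endpoint; adjust to your deployment.
payload = {
    "query": "What did user1 upload?",
    "k": 3,
    "metadata_filter": "user_id == `user1`",
}
req = request.Request(
    "http://localhost:8000/v1/retrieve",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
# response = request.urlopen(req)  # uncomment once the server is running
```

Because `data` is set, the request is sent as a POST with a JSON body; only the `metadata_filter` value changes per user.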
Hey @udbhav-44, just a quick point - in your use case, is it acceptable to "hot reload" the server (effectively stopping one user's session and starting another's)? For most of our users, this is a blocker that precludes such a solution. The normally recommended approach with Pathway is to keep multiple distinct Pathway pipelines running at all times, one pipeline per user, each in its own Docker container, and rely on the operating system to manage memory and other resources.
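That multi-pipeline layout can be sketched in plain Python; `serve_user_pipeline` below is a hypothetical stand-in for building and running one user's pipeline, and in production each process would instead be its own container:

```python
import multiprocessing

def assign_ports(user_ids, base_port=8000):
    """Give each user's pipeline its own port (deterministic, for illustration)."""
    return {uid: base_port + i for i, uid in enumerate(sorted(user_ids))}

def serve_user_pipeline(user_id: str, data_dir: str, port: int) -> None:
    # Hypothetical stand-in: a real version would build a Pathway pipeline
    # with connectors scoped to data_dir and serve it on `port`.
    print(f"[{user_id}] serving {data_dir} on :{port}")

if __name__ == "__main__":
    users = {
        "user1": "data/sessions/session_001",
        "user2": "data/sessions/session_002",
    }
    ports = assign_ports(users)
    procs = [
        multiprocessing.Process(
            target=serve_user_pipeline, args=(uid, users[uid], ports[uid])
        )
        for uid in users
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Each process owns its own data directory and port, so one user's session never touches another's state.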
Hey @dxtrous and @berkecanrizai, thanks for your responses! Let me clarify my use case: I am trying to implement a session-isolated RAG architecture for a single-user, multi-session chatbot using Pathway. The data is structured this way:
```
data/
├── predefined/          # Immutable base knowledge
└── sessions/
    ├── session_001/     # Ephemeral user session storage
    ├── session_002/
    └── session_active/  # Symlink to current session
```
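With this layout, the `session_active` symlink can be repointed atomically on POSIX systems, so readers never observe a missing link mid-switch. A minimal sketch (`switch_active_session` is an illustrative helper, not part of Pathway):

```python
import os

def switch_active_session(sessions_dir: str, session_name: str) -> None:
    """Atomically repoint sessions/session_active at another session directory."""
    target = os.path.join(sessions_dir, session_name)
    link = os.path.join(sessions_dir, "session_active")
    tmp = link + ".tmp"
    if os.path.lexists(tmp):   # clean up a leftover temporary link
        os.remove(tmp)
    os.symlink(target, tmp)    # create the new link under a temporary name
    os.replace(tmp, link)      # rename over the old link: atomic on POSIX
```

The create-then-rename dance is what makes the switch atomic; creating the symlink in place would require deleting the old one first, leaving a window with no `session_active` at all.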
My RAG pipeline works with three types of data:
- Predefined data (static knowledge base)
- Live data (fetched via function calls and scrapers)
- User-uploaded data (session-specific knowledge)
My DocumentStoreServer is connected to a filesystem (pw.io.fs) and listens to two main folders:
- Predefined data folder (static knowledge base)
- User knowledge base (session-specific uploads)
Key requirement: A user should be able to start multiple sessions, where:
- They can access the predefined knowledge base.
- They can upload their own documents and use them in queries.
- They can incorporate live data dynamically.
- Data isolation per session: To ensure security and avoid cross-session confusion, each user session should have its own subfolder within the user knowledge base.
When a user switches sessions, the DocumentStoreServer should reload to:
- Disconnect from the previous session's data.
- Connect to the new session's data.

This is managed by the ConfigWatcher:
```python
import multiprocessing
import time

from watchdog.events import FileSystemEventHandler

class ConfigWatcher(FileSystemEventHandler):
    # ServerManager and _load_session_path are defined elsewhere in our code.
    def __init__(self, manager: ServerManager):
        super().__init__()
        self.manager = manager
        self.last_update = time.time()
        self.debounce = 2.0  # seconds; ignore events that fire in quick succession
        self.session_path = multiprocessing.Manager().Value(str, "")

    def on_modified(self, event):
        if "session_state" in event.src_path and time.time() - self.last_update > self.debounce:
            self.last_update = time.time()
            new_path = self._load_session_path()
            if new_path != self.manager.shared_config.session_path:
                self.manager.shared_config.session_path = new_path
                self.manager.reload()
```
Given this setup, a "hot reload" approach makes sense because it ensures that each session is isolated and only the relevant data is available at any time. @berkecanrizai, I will also explore the metadata method you mentioned.