lexical
lexical copied to clipboard
Bug: Collaboration between multiple users not working
When using the Lexical editor’s CollaborationPlugin with the MercureProvider (based on y-websocket code), real-time collaboration features such as user cursor visibility and document updates are not working. Messages are being sent and received, but changes are not reflected across different users’ views.
Lexical version:
@lexical/react: ^0.13.1
lexical: ^0.13.1
y-protocols: ^1.0.6
yjs: ^13.6.18
Steps To Reproduce
- Initialize the Lexical editor with the CollaborationPlugin.
- Configure the plugin to use the MercureProvider as described in the collaborator.ts code below.
- Open the editor in multiple browsers with different user credentials.
- Attempt to edit the document and observe the behavior.
import { CollaborationPlugin } from '@lexical/react/LexicalCollaborationPlugin';
import { createMercureProvider } from './collaboration';
const initialConfig = {
namespace: 'Streamer',
nodes: [...EditorNodes],
onError: (error: Error) => {
throw error;
},
theme: EditorTheme,
editorState: initialEditorState,
editable: !readOnly,
};
return (
<LexicalComposer key={editorKey} initialConfig={initialConfig}>
<ToolbarPlugin
setIsLinkEditMode={setIsLinkEditMode}
{...toolbarProps}
/>
<div
ref={containerRef}
className={overrideTailwindClasses(
clsx(
'relative block w-full rounded-t-lg bg-white',
containerClassName
)
)}
>
<GlobalEventsPlugin
isTypeaheadMenuActive={isTypeaheadMenuActive}
onEnter={submitOnEnter ? submit : undefined}
onEscape={onCancel}
/>
<DragDropPastePlugin
canEmbedImages={
toolbarProps?.buttons?.insertImage ||
floatingToolbarProps?.buttons?.insertImage
}
onFilePaste={onFilePaste}
/>
{collaborationId ? (
<CollaborationPlugin
id={collaborationId}
username={currentUser?.name}
cursorColor={currentUser?.color || undefined}
cursorsContainerRef={containerRef}
providerFactory={createMercureProvider}
initialEditorState={initialEditorState}
shouldBootstrap={!skipCollaborationInit}
/>
) : (
<HistoryPlugin />
)}
// ...
import { Provider, ProviderAwareness } from '@lexical/yjs';
import * as decoding from 'lib0/decoding';
import * as encoding from 'lib0/encoding';
import { Observable } from 'lib0/observable';
import * as authProtocol from 'y-protocols/auth';
import * as awarenessProtocol from 'y-protocols/awareness';
import * as syncProtocol from 'y-protocols/sync';
import * as Y from 'yjs';
import MercureClient, { MercureMessage } from '../../utils/mercureClient';
export const messageSync = 0;
export const messageQueryAwareness = 3;
export const messageAwareness = 1;
export const messageAuth = 2;
const messageHandlers: Array<
(
encoder: encoding.Encoder,
decoder: decoding.Decoder,
provider: MercureProvider,
emitSynced: boolean,
messageType: number
) => void
> = [];
messageHandlers[messageSync] = (
encoder: encoding.Encoder,
decoder: decoding.Decoder,
provider: MercureProvider,
emitSynced: boolean,
_messageType: number
) => {
encoding.writeVarUint(encoder, messageSync);
const syncMessageType = syncProtocol.readSyncMessage(
decoder,
encoder,
provider.doc,
provider
);
if (
emitSynced &&
syncMessageType === syncProtocol.messageYjsSyncStep2 &&
!provider.getSynced()
) {
provider.setSynced(true);
}
};
messageHandlers[messageQueryAwareness] = (
encoder: encoding.Encoder,
_decoder: decoding.Decoder,
provider: MercureProvider,
_emitSynced: boolean,
_messageType: number
) => {
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
provider.awareness as unknown as awarenessProtocol.Awareness,
Array.from(provider.awareness.getStates().keys())
)
);
};
messageHandlers[messageAwareness] = (
_encoder: encoding.Encoder,
decoder: decoding.Decoder,
provider: MercureProvider,
_emitSynced: boolean,
_messageType: number
) => {
awarenessProtocol.applyAwarenessUpdate(
provider.awareness as unknown as awarenessProtocol.Awareness,
decoding.readVarUint8Array(decoder),
provider
);
};
messageHandlers[messageAuth] = (
_encoder: encoding.Encoder,
decoder: decoding.Decoder,
provider: MercureProvider,
_emitSynced: boolean,
_messageType: number
) => {
authProtocol.readAuthMessage(decoder, provider.doc, (_ydoc, reason) =>
console.warn(
`Permission denied to access ${provider.getRoomID()}.\n${reason}`
)
);
};
class MercureProvider extends Observable<string> implements Provider {
public doc: Y.Doc;
public awareness: ProviderAwareness;
private synced = false;
private roomID: string;
private eventListeners: Map<string, Set<(arg: any) => void>> = new Map();
constructor(roomID: string, doc: Y.Doc, { connect = true } = {}) {
super();
this.roomID = roomID;
this.doc = doc;
this.awareness = new awarenessProtocol.Awareness(
doc
) as unknown as ProviderAwareness;
this.doc.on('update', this.yjsUpdateHandler.bind(this));
(this.awareness as unknown as awarenessProtocol.Awareness).on(
'update',
this.yjsAwarenessUpdateHandler.bind(this)
);
if (connect) {
this.connect();
}
}
private readMessage(buf: Uint8Array, emitSynced: boolean): encoding.Encoder {
const decoder = decoding.createDecoder(buf);
const encoder = encoding.createEncoder();
const messageType = decoding.readVarUint(decoder);
const messageHandler = messageHandlers[messageType];
if (messageHandler) {
messageHandler(encoder, decoder, this, emitSynced, messageType);
} else {
console.error(`Unknown message type: ${messageType}`);
}
return encoder;
}
private ocpDataUpdateHandler(data: MercureMessage): void {
const message = typeof data === 'string' ? data : JSON.stringify(data);
const uint8Array = new Uint8Array(
message.split('').map((char) => char.charCodeAt(0))
);
const encoder = this.readMessage(uint8Array, true);
if (encoding.length(encoder) > 1) {
this.send(encoding.toUint8Array(encoder));
}
}
private yjsUpdateHandler(update: Uint8Array, origin: unknown): void {
if (origin !== this) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeUpdate(encoder, update);
this.send(encoding.toUint8Array(encoder));
}
this.dispatchEvent('update', update);
}
private yjsAwarenessUpdateHandler({
added,
updated,
removed,
}: {
added: number[];
updated: number[];
removed: number[];
}): void {
const changedClients = added.concat(updated).concat(removed);
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
this.awareness as unknown as awarenessProtocol.Awareness,
changedClients
)
);
this.send(encoding.toUint8Array(encoder));
}
connect(): void {
MercureClient.subscribe(
`/rooms/${this.roomID}`,
this.ocpDataUpdateHandler.bind(this)
);
if (this.doc && this.awareness) {
this.sendSyncMessages();
this.dispatchEvent('status', { status: 'connected' });
}
}
disconnect(): void {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
this.awareness as unknown as awarenessProtocol.Awareness,
[this.doc.clientID],
new Map()
)
);
this.send(encoding.toUint8Array(encoder));
MercureClient.unsubscribe(`/rooms/${this.roomID}`);
this.dispatchEvent('status', { status: 'disconnected' });
}
dispose(): void {
this.disconnect();
(this.awareness as unknown as awarenessProtocol.Awareness).off(
'update',
this.yjsAwarenessUpdateHandler
);
this.doc.off('update', this.yjsUpdateHandler);
super.destroy();
}
private sendSyncMessages(): void {
const encoderSync = encoding.createEncoder();
encoding.writeVarUint(encoderSync, messageSync);
syncProtocol.writeSyncStep1(encoderSync, this.doc);
this.send(encoding.toUint8Array(encoderSync));
const encoderState = encoding.createEncoder();
encoding.writeVarUint(encoderState, messageSync);
syncProtocol.writeSyncStep2(encoderState, this.doc);
this.send(encoding.toUint8Array(encoderState));
const encoderAwarenessQuery = encoding.createEncoder();
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness);
this.send(encoding.toUint8Array(encoderAwarenessQuery));
const encoderAwarenessState = encoding.createEncoder();
encoding.writeVarUint(encoderAwarenessState, messageAwareness);
encoding.writeVarUint8Array(
encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(
this.awareness as unknown as awarenessProtocol.Awareness,
[this.doc.clientID]
)
);
this.send(encoding.toUint8Array(encoderAwarenessState));
}
private send(message: Uint8Array): void {
MercureClient.publish(
`/rooms/${this.roomID}`,
String.fromCharCode(...message)
);
}
getRoomID(): string {
return this.roomID;
}
getSynced(): boolean {
return this.synced;
}
setSynced(value: boolean): void {
this.synced = value;
}
private dispatchEvent(type: string, arg: any): void {
const listeners = this.eventListeners.get(type);
if (listeners) {
listeners.forEach((cb) => cb(arg));
}
}
on(
type: 'sync' | 'update' | 'status' | 'reload',
cb: (arg: any) => void
): void {
if (!this.eventListeners.has(type)) {
this.eventListeners.set(type, new Set());
}
this.eventListeners.get(type)!.add(cb);
}
off(
type: 'sync' | 'update' | 'status' | 'reload',
cb: (arg: any) => void
): void {
if (this.eventListeners.has(type)) {
this.eventListeners.get(type)!.delete(cb);
}
}
}
export function createMercureProvider(
id: string,
yjsDocMap: Map<string, Y.Doc>
): Provider {
let doc = yjsDocMap.get(id);
if (!doc) {
doc = new Y.Doc();
yjsDocMap.set(id, doc);
} else {
doc.load();
}
return new MercureProvider(id, doc);
}
import axios from 'axios';
import * as pako from 'pako';
import { mergeDeep } from './mergeDeep';
export type MercureMessage = Record<string, any> | string;
export type MercureDependencies = Record<string, any>;
export type MercureEventSource = {
topic: string;
data: MercureMessage;
chunkIndex?: number;
totalChunks?: number;
};
class MercureClient {
private mercureHubUrl: string;
private mercureJwt: string;
private subscribedTopics: Record<
string,
(data: MercureMessage, dependencies: MercureDependencies) => void
> = {};
private dependencies: MercureDependencies = {};
private eventSource: EventSource | null = null;
private receivedChunks: Record<
string,
{ chunks: string[]; totalChunks: number }
> = {};
constructor(mercureHubUrl: string, mercureJwt: string) {
this.mercureHubUrl = mercureHubUrl;
this.mercureJwt = mercureJwt;
if (this.mercureHubUrl) {
this.initializeEventSource();
}
}
private initializeEventSource() {
const url = new URL(this.mercureHubUrl);
url.searchParams.append('topic', '/datasync');
this.eventSource = new EventSource(url.toString());
this.eventSource.onmessage = (event) => {
const { topic, data, chunkIndex, totalChunks }: MercureEventSource =
JSON.parse(event.data);
let jsonData;
try {
if (chunkIndex !== undefined && totalChunks !== undefined) {
// Handle chunked data
if (!this.receivedChunks[topic]) {
this.receivedChunks[topic] = { chunks: [], totalChunks };
}
this.receivedChunks[topic].chunks[chunkIndex] = data as string;
if (window.appConfig?.isDev) {
console.debug(
`Received chunk ${
chunkIndex + 1
}/${totalChunks} for topic ${topic}`
);
}
if (
this.receivedChunks[topic].chunks.filter(Boolean).length ===
totalChunks
) {
const fullData = this.receivedChunks[topic].chunks.join('');
const decodedData = atob(fullData);
const charData = decodedData.split('').map((x) => x.charCodeAt(0));
const binData = new Uint8Array(charData);
const decompressedData = pako.inflate(binData, { to: 'string' });
jsonData = JSON.parse(decompressedData);
// Clean up after processing
delete this.receivedChunks[topic];
} else {
return; // Wait for more chunks
}
} else {
// Handle non-chunked data
try {
const decodedData = atob(data as string);
const charData = decodedData.split('').map((x) => x.charCodeAt(0));
const binData = new Uint8Array(charData);
const decompressedData = pako.inflate(binData, { to: 'string' });
jsonData = JSON.parse(decompressedData);
} catch (e) {
// If decoding and decompressing fails, set jsonData to data directly
jsonData = JSON.parse(data as string);
}
}
} catch (error) {
// If parsing fails, set jsonData to the original data
jsonData = data;
}
const callback = this.subscribedTopics[topic];
if (callback) {
if (window.appConfig?.isDev) {
console.debug(`Message received on topic ${topic}:`, jsonData);
}
callback(jsonData, this.dependencies);
}
};
}
setDependencies(dependencies: Record<string, any>) {
this.dependencies = mergeDeep(this.dependencies, dependencies);
}
subscribe(
topic: string,
callback: (data: MercureMessage, dependencies: MercureDependencies) => void
) {
if (!this.subscribedTopics[topic]) {
this.subscribedTopics[topic] = callback;
}
}
unsubscribe(topic: string) {
delete this.subscribedTopics[topic];
}
transfer(topic: string, data: MercureMessage) {
const callback = this.subscribedTopics[topic];
if (callback) {
callback(data, this.dependencies);
}
}
async publish(topic: string, data: MercureMessage) {
try {
const formData = new URLSearchParams();
formData.append('topic', '/datasync');
formData.append('data', JSON.stringify({ topic, data }));
await axios.post(this.mercureHubUrl, formData, {
headers: {
Authorization: `Bearer ${this.mercureJwt}`,
},
});
} catch (error) {
console.error('Error publishing data:', error);
}
}
}
export default new MercureClient(
window.appConfig.mercureHubUrl,
window.appConfig.mercureJwt
);
The current behavior
Although no errors are thrown, the collaboration does not seem to be working between multiple users. Messages are sent and received, but user cursors and updated content are not visible across different users.
The expected behavior
User cursors and document updates should be visible to all connected users in real-time.
Impact of fix
This bug affects all users attempting to use the collaboration feature, causing a lack of real-time collaboration functionality. Fixing this issue would benefit any team or group relying on real-time editing, improving the overall usability of the Lexical editor.