This commit is contained in:
Philipinho
2026-01-17 02:23:47 +00:00
parent bcb004af21
commit c3a9a52b7f
15 changed files with 1033 additions and 6 deletions
@@ -67,4 +67,8 @@ export class CollaborationGateway {
async destroy(): Promise<void> {
await this.hocuspocus.destroy();
}
async openDirectConnection(documentName: string) {
return this.hocuspocus.openDirectConnection(documentName);
}
}
@@ -1,4 +1,12 @@
import { StarterKit } from '@tiptap/starter-kit';
import { EditorState, TextSelection } from '@tiptap/pm/state';
import {
initProseMirrorDoc,
relativePositionToAbsolutePosition,
updateYFragment,
} from 'y-prosemirror';
import * as Y from 'yjs';
import { Document } from '@hocuspocus/server';
import { TextAlign } from '@tiptap/extension-text-align';
import { TaskList } from '@tiptap/extension-task-list';
import { TaskItem } from '@tiptap/extension-task-item';
@@ -116,3 +124,96 @@ export function jsonToNode(tiptapJson: JSONContent) {
export function getPageId(documentName: string) {
return documentName.split('.')[1];
}
export type YjsSelection = {
anchor: any;
head: any;
};
export function setYjsMark(
doc: Document,
fragment: Y.XmlFragment,
yjsSelection: YjsSelection,
markName: string,
markAttributes: Record<string, any>,
) {
const schema = getSchema(tiptapExtensions);
const { doc: pNode, mapping } = initProseMirrorDoc(fragment, schema);
// Convert JSON positions to Y.js RelativePosition objects
const anchorRelPos = Y.createRelativePositionFromJSON(yjsSelection.anchor);
const headRelPos = Y.createRelativePositionFromJSON(yjsSelection.head);
console.log(anchorRelPos, headRelPos);
const anchor = relativePositionToAbsolutePosition(
doc,
fragment,
anchorRelPos,
mapping,
);
const head = relativePositionToAbsolutePosition(
doc,
fragment,
headRelPos,
mapping,
);
console.log('second')
console.log(anchor, head);
if (anchor === null || head === null) {
throw new Error('Could not resolve Y.js relative positions to absolute positions');
}
const state = EditorState.create({
doc: pNode,
schema: schema,
selection: TextSelection.create(pNode, anchor, head),
});
const tr = setMarkInProsemirror(schema.marks[markName], markAttributes, state);
// Update the Y.js fragment with the modified ProseMirror document
// @ts-ignore
updateYFragment(doc, fragment, tr.doc, mapping);
}
function setMarkInProsemirror(
type: any,
attributes: Record<string, any>,
state: EditorState,
) {
let tr = state.tr;
const { selection } = state;
const { ranges } = selection;
ranges.forEach((range) => {
const from = range.$from.pos;
const to = range.$to.pos;
state.doc.nodesBetween(from, to, (node, pos) => {
const trimmedFrom = Math.max(pos, from);
const trimmedTo = Math.min(pos + node.nodeSize, to);
const someHasMark = node.marks.find((mark) => mark.type === type);
if (someHasMark) {
node.marks.forEach((mark) => {
if (type === mark.type) {
tr = tr.addMark(
trimmedFrom,
trimmedTo,
type.create({
...mark.attrs,
...attributes,
}),
);
}
});
} else {
tr = tr.addMark(trimmedFrom, trimmedTo, type.create(attributes));
}
});
});
return tr;
}
@@ -157,7 +157,7 @@ export class PersistenceExtension implements Extension {
page: {
...page,
content: tiptapJson,
lastUpdatedById: context.user.id,
lastUpdatedById: context?.user?.id,
},
});
@@ -179,7 +179,7 @@ export class PersistenceExtension implements Extension {
async onChange(data: onChangePayload) {
const documentName = data.documentName;
const userId = data.context?.user.id;
const userId = data.context?.user?.id;
if (!userId) return;
if (!this.contributors.has(documentName)) {
+349
View File
@@ -0,0 +1,349 @@
import { TiptapTransformer } from "@hocuspocus/transformer";
import test from "ava";
import * as Y from "yjs";
import { newHocuspocus, newHocuspocusProvider, sleep } from "../utils/index.ts";
test("direct connection prevents document from being removed from memory", async (t) => {
await new Promise(async (resolve) => {
const server = await newHocuspocus();
await server.openDirectConnection("hocuspocus-test");
const provider = newHocuspocusProvider(server, {
onSynced() {
provider.configuration.websocketProvider.destroy();
provider.destroy();
sleep(server.configuration.debounce + 50).then(() => {
t.is(server.getDocumentsCount(), 1);
resolve("done");
});
},
});
});
});
test("direct connection works even if provider is connected", async (t) => {
await new Promise(async (resolve) => {
const server = await newHocuspocus();
const provider = newHocuspocusProvider(server, {
onSynced() {
provider.document.getMap("config").set("a", "valueFromProvider");
},
});
await sleep(150);
const directConnection =
await server.openDirectConnection("hocuspocus-test");
await directConnection.transact((doc) => {
t.is("valueFromProvider", String(doc.getMap("config").get("a")));
doc.getMap("config").set("b", "valueFromServerDirectConnection");
});
await sleep(100);
t.is(
"valueFromServerDirectConnection",
String(provider.document.getMap("config").get("b")),
);
resolve(1);
t.pass();
});
});
test("direct connection can apply yjsUpdate", async (t) => {
await new Promise(async (resolve) => {
const server = await newHocuspocus();
const provider = newHocuspocusProvider(server);
t.is("", provider.document.getXmlFragment("default").toJSON());
const directConnection =
await server.openDirectConnection("hocuspocus-test");
await directConnection.transact((doc) => {
Y.applyUpdate(
doc,
Y.encodeStateAsUpdate(
TiptapTransformer.toYdoc({
type: "doc",
content: [
{
type: "paragraph",
content: [
{
type: "text",
text: "Example Paragraph",
},
],
},
],
}),
),
);
});
await sleep(100);
t.is(
"<paragraph>Example Paragraph</paragraph>",
provider.document.getXmlFragment("default").toJSON(),
);
resolve(1);
t.pass();
});
});
test("direct connection can transact", async (t) => {
const server = await newHocuspocus();
const direct = await server.openDirectConnection("hocuspocus-test");
await direct.transact((document) => {
document.getArray("test").insert(0, ["value"]);
});
t.is(direct.document?.getArray("test").toJSON()[0], "value");
});
test("direct connection cannot transact once closed", async (t) => {
const server = await newHocuspocus();
const direct = await server.openDirectConnection("hocuspocus-test");
await direct.disconnect();
try {
await direct.transact((document) => {
document.getArray("test").insert(0, ["value"]);
});
t.fail(
"DirectConnection should throw an error when transacting on closed connection",
);
} catch (err) {
if (err instanceof Error && err.message === "direct connection closed") {
t.pass();
} else {
t.fail("unknown error");
}
}
});
test("if a direct connection closes, the document should be unloaded if there is no other connection left", async (t) => {
await new Promise(async (resolve) => {
const server = await newHocuspocus();
const direct = await server.openDirectConnection("hocuspocus-test1");
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 1);
await direct.transact((document) => {
document.getArray("test").insert(0, ["value"]);
});
await direct.disconnect();
t.is(server.getConnectionsCount(), 0);
t.is(server.getDocumentsCount(), 0);
resolve("done");
});
});
test("direct connection transact awaits until onStoreDocument has finished", async (t) => {
let onStoreDocumentFinished = false;
await new Promise(async (resolve) => {
const server = await newHocuspocus({
onStoreDocument: async () => {
onStoreDocumentFinished = false;
await sleep(200);
onStoreDocumentFinished = true;
},
});
const direct = await server.openDirectConnection("hocuspocus-test2");
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 1);
t.is(onStoreDocumentFinished, false);
await direct.transact((document) => {
document.getArray("test").insert(0, ["value"]);
});
await direct.disconnect();
t.is(onStoreDocumentFinished, true);
t.is(server.getConnectionsCount(), 0);
t.is(server.getDocumentsCount(), 0);
t.is(onStoreDocumentFinished, true);
resolve("done");
});
});
test("direct connection transact awaits until onStoreDocument has finished, even if unloadImmediately=false", async (t) => {
let onStoreDocumentFinished = false;
let directConnDisconnecting = false;
let storedAfterDisconnect = false;
await new Promise(async (resolve) => {
const server = await newHocuspocus({
unloadImmediately: false,
onStoreDocument: async () => {
onStoreDocumentFinished = false;
await sleep(200);
onStoreDocumentFinished = true;
if (directConnDisconnecting) {
storedAfterDisconnect = true;
}
},
afterUnloadDocument: async (data) => {
if (!storedAfterDisconnect) {
t.fail("this shouldnt be called");
}
},
});
const direct = await server.openDirectConnection("hocuspocus-test");
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 1);
t.is(onStoreDocumentFinished, false);
await direct.transact((document) => {
document.getArray("test").insert(0, ["value"]);
});
const provider = newHocuspocusProvider(server);
provider.document.getMap("aaa").set("bb", "b");
provider.disconnect();
provider.configuration.websocketProvider.disconnect();
await sleep(100);
directConnDisconnecting = true;
await direct.disconnect();
t.is(onStoreDocumentFinished, true);
t.is(server.getConnectionsCount(), 0);
t.is(storedAfterDisconnect, true);
resolve("done");
});
});
test("does not unload document if an earlierly started onStoreDocument is still running", async (t) => {
let onStoreDocumentStarted = 0;
let onStoreDocumentFinished = 0;
const server = await newHocuspocus({
unloadImmediately: false,
debounce: 100,
onStoreDocument: async () => {
onStoreDocumentStarted++;
if (onStoreDocumentStarted === 1) {
// Simulate a long running onStoreDocument for the first debounced save
await sleep(500);
}
onStoreDocumentFinished++;
},
afterUnloadDocument: async (data) => {},
});
// Trigger a change, which will start a debounced onStoreDocument after 100ms
const provider = newHocuspocusProvider(server);
provider.document.getMap("aaa").set("bb", "b");
await new Promise(async (resolve) => {
provider.on("synced", resolve);
if (!provider.unsyncedChanges) resolve("");
});
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 1);
// Wait for the debounced onStoreDocument to start
await sleep(110);
t.is(onStoreDocumentStarted, 1);
t.is(onStoreDocumentFinished, 0);
// Open direct connection to prevent document from being unloaded
const direct = await server.openDirectConnection("hocuspocus-test");
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 2);
// Close the websocket client
provider.disconnect();
provider.configuration.websocketProvider.disconnect();
await sleep(50);
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 1);
t.is(onStoreDocumentStarted, 1);
t.is(onStoreDocumentFinished, 0);
direct.disconnect();
await sleep(50);
// Another save must not start before the first one has finished
t.is(onStoreDocumentStarted, 1);
t.is(onStoreDocumentFinished, 0);
// Document must not be unloaded yet, because the first onStoreDocument is still running
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 0);
// Wait enough time to be sure the onStoreDocument has finished and ensure that the document was eventually unloaded
await sleep(500);
// The second onStoreDocument triggered by direct.disconnect must have started and finished now
t.is(onStoreDocumentStarted, 2);
t.is(onStoreDocumentFinished, 2);
// The document must have been unloaded now as well
t.is(server.getDocumentsCount(), 0);
});
test("creating a websocket connection after transact but before debounce interval doesnt create different docs", async (t) => {
let onStoreDocumentFinished = false;
let disconnected = false;
await new Promise(async (resolve) => {
const server = await newHocuspocus({
onStoreDocument: async () => {
onStoreDocumentFinished = false;
await sleep(200);
onStoreDocumentFinished = true;
},
async afterUnloadDocument(data) {
console.log("called");
if (disconnected) {
t.fail("must not be called");
}
},
});
const direct = await server.openDirectConnection("hocuspocus-test");
t.is(server.getDocumentsCount(), 1);
t.is(server.getConnectionsCount(), 1);
t.is(onStoreDocumentFinished, false);
await direct.transact((document) => {
document.transact(() => {
document.getArray("test").insert(0, ["value"]);
}, "testOrigin");
});
await direct.disconnect();
t.is(onStoreDocumentFinished, true);
disconnected = true;
t.is(server.getConnectionsCount(), 0);
t.is(server.getDocumentsCount(), 0);
t.is(onStoreDocumentFinished, true);
const provider = newHocuspocusProvider(server);
await sleep(server.configuration.debounce * 2);
resolve("done");
});
});
@@ -10,6 +10,7 @@ import {
} from '@nestjs/common';
import { CommentService } from './comment.service';
import { CreateCommentDto } from './dto/create-comment.dto';
import { CreateReadOnlyCommentDto } from './dto/create-readonly-comment.dto';
import { UpdateCommentDto } from './dto/update-comment.dto';
import { PageIdDto, CommentIdDto } from './dto/comments.input';
import { AuthUser } from '../../common/decorators/auth-user.decorator';
@@ -62,6 +63,28 @@ export class CommentController {
);
}
@HttpCode(HttpStatus.OK)
@Post('create-readonly')
async createReadOnly(
@Body() createCommentDto: CreateReadOnlyCommentDto,
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
) {
const page = await this.pageRepo.findById(createCommentDto.pageId);
if (!page || page.deletedAt) {
throw new NotFoundException('Page not found');
}
return this.commentService.createReadOnlyComment(
{
userId: user.id,
page,
workspaceId: workspace.id,
},
createCommentDto,
);
}
@HttpCode(HttpStatus.OK)
@Post('/')
async findPageComments(
@@ -1,9 +1,10 @@
import { Module } from '@nestjs/common';
import { CommentService } from './comment.service';
import { CommentController } from './comment.controller';
import { CollaborationModule } from '../../collaboration/collaboration.module';
@Module({
imports: [],
imports: [CollaborationModule],
controllers: [CommentController],
providers: [CommentService],
exports: [CommentService],
@@ -2,9 +2,11 @@ import {
BadRequestException,
ForbiddenException,
Injectable,
Logger,
NotFoundException,
} from '@nestjs/common';
import { CreateCommentDto } from './dto/create-comment.dto';
import { CreateReadOnlyCommentDto } from './dto/create-readonly-comment.dto';
import { UpdateCommentDto } from './dto/update-comment.dto';
import { CommentRepo } from '@docmost/db/repos/comment/comment.repo';
import { Comment, Page, User } from '@docmost/db/types/entity.types';
@@ -12,13 +14,19 @@ import { PaginationOptions } from '@docmost/db/pagination/pagination-options';
import { PaginationResult } from '@docmost/db/pagination/pagination';
import { PageRepo } from '@docmost/db/repos/page/page.repo';
import { SpaceMemberRepo } from '@docmost/db/repos/space/space-member.repo';
import { CollaborationGateway } from '../../collaboration/collaboration.gateway';
import { setYjsMark } from '../../collaboration/collaboration.util';
import * as Y from 'yjs';
@Injectable()
export class CommentService {
private readonly logger = new Logger(CommentService.name);
constructor(
private commentRepo: CommentRepo,
private pageRepo: PageRepo,
private spaceMemberRepo: SpaceMemberRepo,
private collaborationGateway: CollaborationGateway,
) {}
async findById(commentId: string) {
@@ -105,4 +113,49 @@ export class CommentService {
return comment;
}
async createReadOnlyComment(
opts: { userId: string; page: Page; workspaceId: string },
createCommentDto: CreateReadOnlyCommentDto,
): Promise<Comment> {
const { userId, page, workspaceId } = opts;
const commentContent = JSON.parse(createCommentDto.content);
const comment = await this.commentRepo.insertComment({
pageId: page.id,
content: commentContent,
selection: createCommentDto?.selection?.substring(0, 250),
type: 'inline',
creatorId: userId,
workspaceId: workspaceId,
spaceId: page.spaceId,
});
const documentName = `page.${page.id}`;
const directConnection =
await this.collaborationGateway.openDirectConnection(documentName);
try {
await directConnection.transact((doc) => {
const fragment = doc.getXmlFragment('default');
setYjsMark(doc, fragment, createCommentDto.yjsSelection, 'comment', {
commentId: comment.id,
resolved: false,
});
});
} catch (error) {
this.logger.error(
`Failed to apply comment mark for comment ${comment.id}`,
error,
);
await this.commentRepo.deleteComment(comment.id);
throw new BadRequestException(
'Failed to apply comment mark. Selection may have changed.',
);
} finally {
await directConnection.disconnect();
}
return comment;
}
}
@@ -0,0 +1,19 @@
import { IsJSON, IsObject, IsOptional, IsString } from 'class-validator';
export class CreateReadOnlyCommentDto {
@IsString()
pageId: string;
@IsJSON()
content: any;
@IsOptional()
@IsString()
selection: string;
@IsObject()
yjsSelection: {
anchor: any;
head: any;
};
}
+181
View File
@@ -0,0 +1,181 @@
async createThread(options: {
initialComment: { body: CommentBody; metadata?: any };
metadata?: any;
}) {
const thread = await threadStore.createThread(options);
if (threadStore.addThreadToDocument) {
const view = editor.prosemirrorView!;
const pmSelection = view.state.selection;
const ystate = ySyncPluginKey.getState(view.state);
const selection = {
prosemirror: {
head: pmSelection.head,
anchor: pmSelection.anchor,
},
yjs: ystate
? getRelativeSelection(ystate.binding, view.state)
: undefined,
};
await threadStore.addThreadToDocument({
threadId: thread.id,
selection,
});
} else {
(editor as any)._tiptapEditor.commands.setMark(markType, {
orphan: false,
threadId: thread.id,
});
}
},
userStore,
commentEditorSchema,
---
public addThreadToDocument = async (options: {
threadId: string;
selection: {
prosemirror: {
head: number;
anchor: number;
};
yjs: {
head: any;
anchor: any;
};
};
}) => {
const { threadId, ...rest } = options;
return this.doRequest(`/${threadId}/addToDocument`, "POST", rest);
};
-----
// addToDocument
router.post("/:threadId/addToDocument", async (c) => {
const json = await c.req.json();
// TODO: you'd probably validate the request json here
const doc = c.get("document");
const fragment = doc.getXmlFragment("doc");
setMark(doc, fragment, json.selection.yjs, "comment", {
orphan: false,
threadId: c.req.param("threadId"),
});
return c.json({ message: "Thread added to document" });
});
----
import { ServerBlockNoteEditor } from "@blocknote/server-util";
import { Document } from "@hocuspocus/server";
import { EditorState, TextSelection } from "prosemirror-state";
import {
initProseMirrorDoc,
relativePositionToAbsolutePosition,
updateYFragment,
} from "y-prosemirror";
import * as Y from "yjs";
/**
* Sets a mark in the yjs document based on a yjs selection
*/
export function setMark(
doc: Document,
fragment: Y.XmlFragment,
yjsSelection: {
anchor: any;
head: any;
},
markName: string,
markAttributes: any
) {
// needed to get the pmSchema
// if you use a BlockNote custom schema, make sure to pass it to the create options
const editor = ServerBlockNoteEditor.create();
// get the prosemirror document
const { doc: pNode, mapping } = initProseMirrorDoc(
fragment,
editor.editor.pmSchema as any
);
// get the prosemirror positions based on the yjs positions
// we need to get this from yjs because other users might have made changes in between
const anchor = relativePositionToAbsolutePosition(
doc,
fragment,
yjsSelection.anchor,
mapping
);
const head = relativePositionToAbsolutePosition(
doc,
fragment,
yjsSelection.head,
mapping
);
// now, let's create the mark in the prosemirror document
const state = EditorState.create({
doc: pNode,
schema: editor.editor.pmSchema as any,
selection: TextSelection.create(pNode, anchor!, head!),
});
const tr = setMarkInProsemirror(
editor.editor.pmSchema.marks[markName],
markAttributes,
state
);
// finally, update the yjs document with the new prosemirror document
updateYFragment(doc, fragment, tr.doc, mapping);
}
// based on https://github.com/ueberdosis/tiptap/blob/f3258d9ee5fb7979102fe63434f6ea4120507311/packages/core/src/commands/setMark.ts#L66
export const setMarkInProsemirror = (
type: any,
attributes = {},
state: EditorState
) => {
let tr = state.tr;
const { selection } = state;
const { ranges } = selection;
ranges.forEach((range) => {
const from = range.$from.pos;
const to = range.$to.pos;
state.doc.nodesBetween(from, to, (node, pos) => {
const trimmedFrom = Math.max(pos, from);
const trimmedTo = Math.min(pos + node.nodeSize, to);
const someHasMark = node.marks.find((mark) => mark.type === type);
// if there is already a mark of this type
// we know that we have to merge its attributes
// otherwise we add a fresh new mark
if (someHasMark) {
node.marks.forEach((mark) => {
if (type === mark.type) {
tr = tr.addMark(
trimmedFrom,
trimmedTo,
type.create({
...mark.attrs,
...attributes,
})
);
}
});
} else {
tr = tr.addMark(trimmedFrom, trimmedTo, type.create(attributes));
}
});
});
return tr;
};