diff --git a/tools/relay/server.ts b/tools/relay/server.ts new file mode 100644 index 0000000..008f38e --- /dev/null +++ b/tools/relay/server.ts @@ -0,0 +1,159 @@ +import { Server } from "@modelcontextprotocol/sdk/server/index.js"; +import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js"; +import { + CallToolRequestSchema, + ListToolsRequestSchema, +} from "@modelcontextprotocol/sdk/types.js"; +import http from "node:http"; +import { RelayQueue, isRole } from "./queue.ts"; + +const PORT = 7331; +const queue = new RelayQueue(); + +const mcpServer = new Server( + { name: "relay", version: "0.1.0" }, + { capabilities: { tools: {} } } +); + +const TOOLS = [ + { + name: "post_message", + description: + "Push a message to a recipient's inbox. Returns the assigned message id.", + inputSchema: { + type: "object" as const, + properties: { + from: { + type: "string", + enum: ["pm", "dev-a", "dev-b"], + description: "Your role name", + }, + to: { + type: "string", + enum: ["pm", "dev-a", "dev-b"], + description: "Recipient role name", + }, + kind: { + type: "string", + enum: ["status", "question", "directive", "free"], + description: "Message type matching the coordination protocol", + }, + body: { + type: "string", + description: "Message body — freeform markdown, typically the full formatted block", + }, + }, + required: ["from", "to", "kind", "body"], + }, + }, + { + name: "read_messages", + description: + "Pop and return all pending messages for this recipient. Inbox is empty after this call (consume-once).", + inputSchema: { + type: "object" as const, + properties: { + for: { + type: "string", + enum: ["pm", "dev-a", "dev-b"], + description: "Your role name", + }, + }, + required: ["for"], + }, + }, + { + name: "list_pending", + description: + "Return count and kinds of pending messages without consuming them.", + inputSchema: { + type: "object" as const, + properties: { + for: { + type: "string", + enum: ["pm", "dev-a", "dev-b"], + description: "Your role name", + }, + }, + required: ["for"], + }, + }, +]; + +mcpServer.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: TOOLS })); + +mcpServer.setRequestHandler(CallToolRequestSchema, async (request) => { + const { name, arguments: args } = request.params; + const a = args as Record; + + if (name === "post_message") { + if (!isRole(a.from)) { + return { content: [{ type: "text" as const, text: `Error: unknown role "${a.from}"` }], isError: true }; + } + if (!isRole(a.to)) { + return { content: [{ type: "text" as const, text: `Error: unknown role "${a.to}"` }], isError: true }; + } + const kind = a.kind as "status" | "question" | "directive" | "free"; + const msg = queue.post(a.from, a.to, kind, a.body); + const ts = new Date(msg.ts).toTimeString().slice(0, 8); + const preview = a.body.slice(0, 60).replace(/\n/g, " "); + const ellipsis = a.body.length > 60 ? "..." : ""; + process.stdout.write(`[${ts}] ${a.from} → ${a.to} [${kind}] "${preview}${ellipsis}"\n`); + return { content: [{ type: "text" as const, text: JSON.stringify({ id: msg.id }) }] }; + } + + if (name === "read_messages") { + if (!isRole(a.for)) { + return { content: [{ type: "text" as const, text: `Error: unknown role "${a.for}"` }], isError: true }; + } + const messages = queue.read(a.for); + return { content: [{ type: "text" as const, text: JSON.stringify(messages) }] }; + } + + if (name === "list_pending") { + if (!isRole(a.for)) { + return { content: [{ type: "text" as const, text: `Error: unknown role "${a.for}"` }], isError: true }; + } + const result = queue.pending(a.for); + return { content: [{ type: "text" as const, text: JSON.stringify(result) }] }; + } + + return { + content: [{ type: "text" as const, text: `Error: unknown tool "${name}"` }], + isError: true, + }; +}); + +const transports = new Map(); + +const httpServer = http.createServer(async (req, res) => { + try { + if (req.method === "GET" && req.url === "/sse") { + const transport = new SSEServerTransport("/message", res); + transports.set(transport.sessionId, transport); + transport.onclose = () => transports.delete(transport.sessionId); + await mcpServer.connect(transport); + } else if (req.method === "POST" && req.url?.startsWith("/message")) { + const url = new URL(req.url, `http://127.0.0.1:${PORT}`); + const sessionId = url.searchParams.get("sessionId") ?? ""; + const transport = transports.get(sessionId); + if (transport) { + await transport.handlePostMessage(req, res); + } else { + res.writeHead(404, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: "session not found" })); + } + } else { + res.writeHead(404).end("not found"); + } + } catch (err) { + console.error("[relay] error:", err); + if (!res.headersSent) res.writeHead(500).end(String(err)); + } +}); + +httpServer.listen(PORT, "127.0.0.1", () => { + console.log(`[relay] server ready on :${PORT}`); + console.log(`[relay] tools: post_message, read_messages, list_pending`); + console.log(`[relay] waiting for connections — Ctrl-C to stop`); +});