From e0fefc986f0e564ee2d7d7117b2d997e05b22b22 Mon Sep 17 00:00:00 2001 From: Laura Hausmann Date: Fri, 29 Sep 2023 23:29:14 +0200 Subject: [PATCH] [mastodon-client] Respect idempotency-key for new posts --- .../server/api/mastodon/endpoints/status.ts | 61 ++++++++++++++++--- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/packages/backend/src/server/api/mastodon/endpoints/status.ts b/packages/backend/src/server/api/mastodon/endpoints/status.ts index 5331d2617..eacec8ffa 100644 --- a/packages/backend/src/server/api/mastodon/endpoints/status.ts +++ b/packages/backend/src/server/api/mastodon/endpoints/status.ts @@ -13,11 +13,13 @@ import { UserHelpers } from "@/server/api/mastodon/helpers/user.js"; import { convertPaginationArgsIds, limitToInt, normalizeUrlQuery } from "@/server/api/mastodon/endpoints/timeline.js"; import { PaginationHelpers } from "@/server/api/mastodon/helpers/pagination.js"; import { UserConverter } from "@/server/api/mastodon/converters/user.js"; +import { Cache } from "@/misc/cache.js"; +import { redisClient } from "@/db/redis.js"; +import AsyncLock from "async-lock"; +import { ILocalUser } from "@/models/entities/user.js"; -function normalizeQuery(data: any) { - const str = querystring.stringify(data); - return qs.parse(str); -} +const postIdempotencyCache = new Cache<{status?: MastodonEntity.Status}>('postIdempotencyCache', 60 * 60); +const postIdempotencyLocks = new AsyncLock(); export function setupEndpointsStatus(router: Router): void { router.post("/v1/statuses", async (ctx) => { @@ -30,14 +32,26 @@ export function setupEndpointsStatus(router: Router): void { return; } + const key = getIdempotencyKey(ctx.headers, user); + if (key !== null) { + const result = await getFromIdempotencyCache(key); + + if (result) { + ctx.body = result; + return; + } + } + let request = NoteHelpers.normalizeComposeOptions(ctx.request.body); - const note = await NoteHelpers.createNote(request, user) - .then(p => NoteConverter.encode(p, user)); - ctx.body = convertStatus(note); + ctx.body = await NoteHelpers.createNote(request, user) + .then(p => NoteConverter.encode(p, user)) + .then(p => convertStatus(p)); + + if (key !== null) postIdempotencyCache.set(key, {status: ctx.body}); } catch (e: any) { console.error(e); - ctx.status = 401; - ctx.body = e.response.data; + ctx.status = 500; + ctx.body = { error: e.message }; } }); router.put("/v1/statuses/:id", async (ctx) => { @@ -621,3 +635,32 @@ export function setupEndpointsStatus(router: Router): void { }, ); } + +function normalizeQuery(data: any) { + const str = querystring.stringify(data); + return qs.parse(str); +} + +function getIdempotencyKey(headers: any, user: ILocalUser): string | null { + if (headers["idempotency-key"] === undefined || headers["idempotency-key"] === null) return null; + return `${user.id}-${Array.isArray(headers["idempotency-key"]) ? headers["idempotency-key"].at(-1)! : headers["idempotency-key"]}`; +} + +async function getFromIdempotencyCache(key: string): Promise { + return postIdempotencyLocks.acquire(key, async (): Promise => { + if (await postIdempotencyCache.get(key) !== undefined) { + let i = 5; + while ((await postIdempotencyCache.get(key))?.status === undefined) { + if (++i > 5) throw new Error('Post is duplicate but unable to resolve original'); + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + } + + return (await postIdempotencyCache.get(key))?.status; + } else { + await postIdempotencyCache.set(key, {}); + return undefined; + } + }); +}