iceshrimp/packages/backend/src/queue/processors/deliver.ts
Johann150 85c584f582 server: avoid adding suspended instances to deliver queue
This should reduce the performance hit when adding large numbers of
instances to the deliver queue by making the check for suspended and
dead instances a bulk operation.

Changelog: Changed
Reviewed-on: https://akkoma.dev/FoundKeyGang/FoundKey/pulls/215
2022-11-09 17:19:29 -08:00

77 lines
2.4 KiB
TypeScript

import { URL } from 'node:url';
import request from '@/remote/activitypub/request.js';
import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js';
import Logger from '@/services/logger.js';
import { Instances } from '@/models/index.js';
import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
import { toPuny } from '@/misc/convert-host.js';
import { StatusError } from '@/misc/fetch.js';
import { shouldSkipInstance } from '@/misc/skipped-instances.js';
import type { DeliverJobData } from '@/queue/types.js';
import type Bull from 'bull';
const logger = new Logger('deliver');
let latest: string | null = null;
export default async (job: Bull.Job<DeliverJobData>) => {
const { host } = new URL(job.data.to);
const puny = toPuny(host);
if (await shouldSkipInstance(puny)) return 'skip';
try {
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
logger.debug(`delivering ${latest}`);
}
await request(job.data.user, job.data.to, job.data.content);
// Update stats
registerOrFetchInstanceDoc(host).then(i => {
Instances.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: 200,
lastCommunicatedAt: new Date(),
isNotResponding: false,
});
fetchInstanceMetadata(i);
instanceChart.requestSent(i.host, true);
apRequestChart.deliverSucc();
federationChart.deliverd(i.host, true);
});
return 'Success';
} catch (res) {
// Update stats
registerOrFetchInstanceDoc(host).then(i => {
Instances.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: res instanceof StatusError ? res.statusCode : null,
isNotResponding: true,
});
instanceChart.requestSent(i.host, false);
apRequestChart.deliverFail();
federationChart.deliverd(i.host, false);
});
if (res instanceof StatusError) {
// 4xx
if (res.isClientError) {
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
// 何回再送しても成功することはないということなのでエラーにはしないでおく
return `${res.statusCode} ${res.statusMessage}`;
}
// 5xx etc.
throw new Error(`${res.statusCode} ${res.statusMessage}`);
} else {
// DNS error, socket error, timeout ...
throw res;
}
}
};