da83322200
* wrap follow/unfollow and block/unblock as job queue * create import job to follow in each iteration * make relationship jobs concurrent * replace to job queue if called repeatedly * use addBulk to import * omit stream when importing * fix job caller * use ThinUser instead of User to reduce redis memory consumption * createImportFollowingToDbJobの呼び出し方を変える, 型補強 * Force ThinUser * オブジェクト操作のみのメソッド名はgenerate...Data * Force ThinUser in generateRelationshipJobData * silent bulk unfollow at admin api endpoint --------- Co-authored-by: tamaina <tamaina@hotmail.co.jp>
348 lines
8.7 KiB
TypeScript
348 lines
8.7 KiB
TypeScript
import { Inject, Injectable } from '@nestjs/common';
|
|
import { v4 as uuid } from 'uuid';
|
|
import type { IActivity } from '@/core/activitypub/type.js';
|
|
import type { DriveFile } from '@/models/entities/DriveFile.js';
|
|
import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
|
|
import type { Config } from '@/config.js';
|
|
import { DI } from '@/di-symbols.js';
|
|
import { bindThis } from '@/decorators.js';
|
|
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
|
|
import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
|
|
import type httpSignature from '@peertube/http-signature';
|
|
import Bull from 'bull';
|
|
|
|
@Injectable()
|
|
export class QueueService {
|
|
constructor(
|
|
@Inject(DI.config)
|
|
private config: Config,
|
|
|
|
@Inject('queue:system') public systemQueue: SystemQueue,
|
|
@Inject('queue:endedPollNotification') public endedPollNotificationQueue: EndedPollNotificationQueue,
|
|
@Inject('queue:deliver') public deliverQueue: DeliverQueue,
|
|
@Inject('queue:inbox') public inboxQueue: InboxQueue,
|
|
@Inject('queue:db') public dbQueue: DbQueue,
|
|
@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
|
|
@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
|
|
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
|
|
) {}
|
|
|
|
@bindThis
|
|
public deliver(user: ThinUser, content: IActivity | null, to: string | null, isSharedInbox: boolean) {
|
|
if (content == null) return null;
|
|
if (to == null) return null;
|
|
|
|
const data = {
|
|
user: {
|
|
id: user.id,
|
|
},
|
|
content,
|
|
to,
|
|
isSharedInbox,
|
|
};
|
|
|
|
return this.deliverQueue.add(data, {
|
|
attempts: this.config.deliverJobMaxAttempts ?? 12,
|
|
timeout: 1 * 60 * 1000, // 1min
|
|
backoff: {
|
|
type: 'apBackoff',
|
|
},
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
|
|
const data = {
|
|
activity: activity,
|
|
signature,
|
|
};
|
|
|
|
return this.inboxQueue.add(data, {
|
|
attempts: this.config.inboxJobMaxAttempts ?? 8,
|
|
timeout: 5 * 60 * 1000, // 5min
|
|
backoff: {
|
|
type: 'apBackoff',
|
|
},
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createDeleteDriveFilesJob(user: ThinUser) {
|
|
return this.dbQueue.add('deleteDriveFiles', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportCustomEmojisJob(user: ThinUser) {
|
|
return this.dbQueue.add('exportCustomEmojis', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportNotesJob(user: ThinUser) {
|
|
return this.dbQueue.add('exportNotes', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportFavoritesJob(user: ThinUser) {
|
|
return this.dbQueue.add('exportFavorites', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) {
|
|
return this.dbQueue.add('exportFollowing', {
|
|
user: { id: user.id },
|
|
excludeMuting,
|
|
excludeInactive,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportMuteJob(user: ThinUser) {
|
|
return this.dbQueue.add('exportMuting', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportBlockingJob(user: ThinUser) {
|
|
return this.dbQueue.add('exportBlocking', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createExportUserListsJob(user: ThinUser) {
|
|
return this.dbQueue.add('exportUserLists', {
|
|
user: { id: user.id },
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
|
|
return this.dbQueue.add('importFollowing', {
|
|
user: { id: user.id },
|
|
fileId: fileId,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createImportFollowingToDbJob(user: ThinUser, targets: string[]) {
|
|
const jobs = targets.map(rel => this.generateToDbJobData('importFollowingToDb', { user, target: rel }));
|
|
return this.dbQueue.addBulk(jobs);
|
|
}
|
|
|
|
@bindThis
|
|
public createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
|
|
return this.dbQueue.add('importMuting', {
|
|
user: { id: user.id },
|
|
fileId: fileId,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
|
|
return this.dbQueue.add('importBlocking', {
|
|
user: { id: user.id },
|
|
fileId: fileId,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createImportBlockingToDbJob(user: ThinUser, targets: string[]) {
|
|
const jobs = targets.map(rel => this.generateToDbJobData('importBlockingToDb', { user, target: rel }));
|
|
return this.dbQueue.addBulk(jobs);
|
|
}
|
|
|
|
@bindThis
|
|
private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
|
|
name: string,
|
|
data: D,
|
|
opts: Bull.JobOptions,
|
|
} {
|
|
return {
|
|
name,
|
|
data,
|
|
opts: {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
},
|
|
};
|
|
}
|
|
|
|
@bindThis
|
|
public createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
|
|
return this.dbQueue.add('importUserLists', {
|
|
user: { id: user.id },
|
|
fileId: fileId,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) {
|
|
return this.dbQueue.add('importCustomEmojis', {
|
|
user: { id: user.id },
|
|
fileId: fileId,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
|
|
return this.dbQueue.add('deleteAccount', {
|
|
user: { id: user.id },
|
|
soft: opts.soft,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createFollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string, silent?: boolean }[]) {
|
|
const jobs = followings.map(rel => this.generateRelationshipJobData('follow', rel));
|
|
return this.relationshipQueue.addBulk(jobs);
|
|
}
|
|
|
|
@bindThis
|
|
public createUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[]) {
|
|
const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel));
|
|
return this.relationshipQueue.addBulk(jobs);
|
|
}
|
|
|
|
@bindThis
|
|
public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
|
|
const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel));
|
|
return this.relationshipQueue.addBulk(jobs);
|
|
}
|
|
|
|
@bindThis
|
|
public createUnblockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
|
|
const jobs = blockings.map(rel => this.generateRelationshipJobData('unblock', rel));
|
|
return this.relationshipQueue.addBulk(jobs);
|
|
}
|
|
|
|
@bindThis
|
|
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData): {
|
|
name: string,
|
|
data: RelationshipJobData,
|
|
opts: Bull.JobOptions,
|
|
} {
|
|
return {
|
|
name,
|
|
data: {
|
|
from: { id: data.from.id },
|
|
to: { id: data.to.id },
|
|
silent: data.silent,
|
|
requestId: data.requestId,
|
|
},
|
|
opts: {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
},
|
|
};
|
|
}
|
|
|
|
@bindThis
|
|
public createDeleteObjectStorageFileJob(key: string) {
|
|
return this.objectStorageQueue.add('deleteFile', {
|
|
key: key,
|
|
}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public createCleanRemoteFilesJob() {
|
|
return this.objectStorageQueue.add('cleanRemoteFiles', {}, {
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public webhookDeliver(webhook: Webhook, type: typeof webhookEventTypes[number], content: unknown) {
|
|
const data = {
|
|
type,
|
|
content,
|
|
webhookId: webhook.id,
|
|
userId: webhook.userId,
|
|
to: webhook.url,
|
|
secret: webhook.secret,
|
|
createdAt: Date.now(),
|
|
eventId: uuid(),
|
|
};
|
|
|
|
return this.webhookDeliverQueue.add(data, {
|
|
attempts: 4,
|
|
timeout: 1 * 60 * 1000, // 1min
|
|
backoff: {
|
|
type: 'apBackoff',
|
|
},
|
|
removeOnComplete: true,
|
|
removeOnFail: true,
|
|
});
|
|
}
|
|
|
|
@bindThis
|
|
public destroy() {
|
|
this.deliverQueue.once('cleaned', (jobs, status) => {
|
|
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
|
|
});
|
|
this.deliverQueue.clean(0, 'delayed');
|
|
|
|
this.inboxQueue.once('cleaned', (jobs, status) => {
|
|
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
|
|
});
|
|
this.inboxQueue.clean(0, 'delayed');
|
|
}
|
|
}
|