perf(server): cache blocking
This commit is contained in:
parent
b7522f69e7
commit
0c12e80106
8 changed files with 99 additions and 65 deletions
|
@ -10,10 +10,9 @@ import { isUserRelated } from '@/misc/is-user-related.js';
|
|||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import { PushNotificationService } from '@/core/PushNotificationService.js';
|
||||
import * as Acct from '@/misc/acct.js';
|
||||
import { Cache } from '@/misc/cache.js';
|
||||
import type { Packed } from '@/misc/schema.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type { MutingsRepository, BlockingsRepository, NotesRepository, AntennaNotesRepository, AntennasRepository, UserGroupJoiningsRepository, UserListJoiningsRepository } from '@/models/index.js';
|
||||
import type { MutingsRepository, NotesRepository, AntennaNotesRepository, AntennasRepository, UserGroupJoiningsRepository, UserListJoiningsRepository } from '@/models/index.js';
|
||||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { StreamMessages } from '@/server/api/stream/types.js';
|
||||
|
@ -23,7 +22,6 @@ import type { OnApplicationShutdown } from '@nestjs/common';
|
|||
export class AntennaService implements OnApplicationShutdown {
|
||||
private antennasFetched: boolean;
|
||||
private antennas: Antenna[];
|
||||
private blockingCache: Cache<User['id'][]>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redisSubscriber)
|
||||
|
@ -32,9 +30,6 @@ export class AntennaService implements OnApplicationShutdown {
|
|||
@Inject(DI.mutingsRepository)
|
||||
private mutingsRepository: MutingsRepository,
|
||||
|
||||
@Inject(DI.blockingsRepository)
|
||||
private blockingsRepository: BlockingsRepository,
|
||||
|
||||
@Inject(DI.notesRepository)
|
||||
private notesRepository: NotesRepository,
|
||||
|
||||
|
@ -59,7 +54,6 @@ export class AntennaService implements OnApplicationShutdown {
|
|||
) {
|
||||
this.antennasFetched = false;
|
||||
this.antennas = [];
|
||||
this.blockingCache = new Cache<User['id'][]>(1000 * 60 * 5);
|
||||
|
||||
this.redisSubscriber.on('message', this.onRedisMessage);
|
||||
}
|
||||
|
@ -156,10 +150,6 @@ export class AntennaService implements OnApplicationShutdown {
|
|||
if (note.visibility === 'specified') return false;
|
||||
if (note.visibility === 'followers') return false;
|
||||
|
||||
// アンテナ作成者がノート作成者にブロックされていたらスキップ
|
||||
const blockings = await this.blockingCache.fetch(noteUser.id, () => this.blockingsRepository.findBy({ blockerId: noteUser.id }).then(res => res.map(x => x.blockeeId)));
|
||||
if (blockings.some(blocking => blocking === antenna.userId)) return false;
|
||||
|
||||
if (!antenna.withReplies && note.replyId != null) return false;
|
||||
|
||||
if (antenna.src === 'home') {
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { Not } from 'typeorm';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type { NotesRepository, UsersRepository, BlockingsRepository, PollsRepository, PollVotesRepository } from '@/models/index.js';
|
||||
import type { NotesRepository, UsersRepository, PollsRepository, PollVotesRepository } from '@/models/index.js';
|
||||
import type { Note } from '@/models/entities/Note.js';
|
||||
import { RelayService } from '@/core/RelayService.js';
|
||||
import type { CacheableUser } from '@/models/entities/User.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import { CreateNotificationService } from '@/core/CreateNotificationService.js';
|
||||
import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
|
||||
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
||||
import { ApDeliverManagerService } from '@/core/activitypub/ApDeliverManagerService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { UserBlockingService } from '@/core/UserBlockingService.js';
|
||||
|
||||
@Injectable()
|
||||
export class PollService {
|
||||
|
@ -28,14 +28,11 @@ export class PollService {
|
|||
@Inject(DI.pollVotesRepository)
|
||||
private pollVotesRepository: PollVotesRepository,
|
||||
|
||||
@Inject(DI.blockingsRepository)
|
||||
private blockingsRepository: BlockingsRepository,
|
||||
|
||||
private userEntityService: UserEntityService,
|
||||
private idService: IdService,
|
||||
private relayService: RelayService,
|
||||
private globalEventService: GlobalEventService,
|
||||
private createNotificationService: CreateNotificationService,
|
||||
private userBlockingService: UserBlockingService,
|
||||
private apRendererService: ApRendererService,
|
||||
private apDeliverManagerService: ApDeliverManagerService,
|
||||
) {
|
||||
|
@ -52,11 +49,8 @@ export class PollService {
|
|||
|
||||
// Check blocking
|
||||
if (note.userId !== user.id) {
|
||||
const block = await this.blockingsRepository.findOneBy({
|
||||
blockerId: note.userId,
|
||||
blockeeId: user.id,
|
||||
});
|
||||
if (block) {
|
||||
const blocked = await this.userBlockingService.checkBlocked(note.userId, user.id);
|
||||
if (blocked) {
|
||||
throw new Error('blocked');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,8 @@ import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
|||
import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
|
||||
import { MetaService } from '@/core/MetaService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { UtilityService } from './UtilityService.js';
|
||||
import { UtilityService } from '@/core/UtilityService.js';
|
||||
import { UserBlockingService } from '@/core/UserBlockingService.js';
|
||||
|
||||
const legacies: Record<string, string> = {
|
||||
'like': '👍',
|
||||
|
@ -73,6 +74,7 @@ export class ReactionService {
|
|||
private metaService: MetaService,
|
||||
private userEntityService: UserEntityService,
|
||||
private noteEntityService: NoteEntityService,
|
||||
private userBlockingService: UserBlockingService,
|
||||
private idService: IdService,
|
||||
private globalEventService: GlobalEventService,
|
||||
private apRendererService: ApRendererService,
|
||||
|
@ -86,11 +88,8 @@ export class ReactionService {
|
|||
public async create(user: { id: User['id']; host: User['host']; isBot: User['isBot'] }, note: Note, reaction?: string) {
|
||||
// Check blocking
|
||||
if (note.userId !== user.id) {
|
||||
const block = await this.blockingsRepository.findOneBy({
|
||||
blockerId: note.userId,
|
||||
blockeeId: user.id,
|
||||
});
|
||||
if (block) {
|
||||
const blocked = await this.userBlockingService.checkBlocked(note.userId, user.id);
|
||||
if (blocked) {
|
||||
throw new IdentifiableError('e70412a4-7197-4726-8e74-f3e0deb92aa7');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
|
||||
import Redis from 'ioredis';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import type { CacheableUser, User } from '@/models/entities/User.js';
|
||||
import type { Blocking } from '@/models/entities/Blocking.js';
|
||||
|
@ -7,7 +8,6 @@ import { QueueService } from '@/core/QueueService.js';
|
|||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import PerUserFollowingChart from '@/core/chart/charts/per-user-following.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import logger from '@/logger.js';
|
||||
import type { UsersRepository, FollowingsRepository, FollowRequestsRepository, BlockingsRepository, UserListsRepository, UserListJoiningsRepository } from '@/models/index.js';
|
||||
import Logger from '@/logger.js';
|
||||
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
||||
|
@ -15,12 +15,20 @@ import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
|
|||
import { LoggerService } from '@/core/LoggerService.js';
|
||||
import { WebhookService } from '@/core/WebhookService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { Cache } from '@/misc/cache.js';
|
||||
import { StreamMessages } from '@/server/api/stream/types.js';
|
||||
|
||||
@Injectable()
|
||||
export class UserBlockingService {
|
||||
export class UserBlockingService implements OnApplicationShutdown {
|
||||
private logger: Logger;
|
||||
|
||||
// キーがユーザーIDで、値がそのユーザーがブロックしているユーザーのIDのリストなキャッシュ
|
||||
private blockingsByUserIdCache: Cache<User['id'][]>;
|
||||
|
||||
constructor(
|
||||
@Inject(DI.redisSubscriber)
|
||||
private redisSubscriber: Redis.Redis,
|
||||
|
||||
@Inject(DI.usersRepository)
|
||||
private usersRepository: UsersRepository,
|
||||
|
||||
|
@ -49,6 +57,37 @@ export class UserBlockingService {
|
|||
private loggerService: LoggerService,
|
||||
) {
|
||||
this.logger = this.loggerService.getLogger('user-block');
|
||||
|
||||
this.blockingsByUserIdCache = new Cache<User['id'][]>(Infinity);
|
||||
|
||||
this.redisSubscriber.on('message', this.onMessage);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
private async onMessage(_: string, data: string): Promise<void> {
|
||||
const obj = JSON.parse(data);
|
||||
|
||||
if (obj.channel === 'internal') {
|
||||
const { type, body } = obj.message as StreamMessages['internal']['payload'];
|
||||
switch (type) {
|
||||
case 'blockingCreated': {
|
||||
const cached = this.blockingsByUserIdCache.get(body.blockerId);
|
||||
if (cached) {
|
||||
this.blockingsByUserIdCache.set(body.blockerId, [...cached, ...[body.blockeeId]]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'blockingDeleted': {
|
||||
const cached = this.blockingsByUserIdCache.get(body.blockerId);
|
||||
if (cached) {
|
||||
this.blockingsByUserIdCache.set(body.blockerId, cached.filter(x => x !== body.blockeeId));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
|
@ -72,6 +111,11 @@ export class UserBlockingService {
|
|||
|
||||
await this.blockingsRepository.insert(blocking);
|
||||
|
||||
this.globalEventService.publishInternalEvent('blockingCreated', {
|
||||
blockerId: blocker.id,
|
||||
blockeeId: blockee.id,
|
||||
});
|
||||
|
||||
if (this.userEntityService.isLocalUser(blocker) && this.userEntityService.isRemoteUser(blockee)) {
|
||||
const content = this.apRendererService.renderActivity(this.apRendererService.renderBlock(blocking));
|
||||
this.queueService.deliver(blocker, content, blockee.inbox);
|
||||
|
@ -210,10 +254,31 @@ export class UserBlockingService {
|
|||
|
||||
await this.blockingsRepository.delete(blocking.id);
|
||||
|
||||
this.globalEventService.publishInternalEvent('blockingDeleted', {
|
||||
blockerId: blocker.id,
|
||||
blockeeId: blockee.id,
|
||||
});
|
||||
|
||||
// deliver if remote bloking
|
||||
if (this.userEntityService.isLocalUser(blocker) && this.userEntityService.isRemoteUser(blockee)) {
|
||||
const content = this.apRendererService.renderActivity(this.apRendererService.renderUndo(this.apRendererService.renderBlock(blocking), blocker));
|
||||
this.queueService.deliver(blocker, content, blockee.inbox);
|
||||
}
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public async checkBlocked(blockerId: User['id'], blockeeId: User['id']): Promise<boolean> {
|
||||
const blockedUserIds = await this.blockingsByUserIdCache.fetch(blockerId, () => this.blockingsRepository.find({
|
||||
where: {
|
||||
blockerId,
|
||||
},
|
||||
select: ['blockeeId'],
|
||||
}).then(records => records.map(record => record.blockeeId)));
|
||||
return blockedUserIds.includes(blockeeId);
|
||||
}
|
||||
|
||||
@bindThis
|
||||
public onApplicationShutdown(signal?: string | undefined) {
|
||||
this.redisSubscriber.off('message', this.onMessage);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,10 +12,11 @@ import { FederatedInstanceService } from '@/core/FederatedInstanceService.js';
|
|||
import { WebhookService } from '@/core/WebhookService.js';
|
||||
import { CreateNotificationService } from '@/core/CreateNotificationService.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type { BlockingsRepository, FollowingsRepository, FollowRequestsRepository, InstancesRepository, UserProfilesRepository, UsersRepository } from '@/models/index.js';
|
||||
import type { FollowingsRepository, FollowRequestsRepository, InstancesRepository, UserProfilesRepository, UsersRepository } from '@/models/index.js';
|
||||
import { UserEntityService } from '@/core/entities/UserEntityService.js';
|
||||
import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { UserBlockingService } from '@/core/UserBlockingService.js';
|
||||
import Logger from '../logger.js';
|
||||
|
||||
const logger = new Logger('following/create');
|
||||
|
@ -48,13 +49,11 @@ export class UserFollowingService {
|
|||
@Inject(DI.followRequestsRepository)
|
||||
private followRequestsRepository: FollowRequestsRepository,
|
||||
|
||||
@Inject(DI.blockingsRepository)
|
||||
private blockingsRepository: BlockingsRepository,
|
||||
|
||||
@Inject(DI.instancesRepository)
|
||||
private instancesRepository: InstancesRepository,
|
||||
|
||||
private userEntityService: UserEntityService,
|
||||
private userBlockingService: UserBlockingService,
|
||||
private idService: IdService,
|
||||
private queueService: QueueService,
|
||||
private globalEventService: GlobalEventService,
|
||||
|
@ -62,7 +61,6 @@ export class UserFollowingService {
|
|||
private federatedInstanceService: FederatedInstanceService,
|
||||
private webhookService: WebhookService,
|
||||
private apRendererService: ApRendererService,
|
||||
private globalEventService: GlobalEventService,
|
||||
private perUserFollowingChart: PerUserFollowingChart,
|
||||
private instanceChart: InstanceChart,
|
||||
) {
|
||||
|
@ -77,14 +75,8 @@ export class UserFollowingService {
|
|||
|
||||
// check blocking
|
||||
const [blocking, blocked] = await Promise.all([
|
||||
this.blockingsRepository.findOneBy({
|
||||
blockerId: follower.id,
|
||||
blockeeId: followee.id,
|
||||
}),
|
||||
this.blockingsRepository.findOneBy({
|
||||
blockerId: followee.id,
|
||||
blockeeId: follower.id,
|
||||
}),
|
||||
this.userBlockingService.checkBlocked(follower.id, followee.id),
|
||||
this.userBlockingService.checkBlocked(followee.id, follower.id),
|
||||
]);
|
||||
|
||||
if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee) && blocked) {
|
||||
|
@ -94,7 +86,7 @@ export class UserFollowingService {
|
|||
return;
|
||||
} else if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee) && blocking) {
|
||||
// リモートフォローを受けてブロックされているはずの場合だったら、ブロック解除しておく。
|
||||
await this.blockingsRepository.delete(blocking.id);
|
||||
await this.userBlockingService.unblock(follower, followee);
|
||||
} else {
|
||||
// それ以外は単純に例外
|
||||
if (blocking != null) throw new IdentifiableError('710e8fb0-b8c3-4922-be49-d5d93d8e6a6e', 'blocking');
|
||||
|
@ -357,14 +349,8 @@ export class UserFollowingService {
|
|||
|
||||
// check blocking
|
||||
const [blocking, blocked] = await Promise.all([
|
||||
this.blockingsRepository.findOneBy({
|
||||
blockerId: follower.id,
|
||||
blockeeId: followee.id,
|
||||
}),
|
||||
this.blockingsRepository.findOneBy({
|
||||
blockerId: followee.id,
|
||||
blockeeId: follower.id,
|
||||
}),
|
||||
this.userBlockingService.checkBlocked(follower.id, followee.id),
|
||||
this.userBlockingService.checkBlocked(followee.id, follower.id),
|
||||
]);
|
||||
|
||||
if (blocking != null) throw new Error('blocking');
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import { bindThis } from '@/decorators.js';
|
||||
|
||||
// TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする?
|
||||
|
||||
export class Cache<T> {
|
||||
public cache: Map<string | null, { date: number; value: T; }>;
|
||||
private lifetime: number;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { Not } from 'typeorm';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import type { UsersRepository, BlockingsRepository, PollsRepository, PollVotesRepository } from '@/models/index.js';
|
||||
import type { UsersRepository, PollsRepository, PollVotesRepository } from '@/models/index.js';
|
||||
import type { IRemoteUser } from '@/models/entities/User.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
import { Endpoint } from '@/server/api/endpoint-base.js';
|
||||
|
@ -11,6 +11,7 @@ import { ApRendererService } from '@/core/activitypub/ApRendererService.js';
|
|||
import { GlobalEventService } from '@/core/GlobalEventService.js';
|
||||
import { CreateNotificationService } from '@/core/CreateNotificationService.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import { UserBlockingService } from '@/core/UserBlockingService.js';
|
||||
import { ApiError } from '../../../error.js';
|
||||
|
||||
export const meta = {
|
||||
|
@ -77,9 +78,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
|
|||
@Inject(DI.usersRepository)
|
||||
private usersRepository: UsersRepository,
|
||||
|
||||
@Inject(DI.blockingsRepository)
|
||||
private blockingsRepository: BlockingsRepository,
|
||||
|
||||
@Inject(DI.pollsRepository)
|
||||
private pollsRepository: PollsRepository,
|
||||
|
||||
|
@ -93,6 +91,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
|
|||
private apRendererService: ApRendererService,
|
||||
private globalEventService: GlobalEventService,
|
||||
private createNotificationService: CreateNotificationService,
|
||||
private userBlockingService: UserBlockingService,
|
||||
) {
|
||||
super(meta, paramDef, async (ps, me) => {
|
||||
const createdAt = new Date();
|
||||
|
@ -109,11 +108,8 @@ export default class extends Endpoint<typeof meta, typeof paramDef> {
|
|||
|
||||
// Check blocking
|
||||
if (note.userId !== me.id) {
|
||||
const block = await this.blockingsRepository.findOneBy({
|
||||
blockerId: note.userId,
|
||||
blockeeId: me.id,
|
||||
});
|
||||
if (block) {
|
||||
const blocked = await this.userBlockingService.checkBlocked(note.userId, me.id);
|
||||
if (blocked) {
|
||||
throw new ApiError(meta.errors.youHaveBeenBlocked);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ export interface InternalStreamTypes {
|
|||
remoteUserUpdated: { id: User['id']; };
|
||||
follow: { followerId: User['id']; followeeId: User['id']; };
|
||||
unfollow: { followerId: User['id']; followeeId: User['id']; };
|
||||
blockingCreated: { blockerId: User['id']; blockeeId: User['id']; };
|
||||
blockingDeleted: { blockerId: User['id']; blockeeId: User['id']; };
|
||||
policiesUpdated: Role['policies'];
|
||||
roleCreated: Role;
|
||||
roleDeleted: Role;
|
||||
|
|
Loading…
Reference in a new issue