refactoring

Resolve #7779
This commit is contained in:
syuilo
2021-11-12 02:02:25 +09:00
parent 037837b551
commit 0e4a111f81
1714 changed files with 20803 additions and 11751 deletions

View File

@ -0,0 +1,15 @@
import * as Bull from 'bull';
export function getJobInfo(job: Bull.Job, increment = false) {
const age = Date.now() - job.timestamp;
const formated = age > 60000 ? `${Math.floor(age / 1000 / 60)}m`
: age > 10000 ? `${Math.floor(age / 1000)}s`
: `${age}ms`;
// onActiveとかonCompletedのattemptsMadeがなぜか0始まりなのでインクリメントする
const currentAttempts = job.attemptsMade + (increment ? 1 : 0);
const maxAttempts = job.opts ? job.opts.attempts : 0;
return `id=${job.id} attempts=${currentAttempts}/${maxAttempts} age=${formated}`;
}

View File

@ -0,0 +1,255 @@
import * as httpSignature from 'http-signature';
import config from '@/config/index';
import { envOption } from '../env';
import processDeliver from './processors/deliver';
import processInbox from './processors/inbox';
import processDb from './processors/db/index';
import procesObjectStorage from './processors/object-storage/index';
import { queueLogger } from './logger';
import { DriveFile } from '@/models/entities/drive-file';
import { getJobInfo } from './get-job-info';
import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue } from './queues';
import { ThinUser } from './types';
import { IActivity } from '@/remote/activitypub/type';
function renderError(e: Error): any {
return {
stack: e?.stack,
message: e?.message,
name: e?.name
};
}
const systemLogger = queueLogger.createSubLogger('system');
const deliverLogger = queueLogger.createSubLogger('deliver');
const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');
const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
systemQueue
.on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => systemLogger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => systemLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
.on('error', (job: any, err: Error) => systemLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => systemLogger.warn(`stalled id=${job.id}`));
deliverQueue
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
inboxQueue
.on('waiting', (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
.on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`))
.on('failed', (job, err) => inboxLogger.warn(`failed(${err}) ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`, { job, e: renderError(err) }))
.on('error', (job: any, err: Error) => inboxLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => inboxLogger.warn(`stalled ${getJobInfo(job)} activity=${job.data.activity ? job.data.activity.id : 'none'}`));
dbQueue
.on('waiting', (jobId) => dbLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => dbLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => dbLogger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
.on('error', (job: any, err: Error) => dbLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => dbLogger.warn(`stalled id=${job.id}`));
objectStorageQueue
.on('waiting', (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => objectStorageLogger.debug(`active id=${job.id}`))
.on('completed', (job, result) => objectStorageLogger.debug(`completed(${result}) id=${job.id}`))
.on('failed', (job, err) => objectStorageLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }))
.on('error', (job: any, err: Error) => objectStorageLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
export function deliver(user: ThinUser, content: unknown, to: string | null) {
if (content == null) return null;
if (to == null) return null;
const data = {
user: {
id: user.id
},
content,
to
};
return deliverQueue.add(data, {
attempts: config.deliverJobMaxAttempts || 12,
timeout: 1 * 60 * 1000, // 1min
backoff: {
type: 'apBackoff'
},
removeOnComplete: true,
removeOnFail: true
});
}
export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
const data = {
activity: activity,
signature
};
return inboxQueue.add(data, {
attempts: config.inboxJobMaxAttempts || 8,
timeout: 5 * 60 * 1000, // 5min
backoff: {
type: 'apBackoff'
},
removeOnComplete: true,
removeOnFail: true
});
}
export function createDeleteDriveFilesJob(user: ThinUser) {
return dbQueue.add('deleteDriveFiles', {
user: user
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createExportNotesJob(user: ThinUser) {
return dbQueue.add('exportNotes', {
user: user
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createExportFollowingJob(user: ThinUser) {
return dbQueue.add('exportFollowing', {
user: user
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createExportMuteJob(user: ThinUser) {
return dbQueue.add('exportMute', {
user: user
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createExportBlockingJob(user: ThinUser) {
return dbQueue.add('exportBlocking', {
user: user
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createExportUserListsJob(user: ThinUser) {
return dbQueue.add('exportUserLists', {
user: user
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
return dbQueue.add('importFollowing', {
user: user,
fileId: fileId
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
return dbQueue.add('importMuting', {
user: user,
fileId: fileId
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
return dbQueue.add('importBlocking', {
user: user,
fileId: fileId
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
return dbQueue.add('importUserLists', {
user: user,
fileId: fileId
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
return dbQueue.add('deleteAccount', {
user: user,
soft: opts.soft
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createDeleteObjectStorageFileJob(key: string) {
return objectStorageQueue.add('deleteFile', {
key: key
}, {
removeOnComplete: true,
removeOnFail: true
});
}
export function createCleanRemoteFilesJob() {
return objectStorageQueue.add('cleanRemoteFiles', {}, {
removeOnComplete: true,
removeOnFail: true
});
}
export default function() {
if (envOption.onlyServer) return;
deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
processDb(dbQueue);
procesObjectStorage(objectStorageQueue);
systemQueue.add('resyncCharts', {
}, {
repeat: { cron: '0 0 * * *' }
});
}
export function destroy() {
deliverQueue.once('cleaned', (jobs, status) => {
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
deliverQueue.clean(0, 'delayed');
inboxQueue.once('cleaned', (jobs, status) => {
inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
inboxQueue.clean(0, 'delayed');
}

View File

@ -0,0 +1,33 @@
import * as Bull from 'bull';
import config from '@/config/index';
export function initialize<T>(name: string, limitPerSec = -1) {
return new Bull<T>(name, {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
db: config.redis.db || 0,
},
prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue',
limiter: limitPerSec > 0 ? {
max: limitPerSec,
duration: 1000
} : undefined,
settings: {
backoffStrategies: {
apBackoff
}
}
});
}
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
function apBackoff(attemptsMade: number, err: Error) {
const baseDelay = 60 * 1000; // 1min
const maxBackoff = 8 * 60 * 60 * 1000; // 8hours
let backoff = (Math.pow(2, attemptsMade) - 1) * baseDelay;
backoff = Math.min(backoff, maxBackoff);
backoff += Math.round(backoff * Math.random() * 0.2);
return backoff;
}

View File

@ -0,0 +1,3 @@
import Logger from '@/services/logger';
export const queueLogger = new Logger('queue', 'orange');

View File

@ -0,0 +1,94 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import { DriveFiles, Notes, UserProfiles, Users } from '@/models/index';
import { DbUserDeleteJobData } from '@/queue/types';
import { Note } from '@/models/entities/note';
import { DriveFile } from '@/models/entities/drive-file';
import { MoreThan } from 'typeorm';
import { deleteFileSync } from '@/services/drive/delete-file';
import { sendEmail } from '@/services/send-email';
const logger = queueLogger.createSubLogger('delete-account');
export async function deleteAccount(job: Bull.Job<DbUserDeleteJobData>): Promise<string | void> {
logger.info(`Deleting account of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
return;
}
{ // Delete notes
let cursor: Note['id'] | null = null;
while (true) {
const notes = await Notes.find({
where: {
userId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 100,
order: {
id: 1
}
});
if (notes.length === 0) {
break;
}
cursor = notes[notes.length - 1].id;
await Notes.delete(notes.map(note => note.id));
}
logger.succ(`All of notes deleted`);
}
{ // Delete files
let cursor: DriveFile['id'] | null = null;
while (true) {
const files = await DriveFiles.find({
where: {
userId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 10,
order: {
id: 1
}
});
if (files.length === 0) {
break;
}
cursor = files[files.length - 1].id;
for (const file of files) {
await deleteFileSync(file);
}
}
logger.succ(`All of files deleted`);
}
{ // Send email notification
const profile = await UserProfiles.findOneOrFail(user.id);
if (profile.email && profile.emailVerified) {
sendEmail(profile.email, 'Account deleted',
`Your account has been deleted.`,
`Your account has been deleted.`);
}
}
// soft指定されている場合は物理削除しない
if (job.data.soft) {
// nop
} else {
await Users.delete(job.data.user.id);
}
return 'Account deleted';
}

View File

@ -0,0 +1,56 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import { deleteFileSync } from '@/services/drive/delete-file';
import { Users, DriveFiles } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('delete-drive-files');
export async function deleteDriveFiles(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
logger.info(`Deleting drive files of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
let deletedCount = 0;
let cursor: any = null;
while (true) {
const files = await DriveFiles.find({
where: {
userId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 100,
order: {
id: 1
}
});
if (files.length === 0) {
job.progress(100);
break;
}
cursor = files[files.length - 1].id;
for (const file of files) {
await deleteFileSync(file);
deletedCount++;
}
const total = await DriveFiles.count({
userId: user.id,
});
job.progress(deletedCount / total);
}
logger.succ(`All drive files (${deletedCount}) of ${user.id} has been deleted.`);
done();
}

View File

@ -0,0 +1,94 @@
import * as Bull from 'bull';
import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
import addFile from '@/services/drive/add-file';
import * as dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
import { Users, Blockings } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('export-blocking');
export async function exportBlocking(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
logger.info(`Exporting blocking of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
// Create temp file
const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
tmp.file((e, path, fd, cleanup) => {
if (e) return rej(e);
res([path, cleanup]);
});
});
logger.info(`Temp file is ${path}`);
const stream = fs.createWriteStream(path, { flags: 'a' });
let exportedCount = 0;
let cursor: any = null;
while (true) {
const blockings = await Blockings.find({
where: {
blockerId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 100,
order: {
id: 1
}
});
if (blockings.length === 0) {
job.progress(100);
break;
}
cursor = blockings[blockings.length - 1].id;
for (const block of blockings) {
const u = await Users.findOne({ id: block.blockeeId });
if (u == null) {
exportedCount++; continue;
}
const content = getFullApAccount(u.username, u.host);
await new Promise<void>((res, rej) => {
stream.write(content + '\n', err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
exportedCount++;
}
const total = await Blockings.count({
blockerId: user.id,
});
job.progress(exportedCount / total);
}
stream.end();
logger.succ(`Exported to: ${path}`);
const fileName = 'blocking-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
const driveFile = await addFile(user, path, fileName, null, null, true);
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
done();
}

View File

@ -0,0 +1,94 @@
import * as Bull from 'bull';
import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
import addFile from '@/services/drive/add-file';
import * as dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
import { Users, Followings } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('export-following');
export async function exportFollowing(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
logger.info(`Exporting following of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
// Create temp file
const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
tmp.file((e, path, fd, cleanup) => {
if (e) return rej(e);
res([path, cleanup]);
});
});
logger.info(`Temp file is ${path}`);
const stream = fs.createWriteStream(path, { flags: 'a' });
let exportedCount = 0;
let cursor: any = null;
while (true) {
const followings = await Followings.find({
where: {
followerId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 100,
order: {
id: 1
}
});
if (followings.length === 0) {
job.progress(100);
break;
}
cursor = followings[followings.length - 1].id;
for (const following of followings) {
const u = await Users.findOne({ id: following.followeeId });
if (u == null) {
exportedCount++; continue;
}
const content = getFullApAccount(u.username, u.host);
await new Promise<void>((res, rej) => {
stream.write(content + '\n', err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
exportedCount++;
}
const total = await Followings.count({
followerId: user.id,
});
job.progress(exportedCount / total);
}
stream.end();
logger.succ(`Exported to: ${path}`);
const fileName = 'following-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
const driveFile = await addFile(user, path, fileName, null, null, true);
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
done();
}

View File

@ -0,0 +1,94 @@
import * as Bull from 'bull';
import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
import addFile from '@/services/drive/add-file';
import * as dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
import { Users, Mutings } from '@/models/index';
import { MoreThan } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('export-mute');
export async function exportMute(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
logger.info(`Exporting mute of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
// Create temp file
const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
tmp.file((e, path, fd, cleanup) => {
if (e) return rej(e);
res([path, cleanup]);
});
});
logger.info(`Temp file is ${path}`);
const stream = fs.createWriteStream(path, { flags: 'a' });
let exportedCount = 0;
let cursor: any = null;
while (true) {
const mutes = await Mutings.find({
where: {
muterId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 100,
order: {
id: 1
}
});
if (mutes.length === 0) {
job.progress(100);
break;
}
cursor = mutes[mutes.length - 1].id;
for (const mute of mutes) {
const u = await Users.findOne({ id: mute.muteeId });
if (u == null) {
exportedCount++; continue;
}
const content = getFullApAccount(u.username, u.host);
await new Promise<void>((res, rej) => {
stream.write(content + '\n', err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
exportedCount++;
}
const total = await Mutings.count({
muterId: user.id,
});
job.progress(exportedCount / total);
}
stream.end();
logger.succ(`Exported to: ${path}`);
const fileName = 'mute-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
const driveFile = await addFile(user, path, fileName, null, null, true);
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
done();
}

View File

@ -0,0 +1,133 @@
import * as Bull from 'bull';
import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
import addFile from '@/services/drive/add-file';
import * as dateFormat from 'dateformat';
import { Users, Notes, Polls } from '@/models/index';
import { MoreThan } from 'typeorm';
import { Note } from '@/models/entities/note';
import { Poll } from '@/models/entities/poll';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('export-notes');
export async function exportNotes(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
logger.info(`Exporting notes of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
// Create temp file
const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
tmp.file((e, path, fd, cleanup) => {
if (e) return rej(e);
res([path, cleanup]);
});
});
logger.info(`Temp file is ${path}`);
const stream = fs.createWriteStream(path, { flags: 'a' });
await new Promise<void>((res, rej) => {
stream.write('[', err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
let exportedNotesCount = 0;
let cursor: any = null;
while (true) {
const notes = await Notes.find({
where: {
userId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 100,
order: {
id: 1
}
});
if (notes.length === 0) {
job.progress(100);
break;
}
cursor = notes[notes.length - 1].id;
for (const note of notes) {
let poll: Poll | undefined;
if (note.hasPoll) {
poll = await Polls.findOneOrFail({ noteId: note.id });
}
const content = JSON.stringify(serialize(note, poll));
await new Promise<void>((res, rej) => {
stream.write(exportedNotesCount === 0 ? content : ',\n' + content, err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
exportedNotesCount++;
}
const total = await Notes.count({
userId: user.id,
});
job.progress(exportedNotesCount / total);
}
await new Promise<void>((res, rej) => {
stream.write(']', err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
stream.end();
logger.succ(`Exported to: ${path}`);
const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.json';
const driveFile = await addFile(user, path, fileName, null, null, true);
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
done();
}
function serialize(note: Note, poll: Poll | null = null): any {
return {
id: note.id,
text: note.text,
createdAt: note.createdAt,
fileIds: note.fileIds,
replyId: note.replyId,
renoteId: note.renoteId,
poll: poll,
cw: note.cw,
viaMobile: note.viaMobile,
visibility: note.visibility,
visibleUserIds: note.visibleUserIds,
localOnly: note.localOnly
};
}

View File

@ -0,0 +1,71 @@
import * as Bull from 'bull';
import * as tmp from 'tmp';
import * as fs from 'fs';
import { queueLogger } from '../../logger';
import addFile from '@/services/drive/add-file';
import * as dateFormat from 'dateformat';
import { getFullApAccount } from '@/misc/convert-host';
import { Users, UserLists, UserListJoinings } from '@/models/index';
import { In } from 'typeorm';
import { DbUserJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('export-user-lists');
export async function exportUserLists(job: Bull.Job<DbUserJobData>, done: any): Promise<void> {
logger.info(`Exporting user lists of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
const lists = await UserLists.find({
userId: user.id
});
// Create temp file
const [path, cleanup] = await new Promise<[string, any]>((res, rej) => {
tmp.file((e, path, fd, cleanup) => {
if (e) return rej(e);
res([path, cleanup]);
});
});
logger.info(`Temp file is ${path}`);
const stream = fs.createWriteStream(path, { flags: 'a' });
for (const list of lists) {
const joinings = await UserListJoinings.find({ userListId: list.id });
const users = await Users.find({
id: In(joinings.map(j => j.userId))
});
for (const u of users) {
const acct = getFullApAccount(u.username, u.host);
const content = `${list.name},${acct}`;
await new Promise<void>((res, rej) => {
stream.write(content + '\n', err => {
if (err) {
logger.error(err);
rej(err);
} else {
res();
}
});
});
}
}
stream.end();
logger.succ(`Exported to: ${path}`);
const fileName = 'user-lists-' + dateFormat(new Date(), 'yyyy-mm-dd-HH-MM-ss') + '.csv';
const driveFile = await addFile(user, path, fileName, null, null, true);
logger.succ(`Exported to: ${driveFile.id}`);
cleanup();
done();
}

View File

@ -0,0 +1,74 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import * as Acct from 'misskey-js/built/acct';
import { resolveUser } from '@/remote/resolve-user';
import { downloadTextFile } from '@/misc/download-text-file';
import { isSelfHost, toPuny } from '@/misc/convert-host';
import { Users, DriveFiles, Blockings } from '@/models/index';
import { DbUserImportJobData } from '@/queue/types';
import block from '@/services/blocking/create';
const logger = queueLogger.createSubLogger('import-blocking');
export async function importBlocking(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
logger.info(`Importing blocking of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
const file = await DriveFiles.findOne({
id: job.data.fileId
});
if (file == null) {
done();
return;
}
const csv = await downloadTextFile(file.url);
let linenum = 0;
for (const line of csv.trim().split('\n')) {
linenum++;
try {
const acct = line.split(',')[0].trim();
const { username, host } = Acct.parse(acct);
let target = isSelfHost(host!) ? await Users.findOne({
host: null,
usernameLower: username.toLowerCase()
}) : await Users.findOne({
host: toPuny(host!),
usernameLower: username.toLowerCase()
});
if (host == null && target == null) continue;
if (target == null) {
target = await resolveUser(username, host);
}
if (target == null) {
throw `cannot resolve user: @${username}@${host}`;
}
// skip myself
if (target.id === job.data.user.id) continue;
logger.info(`Block[${linenum}] ${target.id} ...`);
await block(user, target);
} catch (e) {
logger.warn(`Error in line:${linenum} ${e}`);
}
}
logger.succ('Imported');
done();
}

View File

@ -0,0 +1,73 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import follow from '@/services/following/create';
import * as Acct from 'misskey-js/built/acct';
import { resolveUser } from '@/remote/resolve-user';
import { downloadTextFile } from '@/misc/download-text-file';
import { isSelfHost, toPuny } from '@/misc/convert-host';
import { Users, DriveFiles } from '@/models/index';
import { DbUserImportJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('import-following');
export async function importFollowing(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
logger.info(`Importing following of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
const file = await DriveFiles.findOne({
id: job.data.fileId
});
if (file == null) {
done();
return;
}
const csv = await downloadTextFile(file.url);
let linenum = 0;
for (const line of csv.trim().split('\n')) {
linenum++;
try {
const acct = line.split(',')[0].trim();
const { username, host } = Acct.parse(acct);
let target = isSelfHost(host!) ? await Users.findOne({
host: null,
usernameLower: username.toLowerCase()
}) : await Users.findOne({
host: toPuny(host!),
usernameLower: username.toLowerCase()
});
if (host == null && target == null) continue;
if (target == null) {
target = await resolveUser(username, host);
}
if (target == null) {
throw `cannot resolve user: @${username}@${host}`;
}
// skip myself
if (target.id === job.data.user.id) continue;
logger.info(`Follow[${linenum}] ${target.id} ...`);
follow(user, target);
} catch (e) {
logger.warn(`Error in line:${linenum} ${e}`);
}
}
logger.succ('Imported');
done();
}

View File

@ -0,0 +1,83 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import * as Acct from 'misskey-js/built/acct';
import { resolveUser } from '@/remote/resolve-user';
import { downloadTextFile } from '@/misc/download-text-file';
import { isSelfHost, toPuny } from '@/misc/convert-host';
import { Users, DriveFiles, Mutings } from '@/models/index';
import { DbUserImportJobData } from '@/queue/types';
import { User } from '@/models/entities/user';
import { genId } from '@/misc/gen-id';
const logger = queueLogger.createSubLogger('import-muting');
export async function importMuting(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
logger.info(`Importing muting of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
const file = await DriveFiles.findOne({
id: job.data.fileId
});
if (file == null) {
done();
return;
}
const csv = await downloadTextFile(file.url);
let linenum = 0;
for (const line of csv.trim().split('\n')) {
linenum++;
try {
const acct = line.split(',')[0].trim();
const { username, host } = Acct.parse(acct);
let target = isSelfHost(host!) ? await Users.findOne({
host: null,
usernameLower: username.toLowerCase()
}) : await Users.findOne({
host: toPuny(host!),
usernameLower: username.toLowerCase()
});
if (host == null && target == null) continue;
if (target == null) {
target = await resolveUser(username, host);
}
if (target == null) {
throw `cannot resolve user: @${username}@${host}`;
}
// skip myself
if (target.id === job.data.user.id) continue;
logger.info(`Mute[${linenum}] ${target.id} ...`);
await mute(user, target);
} catch (e) {
logger.warn(`Error in line:${linenum} ${e}`);
}
}
logger.succ('Imported');
done();
}
async function mute(user: User, target: User) {
await Mutings.insert({
id: genId(),
createdAt: new Date(),
muterId: user.id,
muteeId: target.id,
});
}

View File

@ -0,0 +1,80 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import * as Acct from 'misskey-js/built/acct';
import { resolveUser } from '@/remote/resolve-user';
import { pushUserToUserList } from '@/services/user-list/push';
import { downloadTextFile } from '@/misc/download-text-file';
import { isSelfHost, toPuny } from '@/misc/convert-host';
import { DriveFiles, Users, UserLists, UserListJoinings } from '@/models/index';
import { genId } from '@/misc/gen-id';
import { DbUserImportJobData } from '@/queue/types';
const logger = queueLogger.createSubLogger('import-user-lists');
export async function importUserLists(job: Bull.Job<DbUserImportJobData>, done: any): Promise<void> {
logger.info(`Importing user lists of ${job.data.user.id} ...`);
const user = await Users.findOne(job.data.user.id);
if (user == null) {
done();
return;
}
const file = await DriveFiles.findOne({
id: job.data.fileId
});
if (file == null) {
done();
return;
}
const csv = await downloadTextFile(file.url);
let linenum = 0;
for (const line of csv.trim().split('\n')) {
linenum++;
try {
const listName = line.split(',')[0].trim();
const { username, host } = Acct.parse(line.split(',')[1].trim());
let list = await UserLists.findOne({
userId: user.id,
name: listName
});
if (list == null) {
list = await UserLists.save({
id: genId(),
createdAt: new Date(),
userId: user.id,
name: listName,
userIds: []
});
}
let target = isSelfHost(host!) ? await Users.findOne({
host: null,
usernameLower: username.toLowerCase()
}) : await Users.findOne({
host: toPuny(host!),
usernameLower: username.toLowerCase()
});
if (target == null) {
target = await resolveUser(username, host);
}
if (await UserListJoinings.findOne({ userListId: list.id, userId: target.id }) != null) continue;
pushUserToUserList(target, list);
} catch (e) {
logger.warn(`Error in line:${linenum} ${e}`);
}
}
logger.succ('Imported');
done();
}

View File

@ -0,0 +1,33 @@
import * as Bull from 'bull';
import { DbJobData } from '@/queue/types';
import { deleteDriveFiles } from './delete-drive-files';
import { exportNotes } from './export-notes';
import { exportFollowing } from './export-following';
import { exportMute } from './export-mute';
import { exportBlocking } from './export-blocking';
import { exportUserLists } from './export-user-lists';
import { importFollowing } from './import-following';
import { importUserLists } from './import-user-lists';
import { deleteAccount } from './delete-account';
import { importMuting } from './import-muting';
import { importBlocking } from './import-blocking';
const jobs = {
deleteDriveFiles,
exportNotes,
exportFollowing,
exportMute,
exportBlocking,
exportUserLists,
importFollowing,
importMuting,
importBlocking,
importUserLists,
deleteAccount,
} as Record<string, Bull.ProcessCallbackFunction<DbJobData> | Bull.ProcessPromiseFunction<DbJobData>>;
export default function(dbQueue: Bull.Queue<DbJobData>) {
for (const [k, v] of Object.entries(jobs)) {
dbQueue.process(k, v);
}
}

View File

@ -0,0 +1,94 @@
import { URL } from 'url';
import * as Bull from 'bull';
import request from '@/remote/activitypub/request';
import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
import Logger from '@/services/logger';
import { Instances } from '@/models/index';
import { instanceChart } from '@/services/chart/index';
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
import { fetchMeta } from '@/misc/fetch-meta';
import { toPuny } from '@/misc/convert-host';
import { Cache } from '@/misc/cache';
import { Instance } from '@/models/entities/instance';
import { DeliverJobData } from '../types';
import { StatusError } from '@/misc/fetch';
const logger = new Logger('deliver');
let latest: string | null = null;
const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
export default async (job: Bull.Job<DeliverJobData>) => {
const { host } = new URL(job.data.to);
// ブロックしてたら中断
const meta = await fetchMeta();
if (meta.blockedHosts.includes(toPuny(host))) {
return 'skip (blocked)';
}
// isSuspendedなら中断
let suspendedHosts = suspendedHostsCache.get(null);
if (suspendedHosts == null) {
suspendedHosts = await Instances.find({
where: {
isSuspended: true
},
});
suspendedHostsCache.set(null, suspendedHosts);
}
if (suspendedHosts.map(x => x.host).includes(toPuny(host))) {
return 'skip (suspended)';
}
try {
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
logger.debug(`delivering ${latest}`);
}
await request(job.data.user, job.data.to, job.data.content);
// Update stats
registerOrFetchInstanceDoc(host).then(i => {
Instances.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: 200,
lastCommunicatedAt: new Date(),
isNotResponding: false
});
fetchInstanceMetadata(i);
instanceChart.requestSent(i.host, true);
});
return 'Success';
} catch (res) {
// Update stats
registerOrFetchInstanceDoc(host).then(i => {
Instances.update(i.id, {
latestRequestSentAt: new Date(),
latestStatus: res instanceof StatusError ? res.statusCode : null,
isNotResponding: true
});
instanceChart.requestSent(i.host, false);
});
if (res instanceof StatusError) {
// 4xx
if (res.isClientError) {
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
// 何回再送しても成功することはないということなのでエラーにはしないでおく
return `${res.statusCode} ${res.statusMessage}`;
}
// 5xx etc.
throw `${res.statusCode} ${res.statusMessage}`;
} else {
// DNS error, socket error, timeout ...
throw res;
}
}
};

View File

@ -0,0 +1,149 @@
import { URL } from 'url';
import * as Bull from 'bull';
import * as httpSignature from 'http-signature';
import perform from '@/remote/activitypub/perform';
import Logger from '@/services/logger';
import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc';
import { Instances } from '@/models/index';
import { instanceChart } from '@/services/chart/index';
import { fetchMeta } from '@/misc/fetch-meta';
import { toPuny, extractDbHost } from '@/misc/convert-host';
import { getApId } from '@/remote/activitypub/type';
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata';
import { InboxJobData } from '../types';
import DbResolver from '@/remote/activitypub/db-resolver';
import { resolvePerson } from '@/remote/activitypub/models/person';
import { LdSignature } from '@/remote/activitypub/misc/ld-signature';
import { StatusError } from '@/misc/fetch';
const logger = new Logger('inbox');
// ユーザーのinboxにアクティビティが届いた時の処理
export default async (job: Bull.Job<InboxJobData>): Promise<string> => {
const signature = job.data.signature; // HTTP-signature
const activity = job.data.activity;
//#region Log
const info = Object.assign({}, activity) as any;
delete info['@context'];
logger.debug(JSON.stringify(info, null, 2));
//#endregion
const host = toPuny(new URL(signature.keyId).hostname);
// ブロックしてたら中断
const meta = await fetchMeta();
if (meta.blockedHosts.includes(host)) {
return `Blocked request: ${host}`;
}
const keyIdLower = signature.keyId.toLowerCase();
if (keyIdLower.startsWith('acct:')) {
return `Old keyId is no longer supported. ${keyIdLower}`;
}
// TDOO: キャッシュ
const dbResolver = new DbResolver();
// HTTP-Signature keyIdを元にDBから取得
let authUser = await dbResolver.getAuthUserFromKeyId(signature.keyId);
// keyIdでわからなければ、activity.actorを元にDBから取得 || activity.actorを元にリモートから取得
if (authUser == null) {
try {
authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor));
} catch (e) {
// 対象が4xxならスキップ
if (e instanceof StatusError && e.isClientError) {
return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`;
}
throw `Error in actor ${activity.actor} - ${e.statusCode || e}`;
}
}
// それでもわからなければ終了
if (authUser == null) {
return `skip: failed to resolve user`;
}
// publicKey がなくても終了
if (authUser.key == null) {
return `skip: failed to resolve user publicKey`;
}
// HTTP-Signatureの検証
const httpSignatureValidated = httpSignature.verifySignature(signature, authUser.key.keyPem);
// また、signatureのsignerは、activity.actorと一致する必要がある
if (!httpSignatureValidated || authUser.user.uri !== activity.actor) {
// 一致しなくても、でもLD-Signatureがありそうならそっちも見る
if (activity.signature) {
if (activity.signature.type !== 'RsaSignature2017') {
return `skip: unsupported LD-signature type ${activity.signature.type}`;
}
// activity.signature.creator: https://example.oom/users/user#main-key
// みたいになっててUserを引っ張れば公開キーも入ることを期待する
if (activity.signature.creator) {
const candicate = activity.signature.creator.replace(/#.*/, '');
await resolvePerson(candicate).catch(() => null);
}
// keyIdからLD-Signatureのユーザーを取得
authUser = await dbResolver.getAuthUserFromKeyId(activity.signature.creator);
if (authUser == null) {
return `skip: LD-Signatureのユーザーが取得できませんでした`;
}
if (authUser.key == null) {
return `skip: LD-SignatureのユーザーはpublicKeyを持っていませんでした`;
}
// LD-Signature検証
const ldSignature = new LdSignature();
const verified = await ldSignature.verifyRsaSignature2017(activity, authUser.key.keyPem).catch(() => false);
if (!verified) {
return `skip: LD-Signatureの検証に失敗しました`;
}
// もう一度actorチェック
if (authUser.user.uri !== activity.actor) {
return `skip: LD-Signature user(${authUser.user.uri}) !== activity.actor(${activity.actor})`;
}
// ブロックしてたら中断
const ldHost = extractDbHost(authUser.user.uri);
if (meta.blockedHosts.includes(ldHost)) {
return `Blocked request: ${ldHost}`;
}
} else {
return `skip: http-signature verification failed and no LD-Signature. keyId=${signature.keyId}`;
}
}
// activity.idがあればホストが署名者のホストであることを確認する
if (typeof activity.id === 'string') {
const signerHost = extractDbHost(authUser.user.uri!);
const activityIdHost = extractDbHost(activity.id);
if (signerHost !== activityIdHost) {
return `skip: signerHost(${signerHost}) !== activity.id host(${activityIdHost}`;
}
}
// Update stats
registerOrFetchInstanceDoc(authUser.user.host).then(i => {
Instances.update(i.id, {
latestRequestReceivedAt: new Date(),
lastCommunicatedAt: new Date(),
isNotResponding: false
});
fetchInstanceMetadata(i);
instanceChart.requestReceived(i.host);
});
// アクティビティを処理
await perform(authUser.user, activity);
return `ok`;
};

View File

@ -0,0 +1,50 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import { deleteFileSync } from '@/services/drive/delete-file';
import { DriveFiles } from '@/models/index';
import { MoreThan, Not, IsNull } from 'typeorm';
const logger = queueLogger.createSubLogger('clean-remote-files');
export default async function cleanRemoteFiles(job: Bull.Job<{}>, done: any): Promise<void> {
logger.info(`Deleting cached remote files...`);
let deletedCount = 0;
let cursor: any = null;
while (true) {
const files = await DriveFiles.find({
where: {
userHost: Not(IsNull()),
isLink: false,
...(cursor ? { id: MoreThan(cursor) } : {})
},
take: 8,
order: {
id: 1
}
});
if (files.length === 0) {
job.progress(100);
break;
}
cursor = files[files.length - 1].id;
await Promise.all(files.map(file => deleteFileSync(file, true)));
deletedCount += 8;
const total = await DriveFiles.count({
userHost: Not(IsNull()),
isLink: false,
});
job.progress(deletedCount / total);
}
logger.succ(`All cahced remote files has been deleted.`);
done();
}

View File

@ -0,0 +1,11 @@
import { ObjectStorageFileJobData } from '@/queue/types';
import * as Bull from 'bull';
import { deleteObjectStorageFile } from '@/services/drive/delete-file';
export default async (job: Bull.Job<ObjectStorageFileJobData>) => {
const key: string = job.data.key;
await deleteObjectStorageFile(key);
return 'Success';
};

View File

@ -0,0 +1,15 @@
import * as Bull from 'bull';
import { ObjectStorageJobData } from '@/queue/types';
import deleteFile from './delete-file';
import cleanRemoteFiles from './clean-remote-files';
const jobs = {
deleteFile,
cleanRemoteFiles,
} as Record<string, Bull.ProcessCallbackFunction<ObjectStorageJobData> | Bull.ProcessPromiseFunction<ObjectStorageJobData>>;
export default function(q: Bull.Queue) {
for (const [k, v] of Object.entries(jobs)) {
q.process(k, 16, v);
}
}

View File

@ -0,0 +1,12 @@
import * as Bull from 'bull';
import { resyncCharts } from './resync-charts';
const jobs = {
resyncCharts,
} as Record<string, Bull.ProcessCallbackFunction<{}> | Bull.ProcessPromiseFunction<{}>>;
export default function(dbQueue: Bull.Queue<{}>) {
for (const [k, v] of Object.entries(jobs)) {
dbQueue.process(k, v);
}
}

View File

@ -0,0 +1,21 @@
import * as Bull from 'bull';
import { queueLogger } from '../../logger';
import { driveChart, notesChart, usersChart } from '@/services/chart/index';
const logger = queueLogger.createSubLogger('resync-charts');
export default async function resyncCharts(job: Bull.Job<{}>, done: any): Promise<void> {
logger.info(`Resync charts...`);
// TODO: ユーザーごとのチャートも更新する
// TODO: インスタンスごとのチャートも更新する
await Promise.all([
driveChart.resync(),
notesChart.resync(),
usersChart.resync(),
]);
logger.succ(`All charts successfully resynced.`);
done();
}

View File

@ -0,0 +1,9 @@
import config from '@/config/index';
import { initialize as initializeQueue } from './initialize';
import { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData } from './types';
export const systemQueue = initializeQueue<{}>('system');
export const deliverQueue = initializeQueue<DeliverJobData>('deliver', config.deliverJobPerSec || 128);
export const inboxQueue = initializeQueue<InboxJobData>('inbox', config.inboxJobPerSec || 16);
export const dbQueue = initializeQueue<DbJobData>('db');
export const objectStorageQueue = initializeQueue<ObjectStorageJobData>('objectStorage');

View File

@ -0,0 +1,44 @@
import { DriveFile } from '@/models/entities/drive-file';
import { User } from '@/models/entities/user';
import { IActivity } from '@/remote/activitypub/type';
import * as httpSignature from 'http-signature';
export type DeliverJobData = {
/** Actor */
user: ThinUser;
/** Activity */
content: unknown;
/** inbox URL to deliver */
to: string;
};
export type InboxJobData = {
activity: IActivity;
signature: httpSignature.IParsedSignature;
};
export type DbJobData = DbUserJobData | DbUserImportJobData | DbUserDeleteJobData;
export type DbUserJobData = {
user: ThinUser;
};
export type DbUserDeleteJobData = {
user: ThinUser;
soft?: boolean;
};
export type DbUserImportJobData = {
user: ThinUser;
fileId: DriveFile['id'];
};
export type ObjectStorageJobData = ObjectStorageFileJobData | {};
export type ObjectStorageFileJobData = {
key: string;
};
export type ThinUser = {
id: User['id'];
};