You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tailchat/server/services/core/file.service.ts

328 lines
7.5 KiB
TypeScript

import {
TcService,
PureContext,
TcContext,
buildUploadUrl,
config,
TcDbService,
NoPermissionError,
} from 'tailchat-server-sdk';
import MinioService from 'moleculer-minio';
import _ from 'lodash';
import mime from 'mime';
import type { Client as MinioClient } from 'minio';
import { isValidStaticAssetsUrl, isValidStr } from '../../lib/utils';
import path from 'node:path';
import type { FileDocument, FileModel } from '../../models/file';
import { Types } from 'mongoose';
import got from 'got';
import { Readable } from 'node:stream';
interface FileService extends TcService, TcDbService<FileDocument, FileModel> {}
class FileService extends TcService {
get serviceName(): string {
return 'file';
}
get minioClient(): MinioClient {
return this.client;
}
get bucketName(): string {
return config.storage.bucketName;
}
onInit(): void {
this.registerLocalDb(require('../../models/file').default);
this.registerMixin(MinioService);
const minioUrl = config.storage.minioUrl;
const [endPoint, port] = minioUrl.split(':');
// https://github.com/designtesbrot/moleculer-minio#settings
this.registerSetting('endPoint', endPoint);
this.registerSetting('port', Number(port));
this.registerSetting('useSSL', false);
this.registerSetting('accessKey', config.storage.user);
this.registerSetting('secretKey', config.storage.pass);
this.registerAction('save', this.save);
this.registerAction('saveFileWithUrl', this.saveFileWithUrl, {
visibility: 'public',
params: {
fileUrl: 'string',
},
});
this.registerAction('get', this.get, {
params: {
objectName: 'string',
},
disableSocket: true,
});
}
async onInited() {
// TODO: 看看有没有办法用一个ctx包起来
// Services Available
if (config.feature.disableFileCheck) {
return;
}
const isExists = await this.actions['bucketExists'](
{
bucketName: this.bucketName,
},
{
timeout: 20000, // 20s
}
);
if (isExists === false) {
// bucket不存在创建新的
this.logger.info(
'[File]',
'Bucket 不存在, 创建新的Bucket',
this.bucketName
);
await this.actions['makeBucket']({
bucketName: this.bucketName,
});
}
const buckets = await this.actions['listBuckets']();
this.logger.info(`[File] MinioInfo: | buckets: ${JSON.stringify(buckets)}`);
}
/**
*
*/
async save(
ctx: TcContext<
{},
{
$params: any;
filename: any;
}
>
) {
const t = ctx.meta.t;
this.logger.info('Received upload $params:', ctx.meta.$params);
return new Promise(async (resolve, reject) => {
const userId = ctx.meta.userId;
this.logger.info('Received upload meta:', ctx.meta);
if (!isValidStr(userId)) {
throw new NoPermissionError(t('用户无上传权限'));
}
const originFilename = String(ctx.meta.filename);
const stream = ctx.params as NodeJS.ReadableStream;
(stream as any).on('error', (err) => {
// 这里是文件传输错误处理
// 比如文件过大
this.logger.error('File error received', err.message);
reject(err);
});
try {
const { etag, objectName, url } = await this.saveFileStream(
ctx,
originFilename,
stream
);
resolve({
etag,
path: `${this.bucketName}/${objectName}`,
url,
});
} catch (e) {
reject(e);
}
});
}
/**
* url
*
*
* NOTICE:
*
* @param fileUrl
*/
async saveFileWithUrl(
ctx: TcContext<{
fileUrl: string;
}>
) {
const fileUrl = ctx.params.fileUrl;
const t = ctx.meta.t;
if (!isValidStaticAssetsUrl(fileUrl)) {
throw new Error(t('文件地址不是一个合法的资源地址'));
}
return new Promise(async (resolve, reject) => {
const req = got.stream(fileUrl);
const stream = Readable.from(req);
stream.on('error', (err: Error) => {
// 这里是文件传输错误处理
// 比如文件过大
this.logger.error('File error received', err.message);
reject(err);
});
try {
const filename = _.last(fileUrl.split('/'));
const { etag, objectName, url } = await this.saveFileStream(
ctx,
filename,
stream
);
resolve({
etag,
path: `${this.bucketName}/${objectName}`,
url,
});
} catch (e) {
reject(e);
}
});
}
/**
*
*/
async saveFileStream(
ctx: TcContext,
filename: string,
fileStream: NodeJS.ReadableStream
): Promise<{ etag: string; url: string; objectName: string }> {
const span = ctx.startSpan('file.saveFileStream');
const ext = path.extname(filename);
// 临时仓库
const tmpObjectName = `tmp/${this.randomName()}${ext}`;
const { etag } = await this.actions['putObject'](fileStream, {
meta: {
bucketName: this.bucketName,
objectName: tmpObjectName,
metaData: {
'content-type': mime.getType(ext),
},
},
parentCtx: ctx,
});
const { url, objectName } = await this.persistFile(
ctx,
tmpObjectName,
etag,
ext
);
span.finish();
return {
etag,
url,
objectName,
};
}
/**
*
*/
async persistFile(
ctx: TcContext,
tmpObjectName: string,
etag: string,
ext: string
): Promise<{
url: string;
objectName: string;
}> {
const span = ctx.startSpan('file.persistFile');
const userId = ctx.meta.userId;
// 存储在上传者自己的子目录
const objectName = `files/${userId}/${etag}${ext}`;
try {
await this.actions['copyObject'](
{
bucketName: this.bucketName,
objectName,
sourceObject: `/${this.bucketName}/${tmpObjectName}`, // NOTICE: 此处要填入带bucketName的完成路径
conditions: {
matchETag: etag,
},
},
{
parentCtx: ctx,
}
);
} finally {
this.minioClient.removeObject(this.bucketName, tmpObjectName);
}
const url = buildUploadUrl(objectName);
// 异步执行, 将其存入数据库
this.minioClient
.statObject(this.bucketName, objectName)
.then((stat) =>
this.adapter.insert({
etag,
userId: new Types.ObjectId(userId),
bucketName: this.bucketName,
objectName,
url,
size: stat.size,
metaData: stat.metaData,
})
)
.catch((err) => {
this.logger.error(`持久化到数据库失败: ${objectName}`, err);
});
span.finish();
return {
url,
objectName,
};
}
/**
*
*/
async get(
ctx: PureContext<{
objectName: string;
}>
) {
const objectName = ctx.params.objectName;
const stream = await this.actions['getObject'](
{
bucketName: this.bucketName,
objectName,
},
{
parentCtx: ctx,
}
);
return stream;
}
private randomName() {
return `unnamed_${this.broker.nodeID}_${Date.now()}`;
}
}
export default FileService;