|
|
import { Server as SocketServer } from 'socket.io';
|
|
|
import { createAdapter } from '@socket.io/redis-adapter';
|
|
|
import { instrument } from '@socket.io/admin-ui';
|
|
|
import RedisClient from 'ioredis';
|
|
|
import {
|
|
|
TcService,
|
|
|
TcContext,
|
|
|
UserJWTPayload,
|
|
|
parseLanguageFromHead,
|
|
|
config,
|
|
|
PureContext,
|
|
|
PureService,
|
|
|
PureServiceSchema,
|
|
|
Utils,
|
|
|
Errors,
|
|
|
} from 'tailchat-server-sdk';
|
|
|
import _ from 'lodash';
|
|
|
import { ServiceUnavailableError } from 'tailchat-server-sdk';
|
|
|
import { isValidStr } from '../lib/utils';
|
|
|
import bcrypt from 'bcryptjs';
|
|
|
import msgpackParser from 'socket.io-msgpack-parser';
|
|
|
|
|
|
const blacklist: (string | RegExp)[] = ['gateway.*'];
|
|
|
|
|
|
function checkBlacklist(eventName: string): boolean {
|
|
|
return blacklist.some((item) => {
|
|
|
if (_.isString(item)) {
|
|
|
return Utils.match(eventName, item);
|
|
|
} else if (_.isRegExp(item)) {
|
|
|
return item.test(eventName);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* socket 用户房间编号
|
|
|
*/
|
|
|
function buildUserRoomId(userId: string) {
|
|
|
return `u-${userId}`;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* socket online 用户编号
|
|
|
*/
|
|
|
function buildUserOnlineKey(userId: string) {
|
|
|
return `tailchat-socketio.online:${userId}`;
|
|
|
}
|
|
|
|
|
|
const expiredTime = 1 * 24 * 60 * 60; // 1天
|
|
|
|
|
|
interface SocketIOService extends PureService {
|
|
|
io: SocketServer;
|
|
|
redis: RedisClient.Redis;
|
|
|
socketCloseCallbacks: (() => Promise<unknown>)[];
|
|
|
}
|
|
|
|
|
|
interface TcSocketIOServiceOptions {
|
|
|
/**
|
|
|
* 用户token校验
|
|
|
*/
|
|
|
userAuth: (token: string) => Promise<UserJWTPayload>;
|
|
|
|
|
|
/**
|
|
|
* 是否禁用msgpack
|
|
|
*/
|
|
|
disableMsgpack?: boolean;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Socket IO 服务 mixin
|
|
|
*/
|
|
|
export const TcSocketIOService = (
|
|
|
options: TcSocketIOServiceOptions
|
|
|
): Partial<PureServiceSchema> => {
|
|
|
const { userAuth } = options;
|
|
|
|
|
|
const schema: Partial<PureServiceSchema> = {
|
|
|
created(this: SocketIOService) {
|
|
|
this.broker.metrics.register({
|
|
|
type: 'gauge',
|
|
|
name: 'tailchat.socketio.online.count',
|
|
|
labelNames: ['nodeId'],
|
|
|
description: 'Number of online user',
|
|
|
});
|
|
|
},
|
|
|
async started(this: SocketIOService) {
|
|
|
if (!this.io) {
|
|
|
this.initSocketIO();
|
|
|
}
|
|
|
|
|
|
this.logger.info('SocketIO service started');
|
|
|
|
|
|
const io: SocketServer = this.io;
|
|
|
if (!config.redisUrl) {
|
|
|
throw new Errors.MoleculerClientError(
|
|
|
'SocketIO service failed to start, environment variables are required: `REDIS_URL`'
|
|
|
);
|
|
|
}
|
|
|
this.socketCloseCallbacks = []; // socketio服务关闭时需要执行的回调
|
|
|
|
|
|
const pubClient = new RedisClient(config.redisUrl, {
|
|
|
retryStrategy(times) {
|
|
|
const delay = Math.min(times * 50, 2000);
|
|
|
return delay;
|
|
|
},
|
|
|
});
|
|
|
const subClient = pubClient.duplicate();
|
|
|
io.adapter(
|
|
|
createAdapter(pubClient, subClient, {
|
|
|
key: 'tailchat-socket',
|
|
|
})
|
|
|
);
|
|
|
|
|
|
this.socketCloseCallbacks.push(async () => {
|
|
|
pubClient.disconnect(false);
|
|
|
subClient.disconnect(false);
|
|
|
});
|
|
|
this.logger.info('SocketIO is using Redis Adapter');
|
|
|
|
|
|
this.redis = pubClient;
|
|
|
|
|
|
io.use(async (socket, next) => {
|
|
|
// 授权
|
|
|
try {
|
|
|
if (
|
|
|
config.enableSocketAdmin &&
|
|
|
socket.handshake.headers['origin'] === 'https://admin.socket.io'
|
|
|
) {
|
|
|
// 如果是通过 admin-ui 访问的socket.io 直接链接
|
|
|
next();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
const token = socket.handshake.auth['token'];
|
|
|
if (typeof token !== 'string') {
|
|
|
throw new Errors.MoleculerError('Token cannot be empty');
|
|
|
}
|
|
|
|
|
|
const user: UserJWTPayload = await userAuth(token);
|
|
|
|
|
|
if (!(user && user._id)) {
|
|
|
throw new Error('Token invalid');
|
|
|
}
|
|
|
|
|
|
this.logger.info('[Socket] Authenticated via JWT: ', user.nickname);
|
|
|
|
|
|
socket.data.user = user;
|
|
|
socket.data.token = token;
|
|
|
socket.data.userId = user._id;
|
|
|
|
|
|
next();
|
|
|
} catch (e) {
|
|
|
return next(e);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
this.io.on('connection', (socket) => {
|
|
|
if (typeof socket.data.userId !== 'string') {
|
|
|
// 不应该进入的逻辑
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
this.broker.metrics.increment(
|
|
|
'tailchat.socketio.online.count',
|
|
|
{
|
|
|
nodeId: this.broker.nodeID,
|
|
|
},
|
|
|
1
|
|
|
);
|
|
|
|
|
|
const userId = socket.data.userId;
|
|
|
pubClient
|
|
|
.hset(buildUserOnlineKey(userId), socket.id, this.broker.nodeID)
|
|
|
.then(() => {
|
|
|
pubClient.expire(buildUserOnlineKey(userId), expiredTime);
|
|
|
});
|
|
|
|
|
|
// 加入自己userId所生产的id
|
|
|
socket.join(buildUserRoomId(userId));
|
|
|
|
|
|
/**
|
|
|
* 离线时移除在线映射
|
|
|
*/
|
|
|
const removeOnlineMapping = () => {
|
|
|
return pubClient.hdel(buildUserOnlineKey(userId), socket.id);
|
|
|
};
|
|
|
this.socketCloseCallbacks.push(removeOnlineMapping);
|
|
|
|
|
|
// 用户断线
|
|
|
socket.on('disconnecting', (reason) => {
|
|
|
this.logger.info(
|
|
|
'Socket Disconnect:',
|
|
|
reason,
|
|
|
'| Rooms:',
|
|
|
socket.rooms
|
|
|
);
|
|
|
|
|
|
this.broker.metrics.decrement(
|
|
|
'tailchat.socketio.online.count',
|
|
|
{
|
|
|
nodeId: this.broker.nodeID,
|
|
|
},
|
|
|
1
|
|
|
);
|
|
|
|
|
|
removeOnlineMapping();
|
|
|
_.pull(this.socketCloseCallbacks, removeOnlineMapping);
|
|
|
});
|
|
|
|
|
|
// 连接时
|
|
|
socket.onAny(
|
|
|
async (
|
|
|
eventName: string,
|
|
|
eventData: unknown,
|
|
|
cb: (data: unknown) => void
|
|
|
) => {
|
|
|
this.logger.info(
|
|
|
'[SocketIO]',
|
|
|
eventName,
|
|
|
'<=',
|
|
|
JSON.stringify(eventData)
|
|
|
);
|
|
|
|
|
|
// 检测是否允许调用
|
|
|
if (checkBlacklist(eventName)) {
|
|
|
const message = 'Not allowed request';
|
|
|
this.logger.warn('[SocketIO]', '=>', message);
|
|
|
cb({
|
|
|
result: false,
|
|
|
message,
|
|
|
});
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 接受任意消息, 并调用action
|
|
|
try {
|
|
|
const endpoint = this.broker.findNextActionEndpoint(eventName);
|
|
|
if (endpoint instanceof Error) {
|
|
|
if (endpoint instanceof Errors.ServiceNotFoundError) {
|
|
|
throw new ServiceUnavailableError();
|
|
|
}
|
|
|
|
|
|
throw endpoint;
|
|
|
}
|
|
|
|
|
|
if (
|
|
|
typeof endpoint.action.visibility === 'string' &&
|
|
|
endpoint.action.visibility !== 'published'
|
|
|
) {
|
|
|
throw new Errors.ServiceNotFoundError({
|
|
|
visibility: endpoint.action.visibility,
|
|
|
action: eventName,
|
|
|
});
|
|
|
}
|
|
|
|
|
|
if (endpoint.action.disableSocket === true) {
|
|
|
throw new Errors.ServiceNotFoundError({
|
|
|
disableSocket: true,
|
|
|
action: eventName,
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* TODO:
|
|
|
* 这里也许还可以被优化?看molecular的源码好像没有走远程调用这一步,但是没看懂如何实现的
|
|
|
* 需要研究一下
|
|
|
*/
|
|
|
const language = parseLanguageFromHead(
|
|
|
socket.handshake.headers['accept-language']
|
|
|
);
|
|
|
const data = await this.broker.call(eventName, eventData, {
|
|
|
meta: {
|
|
|
...socket.data,
|
|
|
socketId: socket.id,
|
|
|
language,
|
|
|
},
|
|
|
});
|
|
|
|
|
|
if (typeof cb === 'function') {
|
|
|
this.logger.debug(
|
|
|
'[SocketIO]',
|
|
|
eventName,
|
|
|
'=>',
|
|
|
JSON.stringify(data)
|
|
|
);
|
|
|
cb({ result: true, data });
|
|
|
}
|
|
|
} catch (err: unknown) {
|
|
|
const message = _.get(err, 'message', 'Service Error');
|
|
|
this.logger.debug('[SocketIO]', eventName, '=>', message);
|
|
|
this.logger.error('[SocketIO]', err);
|
|
|
cb({
|
|
|
result: false,
|
|
|
message,
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
);
|
|
|
});
|
|
|
},
|
|
|
async stopped(this: SocketIOService) {
|
|
|
if (this.io) {
|
|
|
this.io.close();
|
|
|
await Promise.all(this.socketCloseCallbacks.map((fn) => fn()));
|
|
|
}
|
|
|
this.logger.info('断开所有连接');
|
|
|
},
|
|
|
actions: {
|
|
|
joinRoom: {
|
|
|
visibility: 'public',
|
|
|
params: {
|
|
|
roomIds: 'array',
|
|
|
userId: [{ type: 'string', optional: true }], // 可选, 如果不填则为当前socket的id
|
|
|
},
|
|
|
async handler(
|
|
|
this: TcService,
|
|
|
ctx: TcContext<{ roomIds: string[]; userId?: string }>
|
|
|
) {
|
|
|
const roomIds = ctx.params.roomIds;
|
|
|
const userId = ctx.params.userId;
|
|
|
const searchId = isValidStr(userId)
|
|
|
? buildUserRoomId(userId)
|
|
|
: ctx.meta.socketId;
|
|
|
if (typeof searchId !== 'string') {
|
|
|
throw new Error(
|
|
|
'Unable to join the room, the query condition is invalid, please contact the administrator'
|
|
|
);
|
|
|
}
|
|
|
|
|
|
if (!Array.isArray(roomIds)) {
|
|
|
throw new Error(
|
|
|
'Unable to join the room, the parameter must be an array'
|
|
|
);
|
|
|
}
|
|
|
|
|
|
// 获取远程socket链接并加入
|
|
|
const io: SocketServer = this.io;
|
|
|
const remoteSockets = await io.in(searchId).fetchSockets();
|
|
|
if (remoteSockets.length === 0) {
|
|
|
this.logger.warn(
|
|
|
'Unable to join the room, unable to find the current socket link:',
|
|
|
searchId
|
|
|
);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
remoteSockets.forEach((rs) =>
|
|
|
rs.join(
|
|
|
roomIds.map(String) // 强制确保roomId为字符串,防止出现传个objectId类型的数据过来
|
|
|
)
|
|
|
);
|
|
|
},
|
|
|
},
|
|
|
leaveRoom: {
|
|
|
visibility: 'public',
|
|
|
params: {
|
|
|
roomIds: 'array',
|
|
|
userId: [{ type: 'string', optional: true }],
|
|
|
},
|
|
|
async handler(
|
|
|
this: TcService,
|
|
|
ctx: TcContext<{ roomIds: string[]; userId?: string }>
|
|
|
) {
|
|
|
const roomIds = ctx.params.roomIds;
|
|
|
const userId = ctx.params.userId;
|
|
|
const searchId = isValidStr(userId)
|
|
|
? buildUserRoomId(userId)
|
|
|
: ctx.meta.socketId;
|
|
|
if (typeof searchId !== 'string') {
|
|
|
this.logger.error(
|
|
|
'Unable to leave the room, the current socket connection does not exist'
|
|
|
);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// 获取远程socket链接并离开
|
|
|
const io: SocketServer = this.io;
|
|
|
const remoteSockets = await io.in(searchId).fetchSockets();
|
|
|
if (remoteSockets.length === 0) {
|
|
|
this.logger.error(
|
|
|
`Can't leave room, can't find current socket link`
|
|
|
);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
remoteSockets.forEach((rs) => {
|
|
|
roomIds.forEach((roomId) => {
|
|
|
rs.leave(roomId);
|
|
|
});
|
|
|
});
|
|
|
},
|
|
|
},
|
|
|
|
|
|
/**
|
|
|
* 根据userId获取所有的用户链接
|
|
|
*/
|
|
|
fetchUserSocketIds: {
|
|
|
visibility: 'public',
|
|
|
params: {
|
|
|
userId: 'string',
|
|
|
},
|
|
|
async handler(
|
|
|
this: TcService,
|
|
|
ctx: TcContext<{ userId: string }>
|
|
|
): Promise<string[]> {
|
|
|
const userId = ctx.params.userId;
|
|
|
const io: SocketServer = this.io;
|
|
|
const remoteSockets = await io
|
|
|
.in(buildUserRoomId(userId))
|
|
|
.fetchSockets();
|
|
|
|
|
|
return remoteSockets.map((remoteSocket) => remoteSocket.id);
|
|
|
},
|
|
|
},
|
|
|
|
|
|
/**
|
|
|
* 获取userId获取所有的用户的token
|
|
|
*/
|
|
|
getUserSocketToken: {
|
|
|
visibility: 'public',
|
|
|
params: {
|
|
|
userId: 'string',
|
|
|
},
|
|
|
async handler(
|
|
|
this: TcService,
|
|
|
ctx: TcContext<{ userId: string }>
|
|
|
): Promise<string[]> {
|
|
|
const userId = ctx.params.userId;
|
|
|
const io: SocketServer = this.io;
|
|
|
const remoteSockets = await io
|
|
|
.in(buildUserRoomId(userId))
|
|
|
.fetchSockets();
|
|
|
|
|
|
return remoteSockets.map((remoteSocket) => remoteSocket.data.token);
|
|
|
},
|
|
|
},
|
|
|
|
|
|
/**
|
|
|
* 踢出用户
|
|
|
*/
|
|
|
tickUser: {
|
|
|
visibility: 'public',
|
|
|
params: {
|
|
|
userId: 'string',
|
|
|
},
|
|
|
async handler(this: TcService, ctx: TcContext<{ userId: string }>) {
|
|
|
const userId = ctx.params.userId;
|
|
|
const io: SocketServer = this.io;
|
|
|
const remoteSockets = await io
|
|
|
.in(buildUserRoomId(userId))
|
|
|
.fetchSockets();
|
|
|
|
|
|
remoteSockets.forEach((remoteSocket) => {
|
|
|
remoteSocket.disconnect(true);
|
|
|
});
|
|
|
},
|
|
|
},
|
|
|
|
|
|
/**
|
|
|
* 服务端通知
|
|
|
*/
|
|
|
notify: {
|
|
|
visibility: 'public',
|
|
|
params: {
|
|
|
type: 'string',
|
|
|
target: [
|
|
|
{ type: 'string', optional: true },
|
|
|
{ type: 'array', optional: true },
|
|
|
],
|
|
|
eventName: 'string',
|
|
|
eventData: 'any',
|
|
|
},
|
|
|
handler(
|
|
|
this: TcService,
|
|
|
ctx: PureContext<{
|
|
|
type: string;
|
|
|
target: string | string[];
|
|
|
eventName: string;
|
|
|
eventData: any;
|
|
|
}>
|
|
|
) {
|
|
|
const { type, target, eventName, eventData } = ctx.params;
|
|
|
const io: SocketServer = this.io;
|
|
|
if (type === 'unicast' && typeof target === 'string') {
|
|
|
// 单播
|
|
|
io.to(buildUserRoomId(target)).emit(eventName, eventData);
|
|
|
} else if (type === 'listcast' && Array.isArray(target)) {
|
|
|
// 列播
|
|
|
io.to(target.map((t) => buildUserRoomId(t))).emit(
|
|
|
eventName,
|
|
|
eventData
|
|
|
);
|
|
|
} else if (type === 'roomcast') {
|
|
|
// 组播
|
|
|
io.to(target).emit(eventName, eventData);
|
|
|
} else if (type === 'broadcast') {
|
|
|
// 广播
|
|
|
io.emit(eventName, eventData);
|
|
|
} else {
|
|
|
this.logger.warn(
|
|
|
'[SocketIO]',
|
|
|
'Unknown notify type or target',
|
|
|
type,
|
|
|
target
|
|
|
);
|
|
|
}
|
|
|
},
|
|
|
},
|
|
|
|
|
|
/**
|
|
|
* 检查用户在线状态
|
|
|
*/
|
|
|
checkUserOnline: {
|
|
|
params: {
|
|
|
userIds: 'array',
|
|
|
},
|
|
|
async handler(
|
|
|
this: TcService,
|
|
|
ctx: PureContext<{ userIds: string[] }>
|
|
|
) {
|
|
|
const userIds = ctx.params.userIds;
|
|
|
|
|
|
const status = await Promise.all(
|
|
|
userIds.map((userId) =>
|
|
|
(this.redis as RedisClient.Redis).exists(
|
|
|
buildUserOnlineKey(userId)
|
|
|
)
|
|
|
)
|
|
|
);
|
|
|
|
|
|
return status.map((d) => Boolean(d));
|
|
|
},
|
|
|
},
|
|
|
},
|
|
|
methods: {
|
|
|
initSocketIO() {
|
|
|
if (!this.server) {
|
|
|
throw new Errors.ServiceNotAvailableError(
|
|
|
'Need to use with [ApiGatewayMixin]'
|
|
|
);
|
|
|
}
|
|
|
this.io = new SocketServer(this.server, {
|
|
|
serveClient: false,
|
|
|
transports: ['websocket'],
|
|
|
cors: {
|
|
|
origin: '*',
|
|
|
methods: ['GET', 'POST'],
|
|
|
},
|
|
|
parser: options.disableMsgpack ? undefined : msgpackParser,
|
|
|
});
|
|
|
|
|
|
if (
|
|
|
isValidStr(process.env.ADMIN_USER) &&
|
|
|
isValidStr(process.env.ADMIN_PASS)
|
|
|
) {
|
|
|
this.logger.info('****************************************');
|
|
|
this.logger.info(`Detected that Admin management is enabled`);
|
|
|
this.logger.info('****************************************');
|
|
|
|
|
|
instrument(this.io, {
|
|
|
auth: {
|
|
|
type: 'basic',
|
|
|
username: process.env.ADMIN_USER,
|
|
|
password: bcrypt.hashSync(process.env.ADMIN_PASS, 10),
|
|
|
},
|
|
|
});
|
|
|
}
|
|
|
},
|
|
|
},
|
|
|
};
|
|
|
|
|
|
return schema;
|
|
|
};
|