Creating only one WS connection to pusher from admin

Also: migration to Typescript 4.5 and µWebsockets 1.20.4
This commit is contained in:
David Négrier 2021-12-01 10:12:07 +01:00 committed by Alexis Faizeau
parent 4875a8fed9
commit 4cff230256
8 changed files with 206 additions and 87 deletions

View file

@ -44,7 +44,7 @@ export class AdminController extends BaseController {
const roomId: string = body.roomId;
await apiClientRepository.getClient(roomId).then((roomClient) => {
return new Promise((res, rej) => {
return new Promise<void>((res, rej) => {
const roomMessage = new RefreshRoomPromptMessage();
roomMessage.setRoomid(roomId);
@ -101,7 +101,7 @@ export class AdminController extends BaseController {
await Promise.all(
targets.map((roomId) => {
return apiClientRepository.getClient(roomId).then((roomClient) => {
return new Promise((res, rej) => {
return new Promise<void>((res, rej) => {
if (type === "message") {
const roomMessage = new AdminRoomMessage();
roomMessage.setMessage(text);

View file

@ -0,0 +1,9 @@
/**
* Errors related to variable handling.
*/
export class InvalidTokenError extends Error {
constructor(message: string) {
super(message);
Object.setPrototypeOf(this, InvalidTokenError.prototype);
}
}

View file

@ -22,14 +22,17 @@ import {
import { UserMovesMessage } from "../Messages/generated/messages_pb";
import { TemplatedApp } from "uWebSockets.js";
import { parse } from "query-string";
import { jwtTokenManager, tokenInvalidException } from "../Services/JWTTokenManager";
import { AdminSocketTokenData, jwtTokenManager, tokenInvalidException } from "../Services/JWTTokenManager";
import { adminApi, FetchMemberDataByUuidResponse } from "../Services/AdminApi";
import { SocketManager, socketManager } from "../Services/SocketManager";
import { emitInBatch } from "../Services/IoSocketHelpers";
import { ADMIN_SOCKETS_TOKEN, ADMIN_API_URL, DISABLE_ANONYMOUS, SOCKET_IDLE_TIMER } from "../Enum/EnvironmentVariable";
import { ADMIN_API_URL, DISABLE_ANONYMOUS, SOCKET_IDLE_TIMER } from "../Enum/EnvironmentVariable";
import { Zone } from "_Model/Zone";
import { ExAdminSocketInterface } from "_Model/Websocket/ExAdminSocketInterface";
import { CharacterTexture } from "../Services/AdminApi/CharacterTexture";
import { isAdminMessageInterface } from "../Model/Websocket/Admin/AdminMessages";
import Axios from "axios";
import { InvalidTokenError } from "../Controller/InvalidTokenError";
export class IoSocketController {
private nextUserId: number = 1;
@ -42,59 +45,108 @@ export class IoSocketController {
adminRoomSocket() {
this.app.ws("/admin/rooms", {
upgrade: (res, req, context) => {
const query = parse(req.getQuery());
const websocketKey = req.getHeader("sec-websocket-key");
const websocketProtocol = req.getHeader("sec-websocket-protocol");
const websocketExtensions = req.getHeader("sec-websocket-extensions");
const token = query.token;
let authorizedRoomIds: string[];
try {
const data = jwtTokenManager.verifyAdminSocketToken(token as string);
authorizedRoomIds = data.authorizedRoomIds;
} catch (e) {
console.error("Admin access refused for token: " + token);
res.writeStatus("401 Unauthorized").end("Incorrect token");
return;
}
const roomId = query.roomId;
if (typeof roomId !== "string" || !authorizedRoomIds.includes(roomId)) {
console.error("Invalid room id");
res.writeStatus("403 Bad Request").end("Invalid room id");
return;
}
res.upgrade({ roomId }, websocketKey, websocketProtocol, websocketExtensions, context);
res.upgrade({}, websocketKey, websocketProtocol, websocketExtensions, context);
},
open: (ws) => {
console.log("Admin socket connect for room: " + ws.roomId);
console.log("Admin socket connect to client on " + Buffer.from(ws.getRemoteAddressAsText()).toString());
ws.disconnecting = false;
socketManager.handleAdminRoom(ws as ExAdminSocketInterface, ws.roomId as string);
},
message: (ws, arrayBuffer, isBinary): void => {
try {
//TODO refactor message type and data
const message: { event: string; message: { type: string; message: unknown; userUuid: string } } =
JSON.parse(new TextDecoder("utf-8").decode(new Uint8Array(arrayBuffer)));
const message = JSON.parse(new TextDecoder("utf-8").decode(new Uint8Array(arrayBuffer)));
if (message.event === "user-message") {
const messageToEmit = message.message as { message: string; type: string; userUuid: string };
if (messageToEmit.type === "banned") {
socketManager.emitBan(
messageToEmit.userUuid,
messageToEmit.message,
messageToEmit.type,
ws.roomId as string
if (!isAdminMessageInterface(message)) {
console.error("Invalid message received.", message);
ws.send(
JSON.stringify({
type: "Error",
data: {
message: "Invalid message received! The connection has been closed.",
},
})
);
ws.close();
return;
}
const token = message.jwt;
let data: AdminSocketTokenData;
try {
data = jwtTokenManager.verifyAdminSocketToken(token);
} catch (e) {
console.error("Admin socket access refused for token: " + token, e);
ws.send(
JSON.stringify({
type: "Error",
data: {
message: "Admin socket access refused! The connection has been closed.",
},
})
);
ws.close();
return;
}
const authorizedRoomIds = data.authorizedRoomIds;
if (message.event === "listen") {
const notAuthorizedRoom = message.roomIds.filter(
(roomId) => !authorizedRoomIds.includes(roomId)
);
if (notAuthorizedRoom.length > 0) {
const errorMessage = `Admin socket refused for client on ${Buffer.from(
ws.getRemoteAddressAsText()
).toString()} listening of : \n${JSON.stringify(notAuthorizedRoom)}`;
console.error();
ws.send(
JSON.stringify({
type: "Error",
data: {
message: errorMessage,
},
})
);
ws.close();
return;
}
if (messageToEmit.type === "ban") {
socketManager.emitSendUserMessage(
messageToEmit.userUuid,
messageToEmit.message,
messageToEmit.type,
ws.roomId as string
);
for (const roomId of message.roomIds) {
socketManager
.handleAdminRoom(ws as ExAdminSocketInterface, roomId)
.catch((e) => console.error(e));
}
} else if (message.event === "user-message") {
const messageToEmit = message.message;
// Get roomIds of the world where we want broadcast the message
const roomIds = authorizedRoomIds.filter(
(authorizeRoomId) => authorizeRoomId.split("/")[5] === message.world
);
for (const roomId of roomIds) {
if (messageToEmit.type === "banned") {
socketManager
.emitBan(messageToEmit.userUuid, messageToEmit.message, messageToEmit.type, roomId)
.catch((error) => console.error(error));
} else if (messageToEmit.type === "ban") {
socketManager
.emitSendUserMessage(
messageToEmit.userUuid,
messageToEmit.message,
messageToEmit.type,
roomId
)
.catch((error) => console.error(error));
}
}
} else {
const tmp: never = message.event;
}
} catch (err) {
console.error(err);
@ -202,28 +254,30 @@ export class IoSocketController {
try {
userData = await adminApi.fetchMemberDataByUuid(userIdentifier, roomId, IPAddress);
} catch (err) {
if (err?.response?.status == 404) {
// If we get an HTTP 404, the token is invalid. Let's perform an anonymous login!
if (Axios.isAxiosError(err)) {
if (err?.response?.status == 404) {
// If we get an HTTP 404, the token is invalid. Let's perform an anonymous login!
console.warn(
'Cannot find user with email "' +
(userIdentifier || "anonymous") +
'". Performing an anonymous login instead.'
);
} else if (err?.response?.status == 403) {
// If we get an HTTP 403, the world is full. We need to broadcast a special error to the client.
// we finish immediately the upgrade then we will close the socket as soon as it starts opening.
return res.upgrade(
{
rejected: true,
message: err?.response?.data.message,
status: err?.response?.status,
},
websocketKey,
websocketProtocol,
websocketExtensions,
context
);
console.warn(
'Cannot find user with email "' +
(userIdentifier || "anonymous") +
'". Performing an anonymous login instead.'
);
} else if (err?.response?.status == 403) {
// If we get an HTTP 403, the world is full. We need to broadcast a special error to the client.
// we finish immediately the upgrade then we will close the socket as soon as it starts opening.
return res.upgrade(
{
rejected: true,
message: err?.response?.data.message,
status: err?.response?.status,
},
websocketKey,
websocketProtocol,
websocketExtensions,
context
);
}
} else {
throw err;
}
@ -302,17 +356,31 @@ export class IoSocketController {
context
);
} catch (e) {
res.upgrade(
{
rejected: true,
reason: e.reason || null,
message: e.message ? e.message : "500 Internal Server Error",
},
websocketKey,
websocketProtocol,
websocketExtensions,
context
);
if (e instanceof Error) {
res.upgrade(
{
rejected: true,
reason: e instanceof InvalidTokenError ? tokenInvalidException : null,
message: e.message,
},
websocketKey,
websocketProtocol,
websocketExtensions,
context
);
} else {
res.upgrade(
{
rejected: true,
reason: null,
message: "500 Internal Server Error",
},
websocketKey,
websocketProtocol,
websocketExtensions,
context
);
}
}
})();
},

View file

@ -0,0 +1,30 @@
import * as tg from "generic-type-guard";
export const isBanBannedAdminMessageInterface = new tg.IsInterface()
.withProperties({
type: tg.isSingletonStringUnion("ban", "banned"),
message: tg.isString,
userUuid: tg.isString,
})
.get();
export const isUserMessageAdminMessageInterface = new tg.IsInterface()
.withProperties({
event: tg.isSingletonString("user-message"),
message: isBanBannedAdminMessageInterface,
world: tg.isString,
jwt: tg.isString,
})
.get();
export const isListenRoomsMessageInterface = new tg.IsInterface()
.withProperties({
event: tg.isSingletonString("listen"),
roomIds: tg.isArray(tg.isString),
jwt: tg.isString,
})
.get();
export const isAdminMessageInterface = tg.isUnion(isUserMessageAdminMessageInterface, isListenRoomsMessageInterface);
export type AdminMessageInterface = tg.GuardedType<typeof isAdminMessageInterface>;

View file

@ -3,6 +3,7 @@ import { uuid } from "uuidv4";
import Jwt, { verify } from "jsonwebtoken";
import { TokenInterface } from "../Controller/AuthenticateController";
import { adminApi, AdminBannedData } from "../Services/AdminApi";
import { InvalidTokenError } from "../Controller/InvalidTokenError";
export interface AuthTokenData {
identifier: string; //will be a email if logged in or an uuid if anonymous
@ -26,7 +27,12 @@ class JWTTokenManager {
try {
return Jwt.verify(token, SECRET_KEY, { ignoreExpiration }) as AuthTokenData;
} catch (e) {
throw { reason: tokenInvalidException, message: e.message };
if (e instanceof Error) {
// FIXME: we are loosing the stacktrace here.
throw new InvalidTokenError(e.message);
} else {
throw e;
}
}
}
}

View file

@ -132,6 +132,12 @@ export class SocketManager implements ZoneEventListener {
const message = new AdminPusherToBackMessage();
message.setSubscribetoroom(roomId);
console.log(
`Admin socket handle room ${roomId} connections for a client on ${Buffer.from(
client.getRemoteAddressAsText()
).toString()}`
);
adminRoomStream.write(message);
}