Adding a Pusher container as a middleware/dispatcher between front and back

This commit is contained in:
David Négrier 2020-11-13 18:00:22 +01:00
parent 6c6d046891
commit 4c1e566a6c
86 changed files with 12172 additions and 983 deletions

View file

@ -0,0 +1,7 @@
import {MessageUserPosition} from "../Model/Websocket/MessageUserPosition";
export interface Distance {
distance: number,
first: MessageUserPosition,
second: MessageUserPosition,
}

138
pusher/src/Model/Group.ts Normal file
View file

@ -0,0 +1,138 @@
import { ConnectCallback, DisconnectCallback } from "./PusherRoom";
import { User } from "./User";
import {PositionInterface} from "_Model/PositionInterface";
import {Movable} from "_Model/Movable";
import {PositionDispatcher} from "_Model/PositionDispatcher";
import {gaugeManager} from "../Services/GaugeManager";
export class Group implements Movable {
static readonly MAX_PER_GROUP = 4;
private static nextId: number = 1;
private id: number;
private users: Set<User>;
private x!: number;
private y!: number;
private hasEditedGauge: boolean = false;
private wasDestroyed: boolean = false;
private roomId: string;
constructor(roomId: string, users: User[], private connectCallback: ConnectCallback, private disconnectCallback: DisconnectCallback, private positionNotifier: PositionDispatcher) {
this.roomId = roomId;
this.users = new Set<User>();
this.id = Group.nextId;
Group.nextId++;
//we only send a event for prometheus metrics if the group lives more than 5 seconds
setTimeout(() => {
if (!this.wasDestroyed) {
this.hasEditedGauge = true;
gaugeManager.incNbGroupsPerRoomGauge(roomId);
}
}, 5000);
users.forEach((user: User) => {
this.join(user);
});
this.updatePosition();
}
getUsers(): User[] {
return Array.from(this.users.values());
}
getId() : number {
return this.id;
}
/**
* Returns the barycenter of all users (i.e. the center of the group)
*/
getPosition(): PositionInterface {
return {
x: this.x,
y: this.y
};
}
/**
* Computes the barycenter of all users (i.e. the center of the group)
*/
updatePosition(): void {
const oldX = this.x;
const oldY = this.y;
let x = 0;
let y = 0;
// Let's compute the barycenter of all users.
this.users.forEach((user: User) => {
const position = user.getPosition();
x += position.x;
y += position.y;
});
x /= this.users.size;
y /= this.users.size;
if (this.users.size === 0) {
throw new Error("EMPTY GROUP FOUND!!!");
}
this.x = x;
this.y = y;
if (oldX === undefined) {
this.positionNotifier.enter(this);
} else {
this.positionNotifier.updatePosition(this, {x, y}, {x: oldX, y: oldY});
}
}
isFull(): boolean {
return this.users.size >= Group.MAX_PER_GROUP;
}
isEmpty(): boolean {
return this.users.size <= 1;
}
join(user: User): void
{
// Broadcast on the right event
this.connectCallback(user, this);
this.users.add(user);
user.group = this;
}
leave(user: User): void
{
const success = this.users.delete(user);
if (success === false) {
throw new Error("Could not find user "+user.id+" in the group "+this.id);
}
user.group = undefined;
if (this.users.size !== 0) {
this.updatePosition();
}
// Broadcast on the right event
this.disconnectCallback(user, this);
}
/**
* Let's kick everybody out.
* Usually used when there is only one user left.
*/
destroy(): void
{
if (this.hasEditedGauge) gaugeManager.decNbGroupsPerRoomGauge(this.roomId);
for (const user of this.users) {
this.leave(user);
}
this.wasDestroyed = true;
}
get getSize(){
return this.users.size;
}
}

View file

@ -0,0 +1,8 @@
import {PositionInterface} from "_Model/PositionInterface";
/**
* A physical object that can be placed into a Zone
*/
export interface Movable {
getPosition(): PositionInterface
}

View file

@ -0,0 +1,120 @@
/**
* Tracks the position of every player on the map, and sends notifications to the players interested in knowing about the move
* (i.e. players that are looking at the zone the player is currently in)
*
* Internally, the PositionNotifier works with Zones. A zone is a square area of a map.
* Each player is in a given zone, and each player tracks one or many zones (depending on the player viewport)
*
* The PositionNotifier is important for performance. It allows us to send the position of players only to a restricted
* number of players around the current player.
*/
import {Zone, ZoneEventListener} from "./Zone";
import {ViewportInterface} from "_Model/Websocket/ViewportMessage";
import {ExSocketInterface} from "_Model/Websocket/ExSocketInterface";
//import Debug from "debug";
//const debug = Debug('positiondispatcher');
interface ZoneDescriptor {
i: number;
j: number;
}
export class PositionDispatcher {
// TODO: we need a way to clean the zones if noone is in the zone and noone listening (to free memory!)
private zones: Zone[][] = [];
constructor(public readonly roomId: string, private zoneWidth: number, private zoneHeight: number, private socketListener: ZoneEventListener) {
}
private getZoneDescriptorFromCoordinates(x: number, y: number): ZoneDescriptor {
return {
i: Math.floor(x / this.zoneWidth),
j: Math.floor(y / this.zoneHeight),
}
}
/**
* Sets the viewport coordinates.
*/
public setViewport(socket: ExSocketInterface, viewport: ViewportInterface): void {
if (viewport.left > viewport.right || viewport.top > viewport.bottom) {
console.warn('Invalid viewport received: ', viewport);
return;
}
const oldZones = socket.listenedZones;
const newZones = new Set<Zone>();
const topLeftDesc = this.getZoneDescriptorFromCoordinates(viewport.left, viewport.top);
const bottomRightDesc = this.getZoneDescriptorFromCoordinates(viewport.right, viewport.bottom);
for (let j = topLeftDesc.j; j <= bottomRightDesc.j; j++) {
for (let i = topLeftDesc.i; i <= bottomRightDesc.i; i++) {
newZones.add(this.getZone(i, j));
}
}
const addedZones = [...newZones].filter(x => !oldZones.has(x));
const removedZones = [...oldZones].filter(x => !newZones.has(x));
for (const zone of addedZones) {
zone.startListening(socket);
}
for (const zone of removedZones) {
this.stopListening(zone, socket);
}
}
private stopListening(zone: Zone, socket: ExSocketInterface): void {
zone.stopListening(socket);
if (!zone.hasListeners()) {
zone.close();
this.deleteZone(zone);
}
}
/**
* Removes the zone from the dispatcher.
* Warning, zone is not closed by this method.
*/
private deleteZone(zone: Zone): void {
delete this.zones[zone.y][zone.x];
if (Object.keys(this.zones[zone.y]).length === 0) {
delete this.zones[zone.y];
}
}
public removeViewport(socket: ExSocketInterface): void {
// Also, let's stop listening on viewports
for (const zone of socket.listenedZones) {
this.stopListening(zone, socket);
}
}
public isEmpty(): boolean {
return Object.keys(this.zones).length === 0;
}
private getZone(i: number, j: number): Zone {
let zoneRow = this.zones[j];
if (zoneRow === undefined) {
zoneRow = new Array<Zone>();
this.zones[j] = zoneRow;
}
let zone = this.zones[j][i];
if (zone === undefined) {
zone = new Zone(this, this.socketListener, i, j, (e, myZone) => {
// On failure, we delete the zone from the dispatcher so it can be recreated later.
this.deleteZone(myZone);
// TODO: we should check if the position dispatcher is still containing a room and propagate the onFailure to the parent if it is empty.
});
zone.init();
this.zones[j][i] = zone;
}
return zone;
}
}

View file

@ -0,0 +1,4 @@
export interface PositionInterface {
x: number,
y: number
}

View file

@ -0,0 +1,68 @@
import {PointInterface} from "./Websocket/PointInterface";
import {Group} from "./Group";
import {User} from "./User";
import {ExSocketInterface} from "_Model/Websocket/ExSocketInterface";
import {PositionInterface} from "_Model/PositionInterface";
import {Identificable} from "_Model/Websocket/Identificable";
import {PositionDispatcher} from "./PositionDispatcher";
import {ViewportInterface} from "_Model/Websocket/ViewportMessage";
import {Movable} from "_Model/Movable";
import {extractDataFromPrivateRoomId, extractRoomSlugPublicRoomId, isRoomAnonymous} from "./RoomIdentifier";
import {arrayIntersect} from "../Services/ArrayHelper";
import {MAX_USERS_PER_ROOM} from "../Enum/EnvironmentVariable";
import {ZoneEventListener} from "_Model/Zone";
export type ConnectCallback = (user: User, group: Group) => void;
export type DisconnectCallback = (user: User, group: Group) => void;
export enum GameRoomPolicyTypes {
ANONYMUS_POLICY = 1,
MEMBERS_ONLY_POLICY,
USE_TAGS_POLICY,
}
export class PusherRoom {
private readonly positionNotifier: PositionDispatcher;
public readonly anonymous: boolean;
public tags: string[];
public policyType: GameRoomPolicyTypes;
public readonly roomSlug: string;
public readonly worldSlug: string = '';
public readonly organizationSlug: string = '';
constructor(public readonly roomId: string,
private socketListener: ZoneEventListener)
{
this.anonymous = isRoomAnonymous(roomId);
this.tags = [];
this.policyType = GameRoomPolicyTypes.ANONYMUS_POLICY;
if (this.anonymous) {
this.roomSlug = extractRoomSlugPublicRoomId(this.roomId);
} else {
const {organizationSlug, worldSlug, roomSlug} = extractDataFromPrivateRoomId(this.roomId);
this.roomSlug = roomSlug;
this.organizationSlug = organizationSlug;
this.worldSlug = worldSlug;
}
// A zone is 10 sprites wide.
this.positionNotifier = new PositionDispatcher(this.roomId, 320, 320, this.socketListener);
}
public setViewport(socket : ExSocketInterface, viewport: ViewportInterface): void {
this.positionNotifier.setViewport(socket, viewport);
}
public leave(socket : ExSocketInterface){
this.positionNotifier.removeViewport(socket);
}
public canAccess(userTags: string[]): boolean {
return arrayIntersect(userTags, this.tags);
}
public isEmpty(): boolean {
return this.positionNotifier.isEmpty();
}
}

View file

@ -0,0 +1,30 @@
//helper functions to parse room IDs
export const isRoomAnonymous = (roomID: string): boolean => {
if (roomID.startsWith('_/')) {
return true;
} else if(roomID.startsWith('@/')) {
return false;
} else {
throw new Error('Incorrect room ID: '+roomID);
}
}
export const extractRoomSlugPublicRoomId = (roomId: string): string => {
const idParts = roomId.split('/');
if (idParts.length < 3) throw new Error('Incorrect roomId: '+roomId);
return idParts.slice(2).join('/');
}
export interface extractDataFromPrivateRoomIdResponse {
organizationSlug: string;
worldSlug: string;
roomSlug: string;
}
export const extractDataFromPrivateRoomId = (roomId: string): extractDataFromPrivateRoomIdResponse => {
const idParts = roomId.split('/');
if (idParts.length < 4) throw new Error('Incorrect roomId: '+roomId);
const organizationSlug = idParts[1];
const worldSlug = idParts[2];
const roomSlug = idParts[3];
return {organizationSlug, worldSlug, roomSlug}
}

35
pusher/src/Model/User.ts Normal file
View file

@ -0,0 +1,35 @@
import { Group } from "./Group";
import { PointInterface } from "./Websocket/PointInterface";
import {Zone} from "_Model/Zone";
import {Movable} from "_Model/Movable";
import {PositionInterface} from "_Model/PositionInterface";
import {PositionDispatcher} from "_Model/PositionDispatcher";
import {ExSocketInterface} from "_Model/Websocket/ExSocketInterface";
export class User implements Movable {
public listenedZones: Set<Zone>;
public group?: Group;
public constructor(
public id: number,
public uuid: string,
private position: PointInterface,
public silent: boolean,
private positionNotifier: PositionDispatcher,
public readonly socket: ExSocketInterface
) {
this.listenedZones = new Set<Zone>();
this.positionNotifier.enter(this);
}
public getPosition(): PointInterface {
return this.position;
}
public setPosition(position: PointInterface): void {
const oldPosition = this.position;
this.position = position;
this.positionNotifier.updatePosition(this, position, oldPosition);
}
}

View file

@ -0,0 +1,42 @@
import {PointInterface} from "./PointInterface";
import {Identificable} from "./Identificable";
import {ViewportInterface} from "_Model/Websocket/ViewportMessage";
import {
BatchMessage,
PusherToBackMessage,
ServerToClientMessage,
SubMessage
} from "../../Messages/generated/messages_pb";
import {WebSocket} from "uWebSockets.js"
import {CharacterTexture} from "../../Services/AdminApi";
import {ClientDuplexStream} from "grpc";
import {Zone} from "_Model/Zone";
export type BackConnection = ClientDuplexStream<PusherToBackMessage, ServerToClientMessage>;
export interface CharacterLayer {
name: string,
url: string|undefined
}
export interface ExSocketInterface extends WebSocket, Identificable {
token: string;
roomId: string;
//userId: number; // A temporary (autoincremented) identifier for this user
userUuid: string; // A unique identifier for this user
name: string;
characterLayers: CharacterLayer[];
position: PointInterface;
viewport: ViewportInterface;
/**
* Pushes an event that will be sent in the next batch of events
*/
emitInBatch: (payload: SubMessage) => void;
batchedMessages: BatchMessage;
batchTimeout: NodeJS.Timeout|null;
disconnecting: boolean,
tags: string[],
textures: CharacterTexture[],
backConnection: BackConnection,
listenedZones: Set<Zone>;
}

View file

@ -0,0 +1,6 @@
import {PositionInterface} from "_Model/PositionInterface";
export interface GroupUpdateInterface {
position: PositionInterface,
groupId: number,
}

View file

@ -0,0 +1,3 @@
export interface Identificable {
userId: number;
}

View file

@ -0,0 +1,10 @@
import * as tg from "generic-type-guard";
export const isItemEventMessageInterface =
new tg.IsInterface().withProperties({
itemId: tg.isNumber,
event: tg.isString,
state: tg.isUnknown,
parameters: tg.isUnknown,
}).get();
export type ItemEventMessageInterface = tg.GuardedType<typeof isItemEventMessageInterface>;

View file

@ -0,0 +1,11 @@
import * as tg from "generic-type-guard";
import {isPointInterface} from "./PointInterface";
import {isViewport} from "./ViewportMessage";
export const isJoinRoomMessageInterface =
new tg.IsInterface().withProperties({
roomId: tg.isString,
position: isPointInterface,
viewport: isViewport
}).get();
export type JoinRoomMessageInterface = tg.GuardedType<typeof isJoinRoomMessageInterface>;

View file

@ -0,0 +1,6 @@
import {PointInterface} from "_Model/Websocket/PointInterface";
export class MessageUserJoined {
constructor(public userId: number, public name: string, public characterLayers: string[], public position: PointInterface) {
}
}

View file

@ -0,0 +1,11 @@
import {PointInterface} from "./PointInterface";
export class Point implements PointInterface{
constructor(public x : number, public y : number, public direction : string = "none", public moving : boolean = false) {
}
}
export class MessageUserPosition {
constructor(public userId: number, public name: string, public characterLayers: string[], public position: PointInterface) {
}
}

View file

@ -0,0 +1,17 @@
import * as tg from "generic-type-guard";
/*export interface PointInterface {
readonly x: number;
readonly y: number;
readonly direction: string;
readonly moving: boolean;
}*/
export const isPointInterface =
new tg.IsInterface().withProperties({
x: tg.isNumber,
y: tg.isNumber,
direction: tg.isString,
moving: tg.isBoolean
}).get();
export type PointInterface = tg.GuardedType<typeof isPointInterface>;

View file

@ -0,0 +1,108 @@
import {PointInterface} from "./PointInterface";
import {
CharacterLayerMessage,
ItemEventMessage,
PointMessage,
PositionMessage
} from "../../Messages/generated/messages_pb";
import {CharacterLayer, ExSocketInterface} from "_Model/Websocket/ExSocketInterface";
import Direction = PositionMessage.Direction;
import {ItemEventMessageInterface} from "_Model/Websocket/ItemEventMessage";
import {PositionInterface} from "_Model/PositionInterface";
export class ProtobufUtils {
public static toPositionMessage(point: PointInterface): PositionMessage {
let direction: Direction;
switch (point.direction) {
case 'up':
direction = Direction.UP;
break;
case 'down':
direction = Direction.DOWN;
break;
case 'left':
direction = Direction.LEFT;
break;
case 'right':
direction = Direction.RIGHT;
break;
default:
throw new Error('unexpected direction');
}
const position = new PositionMessage();
position.setX(point.x);
position.setY(point.y);
position.setMoving(point.moving);
position.setDirection(direction);
return position;
}
public static toPointInterface(position: PositionMessage): PointInterface {
let direction: string;
switch (position.getDirection()) {
case Direction.UP:
direction = 'up';
break;
case Direction.DOWN:
direction = 'down';
break;
case Direction.LEFT:
direction = 'left';
break;
case Direction.RIGHT:
direction = 'right';
break;
default:
throw new Error("Unexpected direction");
}
// sending to all clients in room except sender
return {
x: position.getX(),
y: position.getY(),
direction,
moving: position.getMoving(),
};
}
public static toPointMessage(point: PositionInterface): PointMessage {
const position = new PointMessage();
position.setX(Math.floor(point.x));
position.setY(Math.floor(point.y));
return position;
}
public static toItemEvent(itemEventMessage: ItemEventMessage): ItemEventMessageInterface {
return {
itemId: itemEventMessage.getItemid(),
event: itemEventMessage.getEvent(),
parameters: JSON.parse(itemEventMessage.getParametersjson()),
state: JSON.parse(itemEventMessage.getStatejson()),
}
}
public static toItemEventProtobuf(itemEvent: ItemEventMessageInterface): ItemEventMessage {
const itemEventMessage = new ItemEventMessage();
itemEventMessage.setItemid(itemEvent.itemId);
itemEventMessage.setEvent(itemEvent.event);
itemEventMessage.setParametersjson(JSON.stringify(itemEvent.parameters));
itemEventMessage.setStatejson(JSON.stringify(itemEvent.state));
return itemEventMessage;
}
public static toCharacterLayerMessages(characterLayers: CharacterLayer[]): CharacterLayerMessage[] {
return characterLayers.map(function(characterLayer): CharacterLayerMessage {
const message = new CharacterLayerMessage();
message.setName(characterLayer.name);
if (characterLayer.url) {
message.setUrl(characterLayer.url);
}
return message;
});
}
}

View file

@ -0,0 +1,8 @@
import * as tg from "generic-type-guard";
export const isSetPlayerDetailsMessage =
new tg.IsInterface().withProperties({
name: tg.isString,
characterLayers: tg.isArray(tg.isString)
}).get();
export type SetPlayerDetailsMessage = tg.GuardedType<typeof isSetPlayerDetailsMessage>;

View file

@ -0,0 +1,5 @@
export interface UserInGroupInterface {
userId: number,
name: string,
initiator: boolean
}

View file

@ -0,0 +1,10 @@
import * as tg from "generic-type-guard";
export const isViewport =
new tg.IsInterface().withProperties({
left: tg.isNumber,
top: tg.isNumber,
right: tg.isNumber,
bottom: tg.isNumber,
}).get();
export type ViewportInterface = tg.GuardedType<typeof isViewport>;

View file

@ -0,0 +1,18 @@
import * as tg from "generic-type-guard";
export const isSignalData =
new tg.IsInterface().withProperties({
type: tg.isOptional(tg.isString)
}).get();
export const isWebRtcSignalMessageInterface =
new tg.IsInterface().withProperties({
receiverId: tg.isNumber,
signal: isSignalData
}).get();
export const isWebRtcScreenSharingStartMessageInterface =
new tg.IsInterface().withProperties({
userId: tg.isNumber,
roomId: tg.isString
}).get();
export type WebRtcSignalMessageInterface = tg.GuardedType<typeof isWebRtcSignalMessageInterface>;

333
pusher/src/Model/Zone.ts Normal file
View file

@ -0,0 +1,333 @@
import {ExSocketInterface} from "./Websocket/ExSocketInterface";
import {apiClientRepository} from "../Services/ApiClientRepository";
import {
BatchToPusherMessage,
CharacterLayerMessage, GroupLeftZoneMessage, GroupUpdateMessage, GroupUpdateZoneMessage,
PointMessage, PositionMessage, UserJoinedMessage,
UserJoinedZoneMessage, UserLeftZoneMessage, UserMovedMessage,
ZoneMessage
} from "../Messages/generated/messages_pb";
import * as messages_pb from "../Messages/generated/messages_pb";
import {ClientReadableStream} from "grpc";
import {PositionDispatcher} from "_Model/PositionDispatcher";
import {socketManager} from "../Services/SocketManager";
import {ProtobufUtils} from "_Model/Websocket/ProtobufUtils";
import Debug from "debug";
const debug = Debug("zone");
export interface ZoneEventListener {
onUserEnters(user: UserDescriptor, listener: ExSocketInterface): void;
onUserMoves(user: UserDescriptor, listener: ExSocketInterface): void;
onUserLeaves(userId: number, listener: ExSocketInterface): void;
onGroupEnters(group: GroupDescriptor, listener: ExSocketInterface): void;
onGroupMoves(group: GroupDescriptor, listener: ExSocketInterface): void;
onGroupLeaves(groupId: number, listener: ExSocketInterface): void;
}
/*export type EntersCallback = (thing: Movable, listener: User) => void;
export type MovesCallback = (thing: Movable, position: PositionInterface, listener: User) => void;
export type LeavesCallback = (thing: Movable, listener: User) => void;*/
export class UserDescriptor {
private constructor(public readonly userId: number, private name: string, private characterLayers: CharacterLayerMessage[], private position: PositionMessage) {
if (!Number.isInteger(this.userId)) {
throw new Error('UserDescriptor.userId is not an integer: '+this.userId);
}
}
public static createFromUserJoinedZoneMessage(message: UserJoinedZoneMessage): UserDescriptor {
const position = message.getPosition();
if (position === undefined) {
throw new Error('Missing position');
}
return new UserDescriptor(message.getUserid(), message.getName(), message.getCharacterlayersList(), position);
}
public update(userMovedMessage: UserMovedMessage) {
const position = userMovedMessage.getPosition();
if (position === undefined) {
throw new Error('Missing position');
}
this.position = position;
}
public toUserJoinedMessage(): UserJoinedMessage {
const userJoinedMessage = new UserJoinedMessage();
userJoinedMessage.setUserid(this.userId);
userJoinedMessage.setName(this.name);
userJoinedMessage.setCharacterlayersList(this.characterLayers);
userJoinedMessage.setPosition(this.position);
return userJoinedMessage;
}
public toUserMovedMessage(): UserMovedMessage {
const userMovedMessage = new UserMovedMessage();
userMovedMessage.setUserid(this.userId);
userMovedMessage.setPosition(this.position);
return userMovedMessage;
}
}
export class GroupDescriptor {
private constructor(public readonly groupId: number, private groupSize: number, private position: PointMessage) {
}
public static createFromGroupUpdateZoneMessage(message: GroupUpdateZoneMessage): GroupDescriptor {
const position = message.getPosition();
if (position === undefined) {
throw new Error('Missing position');
}
return new GroupDescriptor(message.getGroupid(), message.getGroupsize(), position);
}
public update(groupDescriptor: GroupDescriptor) {
this.groupSize = groupDescriptor.groupSize;
this.position = groupDescriptor.position;
}
public toGroupUpdateMessage(): GroupUpdateMessage {
const groupUpdateMessage = new GroupUpdateMessage();
if (!Number.isInteger(this.groupId)) {
throw new Error('GroupDescriptor.groupId is not an integer: '+this.groupId);
}
groupUpdateMessage.setGroupid(this.groupId);
groupUpdateMessage.setGroupsize(this.groupSize);
groupUpdateMessage.setPosition(this.position);
return groupUpdateMessage;
}
}
interface ZoneDescriptor {
x: number,
y: number
}
export class Zone {
//private things: Set<Movable> = new Set<Movable>();
private users: Map<number, UserDescriptor> = new Map<number, UserDescriptor>();
private groups: Map<number, GroupDescriptor> = new Map<number, GroupDescriptor>();
private listeners: Set<ExSocketInterface> = new Set<ExSocketInterface>();
private backConnection!: ClientReadableStream<BatchToPusherMessage>;
private isClosing: boolean = false;
constructor(private positionDispatcher: PositionDispatcher, private socketListener: ZoneEventListener, public readonly x: number, public readonly y: number, private onBackFailure: (e: Error|null, zone: Zone) => void) {
}
/**
* Creates a connection to the back server to track the users.
*/
public async init(): Promise<void> {
debug('Opening connection to zone %d, %d on back server', this.x, this.y);
const apiClient = await apiClientRepository.getClient(this.positionDispatcher.roomId);
const zoneMessage = new ZoneMessage();
zoneMessage.setRoomid(this.positionDispatcher.roomId);
zoneMessage.setX(this.x);
zoneMessage.setY(this.y);
this.backConnection = apiClient.listenZone(zoneMessage);
this.backConnection.on('data', (batch: BatchToPusherMessage) => {
for (const message of batch.getPayloadList()) {
if (message.hasUserjoinedzonemessage()) {
const userJoinedZoneMessage = message.getUserjoinedzonemessage() as UserJoinedZoneMessage;
const userDescriptor = UserDescriptor.createFromUserJoinedZoneMessage(userJoinedZoneMessage);
this.users.set(userJoinedZoneMessage.getUserid(), userDescriptor);
const fromZone = userJoinedZoneMessage.getFromzone();
this.notifyUserEnter(userDescriptor, fromZone?.toObject());
} else if (message.hasGroupupdatezonemessage()) {
const groupUpdateZoneMessage = message.getGroupupdatezonemessage() as GroupUpdateZoneMessage;
const groupDescriptor = GroupDescriptor.createFromGroupUpdateZoneMessage(groupUpdateZoneMessage);
// Do we have it already?
const groupId = groupUpdateZoneMessage.getGroupid();
const oldGroupDescriptor = this.groups.get(groupId);
if (oldGroupDescriptor !== undefined) {
oldGroupDescriptor.update(groupDescriptor);
this.notifyGroupMove(groupDescriptor);
} else {
this.groups.set(groupId, groupDescriptor);
const fromZone = groupUpdateZoneMessage.getFromzone();
this.notifyGroupEnter(groupDescriptor, fromZone?.toObject());
}
} else if (message.hasUserleftzonemessage()) {
const userLeftMessage = message.getUserleftzonemessage() as UserLeftZoneMessage;
this.users.delete(userLeftMessage.getUserid());
this.notifyUserLeft(userLeftMessage.getUserid(), userLeftMessage.getTozone()?.toObject());
} else if (message.hasGroupleftzonemessage()) {
const groupLeftMessage = message.getGroupleftzonemessage() as GroupLeftZoneMessage;
this.groups.delete(groupLeftMessage.getGroupid());
this.notifyGroupLeft(groupLeftMessage.getGroupid(), groupLeftMessage.getTozone()?.toObject());
} else if (message.hasUsermovedmessage()) {
const userMovedMessage = message.getUsermovedmessage() as UserMovedMessage;
const userId = userMovedMessage.getUserid();
const userDescriptor = this.users.get(userId);
if (userDescriptor === undefined) {
console.error('Unexpected move message received for user "'+userId+'"');
return;
}
userDescriptor.update(userMovedMessage);
this.notifyUserMove(userDescriptor);
} else {
throw new Error('Unexpected message');
}
}
});
this.backConnection.on('error', (e) => {
if (!this.isClosing) {
debug('Error on back connection')
this.close();
this.onBackFailure(e, this);
}
});
this.backConnection.on('close', () => {
if (!this.isClosing) {
debug('Close on back connection')
this.close();
this.onBackFailure(null, this);
}
});
}
public close(): void {
debug('Closing connection to zone %d, %d on back server', this.x, this.y);
this.isClosing = true;
this.backConnection.cancel();
}
public hasListeners(): boolean {
return this.listeners.size !== 0;
}
/**
* Notify listeners of this zone that this user entered
*/
private notifyUserEnter(user: UserDescriptor, oldZone: ZoneDescriptor|undefined) {
for (const listener of this.listeners) {
if (listener.userId === user.userId) {
continue;
}
if (oldZone === undefined || !this.isListeningZone(listener, oldZone.x, oldZone.y)) {
this.socketListener.onUserEnters(user, listener);
} else {
this.socketListener.onUserMoves(user, listener);
}
}
}
/**
* Notify listeners of this zone that this group entered
*/
private notifyGroupEnter(group: GroupDescriptor, oldZone: ZoneDescriptor|undefined) {
for (const listener of this.listeners) {
if (oldZone === undefined || !this.isListeningZone(listener, oldZone.x, oldZone.y)) {
this.socketListener.onGroupEnters(group, listener);
} else {
this.socketListener.onGroupMoves(group, listener);
}
}
}
/**
* Notify listeners of this zone that this user left
*/
private notifyUserLeft(userId: number, newZone: ZoneDescriptor|undefined) {
for (const listener of this.listeners) {
if (listener.userId === userId) {
continue;
}
if (newZone === undefined || !this.isListeningZone(listener, newZone.x, newZone.y)) {
this.socketListener.onUserLeaves(userId, listener);
} else {
// Do not send a signal. The move event will be triggered when joining the new room.
}
}
}
/**
* Notify listeners of this zone that this group left
*/
private notifyGroupLeft(groupId: number, newZone: ZoneDescriptor|undefined) {
for (const listener of this.listeners) {
if (listener.groupId === groupId) {
continue;
}
if (newZone === undefined || !this.isListeningZone(listener, newZone.x, newZone.y)) {
this.socketListener.onGroupLeaves(groupId, listener);
} else {
// Do not send a signal. The move event will be triggered when joining the new room.
}
}
}
private isListeningZone(socket: ExSocketInterface, x: number, y: number): boolean {
// TODO: improve efficiency by not doing a full scan of listened zones.
for (const zone of socket.listenedZones) {
if (zone.x === x && zone.y === y) {
return true;
}
}
return false;
}
private notifyGroupMove(groupDescriptor: GroupDescriptor) {
for (const listener of this.listeners) {
this.socketListener.onGroupMoves(groupDescriptor, listener);
}
}
private notifyUserMove(userDescriptor: UserDescriptor) {
for (const listener of this.listeners) {
if (listener.userId === userDescriptor.userId) {
continue;
}
this.socketListener.onUserMoves(userDescriptor, listener);
}
}
public startListening(listener: ExSocketInterface): void {
for (const [userId, user] of this.users.entries()) {
if (userId !== listener.userId) {
this.socketListener.onUserEnters(user, listener);
}
}
for (const [groupId, group] of this.groups.entries()) {
this.socketListener.onGroupEnters(group, listener);
}
this.listeners.add(listener);
listener.listenedZones.add(this);
}
public stopListening(listener: ExSocketInterface): void {
for (const [userId, user] of this.users.entries()) {
if (userId !== listener.userId) {
this.socketListener.onUserLeaves(userId, listener);
}
}
for (const [groupId, group] of this.groups.entries()) {
this.socketListener.onGroupLeaves(groupId, listener);
}
this.listeners.delete(listener);
listener.listenedZones.delete(this);
}
}