// Mirror of https://github.com/AlecM33/Werewolf.git
// Synced 2025-12-26 15:57:50 +01:00
// 223 lines, 7.8 KiB, JavaScript
const { RateLimiterMemory } = require('rate-limiter-flexible');
|
|
const redis = require('redis');
|
|
const Events = require('../Events');
|
|
const { EVENT_IDS, REDIS_CHANNELS, PRIMITIVES, SOCKET_EVENTS, TIMER_EVENTS, SYNCABLE_EVENTS } = require('../../config/globals');
|
|
|
|
/**
 * Singleton that owns the Socket.IO server, the Redis publisher/subscriber
 * clients, and the dispatch of game events to the handlers listed in `Events`.
 *
 * Events that arrive from a connected socket are handled locally and, when
 * they appear in SYNCABLE_EVENTS(), published to Redis. Messages received from
 * Redis that were authored by a different instance are dispatched with
 * `syncOnly = true`.
 */
class EventManager {
    /**
     * @param {object} logger - Logger exposing `info`, `debug`, `trace` and `error`.
     * @param {string} instanceId - Identifier of this server instance; it is embedded
     *     in published messages so that self-authored messages can be ignored.
     * @throws {Error} If an EventManager has already been constructed.
     */
    constructor (logger, instanceId) {
        if (EventManager.instance) {
            throw new Error('The server attempted to instantiate more than one EventManager.');
        }
        logger.info('CREATING SINGLETON EVENT MANAGER');
        this.logger = logger;
        this.io = null;
        this.publisher = null;
        this.subscriber = null;
        // Neither manager is assigned anywhere in this class; presumably they are
        // injected by the caller after construction - verify against the call site.
        this.timerManager = null;
        this.gameManager = null;
        this.instanceId = instanceId;
        EventManager.instance = this;
    }

    /**
     * Emits a BROADCAST event on the socket server. Does nothing if the socket
     * server has not been created yet.
     *
     * @param {*} message - Payload passed to `io.emit`.
     */
    broadcast = (message) => {
        this.io?.emit(EVENT_IDS.BROADCAST, message);
    };

    /**
     * Creates and connects the Redis client stored in `this.publisher`.
     * In development the client is created with default options; otherwise it
     * is pointed at `process.env.REDIS_URL`.
     *
     * @returns {Promise<void>}
     * @throws {Error} If the client cannot connect.
     */
    createRedisPublisher = async () => {
        // NODE_ENV may be unset; optional chaining makes that case fall through to
        // the non-development configuration instead of throwing a TypeError.
        const isDevelopment = process.env.NODE_ENV?.trim() === 'development';
        this.publisher = isDevelopment
            ? redis.createClient()
            : redis.createClient({ url: process.env.REDIS_URL });

        // NOTE(review): an exception thrown inside an event listener is not caught by
        // the try/catch around connect() below; it propagates out of the emitter.
        // Presumably the intent is to bring the process down on Redis failure - confirm.
        this.publisher.on('error', (e) => {
            throw new Error('PUBLISHER CLIENT FAILED because: ' + e);
        });

        try {
            await this.publisher.connect();
            this.logger.info('EVENT MANAGER - CREATED PUBLISHER');
        } catch (e) {
            throw new Error('UNABLE TO CONNECT TO REDIS because: ' + e);
        }
    }

    /**
     * Duplicates the publisher client into `this.subscriber`, connects it and
     * subscribes to REDIS_CHANNELS.ACTIVE_GAME_STREAM.
     *
     * Messages have the form `accessCode;eventId;instanceId;<JSON payload>`
     * (see `handleAndSyncSocketEvent`). Self-authored and malformed messages are
     * logged and dropped; all others are dispatched through
     * `eventManager.handleEventById` with `syncOnly = true` when the game is
     * known to `gameManager`.
     *
     * @param {object} gameManager - Must expose an async `getActiveGame(accessCode)`.
     * @param {object} eventManager - Must expose `handleEventById`.
     * @returns {Promise<void>}
     * @throws {Error} If the subscriber cannot connect.
     */
    createGameSyncSubscriber = async (gameManager, eventManager) => {
        this.subscriber = this.publisher.duplicate();

        // NOTE(review): same caveat as the publisher's listener - a throw here is
        // not handled by the try/catch below.
        this.subscriber.on('error', (e) => {
            throw new Error('REDIS SUBSCRIBER CLIENT ERROR: ' + e);
        });

        try {
            await this.subscriber.connect();
            this.logger.info('EVENT MANAGER - CREATED SUBSCRIBER');
        } catch (e) {
            throw new Error('UNABLE TO CONNECT TO REDIS because: ' + e);
        }

        await this.subscriber.subscribe(REDIS_CHANNELS.ACTIVE_GAME_STREAM, async (message) => {
            this.logger.debug('MESSAGE: ' + message);
            let accessCode, eventId, senderInstanceId, args;
            try {
                // Only the first three fields are split off; the JSON payload may
                // itself contain ';' and is sliced out of the raw message below.
                const header = message.split(';', 3);
                if (header.length < 3) {
                    throw new Error('expected "accessCode;eventId;instanceId;payload"');
                }
                [accessCode, eventId, senderInstanceId] = header;
                if (senderInstanceId === this.instanceId) {
                    this.logger.debug('Disregarding self-authored message');
                    return;
                }
                // The payload starts right after the third separator. Deriving the
                // offset from the header fields stays correct even if the instance
                // id text also occurs earlier in the message.
                const payloadStart = accessCode.length + eventId.length + senderInstanceId.length + 3;
                args = JSON.parse(message.slice(payloadStart));
            } catch (e) {
                this.logger.error('MALFORMED MESSAGE RESULTED IN ERROR: ' + e + '; DISREGARDING');
                return;
            }

            const game = await gameManager.getActiveGame(accessCode);
            if (!game) {
                return;
            }
            await eventManager.handleEventById(
                eventId,
                senderInstanceId,
                game,
                null,
                game.accessCode || accessCode,
                args || null,
                null,
                true
            );
        });
    }

    /**
     * Joins the string form of every argument with ';'.
     *
     * @param {...*} args - Message fields, in order.
     * @returns {string} The joined message; an empty string when called without arguments.
     */
    createMessageToPublish = (...args) => {
        // String() keeps null/undefined as 'null'/'undefined', which a bare
        // Array.prototype.join would turn into empty strings.
        return args.map((part) => String(part)).join(';');
    }

    /**
     * Creates the Socket.IO server on top of `main`, attaches the connection
     * rate limiter and stores the server in `this.io`.
     *
     * @param {object} main - Server object handed to socket.io.
     * @param {object} app - Not used by this method.
     * @param {number|string} port - Port used to build the development CORS origin.
     * @param {object} logger - Logger passed to the rate limiter.
     * @returns {object} The created Socket.IO server.
     */
    createSocketServer = (main, app, port, logger) => {
        // An unset NODE_ENV is treated as non-development rather than throwing.
        const origin = process.env.NODE_ENV?.trim() === 'development'
            ? 'http://localhost:' + port
            : 'https://play-werewolf.app';
        const io = require('socket.io')(main, {
            cors: { origin }
        });

        registerRateLimiter(io, logger);
        this.io = io;

        return io;
    };

    /**
     * Sets up the '/in-game' namespace: attaches the rate limiter and, for each
     * connecting socket, a 'disconnecting' trace log plus the in-game message handler.
     *
     * @param {object} server - Socket.IO server.
     * @param {object} logger - Logger exposing `trace`.
     * @param {object} gameManager - Passed through to `registerSocketHandler`.
     * @returns {object} The '/in-game' namespace.
     */
    createGameSocketNamespace = (server, logger, gameManager) => {
        const namespace = server.of('/in-game');
        registerRateLimiter(namespace, logger);
        namespace.on('connection', (socket) => {
            socket.on('disconnecting', (reason) => {
                logger.trace('client socket disconnecting because: ' + reason);
            });

            this.registerSocketHandler(namespace, socket, gameManager);
        });
        // Socket.IO's of() returns the already-created namespace for a given name,
        // so returning the local reference is equivalent to calling of() again.
        return namespace;
    };

    /**
     * Listens for SOCKET_EVENTS.IN_GAME_MESSAGE on `socket` and routes each message:
     * timer events go to `handleEventById` as EVENT_IDS.TIMER_EVENT with the
     * original id as the subtype, everything else goes to `handleAndSyncSocketEvent`.
     * When no active game matches the access code, the client is acknowledged
     * with `null`.
     *
     * @param {object} namespace - Not used by this method.
     * @param {object} socket - The connected client socket.
     * @param {object} gameManager - Must expose an async `getActiveGame(accessCode)`.
     */
    registerSocketHandler = (namespace, socket, gameManager) => {
        socket.on(SOCKET_EVENTS.IN_GAME_MESSAGE, async (eventId, accessCode, args = null, ackFn = null) => {
            const game = await gameManager.getActiveGame(accessCode);
            if (!game) {
                // ackFn is null when the client did not ask for an acknowledgement;
                // calling it unconditionally would throw.
                if (typeof ackFn === 'function') {
                    ackFn(null);
                }
                return;
            }

            if (TIMER_EVENTS().includes(eventId)) {
                await this.handleEventById(
                    EVENT_IDS.TIMER_EVENT,
                    null,
                    game,
                    socket.id,
                    game.accessCode,
                    args,
                    ackFn,
                    true,
                    eventId
                );
            } else {
                await this.handleAndSyncSocketEvent(eventId, game, socket, args, ackFn);
            }
        });
    };

    /**
     * Handles a socket-initiated event locally and, if the event id is listed in
     * SYNCABLE_EVENTS(), refreshes the game through `this.gameManager` and
     * publishes the event to Redis for other instances.
     *
     * @param {string} eventId - Id of the event to handle.
     * @param {object} game - The active game the event applies to.
     * @param {object} [socket] - Originating socket; only its `id` is read.
     * @param {*} socketArgs - Event arguments; JSON-serialised into the published message.
     * @param {Function|null} ackFn - Acknowledgement callback forwarded to the handler.
     * @returns {Promise<void>}
     */
    handleAndSyncSocketEvent = async (eventId, game, socket, socketArgs, ackFn) => {
        await this.handleEventById(eventId, null, game, socket?.id, game.accessCode, socketArgs, ackFn, false);
        /* This server should publish events initiated by a connected socket to Redis for consumption by other instances. */
        if (SYNCABLE_EVENTS().includes(eventId)) {
            await this.gameManager.refreshGame(game);
            await this.publisher?.publish(
                REDIS_CHANNELS.ACTIVE_GAME_STREAM,
                this.createMessageToPublish(game.accessCode, eventId, this.instanceId, JSON.stringify(socketArgs))
            );
        }
    }

    /**
     * Looks up the entry of `Events` whose `id` equals `eventId` and runs it.
     * `stateChange` runs only when `syncOnly` is false or the event is
     * EVENT_IDS.RESTART_GAME; `communicate` always runs. Unknown event ids are
     * ignored.
     *
     * @param {string} eventId - Id to look up in `Events`.
     * @param {string|null} senderInstanceId - Instance that authored the event, if synced.
     * @param {object} game - Game passed to the event's handlers.
     * @param {string|null|undefined} requestingSocketId - Id of the originating socket, if any.
     * @param {string} accessCode - Not read by this method.
     * @param {*} socketArgs - Arguments passed to the event's handlers.
     * @param {Function|null} ackFn - Acknowledgement callback exposed to the handlers.
     * @param {boolean} syncOnly - When true, skip `stateChange` (except for RESTART_GAME).
     * @param {string|null} [timerEventSubtype=null] - Original event id for timer events.
     * @returns {Promise<void>}
     */
    handleEventById = async (
        eventId,
        senderInstanceId,
        game,
        requestingSocketId,
        accessCode,
        socketArgs,
        ackFn,
        syncOnly,
        timerEventSubtype = null
    ) => {
        this.logger.trace('ARGS TO HANDLER: ' + JSON.stringify(socketArgs));
        const event = Events.find((candidate) => candidate.id === eventId);
        if (!event) {
            return;
        }

        const additionalVars = {
            gameManager: this.gameManager,
            timerManager: this.timerManager,
            eventManager: this,
            requestingSocketId,
            ackFn,
            logger: this.logger,
            instanceId: this.instanceId,
            senderInstanceId,
            timerEventSubtype
        };

        if (!syncOnly || eventId === EVENT_IDS.RESTART_GAME) {
            await event.stateChange(game, socketArgs, additionalVars);
        }
        await event.communicate(game, socketArgs, additionalVars);
    }
}
|
|
|
|
/**
 * Installs a connection middleware on `server` that consumes one rate-limiter
 * point per connection attempt, keyed by the socket's handshake address.
 * The limiter is configured with 10 points and a duration of 1.
 *
 * @param {object} server - Socket.IO server or namespace; must expose `use(middleware)`.
 * @param {object} logger - Logger exposing `trace`.
 */
function registerRateLimiter (server, logger) {
    const rateLimiter = new RateLimiterMemory({
        points: 10,
        duration: 1
    });

    server.use(async (socket, next) => {
        const address = socket.handshake.address;
        try {
            await rateLimiter.consume(address);
            logger.trace('consumed point from ' + address);
            next();
        } catch {
            // Any rejection from the limiter results in the connection being refused.
            next(new Error('Your connection has been blocked.'));
        }
    });
}
|
|
|
|
// Export the class itself; its constructor throws if a second instance is created.
module.exports = EventManager;
|