redis effort part 5

This commit is contained in:
AlecM33
2023-01-14 22:48:30 -05:00
parent 148cfb63db
commit 81a132458e
15 changed files with 254 additions and 294 deletions

View File

@@ -1,116 +0,0 @@
const { fork } = require('child_process');
const path = require('path');
const globals = require('../../config/globals');
const redis = require('redis');
class ActiveGameRunner {
constructor (logger, instanceId) {
if (ActiveGameRunner.instance) {
throw new Error('The server tried to instantiate more than one ActiveGameRunner');
}
logger.info('CREATING SINGLETON ACTIVE GAME RUNNER');
this.timerThreads = {};
this.logger = logger;
this.client = redis.createClient();
this.subscriber = null;
this.instanceId = instanceId;
ActiveGameRunner.instance = this;
}
getActiveGame = async (accessCode) => {
const r = await this.client.get(accessCode);
return r === null ? r : JSON.parse(r);
}
createGameSyncSubscriber = async (gameManager, socketManager) => {
this.subscriber = this.client.duplicate();
await this.subscriber.connect();
await this.subscriber.subscribe(globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, async (message) => {
this.logger.info('MESSAGE: ' + message);
const messageComponents = message.split(';');
if (messageComponents[messageComponents.length - 1] === this.instanceId) {
this.logger.trace('Disregarding self-authored message');
return;
}
const game = await this.getActiveGame(messageComponents[0]);
let args;
if (messageComponents[2]) {
args = JSON.parse(messageComponents[2]);
}
if (game) {
await socketManager.handleEventById(
messageComponents[1],
game,
null,
game?.accessCode || messageComponents[0],
args || null,
null,
true
);
}
});
this.logger.info('ACTIVE GAME RUNNER - CREATED GAME SYNC SUBSCRIBER');
}
/* We're only going to fork a child process for games with a timer. They will report back to the parent process whenever
the timer is up.
*/
runGame = async (game, namespace, socketManager, gameManager) => {
this.logger.debug('running game ' + game.accessCode);
const gameProcess = fork(path.join(__dirname, '../GameProcess.js'));
this.timerThreads[game.accessCode] = gameProcess;
console.log(this.timerThreads);
this.logger.debug('game ' + game.accessCode + ' now associated with subProcess ' + gameProcess.pid);
gameProcess.on('message', async (msg) => {
game = await this.getActiveGame(game.accessCode);
switch (msg.command) {
case globals.GAME_PROCESS_COMMANDS.END_TIMER:
await socketManager.handleEventById(globals.EVENT_IDS.END_TIMER, game, msg.socketId, game.accessCode, msg, null, false);
this.logger.trace('PARENT: END TIMER');
break;
case globals.GAME_PROCESS_COMMANDS.PAUSE_TIMER:
await socketManager.handleEventById(globals.EVENT_IDS.PAUSE_TIMER, game, msg.socketId, game.accessCode, msg, null, false);
break;
case globals.GAME_PROCESS_COMMANDS.RESUME_TIMER:
await socketManager.handleEventById(globals.EVENT_IDS.RESUME_TIMER, game, msg.socketId, game.accessCode, msg, null, false);
break;
case globals.GAME_PROCESS_COMMANDS.GET_TIME_REMAINING:
this.logger.trace(msg);
game.timerParams.timeRemaining = msg.timeRemaining;
this.logger.trace('PARENT: GET TIME REMAINING');
msg.paused = game.timerParams.paused;
await socketManager.publisher.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.SHARE_TIME_REMAINING + ';' + JSON.stringify(msg) + ';' + this.instanceId
);
const socket = namespace.sockets.get(msg.socketId);
if (socket) {
namespace.to(socket.id).emit(globals.GAME_PROCESS_COMMANDS.GET_TIME_REMAINING, game.timerParams.timeRemaining, game.timerParams.paused);
}
break;
}
if (globals.SYNCABLE_EVENTS().includes(msg.command)) {
await gameManager.refreshGame(game);
await socketManager.publisher.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + msg.command + ';' + JSON.stringify(msg) + ';' + this.instanceId
);
}
});
gameProcess.on('exit', (code, signal) => {
this.logger.debug('Game timer thread ' + gameProcess.pid + ' exiting with code ' + code + ' - game ' + game.accessCode);
});
gameProcess.send({
command: globals.GAME_PROCESS_COMMANDS.START_TIMER,
accessCode: game.accessCode,
logLevel: this.logger.logLevel,
hours: game.timerParams.hours,
minutes: game.timerParams.minutes
});
game.startTime = new Date().toJSON();
};
}
module.exports = ActiveGameRunner;

View File

@@ -1,22 +1,23 @@
const globals = require('../../config/globals');
const EVENT_IDS = globals.EVENT_IDS;
const { RateLimiterMemory } = require('rate-limiter-flexible');
const redis = require('redis');
const Events = require('../Events');
class SocketManager {
class EventManager {
constructor (logger, instanceId) {
if (SocketManager.instance) {
throw new Error('The server attempted to instantiate more than one SocketManager.');
if (EventManager.instance) {
throw new Error('The server attempted to instantiate more than one EventManager.');
}
logger.info('CREATING SINGLETON SOCKET MANAGER');
logger.info('CREATING SINGLETON EVENT MANAGER');
this.logger = logger;
this.client = redis.createClient();
this.io = null;
this.publisher = null;
this.activeGameRunner = null;
this.subscriber = null;
this.timerManager = null;
this.gameManager = null;
this.instanceId = instanceId;
SocketManager.instance = this;
EventManager.instance = this;
}
broadcast = (message) => {
@@ -26,7 +27,38 @@ class SocketManager {
createRedisPublisher = async () => {
this.publisher = redis.createClient();
await this.publisher.connect();
this.logger.info('SOCKET MANAGER - CREATED GAME SYNC PUBLISHER');
this.logger.info('EVENT MANAGER - CREATED PUBLISHER');
}
createGameSyncSubscriber = async (gameManager, eventManager) => {
this.subscriber = this.client.duplicate();
await this.subscriber.connect();
await this.subscriber.subscribe(globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, async (message) => {
this.logger.info('MESSAGE: ' + message);
const messageComponents = message.split(';');
if (messageComponents[messageComponents.length - 1] === this.instanceId) {
this.logger.trace('Disregarding self-authored message');
return;
}
const game = await gameManager.getActiveGame(messageComponents[0]);
let args;
if (messageComponents[2]) {
args = JSON.parse(messageComponents[2]);
}
if (game) {
await eventManager.handleEventById(
messageComponents[1],
messageComponents[messageComponents.length - 1],
game,
null,
game?.accessCode || messageComponents[0],
args || null,
null,
true
);
}
});
this.logger.info('EVENT MANAGER - CREATED SUBSCRIBER');
}
createSocketServer = (main, app, port, logger) => {
@@ -49,24 +81,24 @@ class SocketManager {
createGameSocketNamespace = (server, logger, gameManager) => {
const namespace = server.of('/in-game');
const registerHandlers = this.registerHandlers;
const registerSocketHandler = this.registerSocketHandler;
registerRateLimiter(namespace, logger);
namespace.on('connection', function (socket) {
socket.on('disconnecting', (reason) => {
logger.trace('client socket disconnecting because: ' + reason);
});
registerHandlers(namespace, socket, gameManager);
registerSocketHandler(namespace, socket, gameManager);
});
return server.of('/in-game');
};
registerHandlers = (namespace, socket) => {
registerSocketHandler = (namespace, socket, gameManager) => {
socket.on(globals.SOCKET_EVENTS.IN_GAME_MESSAGE, async (eventId, accessCode, args = null, ackFn = null) => {
const game = await this.activeGameRunner.getActiveGame(accessCode);
const game = await gameManager.getActiveGame(accessCode);
if (game) {
if (globals.TIMER_EVENTS().includes(eventId)) {
await this.handleAndSyncTimerEvent(eventId, game, socket, args, ackFn, false);
await this.handleEventById(globals.EVENT_IDS.TIMER_EVENT, null, game, socket.id, game.accessCode, args, ackFn, true, eventId);
} else {
await this.handleAndSyncSocketEvent(eventId, game, socket, args, ackFn, false);
}
@@ -77,7 +109,7 @@ class SocketManager {
};
handleAndSyncSocketEvent = async (eventId, game, socket, socketArgs, ackFn) => {
await this.handleEventById(eventId, game, socket?.id, game.accessCode, socketArgs, ackFn, false);
await this.handleEventById(eventId, null, game, socket?.id, game.accessCode, socketArgs, ackFn, false);
/* This server should publish events initiated by a connected socket to Redis for consumption by other instances. */
if (globals.SYNCABLE_EVENTS().includes(eventId)) {
await this.gameManager.refreshGame(game);
@@ -88,33 +120,22 @@ class SocketManager {
}
}
handleAndSyncTimerEvent = async (eventId, game, socketId, accessCode, socketArgs, ackFn, syncOnly) => {
switch (eventId) {
case EVENT_IDS.PAUSE_TIMER:
await this.gameManager.pauseTimer(game, this.logger);
break;
case EVENT_IDS.RESUME_TIMER:
await this.gameManager.resumeTimer(game, this.logger);
break;
default:
break;
}
}
handleEventById = async (eventId, game, socketId, accessCode, socketArgs, ackFn, syncOnly) => {
handleEventById = async (eventId, senderInstanceId, game, socketId, accessCode, socketArgs, ackFn, syncOnly, timerEventSubtype = null) => {
this.logger.trace('ARGS TO HANDLER: ' + JSON.stringify(socketArgs));
const event = Events.find((event) => event.id === eventId);
const additionalVars = {
gameManager: this.gameManager,
activeGameRunner: this.activeGameRunner,
socketManager: this,
timerManager: this.timerManager,
eventManager: this,
socketId: socketId,
ackFn: ackFn,
logger: this.logger,
instanceId: this.instanceId
instanceId: this.instanceId,
senderInstanceId: senderInstanceId,
timerEventSubtype: timerEventSubtype
};
if (event) {
if (!syncOnly) {
if (!syncOnly || eventId === globals.EVENT_IDS.RESTART_GAME) {
await event.stateChange(game, socketArgs, additionalVars);
}
await event.communicate(game, socketArgs, additionalVars);
@@ -140,4 +161,4 @@ function registerRateLimiter (server, logger) {
});
}
module.exports = SocketManager;
module.exports = EventManager;

View File

@@ -13,20 +13,25 @@ class GameManager {
logger.info('CREATING SINGLETON GAME MANAGER');
this.logger = logger;
this.environment = environment;
this.activeGameRunner = null;
this.socketManager = null;
this.timerManager = null;
this.eventManager = null;
this.namespace = null;
this.instanceId = instanceId;
GameManager.instance = this;
}
getActiveGame = async (accessCode) => {
const r = await this.eventManager.client.get(accessCode);
return r === null ? r : JSON.parse(r);
}
setGameSocketNamespace = (namespace) => {
this.namespace = namespace;
};
refreshGame = async (game) => {
this.logger.debug('PUSHING REFRESH OF ' + game.accessCode);
await this.activeGameRunner.client.set(game.accessCode, JSON.stringify(game));
await this.eventManager.client.set(game.accessCode, JSON.stringify(game));
}
createGame = async (gameParams) => {
@@ -61,7 +66,7 @@ class GameManager {
new Date().toJSON(),
req.timerParams
);
await this.activeGameRunner.client.set(newAccessCode, JSON.stringify(newGame), {
await this.eventManager.client.set(newAccessCode, JSON.stringify(newGame), {
EX: globals.STALE_GAME_SECONDS
});
return Promise.resolve({ accessCode: newAccessCode, cookie: moderator.cookie, environment: this.environment });
@@ -73,7 +78,7 @@ class GameManager {
};
pauseTimer = async (game, logger) => {
const thread = this.activeGameRunner.timerThreads[game.accessCode];
const thread = this.timerManager.timerThreads[game.accessCode];
if (thread && !thread.killed) {
this.logger.debug('Timer thread found for game ' + game.accessCode);
thread.send({
@@ -85,7 +90,7 @@ class GameManager {
};
resumeTimer = async (game, logger) => {
const thread = this.activeGameRunner.timerThreads[game.accessCode];
const thread = this.timerManager.timerThreads[game.accessCode];
if (thread && !thread.killed) {
this.logger.debug('Timer thread found for game ' + game.accessCode);
thread.send({
@@ -98,7 +103,7 @@ class GameManager {
getTimeRemaining = async (game, socketId) => {
if (socketId) {
const thread = this.activeGameRunner.timerThreads[game.accessCode];
const thread = this.timerManager.timerThreads[game.accessCode];
if (thread && (!thread.killed && thread.exitCode === null)) {
thread.send({
command: globals.GAME_PROCESS_COMMANDS.GET_TIME_REMAINING,
@@ -115,7 +120,7 @@ class GameManager {
};
checkAvailability = async (code) => {
const game = await this.activeGameRunner.getActiveGame(code.toUpperCase().trim());
const game = await this.getActiveGame(code.toUpperCase().trim());
if (game) {
return Promise.resolve({ accessCode: code, playerCount: getGameSize(game.deck), timerParams: game.timerParams });
} else {
@@ -127,7 +132,7 @@ class GameManager {
const charCount = charPool.length;
let codeDigits, accessCode;
let attempts = 0;
while (!accessCode || ((await this.activeGameRunner.client.keys('*')).includes(accessCode)
while (!accessCode || ((await this.eventManager.client.keys('*')).includes(accessCode)
&& attempts < globals.ACCESS_CODE_GENERATION_ATTEMPTS)) {
codeDigits = [];
let iterations = globals.ACCESS_CODE_LENGTH;
@@ -138,7 +143,7 @@ class GameManager {
accessCode = codeDigits.join('');
attempts ++;
}
return (await this.activeGameRunner.client.keys('*')).includes(accessCode)
return (await this.eventManager.client.keys('*')).includes(accessCode)
? null
: accessCode;
};
@@ -156,7 +161,7 @@ class GameManager {
) {
return Promise.reject({ status: 400, reason: 'There are too many people already spectating.' });
} else if (joinAsSpectator) {
return await addSpectator(game, name, this.logger, this.namespace, this.socketManager.publisher, this.instanceId, this.refreshGame);
return await addSpectator(game, name, this.logger, this.namespace, this.eventManager.publisher, this.instanceId, this.refreshGame);
}
const unassignedPerson = this.findPersonByField(game, 'id', game.currentModeratorId).assigned === false
? this.findPersonByField(game, 'id', game.currentModeratorId)
@@ -172,7 +177,7 @@ class GameManager {
GameStateCurator.mapPerson(unassignedPerson),
game.isFull
);
await this.activeGameRunner.publisher?.publish(
await this.eventManager.publisher?.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.PLAYER_JOINED + ';' + JSON.stringify(unassignedPerson) + ';' + this.instanceId
);
@@ -181,20 +186,20 @@ class GameManager {
if (game.people.filter(person => person.userType === globals.USER_TYPES.SPECTATOR).length === globals.MAX_SPECTATORS) {
return Promise.reject({ status: 400, reason: 'This game has reached the maximum number of players and spectators.' });
}
return await addSpectator(game, name, this.logger, this.namespace, this.socketManager.publisher, this.instanceId, this.refreshGame);
return await addSpectator(game, name, this.logger, this.namespace, this.eventManager.publisher, this.instanceId, this.refreshGame);
}
};
restartGame = async (game, namespace) => {
// kill any outstanding timer threads
const subProcess = this.activeGameRunner.timerThreads[game.accessCode];
const subProcess = this.timerManager.timerThreads[game.accessCode];
if (subProcess) {
if (!subProcess.killed) {
this.logger.info('Killing timer process ' + subProcess.pid + ' for: ' + game.accessCode);
this.activeGameRunner.timerThreads[game.accessCode].kill();
this.timerManager.timerThreads[game.accessCode].kill();
}
this.logger.debug('Deleting reference to subprocess ' + subProcess.pid);
delete this.activeGameRunner.timerThreads[game.accessCode];
delete this.timerManager.timerThreads[game.accessCode];
}
// re-shuffle the deck
@@ -230,11 +235,11 @@ class GameManager {
game.status = globals.STATUS.IN_PROGRESS;
if (game.hasTimer) {
game.timerParams.paused = true;
await this.activeGameRunner.runGame(game, namespace, this.socketManager, this);
await this.timerManager.runTimer(game, namespace, this.eventManager, this);
}
await this.refreshGame(game);
await this.socketManager.publisher?.publish(
await this.eventManager.publisher?.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.RESTART_GAME + ';' + JSON.stringify({}) + ';' + this.instanceId
);

View File

@@ -0,0 +1,47 @@
const { fork } = require('child_process');
const path = require('path');
const globals = require('../../config/globals');
class TimerManager {
constructor (logger, instanceId) {
if (TimerManager.instance) {
throw new Error('The server tried to instantiate more than one TimerManager');
}
logger.info('CREATING SINGLETON TIMER MANAGER');
this.timerThreads = {};
this.logger = logger;
this.subscriber = null;
this.instanceId = instanceId;
TimerManager.instance = this;
}
runTimer = async (game, namespace, eventManager, gameManager) => {
this.logger.debug('running timer for game ' + game.accessCode);
const gameProcess = fork(path.join(__dirname, '../GameProcess.js'));
this.timerThreads[game.accessCode] = gameProcess;
this.logger.debug('game ' + game.accessCode + ' now associated with subProcess ' + gameProcess.pid);
gameProcess.on('message', async (msg) => {
game = await gameManager.getActiveGame(game.accessCode);
await eventManager.handleEventById(msg.command, null, game, msg.socketId, game.accessCode, msg, null, false);
await gameManager.refreshGame(game);
await eventManager.publisher.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + msg.command + ';' + JSON.stringify(msg) + ';' + this.instanceId
);
});
gameProcess.on('exit', (code, signal) => {
this.logger.debug('Game timer thread ' + gameProcess.pid + ' exiting with code ' + code + ' - game ' + game.accessCode);
});
gameProcess.send({
command: globals.GAME_PROCESS_COMMANDS.START_TIMER,
accessCode: game.accessCode,
logLevel: this.logger.logLevel,
hours: game.timerParams.hours,
minutes: game.timerParams.minutes
});
game.startTime = new Date().toJSON();
}
}
module.exports = TimerManager;