redis effort part 5

This commit is contained in:
AlecM33
2023-01-17 21:00:41 -05:00
parent 81a132458e
commit e39cc61b4e
5 changed files with 167 additions and 98 deletions

View File

@@ -10,33 +10,43 @@
const GameManager = require('./server/modules/singletons/GameManager'); const GameManager = require('./server/modules/singletons/GameManager');
const eventManager = require('./server/modules/singletons/EventManager'); const eventManager = require('./server/modules/singletons/EventManager');
const globals = require('./server/config/globals'); const globals = require('./server/config/globals');
app.use(express.json({limit: '10kb'}));
const args = ServerBootstrapper.processCLIArgs(); const args = ServerBootstrapper.processCLIArgs();
const logger = require('./server/modules/Logger')(args.logLevel); const logger = require('./server/modules/Logger')(args.logLevel);
logger.info('LOG LEVEL IS: ' + args.logLevel);
const port = parseInt(process.env.PORT) || args.port || 8080; const port = parseInt(process.env.PORT) || args.port || 8080;
const webServer = ServerBootstrapper.createServerWithCorrectHTTPProtocol(app, args.useHttps, args.port, logger); const webServer = ServerBootstrapper.createServerWithCorrectHTTPProtocol(app, args.useHttps, args.port, logger);
const singletons = ServerBootstrapper.singletons(logger, (() => { const instanceId = (() => {
let id = ''; let id = '';
for (let i = 0; i < globals.INSTANCE_ID_LENGTH; i ++) { for (let i = 0; i < globals.INSTANCE_ID_LENGTH; i ++) {
id += globals.INSTANCE_ID_CHAR_POOL[Math.floor(Math.random() * globals.INSTANCE_ID_CHAR_POOL.length)]; id += globals.INSTANCE_ID_CHAR_POOL[Math.floor(Math.random() * globals.INSTANCE_ID_CHAR_POOL.length)];
} }
return id; return id;
})()); })()
const singletons = ServerBootstrapper.singletons(logger, instanceId);
app.use(express.json({limit: '10kb'}));
logger.info('LOG LEVEL IS: ' + args.logLevel);
singletons.gameManager.timerManager = timerManager.instance; singletons.gameManager.timerManager = timerManager.instance;
singletons.gameManager.eventManager = eventManager.instance; singletons.gameManager.eventManager = eventManager.instance;
singletons.eventManager.timerManager = timerManager.instance; singletons.eventManager.timerManager = timerManager.instance;
singletons.eventManager.gameManager = GameManager.instance; singletons.eventManager.gameManager = GameManager.instance;
//singletons.timerManager.setUpSignalHandler();
process.on( "SIGINT", function() {
console.log( "\ngracefully shutting down from SIGINT (Crtl-C)" );
process.exit();
} );
process.on( "exit", function() {
console.log( "never see this log message" );
} );
//process.kill(process.pid, "SIGTERM");
try { try {
await singletons.eventManager.client.connect(); await singletons.eventManager.client.connect();
logger.info('Root Redis client connected'); logger.info('Root Redis client connected');
} catch(e) { } catch(e) {
reject(new Error('UNABLE TO CONNECT TO REDIS because: '+ e)); reject(new Error('UNABLE TO CONNECT TO REDIS because: '+ e));
} }

View File

@@ -38,9 +38,9 @@ const Events = [
id: EVENT_IDS.FETCH_GAME_STATE, id: EVENT_IDS.FETCH_GAME_STATE,
stateChange: async (game, socketArgs, vars) => { stateChange: async (game, socketArgs, vars) => {
const matchingPerson = vars.gameManager.findPersonByField(game, 'cookie', socketArgs.personId); const matchingPerson = vars.gameManager.findPersonByField(game, 'cookie', socketArgs.personId);
if (matchingPerson && matchingPerson.socketId !== vars.socketId) { if (matchingPerson && matchingPerson.socketId !== vars.requestingSocketId) {
matchingPerson.socketId = vars.socketId; matchingPerson.socketId = vars.requestingSocketId;
vars.gameManager.namespace.sockets.get(vars.socketId)?.join(game.accessCode); vars.gameManager.namespace.sockets.get(vars.requestingSocketId)?.join(game.accessCode);
} }
}, },
communicate: async (game, socketArgs, vars) => { communicate: async (game, socketArgs, vars) => {
@@ -202,9 +202,10 @@ const Events = [
stateChange: async (game, socketArgs, vars) => { stateChange: async (game, socketArgs, vars) => {
if (vars.instanceId !== vars.senderInstanceId if (vars.instanceId !== vars.senderInstanceId
&& vars.timerManager.timerThreads[game.accessCode] && vars.timerManager.timerThreads[game.accessCode]
&& !vars.timerManager.timerThreads[game.accessCode].killed
) { ) {
vars.timerManager.timerThreads[game.accessCode].kill(); if (!vars.timerManager.timerThreads[game.accessCode].killed) {
vars.timerManager.timerThreads[game.accessCode].kill();
}
delete vars.timerManager.timerThreads[game.accessCode]; delete vars.timerManager.timerThreads[game.accessCode];
} }
}, },
@@ -220,33 +221,33 @@ const Events = [
stateChange: async (game, socketArgs, vars) => {}, stateChange: async (game, socketArgs, vars) => {},
communicate: async (game, socketArgs, vars) => { communicate: async (game, socketArgs, vars) => {
const thread = vars.timerManager.timerThreads[game.accessCode]; const thread = vars.timerManager.timerThreads[game.accessCode];
if (thread && (!thread.killed && thread.exitCode === null)) { if (thread) {
thread.send({ if (!thread.killed && thread.exitCode === null) {
command: vars.timerEventSubtype, thread.send({
accessCode: game.accessCode, command: vars.timerEventSubtype,
socketId: vars.socketId, accessCode: game.accessCode,
logLevel: vars.logger.logLevel socketId: vars.requestingSocketId,
}); logLevel: vars.logger.logLevel
} else if (thread) { });
if (vars.timerEventSubtype === EVENT_IDS.GET_TIME_REMAINING && game.timerParams && game.timerParams.timeRemaining === 0) { } else {
// vars.gameManager.namespace.to(vars.socketId) const socket = vars.gameManager.namespace.sockets.get(vars.requestingSocketId);
// .emit(globals.GAME_PROCESS_COMMANDS.GET_TIME_REMAINING, game.timerParams.timeRemaining, game.timerParams.paused); if (socket) {
// await vars.eventManager.publisher.publish( vars.gameManager.namespace.to(socket.id).emit(
// globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.GAME_PROCESS_COMMANDS.GET_TIME_REMAINING,
// game.accessCode + ';' + globals.EVENT_IDS.SHARE_TIME_REMAINING + ';' + game.timerParams.timeRemaining,
// JSON.stringify({ game.timerParams.paused
// socketId: vars.socketId, );
// timeRemaining: game.timerParams.timeRemaining, }
// paused: game.timerParams.paused
// }) +
// ';' + vars.instanceId
// );
} }
} else { // we need to consult another container for the timer data } else { // we need to consult another container for the timer data
await vars.eventManager.publisher?.publish( await vars.eventManager.publisher?.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.SOURCE_TIMER_EVENT + ';' + vars.eventManager.createMessageToPublish(
JSON.stringify({ socketId: vars.socketId, timerEventSubtype: vars.timerEventSubtype }) + ';' + vars.instanceId game.accessCode,
globals.EVENT_IDS.SOURCE_TIMER_EVENT,
vars.instanceId,
JSON.stringify({ socketId: vars.requestingSocketId, timerEventSubtype: vars.timerEventSubtype })
)
); );
} }
} }
@@ -258,37 +259,35 @@ const Events = [
stateChange: async (game, socketArgs, vars) => {}, stateChange: async (game, socketArgs, vars) => {},
communicate: async (game, socketArgs, vars) => { communicate: async (game, socketArgs, vars) => {
const thread = vars.timerManager.timerThreads[game.accessCode]; const thread = vars.timerManager.timerThreads[game.accessCode];
if (thread && (!thread.killed && thread.exitCode === null)) { if (thread) {
thread.send({ if (!thread.killed && thread.exitCode === null) {
command: socketArgs.timerEventSubtype, thread.send({
accessCode: game.accessCode, command: socketArgs.timerEventSubtype,
socketId: socketArgs.socketId, accessCode: game.accessCode,
logLevel: vars.logger.logLevel
});
} else if (thread) {
if (game.timerParams && game.timerParams.timeRemaining === 0) {
vars.gameManager.namespace.to(vars.socketId)
.emit(globals.GAME_PROCESS_COMMANDS.GET_TIME_REMAINING, game.timerParams.timeRemaining, game.timerParams.paused);
}
await vars.eventManager.publisher.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.SHARE_TIME_REMAINING + ';' +
JSON.stringify({
socketId: socketArgs.socketId, socketId: socketArgs.socketId,
timeRemaining: game.timerParams.timeRemaining, logLevel: vars.logger.logLevel
paused: game.timerParams.paused });
}) + } else {
';' + vars.instanceId await vars.eventManager.publisher.publish(
); globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
vars.eventManager.createMessageToPublish(
game.accessCode,
socketArgs.timerEventSubtype,
vars.instanceId,
JSON.stringify({
socketId: socketArgs.socketId,
timeRemaining: game.timerParams.timeRemaining,
paused: game.timerParams.paused
})
)
);
}
} }
} }
}, },
{ {
id: EVENT_IDS.END_TIMER, id: EVENT_IDS.END_TIMER,
stateChange: async (game, socketArgs, vars) => { stateChange: async (game, socketArgs, vars) => {
if (vars.timerManager.timerThreads[game.accessCode]) {
delete vars.timerManager.timerThreads[game.accessCode];
}
game.timerParams.paused = false; game.timerParams.paused = false;
game.timerParams.timeRemaining = 0; game.timerParams.timeRemaining = 0;
}, },
@@ -318,9 +317,7 @@ const Events = [
}, },
{ {
id: EVENT_IDS.GET_TIME_REMAINING, id: EVENT_IDS.GET_TIME_REMAINING,
stateChange: async (game, socketArgs, vars) => { stateChange: async (game, socketArgs, vars) => {},
game.timerParams.timeRemaining = socketArgs.timeRemaining;
},
communicate: async (game, socketArgs, vars) => { communicate: async (game, socketArgs, vars) => {
const socket = vars.gameManager.namespace.sockets.get(socketArgs.socketId); const socket = vars.gameManager.namespace.sockets.get(socketArgs.socketId);
if (socket) { if (socket) {

View File

@@ -34,33 +34,53 @@ class EventManager {
this.subscriber = this.client.duplicate(); this.subscriber = this.client.duplicate();
await this.subscriber.connect(); await this.subscriber.connect();
await this.subscriber.subscribe(globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, async (message) => { await this.subscriber.subscribe(globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, async (message) => {
this.logger.info('MESSAGE: ' + message); this.logger.debug('MESSAGE: ' + message);
const messageComponents = message.split(';'); let messageComponents, args;
if (messageComponents[messageComponents.length - 1] === this.instanceId) { try {
this.logger.trace('Disregarding self-authored message'); messageComponents = message.split(';', 3);
if (messageComponents[messageComponents.length - 1] === this.instanceId) {
this.logger.trace('Disregarding self-authored message');
return;
}
args = JSON.parse(
message.slice(
message.indexOf(messageComponents[messageComponents.length - 1]) + (globals.INSTANCE_ID_LENGTH + 1)
)
)
} catch(e) {
this.logger.error('MALFORMED MESSAGE RESULTED IN ERROR: ' + e + '; DISREGARDING');
return; return;
} }
const game = await gameManager.getActiveGame(messageComponents[0]); if (messageComponents) {
let args; const game = await gameManager.getActiveGame(messageComponents[0]);
if (messageComponents[2]) { if (game) {
args = JSON.parse(messageComponents[2]); await eventManager.handleEventById(
} messageComponents[1],
if (game) { messageComponents[messageComponents.length - 1],
await eventManager.handleEventById( game,
messageComponents[1], null,
messageComponents[messageComponents.length - 1], game?.accessCode || messageComponents[0],
game, args || null,
null, null,
game?.accessCode || messageComponents[0], true
args || null, );
null, }
true
);
} }
}); });
this.logger.info('EVENT MANAGER - CREATED SUBSCRIBER'); this.logger.info('EVENT MANAGER - CREATED SUBSCRIBER');
} }
createMessageToPublish = (...args) => {
let message = '';
for (let i = 0; i < args.length; i ++) {
message += args[i];
if (i !== args.length - 1) {
message += ';';
}
}
return message;
}
createSocketServer = (main, app, port, logger) => { createSocketServer = (main, app, port, logger) => {
let io; let io;
if (process.env.NODE_ENV.trim() === 'development') { if (process.env.NODE_ENV.trim() === 'development') {
@@ -115,19 +135,19 @@ class EventManager {
await this.gameManager.refreshGame(game); await this.gameManager.refreshGame(game);
await this.publisher?.publish( await this.publisher?.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + eventId + ';' + JSON.stringify(socketArgs) + ';' + this.instanceId this.createMessageToPublish(game.accessCode, eventId, this.instanceId, JSON.stringify(socketArgs))
); );
} }
} }
handleEventById = async (eventId, senderInstanceId, game, socketId, accessCode, socketArgs, ackFn, syncOnly, timerEventSubtype = null) => { handleEventById = async (eventId, senderInstanceId, game, requestingSocketId, accessCode, socketArgs, ackFn, syncOnly, timerEventSubtype = null) => {
this.logger.trace('ARGS TO HANDLER: ' + JSON.stringify(socketArgs)); this.logger.trace('ARGS TO HANDLER: ' + JSON.stringify(socketArgs));
const event = Events.find((event) => event.id === eventId); const event = Events.find((event) => event.id === eventId);
const additionalVars = { const additionalVars = {
gameManager: this.gameManager, gameManager: this.gameManager,
timerManager: this.timerManager, timerManager: this.timerManager,
eventManager: this, eventManager: this,
socketId: socketId, requestingSocketId: requestingSocketId,
ackFn: ackFn, ackFn: ackFn,
logger: this.logger, logger: this.logger,
instanceId: this.instanceId, instanceId: this.instanceId,

View File

@@ -22,6 +22,12 @@ class GameManager {
getActiveGame = async (accessCode) => { getActiveGame = async (accessCode) => {
const r = await this.eventManager.client.get(accessCode); const r = await this.eventManager.client.get(accessCode);
if (r === null && this.timerManager.timerThreads[accessCode]) {
if (!this.timerManager.timerThreads[accessCode].killed) {
this.timerManager.timerThreads[accessCode].kill();
}
delete this.timerManager.timerThreads[accessCode];
}
return r === null ? r : JSON.parse(r); return r === null ? r : JSON.parse(r);
} }
@@ -31,7 +37,10 @@ class GameManager {
refreshGame = async (game) => { refreshGame = async (game) => {
this.logger.debug('PUSHING REFRESH OF ' + game.accessCode); this.logger.debug('PUSHING REFRESH OF ' + game.accessCode);
await this.eventManager.client.set(game.accessCode, JSON.stringify(game)); await this.eventManager.client.set(game.accessCode, JSON.stringify(game), {
KEEPTTL: true,
XX: true // only set the key if it already exists
});
} }
createGame = async (gameParams) => { createGame = async (gameParams) => {
@@ -52,7 +61,9 @@ class GameManager {
console.log(moderator); console.log(moderator);
moderator.assigned = true; moderator.assigned = true;
if (req.timerParams !== null) { if (req.timerParams !== null) {
req.timerParams.paused = false; req.timerParams.paused = true;
req.timerParams.timeRemaining = convertFromHoursToMilliseconds(req.timerParams.hours)
+ convertFromMinutesToMilliseconds(req.timerParams.minutes);
} }
const newGame = new Game( const newGame = new Game(
newAccessCode, newAccessCode,
@@ -161,7 +172,7 @@ class GameManager {
) { ) {
return Promise.reject({ status: 400, reason: 'There are too many people already spectating.' }); return Promise.reject({ status: 400, reason: 'There are too many people already spectating.' });
} else if (joinAsSpectator) { } else if (joinAsSpectator) {
return await addSpectator(game, name, this.logger, this.namespace, this.eventManager.publisher, this.instanceId, this.refreshGame); return await addSpectator(game, name, this.logger, this.namespace, this.eventManager, this.instanceId, this.refreshGame);
} }
const unassignedPerson = this.findPersonByField(game, 'id', game.currentModeratorId).assigned === false const unassignedPerson = this.findPersonByField(game, 'id', game.currentModeratorId).assigned === false
? this.findPersonByField(game, 'id', game.currentModeratorId) ? this.findPersonByField(game, 'id', game.currentModeratorId)
@@ -179,14 +190,19 @@ class GameManager {
); );
await this.eventManager.publisher?.publish( await this.eventManager.publisher?.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.PLAYER_JOINED + ';' + JSON.stringify(unassignedPerson) + ';' + this.instanceId this.eventManager.createMessageToPublish(
game.accessCode,
globals.EVENT_IDS.PLAYER_JOINED,
this.instanceId,
JSON.stringify(unassignedPerson)
)
); );
return Promise.resolve(unassignedPerson.cookie); return Promise.resolve(unassignedPerson.cookie);
} else { } else {
if (game.people.filter(person => person.userType === globals.USER_TYPES.SPECTATOR).length === globals.MAX_SPECTATORS) { if (game.people.filter(person => person.userType === globals.USER_TYPES.SPECTATOR).length === globals.MAX_SPECTATORS) {
return Promise.reject({ status: 400, reason: 'This game has reached the maximum number of players and spectators.' }); return Promise.reject({ status: 400, reason: 'This game has reached the maximum number of players and spectators.' });
} }
return await addSpectator(game, name, this.logger, this.namespace, this.eventManager.publisher, this.instanceId, this.refreshGame); return await addSpectator(game, name, this.logger, this.namespace, this.eventManager, this.instanceId, this.refreshGame);
} }
}; };
@@ -235,13 +251,20 @@ class GameManager {
game.status = globals.STATUS.IN_PROGRESS; game.status = globals.STATUS.IN_PROGRESS;
if (game.hasTimer) { if (game.hasTimer) {
game.timerParams.paused = true; game.timerParams.paused = true;
game.timerParams.timeRemaining = convertFromHoursToMilliseconds(game.timerParams.hours)
+ convertFromMinutesToMilliseconds(game.timerParams.minutes);
await this.timerManager.runTimer(game, namespace, this.eventManager, this); await this.timerManager.runTimer(game, namespace, this.eventManager, this);
} }
await this.refreshGame(game); await this.refreshGame(game);
await this.eventManager.publisher?.publish( await this.eventManager.publisher?.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.RESTART_GAME + ';' + JSON.stringify({}) + ';' + this.instanceId this.eventManager.createMessageToPublish(
game.accessCode,
globals.EVENT_IDS.RESTART_GAME,
this.instanceId,
'{}'
)
); );
namespace.in(game.accessCode).emit(globals.EVENT_IDS.RESTART_GAME); namespace.in(game.accessCode).emit(globals.EVENT_IDS.RESTART_GAME);
}; };
@@ -349,7 +372,7 @@ function getGameSize (cards) {
return quantity; return quantity;
} }
async function addSpectator (game, name, logger, namespace, publisher, instanceId, refreshGame) { async function addSpectator (game, name, logger, namespace, eventManager, instanceId, refreshGame) {
const spectator = new Person( const spectator = new Person(
createRandomId(), createRandomId(),
createRandomId(), createRandomId(),
@@ -363,11 +386,24 @@ async function addSpectator (game, name, logger, namespace, publisher, instanceI
globals.EVENT_IDS.ADD_SPECTATOR, globals.EVENT_IDS.ADD_SPECTATOR,
GameStateCurator.mapPerson(spectator) GameStateCurator.mapPerson(spectator)
); );
await publisher.publish( await eventManager.publisher.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + globals.EVENT_IDS.ADD_SPECTATOR + ';' + JSON.stringify(GameStateCurator.mapPerson(spectator)) + ';' + instanceId eventManager.createMessageToPublish(
game.accessCode,
globals.EVENT_IDS.ADD_SPECTATOR,
instanceId,
JSON.stringify(GameStateCurator.mapPerson(spectator))
)
); );
return Promise.resolve(spectator.cookie); return Promise.resolve(spectator.cookie);
} }
function convertFromMinutesToMilliseconds (minutes) {
return minutes * 60 * 1000;
}
function convertFromHoursToMilliseconds (hours) {
return hours * 60 * 60 * 1000;
}
module.exports = GameManager; module.exports = GameManager;

View File

@@ -15,6 +15,12 @@ class TimerManager {
TimerManager.instance = this; TimerManager.instance = this;
} }
setUpSignalHandler = () => {
process.on('SIGTERM', (code) => {
console.log('received sigterm');
});
}
runTimer = async (game, namespace, eventManager, gameManager) => { runTimer = async (game, namespace, eventManager, gameManager) => {
this.logger.debug('running timer for game ' + game.accessCode); this.logger.debug('running timer for game ' + game.accessCode);
const gameProcess = fork(path.join(__dirname, '../GameProcess.js')); const gameProcess = fork(path.join(__dirname, '../GameProcess.js'));
@@ -26,7 +32,7 @@ class TimerManager {
await gameManager.refreshGame(game); await gameManager.refreshGame(game);
await eventManager.publisher.publish( await eventManager.publisher.publish(
globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM, globals.REDIS_CHANNELS.ACTIVE_GAME_STREAM,
game.accessCode + ';' + msg.command + ';' + JSON.stringify(msg) + ';' + this.instanceId eventManager.createMessageToPublish(game.accessCode, msg.command, this.instanceId, JSON.stringify(msg))
); );
}); });