diff --git a/server/db/patches/003_add-status-mail-type-to-note-state-table.sql b/server/db/patches/003_add-status-mail-type-to-note-state-table.sql new file mode 100644 index 0000000..432b8d6 --- /dev/null +++ b/server/db/patches/003_add-status-mail-type-to-note-state-table.sql @@ -0,0 +1 @@ +ALTER TABLE node_state ADD COLUMN last_status_mail_type VARCHAR(20); diff --git a/server/jobs/monitoringMailsSendingJob.js b/server/jobs/monitoringMailsSendingJob.js new file mode 100644 index 0000000..72819fc --- /dev/null +++ b/server/jobs/monitoringMailsSendingJob.js @@ -0,0 +1,13 @@ +'use strict'; + +angular.module('ffffng').factory('MonitoringMailsSendingJob', function (MonitoringService, Logger) { + return { + run: function () { + MonitoringService.sendMonitoringMails(function (err) { + if (err) { + Logger.tag('monitoring', 'mail-sending').error('Error sending monitoring mails:', err); + } + }); + } + }; +}); diff --git a/server/jobs/scheduler.js b/server/jobs/scheduler.js index 51589bb..c57697f 100644 --- a/server/jobs/scheduler.js +++ b/server/jobs/scheduler.js @@ -31,7 +31,8 @@ angular.module('ffffng').factory('Scheduler', function ($injector, Logger, confi schedule('0 */1 * * * *', 'MailQueueJob'); if (config.client.monitoring.enabled) { - schedule('30 */5 * * * *', 'NodeInformationRetrievalJob'); + schedule('30 */15 * * * *', 'NodeInformationRetrievalJob'); + schedule('45 */5 * * * *', 'MonitoringMailsSendingJob'); schedule('0 0 3 * * *', 'NodeInformationCleanupJob'); // every night at 3:00 } } diff --git a/server/services/mailService.js b/server/services/mailService.js index 3fbb984..0f1cb75 100644 --- a/server/services/mailService.js +++ b/server/services/mailService.js @@ -2,7 +2,7 @@ angular.module('ffffng') .service('MailService', function (Database, UrlBuilder, config, _, async, deepExtend, fs, moment, Logger) { - var MAIL_QUEUE_DB_BATCH_SIZE = 2; + var MAIL_QUEUE_DB_BATCH_SIZE = 50; var MAIL_QUEUE_MAX_PARALLEL_SENDING = 3; var transporter = require('nodemailer').createTransport(deepExtend( diff --git a/server/services/monitoringService.js b/server/services/monitoringService.js index 5d6502e..169558d 100644 --- a/server/services/monitoringService.js +++ b/server/services/monitoringService.js @@ -9,183 +9,349 @@ angular.module('ffffng') ErrorTypes, Logger, moment, + MailService, NodeService, request, Strings, + UrlBuilder, Validator, Constraints ) { - var previousImportTimestamp = null; + var MONITORING_MAILS_DB_BATCH_SIZE = 50; + var MONITORING_OFFLINE_MAILS_SCHEDULE = { + 1: { amount: 3, unit: 'hours' }, + 2: { amount: 1, unit: 'days' }, + 3: { amount: 7, unit: 'days' } + }; - function insertNodeInformation(nodeData, node, callback) { + var previousImportTimestamp = null; + + function insertNodeInformation(nodeData, node, callback) { + Logger + .tag('monitoring', 'information-retrieval') + .debug('Node is new in monitoring, creating data: %s', nodeData.mac); + + return Database.run( + 'INSERT INTO node_state ' + + '(mac, state, last_seen, import_timestamp, last_status_mail_send, last_status_mail_type) ' + + 'VALUES (?, ?, ?, ?, ?, ?)', + [ + node.mac, + nodeData.state, + nodeData.lastSeen.unix(), + nodeData.importTimestamp.unix(), + null, // new node so we haven't send a mail yet + null // new node so we haven't send a mail yet + ], + callback + ); + } + + function updateNodeInformation(nodeData, node, row, callback) { + Logger + .tag('monitoring', 'information-retrieval') + .debug('Node is known in monitoring: %s', nodeData.mac); + + if (!moment(row.import_timestamp).isBefore(nodeData.importTimestamp)) { Logger .tag('monitoring', 'information-retrieval') - .debug('Node is new in monitoring, creating data: %s', nodeData.mac); - - return Database.run( - 'INSERT INTO node_state ' + - '(mac, state, last_seen, import_timestamp, last_status_mail_send) ' + - 'VALUES (?, ?, ?, ?, ?)', - [ - node.mac, - nodeData.state, - nodeData.lastSeen.unix(), - nodeData.importTimestamp.unix(), - null // new node so we haven't send a mail yet - ], - callback - ); + .debug('No new data for node, skipping: %s', nodeData.mac); + return callback(); } - function updateNodeInformation(nodeData, node, row, callback) { - Logger - .tag('monitoring', 'information-retrieval') - .debug('Node is known in monitoring: %s', nodeData.mac); + Logger + .tag('monitoring', 'information-retrieval') + .debug('New data for node, updating: %s', nodeData.mac); - if (!moment(row.import_timestamp).isBefore(nodeData.importTimestamp)) { - Logger - .tag('monitoring', 'information-retrieval') - .debug('No new data for node, skipping: %s', nodeData.mac); - return callback(); - } + return Database.run( + 'UPDATE node_state ' + + 'SET state = ?, last_seen = ?, import_timestamp = ?, modified_at = ?' + + 'WHERE id = ? AND mac = ?', + [ + nodeData.state, nodeData.lastSeen.unix(), nodeData.importTimestamp.unix(), moment().unix(), + row.id, node.mac + ], + callback + ); + } - Logger - .tag('monitoring', 'information-retrieval') - .debug('New data for node, updating: %s', nodeData.mac); + function deleteNodeInformation(nodeData, node, callback) { + Logger + .tag('monitoring', 'information-retrieval') + .debug('Node is not being monitored, deleting monitoring data: %s', nodeData.mac); + return Database.run( + 'DELETE FROM node_state WHERE mac = ? AND import_timestamp < ?', + [node.mac, nodeData.importTimestamp.unix()], + callback + ); + } - return Database.run( - 'UPDATE node_state ' + - 'SET state = ?, last_seen = ?, import_timestamp = ?, modified_at = ?' + - 'WHERE id = ? AND mac = ?', - [ - nodeData.state, nodeData.lastSeen.unix(), nodeData.importTimestamp.unix(), moment().unix(), - row.id, node.mac - ], - callback - ); - } + function storeNodeInformation(nodeData, node, callback) { + if (node.monitoring && node.monitoringConfirmed) { + Logger.tag('monitoring', 'information-retrieval').debug('Node is being monitored: %s', nodeData.mac); - function deleteNodeInformation(nodeData, node, callback) { - Logger - .tag('monitoring', 'information-retrieval') - .debug('Node is not being monitored, deleting monitoring data: %s', nodeData.mac); - return Database.run( - 'DELETE FROM node_state WHERE mac = ? AND import_timestamp < ?', - [node.mac, nodeData.importTimestamp.unix()], - callback - ); - } - - function storeNodeInformation(nodeData, node, callback) { - if (node.monitoring && node.monitoringConfirmed) { - Logger.tag('monitoring', 'information-retrieval').debug('Node is being monitored: %s', nodeData.mac); - - return Database.get('SELECT * FROM node_state WHERE mac = ?', [node.mac], function (err, row) { - if (err) { - return callback(err); - } - - if (_.isUndefined(row)) { - return insertNodeInformation(nodeData, node, callback); - } else { - return updateNodeInformation(nodeData, node, row, callback); - } - }); - } else { - return deleteNodeInformation(nodeData, node, callback); - } - } - - var isValidMac = Validator.forConstraint(Constraints.node.mac); - - function parseNodesJson(body, callback) { - Logger.tag('monitoring', 'information-retrieval').info('Parsing nodes.json...'); - - function parseTimestamp(timestamp) { - if (!_.isString(json.timestamp)) { - return moment.invalid(); - } - return moment.utc(timestamp); - } - - var data = {}; - try { - var json = JSON.parse(body); - - if (json.version !== 1) { - return callback(new Error('Unexpected nodes.json version: ' + json.version)); + return Database.get('SELECT * FROM node_state WHERE mac = ?', [node.mac], function (err, row) { + if (err) { + return callback(err); } - data.importTimestamp = parseTimestamp(json.timestamp); - if (!data.importTimestamp.isValid()) { - return callback(new Error('Invalid timestamp: ' + json.timestamp)); + if (_.isUndefined(row)) { + return insertNodeInformation(nodeData, node, callback); + } else { + return updateNodeInformation(nodeData, node, row, callback); } + }); + } else { + return deleteNodeInformation(nodeData, node, callback); + } + } - if (!_.isPlainObject(json.nodes)) { - return callback(new Error('Invalid nodes object type: ' + (typeof json.nodes))); - } + var isValidMac = Validator.forConstraint(Constraints.node.mac); - data.nodes = _.values(_.map(json.nodes, function (nodeData, nodeId) { - if (!_.isPlainObject(nodeData)) { - throw new Error( - 'Node ' + nodeId + ': Unexpected node type: ' + (typeof nodeData) - ); - } + function parseNodesJson(body, callback) { + Logger.tag('monitoring', 'information-retrieval').info('Parsing nodes.json...'); - if (!_.isPlainObject(nodeData.nodeinfo)) { - throw new Error( - 'Node ' + nodeId + ': Unexpected nodeinfo type: ' + (typeof nodeData.nodeinfo) - ); - } - if (!_.isPlainObject(nodeData.nodeinfo.network)) { - throw new Error( - 'Node ' + nodeId + ': Unexpected nodeinfo.network type: ' + - (typeof nodeData.nodeinfo.network) - ); - } - - if (!isValidMac(nodeData.nodeinfo.network.mac)) { - throw new Error( - 'Node ' + nodeId + ': Invalid MAC: ' + nodeData.nodeinfo.network.mac - ); - } - var mac = Strings.normalizeMac(nodeData.nodeinfo.network.mac); - - if (!_.isPlainObject(nodeData.flags)) { - throw new Error( - 'Node ' + nodeId + ': Unexpected flags type: ' + (typeof nodeData.flags) - ); - } - if (!_.isBoolean(nodeData.flags.online)) { - throw new Error( - 'Node ' + nodeId + ': Unexpected flags.online type: ' + (typeof nodeData.flags.online) - ); - } - var isOnline = nodeData.flags.online; - - var lastSeen = parseTimestamp(nodeData.lastseen); - if (!lastSeen.isValid()) { - throw new Error( - 'Node ' + nodeId + ': Invalid lastseen timestamp: ' + nodeData.lastseen - ); - } - - return { - mac: mac, - importTimestamp: data.importTimestamp, - state: isOnline ? 'ONLINE' : 'OFFLINE', - lastSeen: lastSeen - }; - })); + function parseTimestamp(timestamp) { + if (!_.isString(json.timestamp)) { + return moment.invalid(); } - catch (error) { - return callback(error); - } - - callback(null, data); + return moment.utc(timestamp); } - return { + var data = {}; + try { + var json = JSON.parse(body); + + if (json.version !== 1) { + return callback(new Error('Unexpected nodes.json version: ' + json.version)); + } + + data.importTimestamp = parseTimestamp(json.timestamp); + if (!data.importTimestamp.isValid()) { + return callback(new Error('Invalid timestamp: ' + json.timestamp)); + } + + if (!_.isPlainObject(json.nodes)) { + return callback(new Error('Invalid nodes object type: ' + (typeof json.nodes))); + } + + data.nodes = _.values(_.map(json.nodes, function (nodeData, nodeId) { + if (!_.isPlainObject(nodeData)) { + throw new Error( + 'Node ' + nodeId + ': Unexpected node type: ' + (typeof nodeData) + ); + } + + if (!_.isPlainObject(nodeData.nodeinfo)) { + throw new Error( + 'Node ' + nodeId + ': Unexpected nodeinfo type: ' + (typeof nodeData.nodeinfo) + ); + } + if (!_.isPlainObject(nodeData.nodeinfo.network)) { + throw new Error( + 'Node ' + nodeId + ': Unexpected nodeinfo.network type: ' + + (typeof nodeData.nodeinfo.network) + ); + } + + if (!isValidMac(nodeData.nodeinfo.network.mac)) { + throw new Error( + 'Node ' + nodeId + ': Invalid MAC: ' + nodeData.nodeinfo.network.mac + ); + } + var mac = Strings.normalizeMac(nodeData.nodeinfo.network.mac); + + if (!_.isPlainObject(nodeData.flags)) { + throw new Error( + 'Node ' + nodeId + ': Unexpected flags type: ' + (typeof nodeData.flags) + ); + } + if (!_.isBoolean(nodeData.flags.online)) { + throw new Error( + 'Node ' + nodeId + ': Unexpected flags.online type: ' + (typeof nodeData.flags.online) + ); + } + var isOnline = nodeData.flags.online; + + var lastSeen = parseTimestamp(nodeData.lastseen); + if (!lastSeen.isValid()) { + throw new Error( + 'Node ' + nodeId + ': Invalid lastseen timestamp: ' + nodeData.lastseen + ); + } + + return { + mac: mac, + importTimestamp: data.importTimestamp, + state: isOnline ? 'ONLINE' : 'OFFLINE', + lastSeen: lastSeen + }; + })); + } + catch (error) { + return callback(error); + } + + callback(null, data); + } + + function sendMonitoringMailsBatched(name, mailType, findBatchFun, callback) { + Logger.tag('monitoring', 'mail-sending').info('Sending "%s" mails...', name); + + var sendNextBatch = function (err) { + if (err) { + return callback(err); + } + + Logger.tag('monitoring', 'mail-sending').info('Sending next batch...'); + + findBatchFun(function (err, nodeStates) { + if (err) { + return callback(err); + } + + if (_.isEmpty(nodeStates)) { + Logger.tag('monitoring', 'mail-sending').info('Done sending "%s" mails.', name); + return callback(null); + } + + async.each( + nodeStates, + function (nodeState, mailCallback) { + var mac = nodeState.mac; + Logger.tag('monitoring', 'mail-sending').debug('Loading node data for: %s', mac); + NodeService.findNodeDataByMac(mac, function (err, node, nodeSecrets) { + if (err) { + Logger + .tag('monitoring', 'mail-sending') + .error('Error sending "' + name + '" mail for node: ' + mac, err); + return callback(err); + } + + if (!node) { + Logger + .tag('monitoring', 'mail-sending') + .warn( + 'Node not found. Skipping sending of "' + name + '" mail: ' + mac + ); + return callback(null); + } + + if (node.monitoring && node.monitoringConfirmed) { + Logger + .tag('monitoring', 'mail-sending') + .info('Sending "%s" mail for: %s', name, mac); + MailService.enqueue( + config.server.email.from, + node.nickname + ' <' + node.email + '>', + mailType, + { + node: node, + lastSeen: nodeState.last_seen, + disableUrl: UrlBuilder.monitoringDisableUrl(nodeSecrets) + + }, + function (err) { + if (err) { + Logger + .tag('monitoring', 'mail-sending') + .error('Error sending "' + name + '" mail for node: ' + mac, err); + return mailCallback(err); + } + + Logger + .tag('monitoring', 'mail-sending') + .debug('Updating node state: ', mac); + + var now = moment().unix(); + Database.run( + 'UPDATE node_state ' + + 'SET modified_at = ?, last_status_mail_send = ?, last_status_mail_type = ?' + + 'WHERE id = ?', + [ + now, now, mailType, + nodeState.id + ], + mailCallback + ); + } + ); + } else { + Logger + .tag('monitoring', 'mail-sending') + .info('Monitoring disabled, skipping "%s" mail for: %s', name, mac); + } + }); + }, + sendNextBatch + ); + }); + }; + + sendNextBatch(null); + } + + function sendOnlineAgainMails(startTime, callback) { + sendMonitoringMailsBatched( + 'online again', + 'monitoring-online-again', + function (findBatchCallback) { + Database.all( + 'SELECT * FROM node_state ' + + 'WHERE modified_at < ? AND state = ? AND last_status_mail_type IN (' + + '\'monitoring-offline-1\', \'monitoring-offline-2\', \'monitoring-offline-3\'' + + ')' + + 'ORDER BY id ASC LIMIT ?', + [ + startTime.unix(), + 'ONLINE', + + MONITORING_MAILS_DB_BATCH_SIZE + ], + findBatchCallback + ); + }, + callback + ); + } + + function sendOfflineMails(startTime, mailNumber, callback) { + sendMonitoringMailsBatched( + 'offline ' + mailNumber, + 'monitoring-offline-' + mailNumber, + function (findBatchCallback) { + var previousType = + mailNumber === 1 + ? 'monitoring-online-again' + : ('monitoring-offline-' + (mailNumber - 1)); + + // the first time the first offline mail is send, there was no mail before + var allowNull = mailNumber === 1 ? ' OR last_status_mail_type IS NULL' : ''; + + var schedule = MONITORING_OFFLINE_MAILS_SCHEDULE[mailNumber]; + var lastSeenBefore = moment().subtract(schedule.amount, schedule.unit); + + Database.all( + 'SELECT * FROM node_state ' + + 'WHERE modified_at < ? AND state = ? AND (last_status_mail_type = ?' + allowNull + ') AND ' + + 'last_seen < ? ' + + 'ORDER BY id ASC LIMIT ?', + [ + startTime.unix(), + 'OFFLINE', + previousType, + lastSeenBefore.unix(), + + MONITORING_MAILS_DB_BATCH_SIZE + ], + findBatchCallback + ); + }, + callback + ); + } + + return { confirm: function (token, callback) { NodeService.getNodeDataByMonitoringToken(token, function (err, node, nodeSecrets) { if (err) { @@ -307,6 +473,50 @@ angular.module('ffffng') }); }, + sendMonitoringMails: function (callback) { + Logger.tag('monitoring', 'mail-sending').info('Sending monitoring mails...'); + + var startTime = moment(); + + sendOnlineAgainMails(startTime, function (err) { + if (err) { + // only logging an continuing with next type + Logger + .tag('monitoring', 'mail-sending') + .error('Error sending "online again" mails.', err); + } + + sendOfflineMails(startTime, 1, function (err) { + if (err) { + // only logging an continuing with next type + Logger + .tag('monitoring', 'mail-sending') + .error('Error sending "offline 1" mails.', err); + } + + sendOfflineMails(startTime, 2, function (err) { + if (err) { + // only logging an continuing with next type + Logger + .tag('monitoring', 'mail-sending') + .error('Error sending "offline 2" mails.', err); + } + + sendOfflineMails(startTime, 3, function (err) { + if (err) { + // only logging an continuing with next type + Logger + .tag('monitoring', 'mail-sending') + .error('Error sending "offline 3" mails.', err); + } + + callback(null); + }); + }); + }); + }); + }, + cleanupNodeInformation: function (callback) { var daysBeforeCleanup = 30; Logger