Monitoring job

This commit is contained in:
baldo 2016-05-24 23:32:04 +02:00
parent 0ccab4cac8
commit 5ff76db5a9
5 changed files with 380 additions and 155 deletions

View file

@ -0,0 +1 @@
ALTER TABLE node_state ADD COLUMN last_status_mail_type VARCHAR(20);

View file

@ -0,0 +1,13 @@
'use strict';
angular.module('ffffng').factory('MonitoringMailsSendingJob', function (MonitoringService, Logger) {
return {
run: function () {
MonitoringService.sendMonitoringMails(function (err) {
if (err) {
Logger.tag('monitoring', 'mail-sending').error('Error sending monitoring mails:', err);
}
});
}
};
});

View file

@ -31,7 +31,8 @@ angular.module('ffffng').factory('Scheduler', function ($injector, Logger, confi
schedule('0 */1 * * * *', 'MailQueueJob'); schedule('0 */1 * * * *', 'MailQueueJob');
if (config.client.monitoring.enabled) { if (config.client.monitoring.enabled) {
schedule('30 */5 * * * *', 'NodeInformationRetrievalJob'); schedule('30 */15 * * * *', 'NodeInformationRetrievalJob');
schedule('45 */5 * * * *', 'MonitoringMailsSendingJob');
schedule('0 0 3 * * *', 'NodeInformationCleanupJob'); // every night at 3:00 schedule('0 0 3 * * *', 'NodeInformationCleanupJob'); // every night at 3:00
} }
} }

View file

@ -2,7 +2,7 @@
angular.module('ffffng') angular.module('ffffng')
.service('MailService', function (Database, UrlBuilder, config, _, async, deepExtend, fs, moment, Logger) { .service('MailService', function (Database, UrlBuilder, config, _, async, deepExtend, fs, moment, Logger) {
var MAIL_QUEUE_DB_BATCH_SIZE = 2; var MAIL_QUEUE_DB_BATCH_SIZE = 50;
var MAIL_QUEUE_MAX_PARALLEL_SENDING = 3; var MAIL_QUEUE_MAX_PARALLEL_SENDING = 3;
var transporter = require('nodemailer').createTransport(deepExtend( var transporter = require('nodemailer').createTransport(deepExtend(

View file

@ -9,12 +9,21 @@ angular.module('ffffng')
ErrorTypes, ErrorTypes,
Logger, Logger,
moment, moment,
MailService,
NodeService, NodeService,
request, request,
Strings, Strings,
UrlBuilder,
Validator, Validator,
Constraints Constraints
) { ) {
var MONITORING_MAILS_DB_BATCH_SIZE = 50;
var MONITORING_OFFLINE_MAILS_SCHEDULE = {
1: { amount: 3, unit: 'hours' },
2: { amount: 1, unit: 'days' },
3: { amount: 7, unit: 'days' }
};
var previousImportTimestamp = null; var previousImportTimestamp = null;
function insertNodeInformation(nodeData, node, callback) { function insertNodeInformation(nodeData, node, callback) {
@ -24,13 +33,14 @@ angular.module('ffffng')
return Database.run( return Database.run(
'INSERT INTO node_state ' + 'INSERT INTO node_state ' +
'(mac, state, last_seen, import_timestamp, last_status_mail_send) ' + '(mac, state, last_seen, import_timestamp, last_status_mail_send, last_status_mail_type) ' +
'VALUES (?, ?, ?, ?, ?)', 'VALUES (?, ?, ?, ?, ?, ?)',
[ [
node.mac, node.mac,
nodeData.state, nodeData.state,
nodeData.lastSeen.unix(), nodeData.lastSeen.unix(),
nodeData.importTimestamp.unix(), nodeData.importTimestamp.unix(),
null, // new node so we haven't send a mail yet
null // new node so we haven't send a mail yet null // new node so we haven't send a mail yet
], ],
callback callback
@ -185,6 +195,162 @@ angular.module('ffffng')
callback(null, data); callback(null, data);
} }
function sendMonitoringMailsBatched(name, mailType, findBatchFun, callback) {
Logger.tag('monitoring', 'mail-sending').info('Sending "%s" mails...', name);
var sendNextBatch = function (err) {
if (err) {
return callback(err);
}
Logger.tag('monitoring', 'mail-sending').info('Sending next batch...');
findBatchFun(function (err, nodeStates) {
if (err) {
return callback(err);
}
if (_.isEmpty(nodeStates)) {
Logger.tag('monitoring', 'mail-sending').info('Done sending "%s" mails.', name);
return callback(null);
}
async.each(
nodeStates,
function (nodeState, mailCallback) {
var mac = nodeState.mac;
Logger.tag('monitoring', 'mail-sending').debug('Loading node data for: %s', mac);
NodeService.findNodeDataByMac(mac, function (err, node, nodeSecrets) {
if (err) {
Logger
.tag('monitoring', 'mail-sending')
.error('Error sending "' + name + '" mail for node: ' + mac, err);
return callback(err);
}
if (!node) {
Logger
.tag('monitoring', 'mail-sending')
.warn(
'Node not found. Skipping sending of "' + name + '" mail: ' + mac
);
return callback(null);
}
if (node.monitoring && node.monitoringConfirmed) {
Logger
.tag('monitoring', 'mail-sending')
.info('Sending "%s" mail for: %s', name, mac);
MailService.enqueue(
config.server.email.from,
node.nickname + ' <' + node.email + '>',
mailType,
{
node: node,
lastSeen: nodeState.last_seen,
disableUrl: UrlBuilder.monitoringDisableUrl(nodeSecrets)
},
function (err) {
if (err) {
Logger
.tag('monitoring', 'mail-sending')
.error('Error sending "' + name + '" mail for node: ' + mac, err);
return mailCallback(err);
}
Logger
.tag('monitoring', 'mail-sending')
.debug('Updating node state: ', mac);
var now = moment().unix();
Database.run(
'UPDATE node_state ' +
'SET modified_at = ?, last_status_mail_send = ?, last_status_mail_type = ?' +
'WHERE id = ?',
[
now, now, mailType,
nodeState.id
],
mailCallback
);
}
);
} else {
Logger
.tag('monitoring', 'mail-sending')
.info('Monitoring disabled, skipping "%s" mail for: %s', name, mac);
}
});
},
sendNextBatch
);
});
};
sendNextBatch(null);
}
function sendOnlineAgainMails(startTime, callback) {
sendMonitoringMailsBatched(
'online again',
'monitoring-online-again',
function (findBatchCallback) {
Database.all(
'SELECT * FROM node_state ' +
'WHERE modified_at < ? AND state = ? AND last_status_mail_type IN (' +
'\'monitoring-offline-1\', \'monitoring-offline-2\', \'monitoring-offline-3\'' +
')' +
'ORDER BY id ASC LIMIT ?',
[
startTime.unix(),
'ONLINE',
MONITORING_MAILS_DB_BATCH_SIZE
],
findBatchCallback
);
},
callback
);
}
function sendOfflineMails(startTime, mailNumber, callback) {
sendMonitoringMailsBatched(
'offline ' + mailNumber,
'monitoring-offline-' + mailNumber,
function (findBatchCallback) {
var previousType =
mailNumber === 1
? 'monitoring-online-again'
: ('monitoring-offline-' + (mailNumber - 1));
// the first time the first offline mail is send, there was no mail before
var allowNull = mailNumber === 1 ? ' OR last_status_mail_type IS NULL' : '';
var schedule = MONITORING_OFFLINE_MAILS_SCHEDULE[mailNumber];
var lastSeenBefore = moment().subtract(schedule.amount, schedule.unit);
Database.all(
'SELECT * FROM node_state ' +
'WHERE modified_at < ? AND state = ? AND (last_status_mail_type = ?' + allowNull + ') AND ' +
'last_seen < ? ' +
'ORDER BY id ASC LIMIT ?',
[
startTime.unix(),
'OFFLINE',
previousType,
lastSeenBefore.unix(),
MONITORING_MAILS_DB_BATCH_SIZE
],
findBatchCallback
);
},
callback
);
}
return { return {
confirm: function (token, callback) { confirm: function (token, callback) {
NodeService.getNodeDataByMonitoringToken(token, function (err, node, nodeSecrets) { NodeService.getNodeDataByMonitoringToken(token, function (err, node, nodeSecrets) {
@ -307,6 +473,50 @@ angular.module('ffffng')
}); });
}, },
sendMonitoringMails: function (callback) {
Logger.tag('monitoring', 'mail-sending').info('Sending monitoring mails...');
var startTime = moment();
sendOnlineAgainMails(startTime, function (err) {
if (err) {
// only logging an continuing with next type
Logger
.tag('monitoring', 'mail-sending')
.error('Error sending "online again" mails.', err);
}
sendOfflineMails(startTime, 1, function (err) {
if (err) {
// only logging an continuing with next type
Logger
.tag('monitoring', 'mail-sending')
.error('Error sending "offline 1" mails.', err);
}
sendOfflineMails(startTime, 2, function (err) {
if (err) {
// only logging an continuing with next type
Logger
.tag('monitoring', 'mail-sending')
.error('Error sending "offline 2" mails.', err);
}
sendOfflineMails(startTime, 3, function (err) {
if (err) {
// only logging an continuing with next type
Logger
.tag('monitoring', 'mail-sending')
.error('Error sending "offline 3" mails.', err);
}
callback(null);
});
});
});
});
},
cleanupNodeInformation: function (callback) { cleanupNodeInformation: function (callback) {
var daysBeforeCleanup = 30; var daysBeforeCleanup = 30;
Logger Logger