Mail-Queue management in admin panel.

This commit is contained in:
baldo 2016-06-11 12:08:50 +02:00
parent 807f3f5fb2
commit 26aaec385a
8 changed files with 376 additions and 3 deletions

View file

@ -21,6 +21,7 @@ require('./utils/urlBuilder');
require('./resources/frontendResource');
require('./resources/taskResource');
require('./resources/mailResource');
require('./resources/nodeResource');
require('./resources/monitoringResource');

View file

@ -0,0 +1,101 @@
'use strict';
angular.module('ffffng').factory('MailResource', function (
Constraints,
Validator,
MailService,
Resources,
Logger,
ErrorTypes,
Strings
) {
var isValidId = Validator.forConstraint(Constraints.id);
function withValidMailId(req, res, callback) {
var id = Strings.normalizeString(Resources.getData(req).id);
if (!isValidId(id)) {
return callback({data: 'Invalid mail id.', type: ErrorTypes.badRequest});
}
callback(null, id);
}
return {
get: function (req, res) {
withValidMailId(req, res, function (err, id) {
if (err) {
return Resources.error(res, err);
}
MailService.getMail(id, function (err, mail) {
if (err) {
Logger.tag('mails', 'admin').error('Error getting mail:', err);
return Resources.error(res, {data: 'Internal error.', type: ErrorTypes.internalError});
}
if (!mail) {
return Resources.error(res, {data: 'Mail not found.', type: ErrorTypes.notFound});
}
return Resources.success(res, mail);
});
});
},
getAll: function (req, res) {
Resources.getValidRestParams('list', req, function (err, restParams) {
if (err) {
return Resources.error(res, err);
}
return MailService.getPendingMails(
restParams,
function (err, mails, total) {
if (err) {
Logger.tag('mails', 'admin').error('Could not get pending mails:', err);
return Resources.error(res, {data: 'Internal error.', type: ErrorTypes.internalError});
}
res.set('X-Total-Count', total);
return Resources.success(res, mails);
}
);
});
},
delete: function (req, res) {
withValidMailId(req, res, function (err, id) {
if (err) {
return Resources.error(res, err);
}
MailService.deleteMail(id, function (err) {
if (err) {
Logger.tag('mails', 'admin').error('Error deleting mail:', err);
return Resources.error(res, {data: 'Internal error.', type: ErrorTypes.internalError});
}
return Resources.success(res);
});
});
},
resetFailures: function (req, res) {
withValidMailId(req, res, function (err, id) {
if (err) {
return Resources.error(res, err);
}
MailService.resetFailures(id, function (err, mail) {
if (err) {
Logger.tag('mails', 'admin').error('Error resetting failure count:', err);
return Resources.error(res, {data: 'Internal error.', type: ErrorTypes.internalError});
}
return Resources.success(res, mail);
});
});
}
};
});

View file

@ -5,7 +5,8 @@ angular.module('ffffng').factory('Router', function (
FrontendResource,
NodeResource,
MonitoringResource,
TaskResource
TaskResource,
MailResource
) {
return {
init: function () {
@ -24,6 +25,11 @@ angular.module('ffffng').factory('Router', function (
app.put('/internal/api/tasks/enable/:id', TaskResource.enable);
app.put('/internal/api/tasks/disable/:id', TaskResource.disable);
app.get('/internal/api/mails', MailResource.getAll);
app.get('/internal/api/mails/:id', MailResource.get);
app.delete('/internal/api/mails/:id', MailResource.delete);
app.put('/internal/api/mails/reset/:id', MailResource.resetFailures);
app.put('/internal/api/nodes/:token', NodeResource.update);
app.delete('/internal/api/nodes/:token', NodeResource.delete);
app.get('/internal/api/nodes', NodeResource.getAll);

View file

@ -1,7 +1,18 @@
'use strict';
angular.module('ffffng')
.service('MailService', function (Database, UrlBuilder, config, _, async, deepExtend, fs, moment, Logger) {
.service('MailService', function (
Database,
UrlBuilder,
config,
_,
async,
deepExtend,
fs,
moment,
Logger,
Resources
) {
var MAIL_QUEUE_DB_BATCH_SIZE = 50;
var MAIL_QUEUE_MAX_PARALLEL_SENDING = 3;
@ -83,7 +94,7 @@ angular.module('ffffng')
function findPendingMailsBefore(beforeMoment, limit, callback) {
Database.all(
'SELECT * FROM email_queue WHERE modified_at < ? AND failures < ? ORDER BY id ASC LIMIT ?',
[beforeMoment.unix(), 5, limit], // TODO: retrycount
[beforeMoment.unix(), 5, limit],
function (err, rows) {
if (err) {
return callback(err);
@ -141,6 +152,10 @@ angular.module('ffffng')
});
}
function doGetMail(id, callback) {
Database.get('SELECT * FROM email_queue WHERE id = ?', [id], callback);
}
return {
enqueue: function (sender, recipient, email, data, callback) {
if (!_.isPlainObject(data)) {
@ -157,6 +172,65 @@ angular.module('ffffng')
);
},
getMail: function (id, callback) {
doGetMail(id, callback);
},
getPendingMails: function (restParams, callback) {
Database.get(
'SELECT count(*) AS total FROM email_queue',
[],
function (err, row) {
if (err) {
return callback(err);
}
var total = row.total;
var filter = Resources.filterClause(
restParams,
'id',
['id', 'failures', 'sender', 'recipient', 'email', 'created_at', 'modified_at'],
['id', 'failures', 'sender', 'recipient', 'email']
);
Database.all(
'SELECT * FROM email_queue WHERE ' + filter.query,
_.concat([], filter.params),
function (err, rows) {
if (err) {
return callback(err);
}
callback(null, rows, total);
}
);
}
);
},
deleteMail: function (id, callback) {
removePendingMailFromQueue(id, callback);
},
resetFailures: function (id, callback) {
Database.run(
'UPDATE email_queue SET failures = 0, modified_at = ? WHERE id = ?',
[moment().unix(), id],
function (err) {
if (err) {
return callback(err);
}
if (!this.changes) {
return callback('Error: could not reset failure count for mail: ' + id);
}
doGetMail(id, callback);
}
);
},
sendPendingMails: function (callback) {
Logger.tag('mail', 'queue').info('Start sending pending mails...');

View file

@ -15,6 +15,61 @@ angular.module('ffffng').factory('Resources', function (_, Constraints, Validato
}
}
function orderByClause(restParams, defaultSortField, allowedSortFields) {
var sortField = _.includes(allowedSortFields, restParams._sortField) ? restParams._sortField : undefined;
if (!sortField) {
sortField = defaultSortField;
}
return {
query: 'ORDER BY ' + sortField + ' ' + (restParams._sortDir === 'ASC' ? 'ASC' : 'DESC'),
params: []
}
}
function limitOffsetClause(restParams) {
var page = restParams._page;
var perPage = restParams._perPage;
return {
query: 'LIMIT ? OFFSET ?',
params: [perPage, ((page - 1) * perPage)]
};
}
function escapeForLikePattern(str) {
return str
.replace(/\\/g, '\\\\')
.replace(/%/g, '\\%')
.replace(/_/g, '\\_')
}
function filterCondition(restParams, filterFields) {
if (_.isEmpty(filterFields)) {
return {
query: '1 = 1',
params: []
}
}
var query = _.join(
_.map(filterFields, function (field) {
return 'LOWER(' + field + ') LIKE ?';
}),
' OR '
);
query += ' ESCAPE \'\\\'';
var search = '%' + (_.isString(restParams.q) ? escapeForLikePattern(_.toLower(restParams.q.trim())) : '') + '%';
var params = _.times(filterFields.length, _.constant(search));
return {
query: query,
params: params
};
}
return {
getData: function (req) {
return _.extend({}, req.body, req.params, req.query);
@ -92,6 +147,25 @@ angular.module('ffffng').factory('Resources', function (_, Constraints, Validato
return entities.slice((page - 1) * perPage, page * perPage);
},
filterClause: function (restParams, defaultSortField, allowedSortFields, filterFields) {
var orderBy = orderByClause(
restParams,
defaultSortField,
allowedSortFields
);
var limitOffset = limitOffsetClause(restParams);
var filter = filterCondition(
restParams,
filterFields
);
return {
query: filter.query + ' ' + orderBy.query + ' ' + limitOffset.query,
params: _.concat(filter.params, orderBy.params, limitOffset.params)
}
},
success: function (res, data) {
respond(res, 200, data, 'json');
},