Typescript migration: Major refactoring.
This commit is contained in:
parent
31ecc0cf4f
commit
52822207a5
31 changed files with 2068 additions and 2268 deletions
server/services
174
server/services/mailService.ts
Normal file
174
server/services/mailService.ts
Normal file
|
@ -0,0 +1,174 @@
|
|||
import _ from "lodash";
|
||||
import deepExtend from "deep-extend";
|
||||
import moment, {Moment} from "moment";
|
||||
import {createTransport} from "nodemailer";
|
||||
|
||||
import {config} from "../config";
|
||||
import {db, Statement} from "../db/database";
|
||||
import Logger from "../logger";
|
||||
import * as MailTemplateService from "./mailTemplateService";
|
||||
import * as Resources from "../utils/resources";
|
||||
import {RestParams} from "../utils/resources";
|
||||
import {Mail, MailData, MailId, MailType} from "../types";
|
||||
|
||||
const MAIL_QUEUE_DB_BATCH_SIZE = 50;
|
||||
|
||||
const transporter = createTransport(deepExtend(
|
||||
{},
|
||||
config.server.email.smtp,
|
||||
{
|
||||
transport: 'smtp',
|
||||
pool: true
|
||||
}
|
||||
));
|
||||
|
||||
MailTemplateService.configureTransporter(transporter);
|
||||
|
||||
async function sendMail(options: Mail): Promise<void> {
|
||||
Logger
|
||||
.tag('mail', 'queue')
|
||||
.info(
|
||||
'Sending pending mail[%d] of type %s. ' +
|
||||
'Had %d failures before.',
|
||||
options.id, options.email, options.failures
|
||||
);
|
||||
|
||||
const renderedTemplate = await MailTemplateService.render(options);
|
||||
|
||||
const mailOptions = {
|
||||
from: options.sender,
|
||||
to: options.recipient,
|
||||
subject: renderedTemplate.subject,
|
||||
html: renderedTemplate.body
|
||||
};
|
||||
|
||||
await transporter.sendMail(mailOptions);
|
||||
|
||||
Logger.tag('mail', 'queue').info('Mail[%d] has been send.', options.id);
|
||||
}
|
||||
|
||||
async function findPendingMailsBefore(beforeMoment: Moment, limit: number): Promise<Mail[]> {
|
||||
const rows = await db.all(
|
||||
'SELECT * FROM email_queue WHERE modified_at < ? AND failures < ? ORDER BY id ASC LIMIT ?',
|
||||
[beforeMoment.unix(), 5, limit],
|
||||
);
|
||||
|
||||
return _.map(rows, row => deepExtend(
|
||||
{},
|
||||
row,
|
||||
{
|
||||
data: JSON.parse(row.data)
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
async function removePendingMailFromQueue(id: MailId): Promise<void> {
|
||||
await db.run('DELETE FROM email_queue WHERE id = ?', [id]);
|
||||
}
|
||||
|
||||
async function incrementFailureCounterForPendingEmail(id: MailId): Promise<void> {
|
||||
const now = moment();
|
||||
await db.run(
|
||||
'UPDATE email_queue SET failures = failures + 1, modified_at = ? WHERE id = ?',
|
||||
[now.unix(), id],
|
||||
);
|
||||
}
|
||||
|
||||
async function sendPendingMail(pendingMail: Mail): Promise<void> {
|
||||
try {
|
||||
await sendMail(pendingMail);
|
||||
}
|
||||
catch (error) {
|
||||
// we only log the error and increment the failure counter as we want to continue with pending mails
|
||||
Logger.tag('mail', 'queue').error('Error sending pending mail[' + pendingMail.id + ']:', error);
|
||||
|
||||
await incrementFailureCounterForPendingEmail(pendingMail.id);
|
||||
return;
|
||||
}
|
||||
|
||||
await removePendingMailFromQueue(pendingMail.id);
|
||||
}
|
||||
|
||||
async function doGetMail(id: MailId): Promise<Mail> {
|
||||
return await db.get('SELECT * FROM email_queue WHERE id = ?', [id]);
|
||||
}
|
||||
|
||||
export async function enqueue (sender: string, recipient: string, email: MailType, data: MailData): Promise<void> {
|
||||
if (!_.isPlainObject(data)) {
|
||||
throw new Error('Unexpected data: ' + data);
|
||||
}
|
||||
await db.run(
|
||||
'INSERT INTO email_queue ' +
|
||||
'(failures, sender, recipient, email, data) ' +
|
||||
'VALUES (?, ?, ?, ?, ?)',
|
||||
[0, sender, recipient, email, JSON.stringify(data)],
|
||||
);
|
||||
}
|
||||
|
||||
export async function getMail (id: MailId): Promise<Mail> {
|
||||
return await doGetMail(id);
|
||||
}
|
||||
|
||||
export async function getPendingMails (restParams: RestParams): Promise<{mails: Mail[], total: number}> {
|
||||
const row = await db.get(
|
||||
'SELECT count(*) AS total FROM email_queue',
|
||||
[],
|
||||
);
|
||||
|
||||
const total = row.total;
|
||||
|
||||
const filter = Resources.filterClause(
|
||||
restParams,
|
||||
'id',
|
||||
['id', 'failures', 'sender', 'recipient', 'email', 'created_at', 'modified_at'],
|
||||
['id', 'failures', 'sender', 'recipient', 'email']
|
||||
);
|
||||
|
||||
const mails = await db.all(
|
||||
'SELECT * FROM email_queue WHERE ' + filter.query,
|
||||
_.concat([], filter.params),
|
||||
);
|
||||
|
||||
return {
|
||||
mails,
|
||||
total
|
||||
}
|
||||
}
|
||||
|
||||
export async function deleteMail (id: MailId): Promise<void> {
|
||||
await removePendingMailFromQueue(id);
|
||||
}
|
||||
|
||||
export async function resetFailures (id: MailId): Promise<Mail> {
|
||||
const statement = await db.run(
|
||||
'UPDATE email_queue SET failures = 0, modified_at = ? WHERE id = ?',
|
||||
[moment().unix(), id],
|
||||
);
|
||||
|
||||
if (!statement.changes) {
|
||||
throw new Error('Error: could not reset failure count for mail: ' + id);
|
||||
}
|
||||
|
||||
return await doGetMail(id);
|
||||
}
|
||||
|
||||
export async function sendPendingMails (): Promise<void> {
|
||||
Logger.tag('mail', 'queue').debug('Start sending pending mails...');
|
||||
|
||||
const startTime = moment();
|
||||
|
||||
while (true) {
|
||||
Logger.tag('mail', 'queue').debug('Sending next batch...');
|
||||
|
||||
const pendingMails = await findPendingMailsBefore(startTime, MAIL_QUEUE_DB_BATCH_SIZE);
|
||||
|
||||
if (_.isEmpty(pendingMails)) {
|
||||
Logger.tag('mail', 'queue').debug('Done sending pending mails.');
|
||||
return;
|
||||
}
|
||||
|
||||
for (const pendingMail of pendingMails) {
|
||||
await sendPendingMail(pendingMail);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue