Sqlite upgrade and type refactorings
This commit is contained in:
parent
01691a0c20
commit
28c8429edd
20 changed files with 873 additions and 663 deletions
server/services
|
@ -9,7 +9,29 @@ import Logger from "../logger";
|
|||
import * as MailTemplateService from "./mailTemplateService";
|
||||
import * as Resources from "../utils/resources";
|
||||
import {RestParams} from "../utils/resources";
|
||||
import {isMailSortField, Mail, MailData, MailId, MailSortField, MailType} from "../types";
|
||||
import {
|
||||
EmailAddress, isJSONObject,
|
||||
isMailSortField, isMailType, JSONObject,
|
||||
Mail,
|
||||
MailData,
|
||||
MailId,
|
||||
MailSortField,
|
||||
MailType,
|
||||
parseJSON,
|
||||
UnixTimestampSeconds
|
||||
} from "../types";
|
||||
import ErrorTypes from "../utils/errorTypes";
|
||||
|
||||
type EmaiQueueRow = {
|
||||
id: MailId,
|
||||
created_at: UnixTimestampSeconds,
|
||||
data: string,
|
||||
email: string,
|
||||
failures: number,
|
||||
modified_at: UnixTimestampSeconds,
|
||||
recipient: EmailAddress,
|
||||
sender: EmailAddress,
|
||||
};
|
||||
|
||||
const MAIL_QUEUE_DB_BATCH_SIZE = 50;
|
||||
|
||||
|
@ -24,7 +46,7 @@ function transporter() {
|
|||
{
|
||||
transport: 'smtp',
|
||||
pool: true
|
||||
}
|
||||
} as JSONObject
|
||||
));
|
||||
|
||||
MailTemplateService.configureTransporter(transporterSingleton);
|
||||
|
@ -57,18 +79,29 @@ async function sendMail(options: Mail): Promise<void> {
|
|||
}
|
||||
|
||||
async function findPendingMailsBefore(beforeMoment: Moment, limit: number): Promise<Mail[]> {
|
||||
const rows = await db.all(
|
||||
const rows = await db.all<EmaiQueueRow>(
|
||||
'SELECT * FROM email_queue WHERE modified_at < ? AND failures < ? ORDER BY id ASC LIMIT ?',
|
||||
[beforeMoment.unix(), 5, limit],
|
||||
);
|
||||
|
||||
return _.map(rows, row => deepExtend(
|
||||
{},
|
||||
row,
|
||||
{
|
||||
data: JSON.parse(row.data)
|
||||
return rows.map(row => {
|
||||
const mailType = row.email;
|
||||
if (!isMailType(mailType)) {
|
||||
throw new Error(`Invalid mailtype in database: ${mailType}`);
|
||||
}
|
||||
));
|
||||
const data = parseJSON(row.data);
|
||||
if (!isJSONObject(data)) {
|
||||
throw new Error(`Invalid email data in database: ${typeof data}`);
|
||||
}
|
||||
return {
|
||||
id: row.id,
|
||||
email: mailType,
|
||||
sender: row.sender,
|
||||
recipient: row.recipient,
|
||||
data,
|
||||
failures: row.failures,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
async function removePendingMailFromQueue(id: MailId): Promise<void> {
|
||||
|
@ -85,8 +118,7 @@ async function incrementFailureCounterForPendingEmail(id: MailId): Promise<void>
|
|||
async function sendPendingMail(pendingMail: Mail): Promise<void> {
|
||||
try {
|
||||
await sendMail(pendingMail);
|
||||
}
|
||||
catch (error) {
|
||||
} catch (error) {
|
||||
// we only log the error and increment the failure counter as we want to continue with pending mails
|
||||
Logger.tag('mail', 'queue').error('Error sending pending mail[' + pendingMail.id + ']:', error);
|
||||
|
||||
|
@ -98,10 +130,14 @@ async function sendPendingMail(pendingMail: Mail): Promise<void> {
|
|||
}
|
||||
|
||||
async function doGetMail(id: MailId): Promise<Mail> {
|
||||
return await db.get('SELECT * FROM email_queue WHERE id = ?', [id]);
|
||||
const row = await db.get<Mail>('SELECT * FROM email_queue WHERE id = ?', [id]);
|
||||
if (row === undefined) {
|
||||
throw {data: 'Mail not found.', type: ErrorTypes.notFound};
|
||||
}
|
||||
return row;
|
||||
}
|
||||
|
||||
export async function enqueue (sender: string, recipient: string, email: MailType, data: MailData): Promise<void> {
|
||||
export async function enqueue(sender: string, recipient: string, email: MailType, data: MailData): Promise<void> {
|
||||
if (!_.isPlainObject(data)) {
|
||||
throw new Error('Unexpected data: ' + data);
|
||||
}
|
||||
|
@ -113,17 +149,17 @@ export async function enqueue (sender: string, recipient: string, email: MailTyp
|
|||
);
|
||||
}
|
||||
|
||||
export async function getMail (id: MailId): Promise<Mail> {
|
||||
export async function getMail(id: MailId): Promise<Mail> {
|
||||
return await doGetMail(id);
|
||||
}
|
||||
|
||||
export async function getPendingMails (restParams: RestParams): Promise<{mails: Mail[], total: number}> {
|
||||
const row = await db.get(
|
||||
export async function getPendingMails(restParams: RestParams): Promise<{ mails: Mail[], total: number }> {
|
||||
const row = await db.get<{ total: number }>(
|
||||
'SELECT count(*) AS total FROM email_queue',
|
||||
[],
|
||||
);
|
||||
|
||||
const total = row.total;
|
||||
const total = row?.total || 0;
|
||||
|
||||
const filter = Resources.filterClause(
|
||||
restParams,
|
||||
|
@ -143,11 +179,11 @@ export async function getPendingMails (restParams: RestParams): Promise<{mails:
|
|||
}
|
||||
}
|
||||
|
||||
export async function deleteMail (id: MailId): Promise<void> {
|
||||
export async function deleteMail(id: MailId): Promise<void> {
|
||||
await removePendingMailFromQueue(id);
|
||||
}
|
||||
|
||||
export async function resetFailures (id: MailId): Promise<Mail> {
|
||||
export async function resetFailures(id: MailId): Promise<Mail> {
|
||||
const statement = await db.run(
|
||||
'UPDATE email_queue SET failures = 0, modified_at = ? WHERE id = ?',
|
||||
[moment().unix(), id],
|
||||
|
@ -160,7 +196,7 @@ export async function resetFailures (id: MailId): Promise<Mail> {
|
|||
return await doGetMail(id);
|
||||
}
|
||||
|
||||
export async function sendPendingMails (): Promise<void> {
|
||||
export async function sendPendingMails(): Promise<void> {
|
||||
Logger.tag('mail', 'queue').debug('Start sending pending mails...');
|
||||
|
||||
const startTime = moment();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue