Major refactoring and fixes.

* Split Node into multiple types and make sure fields are actually set
  when type says so.
* Refactor request handling.
* Start getting rid of moment as a dependency by using
  UnixTimestampSeconds instead.
This commit is contained in:
baldo 2022-07-21 18:39:33 +02:00
parent cfa784dfe2
commit 250353edbf
16 changed files with 676 additions and 455 deletions
server/services

View file

@ -1,5 +1,4 @@
import _ from "lodash";
import moment, {Moment, unitOfTime} from "moment";
import request from "request";
import {config} from "../config";
@ -18,21 +17,25 @@ import CONSTRAINTS from "../validation/constraints";
import {forConstraint} from "../validation/validator";
import {
Domain,
DurationSeconds,
Hostname,
isMonitoringSortField,
isOnlineState,
MAC,
MailType,
MonitoringSortField,
MonitoringState,
MonitoringToken,
Node,
NodeId,
NodeStateData,
OnlineState,
RunResult,
Site,
StoredNode,
toCreateOrUpdateNode,
UnixTimestampSeconds
} from "../types";
import {days, formatTimestamp, hours, now, parseTimestamp, subtract, weeks} from "../utils/time";
type NodeStateRow = {
id: number,
@ -56,27 +59,24 @@ const MONITORING_MAILS_DB_BATCH_SIZE = 50;
/**
* Defines the intervals emails are sent if a node is offline
*/
const MONITORING_OFFLINE_MAILS_SCHEDULE: { [key: number]: { amount: number, unit: unitOfTime.DurationConstructor } } = {
1: {amount: 3, unit: 'hours'},
2: {amount: 1, unit: 'days'},
3: {amount: 7, unit: 'days'}
};
const DELETE_OFFLINE_NODES_AFTER_DURATION: { amount: number, unit: unitOfTime.DurationConstructor } = {
amount: 100,
unit: 'days'
const MONITORING_OFFLINE_MAILS_SCHEDULE: Record<number, DurationSeconds> = {
1: hours(3),
2: days(1),
3: weeks(1),
};
const DELETE_OFFLINE_NODES_AFTER_DURATION: DurationSeconds = days(100);
export type ParsedNode = {
mac: MAC,
importTimestamp: Moment,
importTimestamp: UnixTimestampSeconds,
state: OnlineState,
lastSeen: Moment,
lastSeen: UnixTimestampSeconds,
site: Site,
domain: Domain,
};
export type NodesParsingResult = {
importTimestamp: Moment,
importTimestamp: UnixTimestampSeconds,
nodes: ParsedNode[],
failedNodesCount: number,
totalNodesCount: number,
@ -87,9 +87,9 @@ export type RetrieveNodeInformationResult = {
totalNodesCount: number,
};
let previousImportTimestamp: Moment | null = null;
let previousImportTimestamp: UnixTimestampSeconds | null = null;
async function insertNodeInformation(nodeData: ParsedNode, node: Node): Promise<void> {
async function insertNodeInformation(nodeData: ParsedNode, node: StoredNode): Promise<void> {
Logger
.tag('monitoring', 'information-retrieval')
.debug('Node is new in monitoring, creating data: %s', nodeData.mac);
@ -105,20 +105,20 @@ async function insertNodeInformation(nodeData: ParsedNode, node: Node): Promise<
nodeData.domain,
node.monitoringState,
nodeData.state,
nodeData.lastSeen.unix(),
nodeData.importTimestamp.unix(),
nodeData.lastSeen,
nodeData.importTimestamp,
null, // new node so we haven't send a mail yet
null // new node so we haven't send a mail yet
]
);
}
async function updateNodeInformation(nodeData: ParsedNode, node: Node, row: any): Promise<void> {
async function updateNodeInformation(nodeData: ParsedNode, node: StoredNode, row: any): Promise<void> {
Logger
.tag('monitoring', 'informacallbacktion-retrieval')
.debug('Node is known in monitoring: %s', nodeData.mac);
if (!moment(row.import_timestamp).isBefore(nodeData.importTimestamp)) {
if (row.import_timestamp >= nodeData.importTimestamp) {
Logger
.tag('monitoring', 'information-retrieval')
.debug('No new data for node, skipping: %s', nodeData.mac);
@ -147,9 +147,9 @@ async function updateNodeInformation(nodeData: ParsedNode, node: Node, row: any)
nodeData.domain || row.domain,
node.monitoringState,
nodeData.state,
nodeData.lastSeen.unix(),
nodeData.importTimestamp.unix(),
moment().unix(),
nodeData.lastSeen,
nodeData.importTimestamp,
now(),
row.id,
node.mac
@ -157,7 +157,7 @@ async function updateNodeInformation(nodeData: ParsedNode, node: Node, row: any)
);
}
async function storeNodeInformation(nodeData: ParsedNode, node: Node): Promise<void> {
async function storeNodeInformation(nodeData: ParsedNode, node: StoredNode): Promise<void> {
Logger.tag('monitoring', 'information-retrieval').debug('Storing status for node: %s', nodeData.mac);
const row = await db.get('SELECT * FROM node_state WHERE mac = ?', [node.mac]);
@ -171,15 +171,8 @@ async function storeNodeInformation(nodeData: ParsedNode, node: Node): Promise<v
const isValidMac = forConstraint(CONSTRAINTS.node.mac, false);
export function parseTimestamp(timestamp: any): Moment {
if (!_.isString(timestamp)) {
return moment.invalid();
}
return moment.utc(timestamp);
}
// TODO: Use sparkson for JSON parsing.
export function parseNode(importTimestamp: Moment, nodeData: any): ParsedNode {
export function parseNode(importTimestamp: UnixTimestampSeconds, nodeData: any): ParsedNode {
if (!_.isPlainObject(nodeData)) {
throw new Error(
'Unexpected node type: ' + (typeof nodeData)
@ -225,7 +218,7 @@ export function parseNode(importTimestamp: Moment, nodeData: any): ParsedNode {
const isOnline = nodeData.flags.online;
const lastSeen = parseTimestamp(nodeData.lastseen);
if (!lastSeen.isValid()) {
if (lastSeen === null) {
throw new Error(
'Node ' + nodeId + ': Invalid lastseen timestamp: ' + nodeData.lastseen
);
@ -245,7 +238,7 @@ export function parseNode(importTimestamp: Moment, nodeData: any): ParsedNode {
mac,
importTimestamp: importTimestamp,
state: isOnline ? OnlineState.ONLINE : OnlineState.OFFLINE,
lastSeen: lastSeen,
lastSeen,
site,
domain,
};
@ -266,17 +259,18 @@ export function parseNodesJson(body: string): NodesParsingResult {
throw new Error(`Unexpected nodes.json version "${json.version}". Expected: "${expectedVersion}"`);
}
const importTimestamp = parseTimestamp(json.timestamp);
if (importTimestamp === null) {
throw new Error('Invalid timestamp: ' + json.timestamp);
}
const result: NodesParsingResult = {
importTimestamp: parseTimestamp(json.timestamp),
importTimestamp,
nodes: [],
failedNodesCount: 0,
totalNodesCount: 0,
};
if (!result.importTimestamp.isValid()) {
throw new Error('Invalid timestamp: ' + json.timestamp);
}
if (!_.isArray(json.nodes)) {
throw new Error('Invalid nodes array type: ' + (typeof json.nodes));
}
@ -296,13 +290,13 @@ export function parseNodesJson(body: string): NodesParsingResult {
return result;
}
async function updateSkippedNode(id: NodeId, node?: Node): Promise<RunResult> {
async function updateSkippedNode(id: NodeId, node?: StoredNode): Promise<RunResult> {
return await db.run(
'UPDATE node_state ' +
'SET hostname = ?, monitoring_state = ?, modified_at = ?' +
'WHERE id = ?',
[
node ? node.hostname : '', node ? node.monitoringState : '', moment().unix(),
node ? node.hostname : '', node ? node.monitoringState : '', now(),
id
]
);
@ -328,7 +322,7 @@ async function sendMonitoringMailsBatched(
const mac = nodeState.mac;
Logger.tag('monitoring', 'mail-sending').debug('Loading node data for: %s', mac);
const result = await NodeService.getNodeDataWithSecretsByMac(mac);
const result = await NodeService.findNodeDataWithSecretsByMac(mac);
if (!result) {
Logger
.tag('monitoring', 'mail-sending')
@ -341,11 +335,11 @@ async function sendMonitoringMailsBatched(
const {node, nodeSecrets} = result;
if (!(node.monitoring && node.monitoringConfirmed)) {
if (node.monitoringState !== MonitoringState.ACTIVE) {
Logger
.tag('monitoring', 'mail-sending')
.debug('Monitoring disabled, skipping "%s" mail for: %s', name, mac);
await updateSkippedNode(nodeState.id);
await updateSkippedNode(nodeState.id, node);
continue;
}
@ -354,7 +348,7 @@ async function sendMonitoringMailsBatched(
Logger
.tag('monitoring', 'mail-sending')
.error('Node has no monitoring token. Cannot send mail "%s" for: %s', name, mac);
await updateSkippedNode(nodeState.id);
await updateSkippedNode(nodeState.id, node);
continue;
}
@ -377,13 +371,13 @@ async function sendMonitoringMailsBatched(
.tag('monitoring', 'mail-sending')
.debug('Updating node state: ', mac);
const now = moment().unix();
const timestamp = now();
await db.run(
'UPDATE node_state ' +
'SET hostname = ?, monitoring_state = ?, modified_at = ?, last_status_mail_sent = ?, last_status_mail_type = ?' +
'WHERE id = ?',
[
node.hostname, node.monitoringState, now, now, mailType,
node.hostname, node.monitoringState, timestamp, timestamp, mailType,
nodeState.id
]
);
@ -391,7 +385,7 @@ async function sendMonitoringMailsBatched(
}
}
async function sendOnlineAgainMails(startTime: Moment): Promise<void> {
async function sendOnlineAgainMails(startTime: UnixTimestampSeconds): Promise<void> {
await sendMonitoringMailsBatched(
'online again',
MailType.MONITORING_ONLINE_AGAIN,
@ -402,7 +396,7 @@ async function sendOnlineAgainMails(startTime: Moment): Promise<void> {
')' +
'ORDER BY id ASC LIMIT ?',
[
startTime.unix(),
startTime,
'ONLINE',
MONITORING_MAILS_DB_BATCH_SIZE
@ -411,7 +405,7 @@ async function sendOnlineAgainMails(startTime: Moment): Promise<void> {
);
}
async function sendOfflineMails(startTime: Moment, mailType: MailType): Promise<void> {
async function sendOfflineMails(startTime: UnixTimestampSeconds, mailType: MailType): Promise<void> {
const mailNumber = parseInteger(mailType.split("-")[2]);
await sendMonitoringMailsBatched(
'offline ' + mailNumber,
@ -424,7 +418,7 @@ async function sendOfflineMails(startTime: Moment, mailType: MailType): Promise<
const allowNull = mailNumber === 1 ? ' OR last_status_mail_type IS NULL' : '';
const schedule = MONITORING_OFFLINE_MAILS_SCHEDULE[mailNumber];
const scheduledTimeBefore = moment().subtract(schedule.amount, schedule.unit);
const scheduledTimeBefore = subtract(now(), schedule);
return await db.all(
'SELECT * FROM node_state ' +
@ -432,11 +426,11 @@ async function sendOfflineMails(startTime: Moment, mailType: MailType): Promise<
'last_seen <= ? AND (last_status_mail_sent <= ? OR last_status_mail_sent IS NULL) ' +
'ORDER BY id ASC LIMIT ?',
[
startTime.unix(),
startTime,
'OFFLINE',
previousType,
scheduledTimeBefore.unix(),
scheduledTimeBefore.unix(),
scheduledTimeBefore,
scheduledTimeBefore,
MONITORING_MAILS_DB_BATCH_SIZE
],
@ -487,10 +481,10 @@ async function retrieveNodeInformationForUrls(urls: string[]): Promise<RetrieveN
let totalNodesCount = 0;
for (const data of datas) {
if (data.importTimestamp.isAfter(maxTimestamp)) {
if (data.importTimestamp >= maxTimestamp) {
maxTimestamp = data.importTimestamp;
}
if (data.importTimestamp.isBefore(minTimestamp)) {
if (data.importTimestamp <= minTimestamp) {
minTimestamp = data.importTimestamp;
}
@ -498,13 +492,13 @@ async function retrieveNodeInformationForUrls(urls: string[]): Promise<RetrieveN
totalNodesCount += data.totalNodesCount;
}
if (previousImportTimestamp !== null && !maxTimestamp.isAfter(previousImportTimestamp)) {
if (previousImportTimestamp !== null && maxTimestamp >= previousImportTimestamp) {
Logger
.tag('monitoring', 'information-retrieval')
.debug(
'No new data, skipping. Current timestamp: %s, previous timestamp: %s',
maxTimestamp.format(),
previousImportTimestamp.format()
formatTimestamp(maxTimestamp),
formatTimestamp(previousImportTimestamp)
);
return {
failedParsingNodesCount,
@ -518,7 +512,7 @@ async function retrieveNodeInformationForUrls(urls: string[]): Promise<RetrieveN
const allNodes = _.flatMap(datas, data => data.nodes);
// Get rid of duplicates from different nodes.json files. Always use the one with the newest
const sortedNodes = _.orderBy(allNodes, [node => node.lastSeen.unix()], ['desc']);
const sortedNodes = _.orderBy(allNodes, [node => node.lastSeen], ['desc']);
const uniqueNodes = _.uniqBy(sortedNodes, function (node) {
return node.mac;
});
@ -526,7 +520,7 @@ async function retrieveNodeInformationForUrls(urls: string[]): Promise<RetrieveN
for (const nodeData of uniqueNodes) {
Logger.tag('monitoring', 'information-retrieval').debug('Importing: %s', nodeData.mac);
const result = await NodeService.getNodeDataByMac(nodeData.mac);
const result = await NodeService.findNodeDataByMac(nodeData.mac);
if (!result) {
Logger
.tag('monitoring', 'information-retrieval')
@ -551,8 +545,8 @@ async function retrieveNodeInformationForUrls(urls: string[]): Promise<RetrieveN
'SET state = ?, modified_at = ?' +
'WHERE import_timestamp < ?',
[
OnlineState.OFFLINE, moment().unix(),
minTimestamp.unix()
OnlineState.OFFLINE, now(),
minTimestamp
]
);
@ -627,34 +621,40 @@ export async function getByMacs(macs: MAC[]): Promise<Record<MAC, NodeStateData>
return nodeStateByMac;
}
export async function confirm(token: MonitoringToken): Promise<Node> {
export async function confirm(token: MonitoringToken): Promise<StoredNode> {
const {node, nodeSecrets} = await NodeService.getNodeDataWithSecretsByMonitoringToken(token);
if (!node.monitoring || !nodeSecrets.monitoringToken || nodeSecrets.monitoringToken !== token) {
if (node.monitoringState === MonitoringState.DISABLED || !nodeSecrets.monitoringToken || nodeSecrets.monitoringToken !== token) {
throw {data: 'Invalid token.', type: ErrorTypes.badRequest};
}
if (node.monitoringConfirmed) {
if (node.monitoringState === MonitoringState.ACTIVE) {
return node;
}
node.monitoringConfirmed = true;
const {node: newNode} = await NodeService.internalUpdateNode(node.token, node, nodeSecrets);
return newNode;
node.monitoringState = MonitoringState.ACTIVE;
return await NodeService.internalUpdateNode(
node.token,
toCreateOrUpdateNode(node),
node.monitoringState,
nodeSecrets
);
}
export async function disable(token: MonitoringToken): Promise<Node> {
export async function disable(token: MonitoringToken): Promise<StoredNode> {
const {node, nodeSecrets} = await NodeService.getNodeDataWithSecretsByMonitoringToken(token);
if (!node.monitoring || !nodeSecrets.monitoringToken || nodeSecrets.monitoringToken !== token) {
if (node.monitoringState === MonitoringState.DISABLED || !nodeSecrets.monitoringToken || nodeSecrets.monitoringToken !== token) {
throw {data: 'Invalid token.', type: ErrorTypes.badRequest};
}
node.monitoring = false;
node.monitoringConfirmed = false;
node.monitoringState = MonitoringState.DISABLED;
nodeSecrets.monitoringToken = undefined;
const {node: newNode} = await NodeService.internalUpdateNode(node.token, node, nodeSecrets);
return newNode;
return await NodeService.internalUpdateNode(
node.token,
toCreateOrUpdateNode(node),
node.monitoringState,
nodeSecrets
);
}
export async function retrieveNodeInformation(): Promise<RetrieveNodeInformationResult> {
@ -669,7 +669,7 @@ export async function retrieveNodeInformation(): Promise<RetrieveNodeInformation
export async function sendMonitoringMails(): Promise<void> {
Logger.tag('monitoring', 'mail-sending').debug('Sending monitoring mails...');
const startTime = moment();
const startTime = now();
try {
await sendOnlineAgainMails(startTime);
@ -696,24 +696,18 @@ export async function sendMonitoringMails(): Promise<void> {
}
}
function toUnixTimestamp(moment: Moment): UnixTimestampSeconds {
return moment.unix() as UnixTimestampSeconds;
}
export async function deleteOfflineNodes(): Promise<void> {
Logger
.tag('nodes', 'delete-offline')
.info(
'Deleting offline nodes older than ' +
DELETE_OFFLINE_NODES_AFTER_DURATION.amount + ' ' +
DELETE_OFFLINE_NODES_AFTER_DURATION.unit
`Deleting offline nodes older than ${DELETE_OFFLINE_NODES_AFTER_DURATION} seconds.`
);
const deleteBefore =
toUnixTimestamp(moment().subtract(
DELETE_OFFLINE_NODES_AFTER_DURATION.amount,
DELETE_OFFLINE_NODES_AFTER_DURATION.unit
));
subtract(
now(),
DELETE_OFFLINE_NODES_AFTER_DURATION,
);
await deleteNeverOnlineNodesBefore(deleteBefore);
await deleteNodesOfflineSinceBefore(deleteBefore);
@ -727,7 +721,7 @@ async function deleteNeverOnlineNodesBefore(deleteBefore: UnixTimestampSeconds):
deleteBefore
);
const deletionCandidates: Node[] = await NodeService.findNodesModifiedBefore(deleteBefore);
const deletionCandidates: StoredNode[] = await NodeService.findNodesModifiedBefore(deleteBefore);
Logger
.tag('nodes', 'delete-never-online')
@ -816,7 +810,7 @@ async function deleteNodeByMac(mac: MAC): Promise<void> {
let node;
try {
node = await NodeService.getNodeDataByMac(mac);
node = await NodeService.findNodeDataByMac(mac);
} catch (error) {
// Only log error. We try to delete the nodes state anyways.
Logger.tag('nodes', 'delete-offline').error('Could not find node to delete: ' + mac, error);