import _ from "lodash"; import request from "request"; import { config } from "../config"; import { db } from "../db/database"; import * as DatabaseUtil from "../utils/databaseUtil"; import ErrorTypes from "../utils/errorTypes"; import Logger from "../logger"; import * as MailService from "../services/mailService"; import * as NodeService from "../services/nodeService"; import * as Resources from "../utils/resources"; import { RestParams } from "../utils/resources"; import { normalizeMac, parseInteger } from "../shared/utils/strings"; import { monitoringDisableUrl } from "../utils/urlBuilder"; import CONSTRAINTS from "../shared/validation/constraints"; import { forConstraint } from "../shared/validation/validator"; import { Domain, DurationSeconds, Hostname, isBoolean, isDomain, isMonitoringSortField, isOnlineState, isSite, isString, isUndefined, MAC, MailType, MonitoringSortField, MonitoringState, MonitoringToken, NodeId, NodeStateData, OnlineState, RunResult, Site, StoredNode, toCreateOrUpdateNode, UnixTimestampSeconds, } from "../types"; import { days, formatTimestamp, hours, now, parseTimestamp, subtract, weeks, } from "../utils/time"; type NodeStateRow = { id: number; created_at: UnixTimestampSeconds; domain: Domain | null; hostname: Hostname | null; import_timestamp: UnixTimestampSeconds; last_seen: UnixTimestampSeconds; last_status_mail_sent: string | null; last_status_mail_type: string | null; mac: MAC; modified_at: UnixTimestampSeconds; monitoring_state: string | null; site: Site | null; state: string; }; const MONITORING_STATE_MACS_CHUNK_SIZE = 100; const NEVER_ONLINE_NODES_DELETION_CHUNK_SIZE = 20; const MONITORING_MAILS_DB_BATCH_SIZE = 50; /** * Defines the intervals emails are sent if a node is offline */ const MONITORING_OFFLINE_MAILS_SCHEDULE: Record = { 1: hours(3), 2: days(1), 3: weeks(1), }; const DELETE_OFFLINE_NODES_AFTER_DURATION: DurationSeconds = days(100); export type ParsedNode = { mac: MAC; importTimestamp: UnixTimestampSeconds; state: OnlineState; lastSeen: UnixTimestampSeconds; site?: Site; domain?: Domain; }; export type NodesParsingResult = { importTimestamp: UnixTimestampSeconds; nodes: ParsedNode[]; failedNodesCount: number; totalNodesCount: number; }; export type RetrieveNodeInformationResult = { failedParsingNodesCount: number; totalNodesCount: number; }; let previousImportTimestamp: UnixTimestampSeconds | null = null; async function insertNodeInformation( nodeData: ParsedNode, node: StoredNode ): Promise { Logger.tag("monitoring", "information-retrieval").debug( "Node is new in monitoring, creating data: %s", nodeData.mac ); await db.run( "INSERT INTO node_state " + "(hostname, mac, site, domain, monitoring_state, state, last_seen, import_timestamp, last_status_mail_sent, last_status_mail_type) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", [ node.hostname, node.mac, nodeData.site, nodeData.domain, node.monitoringState, nodeData.state, nodeData.lastSeen, nodeData.importTimestamp, null, // new node so we haven't send a mail yet null, // new node so we haven't send a mail yet ] ); } async function updateNodeInformation( nodeData: ParsedNode, node: StoredNode, row: any ): Promise { Logger.tag("monitoring", "informacallbacktion-retrieval").debug( "Node is known in monitoring: %s", nodeData.mac ); if (row.import_timestamp >= nodeData.importTimestamp) { Logger.tag("monitoring", "information-retrieval").debug( "No new data for node, skipping: %s", nodeData.mac ); return; } Logger.tag("monitoring", "information-retrieval").debug( "New data for node, updating: %s", nodeData.mac ); await db.run( "UPDATE node_state " + "SET " + "hostname = ?, " + "site = ?, " + "domain = ?, " + "monitoring_state = ?, " + "state = ?, " + "last_seen = ?, " + "import_timestamp = ?, " + "modified_at = ? " + "WHERE id = ? AND mac = ?", [ node.hostname, nodeData.site || row.site, nodeData.domain || row.domain, node.monitoringState, nodeData.state, nodeData.lastSeen, nodeData.importTimestamp, now(), row.id, node.mac, ] ); } async function storeNodeInformation( nodeData: ParsedNode, node: StoredNode ): Promise { Logger.tag("monitoring", "information-retrieval").debug( "Storing status for node: %s", nodeData.mac ); const row = await db.get("SELECT * FROM node_state WHERE mac = ?", [ node.mac, ]); if (isUndefined(row)) { return await insertNodeInformation(nodeData, node); } else { return await updateNodeInformation(nodeData, node, row); } } const isValidMac = forConstraint(CONSTRAINTS.node.mac, false); export function parseNode( importTimestamp: UnixTimestampSeconds, nodeData: any ): ParsedNode { if (!_.isPlainObject(nodeData)) { throw new Error("Unexpected node type: " + typeof nodeData); } if (!_.isPlainObject(nodeData.nodeinfo)) { throw new Error( "Unexpected nodeinfo type: " + typeof nodeData.nodeinfo ); } const nodeId = nodeData.nodeinfo.node_id; if (!nodeId || !isString(nodeId)) { throw new Error( `Invalid node id of type "${typeof nodeId}": ${nodeId}` ); } if (!_.isPlainObject(nodeData.nodeinfo.network)) { throw new Error( "Node " + nodeId + ": Unexpected nodeinfo.network type: " + typeof nodeData.nodeinfo.network ); } if (!isValidMac(nodeData.nodeinfo.network.mac)) { throw new Error( "Node " + nodeId + ": Invalid MAC: " + nodeData.nodeinfo.network.mac ); } const mac = normalizeMac(nodeData.nodeinfo.network.mac) as MAC; if (!_.isPlainObject(nodeData.flags)) { throw new Error( "Node " + nodeId + ": Unexpected flags type: " + typeof nodeData.flags ); } if (!isBoolean(nodeData.flags.online)) { throw new Error( "Node " + nodeId + ": Unexpected flags.online type: " + typeof nodeData.flags.online ); } const isOnline = nodeData.flags.online; const lastSeen = parseTimestamp(nodeData.lastseen); if (lastSeen === null) { throw new Error( "Node " + nodeId + ": Invalid lastseen timestamp: " + nodeData.lastseen ); } let site: Site | undefined; if ( _.isPlainObject(nodeData.nodeinfo.system) && isSite(nodeData.nodeinfo.system.site_code) ) { site = nodeData.nodeinfo.system.site_code; } let domain: Domain | undefined; if ( _.isPlainObject(nodeData.nodeinfo.system) && isDomain(nodeData.nodeinfo.system.domain_code) ) { domain = nodeData.nodeinfo.system.domain_code; } return { mac, importTimestamp: importTimestamp, state: isOnline ? OnlineState.ONLINE : OnlineState.OFFLINE, lastSeen, site, domain, }; } export function parseNodesJson(body: string): NodesParsingResult { Logger.tag("monitoring", "information-retrieval").debug( "Parsing nodes.json..." ); const json = JSON.parse(body); if (!_.isPlainObject(json)) { throw new Error( `Expecting a JSON object as the nodes.json root, but got: ${typeof json}` ); } const expectedVersion = 2; if (json.version !== expectedVersion) { throw new Error( `Unexpected nodes.json version "${json.version}". Expected: "${expectedVersion}"` ); } const importTimestamp = parseTimestamp(json.timestamp); if (importTimestamp === null) { throw new Error("Invalid timestamp: " + json.timestamp); } const result: NodesParsingResult = { importTimestamp, nodes: [], failedNodesCount: 0, totalNodesCount: 0, }; if (!_.isArray(json.nodes)) { throw new Error("Invalid nodes array type: " + typeof json.nodes); } for (const nodeData of json.nodes) { result.totalNodesCount += 1; try { const parsedNode = parseNode(result.importTimestamp, nodeData); Logger.tag("monitoring", "parsing-nodes-json").debug( `Parsing node successful: ${parsedNode.mac}` ); result.nodes.push(parsedNode); } catch (error) { result.failedNodesCount += 1; Logger.tag("monitoring", "parsing-nodes-json").error( "Could not parse node.", error, nodeData ); } } return result; } async function updateSkippedNode( id: NodeId, node?: StoredNode ): Promise { return await db.run( "UPDATE node_state " + "SET hostname = ?, monitoring_state = ?, modified_at = ?" + "WHERE id = ?", [node ? node.hostname : "", node ? node.monitoringState : "", now(), id] ); } async function sendMonitoringMailsBatched( name: string, mailType: MailType, findBatchFun: () => Promise ): Promise { Logger.tag("monitoring", "mail-sending").debug( 'Sending "%s" mails...', name ); while (true) { Logger.tag("monitoring", "mail-sending").debug("Sending next batch..."); const nodeStates = await findBatchFun(); if (_.isEmpty(nodeStates)) { Logger.tag("monitoring", "mail-sending").debug( 'Done sending "%s" mails.', name ); return; } for (const nodeState of nodeStates) { const mac = nodeState.mac; Logger.tag("monitoring", "mail-sending").debug( "Loading node data for: %s", mac ); const result = await NodeService.findNodeDataWithSecretsByMac(mac); if (!result) { Logger.tag("monitoring", "mail-sending").debug( 'Node not found. Skipping sending of "' + name + '" mail: ' + mac ); await updateSkippedNode(nodeState.id); continue; } const { node, nodeSecrets } = result; if (node.monitoringState !== MonitoringState.ACTIVE) { Logger.tag("monitoring", "mail-sending").debug( 'Monitoring disabled, skipping "%s" mail for: %s', name, mac ); await updateSkippedNode(nodeState.id, node); continue; } const monitoringToken = nodeSecrets.monitoringToken; if (!monitoringToken) { Logger.tag("monitoring", "mail-sending").error( 'Node has no monitoring token. Cannot send mail "%s" for: %s', name, mac ); await updateSkippedNode(nodeState.id, node); continue; } Logger.tag("monitoring", "mail-sending").info( 'Sending "%s" mail for: %s', name, mac ); await MailService.enqueue( config.server.email.from, node.nickname + " <" + node.email + ">", mailType, { node: node, lastSeen: nodeState.last_seen, disableUrl: monitoringDisableUrl(monitoringToken), } ); Logger.tag("monitoring", "mail-sending").debug( "Updating node state: ", mac ); const timestamp = now(); await db.run( "UPDATE node_state " + "SET hostname = ?, monitoring_state = ?, modified_at = ?, last_status_mail_sent = ?, last_status_mail_type = ?" + "WHERE id = ?", [ node.hostname, node.monitoringState, timestamp, timestamp, mailType, nodeState.id, ] ); } } } async function sendOnlineAgainMails( startTime: UnixTimestampSeconds ): Promise { await sendMonitoringMailsBatched( "online again", MailType.MONITORING_ONLINE_AGAIN, async (): Promise => await db.all( "SELECT * FROM node_state " + "WHERE modified_at < ? AND state = ? AND last_status_mail_type IN (" + "'monitoring-offline-1', 'monitoring-offline-2', 'monitoring-offline-3'" + ")" + "ORDER BY id ASC LIMIT ?", [startTime, "ONLINE", MONITORING_MAILS_DB_BATCH_SIZE] ) ); } async function sendOfflineMails( startTime: UnixTimestampSeconds, mailType: MailType ): Promise { const mailNumber = parseInteger(mailType.split("-")[2]); await sendMonitoringMailsBatched( "offline " + mailNumber, mailType, async (): Promise => { const previousType = mailNumber === 1 ? "monitoring-online-again" : "monitoring-offline-" + (mailNumber - 1); // the first time the first offline mail is send, there was no mail before const allowNull = mailNumber === 1 ? " OR last_status_mail_type IS NULL" : ""; const schedule = MONITORING_OFFLINE_MAILS_SCHEDULE[mailNumber]; const scheduledTimeBefore = subtract(now(), schedule); return await db.all( "SELECT * FROM node_state " + "WHERE modified_at < ? AND state = ? AND (last_status_mail_type = ?" + allowNull + ") AND " + "last_seen <= ? AND (last_status_mail_sent <= ? OR last_status_mail_sent IS NULL) " + "ORDER BY id ASC LIMIT ?", [ startTime, "OFFLINE", previousType, scheduledTimeBefore, scheduledTimeBefore, MONITORING_MAILS_DB_BATCH_SIZE, ] ); } ); } function doRequest( url: string ): Promise<{ response: request.Response; body: string }> { return new Promise<{ response: request.Response; body: string }>( (resolve, reject) => { request(url, function (err, response, body) { if (err) { return reject(err); } resolve({ response, body }); }); } ); } async function withUrlsData(urls: string[]): Promise { const results: NodesParsingResult[] = []; for (const url of urls) { Logger.tag("monitoring", "information-retrieval").debug( "Retrieving nodes.json: %s", url ); const { response, body } = await doRequest(url); if (response.statusCode !== 200) { throw new Error( "Could not download nodes.json from " + url + ": " + response.statusCode + " - " + response.statusMessage ); } results.push(await parseNodesJson(body)); } return results; } async function retrieveNodeInformationForUrls( urls: string[] ): Promise { const datas = await withUrlsData(urls); let maxTimestamp = datas[0].importTimestamp; let minTimestamp = maxTimestamp; let failedParsingNodesCount = 0; let totalNodesCount = 0; for (const data of datas) { if (data.importTimestamp >= maxTimestamp) { maxTimestamp = data.importTimestamp; } if (data.importTimestamp <= minTimestamp) { minTimestamp = data.importTimestamp; } failedParsingNodesCount += data.failedNodesCount; totalNodesCount += data.totalNodesCount; } if ( previousImportTimestamp !== null && maxTimestamp >= previousImportTimestamp ) { Logger.tag("monitoring", "information-retrieval").debug( "No new data, skipping. Current timestamp: %s, previous timestamp: %s", formatTimestamp(maxTimestamp), formatTimestamp(previousImportTimestamp) ); return { failedParsingNodesCount, totalNodesCount, }; } previousImportTimestamp = maxTimestamp; // We do not parallelize here as the sqlite will start slowing down and blocking with too many // parallel queries. This has resulted in blocking other requests too and thus in a major slowdown. const allNodes = _.flatMap(datas, (data) => data.nodes); // Get rid of duplicates from different nodes.json files. Always use the one with the newest const sortedNodes = _.orderBy( allNodes, [(node) => node.lastSeen], ["desc"] ); const uniqueNodes = _.uniqBy(sortedNodes, function (node) { return node.mac; }); for (const nodeData of uniqueNodes) { Logger.tag("monitoring", "information-retrieval").debug( "Importing: %s", nodeData.mac ); const result = await NodeService.findNodeDataByMac(nodeData.mac); if (!result) { Logger.tag("monitoring", "information-retrieval").debug( "Unknown node, skipping: %s", nodeData.mac ); continue; } await storeNodeInformation(nodeData, result); Logger.tag("monitoring", "information-retrieval").debug( "Updating / deleting node data done: %s", nodeData.mac ); } Logger.tag("monitoring", "information-retrieval").debug( "Marking missing nodes as offline." ); // Mark nodes as offline that haven't been imported in this run. await db.run( "UPDATE node_state " + "SET state = ?, modified_at = ?" + "WHERE import_timestamp < ?", [OnlineState.OFFLINE, now(), minTimestamp] ); return { failedParsingNodesCount, totalNodesCount, }; } // FIXME: Replace any[] by type. export async function getAll( restParams: RestParams ): Promise<{ total: number; monitoringStates: any[] }> { const filterFields = [ "hostname", "mac", "monitoring_state", "state", "last_status_mail_type", ]; const where = Resources.whereCondition(restParams, filterFields); const row = await db.get<{ total: number }>( "SELECT count(*) AS total FROM node_state WHERE " + where.query, where.params ); const total = row?.total || 0; const filter = Resources.filterClause( restParams, MonitoringSortField.ID, isMonitoringSortField, filterFields ); const monitoringStates = await db.all( "SELECT * FROM node_state WHERE " + filter.query, filter.params ); return { monitoringStates, total }; } export async function getByMacs( macs: MAC[] ): Promise> { if (_.isEmpty(macs)) { return {}; } const nodeStateByMac: { [key: string]: NodeStateData } = {}; for (const subMacs of _.chunk(macs, MONITORING_STATE_MACS_CHUNK_SIZE)) { const inCondition = DatabaseUtil.inCondition("mac", subMacs); const rows = await db.all( "SELECT * FROM node_state WHERE " + inCondition.query, inCondition.params ); for (const row of rows) { const onlineState = row.state; if (!isOnlineState(onlineState)) { throw new Error( `Invalid online state in database: "${onlineState}"` ); } nodeStateByMac[row.mac] = { site: row.site || undefined, domain: row.domain || undefined, state: onlineState, }; } } return nodeStateByMac; } export async function confirm(token: MonitoringToken): Promise { const { node, nodeSecrets } = await NodeService.getNodeDataWithSecretsByMonitoringToken(token); if ( node.monitoringState === MonitoringState.DISABLED || !nodeSecrets.monitoringToken || nodeSecrets.monitoringToken !== token ) { throw { data: "Invalid token.", type: ErrorTypes.badRequest }; } if (node.monitoringState === MonitoringState.ACTIVE) { return node; } node.monitoringState = MonitoringState.ACTIVE; return await NodeService.internalUpdateNode( node.token, toCreateOrUpdateNode(node), node.monitoringState, nodeSecrets ); } export async function disable(token: MonitoringToken): Promise { const { node, nodeSecrets } = await NodeService.getNodeDataWithSecretsByMonitoringToken(token); if ( node.monitoringState === MonitoringState.DISABLED || !nodeSecrets.monitoringToken || nodeSecrets.monitoringToken !== token ) { throw { data: "Invalid token.", type: ErrorTypes.badRequest }; } node.monitoringState = MonitoringState.DISABLED; nodeSecrets.monitoringToken = undefined; return await NodeService.internalUpdateNode( node.token, toCreateOrUpdateNode(node), node.monitoringState, nodeSecrets ); } export async function retrieveNodeInformation(): Promise { const urls = config.server.map.nodesJsonUrl; if (_.isEmpty(urls)) { throw new Error( "No nodes.json-URLs set. Please adjust config.json: server.map.nodesJsonUrl" ); } return await retrieveNodeInformationForUrls(urls); } export async function sendMonitoringMails(): Promise { Logger.tag("monitoring", "mail-sending").debug( "Sending monitoring mails..." ); const startTime = now(); try { await sendOnlineAgainMails(startTime); } catch (error) { // only logging an continuing with next type Logger.tag("monitoring", "mail-sending").error( 'Error sending "online again" mails.', error ); } for (const mailType of [ MailType.MONITORING_OFFLINE_1, MailType.MONITORING_OFFLINE_2, MailType.MONITORING_OFFLINE_3, ]) { try { await sendOfflineMails(startTime, mailType); } catch (error) { // only logging an continuing with next type Logger.tag("monitoring", "mail-sending").error( 'Error sending "' + mailType + '" mails.', error ); } } } export async function deleteOfflineNodes(): Promise { Logger.tag("nodes", "delete-offline").info( `Deleting offline nodes older than ${DELETE_OFFLINE_NODES_AFTER_DURATION} seconds.` ); const deleteBefore = subtract(now(), DELETE_OFFLINE_NODES_AFTER_DURATION); await deleteNeverOnlineNodesBefore(deleteBefore); await deleteNodesOfflineSinceBefore(deleteBefore); } async function deleteNeverOnlineNodesBefore( deleteBefore: UnixTimestampSeconds ): Promise { Logger.tag("nodes", "delete-never-online").info( "Deleting nodes that were never online created before " + deleteBefore ); const deletionCandidates: StoredNode[] = await NodeService.findNodesModifiedBefore(deleteBefore); Logger.tag("nodes", "delete-never-online").info( "Number of nodes created before " + deleteBefore + ": " + deletionCandidates.length ); const deletionCandidateMacs: MAC[] = deletionCandidates.map( (node) => node.mac ); const chunks: MAC[][] = _.chunk( deletionCandidateMacs, NEVER_ONLINE_NODES_DELETION_CHUNK_SIZE ); Logger.tag("nodes", "delete-never-online").info( "Number of chunks to check for deletion: " + chunks.length ); for (const macs of chunks) { Logger.tag("nodes", "delete-never-online").info( "Checking chunk of " + macs.length + " MACs for deletion." ); const placeholders = macs.map(() => "?").join(","); const rows: { mac: MAC }[] = await db.all( `SELECT * FROM node_state WHERE mac IN (${placeholders})`, macs ); Logger.tag("nodes", "delete-never-online").info( "Of the chunk of " + macs.length + " MACs there were " + rows.length + " nodes found in monitoring database. Those should be skipped." ); const seenMacs: MAC[] = rows.map((row) => row.mac); const neverSeenMacs = _.difference(macs, seenMacs); Logger.tag("nodes", "delete-never-online").info( "Of the chunk of " + macs.length + " MACs there are " + neverSeenMacs.length + " nodes that were never online. Those will be deleted." ); for (const neverSeenMac of neverSeenMacs) { await deleteNodeByMac(neverSeenMac); } } } async function deleteNodesOfflineSinceBefore( deleteBefore: UnixTimestampSeconds ): Promise { const rows = await db.all( "SELECT * FROM node_state WHERE state = ? AND last_seen < ?", ["OFFLINE", deleteBefore] ); for (const row of rows) { await deleteNodeByMac(row.mac); } } async function deleteNodeByMac(mac: MAC): Promise { Logger.tag("nodes", "delete-offline").debug("Deleting node " + mac); let node; try { node = await NodeService.findNodeDataByMac(mac); } catch (error) { // Only log error. We try to delete the nodes state anyways. Logger.tag("nodes", "delete-offline").error( "Could not find node to delete: " + mac, error ); } if (node && node.token) { await NodeService.deleteNode(node.token); } try { await db.run("DELETE FROM node_state WHERE mac = ? AND state = ?", [ mac, "OFFLINE", ]); } catch (error) { // Only log error and continue with next node. Logger.tag("nodes", "delete-offline").error( "Could not delete node state: " + mac, error ); } }