hedgedoc-expire/hedgedoc-expire.py

225 lines
9.2 KiB
Python

#!/bin/env python
import argparse
import email
import json
import smtplib
import ssl
import sys
from datetime import datetime, timezone, timedelta
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from os import getenv
from textwrap import dedent
import humanize
import pgsql
class Config:
    """
    Settings for the expiry job, read once from the process environment.

    Every attribute has a built-in default that is used when the matching
    environment variable is not set. The two port settings are converted to
    ``int``; all other settings are kept as strings.
    """

    def __init__(self):
        """
        Read all settings from environment variables.
        """
        read = getenv

        # PostgreSQL connection
        self.postgres_hostname = read('POSTGRES_HOSTNAME', 'localhost')
        self.postgres_port = int(read('POSTGRES_PORT', '5432'))
        self.postgres_username = read('POSTGRES_USERNAME', 'hedgedoc')
        self.postgres_password = read('POSTGRES_PASSWORD', 'geheim')
        self.postgres_database = read('POSTGRES_DATABASE', 'hedgedoc')

        # Outgoing mail
        self.smtp_hostname = read('SMTP_HOSTNAME', 'localhost')
        self.smtp_port = int(read('SMTP_PORT', '587'))
        self.smtp_username = read('SMTP_USERNAME', '')
        self.smtp_password = read('SMTP_PASSWORD', '')
        self.smtp_from = read('SMTP_FROM', '')

        # Base URL used when building links to notes
        self.url = read('URL', 'http://localhost:3000')
class EmailSender:
    """
    Sends e-mail messages through an SMTP server using STARTTLS and a login.
    """

    def __init__(self, hostname: str, port: int, username: str, password: str, mail_from: str):
        """
        Store the SMTP settings; no connection is opened here.

        :param hostname: SMTP server host name
        :param port: SMTP server port
        :param username: user name passed to the SMTP login
        :param password: password passed to the SMTP login
        :param mail_from: sender address; stored for callers to use as the From header
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.mail_from = mail_from

    def send(self, message: email.message.Message) -> None:
        """
        Send one message over a fresh SMTP connection.

        The connection is upgraded with STARTTLS using the default SSL context,
        authenticated with the stored credentials, and always shut down again
        afterwards, including when any step fails.

        :param message: the message to send; sender and recipients are taken from its headers
        :return: None
        """
        context = ssl.create_default_context()
        # The context manager sends QUIT and closes the socket on exit, so the
        # connection is not leaked when sending many messages or on errors.
        with smtplib.SMTP(self.hostname, port=self.port) as smtp_server:
            smtp_server.starttls(context=context)
            smtp_server.login(self.username, self.password)
            smtp_server.send_message(message)
def email_from_email_or_profile(row):
    """
    Pick the e-mail address to use for a row.

    The row's ``email`` value wins whenever it is not ``None``. Otherwise the
    row's ``profile`` value is parsed as JSON and the first entry of its
    ``emails`` list is returned.

    :param row: mapping with the keys ``email`` and ``profile``
    :return: the ``email`` value, or the first ``emails`` entry of the profile
    """
    address = row['email']
    if address is None:
        address = json.loads(row['profile'])['emails'][0]
    return address
def notes_to_be_expired(cutoff: datetime) -> list[any]:
    """
    Obtain a list of notes to be expired.

    Selects every note whose ``updatedAt`` is before ``cutoff`` together with
    the e-mail and profile of its owner, ordered by ``updatedAt``. Notes whose
    ``ownerId`` matches no row in ``Users`` are not returned. Uses the
    module-level ``db`` connection.

    :param cutoff: notes last updated before this moment are selected
    :return: one dict per note with the keys ``alias``, ``content``,
        ``createdAt``, ``email``, ``id``, ``ownerId``, ``profile``,
        ``shortid``, ``title`` and ``updatedAt``; ``alias`` falls back to the
        short id when the note has no alias
    """
    query = '''SELECT
            "Notes"."alias",
            "Notes"."content",
            "Notes"."createdAt",
            "Notes"."ownerId",
            "Notes"."shortid",
            "Notes"."id",
            "Notes"."title",
            "Notes"."updatedAt",
            "Users"."email",
            "Users"."profile"
        FROM "Notes", "Users"
        WHERE "Notes"."updatedAt" < $1
            AND "Notes"."ownerId" = "Users"."id"
        ORDER BY "Notes"."updatedAt"
        '''
    with db.prepare(query) as notes_older_than:
        return [
            {
                'alias': record.shortid if record.alias is None else record.alias,
                'content': record.content,
                'createdAt': record.createdAt,
                'email': record.email,
                "id": record.id,
                'ownerId': record.ownerId,
                'profile': record.profile,
                'shortid': record.shortid,
                'title': record.title,
                'updatedAt': record.updatedAt,
            }
            for record in notes_older_than(cutoff)
        ]
def revisions_to_be_expired(cutoff: datetime) -> list[any]:
    """
    Obtain a list of revisions to be expired.

    Selects every revision whose ``createdAt`` is before ``cutoff`` together
    with its note and the note owner's e-mail and profile, ordered by the
    note's ``createdAt`` and then the revision's ``createdAt``. Revisions of
    notes whose ``ownerId`` matches no row in ``Users`` are not returned.
    Uses the module-level ``db`` connection.

    :param cutoff: revisions created before this moment are selected
    :return: one dict per revision with the keys ``alias``, ``createdAt``
        (of the revision), ``email``, ``noteId``, ``profile``, ``revisionId``,
        ``shortid`` and ``title``
    """
    query = '''SELECT
            "Notes"."alias",
            "Revisions"."createdAt",
            "Users"."email",
            "Users"."profile",
            "Revisions"."id" as "revisionId",
            "Notes"."id" as "noteId",
            "Notes"."shortid" as "shortid",
            "Notes"."title"
        FROM "Revisions", "Notes", "Users"
        WHERE "Revisions"."createdAt" < $1
            AND "Revisions"."noteId" = "Notes"."id"
            AND "Notes"."ownerId" = "Users"."id"
        ORDER BY "Notes"."createdAt", "Revisions"."createdAt"
        '''
    with db.prepare(query) as revs_older_than:
        return [
            {
                'alias': record.alias,
                'createdAt': record.createdAt,
                'email': record.email,
                'noteId': record.noteId,
                'profile': record.profile,
                'revisionId': record.revisionId,
                'shortid': record.shortid,
                'title': record.title,
            }
            for record in revs_older_than(cutoff)
        ]
def check_notes_to_be_expired(age: timedelta, config: Config) -> None:
    """
    Print the notes that would be expired, without changing anything.

    Prints a header line with the cutoff, then one line per note showing the
    owner's e-mail address, how long ago the note was last updated, its URL
    and its title.

    :param age: notes not updated for longer than this are listed
    :param config: supplies the base URL used to build note links
    :return: None
    """
    cutoff = datetime.now(timezone.utc) - age
    print(f'Notes to be deleted older than {cutoff} ({humanize.naturaldelta(age)}):')
    for note in notes_to_be_expired(cutoff):
        # Time since the last update; kept separate from the ``age`` threshold.
        idle_for = datetime.now(timezone.utc) - datetime.fromisoformat(note['updatedAt'])
        slug = note["shortid"] if note["alias"] is None else note["alias"]
        url = '/'.join((config.url, slug))
        owner = email_from_email_or_profile(note)
        print(f' {owner} ({humanize.naturaldelta(idle_for)}) {url}: {note["title"]}')
def check_revisions_to_be_expired(age: timedelta, config: Config) -> None:
    """
    Print the revisions that would be expired, without changing anything.

    Prints a header line with the cutoff, then for every affected note one
    line with the owner's e-mail address, the note URL and title, followed by
    one line per expiring revision with its age and revision id.

    :param age: revisions created longer ago than this are listed
    :param config: supplies the base URL used to build note links
    :return: None
    """
    cutoff = datetime.now(timezone.utc) - age
    print(f'Revisions to be deleted older than {cutoff} ({humanize.naturaldelta(age)}):')

    # Group the revisions per note, keeping the order the query returned them in.
    by_note = {}
    for row in revisions_to_be_expired(cutoff):
        row['age'] = datetime.now(timezone.utc) - datetime.fromisoformat(row['createdAt'])
        by_note.setdefault(row['noteId'], []).append(row)

    for revisions in by_note.values():
        # All rows of one group describe the same note; use the first for the heading.
        first = revisions[0]
        owner = email_from_email_or_profile(first)
        slug = first["shortid"] if first["alias"] is None else first["alias"]
        url = '/'.join((config.url, slug))
        print(f' {owner} {url}: {first["title"]}')
        for rev in revisions:
            print(f' {humanize.naturaldelta(rev["age"])}: {rev["revisionId"]}')
def expire_old_notes(age: timedelta, config: Config, mail: EmailSender) -> None:
    """
    E-mail every expired note to its owner and then delete it.

    For each note not updated within ``age`` a message is built with the note
    content attached as a ``text/markdown`` part and sent through ``mail``.
    The note is deleted only after the message was sent. If anything fails
    for a note, the error is reported on stderr, the note is kept, and
    processing continues with the next note. Uses the module-level ``db``
    connection.

    :param age: notes not updated for longer than this are expired
    :param config: supplies the site URL used in the message signature
    :param mail: sender used to deliver the message; also supplies the From address
    :return: None
    """
    cutoff = datetime.now(timezone.utc) - age
    with db.prepare('DELETE FROM "Notes" WHERE "id" = $1') as delete_statement:
        for note in notes_to_be_expired(cutoff):
            try:
                note_age = datetime.now(timezone.utc) - datetime.fromisoformat(note['updatedAt'])
                msg = MIMEMultipart()
                msg['From'] = mail.mail_from
                msg['To'] = email_from_email_or_profile(note)
                msg['Subject'] = f'Your HedgeDoc Note "{note["title"]}" has been expired'
                msg.attach(MIMEText(dedent(f'''\
                    You created the note titled "{note["title"]}" on {note["createdAt"]}.
                    It was lasted updated {note['updatedAt']}, {humanize.naturaldelta(note_age)} ago. We expire all notes
                    that have not been updated within {humanize.naturaldelta(age)}.
                    Please find attached the contents of the latest revision of your note.
                    The admin team for {config.url}
                    '''
                )))
                # MIMEText declares a charset (us-ascii when possible, utf-8
                # otherwise) and encodes the payload to match. A charset-less
                # MIMEBase part cannot be serialised once the note contains
                # non-ASCII text, which made such notes impossible to expire.
                md = MIMEText(note["content"] or '', 'markdown')
                # Passing the file name as a parameter lets the email package
                # quote/encode titles with spaces, quotes or non-ASCII characters.
                md.add_header('Content-Disposition', 'attachment', filename=note["title"])
                msg.attach(md)
                mail.send(msg)
                # email backup of the note sent, now we can delete it
                delete_statement(note["id"])
            except Exception as e:
                # Best effort per note: report, keep the note, and continue.
                print(f'Unable to send email to {note["email"]}: {e}', file=sys.stderr)
def expire_old_revisions(age: timedelta) -> None:
    """
    Delete all revisions, on all notes, that were created longer ago than ``age``.

    Uses the module-level ``db`` connection.

    :param age: revisions created more than this long ago are deleted
    :return: None
    """
    oldest_kept = datetime.now(timezone.utc) - age
    statement = db.prepare('DELETE FROM "Revisions" WHERE "createdAt" < $1')
    with statement as purge:
        purge(oldest_kept)
# Command-line entry point: parse the options, connect to the database and
# either report what would be expired (--check) or expire it.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog='hedgedoc-expire',
        description='Remove old notes and revisions from Hedgedoc',
        epilog='See https://git.hamburg.ccc.de/CCCHH/hedgedoc-expire')
    parser.add_argument('-c', '--check', action='store_true',
                        help='print what would be done, then exit')
    parser.add_argument('-n', '--notes', metavar='DAYS', type=float, default=95,
                        help='remove all notes not changed in these many days')
    parser.add_argument('-r', '--revisions', metavar='DAYS', type=float, default=14,
                        help='remove all revisions created more than these many days ago')
    args = parser.parse_args()

    revisions_delta = timedelta(days=args.revisions)
    notes_delta = timedelta(days=args.notes)

    config = Config()
    mail = EmailSender(config.smtp_hostname, config.smtp_port, config.smtp_username, config.smtp_password,
                       config.smtp_from)

    # ``db`` has to stay a module-level name: the query functions in this file
    # read it as a global. The configured database name is passed explicitly so
    # that POSTGRES_DATABASE is honoured instead of being silently ignored.
    with pgsql.Connection((config.postgres_hostname, config.postgres_port),
                          config.postgres_username,
                          config.postgres_password,
                          database=config.postgres_database) as db:
        if args.check:
            # Dry run: only print what would be removed.
            check_revisions_to_be_expired(revisions_delta, config)
            check_notes_to_be_expired(notes_delta, config)
            sys.exit(0)

        expire_old_revisions(revisions_delta)
        expire_old_notes(notes_delta, config, mail)