forked from CCCHH/hedgedoc-expire
5569119fcc
For the README: - Improve the format, getting rid of the textwidth-based linebreaks and instead putting each sentence on its own line. - Have an example for a local check execution to have the examples be more diverse. - Describe the docker compose example and have it be up-to-date. - Better describe the local setup example. - Reformat the section on commands and arguments, first describing the commands and then the arguments, as this makes more sense structurally. - Also change the title of the arguments and environment variables section to reflect its content on commands. For the help output mirror the command descriptions of the README and improve the format.
386 lines
18 KiB
Python
386 lines
18 KiB
Python
#!/bin/env python
|
|
import argparse
|
|
import base64
|
|
import binascii
|
|
import email
|
|
import json
|
|
import smtplib
|
|
import ssl
|
|
import sys
|
|
from datetime import datetime, timezone, timedelta
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from os import getenv
|
|
from textwrap import dedent
|
|
from time import sleep
|
|
|
|
import humanize
|
|
import psycopg
|
|
from psycopg.rows import dict_row
|
|
|
|
|
|
class Config:
    """
    Runtime settings for hedgedoc-expire.

    The database, SMTP and URL settings are read from environment variables at construction time, each with a
    fallback value. The expiry ages and the verbose flag start with fixed values; they are plain attributes that
    the caller may overwrite afterwards.
    """

    def __init__(self):
        lookup = getenv

        # Where HedgeDoc lives: its database and its public base URL
        self.postgres_connection_string = lookup(
            'POSTGRES_CONNSTRING',
            'postgresql://hedgedoc:geheim@localhost:5432/hedgedoc',
        )
        self.url = lookup('URL', 'http://localhost:3000')

        # Mail submission settings; the port is the only value converted from its string form
        self.smtp_hostname = lookup('SMTP_HOSTNAME', 'localhost')
        self.smtp_port = int(lookup('SMTP_PORT', '587'))
        self.smtp_username = lookup('SMTP_USERNAME', '')
        self.smtp_password = lookup('SMTP_PASSWORD', '')
        self.smtp_from = lookup('SMTP_FROM', '')

        # Values not taken from the environment
        self.note_age = timedelta(days=95)
        self.revision_age = timedelta(days=14)
        self.verbose = False
|
|
|
|
|
|
class EmailSender:
    """
    Send email messages through an SMTP server.

    Every message is sent over a fresh connection that is upgraded with StartTLS. The connection is closed again
    once the message has been handed over or the attempt has failed.
    """

    def __init__(self, hostname: str, port: int, username: str, password: str, mail_from: str):
        """
        :param hostname: host name of the SMTP server
        :param port: port of the SMTP server
        :param username: login name; authentication is skipped when this or the password is empty
        :param password: login password; authentication is skipped when this or the username is empty
        :param mail_from: sender address, stored for callers to use in the From header
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.mail_from = mail_from

    def send(self, message: email.message.Message) -> None:
        """
        Using the configured SMTP coordinates, send the message out. The code always upgrades the connection with
        StartTLS and logs in only when both a username and a password are configured.

        :param message: to be sent
        :return:
        :raises Exception: whatever the connection, TLS, login or send step raised, after reporting it on stderr
        """
        try:
            # The context manager sends QUIT and closes the socket on success as well as on failure, so a failed
            # StartTLS/login/send does not leak the connection.
            with smtplib.SMTP(self.hostname, port=self.port) as smtp_server:
                smtp_server.starttls(context=ssl.create_default_context())
                if self.username != "" and self.password != "":
                    smtp_server.login(self.username, self.password)
                smtp_server.send_message(message)
        except Exception as e:
            print(f'Unable to send mail through {self}: {e}', file=sys.stderr)
            # Bare raise keeps the original traceback for the caller
            raise
        print(f'Report email to {message["To"]} sent successfully.')

    def __str__(self):
        # The password is deliberately left out, as this string ends up in error output
        return f'EmailSender<{self.hostname},{self.port},{self.username},{self.mail_from}>'
|
|
|
|
|
|
class HedgedocExpire:
    """
    Report on and expire old HedgeDoc notes and revisions.

    All methods work directly on the HedgeDoc PostgreSQL database. Notes are selected by their ``updatedAt``
    timestamp, revisions by their ``createdAt`` timestamp. Both queries join against the owning user, so notes
    without a matching user row are never returned.
    """

    def __init__(self, config: 'Config', email_sender: 'EmailSender'):
        """
        :param config: settings; note_age, revision_age, verbose, url and postgres_connection_string are read
        :param email_sender: used to mail expired notes to their owners and the check report to the sender itself
        """
        self.config = config
        self.email_sender = email_sender

    @staticmethod
    def email_from_email_or_profile(row) -> str:
        """
        Get the email address of the creator from a database row. If the email column is populated, use that,
        otherwise try to extract it from the login profile. The profile is a JSON object that has an emails array.
        We're using the first address from there.

        :param row: database row as a dict with email and profile columns
        :return: email address
        """
        if row['email'] is not None:
            return row['email']
        profile = json.loads(row['profile'])
        return profile['emails'][0]

    @staticmethod
    def _urlid_from_note_id(note_id) -> str:
        """
        Calculate the urlid of a note from its id: the hex digits of the id, base64url-encoded without padding.

        See:
        - https://github.com/hedgedoc/hedgedoc/blob/380587b7fd65bc1eb71eef51a3aab324f9877650/lib/models/note.js#L167-L172
        - https://git.cccv.de/infra/ansible/roles/hedgedoc/-/blob/d69cef4bf6c7fe4e67570363659e4c20b0e102af/files/hedgedoc-util.py#L84

        :param note_id: the note id, anything whose string form is hex digits optionally separated by dashes
        :return: the urlid as used in the users' history entries
        """
        raw = binascii.unhexlify(f"{note_id}".replace('-', ''))
        return base64.urlsafe_b64encode(raw).decode().replace('=', '')

    @staticmethod
    def _note_attachment(note) -> MIMEText:
        """
        Wrap the note content in a text/markdown attachment named after the ASCII part of the note title.

        :param note: database row as a dict with title and content columns
        :return: the MIME part, ready to be attached to a message
        """
        # NOTE(review): the "or ''" guards assume the columns may be NULL; confirm against the schema
        md = MIMEText(note["content"] or '', 'markdown', 'utf-8')
        filename = (note['title'] or '').encode('ascii', 'ignore').decode('utf-8')
        if not filename:
            filename = 'note'
        # Passing the file name as a parameter lets the email package quote it; interpolating it into the header
        # value by hand yields a malformed header for titles containing spaces or ';'.
        md.add_header('Content-Disposition', 'attachment', filename=f'{filename}.md')
        return md

    def _note_url(self, row) -> str:
        """
        Build the URL of a note below the configured base URL, preferring the alias over the short id.

        :param row: database row as a dict with alias and shortid columns
        :return: the URL
        """
        slug = row["alias"] if row["alias"] is not None else row["shortid"]
        return self.config.url + '/' + slug

    def notes_to_be_expired(self, conn) -> list[dict]:
        """
        Get a list of all notes to be expired, i.e. not updated within the configured note age, oldest first.

        :param conn: the database connection
        :return: one dict per note, including the owner's email and profile columns
        """
        cutoff = datetime.now(timezone.utc) - self.config.note_age
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute('''SELECT
                        "Notes"."alias",
                        "Notes"."content",
                        "Notes"."createdAt",
                        "Notes"."ownerId",
                        "Notes"."shortid",
                        "Notes"."id",
                        "Notes"."title",
                        "Notes"."updatedAt",
                        "Users"."email",
                        "Users"."profile"
                    FROM "Notes", "Users"
                    WHERE "Notes"."updatedAt" < %s
                        AND "Notes"."ownerId" = "Users"."id"
                    ORDER BY "Notes"."updatedAt"
                    ''', [cutoff])
            return cur.fetchall()

    def revisions_to_be_expired(self, conn) -> list[dict]:
        """
        Obtain a list of revisions to be expired, i.e. created before the configured revision age.

        :param conn: the database connection
        :return: one dict per revision, including columns of its note and of the note owner
        """
        cutoff = datetime.now(timezone.utc) - self.config.revision_age
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute('''SELECT
                        "Notes"."alias",
                        "Revisions"."createdAt",
                        "Users"."email",
                        "Users"."profile",
                        "Revisions"."id" as "revisionId",
                        "Notes"."id" as "noteId",
                        "Notes"."shortid" as "shortid",
                        "Notes"."title"
                    FROM "Revisions", "Notes", "Users"
                    WHERE "Revisions"."createdAt" < %s
                        AND "Revisions"."noteId" = "Notes"."id"
                        AND "Notes"."ownerId" = "Users"."id"
                    ORDER BY "Notes"."createdAt", "Revisions"."createdAt"
                    ''', [cutoff])
            return cur.fetchall()

    def check_notes_to_be_expired(self, conn) -> str:
        """
        Return a list of notes that will be expired.

        :param conn: the database connection
        :return: a multi-line text suitable for humans to read
        """
        now = datetime.now(timezone.utc)
        cutoff = now - self.config.note_age
        lines = [
            f'Notes to be deleted not changed since {cutoff} ({humanize.naturaldelta(self.config.note_age)}):\n'
        ]
        for note in self.notes_to_be_expired(conn):
            age = now - note['updatedAt']
            owner = self.email_from_email_or_profile(note)
            lines.append(f' {owner} ({humanize.naturaldelta(age)}) {self._note_url(note)}: {note["title"]}\n')
        return ''.join(lines)

    def check_revisions_to_be_expired(self, conn) -> str:
        """
        Return a list of revisions that will be expired, grouped by the note they belong to.

        :param conn: the database connection
        :return: a multi-line text suitable for humans to read
        """
        now = datetime.now(timezone.utc)
        cutoff = now - self.config.revision_age
        lines = [
            f'Revisions to be deleted created before {cutoff} '
            f'({humanize.naturaldelta(self.config.revision_age)}):\n'
        ]

        # Group the revisions per note; dicts keep insertion order, so the query's ordering is preserved
        revisions_by_note = {}
        for row in self.revisions_to_be_expired(conn):
            row['age'] = now - row['createdAt']
            revisions_by_note.setdefault(row['noteId'], []).append(row)

        for revisions in revisions_by_note.values():
            # All rows of a group carry the same note and owner columns, so the first one describes the note
            first = revisions[0]
            addr = self.email_from_email_or_profile(first)
            lines.append(f' {addr} {self._note_url(first)}: {first["title"]}\n')
            for rev in revisions:
                lines.append(f' {humanize.naturaldelta(rev["age"])}: {rev["revisionId"]}\n')
        return ''.join(lines)

    def _build_expiry_message(self, note, recipient: str) -> MIMEMultipart:
        """
        Build the email that tells the owner about the expiry and carries the note content as an attachment.

        :param note: database row of the note as returned by notes_to_be_expired
        :param recipient: address to put into the To header
        :return: the complete message
        """
        note_age = datetime.now(timezone.utc) - note['updatedAt']
        msg = MIMEMultipart()
        msg['From'] = self.email_sender.mail_from
        msg['To'] = recipient
        msg['Date'] = email.utils.formatdate()
        msg['Subject'] = f'Your HedgeDoc Note "{note["title"]}" has expired'
        msg.attach(MIMEText(dedent(f'''\
            You created the note titled "{note["title"]}" on {note["createdAt"]}.
            It was last updated {note["updatedAt"]}, {humanize.naturaldelta(note_age)} ago. We expire all notes
            that have not been updated within {humanize.naturaldelta(self.config.note_age)}.

            Please find attached the contents of the latest revision of your note.

            The admin team for {self.config.url}

            '''), 'plain', 'utf-8'))
        msg.attach(self._note_attachment(note))
        return msg

    def _remove_note_from_histories(self, conn, note) -> None:
        """
        Remove the history entries pointing at a deleted note from all users. Failures are rolled back and
        reported on stderr, they are not raised.

        :param conn: the database connection
        :param note: database row of the deleted note, only its id is used
        :return:
        """
        try:
            urlid = self._urlid_from_note_id(note['id'])
            with conn.cursor(row_factory=dict_row) as user_history_cur:
                # Get all users with note in history.
                user_history_cur.execute('''SELECT
                            "Users"."id",
                            "Users"."history",
                            "Users"."email",
                            "Users"."profile"
                        FROM "Users"
                        WHERE jsonb_path_exists("Users"."history"::jsonb, '$[*] ? (@.id == $urlid)', jsonb_build_object('urlid', %s::text))
                        ''', [urlid])
                users_with_note = user_history_cur.fetchall()

                for user in users_with_note:
                    history = json.loads(user["history"])
                    history_without_note = json.dumps([entry for entry in history if entry["id"] != urlid])
                    user_history_cur.execute('''UPDATE "Users"
                            SET "history" = %s
                            WHERE "id" = %s
                            ''', [history_without_note, user["id"]])

            conn.commit()
            if self.config.verbose:
                for user in users_with_note:
                    print(f' deleted history entry for {self.email_from_email_or_profile(user)}')
        except Exception as e:
            conn.rollback()
            print(f'An error occured while trying to delete {note["id"]} from the users history: {e}',
                  file=sys.stderr)

    def expire_old_notes(self, conn) -> None:
        """
        Email old notes to their owners, then delete them.

        Each note is handled on its own: a failure is reported on stderr and the loop moves on to the next note.
        A note is only deleted after its email has been sent successfully.

        :param conn: the database connection
        :return:
        """
        with conn.cursor() as cur:
            for note in self.notes_to_be_expired(conn):
                # The broad excepts below are deliberate: one broken note must not stop the others from expiring.
                try:
                    recipient = self.email_from_email_or_profile(note)
                except Exception as e:
                    print(f'Unable to determine the owner email address of note {note["id"]}: {e}',
                          file=sys.stderr)
                    continue

                try:
                    self.email_sender.send(self._build_expiry_message(note, recipient))
                except Exception as e:
                    print(f'Unable to send email to {recipient}: {e}', file=sys.stderr)
                    continue

                # email backup of the note sent, now we can delete it
                try:
                    cur.execute('DELETE FROM "Notes" WHERE "id" = %s', [note["id"]])
                    conn.commit()
                except Exception as e:
                    # Roll back so the connection stays usable for the remaining notes.
                    conn.rollback()
                    print(f'Unable to delete note {note["id"]}: {e}', file=sys.stderr)
                    continue

                if self.config.verbose:
                    print(f'Note "{note["title"]}" ({self._note_url(note)}) emailed to {recipient}')

                self._remove_note_from_histories(conn, note)

    def expire_old_revisions(self, conn) -> None:
        """
        Removes all revisions on all notes that have been created before the configured revision age.

        :param conn: the database connection
        :return:
        """
        cutoff = datetime.now(timezone.utc) - self.config.revision_age
        with conn.cursor() as cur:
            rows = list(cur.execute('DELETE FROM "Revisions" WHERE "createdAt" < %s RETURNING id', [cutoff]))
            conn.commit()
            # Report only after the commit, so the number is never printed for a deletion that did not persist
            if self.config.verbose:
                print(f'Deleted {len(rows)} old revisions')

    def cmd_check(self) -> None:
        """
        Print which revisions and notes would be expired. A part whose age is not configured (None) is skipped.

        :return:
        """
        with psycopg.connect(self.config.postgres_connection_string) as conn:
            if self.config.revision_age is not None:
                print(self.check_revisions_to_be_expired(conn))
            elif self.config.verbose:
                print("Revisions weren't included in the check, not checking.\n")

            if self.config.note_age is not None:
                print(self.check_notes_to_be_expired(conn))
            elif self.config.verbose:
                print("Notes weren't included in the check, not checking.\n")

    def cmd_emailcheck(self) -> None:
        """
        Send the check report by email, from the configured sender address to that same address.

        :return:
        """
        with psycopg.connect(self.config.postgres_connection_string) as conn:
            report = ''

            if self.config.revision_age is not None:
                report += self.check_revisions_to_be_expired(conn)
            else:
                report += "Revisions weren't included in the check.\n"

            if self.config.note_age is not None:
                report += self.check_notes_to_be_expired(conn)
            else:
                report += "Notes weren't included in the check.\n"

        # The report is complete, the database connection is not needed for sending it
        msg = MIMEMultipart()
        msg['From'] = self.email_sender.mail_from
        msg['To'] = self.email_sender.mail_from
        msg['Date'] = email.utils.formatdate()
        msg['Subject'] = 'Hedgedoc Expire: Report'
        msg.attach(MIMEText(dedent('''\
            This report shows which notes and revisions would be deleted if expire would be run now.

            ''') + report + dedent(f'''\

            The admin team for {self.config.url}
            ''')))
        self.email_sender.send(msg)

    def cmd_expire(self) -> None:
        """
        Expire old revisions and old notes. A part whose age is not configured (None) is skipped.

        :return:
        """
        with psycopg.connect(self.config.postgres_connection_string) as conn:
            if self.config.revision_age is not None:
                self.expire_old_revisions(conn)
            elif self.config.verbose:
                print("Revisions weren't included in the expire action, not expiring.\n")

            if self.config.note_age is not None:
                self.expire_old_notes(conn)
            elif self.config.verbose:
                print("Notes weren't included in the expire action, not expiring.\n")
|
|
|
|
|
|
def main():
    """
    Command line entry point: parse the arguments, build the configuration and run the requested command.

    The note and revision ages are taken from the -n and -r arguments only; a part whose argument is missing is
    set to None and thereby excluded from the command. The ``cron`` command never returns.
    """
    parser = argparse.ArgumentParser(
        prog='hedgedoc-expire',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=dedent('''\
            Remove old notes and revisions from Hedgedoc

            Notes that have not been updated in the specified time will be emailed to the creator and then deleted.
            Revisions of notes that have been created before the specified time will be deleted.
            '''),
        epilog=dedent('''\
            command is one of, with check being the default:
            - check: Print a list of revisions and notes that would be expired, based on the given arguments -n and -r.
            - cron: Run `expire` at 2 am local time each day. Will run until killed.
            - emailcheck: Send an email from the configured sender to themselves with the check report.
            - expire: Expire old revisions and notes, based on the given arguments -n and -r.

            See https://git.hamburg.ccc.de/CCCHH/hedgedoc-expire
            ''')
    )
    parser.add_argument('-n', '--notes', metavar='DAYS', type=float,
                        help='remove all notes not changed in these many days')
    parser.add_argument('-r', '--revisions', metavar='DAYS', type=float,
                        help='remove all revisions created more than these many days ago')
    parser.add_argument('command', choices=['check', 'cron', 'emailcheck', 'expire'], default='check', nargs='?',
                        help='action to perform')
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='print more info while running')
    args = parser.parse_args()

    config = Config()
    config.note_age = timedelta(days=args.notes) if args.notes is not None else None
    config.revision_age = timedelta(days=args.revisions) if args.revisions is not None else None
    config.verbose = args.verbose
    mail = EmailSender(config.smtp_hostname, config.smtp_port, config.smtp_username, config.smtp_password,
                       config.smtp_from)
    hedgedoc_expire = HedgedocExpire(config, mail)

    # argparse's choices guarantee that args.command is one of the four commands handled here
    if args.command == 'cron':
        while True:
            next_expire = _next_expire_time(datetime.now())
            if args.verbose:
                print(f'Next expire execution: {next_expire}')
            seconds = (next_expire - datetime.now()).total_seconds()
            if seconds > 0:
                sleep(seconds)
            hedgedoc_expire.cmd_expire()
    elif args.command == 'emailcheck':
        hedgedoc_expire.cmd_emailcheck()
    elif args.command == 'expire':
        hedgedoc_expire.cmd_expire()
    else:
        hedgedoc_expire.cmd_check()


def _next_expire_time(now: datetime) -> datetime:
    """
    Return the next 2 am that lies strictly after the given time.

    Always adding a day to "today 2 am" would skip today's run whenever the process starts between midnight and
    2 am, so the day is only advanced when today's 2 am is not in the future any more.

    :param now: the current naive local time
    :return: the time of the next expire run
    """
    candidate = now.replace(hour=2, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate
|
|
|
|
|
|
# Run the command line interface only when executed as a script, not when imported as a module
if __name__ == '__main__':
    main()
|