import logging from datetime import datetime, timedelta from itertools import islice from threading import Thread from time import sleep from buba.bubacmd import BubaCmd LOG = logging.getLogger(__name__) WEEKDAYS_DE = [ "Mo", "Di", "Mi", "Do", "Fr", "Sa", "So" ] class BubaAnimation: def __init__(self, buba: BubaCmd): self.log = logging.getLogger(type(self).__name__) self.buba = buba pass def __repr__(self): return f"<{type(self).__name__}>" def run(self): raise Exception("Your class must implement a run() method") @staticmethod def chunk(it, size): """ Return list in groups of size. :param it: list :param size: chunk size :return: list of chunks """ it = iter(it) return iter(lambda: tuple(islice(it, size)), ()) @staticmethod def humanize_delta(dt, now_delta, day_delta): if now_delta < timedelta(seconds=60): return "jetzt" if now_delta < timedelta(minutes=90): return f"{int(now_delta.seconds / 60)}m" if day_delta < timedelta(hours=24): return f"{int((now_delta.seconds + 3599) / 3600)}h" if day_delta < timedelta(days=7): # return dt.strftime("%a") # weekday return WEEKDAYS_DE[dt.weekday()] return dt.strftime("%d.%m.") @staticmethod def countdown(dt: datetime): """ Compute a human-readable time specification until the target event starts. The day starts at 04:00. :param dt: datetime timezone-aware datetime :return: """ now = datetime.now(dt.tzinfo) from_day_start = now.replace(hour=4, minute=0, second=0, microsecond=0) now_delta = dt - now day_delta = dt - from_day_start h = BubaAnimation.humanize_delta(dt, now_delta, day_delta) LOG.debug(f"countdown({dt}) {now_delta} {day_delta} {h}") return h @staticmethod def ellipsis(text, max=28): """ If the text is longer that max, shorten it and add ellipsis. :param text: to be shortened :param max: max length :return: shortened text """ if len(text) > max: text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow return text class BubaAnimator: def __init__(self, buba: BubaCmd): self.log = logging.getLogger(__name__) self.buba = buba self.animations = [] Thread(target=self.run, daemon=True).start() def run(self): while True: if len(self.animations) == 0: self.log.debug("No animations, sleeping...") sleep(2) else: for a in self.animations: self.log.debug(f"Starting animation: {a}") a.run() def add(self, animation, *args, **kwargs): self.animations.append(animation(self.buba, *args, **kwargs))