Refactor Animation to simplify

Also add some documentation on how to write animations.
This commit is contained in:
Stefan Bethke 2025-06-16 09:33:22 +02:00
commit 64a3c729be
12 changed files with 302 additions and 223 deletions

View file

@ -48,21 +48,29 @@ Buba is configured completely through environment variables:
## Creating Animation Plugins
Buba instantiates objects of type `BubaAnimation` and runs through them in a loop. Each animation must implement the
`run()` method, which should send data to the display. The animation is run in its own thread, therefor, the animation
`show()` method, which should send data to the display. The animation is run in its own thread, therefore, the animation
should sleep an appropriate time to let users take in the information. See the existing animations
in [buba/animations](./buba/animations) for inspiration.
Note: if you need to fetch and update external information regularly, you should start your own thread when initalizing
your animation.
To implement your own animation, subclass [BubaAnimation](buba/bubaanimation.py).
In the `__init__()` method, you will want to set `self.title`, and configure your data source.
Implement `update()` to fetch any data and fill in `self.rows` (list of rows). Each row in `self.rows` is a list of
values for the columns in the layout.
The default layout has a wide, left-aligned column for text, and a short, right-aligned column for a time period or a
value. You can define your own layout if necessary.
If you do not want to show tabular data, you can override `show()` and implement sending text to the display yourself.
## Character Set
The display uses [Code Page 437](https://en.wikipedia.org/wiki/Code_page_437), with a few exceptions. Due to the limited
resolution of the segments, the display can deviate significantly.
Note that the [Python codecs](https://docs.python.org/3/library/codecs.html) for `CP437` do not map all special characters correctly.
Note that the [Python codecs](https://docs.python.org/3/library/codecs.html) for `CP437` do not map all special
characters correctly.
| Code | CP437 | Geavision Spaltenschrift |
|------|--------------|--------------------------|

View file

@ -5,12 +5,7 @@ from bottle_log import LoggingPlugin
from bottle_websocket import websocket, GeventWebSocketServer
from geventwebsocket.websocket import WebSocket
from buba.animations.clubassistant import Clubassistant
from buba.animations.dbf import DBFAnimation
from buba.animations.icalevents import IcalEvents
from buba.animations.snake import SnakeAnimation
from buba.animations.spaceapi import Spaceapi
from buba.animations.time import BubaTime
from buba.animationconfig import setup_animations
from buba.appconfig import AppConfig
from buba.bubaanimator import BubaAnimator
from buba.bubacmd import BubaCmd
@ -30,18 +25,7 @@ TEMPLATE_PATH.insert(0, config.templatepath)
websocket_clients = WebSocketClients()
buba = BubaCmd(config.serial, websocket_clients.send)
animator = BubaAnimator(buba)
# animator.add(BubaCharset) #, single=12)
animator.add(BubaTime)
animator.add(DBFAnimation, ds100="AHST", station="Holstenstraße")
animator.add(DBFAnimation, ds100="AHS", station="Altona", count=9)
animator.add(IcalEvents, url="https://cloud.hamburg.ccc.de/remote.php/dav/public-calendars/QJAdExziSnNJEz5g?export",
title="CCCHH Events")
animator.add(Spaceapi, "https://spaceapi.hamburg.ccc.de", "CCCHH")
if config.clubassistant_token is not None:
animator.add(Clubassistant, "https://club-assistant.ccchh.net", config.clubassistant_token)
else:
log.warning("Club Assistant token not set, not activating animation")
animator.add(SnakeAnimation)
setup_animations(config, animator)
@app.route("/static/<filepath>")

43
buba/animationconfig.py Normal file
View file

@ -0,0 +1,43 @@
import logging
from buba.animations.charset import BubaCharset
from buba.animations.dbf import DBF
from buba.animations.homeassistant import HomeAssistant
from buba.animations.icalevents import IcalEvents
from buba.animations.snake import SnakeAnimation
from buba.animations.spaceapi import Spaceapi
from buba.animations.time import BubaTime
from buba.bubaanimator import BubaAnimator
LOG = logging.getLogger(__name__)
def setup_animations(config, animator: BubaAnimator):
cs = BubaCharset(animator.buba)
# animator.add(cs)
bt = BubaTime(animator.buba)
animator.add(bt)
snake = SnakeAnimation(animator.buba)
animator.add(snake)
dbf_ahst = DBF(animator.buba, ds100="AHST", station="Holstenstraße")
dbf_ahs = DBF(animator.buba, ds100="AHS", station="Altona", count=9)
animator.add(dbf_ahst)
animator.add(dbf_ahs)
ccchh_events = IcalEvents(animator.buba,
url="https://cloud.hamburg.ccc.de/remote.php/dav/public-calendars/QJAdExziSnNJEz5g?export",
title="CCCHH Events")
animator.add(ccchh_events)
ccchh_spaceapi = Spaceapi(animator.buba, "https://spaceapi.hamburg.ccc.de", "CCCHH")
animator.add(ccchh_spaceapi)
if config.clubassistant_token is not None:
ca = HomeAssistant(animator.buba, "https://club-assistant.ccchh.net", "Club Assistant",
config.clubassistant_token)
animator.add(ca)
else:
LOG.warning("Club Assistant token not set, not activating animation")

View file

@ -1,6 +1,6 @@
from time import sleep
from buba.bubaanimator import BubaAnimation
from buba.bubaanimation import BubaAnimation
from buba.bubacmd import BubaCmd
@ -10,7 +10,7 @@ class BubaCharset(BubaAnimation):
self.charset = bytes(range(256)).decode("CP437")
self.single = single
def run(self):
def show(self):
if self.single is not None:
while True:
self.render(self.single)

View file

@ -10,12 +10,13 @@ from time import sleep
from deutschebahn.db_infoscreen import DBInfoscreen
from buba.bubaanimator import BubaAnimation, LineLayoutColumn
from buba.bubaanimator import LineLayoutColumn
from buba.bubaanimation import BubaAnimation
from buba.bubacmd import BubaCmd
LOG = logging.getLogger(__name__)
class DBFAnimation(BubaAnimation):
class DBF(BubaAnimation):
dbf_layout = [
LineLayoutColumn(12, BubaCmd.ALIGN_LEFT),
LineLayoutColumn(81, BubaCmd.ALIGN_LEFT),
@ -26,30 +27,21 @@ class DBFAnimation(BubaAnimation):
super().__init__(buba)
self.dbi = DBInfoscreen("trains.xatlabs.com")
self.ds100 = ds100
self.station = station
self.title = station
self.layout = self.dbf_layout
self.trains = []
self.count = count
Thread(target=self.update, daemon=True).start()
def __repr__(self):
return f"<{type(self).__name__}, {self.ds100}>"
def fetch(self):
def update(self):
trains = self.dbi.calc_real_times(self.dbi.get_trains(self.ds100)) # Station
# trains = [t for t in trains] # if t['platform'] == "1"] # platform gleis
trains.sort(key=self.dbi.time_sort)
self.trains = trains
self.log.info(f"Fetched {len(trains)} trains")
def update(self):
while True:
try:
self.fetch()
except Exception as e:
self.log.warning(f"Unable to fetch {self.station}: {e}")
pass
sleep(60)
@staticmethod
def countdown(dt: datetime, cancelled:bool):
if cancelled:
@ -105,9 +97,10 @@ class DBFAnimation(BubaAnimation):
train = train.replace(" ", "")
return train
def run(self):
self.pages(self.station, [[
def show(self):
self.rows = [[
self.short_train(train['train']),
self.short_station(train['destination']),
self.countdown(train['actualDeparture'], train['isCancelled']),
] for train in self.trains[:self.count]], layout=self.dbf_layout)
] for train in self.trains[:self.count]]
self.show_pages()

View file

@ -7,7 +7,7 @@ from time import sleep
import requests
from buba.bubaanimator import BubaAnimation
from buba.bubaanimation import BubaAnimation
default_endpoints = [
{
@ -32,31 +32,21 @@ default_endpoints = [
}
]
class Clubassistant(BubaAnimation):
def __init__(self, buba, url, token, endpoints=None):
class HomeAssistant(BubaAnimation):
def __init__(self, buba, url, title, token, endpoints=None):
if endpoints is None:
endpoints = default_endpoints
super().__init__(buba)
self.url = url
self.title = title
self.token = token
self.endpoints = endpoints
self.data = []
self.load()
Thread(target=self.update, daemon=True).start()
def load(self):
data = []
def update(self):
rows = []
for e in self.endpoints:
res = requests.get(self.url + "/api/states/" + e["endpoint"], headers={'Authorization': 'Bearer ' + self.token})
res.raise_for_status()
js = json.loads(res.text)
data.append([e["name"], f"{js['state']}{js['attributes']['unit_of_measurement']}"])
self.data = data
def update(self):
while True:
self.load()
sleep(60)
def run(self):
self.pages("Club Assistant", self.data)
rows.append([e["name"], f"{js['state']}{js['attributes']['unit_of_measurement']}"])
self.rows = rows

View file

@ -6,7 +6,7 @@ from time import sleep
import icalevents.icalevents
from pytz import timezone
from buba.bubaanimator import BubaAnimation
from buba.bubaanimation import BubaAnimation
from buba.bubacmd import BubaCmd
@ -15,20 +15,13 @@ class IcalEvents(BubaAnimation):
super().__init__(buba)
self.url = url
self.title = title
self.events = []
Thread(target=self.update, daemon=True).start()
def __repr__(self):
return f"<{type(self).__name__}, {self.url}>"
def update(self):
while True:
tz = timezone(os.getenv("TZ", "Europe/Berlin"))
events = icalevents.icalevents.events(self.url, tzinfo=tz, sort=True, end=datetime.now(tz) + timedelta(days=14))
for event in events:
event.start = event.start.astimezone(tz)
self.events = events
sleep(600)
def run(self):
self.pages(self.title, [[e.summary, self.countdown(e.start)] for e in self.events])
self.rows = [[e.summary, self.countdown(e.start)] for e in events]

View file

@ -1,7 +1,7 @@
import random
from time import sleep
from buba.bubaanimator import BubaAnimation
from buba.bubaanimation import BubaAnimation
class SnakeAnimation(BubaAnimation):
@ -34,7 +34,7 @@ class SnakeAnimation(BubaAnimation):
random.seed()
def run(self):
def show(self):
self.grid = [list([0] * self.width) for i in range(self.height)]
self.prev_grid = [list([0] * self.width) for i in range(self.height)]
x = random.randrange(self.width)

View file

@ -6,7 +6,7 @@ from time import sleep
import requests
from buba.bubaanimator import BubaAnimation
from buba.bubaanimation import BubaAnimation
from buba.bubacmd import BubaCmd
@ -15,41 +15,21 @@ class Spaceapi(BubaAnimation):
super().__init__(buba)
self.url = url
self.title = title
self.data = {}
self.load()
Thread(target=self.update, daemon=True).start()
def load(self):
res = requests.get(self.url)
self.data = json.loads(res.text)
def update(self):
while True:
self.load()
sleep(60)
res = requests.get(self.url)
data = json.loads(res.text)
def humanize(self, dt):
td = dt - datetime.now().astimezone()
self.log.debug(f"dt {dt}, td {td}, {td.total_seconds()}")
if td.total_seconds() > -60:
return "just now"
if td.total_seconds() > -3600:
return f"for {-int(td.total_seconds()/60)} minutes"
if td.total_seconds() >- 86400:
return f"for {-int(td.total_seconds()/3600)} hours"
return dt.strftime("since %y-%m-%d %H:%M")
def run(self):
open = "open" if self.data["state"]["open"] else "closed"
since = datetime.fromtimestamp(self.data["state"]["lastchange"]).astimezone()
temp = int(self.data["sensors"]["temperature"][0]["value"])
hum = int(self.data["sensors"]["humidity"][0]["value"])
open = "open" if data["state"]["open"] else "closed"
since = datetime.fromtimestamp(data["state"]["lastchange"]).astimezone()
temp = int(data["sensors"]["temperature"][0]["value"])
hum = int(data["sensors"]["humidity"][0]["value"])
printers = {}
for p in self.data["sensors"]["ext_3d_printer_busy_state"]:
for p in data["sensors"]["ext_3d_printer_busy_state"]:
printers[p["name"]] = {
"busy": p["value"] != 0,
}
for p in self.data["sensors"]["ext_3d_printer_minutes_remaining"]:
for p in data["sensors"]["ext_3d_printer_minutes_remaining"]:
printers[p["name"]]["remaining"] = p["value"]
printstatus = []
for n, p in sorted(printers.items()):
@ -57,9 +37,18 @@ class Spaceapi(BubaAnimation):
printstatus.append(f"{n} {p['remaining']}m left")
else:
printstatus.append(f"{n} idle")
self.pages("CCCHH Space API", [
self.rows = [
[f"CCCHH {open} {self.humanize(since)}"],
[f"Outside: {temp}°C at {hum}% rel.hum."],
[", ".join(printstatus)]
])
]
def humanize(self, dt):
td = dt - datetime.now().astimezone()
if td.total_seconds() > -60:
return "just now"
if td.total_seconds() > -3600:
return f"for {-int(td.total_seconds()/60)} minutes"
if td.total_seconds() >- 86400:
return f"for {-int(td.total_seconds()/3600)} hours"
return dt.strftime("since %y-%m-%d %H:%M")

View file

@ -1,7 +1,7 @@
from datetime import datetime
from time import sleep
from buba.bubaanimator import BubaAnimation
from buba.bubaanimation import BubaAnimation
from buba.bubacmd import BubaCmd
@ -9,7 +9,7 @@ class BubaTime(BubaAnimation):
def __init__(self, buba: BubaCmd):
super().__init__(buba)
def run(self):
def show(self):
self.buba.text(page=0, row=0, col_start=0, col_end=119, text="Chaos Computer Club", align=BubaCmd.ALIGN_CENTER)
self.buba.text(page=0, row=1, col_start=0, col_end=119, text="Hansestadt Hamburg", align=BubaCmd.ALIGN_CENTER)
self.buba.text(page=0, row=2, col_start=0, col_end=119, text="", align=BubaCmd.ALIGN_CENTER)

180
buba/bubaanimation.py Normal file
View file

@ -0,0 +1,180 @@
import logging
from datetime import timedelta, datetime
from itertools import islice
from threading import Thread
from time import sleep
from buba.bubaanimator import LineLayoutColumn, WEEKDAYS_DE
from buba.bubacmd import BubaCmd
class BubaAnimation:
default_layout = [
LineLayoutColumn(90, BubaCmd.ALIGN_LEFT),
LineLayoutColumn(30, BubaCmd.ALIGN_RIGHT),
]
def __init__(self, buba: BubaCmd):
self.log = logging.getLogger(type(self).__name__)
self.buba = buba
self.update_interval = timedelta(minutes=1)
self.page_interval = timedelta(seconds=7)
self.title = None
self.layout = self.default_layout
self.rows = []
self.thread = Thread(target=self.update_rows, daemon=True)
self.thread.start()
pass
def __repr__(self):
return f"<{type(self).__name__}>"
def update(self) -> None:
"""
Do everything necessary to produce the information to be displayed.
:return:
"""
def update_rows(self) -> None:
"""
Update loop. This will be run continuously in a thread to call update().
:return:
"""
sleep(1) # give the code a bit of time to configure stuff
while True:
try:
self.update()
except Exception as e:
self.log.warning(f"unable to update: {e}")
sleep(self.update_interval.total_seconds())
def show(self) -> None:
"""
Write information to the display.
You should also implement any waiting time for people to be able to take the information in.
"""
self.show_pages()
@staticmethod
def chunk(it, size):
"""
Return list in groups of size.
:param it: list
:param size: chunk size
:return: list of chunks
"""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
@staticmethod
def humanize_delta(dt, now_delta, day_delta):
"""
Produce a short text representing a target date and time.
:param dt: the target date and time
:param now_delta: time delta of the target time to now
:param day_delta: delta rounded to the full day
:return:
"""
if now_delta < timedelta(seconds=60):
return "jetzt"
if now_delta < timedelta(minutes=90):
return f"{int(now_delta.seconds / 60)}m"
if day_delta < timedelta(hours=24):
return f"{int((now_delta.seconds + 3599) / 3600)}h"
if day_delta < timedelta(days=7):
# return dt.strftime("%a") # weekday
return WEEKDAYS_DE[dt.weekday()]
return dt.strftime("%d.%m.")
@staticmethod
def countdown(dt: datetime):
"""
Compute a human-readable time specification until the target event starts. The day starts at 04:00.
:param dt: datetime timezone-aware datetime
:return:
"""
now = datetime.now(dt.tzinfo)
from_day_start = now.replace(hour=4, minute=0, second=0, microsecond=0)
now_delta = dt - now
day_delta = dt - from_day_start
h = BubaAnimation.humanize_delta(dt, now_delta, day_delta)
return h
@staticmethod
def ellipsis(text, max=28):
"""
If the text is longer that max, shorten it and add ellipsis.
:param text: to be shortened
:param max: max length
:return: shortened text
"""
if len(text) > max:
text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow
return text
def _write_row(self, nrow: int, row:list[str], layout: list[LineLayoutColumn]) -> None:
"""
Write one row to the display.
:param nrow: row number to write
:param row: contents of the row
:param layout: layout to use
:return:
"""
col = 0
for i, ll in enumerate(layout):
t = row[i] if len(row) > i else ""
if ll.width > 24:
t = self.ellipsis(t, int(ll.width / 3.7))
self.buba.text(page=0, row=nrow, col_start=col, col_end=col + ll.width - 1, text=t, align=ll.align)
col += ll.width
def write_title(self, row: list[str], layout: list[LineLayoutColumn] = None) -> None:
"""
Write the title row. If row has one element, fill the entire row with that; if it has two, the second one
is right-aligned and shows a page number or the current time.
:param row: the title info
:param layout: the layout to use
:return:
"""
if len(row) == 1:
self._write_row(0, row, [LineLayoutColumn(120, BubaCmd.ALIGN_LEFT)])
else:
self._write_row(0, row, [
LineLayoutColumn(100, BubaCmd.ALIGN_LEFT),
LineLayoutColumn(20, BubaCmd.ALIGN_RIGHT),
])
def write_row(self, nrow: int, row: list[str]) -> None:
"""
Write one row to the display. Use the layout specification to format the row.
:param nrow: row number to be written
:param row: contents of the row
:param layout: layout to use
:return:
"""
if len(row) == 1:
# if the row only has a single part, simply show that over the entire width
self._write_row(nrow, row, [LineLayoutColumn(120, BubaCmd.ALIGN_LEFT)])
else:
self._write_row(nrow, row, self.layout)
def show_pages(self) -> None:
"""
Show rows on the display. Paginate the rows if there are more rows than the display can show.
:return:
"""
pages = list(self.chunk(self.rows, 3))
for n, page in enumerate(pages):
if len(pages) <= 1:
self.write_title([self.title])
else:
self.write_title([self.title, f"({n + 1}/{len(pages)})"])
for i in range(3):
if i >= len(page):
self.buba.text(page=0, row=i + 1, col_start=0, col_end=119, text="")
else:
p = page[i]
if isinstance(p, str):
p = [p, ]
self.write_row(i + 1, p)
sleep(self.page_interval.total_seconds())

View file

@ -1,8 +1,6 @@
import logging
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from itertools import islice
from threading import Thread
from time import sleep
@ -28,110 +26,6 @@ class LineLayoutColumn:
align: int
class BubaAnimation:
default_layout = [
LineLayoutColumn(90, BubaCmd.ALIGN_LEFT),
LineLayoutColumn(30, BubaCmd.ALIGN_RIGHT),
]
def __init__(self, buba: BubaCmd):
self.log = logging.getLogger(type(self).__name__)
self.buba = buba
pass
def __repr__(self):
return f"<{type(self).__name__}>"
def run(self):
raise Exception("Your class must implement a run() method")
@staticmethod
def chunk(it, size):
"""
Return list in groups of size.
:param it: list
:param size: chunk size
:return: list of chunks
"""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
@staticmethod
def humanize_delta(dt, now_delta, day_delta):
if now_delta < timedelta(seconds=60):
return "jetzt"
if now_delta < timedelta(minutes=90):
return f"{int(now_delta.seconds / 60)}m"
if day_delta < timedelta(hours=24):
return f"{int((now_delta.seconds + 3599) / 3600)}h"
if day_delta < timedelta(days=7):
# return dt.strftime("%a") # weekday
return WEEKDAYS_DE[dt.weekday()]
return dt.strftime("%d.%m.")
@staticmethod
def countdown(dt: datetime):
"""
Compute a human-readable time specification until the target event starts. The day starts at 04:00.
:param dt: datetime timezone-aware datetime
:return:
"""
now = datetime.now(dt.tzinfo)
from_day_start = now.replace(hour=4, minute=0, second=0, microsecond=0)
now_delta = dt - now
day_delta = dt - from_day_start
h = BubaAnimation.humanize_delta(dt, now_delta, day_delta)
LOG.debug(f"countdown({dt}) {now_delta} {day_delta} {h}")
return h
@staticmethod
def ellipsis(text, max=28):
"""
If the text is longer that max, shorten it and add ellipsis.
:param text: to be shortened
:param max: max length
:return: shortened text
"""
if len(text) > max:
text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow
return text
def write_row(self, row: int, line: list[str], layout: list[LineLayoutColumn]) -> None:
# pass a layout with n columns, or make it an instance variable
if len(line) == 1:
self.buba.text(page=0, row=row, col_start=0, col_end=119,
text=self.ellipsis(line[0], 35), align=BubaCmd.ALIGN_LEFT)
elif len(line) < len(layout) and len(line) == 2:
self.write_row(row, line, self.default_layout)
else:
col = 0
for i, ll in enumerate(layout):
t = line[i] if len(line) > i else ""
if ll.width > 24:
t = self.ellipsis(t, int(ll.width / 3.7))
self.buba.text(page=0, row=row, col_start=col, col_end=col+ll.width-1, text=t, align=ll.align)
col += ll.width
def pages(self, title: str, lines: list[list[str]], layout=None) -> None:
# pass the layout through to write_row
if layout is None:
layout = self.default_layout
pages = list(self.chunk(lines, 3))
for n, page in enumerate(pages):
if len(pages) <= 1:
self.write_row(0, [title], layout)
else:
self.write_row(0, [title, f"({n + 1}/{len(pages)})"], layout)
for i in range(3):
if i >= len(page):
self.buba.text(page=0, row=i + 1, col_start=0, col_end=119, text="")
else:
p = page[i]
if isinstance(p, str):
p = [p, ]
self.write_row(i + 1, p, layout)
sleep(10)
class BubaAnimator:
def __init__(self, buba: BubaCmd):
self.log = logging.getLogger(__name__)
@ -140,7 +34,11 @@ class BubaAnimator:
Thread(target=self.run, daemon=True).start()
def run(self):
last = -1
for nrow in range(4):
self.buba.text(page=0, row=nrow, col_start=0, col_end=119, text="")
self.buba.text(page=0, row=1, col_start=0, col_end=119, text="...booting...", align=BubaCmd.ALIGN_CENTER)
sleep(5)
while True:
if len(self.animations) == 0:
self.log.debug("No animations, sleeping...")
@ -148,10 +46,11 @@ class BubaAnimator:
else:
for a in random.sample(self.animations, len(self.animations)):
self.log.debug(f"Starting animation: {a}")
a.run()
def add(self, animation, *args, **kwargs):
try:
self.animations.append(animation(self.buba, *args, **kwargs))
a.show()
except Exception as e:
self.log.error(f"Failed to add animation: {e}")
self.log.warning(f"Exception while showing animation: {e}")
sleep(2)
def add(self, animation):
self.animations.append(animation)