diff --git a/buba/__main__.py b/buba/__main__.py index c35b646..6b0f1d6 100644 --- a/buba/__main__.py +++ b/buba/__main__.py @@ -5,13 +5,13 @@ from bottle_log import LoggingPlugin from bottle_websocket import websocket, GeventWebSocketServer from geventwebsocket.websocket import WebSocket +from buba.animations.dbf import DBFAnimation from buba.animations.icalevents import IcalEvents +from buba.animations.snake import SnakeAnimation +from buba.animations.time import BubaTime from buba.appconfig import AppConfig from buba.bubaanimator import BubaAnimator -from buba.animations.time import BubaTime from buba.bubacmd import BubaCmd - -from buba.animations.dbf import DBFAnimation from buba.websocketcomm import WebSocketClients config = AppConfig() @@ -25,7 +25,6 @@ if config.debug: app.install(LoggingPlugin(app.config)) TEMPLATE_PATH.insert(0, config.templatepath) - websocket_clients = WebSocketClients() buba = BubaCmd(config.serial, websocket_clients.send) animator = BubaAnimator(buba) @@ -33,7 +32,9 @@ animator = BubaAnimator(buba) animator.add(BubaTime) animator.add(DBFAnimation, ds100="AHST", station="Holstenstraße") animator.add(DBFAnimation, ds100="AHS", station="Altona", count=9) -animator.add(IcalEvents, url="https://cloud.hamburg.ccc.de/remote.php/dav/public-calendars/QJAdExziSnNJEz5g?export", title="CCCHH Events") +animator.add(IcalEvents, url="https://cloud.hamburg.ccc.de/remote.php/dav/public-calendars/QJAdExziSnNJEz5g?export", + title="CCCHH Events") +animator.add(SnakeAnimation) @app.route("/static/") def server_static(filepath): @@ -45,6 +46,7 @@ def server_static(filepath): def root(): return {} + @app.get('/ws', apply=[websocket]) def websocket_endpoint(ws: WebSocket): try: diff --git a/buba/animations/dbf.py b/buba/animations/dbf.py index 693afc1..9c32980 100644 --- a/buba/animations/dbf.py +++ b/buba/animations/dbf.py @@ -16,6 +16,7 @@ from buba.bubacmd import BubaCmd class DBFAnimation(BubaAnimation): def __init__(self, buba: BubaCmd, ds100="AHST", station="Holstenstraße", count=3): super().__init__(buba) + self.dbi = DBInfoscreen("trains.xatlabs.com") self.ds100 = ds100 self.station = station self.trains = [] @@ -26,18 +27,22 @@ class DBFAnimation(BubaAnimation): return f"<{type(self).__name__}, {self.ds100}>" def fetch(self): - dbi = DBInfoscreen("trains.xatlabs.com") - trains = dbi.calc_real_times(dbi.get_trains(self.ds100)) # Station - trains = [t for t in trains] # if t['platform'] == "1"] # platform gleis - trains.sort(key=dbi.time_sort) + trains = self.dbi.calc_real_times(self.dbi.get_trains(self.ds100)) # Station + # trains = [t for t in trains] # if t['platform'] == "1"] # platform gleis + trains.sort(key=self.dbi.time_sort) self.trains = trains + self.log.info(f"Fetched {len(trains)} trains") def update(self): - self.fetch() + try: + self.fetch() + except Exception as e: + self.log.warning(f"Unable to fetch {self.station}: {e}") + pass sleep(60) @staticmethod - def countdown(dt:datetime): + def countdown(dt: datetime): now = datetime.datetime.now() try: dep_time = datetime.datetime.strptime(dt, "%H:%M").time() @@ -64,8 +69,6 @@ class DBFAnimation(BubaAnimation): # Recalculate the timedelta return BubaAnimation.countdown(datetime.datetime.combine(dep_date, dep_time)) -# dep_td = datetime.datetime.combine(dep_date, dep_time) - now -# return round(dep_td.total_seconds() / 60) @staticmethod def short_station(station: str) -> str: @@ -78,6 +81,8 @@ class DBFAnimation(BubaAnimation): station = station[:-8] if station == "Hbf": station = "Hauptbahnhof" + if station == "Wedel(Holst)": + station = "Wedel" return station @staticmethod @@ -101,7 +106,7 @@ class DBFAnimation(BubaAnimation): def run(self): all = self.trains[:self.count] - all_len = int((len(all)+1)/3) + all_len = int((len(all) + 1) / 3) if len(self.trains) == 0: sleep(5) @@ -110,7 +115,7 @@ class DBFAnimation(BubaAnimation): if all_len == 1: title = self.station else: - title = f"{self.station} ({page+1}/{all_len})" + title = f"{self.station} ({page + 1}/{all_len})" self.buba.text(page=0, row=0, col_start=0, col_end=92, text=title, align=BubaCmd.ALIGN_LEFT) for i, train in enumerate(trains): if train['isCancelled']: @@ -118,10 +123,12 @@ class DBFAnimation(BubaAnimation): else: when = self.countdown(train['actualDeparture']) self.buba.text(page=0, row=i + 1, col_start=0, col_end=11, text=self.short_train(train['train'])) - self.buba.text(page=0, row=i + 1, col_start=12, col_end=104, text=self.short_station(train['destination'])) + self.buba.text(page=0, row=i + 1, col_start=12, col_end=104, + text=self.short_station(train['destination'])) self.buba.text(page=0, row=i + 1, col_start=105, col_end=119, text=when, align=BubaCmd.ALIGN_RIGHT) self.buba.set_page(0) for i in range(5): - self.buba.text(page=0, row=0, col_start=93, col_end=119, text=datetime.datetime.now().strftime("%H:%M"), align=BubaCmd.ALIGN_RIGHT) + self.buba.text(page=0, row=0, col_start=93, col_end=119, text=datetime.datetime.now().strftime("%H:%M"), + align=BubaCmd.ALIGN_RIGHT) sleep(2) diff --git a/buba/animations/icalevents.py b/buba/animations/icalevents.py index e605b38..a51f16f 100644 --- a/buba/animations/icalevents.py +++ b/buba/animations/icalevents.py @@ -23,7 +23,7 @@ class IcalEvents(BubaAnimation): def update(self): tz = timezone(os.getenv("TZ", "Europe/Berlin")) - events = icalevents.icalevents.events(self.url, tzinfo=tz, sort=True, end=datetime.now(tz)+timedelta(days=14)) + events = icalevents.icalevents.events(self.url, tzinfo=tz, sort=True, end=datetime.now(tz) + timedelta(days=14)) for event in events: event.start = event.start.astimezone(tz) self.events = events @@ -33,7 +33,8 @@ class IcalEvents(BubaAnimation): for (page, events) in enumerate(self.chunk(self.events, 3)): if len(self.events) > 3: self.buba.text(page=0, row=0, col_start=0, col_end=119, - text=f"{self.title} ({page + 1}/{int((len(self.events)+2) / 3)})", align=BubaCmd.ALIGN_LEFT) + text=f"{self.title} ({page + 1}/{int((len(self.events) + 2) / 3)})", + align=BubaCmd.ALIGN_LEFT) else: self.buba.text(page=0, row=0, col_start=0, col_end=119, text=self.title, align=BubaCmd.ALIGN_LEFT) for i in range(3): @@ -41,7 +42,7 @@ class IcalEvents(BubaAnimation): self.buba.text(page=0, row=i + 1, col_start=0, col_end=119, text="") else: event = events[i] - self.buba.text(page=0, row=i + 1, col_start=0, col_end=103, text=self.ellipsis(event.summary, 25)) - self.buba.text(page=0, row=i + 1, col_start=104, col_end=119, + self.buba.text(page=0, row=i + 1, col_start=0, col_end=100, text=self.ellipsis(event.summary, 25)) + self.buba.text(page=0, row=i + 1, col_start=101, col_end=119, text=self.countdown(event.start), align=BubaCmd.ALIGN_RIGHT) sleep(10) diff --git a/buba/animations/snake.py b/buba/animations/snake.py new file mode 100644 index 0000000..404cad6 --- /dev/null +++ b/buba/animations/snake.py @@ -0,0 +1,107 @@ +import random +from time import sleep + +from buba.bubaanimator import BubaAnimation + + +class SnakeAnimation(BubaAnimation): + def __init__(self, buba): + super().__init__(buba) + # characters to render the grid and the snake + # Because of Python's limited codec, instead of using the correct Unicode + # codepoints, we're using the CP437 codepoints directly. + self.width = 20 + self.height = 4 + self.grid = [] + self.prev_grid = [] + self.body = [] + # 0:Space + self.charset = " " + # 1:up 2:right 3:down 4:left + self.charset += "\u001e\u0010\u001f\u0011" + # 5:vertical 6:horizontal + self.charset += "\u2551\u2550" + # 7:up-right 8:down-right 9:down-left 10:up-left + self.charset += "\u255a\u2554\u2557\u255d" + # 11:tail + self.charset += "\u25a0" + self.turn = [ + [5, 8, 5, 9], # have been going up, so coming from down + [10, 6, 9, 6], # have been going right, so coming from left + [5, 7, 5, 10], # have been going down, so coming from up + [7, 6, 8, 6], # have been going left, so coming from right + ] + + random.seed() + + def run(self): + self.grid = [list([0] * self.width) for i in range(self.height)] + self.prev_grid = [list([0] * self.width) for i in range(self.height)] + x = random.randrange(self.width) + y = random.randrange(self.height) + d = random.randrange(4) + for r in range(self.height): + self.buba.simple_text(0, r, 0, "") # clear display + self.grid[y][x] = 1 + d + self.body = [(x, y)] + self.render() + iterations = 0 + while True: + if self.is_blocked(x, y, d): + end = True + prev_d = d + for n in self.shift(list(range(4))): + if not self.is_blocked(x, y, n): + end = False + d = n + break + if end: + self.grid[y][x] = 11 + self.render() + break + self.grid[y][x] = self.turn[prev_d][d] + else: + self.grid[y][x] = 5 + (d % 2) + (x, y) = self.next(x, y, d) + self.grid[y][x] = 1 + d + iterations += 1 + self.body.append((x, y)) + if iterations % 3 == 0: + (tx, ty) = self.body.pop(0) + self.grid[ty][tx] = 0 + (tx, ty) = self.body[0] + self.grid[ty][tx] = 11 + self.render() + sleep(1) + sleep(5) + + @staticmethod + def shift(a): + i = random.randrange(1, len(a)) + return a[i:] + a[:i] + + def next(self, x, y, d): + match d: + case 0: + y -= 1 + case 1: + x += 1 + case 2: + y += 1 + case 3: + x -= 1 + return (x, y) + + def is_blocked(self, x, y, d): + (x, y) = self.next(x, y, d) + return (y < 0 or y >= self.height or + x < 0 or x >= self.width or + self.grid[y][x] != 0) + + def render(self): + for x in range(self.width): + for y in range(self.height): + if self.grid[y][x] != self.prev_grid[y][x]: + c = x*6 + self.buba.text(0, y, c, c+5, self.charset[self.grid[y][x]]) + self.prev_grid[y][x] = self.grid[y][x] diff --git a/buba/bubaanimator.py b/buba/bubaanimator.py index 20226a7..b34bef1 100644 --- a/buba/bubaanimator.py +++ b/buba/bubaanimator.py @@ -31,7 +31,7 @@ class BubaAnimation: return iter(lambda: tuple(islice(it, size)), ()) @staticmethod - def countdown(dt:datetime): + def countdown(dt: datetime): """ Compute a human-readable time specification until the target event starts. The day starts at 04:00. :param dt: datetime timezone-aware datetime @@ -41,17 +41,16 @@ class BubaAnimation: from_day_start = now.replace(hour=4, minute=0, second=0, microsecond=0) now_delta = dt - now day_delta = dt - from_day_start - if now_delta < timedelta(seconds=0): + if now_delta < timedelta(seconds=60): return "now" if now_delta < timedelta(minutes=30): - return f"{int(now_delta.seconds/60)}m" + return f"{int(now_delta.seconds / 60)}m" if day_delta < timedelta(hours=24): - return f"{int((now_delta.seconds+3599)/3600)}h" + return f"{int((now_delta.seconds + 3599) / 3600)}h" if day_delta < timedelta(days=7): - return dt.strftime("%a") # weekday + return dt.strftime("%a") # weekday return dt.strftime("%d.%m.") - @staticmethod def ellipsis(text, max=28): """ @@ -61,7 +60,7 @@ class BubaAnimation: :return: shortened text """ if len(text) > max: - text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow + text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow return text diff --git a/buba/bubacmd.py b/buba/bubacmd.py index ad1b28a..030e22e 100644 --- a/buba/bubacmd.py +++ b/buba/bubacmd.py @@ -12,7 +12,7 @@ class BubaCmd: ALIGN_LEFT = 0x00 ALIGN_RIGHT = 0x01 ALIGN_CENTER = 0x02 - ALIGN_SCROLL = 0x03 # apparently not supported + ALIGN_SCROLL = 0x03 # apparently not supported def __init__(self, serial: str, send: Callable): self.log = logging.getLogger(__name__) @@ -38,7 +38,7 @@ class BubaCmd: :param align: alignment options, see MIS1TextDisplay.ALIGN_* :return: """ - text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant + text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant if self.display is not None: self.display.simple_text(page, row, col, text, align) self.send({ @@ -50,7 +50,6 @@ class BubaCmd: 'align': align, }) - def text(self, page, row, col_start, col_end, text, align=MIS1TextDisplay.ALIGN_LEFT): """ Send text to the specified row, placing it between col_start and col_end. @@ -65,7 +64,7 @@ class BubaCmd: :param align: alignment options, see MIS1TextDisplay.ALIGN_* :return: """ - text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant + text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant if self.display is not None: self.display.text(page, row, col_start, col_end, text, align) self.send({ @@ -78,7 +77,6 @@ class BubaCmd: 'align': align, }) - def set_page(self, page): """ Display the given page. @@ -88,7 +86,6 @@ class BubaCmd: """ return self.set_pages([(page, 255)]) - def set_pages(self, pages): """ Configure automatic paging. diff --git a/buba/experiments.py b/buba/experiments.py deleted file mode 100644 index 690878b..0000000 --- a/buba/experiments.py +++ /dev/null @@ -1,124 +0,0 @@ -from time import sleep - -from pyfis.aegmis import MIS1TextDisplay -from pyfis.utils import debug_hex - - -def receive(display): - if display.use_rts: - display.port.setRTS(1) - display.port.write([0x04, 0x81, 0x05]) - if display.use_rts: - display.port.setRTS(0) - return display.read_response(True) - -def decode(display): - r = receive(display) - if r[0] != 0x02: - print("Invalid byte {r[0]:02x}, expected EOT") - d = { - 'code': r[1], - 'subcode': r[2], - } - m = [] - esc = False - for i in range(3, len(r)-1): - if esc: - m.append(r[i]) - erc = False - elif r[i] == 0x10: - esc = True - else: - if r[i] == 0x03: - # compute checksum - next - m.append(r[i]) - m = m[0:-1] - print(f"m({len(m)}): {debug_hex(m)}") - return m - - -def reset_and_get_config(display): - display.send_command(0x31, 0x00, []) - r = decode(display) - return { - 'type': r[0], - 'control-units': r[1], - 'digits-a': r[2] * 256 + r[3], - 'lines-a': r[4], - 'digits-b': r[5] * 256 + r[6], - 'lines-b': r[7], - 'timeout': r[8] * 128.0 + r[9] * 0.5, - } - -def get_config(display): - display.send_command(0x38, 0x00, []) - r = decode(display) - return r - -def text_raw(display, page, row, text): - data = [display.ALIGN_LEFT, page, row, 0] + text - return display.send_command(0x11, 0x00, data, expect_response=False) - - -def charset(): - lines = [] - for i in range(0, 256, 16): - label = f"{i:03d}: " - b = list(label.encode("CP437")) - for j in range(i, i+16): - if j == 0: - b.append(0x20) - else: - b.append(j) - lines.append(b) - - display.set_page(1) - - while True: - for p in range(1,16): - for l in range(0,4): - text_raw(display, 1, l, lines[(p+l-1) % len(lines)]) - sleep(5) - -print("Opening serial port") -display = MIS1TextDisplay("/dev/ttyUSB0") - -print("Sending text") -display.debug = True -#r = display.send_command(0x31, 0x00, []) # reset -#r = display.send_command(0x32, 0x01, []) # test -#r = display.send_command(0x38, 0x01, []) # read config -#r = display.read_response() -#print(f"Response: {r}") -#r = display.send_command(0x52, 0x01, []) # read config -#display.send_raw_telegram([0xff]) -#display.send_raw_data([0x04, 0x81, 0x02, 0x03, 0x00]) -#print(f"response: {display.read_response()}") -#r = reset_and_get_config(display) -r = get_config(display) -print(f"reply {len(r)}: {r}") - -#display.simple_text(page=1, row=0, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER) -#display.simple_text(page=1, row=1, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER) -#display.simple_text(page=1, row=2, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER) -#display.simple_text(page=1, row=3, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER) - -#display.simple_text(page=0, row=3, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER) - -#print("page 1") -#display.set_page(1) -#sleep(10) - -#for l in range(0,4): -# display.simple_text(page=0, row=l, col=0, text="") -#print("page 0") -#display.set_page(0) -#sleep(10) - -#print("page 1") -#display.set_page(1) -#sleep(10) - -charset() -print("Thats all, folks!")