Send regular updates to both display and browser
All checks were successful
docker-image / docker (push) Successful in 9m40s

This commit is contained in:
Stefan Bethke 2025-06-01 18:48:59 +02:00
commit 3568146cd6
8 changed files with 243 additions and 5 deletions

View file

@ -1,7 +1,5 @@
import json
import logging import logging
from os import getenv, path from os import getenv, path
from pathlib import Path
class AppConfig: class AppConfig:
@ -17,6 +15,7 @@ class AppConfig:
self.templatepath = path.join(self.basepath, "templates") self.templatepath = path.join(self.basepath, "templates")
self.url = getenv('BUBA_URL', 'http://localhost:3000') self.url = getenv('BUBA_URL', 'http://localhost:3000')
(self.listen_host, self.listen_port) = getenv('BUBA_LISTEN', '127.0.0.1:3000').split(':') (self.listen_host, self.listen_port) = getenv('BUBA_LISTEN', '127.0.0.1:3000').split(':')
self.serial = getenv('BUBA_SERIAL', '/dev/ttyUSB0')
if self.debug is not None and self.debug.lower not in ('0', 'f', 'false'): if self.debug is not None and self.debug.lower not in ('0', 'f', 'false'):
self.debug = True self.debug = True

24
buba/BubaAnimator.py Normal file
View file

@ -0,0 +1,24 @@
import logging
from datetime import datetime
from threading import Thread
from time import sleep
from buba.BubaCmd import BubaCmd
class BubaAnimator:
def __init__(self, buba:BubaCmd):
self.log = logging.getLogger(__name__)
self.buba = buba
Thread(target=self.run, daemon=True).start()
def run(self):
while True:
self.buba.simple_text(page=0, row=2, col=1, text="")
self.buba.simple_text(page=0, row=3, col=1, text="")
self.buba.simple_text(page=0, row=4, col=1, text="")
self.buba.simple_text(page=0, row=1, col=1, text="CCCHH Buba")
self.buba.simple_text(page=0, row=1, col=94, text=datetime.now().strftime("%H:%M:%S"))
self.buba.set_page(0)
sleep(1)

101
buba/BubaCmd.py Normal file
View file

@ -0,0 +1,101 @@
"""
Commands to send to the display. Commands are always sent to the websocket, so emulators can display the data. If a serial port is available, send the commands there as well.
"""
import logging
from os import path
from typing import Callable
from pyfis.aegmis import MIS1TextDisplay
class BubaCmd:
def __init__(self, serial: str, send: Callable):
self.log = logging.getLogger(__name__)
self.serial = serial
self.display = None
if not path.exists(serial):
self.serial = None
logging.warning(f"Unable to find serial port '{serial}', ignoring")
if self.serial is not None:
self.display = MIS1TextDisplay(serial)
self.send = send
def simple_text(self, page, row, col, text, align=MIS1TextDisplay.ALIGN_LEFT):
"""
Send text to the specified row.
The remainder of the row is cleared. This mirrors MIS1TextDisplay.simple_text
:param self:
:param page: which page to write the text to
:param row: which row to write the text to
:param col: in which column to start
:param text: to write
:param align: alignment options, see MIS1TextDisplay.ALIGN_*
:return:
"""
if self.display is not None:
self.display.simple_text(page, row, col, text, align)
self.send({
'cmd': 'simple_text',
'page': page,
'row': row,
'col': col,
'text': text,
'align': align,
})
def text(self, page, row, col_start, col_end, text, align=MIS1TextDisplay.ALIGN_LEFT):
"""
Send text to the specified row, placing it between col_start and col_end.
This mirrors MIS1TextDisplay.text
:param self:
:param page: which page to write the text to
:param row: which row to write the text to
:param col_start: starting column
:param col_end: ending column
:param text: to write
:param align: alignment options, see MIS1TextDisplay.ALIGN_*
:return:
"""
if self.display is not None:
self.display.text(page, row, col_start, col_end, text, align)
self.send({
'cmd': 'text',
'page': page,
'row': row,
'col_start': col_start,
'col_end': col_end,
'text': text,
'align': align,
})
def set_page(self, page):
"""
Display the given page.
:param self:
:param page: page number to display
:return:
"""
return self.set_pages([(page, 255)])
def set_pages(self, pages):
"""
Configure automatic paging.
You can specify up to 10 tuples of (pagenumber, delay).
The display will automatically step through the list and show each page number for
the given time. After all entries have been shown, the process starts over.
:param self:
:param pages:
:return:
"""
if self.display is not None:
self.display.set_pages(pages)
self.send({
'cmd': 'set_pages',
'pages': pages,
})

View file

@ -6,6 +6,9 @@ from bottle_websocket import websocket, GeventWebSocketServer
from geventwebsocket.websocket import WebSocket from geventwebsocket.websocket import WebSocket
from buba.AppConfig import AppConfig from buba.AppConfig import AppConfig
from buba.BubaAnimator import BubaAnimator
from buba.BubaCmd import BubaCmd
from buba.websocketcomm import WebSocketClients
config = AppConfig() config = AppConfig()
if config.debug: if config.debug:
@ -18,7 +21,9 @@ app.install(LoggingPlugin(app.config))
TEMPLATE_PATH.insert(0, config.templatepath) TEMPLATE_PATH.insert(0, config.templatepath)
# websocket_clients = WebSocketClients() websocket_clients = WebSocketClients()
buba = BubaCmd(config.serial, websocket_clients.send)
animator = BubaAnimator(buba)
# bottle_helpers = BottleHelpers(auth, group=config.requires_group, allowed=config.allowed) # bottle_helpers = BottleHelpers(auth, group=config.requires_group, allowed=config.allowed)
# update_poller = UpdatePoller(websocket_clients, ccujack, 1 if config.debug else 0.1) # update_poller = UpdatePoller(websocket_clients, ccujack, 1 if config.debug else 0.1)
@ -33,6 +38,19 @@ def server_static(filepath):
def root(): def root():
return {} return {}
@app.get('/ws', apply=[websocket])
def websocket_endpoint(ws: WebSocket):
try:
websocket_clients.add(ws)
# ws.send(json.dumps(update_poller.get_locks(True)))
while True:
m = ws.receive()
except Exception as e:
logging.debug("error in websocket", exc_info=e)
pass
finally:
websocket_clients.remove(ws)
if __name__ == '__main__': if __name__ == '__main__':
app.run(host=config.listen_host, port=config.listen_port, server=GeventWebSocketServer, debug=config.debug, app.run(host=config.listen_host, port=config.listen_port, server=GeventWebSocketServer, debug=config.debug,

View file

@ -5,6 +5,7 @@ export default class {
templateSvgUrl: "static/geascript-proportional.svg", templateSvgUrl: "static/geascript-proportional.svg",
rows: 4, rows: 4,
cols: 120, cols: 120,
segments: 24,
stripWidth: 0, stripWidth: 0,
}, config); }, config);
console.log("Building display..."); console.log("Building display...");
@ -69,11 +70,34 @@ export default class {
this.applyText(3, 1, "`abcdefghijklmnopqrstuvwxyz{|}~\u2302"); this.applyText(3, 1, "`abcdefghijklmnopqrstuvwxyz{|}~\u2302");
} }
eraseCol(row, col) {
for (let i = 1; i<=this.config.segments; i++) {
const e = this.container.querySelector(`#geavision__row_${row}_${col}_${i}`)
if (e) {
e.classList = "gvsoff";
} else {
console.log(`Unable to find element #geavision__row_${row}_${col}_${i} for segment '${c}'`)
}
}
}
eraseCols(row, col_start, col_end) {
for (let i = col_start; i <= col_end; i++) {
this.eraseCol(row, i);
}
}
eraseRow(row) {
this.eraseCol(row, 1, this.config.cols);
}
applyCharacter(row, col, char) { applyCharacter(row, col, char) {
let f = this.font[char] let f = this.font[char]
if (f === undefined) if (f === undefined)
return 0; return 0;
for (let c of this.font[char]) { for (let c of this.font[char]) {
if (col > this.config.cols)
return this.font[char].length;
for (let s = 0; s < c.length; s++) { for (let s = 0; s < c.length; s++) {
const e = this.container.querySelector(`#geavision__row_${row}_${col}_${s + 1}`) const e = this.container.querySelector(`#geavision__row_${row}_${col}_${s + 1}`)
if (e) { if (e) {
@ -94,6 +118,17 @@ export default class {
return col; return col;
} }
set_pages(pages) {
console.log("set pages", pages);
// ignore for now
}
simple_text(page, row, col, text, align) {
// console.log("simple text", page, row, col, text, align);
this.eraseCols(row, col, this.config.cols);
this.applyText(row, col, text);
}
defineFont() { defineFont() {
/* /*
* This is the font definition for the Geascript Proportional font, reverse engineered from the actual display. * This is the font definition for the Geascript Proportional font, reverse engineered from the actual display.

View file

@ -1,3 +1,7 @@
body {
font-family: Arial, Helvetica, sans-serif;
}
#geavision-display { #geavision-display {
padding: 1em; padding: 1em;
background-color: #222; background-color: #222;

View file

@ -2,6 +2,35 @@ import Display from "./display.js";
let container = document.getElementById("geavision-display"); let container = document.getElementById("geavision-display");
if (container) { if (container) {
const d = new Display(container); const display = new Display(container);
// do more stuff
function connect() {
const ws = new WebSocket("/ws");
ws.addEventListener("message", (event) => {
let m = JSON.parse(event.data);
if (m.cmd === undefined) {
console.log("undefined command", m)
}
switch(m.cmd) {
case "set_pages":
display.set_pages(m.pages);
break;
case "simple_text":
display.simple_text(m.page, m.row, m.col, m.text, m.align);
break;
case "text":
console.log("text", m);
break;
}
});
ws.addEventListener("close", (ev) => {
setTimeout(function () {
connect();
}, 1000)
});
ws.addEventListener("error", (ev) => {
ws.close();
});
}
connect();
} }

28
buba/websocketcomm.py Normal file
View file

@ -0,0 +1,28 @@
import json
import logging
class WebSocketClients:
def __init__(self):
self.clients = []
self.log = logging.getLogger(__name__)
def add(self, client):
self.clients.append(client)
def remove(self, client):
try:
client.close()
except Exception:
pass
if client in self.clients:
self.clients.remove(client)
def send(self, data):
for client in self.clients:
try:
client.send(json.dumps(data))
except Exception as e:
self.log.debug(f"Error sending data", exc_info=e)
if client in self.clients:
self.remove(client)