merge main
This commit is contained in:
commit
0de14df168
7 changed files with 147 additions and 158 deletions
|
@ -5,13 +5,13 @@ from bottle_log import LoggingPlugin
|
|||
from bottle_websocket import websocket, GeventWebSocketServer
|
||||
from geventwebsocket.websocket import WebSocket
|
||||
|
||||
from buba.animations.dbf import DBFAnimation
|
||||
from buba.animations.icalevents import IcalEvents
|
||||
from buba.animations.snake import SnakeAnimation
|
||||
from buba.animations.time import BubaTime
|
||||
from buba.appconfig import AppConfig
|
||||
from buba.bubaanimator import BubaAnimator
|
||||
from buba.animations.time import BubaTime
|
||||
from buba.bubacmd import BubaCmd
|
||||
|
||||
from buba.animations.dbf import DBFAnimation
|
||||
from buba.websocketcomm import WebSocketClients
|
||||
|
||||
config = AppConfig()
|
||||
|
@ -25,7 +25,6 @@ if config.debug:
|
|||
app.install(LoggingPlugin(app.config))
|
||||
TEMPLATE_PATH.insert(0, config.templatepath)
|
||||
|
||||
|
||||
websocket_clients = WebSocketClients()
|
||||
buba = BubaCmd(config.serial, websocket_clients.send)
|
||||
animator = BubaAnimator(buba)
|
||||
|
@ -33,7 +32,9 @@ animator = BubaAnimator(buba)
|
|||
animator.add(BubaTime)
|
||||
animator.add(DBFAnimation, ds100="AHST", station="Holstenstraße")
|
||||
animator.add(DBFAnimation, ds100="AHS", station="Altona", count=9)
|
||||
animator.add(IcalEvents, url="https://cloud.hamburg.ccc.de/remote.php/dav/public-calendars/QJAdExziSnNJEz5g?export", title="CCCHH Events")
|
||||
animator.add(IcalEvents, url="https://cloud.hamburg.ccc.de/remote.php/dav/public-calendars/QJAdExziSnNJEz5g?export",
|
||||
title="CCCHH Events")
|
||||
animator.add(SnakeAnimation)
|
||||
|
||||
@app.route("/static/<filepath>")
|
||||
def server_static(filepath):
|
||||
|
@ -45,6 +46,7 @@ def server_static(filepath):
|
|||
def root():
|
||||
return {}
|
||||
|
||||
|
||||
@app.get('/ws', apply=[websocket])
|
||||
def websocket_endpoint(ws: WebSocket):
|
||||
try:
|
||||
|
|
|
@ -16,6 +16,7 @@ from buba.bubacmd import BubaCmd
|
|||
class DBFAnimation(BubaAnimation):
|
||||
def __init__(self, buba: BubaCmd, ds100="AHST", station="Holstenstraße", count=3):
|
||||
super().__init__(buba)
|
||||
self.dbi = DBInfoscreen("trains.xatlabs.com")
|
||||
self.ds100 = ds100
|
||||
self.station = station
|
||||
self.trains = []
|
||||
|
@ -26,18 +27,22 @@ class DBFAnimation(BubaAnimation):
|
|||
return f"<{type(self).__name__}, {self.ds100}>"
|
||||
|
||||
def fetch(self):
|
||||
dbi = DBInfoscreen("trains.xatlabs.com")
|
||||
trains = dbi.calc_real_times(dbi.get_trains(self.ds100)) # Station
|
||||
trains = [t for t in trains] # if t['platform'] == "1"] # platform gleis
|
||||
trains.sort(key=dbi.time_sort)
|
||||
trains = self.dbi.calc_real_times(self.dbi.get_trains(self.ds100)) # Station
|
||||
# trains = [t for t in trains] # if t['platform'] == "1"] # platform gleis
|
||||
trains.sort(key=self.dbi.time_sort)
|
||||
self.trains = trains
|
||||
self.log.info(f"Fetched {len(trains)} trains")
|
||||
|
||||
def update(self):
|
||||
self.fetch()
|
||||
try:
|
||||
self.fetch()
|
||||
except Exception as e:
|
||||
self.log.warning(f"Unable to fetch {self.station}: {e}")
|
||||
pass
|
||||
sleep(60)
|
||||
|
||||
@staticmethod
|
||||
def countdown(dt:datetime):
|
||||
def countdown(dt: datetime):
|
||||
now = datetime.datetime.now()
|
||||
try:
|
||||
dep_time = datetime.datetime.strptime(dt, "%H:%M").time()
|
||||
|
@ -64,8 +69,6 @@ class DBFAnimation(BubaAnimation):
|
|||
|
||||
# Recalculate the timedelta
|
||||
return BubaAnimation.countdown(datetime.datetime.combine(dep_date, dep_time))
|
||||
# dep_td = datetime.datetime.combine(dep_date, dep_time) - now
|
||||
# return round(dep_td.total_seconds() / 60)
|
||||
|
||||
@staticmethod
|
||||
def short_station(station: str) -> str:
|
||||
|
@ -78,6 +81,8 @@ class DBFAnimation(BubaAnimation):
|
|||
station = station[:-8]
|
||||
if station == "Hbf":
|
||||
station = "Hauptbahnhof"
|
||||
if station == "Wedel(Holst)":
|
||||
station = "Wedel"
|
||||
return station
|
||||
|
||||
@staticmethod
|
||||
|
@ -101,7 +106,7 @@ class DBFAnimation(BubaAnimation):
|
|||
|
||||
def run(self):
|
||||
all = self.trains[:self.count]
|
||||
all_len = int((len(all)+1)/3)
|
||||
all_len = int((len(all) + 1) / 3)
|
||||
|
||||
if len(self.trains) == 0:
|
||||
sleep(5)
|
||||
|
@ -110,7 +115,7 @@ class DBFAnimation(BubaAnimation):
|
|||
if all_len == 1:
|
||||
title = self.station
|
||||
else:
|
||||
title = f"{self.station} ({page+1}/{all_len})"
|
||||
title = f"{self.station} ({page + 1}/{all_len})"
|
||||
self.buba.text(page=0, row=0, col_start=0, col_end=92, text=title, align=BubaCmd.ALIGN_LEFT)
|
||||
for i, train in enumerate(trains):
|
||||
if train['isCancelled']:
|
||||
|
@ -118,10 +123,12 @@ class DBFAnimation(BubaAnimation):
|
|||
else:
|
||||
when = self.countdown(train['actualDeparture'])
|
||||
self.buba.text(page=0, row=i + 1, col_start=0, col_end=11, text=self.short_train(train['train']))
|
||||
self.buba.text(page=0, row=i + 1, col_start=12, col_end=104, text=self.short_station(train['destination']))
|
||||
self.buba.text(page=0, row=i + 1, col_start=12, col_end=104,
|
||||
text=self.short_station(train['destination']))
|
||||
self.buba.text(page=0, row=i + 1, col_start=105, col_end=119,
|
||||
text=when, align=BubaCmd.ALIGN_RIGHT)
|
||||
self.buba.set_page(0)
|
||||
for i in range(5):
|
||||
self.buba.text(page=0, row=0, col_start=93, col_end=119, text=datetime.datetime.now().strftime("%H:%M"), align=BubaCmd.ALIGN_RIGHT)
|
||||
self.buba.text(page=0, row=0, col_start=93, col_end=119, text=datetime.datetime.now().strftime("%H:%M"),
|
||||
align=BubaCmd.ALIGN_RIGHT)
|
||||
sleep(2)
|
||||
|
|
|
@ -23,7 +23,7 @@ class IcalEvents(BubaAnimation):
|
|||
|
||||
def update(self):
|
||||
tz = timezone(os.getenv("TZ", "Europe/Berlin"))
|
||||
events = icalevents.icalevents.events(self.url, tzinfo=tz, sort=True, end=datetime.now(tz)+timedelta(days=14))
|
||||
events = icalevents.icalevents.events(self.url, tzinfo=tz, sort=True, end=datetime.now(tz) + timedelta(days=14))
|
||||
for event in events:
|
||||
event.start = event.start.astimezone(tz)
|
||||
self.events = events
|
||||
|
@ -33,7 +33,8 @@ class IcalEvents(BubaAnimation):
|
|||
for (page, events) in enumerate(self.chunk(self.events, 3)):
|
||||
if len(self.events) > 3:
|
||||
self.buba.text(page=0, row=0, col_start=0, col_end=119,
|
||||
text=f"{self.title} ({page + 1}/{int((len(self.events)+2) / 3)})", align=BubaCmd.ALIGN_LEFT)
|
||||
text=f"{self.title} ({page + 1}/{int((len(self.events) + 2) / 3)})",
|
||||
align=BubaCmd.ALIGN_LEFT)
|
||||
else:
|
||||
self.buba.text(page=0, row=0, col_start=0, col_end=119, text=self.title, align=BubaCmd.ALIGN_LEFT)
|
||||
for i in range(3):
|
||||
|
@ -41,7 +42,7 @@ class IcalEvents(BubaAnimation):
|
|||
self.buba.text(page=0, row=i + 1, col_start=0, col_end=119, text="")
|
||||
else:
|
||||
event = events[i]
|
||||
self.buba.text(page=0, row=i + 1, col_start=0, col_end=103, text=self.ellipsis(event.summary, 25))
|
||||
self.buba.text(page=0, row=i + 1, col_start=104, col_end=119,
|
||||
self.buba.text(page=0, row=i + 1, col_start=0, col_end=100, text=self.ellipsis(event.summary, 25))
|
||||
self.buba.text(page=0, row=i + 1, col_start=101, col_end=119,
|
||||
text=self.countdown(event.start), align=BubaCmd.ALIGN_RIGHT)
|
||||
sleep(10)
|
||||
|
|
107
buba/animations/snake.py
Normal file
107
buba/animations/snake.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
import random
|
||||
from time import sleep
|
||||
|
||||
from buba.bubaanimator import BubaAnimation
|
||||
|
||||
|
||||
class SnakeAnimation(BubaAnimation):
|
||||
def __init__(self, buba):
|
||||
super().__init__(buba)
|
||||
# characters to render the grid and the snake
|
||||
# Because of Python's limited codec, instead of using the correct Unicode
|
||||
# codepoints, we're using the CP437 codepoints directly.
|
||||
self.width = 20
|
||||
self.height = 4
|
||||
self.grid = []
|
||||
self.prev_grid = []
|
||||
self.body = []
|
||||
# 0:Space
|
||||
self.charset = " "
|
||||
# 1:up 2:right 3:down 4:left
|
||||
self.charset += "\u001e\u0010\u001f\u0011"
|
||||
# 5:vertical 6:horizontal
|
||||
self.charset += "\u2551\u2550"
|
||||
# 7:up-right 8:down-right 9:down-left 10:up-left
|
||||
self.charset += "\u255a\u2554\u2557\u255d"
|
||||
# 11:tail
|
||||
self.charset += "\u25a0"
|
||||
self.turn = [
|
||||
[5, 8, 5, 9], # have been going up, so coming from down
|
||||
[10, 6, 9, 6], # have been going right, so coming from left
|
||||
[5, 7, 5, 10], # have been going down, so coming from up
|
||||
[7, 6, 8, 6], # have been going left, so coming from right
|
||||
]
|
||||
|
||||
random.seed()
|
||||
|
||||
def run(self):
|
||||
self.grid = [list([0] * self.width) for i in range(self.height)]
|
||||
self.prev_grid = [list([0] * self.width) for i in range(self.height)]
|
||||
x = random.randrange(self.width)
|
||||
y = random.randrange(self.height)
|
||||
d = random.randrange(4)
|
||||
for r in range(self.height):
|
||||
self.buba.simple_text(0, r, 0, "") # clear display
|
||||
self.grid[y][x] = 1 + d
|
||||
self.body = [(x, y)]
|
||||
self.render()
|
||||
iterations = 0
|
||||
while True:
|
||||
if self.is_blocked(x, y, d):
|
||||
end = True
|
||||
prev_d = d
|
||||
for n in self.shift(list(range(4))):
|
||||
if not self.is_blocked(x, y, n):
|
||||
end = False
|
||||
d = n
|
||||
break
|
||||
if end:
|
||||
self.grid[y][x] = 11
|
||||
self.render()
|
||||
break
|
||||
self.grid[y][x] = self.turn[prev_d][d]
|
||||
else:
|
||||
self.grid[y][x] = 5 + (d % 2)
|
||||
(x, y) = self.next(x, y, d)
|
||||
self.grid[y][x] = 1 + d
|
||||
iterations += 1
|
||||
self.body.append((x, y))
|
||||
if iterations % 3 == 0:
|
||||
(tx, ty) = self.body.pop(0)
|
||||
self.grid[ty][tx] = 0
|
||||
(tx, ty) = self.body[0]
|
||||
self.grid[ty][tx] = 11
|
||||
self.render()
|
||||
sleep(1)
|
||||
sleep(5)
|
||||
|
||||
@staticmethod
|
||||
def shift(a):
|
||||
i = random.randrange(1, len(a))
|
||||
return a[i:] + a[:i]
|
||||
|
||||
def next(self, x, y, d):
|
||||
match d:
|
||||
case 0:
|
||||
y -= 1
|
||||
case 1:
|
||||
x += 1
|
||||
case 2:
|
||||
y += 1
|
||||
case 3:
|
||||
x -= 1
|
||||
return (x, y)
|
||||
|
||||
def is_blocked(self, x, y, d):
|
||||
(x, y) = self.next(x, y, d)
|
||||
return (y < 0 or y >= self.height or
|
||||
x < 0 or x >= self.width or
|
||||
self.grid[y][x] != 0)
|
||||
|
||||
def render(self):
|
||||
for x in range(self.width):
|
||||
for y in range(self.height):
|
||||
if self.grid[y][x] != self.prev_grid[y][x]:
|
||||
c = x*6
|
||||
self.buba.text(0, y, c, c+5, self.charset[self.grid[y][x]])
|
||||
self.prev_grid[y][x] = self.grid[y][x]
|
|
@ -31,7 +31,7 @@ class BubaAnimation:
|
|||
return iter(lambda: tuple(islice(it, size)), ())
|
||||
|
||||
@staticmethod
|
||||
def countdown(dt:datetime):
|
||||
def countdown(dt: datetime):
|
||||
"""
|
||||
Compute a human-readable time specification until the target event starts. The day starts at 04:00.
|
||||
:param dt: datetime timezone-aware datetime
|
||||
|
@ -41,17 +41,16 @@ class BubaAnimation:
|
|||
from_day_start = now.replace(hour=4, minute=0, second=0, microsecond=0)
|
||||
now_delta = dt - now
|
||||
day_delta = dt - from_day_start
|
||||
if now_delta < timedelta(seconds=0):
|
||||
if now_delta < timedelta(seconds=60):
|
||||
return "now"
|
||||
if now_delta < timedelta(minutes=30):
|
||||
return f"{int(now_delta.seconds/60)}m"
|
||||
return f"{int(now_delta.seconds / 60)}m"
|
||||
if day_delta < timedelta(hours=24):
|
||||
return f"{int((now_delta.seconds+3599)/3600)}h"
|
||||
return f"{int((now_delta.seconds + 3599) / 3600)}h"
|
||||
if day_delta < timedelta(days=7):
|
||||
return dt.strftime("%a") # weekday
|
||||
return dt.strftime("%a") # weekday
|
||||
return dt.strftime("%d.%m.")
|
||||
|
||||
|
||||
@staticmethod
|
||||
def ellipsis(text, max=28):
|
||||
"""
|
||||
|
@ -61,7 +60,7 @@ class BubaAnimation:
|
|||
:return: shortened text
|
||||
"""
|
||||
if len(text) > max:
|
||||
text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow
|
||||
text = text[:max - 2] + "..." # we can get away with just 2, since the periods are very narrow
|
||||
return text
|
||||
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ class BubaCmd:
|
|||
ALIGN_LEFT = 0x00
|
||||
ALIGN_RIGHT = 0x01
|
||||
ALIGN_CENTER = 0x02
|
||||
ALIGN_SCROLL = 0x03 # apparently not supported
|
||||
ALIGN_SCROLL = 0x03 # apparently not supported
|
||||
|
||||
def __init__(self, serial: str, send: Callable):
|
||||
self.log = logging.getLogger(__name__)
|
||||
|
@ -38,7 +38,7 @@ class BubaCmd:
|
|||
:param align: alignment options, see MIS1TextDisplay.ALIGN_*
|
||||
:return:
|
||||
"""
|
||||
text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant
|
||||
text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant
|
||||
if self.display is not None:
|
||||
self.display.simple_text(page, row, col, text, align)
|
||||
self.send({
|
||||
|
@ -50,7 +50,6 @@ class BubaCmd:
|
|||
'align': align,
|
||||
})
|
||||
|
||||
|
||||
def text(self, page, row, col_start, col_end, text, align=MIS1TextDisplay.ALIGN_LEFT):
|
||||
"""
|
||||
Send text to the specified row, placing it between col_start and col_end.
|
||||
|
@ -65,7 +64,7 @@ class BubaCmd:
|
|||
:param align: alignment options, see MIS1TextDisplay.ALIGN_*
|
||||
:return:
|
||||
"""
|
||||
text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant
|
||||
text = text.encode("CP437", "replace").decode("CP437") # force text to be CP437 compliant
|
||||
if self.display is not None:
|
||||
self.display.text(page, row, col_start, col_end, text, align)
|
||||
self.send({
|
||||
|
@ -78,7 +77,6 @@ class BubaCmd:
|
|||
'align': align,
|
||||
})
|
||||
|
||||
|
||||
def set_page(self, page):
|
||||
"""
|
||||
Display the given page.
|
||||
|
@ -88,7 +86,6 @@ class BubaCmd:
|
|||
"""
|
||||
return self.set_pages([(page, 255)])
|
||||
|
||||
|
||||
def set_pages(self, pages):
|
||||
"""
|
||||
Configure automatic paging.
|
||||
|
|
|
@ -1,124 +0,0 @@
|
|||
from time import sleep
|
||||
|
||||
from pyfis.aegmis import MIS1TextDisplay
|
||||
from pyfis.utils import debug_hex
|
||||
|
||||
|
||||
def receive(display):
|
||||
if display.use_rts:
|
||||
display.port.setRTS(1)
|
||||
display.port.write([0x04, 0x81, 0x05])
|
||||
if display.use_rts:
|
||||
display.port.setRTS(0)
|
||||
return display.read_response(True)
|
||||
|
||||
def decode(display):
|
||||
r = receive(display)
|
||||
if r[0] != 0x02:
|
||||
print("Invalid byte {r[0]:02x}, expected EOT")
|
||||
d = {
|
||||
'code': r[1],
|
||||
'subcode': r[2],
|
||||
}
|
||||
m = []
|
||||
esc = False
|
||||
for i in range(3, len(r)-1):
|
||||
if esc:
|
||||
m.append(r[i])
|
||||
erc = False
|
||||
elif r[i] == 0x10:
|
||||
esc = True
|
||||
else:
|
||||
if r[i] == 0x03:
|
||||
# compute checksum
|
||||
next
|
||||
m.append(r[i])
|
||||
m = m[0:-1]
|
||||
print(f"m({len(m)}): {debug_hex(m)}")
|
||||
return m
|
||||
|
||||
|
||||
def reset_and_get_config(display):
|
||||
display.send_command(0x31, 0x00, [])
|
||||
r = decode(display)
|
||||
return {
|
||||
'type': r[0],
|
||||
'control-units': r[1],
|
||||
'digits-a': r[2] * 256 + r[3],
|
||||
'lines-a': r[4],
|
||||
'digits-b': r[5] * 256 + r[6],
|
||||
'lines-b': r[7],
|
||||
'timeout': r[8] * 128.0 + r[9] * 0.5,
|
||||
}
|
||||
|
||||
def get_config(display):
|
||||
display.send_command(0x38, 0x00, [])
|
||||
r = decode(display)
|
||||
return r
|
||||
|
||||
def text_raw(display, page, row, text):
|
||||
data = [display.ALIGN_LEFT, page, row, 0] + text
|
||||
return display.send_command(0x11, 0x00, data, expect_response=False)
|
||||
|
||||
|
||||
def charset():
|
||||
lines = []
|
||||
for i in range(0, 256, 16):
|
||||
label = f"{i:03d}: "
|
||||
b = list(label.encode("CP437"))
|
||||
for j in range(i, i+16):
|
||||
if j == 0:
|
||||
b.append(0x20)
|
||||
else:
|
||||
b.append(j)
|
||||
lines.append(b)
|
||||
|
||||
display.set_page(1)
|
||||
|
||||
while True:
|
||||
for p in range(1,16):
|
||||
for l in range(0,4):
|
||||
text_raw(display, 1, l, lines[(p+l-1) % len(lines)])
|
||||
sleep(5)
|
||||
|
||||
print("Opening serial port")
|
||||
display = MIS1TextDisplay("/dev/ttyUSB0")
|
||||
|
||||
print("Sending text")
|
||||
display.debug = True
|
||||
#r = display.send_command(0x31, 0x00, []) # reset
|
||||
#r = display.send_command(0x32, 0x01, []) # test
|
||||
#r = display.send_command(0x38, 0x01, []) # read config
|
||||
#r = display.read_response()
|
||||
#print(f"Response: {r}")
|
||||
#r = display.send_command(0x52, 0x01, []) # read config
|
||||
#display.send_raw_telegram([0xff])
|
||||
#display.send_raw_data([0x04, 0x81, 0x02, 0x03, 0x00])
|
||||
#print(f"response: {display.read_response()}")
|
||||
#r = reset_and_get_config(display)
|
||||
r = get_config(display)
|
||||
print(f"reply {len(r)}: {r}")
|
||||
|
||||
#display.simple_text(page=1, row=0, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER)
|
||||
#display.simple_text(page=1, row=1, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER)
|
||||
#display.simple_text(page=1, row=2, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER)
|
||||
#display.simple_text(page=1, row=3, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER)
|
||||
|
||||
#display.simple_text(page=0, row=3, col=0, text="MEOW MEOW MEOW MEOW MEOW MEOW", align=display.ALIGN_CENTER)
|
||||
|
||||
#print("page 1")
|
||||
#display.set_page(1)
|
||||
#sleep(10)
|
||||
|
||||
#for l in range(0,4):
|
||||
# display.simple_text(page=0, row=l, col=0, text="")
|
||||
#print("page 0")
|
||||
#display.set_page(0)
|
||||
#sleep(10)
|
||||
|
||||
#print("page 1")
|
||||
#display.set_page(1)
|
||||
#sleep(10)
|
||||
|
||||
charset()
|
||||
print("Thats all, folks!")
|
Loading…
Add table
Add a link
Reference in a new issue