Refactoring

- Animation in it's own class
- JS to sync state across browsers
This commit is contained in:
Stefan Bethke 2022-07-24 11:43:00 +02:00
commit efb7c20af5
6 changed files with 352 additions and 160 deletions

176
dmx.py
View file

@ -1,14 +1,11 @@
import colorsys
import socket
import struct
import sys
from threading import Thread
from time import sleep, time
from time import sleep
from typing import Union
def hsv_to_rgb(h, s, v):
(r, g, b) = colorsys.hsv_to_rgb(h, s, v)
return [int(r * 255), int(g * 255), int(b * 255)]
from animation import Animation, Off, Steady, FadeTo, RotatingRainbow, Chase
def ledlog(value):
@ -50,98 +47,36 @@ class StairvilleLedPar56(RGB):
dmx.set(self.slot + 6, 255)
class Steady:
def __init__(self, r, g, b):
self.r = r
self.g = g
self.b = b
def update(self, index, count):
return (self.r, self.g, self.b)
def __str__(self):
return f"{type(self).__name__}({self.r}, {self.g}, {self.b})"
class FadeTo(Steady):
def __init__(self, r, g, b, t=2.0):
super(FadeTo, self).__init__(r, g, b)
self.t = t
self.start = time()
def update(self, index, count):
h = (time() - self.start) / self.t
h = min(h, 1.0)
return (int(self.r * h), int(self.g * h), int(self.b * h))
def __str__(self):
return f"{type(self).__name__}({self.r}, {self.g}, {self.b}, {self.t:.2f})"
class RotatingRainbow:
def __init__(self, looptime=10.0):
self.looptime = looptime
pass
def update(self, index, count):
"""
One full round takes self.looptime seconds, each RGB is offset in a circle
:param index:
:param count:
:return:
"""
hue = (time() / self.looptime + (index + 0.0) / count) % 1.0
rgb = hsv_to_rgb(hue, 1, 1)
return rgb
def __str__(self):
return f"{type(self).__name__}"
class Chase(Steady):
def __init__(self, r, g, b, looptime=1.0):
super(Chase, self).__init__(r, g, b)
self.looptime = looptime
def update(self, index, count):
angle = (time() / self.looptime + (index + 0.0) / count) % 1.0
l = 1 - min(abs(angle - 2.0 / count), 1.0 / count) * count
# print(f"f({index}, {angle:.2f}) -> {l:.2f}")
return (int(self.r * l), int(self.g * l), int(self.b * l))
def __str__(self):
return f"{type(self).__name__}({self.r}, {self.g}, {self.b}, {self.looptime:.2f})"
class DMX:
def __init__(self, host, port=0x1936, universe=1, maxchan=512):
self.host = host
self.port = port
self.universe = universe
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self.socket.setblocking(False)
self.data = bytearray(maxchan)
self._host = host
self._port = port
self._universe = universe
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self._socket.setblocking(False)
self._data = bytearray(maxchan)
packet = bytearray()
packet.extend(map(ord, "Art-Net"))
packet.append(0x00) # Null terminate Art-Net
packet.extend([0x00, 0x50]) # Opcode ArtDMX 0x5000 (Little endian)
packet.extend([0x00, 0x0e]) # Protocol version 14
self.header = packet
self.sequence = 1
self.animation = FadeTo(255, 255, 255)
self.rgbs = []
self.thread = None
self.updating = False
self._header = packet
self._sequence = 1
self._color = (0, 0, 0)
self._animation = FadeTo((255, 255, 255))
self._rgbs = []
self._thread = None
self._updating = False
def start(self):
if self.thread and self.thread.is_alive():
if self._thread and self._thread.is_alive():
return
self.thread = Thread(daemon=True, target=self.background)
self.updating = True
self.thread.start()
self._thread = Thread(daemon=True, target=self.background)
self._updating = True
self._thread.start()
def background(self):
while self.updating:
while self._updating:
self.update()
# print("updating")
# print(self.data)
@ -149,38 +84,67 @@ class DMX:
sleep(1.0 / 30)
def update(self):
if not self.animation:
if not self._animation:
return
for i in range(0, len(self.rgbs)):
self.rgbs[i].rgb(self.animation.update(i, len(self.rgbs)))
for i in range(0, len(self._rgbs)):
self._rgbs[i].rgb(self._animation.update(i, len(self._rgbs)))
packet = self.header[:]
packet.append(self.sequence) # Sequence,
packet = self._header[:]
packet.append(self._sequence) # Sequence,
packet.append(0x00) # Physical
packet.append(self.universe & 0xFF) # Universe LowByte
packet.append(self.universe >> 8 & 0xFF) # Universe HighByte
packet.append(self._universe & 0xFF) # Universe LowByte
packet.append(self._universe >> 8 & 0xFF) # Universe HighByte
packet.extend(struct.pack('>h', len(self.data))) # Pack the number of channels Big endian
packet.extend(self.data)
self.socket.sendto(packet, (self.host, self.port))
packet.extend(struct.pack('>h', len(self._data))) # Pack the number of channels Big endian
packet.extend(self._data)
self._socket.sendto(packet, (self._host, self._port))
# print(f"sent {len(packet)} bytes, {threading.get_native_id()}")
self.sequence += 1
if self.sequence > 255:
self.sequence = 1
self._sequence += 1
if self._sequence > 255:
self._sequence = 1
def set(self, slot, value):
self.data[slot - 1] = value
self._data[slot - 1] = value
def setAnimation(self, animation):
if not animation:
self.updating = False
self.thread.join()
@property
def animation(self) -> str:
return self._animation.name()
@animation.setter
def animation(self, animation: Union[Animation, str]):
if isinstance(animation, str):
if animation == "off":
animation = Off()
elif animation == "chase":
animation = Chase(self._color)
elif animation == "fade":
animation = FadeTo(self._color)
elif animation == "rainbow":
animation = RotatingRainbow()
elif animation == "steady":
animation = Steady(self._color)
else:
raise ValueError(f"No such animation {animation}")
self._animation = animation
if isinstance(animation, Off):
self._updating = False
if self._thread and self._thread.is_alive():
self._thread.join()
# one frame black
self.animation = Steady(0, 0, 0)
self._animation = Steady((0, 0, 0))
self.update()
else:
self.animation = animation
self.start()
print(f"Animation: {animation}", file=sys.stderr)
@property
def color(self):
return self._color
@color.setter
def color(self, color):
if self._color != color:
self._color = color
self.animation = self.animation