import socket import struct import sys from threading import Thread from time import sleep from typing import Union from animation import Animation, Off, Steady, FadeTo, RotatingRainbow, Chase def ledlog(value): return int(pow(float(value) / 255.0, 2) * 255) class RGB: def __init__(self, slot, offset=0): self.slot = slot self.offset = offset def rgb(self, dmx, color): (r, g, b) = color dmx.set(self.slot + self.offset + 0, ledlog(r)) dmx.set(self.slot + self.offset + 1, ledlog(g)) dmx.set(self.slot + self.offset + 2, ledlog(b)) class Bar252(RGB): def __init__(self, dmx, slot=1): super(Bar252, self).__init__(dmx, slot, 2) def rgb(self, dmx, color): super().rgb(dmx, color) dmx.set(self.slot + 0, 81) dmx.set(self.slot + 1, 0) class REDSpot18RGB(RGB): def __init__(self, dmx, slot=1): super(REDSpot18RGB, self).__init__(dmx, slot, 1) dmx.set(self.slot + 0, 0) def rgb(self, dmx, color): super().rgb(dmx, color) dmx.set(self.slot + 0, 81) class StairvilleLedPar56(RGB): def __init__(self, dmx, slot=1): super(StairvilleLedPar56, self).__init__(dmx, slot, 0) dmx.set(self.slot + 3, 0) dmx.set(self.slot + 4, 0) dmx.set(self.slot + 5, 0) dmx.set(self.slot + 6, 255) class Group: def __init__(self, rgbs : list[RGB]): self._rgbs = rgbs self._color = (0, 0, 0) self._animation = FadeTo((255, 255, 255)) @property def animation(self) -> Animation: return self._animation @animation.setter def animation(self, animation: Animation): self._animation = animation @property def color(self) -> list[int]: return self._color @color.setter def color(self, color: list[int]): self._color = color class DMX: def __init__(self, host, port=0x1936, universe=1, maxchan=512): self._host = host self._port = port self._universe = universe self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP self._socket.setblocking(False) self._data = bytearray(maxchan) packet = bytearray() packet.extend(map(ord, "Art-Net")) packet.append(0x00) # Null terminate Art-Net packet.extend([0x00, 0x50]) # Opcode ArtDMX 0x5000 (Little endian) packet.extend([0x00, 0x0e]) # Protocol version 14 self._header = packet self._sequence = 1 self._groups = [] self._thread = None self._updating = False def group(self, group: int, rgbs : list[RGB]): self._groups[group] = Group(rgbs) def start(self): if self._thread and self._thread.is_alive(): return self._thread = Thread(daemon=True, target=self.background) self._updating = True self._thread.start() def background(self): while self._updating: self.update() # print("updating") # print(self.data) # break sleep(1.0 / 30) def update(self): if not self._groups[0]._animation: return for g in self._groups: for i in range(0, len(g._rgbs)): g._rgbs[i].rgb(self, g._animation.update(i, len(g._rgbs))) packet = self._header[:] packet.append(self._sequence) # Sequence, packet.append(0x00) # Physical packet.append(self._universe & 0xFF) # Universe LowByte packet.append(self._universe >> 8 & 0xFF) # Universe HighByte packet.extend(struct.pack('>h', len(self._data))) # Pack the number of channels Big endian packet.extend(self._data) self._socket.sendto(packet, (self._host, self._port)) # print(f"sent {len(packet)} bytes, {threading.get_native_id()}") self._sequence += 1 if self._sequence > 255: self._sequence = 1 def set(self, slot, value): self._data[slot - 1] = value @property def animation(self) -> str: return self._groups[0]._animation.name() @animation.setter def animation(self, animation: Union[Animation, str]): if isinstance(animation, str): if animation == "off": animation = Off() elif animation == "chase": animation = Chase(self._groups[0]._color) elif animation == "fade": animation = FadeTo(self._groups[0]._color) elif animation == "rainbow": animation = RotatingRainbow() elif animation == "steady": animation = Steady(self._groups[0]._color) else: raise ValueError(f"No such animation {animation}") self._groups[0]._animation = animation if isinstance(animation, Off): self._updating = False if self._thread and self._thread.is_alive(): self._thread.join() # one frame black self._groups[0]._animation = Steady((0, 0, 0)) self.update() self._groups[0]._animation = Off() else: self.start() print(f"Animation: {animation}", file=sys.stderr) @property def color(self): return self._groups[0].color @color.setter def color(self, color): if self._groups[0]._color != color: self._groups[0].color = color self.animation = self.animation