import colorsys import socket import struct import sys from threading import Thread from time import sleep, time def hsv_to_rgb(h, s, v): (r, g, b) = colorsys.hsv_to_rgb(h, s, v) return [int(r * 255), int(g * 255), int(b * 255)] def ledlog(value): return int(pow(float(value) / 255.0, 2) * 255) class RGB: def __init__(self, dmx, slot, offset=0): self.dmx = dmx self.slot = slot self.offset = offset def rgb(self, color): (r, g, b) = color self.dmx.set(self.slot + self.offset + 0, ledlog(r)) self.dmx.set(self.slot + self.offset + 1, ledlog(g)) self.dmx.set(self.slot + self.offset + 2, ledlog(b)) class Bar252(RGB): def __init__(self, dmx, slot=1): super(Bar252, self).__init__(dmx, slot, 2) dmx.set(self.slot + 0, 81) dmx.set(self.slot + 1, 0) class REDSpot18RGB(RGB): def __init__(self, dmx, slot=1): super(REDSpot18RGB, self).__init__(dmx, slot, 1) dmx.set(self.slot + 0, 0) class StairvilleLedPar56(RGB): def __init__(self, dmx, slot=1): super(StairvilleLedPar56, self).__init__(dmx, slot, 0) dmx.set(self.slot + 3, 0) dmx.set(self.slot + 4, 0) dmx.set(self.slot + 5, 0) dmx.set(self.slot + 6, 255) class Steady: def __init__(self, r, g, b): self.r = r self.g = g self.b = b def update(self, index, count): return (self.r, self.g, self.b) def __str__(self): return f"{type(self).__name__}({self.r}, {self.g}, {self.b})" class FadeTo(Steady): def __init__(self, r, g, b, t=2.0): super(FadeTo, self).__init__(r, g, b) self.t = t self.start = time() def update(self, index, count): h = (time() - self.start) / self.t h = min(h, 1.0) return (int(self.r * h), int(self.g * h), int(self.b * h)) def __str__(self): return f"{type(self).__name__}({self.r}, {self.g}, {self.b}, {self.t:.2f})" class RotatingRainbow: def __init__(self, looptime=10.0): self.looptime = looptime pass def update(self, index, count): """ One full round takes self.looptime seconds, each RGB is offset in a circle :param index: :param count: :return: """ hue = (time() / self.looptime + (index + 0.0) / count) % 1.0 rgb = hsv_to_rgb(hue, 1, 1) return rgb def __str__(self): return f"{type(self).__name__}" class Chase(Steady): def __init__(self, r, g, b, looptime=1.0): super(Chase, self).__init__(r, g, b) self.looptime = looptime def update(self, index, count): angle = (time() / self.looptime + (index + 0.0) / count) % 1.0 l = 1 - min(abs(angle - 2.0 / count), 1.0 / count) * count # print(f"f({index}, {angle:.2f}) -> {l:.2f}") return (int(self.r * l), int(self.g * l), int(self.b * l)) def __str__(self): return f"{type(self).__name__}({self.r}, {self.g}, {self.b}, {self.looptime:.2f})" class DMX: def __init__(self, host, port=0x1936, universe=1, maxchan=512): self.host = host self.port = port self.universe = universe self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP self.socket.setblocking(False) self.data = bytearray(maxchan) packet = bytearray() packet.extend(map(ord, "Art-Net")) packet.append(0x00) # Null terminate Art-Net packet.extend([0x00, 0x50]) # Opcode ArtDMX 0x5000 (Little endian) packet.extend([0x00, 0x0e]) # Protocol version 14 self.header = packet self.sequence = 1 self.animation = FadeTo(255, 255, 255) self.rgbs = [] self.thread = None self.updating = False def start(self): if self.thread and self.thread.is_alive(): return self.thread = Thread(daemon=True, target=self.background) self.updating = True self.thread.start() def background(self): while self.updating: self.update() # print("updating") # print(self.data) # break sleep(1.0 / 30) def update(self): if not self.animation: return for i in range(0, len(self.rgbs)): self.rgbs[i].rgb(self.animation.update(i, len(self.rgbs))) packet = self.header[:] packet.append(self.sequence) # Sequence, packet.append(0x00) # Physical packet.append(self.universe & 0xFF) # Universe LowByte packet.append(self.universe >> 8 & 0xFF) # Universe HighByte packet.extend(struct.pack('>h', len(self.data))) # Pack the number of channels Big endian packet.extend(self.data) self.socket.sendto(packet, (self.host, self.port)) # print(f"sent {len(packet)} bytes, {threading.get_native_id()}") self.sequence += 1 if self.sequence > 255: self.sequence = 1 def set(self, slot, value): self.data[slot - 1] = value def setAnimation(self, animation): if not animation: self.updating = False self.thread.join() # one frame black self.animation = Steady(0, 0, 0) self.update() else: self.animation = animation self.start() print(f"Animation: {animation}", file=sys.stderr)