120 lines
3.5 KiB
Python
120 lines
3.5 KiB
Python
|
import colorsys
|
||
|
import socket
|
||
|
import struct
|
||
|
from threading import Thread
|
||
|
from time import sleep, time
|
||
|
|
||
|
|
||
|
class RGB:
|
||
|
def __init__(self, dmx, slot, offset=0):
|
||
|
self.dmx = dmx
|
||
|
self.slot = slot
|
||
|
self.offset = offset
|
||
|
|
||
|
def rgb(self, color):
|
||
|
(r, g, b) = color
|
||
|
self.dmx.set(self.slot+self.offset+0, r)
|
||
|
self.dmx.set(self.slot+self.offset+1, g)
|
||
|
self.dmx.set(self.slot+self.offset+2, b)
|
||
|
|
||
|
class Bar252(RGB):
|
||
|
def __init__(self, dmx, slot=1):
|
||
|
super(Bar252, self).__init__(dmx, slot, 2)
|
||
|
dmx.set(self.slot+0, 81)
|
||
|
dmx.set(self.slot+1, 0)
|
||
|
|
||
|
|
||
|
class StairvilleLedPar56(RGB):
|
||
|
def __init__(self, dmx, slot=1):
|
||
|
super(StairvilleLedPar56, self).__init__(dmx, slot, 0)
|
||
|
dmx.set(self.slot+3, 0)
|
||
|
dmx.set(self.slot+4, 0)
|
||
|
dmx.set(self.slot+5, 0)
|
||
|
dmx.set(self.slot+6, 255)
|
||
|
|
||
|
|
||
|
class Steady:
|
||
|
def __init__(self, r, g, b):
|
||
|
self.r = r
|
||
|
self.g = g
|
||
|
self.b = b
|
||
|
|
||
|
def update(self, index, count):
|
||
|
return (self.r, self.g, self.b)
|
||
|
|
||
|
def __str__(self):
|
||
|
return f"steady({self.r}, {self.g}, {self.b})"
|
||
|
|
||
|
class RotatingRainbow:
|
||
|
def __init__(self):
|
||
|
pass
|
||
|
|
||
|
def update(self, index, count):
|
||
|
"""
|
||
|
One full round takes 10 seconds, each RGB is offset in a circle
|
||
|
:param index:
|
||
|
:param count:
|
||
|
:return:
|
||
|
"""
|
||
|
hue = (time() / 10.0 + (index + 0.0) / count) % 1.0
|
||
|
rgb = self.hsv_to_rgb(hue, 1, 1)
|
||
|
return rgb
|
||
|
|
||
|
def hsv_to_rgb(self, h, s, v):
|
||
|
(r, g, b) = colorsys.hsv_to_rgb(h, s, v)
|
||
|
return [int(r * 255), int(g * 255), int(b * 255)]
|
||
|
|
||
|
def __str__(self):
|
||
|
return "RotatingRainbow"
|
||
|
|
||
|
class DMX:
|
||
|
def __init__(self, host, port=0x1936, universe=1):
|
||
|
self.host = host
|
||
|
self.port = port
|
||
|
self.universe = universe
|
||
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
|
||
|
self.socket.setblocking(False)
|
||
|
self.data = bytearray(512)
|
||
|
packet = bytearray()
|
||
|
packet.extend(map(ord, "Art-Net"))
|
||
|
packet.append(0x00) # Null terminate Art-Net
|
||
|
packet.extend([0x00, 0x50]) # Opcode ArtDMX 0x5000 (Little endian)
|
||
|
packet.extend([0x00, 0x0e]) # Protocol version 14
|
||
|
self.header = packet
|
||
|
self.animation = Steady(64, 64, 64)
|
||
|
self.rgbs = []
|
||
|
|
||
|
def start(self):
|
||
|
self.thread = Thread(daemon=True, target=self.background)
|
||
|
self.thread.start()
|
||
|
|
||
|
def background(self):
|
||
|
print("background starts")
|
||
|
while True:
|
||
|
animation = self.animation
|
||
|
for i in range(0, len(self.rgbs)):
|
||
|
self.rgbs[i].rgb(animation.update(i, len(self.rgbs)))
|
||
|
self.update()
|
||
|
# print("updating")
|
||
|
print(self.data)
|
||
|
break
|
||
|
sleep(1.0/30)
|
||
|
print("background ends")
|
||
|
|
||
|
def update(self):
|
||
|
packet = self.header[:]
|
||
|
packet.append(0) # Sequence,
|
||
|
packet.append(0x00) # Physical
|
||
|
packet.append(self.universe & 0xFF) # Universe LowByte
|
||
|
packet.append(self.universe >> 8 & 0xFF) # Universe HighByte
|
||
|
|
||
|
packet.extend(struct.pack('>h', 512)) # Pack the number of channels Big endian
|
||
|
packet.extend(self.data)
|
||
|
self.socket.sendto(packet, (self.host, self.port))
|
||
|
|
||
|
def set(self, slot, value):
|
||
|
self.data[slot-1] = value
|
||
|
|
||
|
def setAnimation(self, animation):
|
||
|
self.animation = animation
|
||
|
print(f"Animation: {animation}")
|