+++ /dev/null
-#!/usr/bin/python3
-import RPi.GPIO as GPIO
-import sys
-import threading
-from time import sleep
-from time import strftime
-import time
-import SX1509
-import IO_Types
-from adafruit_bus_device.i2c_device import I2CDevice
-import board
-import busio
-from itertools import chain
-
-pins = [[4,5,6],[2,1,0],[10,9,8]]
-#pincomp = [[1.2,1.2,1.2],[.7,.7,.7],[1,1,1]]
-
-DEVICE_ADDRESS = 0x3E # device address of SX1509
-
-#Set up I2C
-comm_port = busio.I2C(board.SCL, board.SDA)
-device = I2CDevice(comm_port, DEVICE_ADDRESS)
-
-#Initialize the expander
-IOExpander = SX1509.SX1509(comm_port)
-IOExpander.clock(oscDivider = 4)
-IOExpander.debounceTime(32)
-
-#Set up pins
-for set in pins:
- for pin in set:
- IOExpander.pinMode(pin, IO_Types.PIN_TYPE_ANALOG_OUTPUT)
-#print('IO Expander Initialized')
-
-brightness = 1
-colors = [[0,0,0],[0,0,0],[0,0,0]]
-hold = 0
-stop = 0
-controlThreads = []
-
-colorInts = [ [100,0,0],
- [100,50,0],
- [100,100,0],
- [0,100,0],
- [0,50,100],
- [0,100,100],
- [0,0,100],
- [100,0,100],
- [100,100,100] ]
-
-
-def setColor(lamp,thread,r,g=-1,b=-1):
- global controlThreads
- if thread != controlThreads[lamp]:
- return
-# print("setting "+str(lamp)+" to "+str([r,g,b])+" by "+str(thread)+" (with "+str(controlThreads)+")")
- if g == -1:
- setColor(lamp,thread,r[0],r[1],r[2])
- else:
- global colors
- colors[lamp] = [r,g,b]
- pincomp = [1,2,3]
- compfile = open("comp", "r")
- compdata = compfile.read()
- compdata = compdata.split("\n")
- for i in range(0,3):
- pincomp[i] = compdata[i].split(",")
- pinmul = [1,2,3]
- mulfile = open("mul", "r")
- muldata = mulfile.read()
- muldata = muldata.split("\n")
- for i in range(0,3):
- pinmul[i] = muldata[i].split(",")
-
- set = pins[lamp]
- comp = pincomp[lamp]
- mul = pinmul[lamp]
- color = [r,g,b]
- for j in range(0,3):
- color[j] = int(color[j] * brightness * float(mul[j]) + int(comp[j]))
- IOExpander.analogWrite(set[j], color[j])
-
-
-def fadeTo(lamp,thread,speed, dip, r,g=-1,b=-1):
- if g == -1:
- fadeTo(lamp,thread,speed, dip, r[0],r[1],r[2])
- else:
- fade(lamp,thread,speed, colors[lamp], [r, g, b], dip)
-
-def fade(lamp,thread,speed, c1, c2, dip):
- for a in range(0,100):
- if dip == 1:
-# if a < 33:
-# pct1 = (67-a)/67
-# pct2 = 0
-# elif a < 67:
-# pct1 = (67-a)/67
-# pct2 = (a-33)/67
-# else:
-# pct1 = 0
-# pct2 = (a-33)/67
- if a < 10:
- pct1 = (90-a)/90
- pct2 = 0
- elif a < 90:
- pct1 = (90-a)/90
- pct2 = (a-10)/90
- else:
- pct1 = 0
- pct2 = (a-10)/90
-
- else:
- pct1 = (100-a)/100
- pct2 = (a)/100
-
-
- setColor(lamp,thread,c1[0]*pct1+c2[0]*pct2, c1[1]*pct1+c2[1]*pct2, c1[2]*pct1+c2[2]*pct2)
- sleep(speed)
-
-
-def loading(speed, cycles, lamp):
- for red in range(0,100):
- setColor(lamp,red,0,0)
- sleep(speed)
- for x in range(0,cycles):
- for green in range(0,100):
- setColor(lamp,100-green,green,0)
- sleep(speed)
- for blue in range(0,100):
- setColor(lamp,0,100-blue,blue)
- sleep(speed)
- for red in range(0,100):
- setColor(lamp,red,0,100-red)
- sleep(speed)
- for red in range(0,100):
- setColor(lamp,100-red,0,0)
- sleep(speed)
-
-def rainbow(speed, cycles, lamp):
- for red in range(0,100):
- setColor(lamp,red,0,0)
- sleep(speed)
- sleep(1)
- for x in range(0,cycles):
- for orange in range(0,50):
- setColor(lamp,100,orange*0.9,0)
- sleep(speed*2)
- sleep(1)
- for yellow in range(0,50):
- setColor(lamp,100,50+yellow,0)
- sleep(speed*2)
- sleep(1)
- for green in range(0,100):
- setColor(lamp,100-green,100,12-0.12*green)
- sleep(speed)
- sleep(1)
- for blue in range(0,100):
- setColor(lamp,0,100-blue,blue)
- sleep(speed)
- sleep(1)
- for purple in range(0,100):
- setColor(lamp,purple,0,100)
- sleep(speed)
- sleep(1)
- for red in range(0,100):
- setColor(lamp,100,0,100-red)
- sleep(speed)
- sleep(1)
- for clear in range(0,100):
- setColor(lamp,100-clear,0,0)
- sleep(speed)
-
-def pair(lamp,thread,speed, cycles, c1, c2, dip):
- global hold
- for x in range(0,cycles):
- if hold == 0:
- fadeTo(lamp,thread,speed, dip, c1)
- sleep(0.7)
- fadeTo(lamp,thread,speed, dip, c2)
- sleep(0.3)
-
-def trio(lamp,thread,speed, c1,c2,c3, dip):
- global stop
- if stop == 0:
- fadeTo(lamp, thread, speed, dip, c1)
- sleep(0.3)
- fadeTo(lamp, thread, speed, dip, [int(c2[0]*0.9), int(c2[1]*0.9), int(c2[2]*0.9)])
- sleep(0.3)
- fadeTo(lamp, thread, speed, dip, c3)
- sleep(0.3)
-
-def opposeChatter(lamp,delay,threadid, opplamps):
- global stop
- global colors
- while stop == 0:
- color = [200,200,200]
- for i in opplamps:
- color[0] = color[0] - colors[i][0]
- color[1] = color[1] - colors[i][1]
- color[2] = color[2] - colors[i][2]
- setColor(lamp, threadid, color)
- sleep(delay)
-
-def securityChatter(lamp,delay, cycles, threadid):
- global stop
- global secdata
- while stop == 0:
-# secfile = open("security.lst", "r")
-# secdata = secfile.read()
-# secdataarr = secdata.split("\n")
- for i in range(0,len(secdata)):
- for j in range(0, len(secdata[i])):
- if stop == 1:
- break
- colors = secdata[i][j]
- if colors is None or colors == "" or colors == []:
-# print("No color for: "+str(secdata[i]))
- continue
-# print("pair "+str(lamp)+","+str(threadid)+","+str(delay)+","+str(cycles)+","+str(i)+","+str(colors)+",0");
-# print(colors)
-# pair(lamp,threadid,delay,cycles,colorInts[i],colorInts[j], 1)
-# fadeTo(lamp,threadid,delay,0, colors[0],colors[1],colors[2])
- trio(lamp,threadid,delay, colorInts[i],colorInts[j],colors, 0)
- fadeTo(lamp,threadid,delay,0, 100,100,100)
-
-def systemsChatter(lamp,delay, cycles, threadid):
- global stop
- while stop == 0:
- for i in range(0,len(sysdata)):
- if stop == 1:
- break
- if sysdata[i] is None or sysdata[i] == '':
- continue
- if sysdata[i].cpu == -1:
- continue
- if sysdata[i].mem == -1:
- continue
- if sysdata[i].fs == -1:
- continue
-
- r = sysdata[i].mem
- g = sysdata[i].fs
- b = sysdata[i].cpu
-
- for j in range(0,len(hostList)):
- if sysdata[i].host in hostList[j]:
- id1 = colorInts[j]
- id2 = colorInts[hostList[j].index(sysdata[i].host)]
-# id2 = [ int(id2[0]*0.6), int(id2[1]*0.6), int(id2[2]*0.6)]
-
-# pair(lamp,threadid,delay, 1, id1, id2, 1)
-# fadeTo(lamp,threadid,delay,0, r,g,b)
-# sleep(0.5)
-# fadeTo(lamp,threadid,delay,0, 0,0,0)
- trio(lamp,threadid,delay, id1,id2,[r,g,b], 0)
- fadeTo(lamp,threadid,delay,0, 100,100,100)
-
-def gasp(lamp,delay, r, g, b, threadid):
-# print("gasp: "+str(lamp)+": )"+str(r)+","+str(g)+","+str(b)+")")
- fadeTo(lamp,threadid,delay,0, 0,0,0)
- sleep(0.3)
- fadeTo(lamp,threadid,delay,0, r,g,b)
- sleep(1)
- fadeTo(lamp,threadid,delay,0, 0,0,0)
- sleep(0.3)
-
-def chime(lamp,delay, threadid):
- hours = int(strftime("%I"))
- fadeTo(lamp,threadid,delay,0, 0,0,0)
- sleep(2)
- for i in range(0,hours):
- fadeTo(lamp,threadid,delay,0, 100,100,100)
- fadeTo(lamp,threadid,delay,0, 0,0,0)
- sleep(0.1)
- sleep(1)
-
-
-hostList = [ ["cns-master", "cns-bedroom", "cns-fdoor", "001", "fdoor" ],
- ["cns-memory", "www.slightlycyberpunk.com", "mail.bsflowers.net", "git.slightlycyberpunk.com", "mastodon", "collabora", "nextcloud" ],
- ["pfSense.slightlycyberpunk.com", "vhost1", "vhost2", "vhost3"]
- ]
-syssumm = [0,0,0]
-sysdata = []
-def refreshSysData():
- global syssumm
- global sysdata
- sysdata = [type('',(object,),{"cpu": -1, "mem": -1, "fs": -1})() for i in range(0,25)]
- syssumm = [0,0,0]
- lampfile = open("/home/pi/framework/lamp.lst", "r")
- lampdata = lampfile.read()
- lampdata = lampdata.split("\n")
- for i in range(0,len(lampdata)):
- if lampdata[i] == "":
- continue
-
- line = lampdata[i].split("|")
-
- host = 0
- for j in range(0,len(hostList)):
- if line[0] in hostList[j]:
- host += hostList[j].index(line[0])
- else:
- host += len(hostList[j])
-
- sysobj = sysdata[host]
- sysobj.host = line[0]
-
- if line[1] == "cpu":
- sysobj.cpu = int(line[2])
- elif line[1] == "mem":
- sysobj.mem = int(float(line[2])*100)
- elif 'fs' in line[1] and sysobj.fs < int(line[2]):
- sysobj.fs = int(line[2])
-
- sysdata[host] = sysobj;
-
- warn = 0
- crit = 0
- for i in range(0,len(sysdata)):
- if sysdata[i].cpu > 50 or sysdata[i].mem > 50 or sysdata[i].fs > 70:
- warn+=1
- if sysdata[i].cpu > 85 or sysdata[i].mem > 85 or sysdata[i].fs > 90:
- crit+=1
- if crit > 0:
- syssumm = [100,0,0]
- elif warn > 0:
- syssumm = [100,100,0]
- else:
- syssumm = [0,100,0]
-
-sensorlist = [ "audio", "camera", "PRESENCE", "sessions", "sensors" ]
-seccnts = [0,0,0,0]
-secsumm = [100,100,0]
-flathostList = list(chain.from_iterable(hostList))
-secdata = [["" for j in range(0, len(flathostList))] for i in range(0, len(sensorlist))]
-def refreshSecData():
- global secsumm
- global secdata
-
-# flathostList = list(chain.from_iterable(hostList))
-# for i in range(0, len(flathostList)):
-# secdata[i] = ["","",""]
-
- lampfile = open("/home/pi/framework/lampsec.lst", "r")
- lampdata = lampfile.read()
- lampdata = lampdata.split("\n")
- for line in lampdata:
- if line == "":
- continue
-
- print(line)
-
- line = line.split("|")
- sensor = line[0]
- host = line[1]
- value = line[2]
- timestamp = int(line[3])
- tdiff = timestamp - int(time.time()) - 100
- if tdiff > 100:
- tdiff = 100
- if tdiff < 0:
- tdiff = 0
-
-# flathostList = list(chain.from_iterable(hostList))
-# print("SECDATA|SENSOR|SENSOR-IND|FLATLIST|HOST|HOST-IND")
-# print(secdata)
-# print(sensor)
-# print(sensorlist.index(sensor))
-# print(flathostList)
-# print(host)
-# print(flathostList.index(host))
- if value == "WARN":
- secdata[sensorlist.index(sensor)][flathostList.index(host)] = [100-tdiff, 100, 0]
- if secsumm[2] > tdiff:
- secsumm[2] = tdiff
- elif value == "CRIT":
- secdata[sensorlist.index(sensor)][flathostList.index(host)] = [100-tdiff, tdiff, 0]
- if secsumm[1] > tdiff:
- secsumm[1] = tdiff
- elif value == "OK":
- secdata[sensorlist.index(sensor)][flathostList.index(host)] = [tdiff, 100-tdiff, 0]
- if secsumm[1] > 0:
- secsumm = [100 - secsumm[1], secsumm[1], 0]
- elif secsumm[2] > 0:
- secsumm = [100 - secsumm[2], 100, 0]
- else:
- secsumm = [0, 100, 0]
-
-mode="off"
-def refreshMode():
- global mode
- modefile = open("mode", "r")
- modedata = modefile.read()
- mode = modedata.split("\n")[0]
-
-try:
- refreshSysData()
- refreshSecData()
- refreshMode()
- controlThreads = [0,1,2]
- #if mode == "chatter":
- chat = [ threading.Thread(target=systemsChatter,args=(0,0.02,2,0)),
- threading.Thread(target=securityChatter,args=(1,0.022,2,1)),
- threading.Thread(target=opposeChatter,args=(2,0.015,2,[0,1])) ]
- chat[0].start()
- chat[1].start()
- chat[2].start()
- #elif mode == "off":
- # setColor(0,0,0,0,0)
- # setColor(1,1,0,0,0)
- # setColor(2,2,0,0,0)
- #elif mode == "on":
- # setColor(0,0,90,90,90)
- # setColor(1,1,90,90,90)
- # setColor(1,1,90,90,90)
-
- while stop == 0:
- if mode == "off":
- controlThreads = [3,3,3]
- setColor(0,3,0,0,0)
- setColor(1,3,0,0,0)
- setColor(2,3,0,0,0)
- elif mode == "on":
- controlThreads = [4,4,4]
- setColor(0,4,100,100,100)
- setColor(1,4,100,100,100)
- setColor(2,4,100,100,100)
- elif mode == "chatter":
- controlThreads = [0,1,2]
- if int(strftime("%S")) == 0:
- if int(strftime("%M")) == 0:
- controlThreads = [10,11,12]
- chimes = [ threading.Thread(target=chime,args=(0,0.003,10)),
- threading.Thread(target=chime,args=(1,0.003,11)),
- threading.Thread(target=chime,args=(2,0.003,12)) ]
- chimes[0].start()
- chimes[1].start()
- chimes[2].start()
- chimes[0].join()
- chimes[1].join()
- chimes[2].join()
- controlThreads = [0,1,2]
- else:
- gasps = [ threading.Thread(target=gasp,args=(0,0.01,syssumm[0],syssumm[1],syssumm[2],20)),
- threading.Thread(target=gasp,args=(1,0.01,0,100,0,21)),
- threading.Thread(target=gasp,args=(2,0.01,100,0,100,22)) ]
- controlThreads = [20,21,22]
- gasps[0].start()
- gasps[1].start()
- gasps[2].start()
- gasps[0].join()
- gasps[1].join()
- gasps[2].join()
- controlThreads = [0,1,2]
- sleep(1)
- refreshSysData()
- refreshSecData()
- refreshMode()
-
- print("Waiting for threads to terminate...\n")
- chat[0].join()
- print("One left...\n")
- chat[1].join()
- print("DONE!\n")
- chat[2].join()
-
-except KeyboardInterrupt:
- stop = 1
- print("Interrupted")
-
-finally:
- GPIO.cleanup()
-