heating-control/code_sniffing/extract.py
2019-01-02 21:11:36 +00:00

134 lines
3 KiB
Python
Executable file

#!/usr/bin/python3
#This script parses an Audacity 'sample data' file and extracts a heating control code to send to the boiler.
#It is very poorly coded and badly commented. You only have to make it work twice though (for on and off).
import numpy as np
import matplotlib.pyplot as plt
# Load the data. With unpack=True, genfromtxt hands back one array per column,
# which are bound here as (time, value).
time, value = np.genfromtxt(
    './sample-data.txt',
    delimiter=None,  # None: columns are split on any run of whitespace
    dtype=None,      # None: numpy infers the type of each column itself
    unpack=True,
)
# Digitise the data: every strictly positive sample becomes 1, anything else 0.
digital = [1 if sample > 0 else 0 for sample in value]
# Sanity check: there must be exactly one digital level per input sample.
if len(digital) != len(value):
    raise ValueError('panic')
# Total time span divided by the number of samples, i.e. an average
# time-per-sample figure. Nothing later in the visible script reads sfreq.
itemstime = len(time)
lentime = time[-1] - time[0]
sfreq = lentime / itemstime
# Convert the data to the corner points of a square wave, from which the
# length of time each peak lasts can be derived.
# `changes` holds timestamps and `nexts` the level recorded at each timestamp;
# both are seeded with the very first sample.
changes = [time[0]]
nexts = [digital[0]]
last = digital[0]
lastt = time[0]
# How long is each high or low sent for?
# Every transition contributes two points sharing the timestamp of the last
# sample seen before the flip: one at the old level and one at the new level,
# so the two points of an edge are zero time apart.
for stamp, level in zip(time, digital):
    if level != last:
        changes += [lastt, lastt]
        nexts += [last, level]
    # Remember this sample for the next comparison, whether it changed or not.
    last, lastt = level, stamp
# NOTE: no closing point is appended after the loop, so the run still in
# progress at the final sample is represented only by its starting corner.
#print('states\n')
#print(nexts)
#print('state changes\n')
#print(changes)
# Calculate intervals between the changes: intervals[i] is the gap between
# changes[i] and changes[i + 1].
intervals = [later - earlier for earlier, later in zip(changes, changes[1:])]
#print('intervals\n')
#print(intervals)
# Pair each recorded level with the interval that follows it and drop the
# zero-length pairs (the two points of an edge share one timestamp).
# zip() stops at the shorter input, so the last entry of `nexts` has no
# interval and is not paired.
kept = [(state, gap) for state, gap in zip(nexts, intervals) if gap != 0]
nnexts = [state for state, _ in kept]
nints = [gap for _, gap in kept]
# Builtin float/bool are used as dtypes rather than the 'float_'/'bool_'
# string aliases: the 'float_' alias was removed in NumPy 2.0, while the
# builtins select the same default types on every NumPy version.
ntimes = np.array(nints, dtype=float)
nstates = np.array(nnexts, dtype=bool)
# Scale by 1e6 (seconds -> microseconds, assuming the time column is in
# seconds -- TODO confirm against the exported file).
ntimes = ntimes * 1000000
print('BEFORE - This is the raw timings.\n')
print(ntimes)
# This sets the binning time. I can't quite remember why 2000.
# The allowed values are 0, 20, 40, ..., 1980. Every raw timing is snapped to
# the nearest allowed value, so anything longer than 1980 is clamped to 1980.
good_vals = np.arange(0, 2000, 20)
# Nearest-bin lookup for all timings at once: row i holds
# |good_vals - ntimes[i]| and argmin picks the first closest bin, which is the
# same tie-break a per-item argmin would give.
raw_timings = np.asarray(ntimes, dtype=float)
nearest = np.abs(good_vals[np.newaxis, :] - raw_timings[:, np.newaxis]).argmin(axis=1)
# Builtin float/int are used as dtypes instead of the 'float_'/'int_' string
# aliases; 'float_' was removed in NumPy 2.0.
mtimes = good_vals[nearest].astype(float)
togcd = mtimes.astype(int)
# The greatest common divisor of all binned timings is the base interval.
best = np.gcd.reduce(togcd)
waittime = best
print('AFTER\n - This is the binned times, along with thier gcd to be the interval.')
print(mtimes)
print(best)
# If every timing was binned to 0 the gcd is 0 and the division below would
# only produce NaNs; fail here with a clear message instead.
if togcd.size and waittime == 0:
    raise ValueError('all binned timings are 0; cannot derive a send interval')
# Number of base intervals each high/low run lasts for.
sends = mtimes / waittime
print('numbers\n')
print(sends)
# Convert to zeros and ones for the RF binary.
# Each run contributes its level repeated once per base interval; a run whose
# repeat count is not positive contributes a single 0 instead.
zeroone = []
for state, count in zip(nnexts, sends):
    repeats = int(count)
    zeroone += [int(state)] * repeats if repeats > 0 else [0]
zeroones = zeroone[:]
print('\nTHIS IS THE OUTPUT STRING\n')
print(''.join(str(bit) for bit in zeroones))
print('\nTHIS IS THE DIVISOR\n')
print(best)
# Rebuild a time axis from the emitted code: check[i] is the total length of
# all runs up to and including run i (count * divisor), scaled back down by
# 1e6. It therefore counts from 0, not from time[0].
check = np.cumsum(np.array(sends, dtype='int_') * best) / 1000000
# Do some plotting of the data, to see how the original wave compares to what
# we are outputting. Use this to tweak the binning interval.
xmin, xmax = time[0], time[-1]
f, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, sharex=True)
# One row per stage: raw samples, digitised samples, detected square wave,
# and the reconstructed output.
panels = (
    (ax1, time, value),
    (ax2, time, digital),
    (ax3, changes, nexts),
    (ax4, check, nnexts),
)
for axis, xs, ys in panels:
    axis.plot(xs, ys)
for axis, _, _ in panels:
    axis.set_xlim([xmin, xmax])
# Second figure: detected and reconstructed waves overlaid on a single axis.
f2, ax5 = plt.subplots(1, 1, sharex=True)
ax5.plot(changes, nexts)
ax5.plot(check, nnexts)
plt.show()