import pyb, array, micropython, utime

###########################################
# This is the main function for you to write yourself.
# Musc1, musc2 and musc3 correspond to boards #1, #2 and #3 respectively.
# It is called from cb() once per segment (every 'f3' samples) with the
# current on/off state of each channel; 'changed' is True when at least one
# of the three channels switched state on this segment.
def muscle_fun (musc1_on, musc2_on, musc3_on, changed):
    # ... your code here...
    pass    # No-op default so the file still runs before you fill this in.
###########################################

SAMPLE_FREQ = 1000  # Most of the usable energy is in 50-150Hz, and we want to
                    # sample several points on each cycle of the wave.
                    # This code runs fine up to 2500 samples/sec.
NSAMPLES = SAMPLE_FREQ * 15  # So 15 seconds of operation and we're done

# Global user-set parameters
###########################################
# f1 and f2 must be powers of two: the code indexes the circular buffers
# with a bit-mask (f-1) and divides the running sums with a right shift.
f1 = 16     # How long the first (smoothing) filter is
f2 = 64     # How long the second (baselining) filter is
f3 = 100    # How many samples per segment

###########################################
# Global user-set per-channel parameters
###########################################
# Two thresholds per channel give hysteresis: a channel turns on only above
# the upper threshold and turns off only below the lower one.
ch1_Schmidt0 = 130  # Call the signal a 0 when it dips lower than this.
ch1_Schmidt1 = 220  #  "    "    "    " 1   "   "  rises higher  "   ".
ch2_Schmidt0 = 130  # Ditto, for channel 2
ch2_Schmidt1 = 220
ch3_Schmidt0 = 130  # And channel 3
ch3_Schmidt1 = 220

# Our final "RMS," doesn't take the sqrt or divide by f3. So compensate.
#       sqrt(N/f3)=S; N/f3=S**2; N=f3*S**2
# After this, the thresholds are directly comparable to a segment's
# running sum of squares.
ch1_Schmidt0 = f3 * ch1_Schmidt0 * ch1_Schmidt0
ch1_Schmidt1 = f3 * ch1_Schmidt1 * ch1_Schmidt1
ch2_Schmidt0 = f3 * ch2_Schmidt0 * ch2_Schmidt0
ch2_Schmidt1 = f3 * ch2_Schmidt1 * ch2_Schmidt1
ch3_Schmidt0 = f3 * ch3_Schmidt0 * ch3_Schmidt0
ch3_Schmidt1 = f3 * ch3_Schmidt1 * ch3_Schmidt1

###########################################
# Global variables
###########################################
f1_ptr = 0                  # Write index into the 'f1'-long circular buffers.
f2_ptr = 0                  # Write index into the 'f2'-long circular buffers.
pos_in_current_segment = 0  # I.e., we're at the beginning of a segment.
n_reads = 0                 # Number of reads since forever.
f1_mask = f1 - 1    # If f1=16, then 0xF
f2_mask = f2 - 1    # If f2=64, then 0x3F

###########################################
# Global per-channel variables
###########################################
# Each channel owns two circular buffers of 2-byte unsigned entries holding
# the most recent 'f1' and 'f2' ADC readings, the running sums over those
# buffers, the sum of squares for the segment in progress, the current
# trigger state, and the ADC object it samples from.

# Channel 1: ADC on pin X1
ch1_circ_buffer1 = array.array('H', [0] * f1)
ch1_circ_buffer2 = array.array('H', [0] * f2)
ch1_f1_run_sum = 0      # running sum for smoothing filter
ch1_f2_run_sum = 0      # running sum for baselining filter
ch1_run_sumsqr = 0      # running sum of baselined**2 on current segment
ch1_On = False          # Current state for the Schmidt trigger.
ch1_ADC = pyb.ADC(pyb.Pin.board.X1)

# Channel 2: ADC on pin X22
ch2_circ_buffer1 = array.array('H', [0] * f1)
ch2_circ_buffer2 = array.array('H', [0] * f2)
ch2_f1_run_sum = 0      # running sum for smoothing filter
ch2_f2_run_sum = 0      # running sum for baselining filter
ch2_run_sumsqr = 0      # running sum of baselined**2 on current segment
ch2_On = False          # Current state for the Schmidt trigger.
ch2_ADC = pyb.ADC(pyb.Pin.board.X22)

# Channel 3: ADC on pin X19
ch3_circ_buffer1 = array.array('H', [0] * f1)
ch3_circ_buffer2 = array.array('H', [0] * f2)
ch3_f1_run_sum = 0      # running sum for smoothing filter
ch3_f2_run_sum = 0      # running sum for baselining filter
ch3_run_sumsqr = 0      # running sum of baselined**2 on current segment
ch3_On = False          # Current state for the Schmidt trigger.
ch3_ADC = pyb.ADC(pyb.Pin.board.X19)

###########################################
# Library routines.
###########################################

# Find i such that 1<<i == n, i.e. the right-shift amount that divides by n.
# NOTE(review): the body of this routine was missing from the source text;
# it is reconstructed from the comment above and from its two call sites in
# main() ("If f1=16, then 4"). Confirm against the original.
def shiftR_amount (n):
    i = 0
    while (1 << i) < n:
        i += 1
    # The filters index their buffers with (n-1) as a bit-mask and divide
    # with >>i, both of which are only correct for an exact power of two.
    if (1 << i) != n:
        raise ValueError("filter length must be a power of 2")
    return i

# Two-threshold trigger with hysteresis.
#   On          current state of the channel (True/False)
#   run_sumsqr  sum of squares accumulated over the segment just finished
#   Schmidt0    switch off when run_sumsqr falls below this
#   Schmidt1    switch on when run_sumsqr rises above this
# Returns (new_state, changed), where 'changed' says whether the state flipped.
# NOTE(review): the 'def' line was missing from the source text; parameter
# names come from the body, their order from the calls in cb().
def Schmidt (On, run_sumsqr, Schmidt0, Schmidt1):
    if ((not On) and (run_sumsqr > Schmidt1)):
        #print ("On at n_reads=", n_reads, "and run_sumsqr=", run_sumsqr)
        return (True, True)
    if (On and (run_sumsqr < Schmidt0)):
        #print ("Off at n_reads=", n_reads, "and run_sumsqr=", run_sumsqr)
        return (False, True)
    return (On, False)

###########################################
# The hierarchy of interrupt-driven routines
###########################################

# Called once per sample period (1ms at SAMPLE_FREQ=1000) as an interrupt
# handler; cannot allocate or free any memory.
def cb_irq(timer):
    try:
        micropython.schedule (cb, timer)
    except RuntimeError:
        # schedule() raises RuntimeError when its queue is full, i.e. when
        # cb() has fallen behind. Drop this tick rather than raise from
        # inside the interrupt handler.
        pass

# Scheduled once per sample period by cb_irq(); *can* allocate or free memory.
# Reads one sample per channel, updates both moving-average filters and the
# per-segment sum of squares, and at the end of every 'f3'-sample segment
# runs the triggers and calls muscle_fun().
def cb(timer):
    global n_reads, f1_ptr, f2_ptr, pos_in_current_segment, \
           ch1_f1_run_sum, ch1_f2_run_sum, ch1_run_sumsqr, ch1_On, \
           ch2_f1_run_sum, ch2_f2_run_sum, ch2_run_sumsqr, ch2_On, \
           ch3_f1_run_sum, ch3_f2_run_sum, ch3_run_sumsqr, ch3_On

    # All three channels share the same slots in their circular buffers,
    # so compute the indices once per sample.
    f1_ptr = n_reads & f1_mask
    f2_ptr = n_reads & f2_mask

    # Channel-one processing
    val = ch1_ADC.read()                            # ADC for this reading
    ch1_f1_run_sum += (val-ch1_circ_buffer1[f1_ptr])  # Sum of last 'f1' readings
    f1_out = ch1_f1_run_sum >> divide_f1            # avg over last 'f1' readings
    ch1_circ_buffer1[f1_ptr] = val
    ch1_f2_run_sum += (val-ch1_circ_buffer2[f2_ptr])  # Sum of last 'f2' readings
    ch1_circ_buffer2[f2_ptr] = val
    baseline = ch1_f2_run_sum >> divide_f2          # avg over last 'f2' readings
    baselined = f1_out - baseline
    ch1_run_sumsqr += (baselined * baselined)

    # Channel-two processing
    val = ch2_ADC.read()                            # ADC for this reading
    ch2_f1_run_sum += (val-ch2_circ_buffer1[f1_ptr])  # Sum of last 'f1' readings
    f1_out = ch2_f1_run_sum >> divide_f1            # avg over last 'f1' readings
    ch2_circ_buffer1[f1_ptr] = val
    ch2_f2_run_sum += (val-ch2_circ_buffer2[f2_ptr])  # Sum of last 'f2' readings
    ch2_circ_buffer2[f2_ptr] = val
    baseline = ch2_f2_run_sum >> divide_f2          # avg over last 'f2' readings
    baselined = f1_out - baseline
    ch2_run_sumsqr += (baselined * baselined)

    # Channel-three processing
    val = ch3_ADC.read()                            # ADC for this reading
    ch3_f1_run_sum += (val-ch3_circ_buffer1[f1_ptr])  # Sum of last 'f1' readings
    f1_out = ch3_f1_run_sum >> divide_f1            # avg over last 'f1' readings
    ch3_circ_buffer1[f1_ptr] = val
    ch3_f2_run_sum += (val-ch3_circ_buffer2[f2_ptr])  # Sum of last 'f2' readings
    ch3_circ_buffer2[f2_ptr] = val
    baseline = ch3_f2_run_sum >> divide_f2          # avg over last 'f2' readings
    baselined = f1_out - baseline
    ch3_run_sumsqr += (baselined * baselined)

    # Pos_in_current_segment counts cyclically in [0,f3) to flag when we're
    # done with a segment.
    pos_in_current_segment += 1
    if (pos_in_current_segment == f3):
        pos_in_current_segment = 0
        ch1_On, changed0 = Schmidt(ch1_On, ch1_run_sumsqr, ch1_Schmidt0, ch1_Schmidt1)
        ch2_On, changed1 = Schmidt(ch2_On, ch2_run_sumsqr, ch2_Schmidt0, ch2_Schmidt1)
        ch3_On, changed2 = Schmidt(ch3_On, ch3_run_sumsqr, ch3_Schmidt0, ch3_Schmidt1)
        muscle_fun (ch1_On, ch2_On, ch3_On, changed0 or changed1 or changed2)
        # Start the next segment's sums of squares from zero.
        ch1_run_sumsqr = 0
        ch2_run_sumsqr = 0
        ch3_run_sumsqr = 0

    n_reads += 1
    # Stop as soon as NSAMPLES samples are in; main() stops waiting at the
    # same count, so no extra sample is taken after it reports completion.
    if (n_reads >= NSAMPLES):
        timer.deinit()

# Set up the sampling timer, wait until NSAMPLES samples have been processed,
# then report how long that took compared with the nominal duration.
def main():
    global divide_f1, divide_f2

    # Lets exceptions raised inside the interrupt handler be reported.
    micropython.alloc_emergency_exception_buf(100)

    divide_f1 = shiftR_amount (f1)  # If f1=16, then 4 (since >>4 is /16)
    divide_f2 = shiftR_amount (f2)  # If f2=64, then 6 (since >>6 is /64)

    # Set up the timer.
    tim = pyb.Timer(4)
    tim.init(freq=SAMPLE_FREQ, callback=cb_irq)
    print ("Setup timer")
    time1 = utime.ticks_ms()

    # Tim.init() is non-blocking, and so returns immediately.
    # Use this loop to wait until we've collected NSAMPLES samples.
    print ("Running")
    while n_reads < NSAMPLES:
        pass
    time2 = utime.ticks_ms()
    print ("Done sampling")

    expected = NSAMPLES/SAMPLE_FREQ
    # ticks_ms() values wrap around, so they must be compared with
    # ticks_diff() rather than plain subtraction.
    actual = utime.ticks_diff(time2, time1)/1000
    print("Expected {} samples at {}/sec={}sec; got {}sec, off by {}%".format(
        NSAMPLES,SAMPLE_FREQ, expected,actual, 100*(expected-actual)/expected))

main()