import pyb, array, micropython, utime ########################################### # This is the main function for you to write yourself. def muscle_fun (musc1_on, changed): ... your code here... ########################################### SAMPLE_FREQ=1000# Most of the usable energy is in 50-150Hz, and we want to # sample several points on each cycle of the wave. # This code runs fine up to 2500 samples/sec. NSAMPLES= SAMPLE_FREQ * 15 # So 15 seconds of operation and we're done ########################################### # Global user-set parameters ########################################### f1 = 16 # How long the first (smoothing) filter is f2 = 64 # How long the second (baselining) filter is f3 = 100 # How many samples per segment ########################################### # Global user-set per-channel parameters ########################################### ch1_Schmidt0=130 # Call the signal a 0 when it dips lower than this. ch1_Schmidt1=220 # " " " " 1 " " " higher " ". # Our final "RMS," doesn't take the sqrt or divide by f3. So compensate. # sqrt(N/f3)=S; N/f3=S**2; N=f3*S**2 ch1_Schmidt0=f3 * ch1_Schmidt0 * ch1_Schmidt0 ch1_Schmidt1=f3 * ch1_Schmidt1 * ch1_Schmidt1 ########################################### # Global variables ########################################### f1_ptr=0 f2_ptr=0 pos_in_current_segment = 0 # I.e., we're at the beginning of a segment. n_reads=0 # Number of reads since forever. f1_mask = f1-1 # If f1=16, then 0xF f2_mask = f2-1 # If f2=64, then 0x3F ########################################### # Global per-channel variables ########################################### # 2B/entry, holds the last 'f2' ADC values. ch1_circ_buffer1=array.array('H', (0 for _ in range(f1))) ch1_circ_buffer2=array.array('H', (0 for _ in range(f2))) ch1_f1_run_sum=0 # running sum for smoothing filter ch1_f2_run_sum=0 # running sum for baselining filter ch1_run_sumsqr=0 # running sum of baselined**2 on current segment ch1_On = False # Current state for the Schmidt trigger. ch1_ADC = pyb.ADC(pyb.Pin.board.X1) # The ADC reading pin X1 ########################################### # Library routines. ########################################### # Find i such that 1< Schmidt1)): #print ("On at n_reads=", n_reads, "and run_sumsqr=", run_sumsqr) return(True, True) if (On and (run_sumsqr < Schmidt0)): #print ("Off at n_reads=", n_reads, "and run_sumsqr=", run_sumsqr) return(False, True) return (On, False) ########################################### # The hierarchy of interrupt-driven routines ########################################### # Called every 1ms as an interrupt handler; cannot allocate or free any memory. def cb_irq(timer): micropython.schedule (cb, timer) # Called every 1ms by cb_irq(); *can* allocate or free memory. def cb(timer): global n_reads, f1_ptr, f2_ptr, pos_in_current_segment, ch1_f1_run_sum, \ ch1_f2_run_sum, ch1_run_sumsqr, ch1_On # Channel-zero processing val = ch1_ADC.read() # ADC for this reading f1_ptr = n_reads & f1_mask ch1_f1_run_sum += (val-ch1_circ_buffer1[f1_ptr]) #Sum of last 'f1' readings f1_out = ch1_f1_run_sum >> divide_f1 # avg over last 'f1' readings ch1_circ_buffer1[f1_ptr] = val f2_ptr = n_reads & f2_mask ch1_f2_run_sum += (val-ch1_circ_buffer2[f2_ptr]) #Sum of last 'f2' readings ch1_circ_buffer2[f2_ptr] = val baseline = ch1_f2_run_sum >> divide_f2 # avg over last 'f2' readings baselined = f1_out - baseline ch1_run_sumsqr += (baselined * baselined) # Pos_in_current_segment counts cyclically in [0,f3) to flag when we're # done with a segment. pos_in_current_segment += 1 if (pos_in_current_segment==f3): pos_in_current_segment=0 ch1_On,changed=Schmidt(ch1_On,ch1_run_sumsqr,ch1_Schmidt0,ch1_Schmidt1) muscle_fun (ch1_On, changed) # User's function to call. ch1_run_sumsqr=0 n_reads += 1 if (n_reads>NSAMPLES): timer.deinit() def main(): micropython.alloc_emergency_exception_buf(100) global divide_f1, divide_f2 divide_f1 = shiftR_amount (f1) # If f1=16, then 4 (since >>4 is /16) divide_f2 = shiftR_amount (f2) # If f2=64, then 6 (since >>6 is /64) # Set up the timer. tim = pyb.Timer(4) tim.init(freq=SAMPLE_FREQ, callback=cb_irq) print ("Setup timer") time1 = utime.ticks_ms() # Tim.init() is non-blocking, and so returns immediately. # Use this infinite loop to wait until we've collected NSAMPLES samples. print ("Running") while n_reads < NSAMPLES: pass time2 = utime.ticks_ms() print ("Done sampling") expected = NSAMPLES/SAMPLE_FREQ actual = (time2-time1)/1000 print("Expected {} samples at {}/sec={}sec; got {}sec, off by {}%".format( NSAMPLES,SAMPLE_FREQ, expected,actual, 100*(expected-actual)/expected)) main()