Raspberry Pi based RTL-SDR Input Level and In-Band Fluctuation Measurement Tool
Overview
- 고가의 장비를 시간율 측정을 위해서 장시간 운용하는 데서 오는 부담과 공간율 측정을 위해서 다수 위치에 설치해야 하는 비용적 부담을 해결하기 위해서, Raspberry Pi를 활용하여 Low-Cost Measurement Tool을 개발하는 데 그 목적이 있음.
- 임의의 주파수 대역의 신호를 수신하여 Input Level을 측정
- In-Band Fluctuation을 판단하는 지표로서 ITU-R BT.0000에서 정의하고 있는 Standard Deviation 값을 측정
Hardware
- Raspberry Pi Model B+
- Software Defined Radio Receiver USB Stick - RTL2832 w/R820T
- MCX Jack to SMA RF Cable Adapter
Installation
Implementation
```python
#— Sungho JEON — 2014/12/23 2015/03/16 2015/04/23
# Sensor Networks Example: Raspberry Pi, ThingSpeak Cloud Storage
import os
import sys
import threading
import time
import math
from datetime import datetime
import httplib
import urllib

import numpy as np
from rtlsdr import *  # loads the ADC (RTL-SDR) USB device interface
from gps import *  # loads the USB GPS device interface
###############################################################################
gpsd = None #setting the global variable (shared gpsd session; assigned when a GpsPoller is constructed)
###############################################################################
class GpsPoller(threading.Thread):
    """Background thread that keeps draining reports from the gpsd daemon.

    The gpsd session is stored in the module-level ``gpsd`` variable so that
    the rest of the script can read the latest fix (``gpsd.fix.latitude``,
    ``gpsd.fix.longitude``, ``gpsd.fix.altitude``) at any time.

    Attributes:
        current_value: Initialised to ``None``; not used by this class itself.
        running: Loop flag; set it to ``False`` to make ``run`` return after
            the report it is currently waiting for.
    """

    def __init__(self):
        """Open the gpsd session in watch mode and arm the polling flag."""
        threading.Thread.__init__(self)
        global gpsd  # bring it in scope
        gpsd = gps(mode=WATCH_ENABLE)  # starting the stream of info
        self.current_value = None
        self.running = True  # setting the thread running to true

    def run(self):
        """Consume gpsd reports until ``running`` is cleared."""
        # Loop on this thread's own flag instead of a particular global
        # instance name, so the class works no matter what variable the
        # caller stores the poller in.
        while self.running:
            # Grab EACH set of gpsd info so the buffer never backs up and
            # gpsd.fix always reflects the newest report.
            gpsd.next()
###############################################################################
def record_samples_to_ThingSpeak(input1):
    """Upload one measurement to the ThingSpeak channel.

    ``input1`` is sent as ``field1`` together with the receiver's current
    centre frequency (``field2``) and the latest GPS latitude, longitude and
    altitude (``field3``-``field5``), read from the module-level ``sdr`` and
    ``gpsd`` objects. Network failures are reported on stdout and otherwise
    ignored, so a connectivity problem does not stop the caller.

    Args:
        input1: Value stored in ``field1`` of the channel.
    """
    print("Uploading to ThingSpeak ...")
    # Use the API key generated in your ThingSpeak channel as the value of 'key'.
    # NOTE(review): the write key is hard-coded in the source; consider
    # loading it from configuration instead of publishing it with the code.
    params = urllib.urlencode({
        'field1': input1,
        'field2': sdr.center_freq,
        'field3': gpsd.fix.latitude,
        'field4': gpsd.fix.longitude,
        'field5': gpsd.fix.altitude,
        'key': '0D1VWX5IXVK7OKB0',
    })
    # The body is form-encoded, so the header name must be spelt correctly
    # for the server to be told how to parse it.
    headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Accept": "text/plain",
    }
    # A timeout keeps a stalled network from blocking the caller forever.
    conn = httplib.HTTPConnection("api.thingspeak.com:80", timeout=10)
    try:
        conn.request("POST", "/update", params, headers)
        response = conn.getresponse()
        print("%s %s" % (response.status, response.reason))
        response.read()  # drain the reply body before closing
        print("done.")
    except (httplib.HTTPException, IOError):
        # Socket errors derive from IOError; anything else is a programming
        # error and is allowed to propagate instead of being hidden.
        print("connection failed")
    finally:
        # Close on every path so a failed request does not leak the socket.
        conn.close()
###############################################################################
def get_inband_level_std(num_of_time_samples=2 ** 17, fft_size=2 ** 8):
    """Measure the received level and the in-band fluctuation.

    Two captures are taken with the module-level ``sdr`` receiver: one tuned
    below and one tuned above the current centre frequency, each offset by
    half the sample rate plus 5 kHz. Every capture is scaled by
    ``10 ** (-gain / 20)`` to take the configured receiver gain back out.
    The receiver is returned to its original centre frequency before the
    function exits, so repeated calls always measure the same channel.

    Args:
        num_of_time_samples: Number of samples read per capture.
        fft_size: FFT length used for the spectrum. ``np.fft.fft`` crops its
            input to this length, so only the first ``fft_size`` samples of
            each capture contribute to the fluctuation figure.

    Returns:
        A tuple ``(AvgInbandLevel, ChannelStdDev)``:
        ``AvgInbandLevel`` is ``10 * log10`` of the mean squared magnitude of
        both captures (labelled dBm by the original author; no absolute
        calibration is applied here), and ``ChannelStdDev`` is the standard
        deviation, in dB, of the per-bin spectrum magnitude across both
        captures.
    """
    freq_center = sdr.center_freq
    gain_scale = pow(10, -sdr.gain / 20.0)
    try:
        # Lower Side Band (LSB) sample
        sdr.reset_buffer()
        sdr.center_freq = freq_center - sdr.sample_rate / 2.0 - 0.005e6
        samples_time1 = sdr.read_samples(num_of_time_samples) * gain_scale
        # Upper Side Band (USB) sample
        sdr.reset_buffer()
        sdr.center_freq = freq_center + sdr.sample_rate / 2.0 + 0.005e6
        samples_time2 = sdr.read_samples(num_of_time_samples) * gain_scale
    finally:
        # Without this the receiver stays on the upper side band and the next
        # call would start from that shifted frequency, drifting upward on
        # every measurement.
        sdr.center_freq = freq_center

    # ========= Received level: mean power over both captures, in dB
    samples_time = np.concatenate((samples_time1, samples_time2), axis=0)
    AvgInbandLevel = 10 * np.log10(np.mean(np.abs(samples_time) ** 2, axis=0))

    # ========= In-band fluctuation as a standard deviation
    samples_freq1 = np.fft.fft(samples_time1, fft_size) / fft_size
    samples_freq2 = np.fft.fft(samples_time2, fft_size) / fft_size
    samples_freq = np.concatenate((samples_freq1, samples_freq2), axis=0)
    # Per-bin spectrum magnitude in dB over both half-bands
    Pxx = 20 * np.log10(np.abs(samples_freq))
    ChannelStdDev = Pxx.std(axis=0)
    return AvgInbandLevel, ChannelStdDev
###############################################################################
def main():
    """Run the measurement loop until the process is interrupted.

    Opens the RTL-SDR receiver, configures it, and then repeatedly measures
    the in-band level and fluctuation, uploads the level to ThingSpeak,
    prints the result and appends one row to the CSV log. The loop has no
    exit condition of its own; it ends when an exception (for example
    ``KeyboardInterrupt``) propagates out, at which point the receiver and
    the log file are closed.
    """
    global sdr
    print("Start!!")
    # NOTE(review): the file name mentions 707 MHz while the receiver below is
    # tuned to 701 MHz - confirm which one is intended.
    with open("Lab201_151123_dipole_707MHz.csv", 'a') as measuredata:
        sdr = RtlSdr()
        try:
            sdr.sample_rate = 2.800e6  # in Hz; reliable sampling up to 2.8 MS/s
            sdr.gain = 40.  # in dB, gain of RF amplifier at the receiver front-end
            # DVB-T2 center frequency [Hz]: 701 MHz (MBC).
            # Alternatives noted by the author: 707 MHz (SBS), 761 MHz (KBS).
            sdr.center_freq = 701.000e6
            while True:
                AvgInbandLevel, ChannelStdDev = get_inband_level_std()
                # The uploader accepts a single measurement value (the level).
                record_samples_to_ThingSpeak(AvgInbandLevel)
                now = datetime.today()
                print("%s %s %s %s %s" % (now, AvgInbandLevel, ChannelStdDev,
                                          sdr.center_freq / 1.0e6, sdr.gain))
                print("%s %s %s %s" % ("..................", gpsd.fix.latitude,
                                       gpsd.fix.longitude, gpsd.fix.altitude))
                # Append one row: timestamp, level, frequency, gain, lat, lon, alt
                columns = (AvgInbandLevel, sdr.center_freq, sdr.gain,
                           gpsd.fix.latitude, gpsd.fix.longitude,
                           gpsd.fix.altitude)
                row = now.strftime("%Y%m%d%H%M%S, \t")
                row += "".join("%f, \t" % value for value in columns)
                measuredata.write(row + "\n")
                # The loop only ends by interruption, so push each row to disk
                # right away instead of relying on a final close.
                measuredata.flush()
        finally:
            # Release the USB receiver however the loop ends.
            sdr.close()
if __name__ == '__main__':
    # Script entry point: run the GPS poller in the background while the
    # measurement loop runs in the foreground, then shut the poller down.
    gpsp = GpsPoller()  # create the GPS polling thread
    try:
        gpsp.start()  # start it up
        main()  # takes no arguments, so command-line arguments are not forwarded
    except (KeyboardInterrupt, SystemExit):
        # Ctrl-C is the normal way to stop the measurement loop.
        pass
    except IOError as err:
        # I/O failures end the run, but are reported instead of vanishing.
        sys.stderr.write("I/O error: %s\n" % err)
    finally:
        # Ask the poller to stop and wait for it; it exits once the gpsd read
        # it is currently blocked on returns.
        gpsp.running = False
        if gpsp.is_alive():
            gpsp.join()