Rohde & Schwarz Instruments
Control Rohde & Schwarz spectrum analyzers, signal generators, oscilloscopes, and network analyzers using PyVISA with SCPI commands and examples.
This guide covers Rohde & Schwarz instrument control with PyVISA: worked examples, R&S-specific SCPI commands, and test automation.
Supported R&S Instruments
Spectrum Analyzers
- FSW Series: High-end signal and spectrum analyzers (FSW8, FSW26, FSW43, FSW67, FSW85)
- FSV Series: Mid-range spectrum analyzers (FSV3000, FSV4000)
- FPS Series: Cost-effective spectrum analyzers
- FSVA Series: Vector spectrum analyzers
Signal Generators
- SMW Series: Vector signal generators (SMW200A)
- SMB Series: Analog signal generators (SMB100A, SMB100B)
- SMBV Series: Vector signal generators (SMBV100A, SMBV100B)
- SGT Series: Vector signal generators for 5G/6G
Oscilloscopes
- RTO Series: High-end oscilloscopes (RTO2000, RTO6000)
- RTB Series: Entry-level oscilloscopes (RTB2000)
- RTM Series: Mid-range oscilloscopes (RTM3000)
Network Analyzers
- ZNA Series: High-end vector network analyzers
- ZNB Series: Mid-range vector network analyzers
- ZNC Series: Cost-effective network analyzers
Basic Connection and Setup
R&S-Specific Connection Handling
import pyvisa
import numpy as np
import time
import re
class RohdeSchwartzInstrument:
    """Base class for Rohde & Schwarz instruments.

    Owns one PyVISA resource manager and one instrument session and offers
    the helpers shared by the instrument-specific subclasses: error-queue
    draining, ``*OPC`` synchronisation and cleanup.  Instances can be used
    as context managers so the session is closed even when a measurement
    raises.
    """

    def __init__(self, resource_string):
        """Open ``resource_string``, identify the instrument and reset it.

        Sets ``idn``, ``manufacturer``, ``model``, ``serial`` and
        ``firmware`` from the ``*IDN?`` reply (missing fields become
        ``"Unknown"``).

        Raises:
            Exception: whatever the VISA layer raised.  The session and the
                resource manager are released before re-raising.
        """
        self.rm = pyvisa.ResourceManager()
        self.instrument = None
        try:
            self.instrument = self.rm.open_resource(resource_string)

            # R&S-specific settings
            self.instrument.timeout = 30000  # 30 seconds (R&S instruments can be slow)
            self.instrument.read_termination = '\n'
            self.instrument.write_termination = '\n'

            # Verify connection and get instrument info
            self.idn = self.instrument.query('*IDN?')
            print(f"Connected to: {self.idn.strip()}")

            # Parse R&S instrument info, padding fields the reply lacks
            fields = [part.strip() for part in self.idn.split(',')]
            fields += ["Unknown"] * (4 - len(fields))
            self.manufacturer, self.model, self.serial, self.firmware = fields[:4]

            # Common R&S initialization
            self.instrument.write('*RST')  # Reset
            self.instrument.write('*CLS')  # Clear status
            # Set SCPI error verbosity
            self.instrument.write('SYST:ERR:VERB ON')
        except Exception as e:
            print(f"Connection failed: {e}")
            # Do not leak the half-open session; the connection error is the
            # one worth reporting, so a failing cleanup is ignored here.
            try:
                self.close()
            except Exception:
                pass
            raise

    def __enter__(self):
        """Return the instrument itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when the ``with`` block ends."""
        self.close()
        return False

    @staticmethod
    def _is_no_error(response):
        """Return True when a ``SYST:ERR?`` reply carries error code 0."""
        code = response.split(',', 1)[0].strip()
        return code.lstrip('+') == '0'

    def check_errors(self):
        """Drain the SCPI error queue and return the entries as strings.

        Reading stops at the first "no error" reply.  A failure while
        querying is reported as a final list entry instead of being raised.
        """
        errors = []
        try:
            while True:
                response = self.instrument.query('SYST:ERR?').strip()
                if self._is_no_error(response):
                    break
                errors.append(response)
                # Prevent infinite loop
                if len(errors) > 100:
                    errors.append("Too many errors, stopping error check")
                    break
        except Exception as e:
            errors.append(f"Error checking failed: {e}")
        return errors

    def wait_for_operation_complete(self, timeout=60, poll_interval=0.1):
        """Send ``*OPC`` and poll ``*ESR?`` until the OPC bit is set.

        Args:
            timeout: maximum time to wait, in seconds.
            poll_interval: pause between two ``*ESR?`` polls, in seconds.

        Returns:
            True once bit 0 (operation complete) is reported.

        Raises:
            TimeoutError: the bit was not set within ``timeout`` seconds.
        """
        self.instrument.write('*OPC')  # Set operation complete bit
        # monotonic() is immune to wall-clock adjustments during the wait
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                if int(self.instrument.query('*ESR?')) & 1:  # Operation complete bit
                    return True
            except Exception:
                # A failed or unparsable poll is retried until the deadline.
                # KeyboardInterrupt is deliberately not caught so the wait
                # can be aborted with Ctrl+C.
                pass
            time.sleep(poll_interval)
        raise TimeoutError(f"Operation did not complete within {timeout} seconds")

    def close(self):
        """Close the instrument session and the resource manager.

        The resource manager is closed even if closing the session raises.
        """
        try:
            if self.instrument is not None:
                self.instrument.close()
        finally:
            self.rm.close()
# Connection examples for R&S instruments
# Ethernet: 'TCPIP::192.168.1.100::hislip0::INSTR' (HiSLIP protocol)
# USB: 'USB0::0x0AAD::0x0054::123456::INSTR'
# GPIB: 'GPIB0::20::INSTR'
FSW/FSV Spectrum Analyzer Control
Advanced Spectrum Analysis
class RohdeSchwartzSpectrumAnalyzer(RohdeSchwartzInstrument):
    """R&S Spectrum Analyzer controller (FSW, FSV, FPS series)"""

    def __init__(self, resource_string):
        """Connect, select ASCII transfers and single-sweep mode."""
        super().__init__(resource_string)

        # Configure for optimal performance
        self.instrument.write('FORM ASC')  # ASCII format for queries
        self.instrument.write('INIT:CONT OFF')  # Single sweep mode

        # Get instrument capabilities
        self.frequency_range = self._get_frequency_range()
        print(f"Frequency range: {self.frequency_range}")

    def _get_frequency_range(self):
        """Return ``{'min': ..., 'max': ...}`` in Hz.

        Falls back to 9 kHz - 26.5 GHz when the limits cannot be queried.
        """
        try:
            freq_min = float(self.instrument.query('FREQ:START? MIN'))
            freq_max = float(self.instrument.query('FREQ:STOP? MAX'))
        except Exception:
            return {'min': 9e3, 'max': 26.5e9}  # FSW default
        return {'min': freq_min, 'max': freq_max}

    def configure_spectrum(self, center_freq=1e9, span=100e6, rbw=1e6, vbw=None):
        """Configure basic spectrum analyzer settings.

        Args:
            center_freq: center frequency written to ``FREQ:CENT``.
            span: frequency span written to ``FREQ:SPAN``.
            rbw: resolution bandwidth written to ``BAND:RES``.
            vbw: video bandwidth; a falsy value selects automatic coupling.
        """
        # Frequency settings
        self.instrument.write(f'FREQ:CENT {center_freq}')
        self.instrument.write(f'FREQ:SPAN {span}')

        # Bandwidth settings
        self.instrument.write(f'BAND:RES {rbw}')
        if vbw:
            self.instrument.write(f'BAND:VID {vbw}')
        else:
            self.instrument.write('BAND:VID:AUTO ON')

        # Sweep settings
        self.instrument.write('SWE:MODE AUTO')  # Auto sweep mode
        self.instrument.write('AVER:COUN 1')  # No averaging initially

    def set_reference_level(self, ref_level=0):
        """Set reference level"""
        self.instrument.write(f'DISP:WIND:TRAC:Y:RLEV {ref_level}')

    def set_attenuation(self, attenuation=10):
        """Set input attenuation"""
        self.instrument.write(f'INP:ATT {attenuation}')

    def acquire_trace(self, trace_number=1):
        """Run one sweep and return ``(frequencies, levels)`` as numpy arrays.

        The frequency axis is rebuilt from the center frequency and span
        reported by the instrument, with one point per trace value.
        """
        # Start measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get trace data.  The numeric suffix on TRACe selects the window,
        # not the trace, so the trace is chosen by the TRACE<n> argument only.
        trace_data = self.instrument.query_ascii_values(f'TRAC:DATA? TRACE{trace_number}')

        # Get frequency axis
        center_freq = float(self.instrument.query('FREQ:CENT?'))
        span = float(self.instrument.query('FREQ:SPAN?'))
        frequencies = np.linspace(
            center_freq - span / 2,
            center_freq + span / 2,
            len(trace_data),
        )
        return frequencies, np.array(trace_data)

    def peak_search(self, trace_number=1, threshold=-50):
        """Place the marker on the trace maximum and return its position.

        Returns:
            dict with ``frequency``, ``level`` and ``marker_number`` keys.
        """
        # Configure peak search
        self.instrument.write('CALC:MARK:FUNC:POW:SEL PEAK')
        self.instrument.write(f'CALC:MARK:TRAC {trace_number}')

        # Set threshold if supported (best effort)
        try:
            self.instrument.write(f'CALC:MARK:PEAK:THR {threshold}')
        except Exception:
            pass  # Not all models support threshold

        # Perform peak search
        self.instrument.write('CALC:MARK:MAX')

        # Get peak results
        peak_freq = float(self.instrument.query('CALC:MARK:X?'))
        peak_level = float(self.instrument.query('CALC:MARK:Y?'))
        return {
            'frequency': peak_freq,
            'level': peak_level,
            'marker_number': 1,
        }

    def multi_peak_search(self, num_peaks=10, trace_number=1):
        """Find multiple peaks.

        Returns:
            list of dicts (``frequency``, ``level``, ``peak_number``); empty
            when the peak list could not be read or parsed.
        """
        # Enable peak list
        self.instrument.write('CALC:MARK:FUNC:POW:SEL PLIS')
        self.instrument.write(f'CALC:MARK:TRAC {trace_number}')

        # Configure peak list
        self.instrument.write(f'CALC:MARK:PLIS:SIZE {num_peaks}')

        # Execute peak search
        self.instrument.write('CALC:MARK:FUNC:POW:EXEC PLIS')
        self.wait_for_operation_complete()

        # Get peak list
        peaks = []
        try:
            peak_data = self.instrument.query('CALC:MARK:PLIS:DATA?')
            # Reply is a flat list of (frequency, level) pairs; zip drops a
            # trailing unpaired value.
            values = [float(x) for x in peak_data.split(',')]
            for number, (freq, level) in enumerate(zip(values[0::2], values[1::2]), start=1):
                peaks.append({
                    'frequency': freq,
                    'level': level,
                    'peak_number': number,
                })
        except Exception as e:
            print(f"Peak list error: {e}")
        return peaks

    def phase_noise_measurement(self, carrier_freq=1e9, offset_frequencies=None):
        """Measure phase noise at each offset frequency.

        Args:
            carrier_freq: carrier frequency written to ``FREQ:CARR``.
            offset_frequencies: offsets to measure; defaults to
                1 kHz, 10 kHz, 100 kHz and 1 MHz.

        Returns:
            list of dicts (``offset_frequency``, ``phase_noise_dbc_hz``), or
            None when the measurement failed.
        """
        if offset_frequencies is None:
            offset_frequencies = [1e3, 10e3, 100e3, 1e6]  # Default offsets

        # Switch to phase noise mode (if supported)
        try:
            self.instrument.write('INST:SEL PNOI')  # Phase noise mode
            self.instrument.write(f'FREQ:CARR {carrier_freq}')

            phase_noise_results = []
            for offset_freq in offset_frequencies:
                # Set offset frequency
                self.instrument.write(f'FREQ:OFFS {offset_freq}')

                # Start measurement
                self.instrument.write('INIT:IMM')
                self.wait_for_operation_complete()

                # Read phase noise value
                pn_value = float(self.instrument.query('CALC:MARK:Y?'))
                phase_noise_results.append({
                    'offset_frequency': offset_freq,
                    'phase_noise_dbc_hz': pn_value,
                })
            return phase_noise_results
        except Exception as e:
            print(f"Phase noise measurement not available: {e}")
            return None

    def emi_measurement(self, start_freq=30e6, stop_freq=1e9, detector='PEAK'):
        """Configure an EMI scan, run one sweep and return the trace.

        Returns:
            dict with ``frequencies``, ``levels``, ``detector`` and
            ``compliance`` keys, or None when the measurement failed.
        """
        # Switch to EMI mode (if supported)
        try:
            self.instrument.write('INST:SEL EMI')

            # Configure frequency range
            self.instrument.write(f'FREQ:START {start_freq}')
            self.instrument.write(f'FREQ:STOP {stop_freq}')

            # Set detector (PEAK, QPE, AVER, etc.)
            self.instrument.write(f'DET {detector}')

            # Configure for EMI compliance
            if start_freq >= 30e6 and stop_freq <= 1e9:
                # CISPR 16 settings for 30 MHz - 1 GHz
                self.instrument.write('BAND:RES 120000')  # 120 kHz RBW
                self.instrument.write('BAND:VID 120000')  # 120 kHz VBW

            # acquire_trace() triggers the sweep itself; starting another
            # one here first would only double the scan time.
            frequencies, trace_data = self.acquire_trace()
            return {
                'frequencies': frequencies,
                'levels': trace_data,
                'detector': detector,
                'compliance': 'CISPR_16',
            }
        except Exception as e:
            print(f"EMI mode not available: {e}")
            return None
# Usage example
fsa = RohdeSchwartzSpectrumAnalyzer('TCPIP::192.168.1.100::hislip0::INSTR')
try:
    # Configure for 1 GHz center, 100 MHz span
    fsa.configure_spectrum(center_freq=1e9, span=100e6, rbw=1e6)
    fsa.set_reference_level(0)  # 0 dBm reference
    fsa.set_attenuation(10)  # 10 dB attenuation

    # Single trace acquisition
    frequencies, spectrum = fsa.acquire_trace()

    # Find peaks
    peak = fsa.peak_search()
    print(f"Peak: {peak['frequency']/1e6:.2f} MHz at {peak['level']:.1f} dBm")

    # Multiple peaks (separate loop variable so `peak` above is not overwritten)
    peaks = fsa.multi_peak_search(num_peaks=5)
    for rank, entry in enumerate(peaks, start=1):
        print(f"Peak {rank}: {entry['frequency']/1e6:.2f} MHz at {entry['level']:.1f} dBm")
finally:
    # Release the VISA session even when a step above fails
    fsa.close()
SMW/SMB Signal Generator Control
Vector Signal Generation
class RohdeSchwartzSignalGenerator(RohdeSchwartzInstrument):
    """R&S Signal Generator controller (SMW, SMB, SMBV series)"""

    def __init__(self, resource_string):
        """Connect and probe frequency range and vector-modulation support."""
        super().__init__(resource_string)

        # Get generator capabilities
        self.frequency_range = self._get_frequency_range()
        self.has_vector_modulation = self._check_vector_capability()

        print("Signal generator capabilities:")
        print(f" Frequency range: {self.frequency_range}")
        print(f" Vector modulation: {self.has_vector_modulation}")

    def _get_frequency_range(self):
        """Return ``{'min': ..., 'max': ...}`` in Hz.

        Falls back to 9 kHz - 6 GHz when the limits cannot be queried.
        """
        try:
            freq_min = float(self.instrument.query('SOUR:FREQ? MIN'))
            freq_max = float(self.instrument.query('SOUR:FREQ? MAX'))
        except Exception:
            return {'min': 9e3, 'max': 6e9}  # SMB100A default
        return {'min': freq_min, 'max': freq_max}

    def _check_vector_capability(self):
        """Return True when the ``SOUR:IQ:STAT?`` query is answered."""
        try:
            self.instrument.query('SOUR:IQ:STAT?')
        except Exception:
            return False
        return True

    def set_cw_signal(self, frequency=1e9, power=0):
        """Set continuous wave signal with all modulation switched off."""
        self.instrument.write(f'SOUR:FREQ {frequency}')
        self.instrument.write(f'SOUR:POW {power}')

        # Disable modulation
        self.instrument.write('SOUR:AM:STAT OFF')
        self.instrument.write('SOUR:FM:STAT OFF')
        self.instrument.write('SOUR:PM:STAT OFF')
        if self.has_vector_modulation:
            self.instrument.write('SOUR:IQ:STAT OFF')

    def set_am_modulation(self, mod_frequency=1000, mod_depth=50, mod_source='INT'):
        """Configure AM modulation.

        ``mod_frequency`` is only written when ``mod_source`` is ``'INT'``.
        """
        self.instrument.write('SOUR:AM:STAT ON')
        self.instrument.write(f'SOUR:AM:SOUR {mod_source}')
        if mod_source == 'INT':
            self.instrument.write(f'SOUR:AM:INT:FREQ {mod_frequency}')
        self.instrument.write(f'SOUR:AM:DEPT {mod_depth}')

    def set_fm_modulation(self, mod_frequency=1000, deviation=10000, mod_source='INT'):
        """Configure FM modulation.

        ``mod_frequency`` is only written when ``mod_source`` is ``'INT'``.
        """
        self.instrument.write('SOUR:FM:STAT ON')
        self.instrument.write(f'SOUR:FM:SOUR {mod_source}')
        if mod_source == 'INT':
            self.instrument.write(f'SOUR:FM:INT:FREQ {mod_frequency}')
        self.instrument.write(f'SOUR:FM:DEV {deviation}')

    def set_pulse_modulation(self, pulse_width=1e-6, pulse_period=10e-6):
        """Configure pulse modulation from the internal source."""
        self.instrument.write('SOUR:PULM:STAT ON')
        self.instrument.write('SOUR:PULM:SOUR INT')
        self.instrument.write(f'SOUR:PULM:INT:PWID {pulse_width}')
        self.instrument.write(f'SOUR:PULM:INT:PER {pulse_period}')

    def load_waveform_file(self, filename, slot='WAVEFORM1'):
        """Select an I/Q waveform file and enable ARB and I/Q modulation.

        Args:
            filename: waveform file name sent with ``SOUR:BB:ARB:WAV:SEL``.
            slot: accepted for interface compatibility; currently unused.

        Raises:
            ValueError: the generator has no vector modulation.
        """
        if not self.has_vector_modulation:
            raise ValueError("Vector modulation not supported")

        # Load waveform (assuming file is on instrument)
        self.instrument.write(f'SOUR:BB:ARB:WAV:SEL "{filename}"')

        # Enable ARB and I/Q modulation
        self.instrument.write('SOUR:BB:ARB:STAT ON')
        self.instrument.write('SOUR:IQ:STAT ON')

    def generate_multitone(self, base_frequency=1e9, tone_spacing=1e6, num_tones=10, power_per_tone=-10):
        """Build complex baseband samples for a multitone signal.

        Nothing is uploaded to the instrument; the samples are returned.

        Returns:
            complex numpy array covering 1 ms, or None when the generator
            has no vector modulation.
        """
        if not self.has_vector_modulation:
            print("Multitone requires vector modulation capability")
            return

        # Create multitone I/Q data
        sample_rate = tone_spacing * num_tones * 10  # 10x oversampling
        duration = 1e-3  # 1 ms waveform
        num_samples = int(round(sample_rate * duration))
        # Sample instants at exactly 1/sample_rate.  An endpoint-inclusive
        # linspace stretches the spacing to duration/(N-1), which shifts every
        # tone and breaks periodicity when the waveform is looped.
        t = np.arange(num_samples) / sample_rate

        # Generate complex signal with multiple tones
        tone_amplitude = 10 ** (power_per_tone / 20) / np.sqrt(num_tones)
        signal = np.zeros(len(t), dtype=complex)
        for i in range(num_tones):
            tone_freq = (i - num_tones // 2) * tone_spacing
            signal += tone_amplitude * np.exp(1j * 2 * np.pi * tone_freq * t)

        # Upload I/Q data (this would require file transfer in practice)
        print("Generated multitone signal:")
        print(f" Center frequency: {base_frequency/1e9:.3f} GHz")
        print(f" {num_tones} tones, {tone_spacing/1e6:.1f} MHz spacing")
        print(f" Power per tone: {power_per_tone} dBm")

        # In practice, you would save this to a file and upload it
        return signal

    def lte_signal_generation(self, bandwidth=20e6, resource_blocks=100):
        """Configure an LTE-like downlink signal (simplified example)."""
        if not self.has_vector_modulation:
            print("LTE signal generation requires vector modulation")
            return

        try:
            # Enable digital standards (if available)
            self.instrument.write('SOUR:BB:EUTR:STAT ON')  # LTE
            self.instrument.write(f'SOUR:BB:EUTR:LINK:DL:BAND {bandwidth}')
            self.instrument.write(f'SOUR:BB:EUTR:LINK:DL:NORB {resource_blocks}')

            # Enable I/Q modulation
            self.instrument.write('SOUR:IQ:STAT ON')

            print("LTE signal configured:")
            print(f" Bandwidth: {bandwidth/1e6} MHz")
            print(f" Resource blocks: {resource_blocks}")
        except Exception as e:
            print(f"LTE generation not available: {e}")

    def set_frequency_sweep(self, start_freq, stop_freq, sweep_time, sweep_mode='AUTO'):
        """Configure and start a frequency sweep.

        Args:
            start_freq: sweep start frequency.
            stop_freq: sweep stop frequency.
            sweep_time: value written to the sweep dwell setting.
            sweep_mode: sweep mode keyword sent to the instrument.
        """
        # Honour the caller's mode instead of always sending AUTO
        self.instrument.write(f'SOUR:SWE:FREQ:MODE {sweep_mode}')
        self.instrument.write(f'SOUR:SWE:FREQ:START {start_freq}')
        self.instrument.write(f'SOUR:SWE:FREQ:STOP {stop_freq}')
        self.instrument.write(f'SOUR:SWE:FREQ:DWELL {sweep_time}')

        # Start sweep
        self.instrument.write('SOUR:SWE:FREQ:STAT ON')

    def enable_output(self, enabled=True):
        """Enable/disable RF output and report any resulting SCPI errors."""
        state = 'ON' if enabled else 'OFF'
        self.instrument.write(f'OUTP:STAT {state}')

        # Check for errors after enabling output
        errors = self.check_errors()
        if errors:
            print(f"Output enable errors: {errors}")
# Usage examples
smw = RohdeSchwartzSignalGenerator('TCPIP::192.168.1.101::hislip0::INSTR')
try:
    # Generate CW signal
    smw.set_cw_signal(frequency=2.4e9, power=-10)  # 2.4 GHz, -10 dBm
    smw.enable_output(True)

    # Add AM modulation
    smw.set_am_modulation(mod_frequency=1000, mod_depth=50)

    # Generate multitone for testing
    multitone_signal = smw.generate_multitone(
        base_frequency=1e9,
        tone_spacing=100e3,
        num_tones=20,
        power_per_tone=-20,
    )

    # LTE signal
    smw.lte_signal_generation(bandwidth=20e6, resource_blocks=100)
finally:
    # Release the VISA session even when a step above fails
    smw.close()
RTO/RTM Oscilloscope Control
High-Performance Oscilloscope Acquisition
class RohdeSchwartzOscilloscope(RohdeSchwartzInstrument):
    """R&S Oscilloscope controller (RTO, RTM, RTB series)"""

    def __init__(self, resource_string):
        """Connect, select 32-bit float transfers and probe the hardware."""
        super().__init__(resource_string)

        # Configure for optimal data transfer
        self.instrument.write('FORM:DATA REAL,32')  # 32-bit float data
        self.instrument.write('FORM:BORD NORM')  # Normal byte order

        # Get oscilloscope info
        self.channels = self._get_channel_count()
        self.bandwidth = self._get_bandwidth()

        print("Oscilloscope info:")
        print(f" Channels: {self.channels}")
        print(f" Bandwidth: {self.bandwidth}")

    def _get_channel_count(self):
        """Return the highest channel (1-8) that answers ``CHAN<n>:STAT?``."""
        count = 0
        for i in range(1, 9):  # Check up to 8 channels
            try:
                self.instrument.query(f'CHAN{i}:STAT?')
            except Exception:
                break
            count = i
        return count

    def _get_bandwidth(self):
        """Return the bandwidth as a ``"x.x GHz"`` string, or ``"Unknown"``."""
        try:
            # Try to get bandwidth from system info
            bandwidth_hz = float(self.instrument.query('SYST:BAND?'))
        except Exception:
            return "Unknown"
        return f"{bandwidth_hz/1e9:.1f} GHz"

    def configure_channel(self, channel, coupling='DC', scale=1.0, offset=0.0, enabled=True):
        """Configure oscilloscope channel.

        Raises:
            ValueError: ``channel`` is outside ``1..self.channels``.
        """
        # Reject 0 and negative numbers too, not only channels that are too high
        if not 1 <= channel <= self.channels:
            raise ValueError(f"Channel {channel} not available")

        # Channel settings
        self.instrument.write(f'CHAN{channel}:STAT {"ON" if enabled else "OFF"}')
        self.instrument.write(f'CHAN{channel}:COUP {coupling}')
        self.instrument.write(f'CHAN{channel}:SCAL {scale}')
        self.instrument.write(f'CHAN{channel}:OFFS {offset}')

        # Probe settings (auto-detect, best effort)
        try:
            self.instrument.write(f'CHAN{channel}:PROB:AUTO ONCE')
        except Exception:
            pass  # Not all models support auto probe detection

    def configure_timebase(self, time_scale=1e-6, position=0.0):
        """Configure horizontal timebase and select real-time acquisition."""
        self.instrument.write(f'TIM:SCAL {time_scale}')
        self.instrument.write(f'TIM:POS {position}')

        # Set acquisition mode
        self.instrument.write('ACQ:MODE RTIM')  # Real-time mode

    def setup_trigger(self, channel=1, level=0.0, edge='POS', mode='AUTO'):
        """Setup trigger source, level, slope and mode."""
        self.instrument.write(f'TRIG:SOUR CHAN{channel}')
        self.instrument.write(f'TRIG:LEV {level}')
        self.instrument.write(f'TRIG:SLOP {edge}')  # POS or NEG
        self.instrument.write(f'TRIG:MODE {mode}')  # AUTO, NORM, SING

    def _read_waveform(self, channel):
        """Read the current acquisition of ``channel`` without re-triggering."""
        # Get waveform data
        self.instrument.write(f'CHAN{channel}:DATA?')
        waveform_data = self.instrument.read_binary_values(datatype='f')

        # Get time base information
        time_scale = float(self.instrument.query('TIM:SCAL?'))
        time_position = float(self.instrument.query('TIM:POS?'))
        sample_rate = float(self.instrument.query('ACQ:SRAT?'))

        # Create time axis centred on the 10-division screen
        time_range = time_scale * 10  # 10 divisions
        dt = 1.0 / sample_rate
        time_axis = np.arange(len(waveform_data)) * dt - time_range / 2 + time_position
        return time_axis, np.array(waveform_data)

    def acquire_waveform(self, channel=1):
        """Run one single acquisition and return ``(time_axis, samples)``."""
        # Single acquisition
        self.instrument.write('SING')  # Single trigger

        # Wait for trigger
        self.wait_for_operation_complete()
        return self._read_waveform(channel)

    def acquire_all_channels(self):
        """Acquire waveforms from all enabled channels in ONE acquisition.

        Returns:
            dict mapping ``'CH<n>'`` to ``{'time': ..., 'voltage': ...}``.
            Channels that are disabled or fail to read out are omitted.
        """
        # Single acquisition shared by every channel, so the returned
        # waveforms belong to the same trigger event.
        self.instrument.write('SING')
        self.wait_for_operation_complete()

        waveforms = {}
        for channel in range(1, self.channels + 1):
            try:
                # Check if channel is enabled
                enabled = self.instrument.query(f'CHAN{channel}:STAT?').strip()
                if enabled == '1' or enabled.upper() == 'ON':
                    time_axis, waveform_data = self._read_waveform(channel)
                    waveforms[f'CH{channel}'] = {
                        'time': time_axis,
                        'voltage': waveform_data,
                    }
            except Exception as e:
                # Keep going with the remaining channels, but say why one is missing
                print(f"Channel {channel} readout failed: {e}")
        return waveforms

    def automated_measurements(self, channel=1):
        """Query the built-in measurements for ``channel``.

        Returns:
            dict of measurement name to float; a measurement that could not
            be read or parsed is reported as None.
        """
        # R&S measurement commands
        meas_commands = {
            'frequency': f'MEAS:FREQ? CHAN{channel}',
            'period': f'MEAS:PER? CHAN{channel}',
            'amplitude': f'MEAS:AMPL? CHAN{channel}',
            'peak_to_peak': f'MEAS:PTP? CHAN{channel}',
            'rms': f'MEAS:RMS? CHAN{channel}',
            'mean': f'MEAS:MEAN? CHAN{channel}',
            'rise_time': f'MEAS:RIS? CHAN{channel}',
            'fall_time': f'MEAS:FALL? CHAN{channel}',
            'pulse_width': f'MEAS:PWID? CHAN{channel}',
            'duty_cycle': f'MEAS:DUTY? CHAN{channel}',
        }

        measurements = {}
        for param, command in meas_commands.items():
            try:
                measurements[param] = float(self.instrument.query(command))
            except Exception:
                measurements[param] = None
        return measurements

    def fft_analysis(self, channel=1):
        """Perform FFT analysis on the math channel.

        Returns:
            ``(frequencies, fft_data)`` as numpy arrays, or ``(None, None)``
            when the analysis failed.
        """
        try:
            # Enable math channel for FFT
            self.instrument.write('CALC:MATH:STAT ON')
            self.instrument.write(f'CALC:MATH:EXPR "FFT(CHAN{channel})"')

            # Configure FFT
            self.instrument.write('CALC:MATH:FFT:WIND HANN')  # Hanning window

            # Get FFT data
            fft_data = self.instrument.query_binary_values('CALC:MATH:DATA?', datatype='f')

            # Get frequency axis information
            center_freq = float(self.instrument.query('CALC:MATH:FFT:FREQ:CENT?'))
            span = float(self.instrument.query('CALC:MATH:FFT:FREQ:SPAN?'))
            frequencies = np.linspace(
                center_freq - span / 2,
                center_freq + span / 2,
                len(fft_data),
            )
            return frequencies, np.array(fft_data)
        except Exception as e:
            print(f"FFT analysis failed: {e}")
            return None, None

    def protocol_decode_setup(self, protocol='I2C', **kwargs):
        """Setup protocol decoding on bus 1.

        Args:
            protocol: ``'I2C'`` or ``'SPI'`` (case-insensitive).
            **kwargs: source channels - ``scl_channel``/``sda_channel`` for
                I2C, ``clk_channel``/``data_channel``/``cs_channel`` for SPI.
        """
        kind = protocol.upper()
        if kind == 'I2C':
            scl_channel = kwargs.get('scl_channel', 1)
            sda_channel = kwargs.get('sda_channel', 2)

            self.instrument.write('BUS1:STAT ON')
            self.instrument.write('BUS1:TYPE I2C')
            self.instrument.write(f'BUS1:I2C:SCL:SOUR CHAN{scl_channel}')
            self.instrument.write(f'BUS1:I2C:SDA:SOUR CHAN{sda_channel}')
        elif kind == 'SPI':
            clk_channel = kwargs.get('clk_channel', 1)
            data_channel = kwargs.get('data_channel', 2)
            cs_channel = kwargs.get('cs_channel', 3)

            self.instrument.write('BUS1:STAT ON')
            self.instrument.write('BUS1:TYPE SPI')
            self.instrument.write(f'BUS1:SPI:CLK:SOUR CHAN{clk_channel}')
            self.instrument.write(f'BUS1:SPI:DATA:SOUR CHAN{data_channel}')
            self.instrument.write(f'BUS1:SPI:CS:SOUR CHAN{cs_channel}')
        else:
            # Tell the caller instead of silently configuring nothing
            print(f"Protocol {protocol} not supported")

    def histogram_analysis(self, channel=1):
        """Run a histogram for about two seconds and return its statistics.

        Returns:
            dict with ``histogram_data``, ``mean`` and ``std_dev`` keys, or
            None when the analysis failed.
        """
        try:
            # Enable histogram
            self.instrument.write(f'HIST{channel}:STAT ON')
            self.instrument.write(f'HIST{channel}:SOUR CHAN{channel}')

            # Start histogram acquisition
            self.instrument.write(f'HIST{channel}:RUN')
            time.sleep(2)  # Allow data collection

            # Get histogram results
            hist_data = self.instrument.query_ascii_values(f'HIST{channel}:DATA?')

            # Get statistics
            mean = float(self.instrument.query(f'HIST{channel}:MEAN?'))
            std_dev = float(self.instrument.query(f'HIST{channel}:SDEV?'))
            return {
                'histogram_data': hist_data,
                'mean': mean,
                'std_dev': std_dev,
            }
        except Exception as e:
            print(f"Histogram analysis failed: {e}")
            return None
# Usage example
rto = RohdeSchwartzOscilloscope('TCPIP::192.168.1.102::hislip0::INSTR')
try:
    # Configure channels
    rto.configure_channel(1, coupling='DC', scale=0.5, offset=0.0)  # ±2V range
    rto.configure_channel(2, coupling='AC', scale=0.1, offset=0.0)  # ±400mV range

    # Set timebase
    rto.configure_timebase(time_scale=1e-6)  # 1 μs/div

    # Setup trigger
    rto.setup_trigger(channel=1, level=0.1, edge='POS', mode='NORM')

    # Acquire waveforms
    waveforms = rto.acquire_all_channels()

    # Automated measurements
    measurements = rto.automated_measurements(channel=1)
    print("Automated measurements:", measurements)

    # FFT analysis
    frequencies, fft_data = rto.fft_analysis(channel=1)
    if fft_data is not None:
        print(f"FFT: {len(fft_data)} points from {frequencies[0]/1e6:.1f} to {frequencies[-1]/1e6:.1f} MHz")
finally:
    # Release the VISA session even when a step above fails
    rto.close()
ZNA/ZNB Network Analyzer Control
Vector Network Analysis
class RohdeSchwartzNetworkAnalyzer(RohdeSchwartzInstrument):
    """R&S Vector Network Analyzer controller (ZNA, ZNB, ZNC series)"""

    def __init__(self, resource_string):
        """Connect and probe port count and frequency range."""
        super().__init__(resource_string)

        # VNA-specific initialization
        self.ports = self._get_port_count()
        self.frequency_range = self._get_frequency_range()

        print("Network Analyzer info:")
        print(f" Ports: {self.ports}")
        print(f" Frequency range: {self.frequency_range}")

    def _get_port_count(self):
        """Return the number of ports; assumes 2 when it cannot be queried."""
        try:
            # Query system configuration
            return int(self.instrument.query('INST:PORT:COUN?'))
        except Exception:
            return 2  # Assume 2-port

    def _get_frequency_range(self):
        """Return ``{'min', 'max', 'range'}``; ``range`` is a display string.

        Falls back to 10 MHz - 8.5 GHz when the limits cannot be queried.
        """
        try:
            freq_min = float(self.instrument.query('FREQ:STAR? MIN'))
            freq_max = float(self.instrument.query('FREQ:STOP? MAX'))
        except Exception:
            return {'min': 10e6, 'max': 8.5e9, 'range': '0.01 - 8.5 GHz'}
        return {
            'min': freq_min,
            'max': freq_max,
            'range': f"{freq_min/1e9:.3f} - {freq_max/1e9:.1f} GHz",
        }

    def configure_sweep(self, start_freq=1e9, stop_freq=2e9, points=201, if_bandwidth=10e3):
        """Configure frequency sweep range, point count and IF bandwidth."""
        self.instrument.write(f'FREQ:STAR {start_freq}')
        self.instrument.write(f'FREQ:STOP {stop_freq}')
        self.instrument.write(f'SWE:POIN {points}')
        self.instrument.write(f'BAND {if_bandwidth}')

        # Set sweep mode
        self.instrument.write('SWE:MODE SING')  # Single sweep

    def configure_power(self, power_dbm=0, port=1):
        """Configure source power on ``port`` and enable it."""
        self.instrument.write(f'SOUR{port}:POW {power_dbm}')

        # Enable port
        self.instrument.write(f'SOUR{port}:POW:STAT ON')

    def setup_measurement(self, s_parameter='S21', measurement_name='Trc1'):
        """Define, display and select a trace for ``s_parameter``."""
        # Create/select measurement
        self.instrument.write(f'CALC:PAR:SDEF "{measurement_name}","{s_parameter}"')
        self.instrument.write(f'DISP:WIND:TRAC1:FEED "{measurement_name}"')

        # Select measurement for data retrieval
        self.instrument.write(f'CALC:PAR:SEL "{measurement_name}"')

    def perform_calibration(self, cal_type='FULL_2PORT'):
        """Run an interactive full 2-port SOLT calibration (simplified).

        Prompts on stdin before each standard is measured.  Any other
        ``cal_type``, or an analyzer with fewer than two ports, only prints
        a notice.
        """
        print(f"Starting {cal_type} calibration...")

        if cal_type != 'FULL_2PORT' or self.ports < 2:
            print(f"Calibration type {cal_type} not implemented")
            return

        # One-port standards and the command that measures each of them
        standard_commands = {
            'OPEN': 'SENS:CORR:COLL:OPEN',
            'SHORT': 'SENS:CORR:COLL:SHOR',
            'LOAD': 'SENS:CORR:COLL:LOAD',
        }

        # Start calibration
        self.instrument.write('SENS:CORR:COLL:METH:SOLT2 1,2')  # SOLT 2-port

        for port in (1, 2):
            for standard, command in standard_commands.items():  # OSL on each port
                print(f"Connect {standard} to port {port}, press Enter when ready...")
                input()  # Wait for user
                self.instrument.write(f'{command} {port}')

        # THRU connection
        print("Connect THRU between ports 1 and 2, press Enter when ready...")
        input()
        self.instrument.write('SENS:CORR:COLL:THRU 1,2')

        # Apply calibration
        self.instrument.write('SENS:CORR:COLL:SAVE:SEL:DEF')
        print("Calibration completed!")

    def measure_s_parameters(self, s_params=('S11', 'S21', 'S12', 'S22')):
        """Measure multiple S-parameters as complex arrays.

        Args:
            s_params: iterable of S-parameter names.

        Returns:
            ``(frequencies, results)`` where ``results`` maps each name to a
            complex numpy array.
        """
        # The replies are parsed with query_ascii_values, so the transfer
        # format must be ASCII as well.
        self.instrument.write('FORM:DATA ASC')

        results = {}
        for s_param in s_params:
            # Setup measurement for this S-parameter
            meas_name = f"Meas_{s_param}"
            self.setup_measurement(s_parameter=s_param, measurement_name=meas_name)

            # Perform sweep
            self.instrument.write('INIT:IMM')
            self.wait_for_operation_complete()

            # SDAT returns the unformatted complex trace as interleaved
            # real/imaginary pairs; FDAT is display-formatted data and must
            # not be used as the real part.
            raw = np.array(self.instrument.query_ascii_values('CALC:DATA? SDAT'))
            results[s_param] = raw[0::2] + 1j * raw[1::2]

        # Get frequency axis
        frequencies = self.instrument.query_ascii_values('FREQ:DATA?')
        return frequencies, results

    def time_domain_transform(self, s_parameter='S21', transform_type='LOWPASS'):
        """Measure ``s_parameter`` with the time-domain transform enabled.

        Returns:
            ``(time_axis, time_data)`` as numpy arrays.
        """
        # Setup measurement
        self.setup_measurement(s_parameter=s_parameter)

        # Configure time domain
        self.instrument.write(f'CALC:TRAN:TIME:TYPE {transform_type}')
        self.instrument.write('CALC:TRAN:TIME:STAT ON')

        # Perform measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get time domain data
        time_data = self.instrument.query_ascii_values('CALC:DATA:TDOM? FDAT')
        time_axis = self.instrument.query_ascii_values('CALC:DATA:TDOM:STIM?')
        return np.array(time_axis), np.array(time_data)

    def measure_group_delay(self, s_parameter='S21'):
        """Measure group delay; returns ``(frequencies, group_delay)``."""
        # Setup group delay measurement
        self.instrument.write(f'CALC:PAR:SDEF "GD","{s_parameter}"')
        self.instrument.write('CALC:PAR:SEL "GD"')
        self.instrument.write('CALC:FORM GDEL')  # Group delay format

        # Perform measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get group delay data
        group_delay = self.instrument.query_ascii_values('CALC:DATA? FDAT')
        frequencies = self.instrument.query_ascii_values('FREQ:DATA?')
        return np.array(frequencies), np.array(group_delay)

    def mixer_conversion_loss(self, lo_frequency, if_frequency, rf_start, rf_stop):
        """Measure mixer conversion loss; returns ``(frequencies, conv_loss)``."""
        # Configure for mixer measurement
        self.instrument.write('SENS:MIX:STAT ON')
        self.instrument.write(f'SENS:MIX:LO:FREQ {lo_frequency}')
        self.instrument.write(f'SENS:MIX:IF:FREQ {if_frequency}')

        # Set RF frequency range
        self.instrument.write(f'FREQ:STAR {rf_start}')
        self.instrument.write(f'FREQ:STOP {rf_stop}')

        # Setup conversion loss measurement
        self.setup_measurement(s_parameter='SC21', measurement_name='Conv_Loss')

        # Perform measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get conversion loss data
        conv_loss = self.instrument.query_ascii_values('CALC:DATA? FDAT')
        frequencies = self.instrument.query_ascii_values('FREQ:DATA?')
        return np.array(frequencies), np.array(conv_loss)
# Usage example
vna = RohdeSchwartzNetworkAnalyzer('TCPIP::192.168.1.103::hislip0::INSTR')
try:
    # Configure measurement
    vna.configure_sweep(start_freq=1e9, stop_freq=2e9, points=401, if_bandwidth=1e3)
    vna.configure_power(power_dbm=-10, port=1)

    # Measure S-parameters
    frequencies, s_params = vna.measure_s_parameters(['S11', 'S21'])

    # Convert to dB and phase
    s11_db = 20 * np.log10(np.abs(s_params['S11']))
    s11_phase = np.angle(s_params['S11']) * 180 / np.pi
    s21_db = 20 * np.log10(np.abs(s_params['S21']))

    print(f"S11: {np.min(s11_db):.1f} to {np.max(s11_db):.1f} dB")
    print(f"S21: {np.min(s21_db):.1f} to {np.max(s21_db):.1f} dB")

    # Group delay measurement
    gd_frequencies, group_delay = vna.measure_group_delay('S21')
    print(f"Group delay: {np.mean(group_delay)*1e9:.1f} ± {np.std(group_delay)*1e9:.1f} ns")
finally:
    # Release the VISA session even when a step above fails
    vna.close()
Automated Test System Example
Multi-Instrument R&S Test Setup
class RohdeSchwartzTestSystem:
    """Integrated R&S test system.

    Holds the connected instruments in ``self.instruments`` under the keys
    ``'FSA'``, ``'SMW'``, ``'RTO'`` and ``'VNA'``; only instruments whose
    resource string was given are opened.
    """

    def __init__(self, fsa_resource=None, smw_resource=None, rto_resource=None, vna_resource=None):
        """Open every instrument for which a resource string is given."""
        self.instruments = {}

        if fsa_resource:
            self.instruments['FSA'] = RohdeSchwartzSpectrumAnalyzer(fsa_resource)
        if smw_resource:
            self.instruments['SMW'] = RohdeSchwartzSignalGenerator(smw_resource)
        if rto_resource:
            self.instruments['RTO'] = RohdeSchwartzOscilloscope(rto_resource)
        if vna_resource:
            self.instruments['VNA'] = RohdeSchwartzNetworkAnalyzer(vna_resource)

        print(f"Test system initialized with: {list(self.instruments.keys())}")

    def amplifier_characterization(self, input_frequencies, input_powers, settle_time=0.5):
        """Measure amplifier gain over a frequency/power grid.

        Args:
            input_frequencies: generator frequencies to step through.
            input_powers: generator power levels to step through.
            settle_time: seconds to wait after configuring, before sweeping.

        Returns:
            list of dicts with ``input_frequency``, ``input_power``,
            ``output_power`` and ``gain_db`` keys.

        Raises:
            ValueError: the generator or the analyzer is not connected.
        """
        if 'SMW' not in self.instruments or 'FSA' not in self.instruments:
            raise ValueError("SMW and FSA required for amplifier test")

        smw = self.instruments['SMW']
        fsa = self.instruments['FSA']
        results = []

        try:
            for freq in input_frequencies:
                for power in input_powers:
                    print(f"Testing {freq/1e9:.3f} GHz @ {power} dBm")

                    # Configure generator
                    smw.set_cw_signal(frequency=freq, power=power)
                    smw.enable_output(True)

                    # Configure analyzer
                    fsa.configure_spectrum(center_freq=freq, span=100e6, rbw=1e6)
                    fsa.set_reference_level(power + 20)  # Expected gain ~20dB

                    time.sleep(settle_time)  # Settling time

                    # The analyzer runs in single-sweep mode, so a fresh sweep
                    # must be taken or the marker would read stale data.
                    fsa.acquire_trace()

                    # Measure output
                    peak = fsa.peak_search()

                    results.append({
                        'input_frequency': freq,
                        'input_power': power,
                        'output_power': peak['level'],
                        'gain_db': peak['level'] - power,
                    })
        finally:
            # Never leave RF applied to the device under test, even on error
            smw.enable_output(False)
        return results

    def phase_noise_vs_temperature(self, frequencies, temperatures):
        """Phase noise vs temperature test.

        Prompts on stdin before each temperature step.  Carriers whose
        measurement returned nothing are left out of the result.

        Raises:
            ValueError: the analyzer is not connected.
        """
        if 'FSA' not in self.instruments:
            raise ValueError("FSA required for phase noise test")

        fsa = self.instruments['FSA']
        results = []

        for temp in temperatures:
            print(f"Set chamber to {temp}°C and press Enter...")
            input()  # Wait for temperature stabilization

            for freq in frequencies:
                # Measure phase noise
                pn_results = fsa.phase_noise_measurement(carrier_freq=freq)
                if pn_results:
                    results.append({
                        'temperature': temp,
                        'frequency': freq,
                        'phase_noise': pn_results,
                    })
        return results

    def close_all(self):
        """Close all instruments; one failing close does not stop the rest."""
        for name, instrument in self.instruments.items():
            print(f"Closing {name}")
            try:
                instrument.close()
            except Exception as e:
                print(f"Failed to close {name}: {e}")
# Usage
test_system = RohdeSchwartzTestSystem(
    fsa_resource='TCPIP::192.168.1.100::hislip0::INSTR',
    smw_resource='TCPIP::192.168.1.101::hislip0::INSTR',
)
try:
    # Amplifier test
    frequencies = [1e9, 1.5e9, 2e9, 2.5e9, 3e9]  # 1-3 GHz
    powers = [-30, -20, -10, 0]  # -30 to 0 dBm
    amp_results = test_system.amplifier_characterization(frequencies, powers)

    # Analyze results
    for result in amp_results[:5]:  # Show first 5 results
        print(f"{result['input_frequency']/1e9:.1f} GHz, {result['input_power']} dBm → "
              f"{result['gain_db']:.1f} dB gain")
finally:
    # Release every VISA session even when the test fails
    test_system.close_all()
Best Practices for R&S Instruments
Error Handling and Performance
def rs_best_practices_example():
    """Demonstrate R&S instrument best practices.

    Connects over HiSLIP, resets the instrument, reports any errors the
    reset produced, runs one measurement synchronised with ``*OPC?`` and
    always releases the VISA session.  Failures are printed, not raised.
    """
    # Use HiSLIP for Ethernet connections (faster than VXI-11)
    instrument_ip = "192.168.1.100"
    resource_string = f"TCPIP::{instrument_ip}::hislip0::INSTR"

    rm = None
    instrument = None
    try:
        rm = pyvisa.ResourceManager()
        instrument = rm.open_resource(resource_string)

        # Optimize timeouts for R&S instruments
        instrument.timeout = 30000  # 30 seconds

        # Use appropriate data formats
        instrument.write('FORM:DATA REAL,32')  # 32-bit floats

        # Check for errors regularly
        def check_and_clear_errors():
            errors = []
            while len(errors) <= 10:  # Prevent infinite loop
                error = instrument.query('SYST:ERR?').strip()
                # Compare the error CODE with 0.  A substring test for '0,'
                # also matches real errors such as -200,"Execution error".
                if error.split(',', 1)[0].strip().lstrip('+') == '0':
                    break
                errors.append(error)
            return errors

        # Example measurement with error checking
        instrument.write('*RST')
        errors = check_and_clear_errors()
        if errors:
            print(f"Reset errors: {errors}")

        # Use *OPC? for synchronization: the reply arrives on completion
        instrument.write('INIT:IMM')
        instrument.query('*OPC?')

        print("✅ Measurement complete")
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        # Release the session on success and on failure alike
        if instrument is not None:
            instrument.close()
        if rm is not None:
            rm.close()
# Common troubleshooting function
def diagnose_rs_connection(resource_string):
    """Diagnose R&S instrument connection.

    Prints the identification string, the installed options and the content
    of the error queue.  On failure, prints the error followed by a list of
    troubleshooting tips.  The VISA session is always released.
    """
    print("R&S Instrument Diagnostics")
    print("=" * 30)

    rm = None
    instrument = None
    try:
        rm = pyvisa.ResourceManager()
        instrument = rm.open_resource(resource_string)

        # Basic identification
        idn = instrument.query('*IDN?')
        print(f"Instrument: {idn.strip()}")

        # Check options
        try:
            options = instrument.query('*OPT?')
            print(f"Options: {options.strip()}")
        except Exception:
            print("Options query not supported")

        # Check error queue
        errors = []
        while len(errors) < 100:  # Bounded so a misbehaving queue cannot hang us
            error = instrument.query('SYST:ERR?').strip()
            # Compare the error CODE with 0.  A substring test for '0,'
            # also matches real errors such as -200,"Execution error".
            if error.split(',', 1)[0].strip().lstrip('+') == '0':
                break
            errors.append(error)

        if errors:
            print(f"Errors found: {errors}")
        else:
            print("No errors in queue")

        print("✅ Connection successful")
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        print("\nTroubleshooting tips:")
        print("1. Check network connectivity")
        print("2. Verify instrument IP address")
        print("3. Use HiSLIP protocol (::hislip0::INSTR)")
        print("4. Check firewall settings")
        print("5. Update R&S VISA drivers")
    finally:
        # Release the session on success and on failure alike
        if instrument is not None:
            instrument.close()
        if rm is not None:
            rm.close()
# Run diagnostics (only when executed as a script, so importing this module
# does not try to reach the instrument)
if __name__ == "__main__":
    diagnose_rs_connection('TCPIP::192.168.1.100::hislip0::INSTR')
Next Steps
- Keysight instruments: Keysight Instruments
- Tektronix instruments: Tektronix Instruments
- Performance optimization: Performance Guide
How is this guide?