PyVISA
PyVISADocs

Rohde & Schwarz Instruments

Control Rohde & Schwarz spectrum analyzers, signal generators, oscilloscopes, and network analyzers using PyVISA with SCPI commands and examples.

This guide covers Rohde & Schwarz instrument control with PyVISA through worked examples, R&S-specific SCPI commands, and test automation.

Supported R&S Instruments

Spectrum Analyzers

  • FSW Series: High-end signal and spectrum analyzers (FSW8, FSW26, FSW43, FSW67, FSW85)
  • FSV Series: Mid-range spectrum analyzers (FSV3000, FSV4000)
  • FPS Series: Cost-effective spectrum analyzers
  • FSVA Series: Vector spectrum analyzers

Signal Generators

  • SMW Series: Vector signal generators (SMW200A)
  • SMB Series: Analog signal generators (SMB100A, SMB100B)
  • SMBV Series: Vector signal generators (SMBV100A, SMBV100B)
  • SGT Series: Vector signal generators for 5G/6G

Oscilloscopes

  • RTO Series: High-end oscilloscopes (RTO2000, RTO6000)
  • RTB Series: Mid-range oscilloscopes (RTB2000)
  • RTM Series: Entry-level oscilloscopes (RTM3000)

Network Analyzers

  • ZNA Series: High-end vector network analyzers
  • ZNB Series: Mid-range vector network analyzers
  • ZNC Series: Cost-effective network analyzers

Basic Connection and Setup

R&S-Specific Connection Handling

import pyvisa
import numpy as np
import time
import re

class RohdeSchwartzInstrument:
    """Base class for Rohde & Schwarz instruments.

    Opens a VISA session for ``resource_string``, queries ``*IDN?``, resets
    the instrument and offers helpers for reading the SCPI error queue and
    waiting for operation complete.

    Attributes set by ``__init__``:
        rm: the ``pyvisa.ResourceManager`` owning the session.
        instrument: the opened VISA resource.
        idn: raw ``*IDN?`` reply.
        manufacturer, model, serial, firmware: comma-separated fields of
            ``idn``; missing fields (after the first) are ``"Unknown"``.
    """

    # Upper bound on SYST:ERR? reads per check_errors() call, so a device
    # that never reports "no error" cannot trap the caller in a loop.
    _MAX_ERROR_READS = 100

    def __init__(self, resource_string):
        """Open the session, identify the instrument and reset it.

        Args:
            resource_string: VISA resource name passed to ``open_resource``.

        Raises:
            Exception: whatever the VISA layer raised; the session and the
                resource manager are closed before the error propagates.
        """
        self.rm = pyvisa.ResourceManager()
        try:
            self.instrument = self.rm.open_resource(resource_string)
        except Exception:
            # Do not leak the resource manager when the session cannot be opened
            self.rm.close()
            raise

        try:
            # R&S-specific settings
            self.instrument.timeout = 30000  # 30 seconds (R&S instruments can be slow)
            self.instrument.read_termination = '\n'
            self.instrument.write_termination = '\n'

            # Verify connection and get instrument info
            self.idn = self.instrument.query('*IDN?')
            print(f"Connected to: {self.idn.strip()}")

            # Parse R&S instrument info; pad so short replies still unpack
            idn_parts = [part.strip() for part in self.idn.split(',')]
            idn_parts += ["Unknown"] * (4 - len(idn_parts))
            (self.manufacturer, self.model,
             self.serial, self.firmware) = idn_parts[:4]

            # Common R&S initialization
            self.instrument.write('*RST')  # Reset
            self.instrument.write('*CLS')  # Clear status

            # Set SCPI error verbosity
            self.instrument.write('SYST:ERR:VERB ON')

        except Exception as e:
            print(f"Connection failed: {e}")
            try:
                self.close()
            except Exception:
                pass  # cleanup is best-effort; the original error is re-raised
            raise

    @staticmethod
    def _is_no_error(response):
        """Return True when a ``SYST:ERR?`` reply carries error code 0."""
        code_text = response.partition(',')[0].strip()
        try:
            return int(code_text) == 0
        except ValueError:
            return False

    def check_errors(self):
        """Drain the SCPI error queue.

        Returns:
            list[str]: one stripped ``SYST:ERR?`` reply per queued error, in
            the order read. If the queue is still not empty after
            ``_MAX_ERROR_READS`` reads, a final "Too many errors" entry is
            appended. If a query raises, an "Error checking failed" entry is
            appended and the errors collected so far are returned.
        """
        errors = []
        try:
            for _ in range(self._MAX_ERROR_READS):
                response = self.instrument.query('SYST:ERR?').strip()
                # Compare the numeric code rather than searching for a
                # substring, so '+0,...' or differently worded replies work
                if self._is_no_error(response):
                    break
                errors.append(response)
            else:
                # Prevent infinite loop
                errors.append("Too many errors, stopping error check")
        except Exception as e:
            errors.append(f"Error checking failed: {e}")

        return errors

    def wait_for_operation_complete(self, timeout=60):
        """Send ``*OPC`` and poll ``*ESR?`` until the OPC bit is set.

        Args:
            timeout: maximum time to poll, in seconds.

        Returns:
            bool: ``True`` once bit 0 of the ``*ESR?`` reply is set.

        Raises:
            TimeoutError: the bit was not seen within ``timeout`` seconds.
        """
        self.instrument.write('*OPC')  # Set operation complete bit

        # Monotonic clock: immune to wall-clock adjustments during the wait
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                status = int(self.instrument.query('*ESR?'))
            except Exception:
                # A failed or unparsable poll is retried until the deadline;
                # KeyboardInterrupt/SystemExit are deliberately not caught
                status = 0
            if status & 1:  # Operation complete bit
                return True
            time.sleep(0.1)

        raise TimeoutError(f"Operation did not complete within {timeout} seconds")

    def close(self):
        """Close the instrument session and then the resource manager.

        The resource manager is closed even if closing the session raises.
        """
        try:
            self.instrument.close()
        finally:
            self.rm.close()

# Connection examples for R&S instruments
# Ethernet: 'TCPIP::192.168.1.100::hislip0::INSTR' (HiSLIP protocol)
# USB: 'USB0::0x0AAD::0x0054::123456::INSTR'
# GPIB: 'GPIB0::20::INSTR'

FSW/FSV Spectrum Analyzer Control

Advanced Spectrum Analysis

class RohdeSchwartzSpectrumAnalyzer(RohdeSchwartzInstrument):
    """R&S Spectrum Analyzer controller (FSW, FSV, FPS series)"""

    def __init__(self, resource_string):
        """Connect, select ASCII transfers and single-sweep mode.

        Args:
            resource_string: VISA resource name of the analyzer.
        """
        super().__init__(resource_string)

        # Configure for optimal performance
        self.instrument.write('FORM ASC')  # ASCII format for queries
        self.instrument.write('INIT:CONT OFF')  # Single sweep mode

        # Get instrument capabilities
        self.frequency_range = self._get_frequency_range()
        print(f"Frequency range: {self.frequency_range}")

    def _get_frequency_range(self):
        """Query the minimum start and maximum stop frequency.

        Returns:
            dict: ``{'min': float, 'max': float}``; a hard-coded
            9 kHz - 26.5 GHz fallback when either query fails.
        """
        try:
            freq_min = float(self.instrument.query('FREQ:START? MIN'))
            freq_max = float(self.instrument.query('FREQ:STOP? MAX'))
        except Exception:
            return {'min': 9e3, 'max': 26.5e9}  # FSW default
        return {'min': freq_min, 'max': freq_max}

    def configure_spectrum(self, center_freq=1e9, span=100e6, rbw=1e6, vbw=None):
        """Configure basic spectrum analyzer settings.

        Args:
            center_freq: value sent with ``FREQ:CENT``.
            span: value sent with ``FREQ:SPAN``.
            rbw: value sent with ``BAND:RES``.
            vbw: value sent with ``BAND:VID``; when falsy (``None`` or 0)
                automatic video bandwidth is enabled instead.
        """
        # Frequency settings
        self.instrument.write(f'FREQ:CENT {center_freq}')
        self.instrument.write(f'FREQ:SPAN {span}')

        # Bandwidth settings
        self.instrument.write(f'BAND:RES {rbw}')
        if vbw:
            self.instrument.write(f'BAND:VID {vbw}')
        else:
            self.instrument.write('BAND:VID:AUTO ON')

        # Sweep settings
        self.instrument.write('SWE:MODE AUTO')  # Auto sweep mode
        self.instrument.write('AVER:COUN 1')    # No averaging initially

    def set_reference_level(self, ref_level=0):
        """Set the reference level via ``DISP:WIND:TRAC:Y:RLEV``."""
        self.instrument.write(f'DISP:WIND:TRAC:Y:RLEV {ref_level}')

    def set_attenuation(self, attenuation=10):
        """Set the input attenuation via ``INP:ATT``."""
        self.instrument.write(f'INP:ATT {attenuation}')

    def acquire_trace(self, trace_number=1):
        """Run one sweep and read back a trace.

        Args:
            trace_number: trace to read (sent as ``TRACE<trace_number>``).

        Returns:
            tuple[np.ndarray, np.ndarray]: ``(frequencies, levels)``. The
            frequency axis is evenly spaced over the queried center/span and
            has one entry per returned trace point.
        """
        # Start measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get trace data. The numeric suffix on TRAC addresses the
        # measurement window on R&S analyzers, so the trace is selected only
        # through the TRACE<n> parameter.
        trace_data = self.instrument.query_ascii_values(
            f'TRAC:DATA? TRACE{trace_number}'
        )

        # Get frequency axis
        center_freq = float(self.instrument.query('FREQ:CENT?'))
        span = float(self.instrument.query('FREQ:SPAN?'))

        frequencies = np.linspace(
            center_freq - span / 2,
            center_freq + span / 2,
            len(trace_data),
        )

        return frequencies, np.array(trace_data)

    def peak_search(self, trace_number=1, threshold=-50):
        """Place the marker on the trace maximum and read it back.

        Args:
            trace_number: trace the marker is assigned to.
            threshold: value sent with ``CALC:MARK:PEAK:THR``.

        Returns:
            dict: ``frequency`` and ``level`` from the marker X/Y queries and
            a constant ``marker_number`` of 1.
        """
        # Configure peak search
        self.instrument.write('CALC:MARK:FUNC:POW:SEL PEAK')
        self.instrument.write(f'CALC:MARK:TRAC {trace_number}')

        # Set threshold if supported
        try:
            self.instrument.write(f'CALC:MARK:PEAK:THR {threshold}')
        except Exception:
            pass  # Not all models support threshold

        # Perform peak search
        self.instrument.write('CALC:MARK:MAX')

        # Get peak results
        peak_freq = float(self.instrument.query('CALC:MARK:X?'))
        peak_level = float(self.instrument.query('CALC:MARK:Y?'))

        return {
            'frequency': peak_freq,
            'level': peak_level,
            'marker_number': 1
        }

    def multi_peak_search(self, num_peaks=10, trace_number=1):
        """Find multiple peaks using the instrument's peak list.

        Args:
            num_peaks: value sent with ``CALC:MARK:PLIS:SIZE``.
            trace_number: trace the marker is assigned to.

        Returns:
            list[dict]: one dict per peak with ``frequency``, ``level`` and a
            1-based ``peak_number``. Empty when reading or parsing the peak
            list fails (the error is printed).
        """
        # Enable peak list
        self.instrument.write('CALC:MARK:FUNC:POW:SEL PLIS')
        self.instrument.write(f'CALC:MARK:TRAC {trace_number}')

        # Configure peak list
        self.instrument.write(f'CALC:MARK:PLIS:SIZE {num_peaks}')

        # Execute peak search
        self.instrument.write('CALC:MARK:FUNC:POW:EXEC PLIS')
        self.wait_for_operation_complete()

        # Get peak list
        peaks = []
        try:
            peak_data = self.instrument.query('CALC:MARK:PLIS:DATA?')
            # Parse peak data (frequency, level pairs)
            values = [float(x) for x in peak_data.split(',')]

            # zip() drops a trailing value that has no partner
            pairs = zip(values[::2], values[1::2])
            for number, (freq, level) in enumerate(pairs, start=1):
                peaks.append({
                    'frequency': freq,
                    'level': level,
                    'peak_number': number
                })
        except Exception as e:
            print(f"Peak list error: {e}")

        return peaks

    def phase_noise_measurement(self, carrier_freq=1e9, offset_frequencies=None):
        """Measure phase noise at a list of offsets.

        Args:
            carrier_freq: value sent with ``FREQ:CARR``.
            offset_frequencies: offsets to step through; defaults to
                1 kHz, 10 kHz, 100 kHz and 1 MHz.

        Returns:
            list[dict] | None: one dict per offset with ``offset_frequency``
            and ``phase_noise_dbc_hz`` (the ``CALC:MARK:Y?`` reading), or
            ``None`` if any step raises (the error is printed).
        """
        if offset_frequencies is None:
            offset_frequencies = [1e3, 10e3, 100e3, 1e6]  # Default offsets

        # Switch to phase noise mode (if supported)
        try:
            self.instrument.write('INST:SEL PNOI')  # Phase noise mode
            self.instrument.write(f'FREQ:CARR {carrier_freq}')

            phase_noise_results = []

            for offset_freq in offset_frequencies:
                # Set offset frequency
                self.instrument.write(f'FREQ:OFFS {offset_freq}')

                # Start measurement
                self.instrument.write('INIT:IMM')
                self.wait_for_operation_complete()

                # Read phase noise value
                pn_value = float(self.instrument.query('CALC:MARK:Y?'))

                phase_noise_results.append({
                    'offset_frequency': offset_freq,
                    'phase_noise_dbc_hz': pn_value
                })

            return phase_noise_results

        except Exception as e:
            print(f"Phase noise measurement not available: {e}")
            return None

    def emi_measurement(self, start_freq=30e6, stop_freq=1e9, detector='PEAK'):
        """Configure an EMI scan and acquire one trace.

        Args:
            start_freq: value sent with ``FREQ:START``.
            stop_freq: value sent with ``FREQ:STOP``.
            detector: value sent with ``DET`` (PEAK, QPE, AVER, etc.).

        Returns:
            dict | None: ``frequencies``, ``levels``, ``detector`` and a
            constant ``compliance`` label of ``'CISPR_16'``, or ``None`` if
            any step raises (the error is printed).
        """
        # Switch to EMI mode (if supported)
        try:
            self.instrument.write('INST:SEL EMI')

            # Configure frequency range
            self.instrument.write(f'FREQ:START {start_freq}')
            self.instrument.write(f'FREQ:STOP {stop_freq}')

            # Set detector (PEAK, QPE, AVER, etc.)
            self.instrument.write(f'DET {detector}')

            # Configure for EMI compliance
            if start_freq >= 30e6 and stop_freq <= 1e9:
                # CISPR 16 settings for 30 MHz - 1 GHz
                self.instrument.write('BAND:RES 120000')  # 120 kHz RBW
                self.instrument.write('BAND:VID 120000')  # 120 kHz VBW

            # acquire_trace() starts the sweep and waits for it itself, so
            # no separate INIT:IMM is issued here (that would sweep twice).
            frequencies, trace_data = self.acquire_trace()

            # NOTE(review): the compliance label is reported even when the
            # range is outside 30 MHz - 1 GHz and the CISPR bandwidths above
            # were not applied — confirm whether callers rely on it.
            return {
                'frequencies': frequencies,
                'levels': trace_data,
                'detector': detector,
                'compliance': 'CISPR_16'
            }

        except Exception as e:
            print(f"EMI mode not available: {e}")
            return None

# Usage example
fsa = RohdeSchwartzSpectrumAnalyzer('TCPIP::192.168.1.100::hislip0::INSTR')

try:
    # Configure for 1 GHz center, 100 MHz span
    fsa.configure_spectrum(center_freq=1e9, span=100e6, rbw=1e6)
    fsa.set_reference_level(0)  # 0 dBm reference
    fsa.set_attenuation(10)     # 10 dB attenuation

    # Single trace acquisition
    frequencies, spectrum = fsa.acquire_trace()

    # Find peaks
    peak = fsa.peak_search()
    print(f"Peak: {peak['frequency']/1e6:.2f} MHz at {peak['level']:.1f} dBm")

    # Multiple peaks (distinct loop name so the single-peak result above
    # is not overwritten)
    peaks = fsa.multi_peak_search(num_peaks=5)
    for index, found in enumerate(peaks, start=1):
        print(f"Peak {index}: {found['frequency']/1e6:.2f} MHz at {found['level']:.1f} dBm")
finally:
    # Release the VISA session even if one of the steps above fails
    fsa.close()

SMW/SMB Signal Generator Control

Vector Signal Generation

class RohdeSchwartzSignalGenerator(RohdeSchwartzInstrument):
    """R&S Signal Generator controller (SMW, SMB, SMBV series)"""

    def __init__(self, resource_string):
        """Connect and probe the generator's capabilities.

        Args:
            resource_string: VISA resource name of the generator.
        """
        super().__init__(resource_string)

        # Get generator capabilities
        self.frequency_range = self._get_frequency_range()
        self.has_vector_modulation = self._check_vector_capability()

        print("Signal generator capabilities:")
        print(f"  Frequency range: {self.frequency_range}")
        print(f"  Vector modulation: {self.has_vector_modulation}")

    def _get_frequency_range(self):
        """Query the minimum and maximum output frequency.

        Returns:
            dict: ``{'min': float, 'max': float}``; a hard-coded
            9 kHz - 6 GHz fallback when either query fails.
        """
        try:
            freq_min = float(self.instrument.query('SOUR:FREQ? MIN'))
            freq_max = float(self.instrument.query('SOUR:FREQ? MAX'))
        except Exception:
            return {'min': 9e3, 'max': 6e9}  # SMB100A default
        return {'min': freq_min, 'max': freq_max}

    def _check_vector_capability(self):
        """Return True when the ``SOUR:IQ:STAT?`` query succeeds."""
        try:
            self.instrument.query('SOUR:IQ:STAT?')
        except Exception:
            return False
        return True

    def set_cw_signal(self, frequency=1e9, power=0):
        """Set a continuous wave signal with all modulation switched off.

        Args:
            frequency: value sent with ``SOUR:FREQ``.
            power: value sent with ``SOUR:POW``.
        """
        self.instrument.write(f'SOUR:FREQ {frequency}')
        self.instrument.write(f'SOUR:POW {power}')

        # Disable modulation
        self.instrument.write('SOUR:AM:STAT OFF')
        self.instrument.write('SOUR:FM:STAT OFF')
        self.instrument.write('SOUR:PM:STAT OFF')

        if self.has_vector_modulation:
            self.instrument.write('SOUR:IQ:STAT OFF')

    def set_am_modulation(self, mod_frequency=1000, mod_depth=50, mod_source='INT'):
        """Configure AM modulation.

        Args:
            mod_frequency: internal modulation frequency; only sent when
                ``mod_source`` is ``'INT'``.
            mod_depth: value sent with ``SOUR:AM:DEPT``.
            mod_source: value sent with ``SOUR:AM:SOUR``.
        """
        self.instrument.write('SOUR:AM:STAT ON')
        self.instrument.write(f'SOUR:AM:SOUR {mod_source}')

        if mod_source == 'INT':
            self.instrument.write(f'SOUR:AM:INT:FREQ {mod_frequency}')

        self.instrument.write(f'SOUR:AM:DEPT {mod_depth}')

    def set_fm_modulation(self, mod_frequency=1000, deviation=10000, mod_source='INT'):
        """Configure FM modulation.

        Args:
            mod_frequency: internal modulation frequency; only sent when
                ``mod_source`` is ``'INT'``.
            deviation: value sent with ``SOUR:FM:DEV``.
            mod_source: value sent with ``SOUR:FM:SOUR``.
        """
        self.instrument.write('SOUR:FM:STAT ON')
        self.instrument.write(f'SOUR:FM:SOUR {mod_source}')

        if mod_source == 'INT':
            self.instrument.write(f'SOUR:FM:INT:FREQ {mod_frequency}')

        self.instrument.write(f'SOUR:FM:DEV {deviation}')

    def set_pulse_modulation(self, pulse_width=1e-6, pulse_period=10e-6):
        """Configure internally sourced pulse modulation.

        Args:
            pulse_width: value sent with ``SOUR:PULM:INT:PWID``.
            pulse_period: value sent with ``SOUR:PULM:INT:PER``.
        """
        self.instrument.write('SOUR:PULM:STAT ON')
        self.instrument.write('SOUR:PULM:SOUR INT')
        self.instrument.write(f'SOUR:PULM:INT:PWID {pulse_width}')
        self.instrument.write(f'SOUR:PULM:INT:PER {pulse_period}')

    def load_waveform_file(self, filename, slot='WAVEFORM1'):
        """Select an I/Q waveform file and enable ARB playback.

        Args:
            filename: waveform file name, assumed to be on the instrument.
            slot: accepted for interface compatibility; not used by this
                implementation.

        Raises:
            ValueError: the generator has no vector modulation capability.
        """
        if not self.has_vector_modulation:
            raise ValueError("Vector modulation not supported")

        # Load waveform (assuming file is on instrument)
        self.instrument.write(f'SOUR:BB:ARB:WAV:SEL "{filename}"')

        # Enable ARB and I/Q modulation
        self.instrument.write('SOUR:BB:ARB:STAT ON')
        self.instrument.write('SOUR:IQ:STAT ON')

    def generate_multitone(self, base_frequency=1e9, tone_spacing=1e6, num_tones=10, power_per_tone=-10):
        """Build the complex baseband samples of a multitone signal.

        Nothing is sent to the instrument; the samples are only computed and
        returned (uploading would require a file transfer).

        Args:
            base_frequency: center frequency, used only in the printed summary.
            tone_spacing: spacing between adjacent tones.
            num_tones: number of tones; tone ``i`` sits at
                ``(i - num_tones // 2) * tone_spacing``.
            power_per_tone: converted to a linear amplitude as
                ``10**(power_per_tone/20) / sqrt(num_tones)``.

        Returns:
            np.ndarray | None: complex samples covering 1 ms at a sample rate
            of ``tone_spacing * num_tones * 10``, or ``None`` when the
            generator has no vector modulation capability.
        """
        if not self.has_vector_modulation:
            print("Multitone requires vector modulation capability")
            return

        # Create multitone I/Q data
        sample_rate = tone_spacing * num_tones * 10  # 10x oversampling
        duration = 1e-3  # 1 ms waveform
        num_samples = int(sample_rate * duration)
        # Sample instants spaced exactly 1/sample_rate apart. An
        # endpoint-inclusive linspace would stretch the spacing to
        # duration/(N-1) and shift every tone off its nominal frequency.
        t = np.arange(num_samples) / sample_rate

        # Generate complex signal with multiple tones
        signal = np.zeros(len(t), dtype=complex)
        tone_amplitude = 10**(power_per_tone/20) / np.sqrt(num_tones)

        for i in range(num_tones):
            tone_freq = (i - num_tones//2) * tone_spacing
            signal += tone_amplitude * np.exp(1j * 2 * np.pi * tone_freq * t)

        # Upload I/Q data (this would require file transfer in practice)
        print("Generated multitone signal:")
        print(f"  Center frequency: {base_frequency/1e9:.3f} GHz")
        print(f"  {num_tones} tones, {tone_spacing/1e6:.1f} MHz spacing")
        print(f"  Power per tone: {power_per_tone} dBm")

        # In practice, you would save this to a file and upload it
        return signal

    def lte_signal_generation(self, bandwidth=20e6, resource_blocks=100):
        """Configure an LTE-like downlink signal (simplified example).

        Args:
            bandwidth: value sent with ``SOUR:BB:EUTR:LINK:DL:BAND``.
            resource_blocks: value sent with ``SOUR:BB:EUTR:LINK:DL:NORB``.

        Returns:
            None. A message is printed when vector modulation is missing or
            when any command raises.
        """
        if not self.has_vector_modulation:
            print("LTE signal generation requires vector modulation")
            return

        # Configure for LTE (simplified example)
        try:
            # Enable digital standards (if available)
            self.instrument.write('SOUR:BB:EUTR:STAT ON')  # LTE
            self.instrument.write(f'SOUR:BB:EUTR:LINK:DL:BAND {bandwidth}')
            self.instrument.write(f'SOUR:BB:EUTR:LINK:DL:NORB {resource_blocks}')

            # Enable I/Q modulation
            self.instrument.write('SOUR:IQ:STAT ON')

            print("LTE signal configured:")
            print(f"  Bandwidth: {bandwidth/1e6} MHz")
            print(f"  Resource blocks: {resource_blocks}")

        except Exception as e:
            print(f"LTE generation not available: {e}")

    def set_frequency_sweep(self, start_freq, stop_freq, sweep_time, sweep_mode='AUTO'):
        """Configure and start a frequency sweep.

        Args:
            start_freq: value sent with ``SOUR:SWE:FREQ:START``.
            stop_freq: value sent with ``SOUR:SWE:FREQ:STOP``.
            sweep_time: value sent with ``SOUR:SWE:FREQ:DWELL``.
            sweep_mode: value sent with ``SOUR:SWE:FREQ:MODE``.
        """
        # Honour the caller's mode instead of always sending AUTO
        self.instrument.write(f'SOUR:SWE:FREQ:MODE {sweep_mode}')
        self.instrument.write(f'SOUR:SWE:FREQ:START {start_freq}')
        self.instrument.write(f'SOUR:SWE:FREQ:STOP {stop_freq}')
        self.instrument.write(f'SOUR:SWE:FREQ:DWELL {sweep_time}')

        # Start sweep
        self.instrument.write('SOUR:SWE:FREQ:STAT ON')

    def enable_output(self, enabled=True):
        """Enable/disable RF output and print any queued SCPI errors.

        Args:
            enabled: ``True`` sends ``OUTP:STAT ON``, ``False`` sends
                ``OUTP:STAT OFF``.
        """
        state = 'ON' if enabled else 'OFF'
        self.instrument.write(f'OUTP:STAT {state}')

        # Check for errors after enabling output
        errors = self.check_errors()
        if errors:
            print(f"Output enable errors: {errors}")

# Usage examples
smw = RohdeSchwartzSignalGenerator('TCPIP::192.168.1.101::hislip0::INSTR')

try:
    # Generate CW signal
    smw.set_cw_signal(frequency=2.4e9, power=-10)  # 2.4 GHz, -10 dBm
    smw.enable_output(True)

    # Add AM modulation
    smw.set_am_modulation(mod_frequency=1000, mod_depth=50)

    # Generate multitone for testing
    multitone_signal = smw.generate_multitone(
        base_frequency=1e9,
        tone_spacing=100e3,
        num_tones=20,
        power_per_tone=-20
    )

    # LTE signal
    smw.lte_signal_generation(bandwidth=20e6, resource_blocks=100)
finally:
    # Release the VISA session even if one of the steps above fails
    smw.close()

RTO/RTM Oscilloscope Control

High-Performance Oscilloscope Acquisition

class RohdeSchwartzOscilloscope(RohdeSchwartzInstrument):
    """R&S Oscilloscope controller (RTO, RTM, RTB series)"""

    def __init__(self, resource_string):
        """Connect, select binary float transfers and probe the scope.

        Args:
            resource_string: VISA resource name of the oscilloscope.
        """
        super().__init__(resource_string)

        # Configure for optimal data transfer
        self.instrument.write('FORM:DATA REAL,32')  # 32-bit float data
        self.instrument.write('FORM:BORD NORM')     # Normal byte order

        # Get oscilloscope info
        self.channels = self._get_channel_count()
        self.bandwidth = self._get_bandwidth()

        print("Oscilloscope info:")
        print(f"  Channels: {self.channels}")
        print(f"  Bandwidth: {self.bandwidth}")

    def _get_channel_count(self):
        """Count channels by querying ``CHAN<i>:STAT?`` for i = 1..8.

        Returns:
            int: index of the last channel whose query succeeded before the
            first failure (0 if channel 1 already fails).
        """
        count = 0
        for i in range(1, 9):  # Check up to 8 channels
            try:
                self.instrument.query(f'CHAN{i}:STAT?')
            except Exception:
                break
            count = i
        return count

    def _get_bandwidth(self):
        """Return the ``SYST:BAND?`` reading formatted in GHz, or "Unknown"."""
        try:
            # Try to get bandwidth from system info
            bandwidth_hz = float(self.instrument.query('SYST:BAND?'))
        except Exception:
            return "Unknown"
        return f"{bandwidth_hz/1e9:.1f} GHz"

    def configure_channel(self, channel, coupling='DC', scale=1.0, offset=0.0, enabled=True):
        """Configure oscilloscope channel.

        Args:
            channel: 1-based channel number.
            coupling: value sent with ``CHAN<n>:COUP``.
            scale: value sent with ``CHAN<n>:SCAL``.
            offset: value sent with ``CHAN<n>:OFFS``.
            enabled: switches the channel ON or OFF.

        Raises:
            ValueError: ``channel`` is outside ``1..self.channels``.
        """
        # Reject 0 and negative numbers too, not only numbers above the count
        if not 1 <= channel <= self.channels:
            raise ValueError(f"Channel {channel} not available")

        # Channel settings
        self.instrument.write(f'CHAN{channel}:STAT {"ON" if enabled else "OFF"}')
        self.instrument.write(f'CHAN{channel}:COUP {coupling}')
        self.instrument.write(f'CHAN{channel}:SCAL {scale}')
        self.instrument.write(f'CHAN{channel}:OFFS {offset}')

        # Probe settings (auto-detect)
        try:
            self.instrument.write(f'CHAN{channel}:PROB:AUTO ONCE')
        except Exception:
            pass  # Not all models support auto probe detection

    def configure_timebase(self, time_scale=1e-6, position=0.0):
        """Configure horizontal timebase and select real-time acquisition.

        Args:
            time_scale: value sent with ``TIM:SCAL``.
            position: value sent with ``TIM:POS``.
        """
        self.instrument.write(f'TIM:SCAL {time_scale}')
        self.instrument.write(f'TIM:POS {position}')

        # Set acquisition mode
        self.instrument.write('ACQ:MODE RTIM')  # Real-time mode

    def setup_trigger(self, channel=1, level=0.0, edge='POS', mode='AUTO'):
        """Setup trigger.

        Args:
            channel: trigger source channel number.
            level: value sent with ``TRIG:LEV``.
            edge: value sent with ``TRIG:SLOP`` (POS or NEG).
            mode: value sent with ``TRIG:MODE`` (AUTO, NORM, SING).
        """
        self.instrument.write(f'TRIG:SOUR CHAN{channel}')
        self.instrument.write(f'TRIG:LEV {level}')
        self.instrument.write(f'TRIG:SLOP {edge}')  # POS or NEG
        self.instrument.write(f'TRIG:MODE {mode}')  # AUTO, NORM, SING

    def _single_acquisition(self):
        """Start one single acquisition and wait until it has completed."""
        self.instrument.write('SING')  # Single trigger
        self.wait_for_operation_complete()

    def _read_waveform(self, channel):
        """Read the current waveform of ``channel`` without re-triggering.

        Returns:
            tuple[np.ndarray, np.ndarray]: ``(time_axis, samples)``.
        """
        # Get waveform data
        waveform_data = self.instrument.query_binary_values(
            f'CHAN{channel}:DATA?', datatype='f'
        )

        # Get time base information
        time_scale = float(self.instrument.query('TIM:SCAL?'))
        time_position = float(self.instrument.query('TIM:POS?'))
        sample_rate = float(self.instrument.query('ACQ:SRAT?'))

        # Create time axis
        time_range = time_scale * 10  # 10 divisions
        dt = 1.0 / sample_rate

        time_axis = (
            np.arange(len(waveform_data)) * dt - time_range/2 + time_position
        )

        return time_axis, np.array(waveform_data)

    def acquire_waveform(self, channel=1):
        """Trigger a single acquisition and read one channel.

        Args:
            channel: channel number to read.

        Returns:
            tuple[np.ndarray, np.ndarray]: ``(time_axis, samples)``; the time
            axis is built from the queried sample rate, time scale
            (10 divisions assumed) and position.
        """
        self._single_acquisition()
        return self._read_waveform(channel)

    def acquire_all_channels(self):
        """Acquire waveforms from all enabled channels in one acquisition.

        A single acquisition is triggered and every enabled channel is read
        from it, so the returned waveforms belong to the same capture.

        Returns:
            dict: ``{'CH<n>': {'time': ndarray, 'voltage': ndarray}}`` for
            each enabled channel. A channel whose readout raises is skipped
            and the error is printed.
        """
        self._single_acquisition()

        waveforms = {}

        for channel in range(1, self.channels + 1):
            try:
                # Check if channel is enabled
                enabled = self.instrument.query(f'CHAN{channel}:STAT?').strip()
                if enabled != '1' and enabled.upper() != 'ON':
                    continue
                # Read only; triggering again here would replace the capture
                time_axis, waveform_data = self._read_waveform(channel)
            except Exception as e:
                print(f"Channel {channel} readout failed: {e}")
                continue
            waveforms[f'CH{channel}'] = {
                'time': time_axis,
                'voltage': waveform_data
            }

        return waveforms

    def automated_measurements(self, channel=1):
        """Perform automated measurements.

        Args:
            channel: channel number inserted into every ``MEAS:...?`` query.

        Returns:
            dict: measurement name -> float reading, or ``None`` for a
            measurement whose query or conversion failed.
        """
        measurements = {}

        # R&S measurement commands
        meas_commands = {
            'frequency': f'MEAS:FREQ? CHAN{channel}',
            'period': f'MEAS:PER? CHAN{channel}',
            'amplitude': f'MEAS:AMPL? CHAN{channel}',
            'peak_to_peak': f'MEAS:PTP? CHAN{channel}',
            'rms': f'MEAS:RMS? CHAN{channel}',
            'mean': f'MEAS:MEAN? CHAN{channel}',
            'rise_time': f'MEAS:RIS? CHAN{channel}',
            'fall_time': f'MEAS:FALL? CHAN{channel}',
            'pulse_width': f'MEAS:PWID? CHAN{channel}',
            'duty_cycle': f'MEAS:DUTY? CHAN{channel}'
        }

        for param, command in meas_commands.items():
            try:
                measurements[param] = float(self.instrument.query(command))
            except Exception:
                measurements[param] = None

        return measurements

    def fft_analysis(self, channel=1):
        """Perform FFT analysis through the math channel.

        Args:
            channel: source channel used in the ``FFT(CHAN<n>)`` expression.

        Returns:
            tuple: ``(frequencies, fft_data)`` as NumPy arrays, with the
            frequency axis evenly spaced over the queried center/span, or
            ``(None, None)`` if any step raises (the error is printed).
        """
        # Enable math channel for FFT
        try:
            self.instrument.write('CALC:MATH:STAT ON')
            self.instrument.write(f'CALC:MATH:EXPR "FFT(CHAN{channel})"')

            # Configure FFT
            self.instrument.write('CALC:MATH:FFT:WIND HANN')  # Hanning window

            # Get FFT data
            fft_data = self.instrument.query_binary_values('CALC:MATH:DATA?', datatype='f')

            # Get frequency axis information
            center_freq = float(self.instrument.query('CALC:MATH:FFT:FREQ:CENT?'))
            span = float(self.instrument.query('CALC:MATH:FFT:FREQ:SPAN?'))

            frequencies = np.linspace(
                center_freq - span/2,
                center_freq + span/2,
                len(fft_data)
            )

            return frequencies, np.array(fft_data)

        except Exception as e:
            print(f"FFT analysis failed: {e}")
            return None, None

    def protocol_decode_setup(self, protocol='I2C', **kwargs):
        """Setup protocol decoding on bus 1.

        Args:
            protocol: ``'I2C'`` or ``'SPI'`` (case-insensitive).
            **kwargs: channel assignments. I2C: ``scl_channel`` (default 1),
                ``sda_channel`` (default 2). SPI: ``clk_channel`` (default 1),
                ``data_channel`` (default 2), ``cs_channel`` (default 3).

        Raises:
            ValueError: ``protocol`` is neither I2C nor SPI.
        """
        bus_type = protocol.upper()

        if bus_type == 'I2C':
            # I2C setup
            scl_channel = kwargs.get('scl_channel', 1)
            sda_channel = kwargs.get('sda_channel', 2)

            self.instrument.write('BUS1:STAT ON')
            self.instrument.write('BUS1:TYPE I2C')
            self.instrument.write(f'BUS1:I2C:SCL:SOUR CHAN{scl_channel}')
            self.instrument.write(f'BUS1:I2C:SDA:SOUR CHAN{sda_channel}')

        elif bus_type == 'SPI':
            # SPI setup
            clk_channel = kwargs.get('clk_channel', 1)
            data_channel = kwargs.get('data_channel', 2)
            cs_channel = kwargs.get('cs_channel', 3)

            self.instrument.write('BUS1:STAT ON')
            self.instrument.write('BUS1:TYPE SPI')
            self.instrument.write(f'BUS1:SPI:CLK:SOUR CHAN{clk_channel}')
            self.instrument.write(f'BUS1:SPI:DATA:SOUR CHAN{data_channel}')
            self.instrument.write(f'BUS1:SPI:CS:SOUR CHAN{cs_channel}')

        else:
            # Previously an unknown protocol was silently ignored, leaving
            # the caller believing the bus had been configured
            raise ValueError(f"Unsupported protocol: {protocol}")

    def histogram_analysis(self, channel=1):
        """Perform histogram analysis.

        Args:
            channel: used both as histogram index and as source channel.

        Returns:
            dict | None: ``histogram_data``, ``mean`` and ``std_dev`` read
            after a fixed 2 s collection time, or ``None`` if any step raises
            (the error is printed).
        """
        try:
            # Enable histogram
            self.instrument.write(f'HIST{channel}:STAT ON')
            self.instrument.write(f'HIST{channel}:SOUR CHAN{channel}')

            # Start histogram acquisition
            self.instrument.write(f'HIST{channel}:RUN')
            time.sleep(2)  # Allow data collection

            # Get histogram results
            hist_data = self.instrument.query_ascii_values(f'HIST{channel}:DATA?')

            # Get statistics
            mean = float(self.instrument.query(f'HIST{channel}:MEAN?'))
            std_dev = float(self.instrument.query(f'HIST{channel}:SDEV?'))

            return {
                'histogram_data': hist_data,
                'mean': mean,
                'std_dev': std_dev
            }

        except Exception as e:
            print(f"Histogram analysis failed: {e}")
            return None

# Usage example
rto = RohdeSchwartzOscilloscope('TCPIP::192.168.1.102::hislip0::INSTR')

try:
    # Configure channels
    rto.configure_channel(1, coupling='DC', scale=0.5, offset=0.0)  # ±2V range
    rto.configure_channel(2, coupling='AC', scale=0.1, offset=0.0)  # ±400mV range

    # Set timebase
    rto.configure_timebase(time_scale=1e-6)  # 1 μs/div

    # Setup trigger
    rto.setup_trigger(channel=1, level=0.1, edge='POS', mode='NORM')

    # Acquire waveforms
    waveforms = rto.acquire_all_channels()

    # Automated measurements
    measurements = rto.automated_measurements(channel=1)
    print("Automated measurements:", measurements)

    # FFT analysis (returns (None, None) when it fails)
    frequencies, fft_data = rto.fft_analysis(channel=1)
    if fft_data is not None:
        print(f"FFT: {len(fft_data)} points from {frequencies[0]/1e6:.1f} to {frequencies[-1]/1e6:.1f} MHz")
finally:
    # Release the VISA session even if one of the steps above fails
    rto.close()

ZNA/ZNB Network Analyzer Control

Vector Network Analysis

class RohdeSchwartzNetworkAnalyzer(RohdeSchwartzInstrument):
    """R&S Vector Network Analyzer controller (ZNA, ZNB, ZNC series).

    Extends ``RohdeSchwartzInstrument`` and talks to the analyzer through the
    inherited ``self.instrument`` session. On construction the port count and
    frequency range are queried and stored in ``self.ports`` and
    ``self.frequency_range``.
    """

    def __init__(self, resource_string):
        """Open the instrument and cache its port count and frequency range.

        Args:
            resource_string: VISA resource string passed to the base class.
        """
        super().__init__(resource_string)

        # VNA-specific initialization
        self.ports = self._get_port_count()
        self.frequency_range = self._get_frequency_range()

        print("Network Analyzer info:")
        print(f"  Ports: {self.ports}")
        print(f"  Frequency range: {self.frequency_range}")

    def _get_port_count(self):
        """Return the number of test ports, falling back to 2 if the query fails."""
        try:
            # Query system configuration
            return int(self.instrument.query('INST:PORT:COUN?'))
        except Exception:
            # Best effort: an unsupported query or unparsable reply must not
            # prevent the controller from being created. Unlike a bare
            # ``except``, this still lets Ctrl-C / SystemExit through.
            return 2  # Assume 2-port

    def _get_frequency_range(self):
        """Return the instrument's frequency limits.

        Returns:
            A dict with ``min`` and ``max`` (floats, as reported by the
            instrument) and ``range`` (a human-readable GHz string). If either
            query fails, a fixed 10 MHz - 8.5 GHz fallback is returned.
        """
        try:
            freq_min = float(self.instrument.query('FREQ:STAR? MIN'))
            freq_max = float(self.instrument.query('FREQ:STOP? MAX'))
        except Exception:
            # Best-effort fallback; see _get_port_count for the rationale
            return {'min': 10e6, 'max': 8.5e9, 'range': '0.01 - 8.5 GHz'}

        return {
            'min': freq_min,
            'max': freq_max,
            'range': f"{freq_min/1e9:.3f} - {freq_max/1e9:.1f} GHz"
        }

    def configure_sweep(self, start_freq=1e9, stop_freq=2e9, points=201, if_bandwidth=10e3):
        """Configure the frequency sweep and select single-sweep mode.

        Args:
            start_freq: Sweep start frequency, sent as-is without a unit suffix.
            stop_freq: Sweep stop frequency, sent as-is without a unit suffix.
            points: Number of sweep points.
            if_bandwidth: Value sent with the ``BAND`` command.
        """

        self.instrument.write(f'FREQ:STAR {start_freq}')
        self.instrument.write(f'FREQ:STOP {stop_freq}')
        self.instrument.write(f'SWE:POIN {points}')
        self.instrument.write(f'BAND {if_bandwidth}')

        # Set sweep mode
        self.instrument.write('SWE:MODE SING')  # Single sweep

    def configure_power(self, power_dbm=0, port=1):
        """Set the source power on a port and switch that source on.

        Args:
            power_dbm: Power level sent with ``SOUR<port>:POW``.
            port: Source port number.
        """

        self.instrument.write(f'SOUR{port}:POW {power_dbm}')

        # Enable port
        self.instrument.write(f'SOUR{port}:POW:STAT ON')

    def setup_measurement(self, s_parameter='S21', measurement_name='Trc1'):
        """Define a trace for an S-parameter, display it and select it.

        Args:
            s_parameter: Measured quantity, e.g. ``'S21'``.
            measurement_name: Trace name used in the SCPI commands.
        """

        # Create/select measurement
        self.instrument.write(f'CALC:PAR:SDEF "{measurement_name}","{s_parameter}"')
        self.instrument.write(f'DISP:WIND:TRAC1:FEED "{measurement_name}"')

        # Select measurement for data retrieval
        self.instrument.write(f'CALC:PAR:SEL "{measurement_name}"')

    def perform_calibration(self, cal_type='FULL_2PORT'):
        """Perform an interactive calibration (simplified).

        Only ``'FULL_2PORT'`` on an instrument with at least two ports is
        implemented; anything else just prints a notice. The routine blocks on
        ``input()`` before each standard is measured.

        Args:
            cal_type: Calibration type name.
        """

        print(f"Starting {cal_type} calibration...")

        if cal_type != 'FULL_2PORT' or self.ports < 2:
            print(f"Calibration type {cal_type} not implemented")
            return

        # One-port standards measured on each port, with their SCPI commands
        one_port_commands = {
            'OPEN': 'SENS:CORR:COLL:OPEN',
            'SHORT': 'SENS:CORR:COLL:SHOR',
            'LOAD': 'SENS:CORR:COLL:LOAD',
        }

        # Start calibration
        self.instrument.write('SENS:CORR:COLL:METH:SOLT2 1,2')  # SOLT 2-port

        for port in (1, 2):
            for standard, command in one_port_commands.items():  # OSL on each port
                print(f"Connect {standard} to port {port}, press Enter when ready...")
                input()  # Wait for user
                self.instrument.write(f'{command} {port}')

        # THRU connection
        print("Connect THRU between ports 1 and 2, press Enter when ready...")
        input()
        self.instrument.write('SENS:CORR:COLL:THRU 1,2')

        # Apply calibration
        self.instrument.write('SENS:CORR:COLL:SAVE:SEL:DEF')
        print("Calibration completed!")

    def measure_s_parameters(self, s_params=('S11', 'S21', 'S12', 'S22')):
        """Measure multiple S-parameters as complex arrays.

        For each requested parameter a trace named ``Meas_<param>`` is set up,
        one sweep is triggered and the unformatted trace data is read back.

        Args:
            s_params: Iterable of S-parameter names. The default is a tuple so
                it is never shared mutable state between calls.

        Returns:
            ``(frequencies, results)`` where ``frequencies`` is the list
            returned by the ``FREQ:DATA?`` query and ``results`` maps each
            parameter name to a complex ``numpy`` array.

        Raises:
            ValueError: If the instrument returns an odd number of values, so
                they cannot be paired into real/imaginary parts.
        """

        results = {}

        # query_ascii_values() parses text, so the transfer format has to be
        # ASCII; a binary REAL,64 block cannot be parsed by it.
        self.instrument.write('FORM:DATA ASCII')

        for s_param in s_params:
            # Setup measurement for this S-parameter
            meas_name = f"Meas_{s_param}"
            self.setup_measurement(s_parameter=s_param, measurement_name=meas_name)

            # Perform sweep
            self.instrument.write('INIT:IMM')
            self.wait_for_operation_complete()

            # SDAT delivers the unformatted trace as interleaved
            # real, imag, real, imag, ... pairs, so one query carries the whole
            # complex trace (FDAT is display-formatted data, not the real part).
            raw = np.asarray(
                self.instrument.query_ascii_values('CALC:DATA? SDAT'),
                dtype=float,
            )
            if raw.size % 2:
                raise ValueError(
                    f"Expected real/imag pairs for {s_param}, got {raw.size} values"
                )

            results[s_param] = raw[0::2] + 1j * raw[1::2]

        # Get frequency axis
        frequencies = self.instrument.query_ascii_values('FREQ:DATA?')

        return frequencies, results

    def time_domain_transform(self, s_parameter='S21', transform_type='LOWPASS'):
        """Enable the time-domain transform and read the transformed trace.

        Args:
            s_parameter: S-parameter to transform.
            transform_type: Value sent with ``CALC:TRAN:TIME:TYPE``.

        Returns:
            ``(time_axis, time_data)`` as ``numpy`` arrays.
        """

        # Setup measurement
        self.setup_measurement(s_parameter=s_parameter)

        # Configure time domain
        self.instrument.write(f'CALC:TRAN:TIME:TYPE {transform_type}')
        self.instrument.write('CALC:TRAN:TIME:STAT ON')

        # Perform measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get time domain data
        time_data = self.instrument.query_ascii_values('CALC:DATA:TDOM? FDAT')
        time_axis = self.instrument.query_ascii_values('CALC:DATA:TDOM:STIM?')

        return np.array(time_axis), np.array(time_data)

    def measure_group_delay(self, s_parameter='S21'):
        """Measure group delay on a trace named ``GD``.

        Args:
            s_parameter: S-parameter the group delay is derived from.

        Returns:
            ``(frequencies, group_delay)`` as ``numpy`` arrays.
        """

        # Setup group delay measurement
        self.instrument.write(f'CALC:PAR:SDEF "GD","{s_parameter}"')
        self.instrument.write('CALC:PAR:SEL "GD"')
        self.instrument.write('CALC:FORM GDEL')  # Group delay format

        # Perform measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get group delay data
        group_delay = self.instrument.query_ascii_values('CALC:DATA? FDAT')
        frequencies = self.instrument.query_ascii_values('FREQ:DATA?')

        return np.array(frequencies), np.array(group_delay)

    def mixer_conversion_loss(self, lo_frequency, if_frequency, rf_start, rf_stop):
        """Measure mixer conversion loss over an RF sweep.

        Args:
            lo_frequency: Value sent with ``SENS:MIX:LO:FREQ``.
            if_frequency: Value sent with ``SENS:MIX:IF:FREQ``.
            rf_start: RF sweep start frequency.
            rf_stop: RF sweep stop frequency.

        Returns:
            ``(frequencies, conv_loss)`` as ``numpy`` arrays.
        """

        # Configure for mixer measurement
        self.instrument.write('SENS:MIX:STAT ON')
        self.instrument.write(f'SENS:MIX:LO:FREQ {lo_frequency}')
        self.instrument.write(f'SENS:MIX:IF:FREQ {if_frequency}')

        # Set RF frequency range
        self.instrument.write(f'FREQ:STAR {rf_start}')
        self.instrument.write(f'FREQ:STOP {rf_stop}')

        # Setup conversion loss measurement
        self.setup_measurement(s_parameter='SC21', measurement_name='Conv_Loss')

        # Perform measurement
        self.instrument.write('INIT:IMM')
        self.wait_for_operation_complete()

        # Get conversion loss data
        conv_loss = self.instrument.query_ascii_values('CALC:DATA? FDAT')
        frequencies = self.instrument.query_ascii_values('FREQ:DATA?')

        return np.array(frequencies), np.array(conv_loss)

# Usage example
vna = RohdeSchwartzNetworkAnalyzer('TCPIP::192.168.1.103::hislip0::INSTR')

try:
    # Configure measurement
    vna.configure_sweep(start_freq=1e9, stop_freq=2e9, points=401, if_bandwidth=1e3)
    vna.configure_power(power_dbm=-10, port=1)

    # Measure S-parameters
    frequencies, s_params = vna.measure_s_parameters(['S11', 'S21'])

    # Convert to dB and phase
    s11_db = 20 * np.log10(np.abs(s_params['S11']))
    s11_phase = np.angle(s_params['S11']) * 180 / np.pi
    s21_db = 20 * np.log10(np.abs(s_params['S21']))

    print(f"S11: {np.min(s11_db):.1f} to {np.max(s11_db):.1f} dB")
    print(f"S21: {np.min(s21_db):.1f} to {np.max(s21_db):.1f} dB")

    # Group delay measurement
    gd_frequencies, group_delay = vna.measure_group_delay('S21')
    print(f"Group delay: {np.mean(group_delay)*1e9:.1f} ± {np.std(group_delay)*1e9:.1f} ns")
finally:
    # Release the VISA session even when a measurement step raises
    vna.close()

Automated Test System Example

Multi-Instrument R&S Test Setup

class RohdeSchwartzTestSystem:
    """Integrated R&S test system.

    Holds the opened instrument controllers in ``self.instruments``, keyed by
    ``'FSA'``, ``'SMW'``, ``'RTO'`` and ``'VNA'``. Only instruments whose
    resource string is supplied are opened.
    """

    def __init__(self, fsa_resource=None, smw_resource=None, rto_resource=None, vna_resource=None):
        """Open a controller for every resource string that is provided.

        Args:
            fsa_resource: VISA resource string of the spectrum analyzer.
            smw_resource: VISA resource string of the signal generator.
            rto_resource: VISA resource string of the oscilloscope.
            vna_resource: VISA resource string of the network analyzer.
        """
        self.instruments = {}

        if fsa_resource:
            self.instruments['FSA'] = RohdeSchwartzSpectrumAnalyzer(fsa_resource)
        if smw_resource:
            self.instruments['SMW'] = RohdeSchwartzSignalGenerator(smw_resource)
        if rto_resource:
            self.instruments['RTO'] = RohdeSchwartzOscilloscope(rto_resource)
        if vna_resource:
            self.instruments['VNA'] = RohdeSchwartzNetworkAnalyzer(vna_resource)

        print(f"Test system initialized with: {list(self.instruments.keys())}")

    def amplifier_characterization(self, input_frequencies, input_powers):
        """Measure amplifier gain at every frequency/power combination.

        Args:
            input_frequencies: Iterable of generator frequencies.
            input_powers: Iterable of generator power levels.

        Returns:
            A list of dicts with ``input_frequency``, ``input_power``,
            ``output_power`` and ``gain_db`` (output minus input level).

        Raises:
            ValueError: If the generator (``SMW``) or analyzer (``FSA``) is
                not part of the system.
        """

        if 'SMW' not in self.instruments or 'FSA' not in self.instruments:
            raise ValueError("SMW and FSA required for amplifier test")

        smw = self.instruments['SMW']
        fsa = self.instruments['FSA']

        results = []

        for freq in input_frequencies:
            for power in input_powers:
                print(f"Testing {freq/1e9:.3f} GHz @ {power} dBm")

                # Configure generator
                smw.set_cw_signal(frequency=freq, power=power)
                smw.enable_output(True)

                # Configure analyzer
                fsa.configure_spectrum(center_freq=freq, span=100e6, rbw=1e6)
                fsa.set_reference_level(power + 20)  # Expected gain ~20dB

                time.sleep(0.5)  # Settling time

                # Measure output
                peak = fsa.peak_search()

                # Calculate gain
                gain_db = peak['level'] - power

                results.append({
                    'input_frequency': freq,
                    'input_power': power,
                    'output_power': peak['level'],
                    'gain_db': gain_db
                })

        return results

    def phase_noise_vs_temperature(self, frequencies, temperatures):
        """Measure phase noise at each carrier frequency for each temperature.

        The operator is prompted (via ``input()``) to set the chamber before
        each temperature step. Measurements that return a falsy result are
        left out of the returned list.

        Args:
            frequencies: Iterable of carrier frequencies.
            temperatures: Iterable of chamber temperatures.

        Returns:
            A list of dicts with ``temperature``, ``frequency`` and
            ``phase_noise``.

        Raises:
            ValueError: If the analyzer (``FSA``) is not part of the system.
        """

        if 'FSA' not in self.instruments:
            raise ValueError("FSA required for phase noise test")

        fsa = self.instruments['FSA']
        results = []

        for temp in temperatures:
            print(f"Set chamber to {temp}°C and press Enter...")
            input()  # Wait for temperature stabilization

            for freq in frequencies:
                # Measure phase noise
                pn_results = fsa.phase_noise_measurement(carrier_freq=freq)

                if pn_results:
                    results.append({
                        'temperature': temp,
                        'frequency': freq,
                        'phase_noise': pn_results
                    })

        return results

    def close_all(self):
        """Close all instruments.

        Every instrument is attempted: a failure while closing one is printed
        and does not stop the remaining instruments from being closed.
        """
        for name, instrument in self.instruments.items():
            print(f"Closing {name}")
            try:
                instrument.close()
            except Exception as e:
                # Keep going so the other sessions are still released
                print(f"Failed to close {name}: {e}")

# Usage
test_system = RohdeSchwartzTestSystem(
    fsa_resource='TCPIP::192.168.1.100::hislip0::INSTR',
    smw_resource='TCPIP::192.168.1.101::hislip0::INSTR'
)

try:
    # Amplifier test
    frequencies = [1e9, 1.5e9, 2e9, 2.5e9, 3e9]  # 1-3 GHz
    powers = [-30, -20, -10, 0]  # -30 to 0 dBm

    amp_results = test_system.amplifier_characterization(frequencies, powers)

    # Analyze results
    for result in amp_results[:5]:  # Show first 5 results
        print(f"{result['input_frequency']/1e9:.1f} GHz, {result['input_power']} dBm → "
              f"{result['gain_db']:.1f} dB gain")
finally:
    # Release every instrument session even when the test run raises
    test_system.close_all()

Best Practices for R&S Instruments

Error Handling and Performance

def rs_best_practices_example():
    """Demonstrate R&S instrument best practices.

    Opens a HiSLIP session, sets a long timeout and a binary data format,
    resets the instrument while draining the error queue, and synchronizes a
    measurement with ``*OPC?``. Any failure is printed rather than raised, and
    the session and resource manager are closed in every case.
    """

    # Use HiSLIP for Ethernet connections (faster than VXI-11)
    instrument_ip = "192.168.1.100"
    resource_string = f"TCPIP::{instrument_ip}::hislip0::INSTR"

    # Tracked outside the try block so cleanup knows what was actually opened
    rm = None
    instrument = None

    try:
        rm = pyvisa.ResourceManager()
        instrument = rm.open_resource(resource_string)

        # Optimize timeouts for R&S instruments
        instrument.timeout = 30000  # 30 seconds

        # Use appropriate data formats
        instrument.write('FORM:DATA REAL,32')  # 32-bit floats

        # Check for errors regularly
        def check_and_clear_errors():
            """Drain the SCPI error queue and return the entries that were read."""
            errors = []
            # Bounded number of reads so a stuck queue cannot loop forever
            for _ in range(11):
                error = instrument.query('SYST:ERR?').strip()

                # A reply looks like <code>,"<message>". Compare the numeric
                # code itself: a substring test for '0,' also matches real
                # errors whose code ends in 0, such as -110.
                try:
                    code = int(error.split(',', 1)[0])
                except ValueError:
                    code = None  # Unparsable reply: report it as an error

                if code == 0:
                    break
                errors.append(error)
            return errors

        # Example measurement with error checking
        instrument.write('*RST')
        errors = check_and_clear_errors()
        if errors:
            print(f"Reset errors: {errors}")

        # Use *OPC? for synchronization
        instrument.write('INIT:IMM')
        instrument.query('*OPC?')  # Blocks until the instrument answers

        print("✅ Measurement complete")

    except Exception as e:
        print(f"❌ Error: {e}")

    finally:
        # Close the session first, then the resource manager; a failure in
        # one close must not prevent the other from running.
        for handle in (instrument, rm):
            if handle is None:
                continue
            try:
                handle.close()
            except Exception as e:
                print(f"❌ Error: {e}")

# Common troubleshooting function
def diagnose_rs_connection(resource_string):
    """Diagnose R&S instrument connection.

    Opens the resource, prints its identification and options, drains and
    reports the error queue, and prints troubleshooting tips if anything
    fails. Nothing is raised to the caller, and the session and resource
    manager are closed in every case.

    Args:
        resource_string: VISA resource string of the instrument to check.
    """

    print("R&S Instrument Diagnostics")
    print("=" * 30)

    # Tracked outside the try block so cleanup knows what was actually opened
    rm = None
    instrument = None

    try:
        rm = pyvisa.ResourceManager()
        instrument = rm.open_resource(resource_string)

        # Basic identification
        idn = instrument.query('*IDN?')
        print(f"Instrument: {idn.strip()}")

        # Check options
        try:
            options = instrument.query('*OPT?')
            print(f"Options: {options.strip()}")
        except Exception:
            print("Options query not supported")

        # Check error queue
        errors = []
        # Bounded number of reads so a stuck queue cannot loop forever
        for _ in range(100):
            error = instrument.query('SYST:ERR?').strip()

            # A reply looks like <code>,"<message>". Compare the numeric code
            # itself: a substring test for '0,' also matches real errors whose
            # code ends in 0, such as -110.
            try:
                code = int(error.split(',', 1)[0])
            except ValueError:
                code = None  # Unparsable reply: report it as an error

            if code == 0:
                break
            errors.append(error)

        if errors:
            print(f"Errors found: {errors}")
        else:
            print("No errors in queue")

        print("✅ Connection successful")

    except Exception as e:
        print(f"❌ Connection failed: {e}")
        print("\nTroubleshooting tips:")
        print("1. Check network connectivity")
        print("2. Verify instrument IP address")
        print("3. Use HiSLIP protocol (::hislip0::INSTR)")
        print("4. Check firewall settings")
        print("5. Update R&S VISA drivers")

    finally:
        # Close the session first, then the resource manager; a failure in
        # one close must not prevent the other from running.
        for handle in (instrument, rm):
            if handle is None:
                continue
            try:
                handle.close()
            except Exception as e:
                print(f"❌ Cleanup failed: {e}")

# Run diagnostics against the example HiSLIP address; replace the IP with
# your instrument's address before running
diagnose_rs_connection('TCPIP::192.168.1.100::hislip0::INSTR')

Next Steps

How is this guide?