PyVISA
PyVISADocs

Large Data Transfers

Optimize large data transfers from oscilloscopes, spectrum analyzers, and other high-throughput instruments using PyVISA binary protocols and chunking techniques.

Transfer megabytes of instrument data using optimized PyVISA techniques for oscilloscopes, spectrum analyzers, and data loggers.

Overview

Modern instruments can generate massive datasets:

  • Oscilloscopes: 10M+ sample waveforms (40+ MB)
  • Spectrum analyzers: High-resolution FFTs (10+ MB)
  • Data loggers: Hours of high-speed measurements (100+ MB)

Standard PyVISA approaches can be 10-100x slower than optimized transfers.

Binary vs ASCII Transfer Comparison

import pyvisa
import time
import numpy as np

def compare_transfer_methods(instrument):
    """Compare ASCII vs binary transfer performance"""
    
    # ASCII transfer (slow)
    start = time.time()
    instrument.write('FORM:DATA ASC')
    ascii_data = instrument.query('CURV?')
    ascii_values = [float(x) for x in ascii_data.split(',')]
    ascii_time = time.time() - start
    
    # Binary transfer (fast)
    start = time.time()
    instrument.write('FORM:DATA REAL,32')  # 32-bit floats
    binary_values = instrument.query_binary_values('CURV?', datatype='f')
    binary_time = time.time() - start
    
    print(f"ASCII transfer: {ascii_time:.2f}s ({len(ascii_values)} points)")
    print(f"Binary transfer: {binary_time:.2f}s ({len(binary_values)} points)")
    print(f"Speed improvement: {ascii_time/binary_time:.1f}x faster")
    
    return binary_values

# Example results for 1M sample waveform:
# ASCII transfer: 12.3s (1000000 points)  
# Binary transfer: 0.8s (1000000 points)
# Speed improvement: 15.4x faster

Optimized Oscilloscope Data Acquisition

High-Speed Waveform Capture

class OptimizedScope:
    def __init__(self, resource_string):
        self.rm = pyvisa.ResourceManager()
        self.scope = self.rm.open_resource(resource_string)
        self.scope.timeout = 30000  # 30 second timeout
        
        # Optimize VISA settings for speed
        self.scope.write('*RST')
        self.scope.write('*CLS') 
        
    def setup_for_speed(self):
        """Configure scope for fastest data transfer"""
        
        # Use fastest data format
        self.scope.write('DATA:ENC RIBINARY')  # Big-endian binary
        self.scope.write('DATA:WIDTH 2')      # 16-bit samples
        
        # Minimize data preprocessing  
        self.scope.write('DATA:STOP 10000000')  # All available points
        self.scope.write('DATA:START 1')
        
        # Disable averaging/filtering for speed
        self.scope.write('ACQ:MODE SAMPLE')   # Sample mode (fastest)
        
        # Set optimal record length
        self.scope.write('HOR:RECORDLENGTH 10000000')  # 10M samples
        
    def capture_waveform_optimized(self, channel=1):
        """Capture waveform with maximum speed"""
        
        # Single acquisition for consistency
        self.scope.write('ACQ:STOPAFTER RUNSTOP')
        self.scope.write('ACQ:STATE ON')
        
        # Wait for acquisition complete
        self.scope.write('*OPC?')
        self.scope.read()
        
        # Get waveform preamble for scaling
        self.scope.write(f'DATA:SOURCE CH{channel}')
        preamble = self.scope.query('WFMPRE?').split(',')
        
        y_scale = float(preamble[13])  # Vertical scale
        y_offset = float(preamble[14]) # Vertical offset  
        x_scale = float(preamble[9])   # Horizontal scale
        
        # Binary waveform transfer
        start_time = time.time()
        raw_data = self.scope.query_binary_values('CURVE?', datatype='h')
        transfer_time = time.time() - start_time
        
        # Convert to voltage values
        voltages = np.array(raw_data) * y_scale + y_offset
        time_base = np.arange(len(voltages)) * x_scale
        
        print(f"Transferred {len(voltages)} points in {transfer_time:.2f}s")
        print(f"Transfer rate: {len(voltages)/transfer_time/1e6:.1f} MSa/s")
        
        return time_base, voltages

# Usage
scope = OptimizedScope('USB::0x0699::0x0363::C102912::INSTR')
scope.setup_for_speed()
time_data, voltage_data = scope.capture_waveform_optimized(1)

Chunked Data Transfer for Very Large Datasets

def chunked_waveform_transfer(instrument, total_points, chunk_size=100000):
    """Transfer large waveform in chunks to avoid memory issues"""
    
    all_data = []
    chunks_transferred = 0
    
    for start_point in range(1, total_points, chunk_size):
        end_point = min(start_point + chunk_size - 1, total_points)
        
        # Set data range for this chunk
        instrument.write(f'DATA:START {start_point}')
        instrument.write(f'DATA:STOP {end_point}')
        
        # Transfer chunk
        chunk_data = instrument.query_binary_values('CURVE?', datatype='h')
        all_data.extend(chunk_data)
        
        chunks_transferred += 1
        points_so_far = len(all_data)
        
        print(f"Chunk {chunks_transferred}: {points_so_far}/{total_points} points")
    
    return np.array(all_data)

# Usage for 50M point waveform
huge_waveform = chunked_waveform_transfer(scope, 50000000, chunk_size=1000000)

Spectrum Analyzer Optimization

Fast Trace Acquisition

class OptimizedSpectrumAnalyzer:
    def __init__(self, resource_string):
        self.rm = pyvisa.ResourceManager()
        self.sa = self.rm.open_resource(resource_string)
        self.sa.timeout = 60000  # 60 seconds for long sweeps
        
    def fast_trace_setup(self):
        """Configure for fastest trace transfer"""
        
        # Binary format for speed
        self.sa.write('FORM:DATA REAL,32')
        
        # Single sweep for consistency
        self.sa.write('INIT:CONT OFF')
        
        # Optimize sweep settings
        self.sa.write('SWE:TYPE AUTO')    # Auto sweep type
        self.sa.write('AVER:COUN 1')      # No averaging
        self.sa.write('AVER:STAT OFF')    # Disable averaging
        
    def capture_trace(self, start_freq=1e6, stop_freq=1e9, points=10001):
        """Capture spectrum trace with optimized settings"""
        
        # Configure frequency range
        self.sa.write(f'FREQ:START {start_freq}')
        self.sa.write(f'FREQ:STOP {stop_freq}') 
        self.sa.write(f'SWE:POIN {points}')
        
        # Start measurement
        self.sa.write('INIT:IMM')
        self.sa.write('*OPC?')
        self.sa.read()  # Wait for completion
        
        # Fast binary transfer
        start_time = time.time()
        trace_data = self.sa.query_binary_values('TRAC? TRACE1', datatype='f')
        transfer_time = time.time() - start_time
        
        # Create frequency axis
        frequencies = np.linspace(start_freq, stop_freq, len(trace_data))
        
        print(f"Spectrum: {len(trace_data)} points in {transfer_time:.2f}s")
        
        return frequencies, trace_data

# Usage
sa = OptimizedSpectrumAnalyzer('TCPIP::192.168.1.101::INSTR')
sa.fast_trace_setup()
freqs, spectrum = sa.capture_trace(1e6, 6e9, 50001)  # 50k points, 1-6 GHz

Memory Management for Large Datasets

Efficient Data Handling

import numpy as np
from scipy import signal
import gc

class EfficientDataHandler:
    def __init__(self):
        self.data_cache = {}
        
    def process_large_waveform(self, instrument, decimate_factor=10):
        """Process large waveform with memory optimization"""
        
        # Use memory mapping for very large files
        total_points = int(instrument.query('HOR:RECORDLENGTH?'))
        
        if total_points > 10000000:  # > 10M points
            return self._process_with_decimation(instrument, decimate_factor)
        else:
            return self._process_normal(instrument)
    
    def _process_with_decimation(self, instrument, factor):
        """Process with on-the-fly decimation to save memory"""
        
        # Set scope to return every Nth point
        instrument.write(f'DATA:RESAMPLE {factor}')
        
        raw_data = instrument.query_binary_values('CURVE?', datatype='h')
        
        # Convert to float32 (half the memory of float64)
        data = np.array(raw_data, dtype=np.float32)
        
        # Clear raw data from memory
        del raw_data
        gc.collect()
        
        return data
    
    def _process_normal(self, instrument):
        """Normal processing for smaller datasets"""
        raw_data = instrument.query_binary_values('CURVE?', datatype='h')
        return np.array(raw_data, dtype=np.float32)
    
    def save_efficiently(self, data, filename, compress=True):
        """Save data with optional compression"""
        
        if compress:
            # Use compression for large files
            np.savez_compressed(f"{filename}.npz", data=data)
            print(f"Saved compressed: {filename}.npz")
        else:
            # Binary format for speed
            np.save(f"{filename}.npy", data)
            print(f"Saved binary: {filename}.npy")

# Usage
handler = EfficientDataHandler()
waveform = handler.process_large_waveform(scope, decimate_factor=5)
handler.save_efficiently(waveform, "large_waveform", compress=True)

Streaming Data Processing

def streaming_fft_analysis(instrument, chunk_size=1000000):
    """Process FFT in chunks to handle very large waveforms"""
    
    total_points = int(instrument.query('HOR:RECORDLENGTH?'))
    sample_rate = float(instrument.query('HOR:SAMPLERATE?'))
    
    # Initialize FFT accumulator
    fft_accumulator = None
    num_chunks = 0
    
    for start_point in range(1, total_points, chunk_size):
        end_point = min(start_point + chunk_size - 1, total_points)
        
        # Get chunk
        instrument.write(f'DATA:START {start_point}')
        instrument.write(f'DATA:STOP {end_point}')
        
        chunk_data = instrument.query_binary_values('CURVE?', datatype='h')
        chunk_array = np.array(chunk_data, dtype=np.float32)
        
        # Compute FFT of chunk
        chunk_fft = np.fft.fft(chunk_array)
        chunk_power = np.abs(chunk_fft) ** 2
        
        # Accumulate power spectrum
        if fft_accumulator is None:
            fft_accumulator = chunk_power
        else:
            fft_accumulator += chunk_power
            
        num_chunks += 1
        
        # Free memory
        del chunk_data, chunk_array, chunk_fft, chunk_power
        gc.collect()
        
        print(f"Processed chunk {num_chunks}")
    
    # Average and create frequency axis
    avg_power_spectrum = fft_accumulator / num_chunks
    freqs = np.fft.fftfreq(len(avg_power_spectrum), 1/sample_rate)
    
    return freqs[:len(freqs)//2], avg_power_spectrum[:len(avg_power_spectrum)//2]

# Usage for 100M point waveform FFT
freqs, power_spectrum = streaming_fft_analysis(scope, chunk_size=2000000)

Network Optimization

TCP/IP Settings for High Throughput

def optimize_tcpip_instrument(resource_string):
    """Optimize TCP/IP instrument for high-speed transfers"""
    
    rm = pyvisa.ResourceManager()
    
    # Open with optimized settings
    instrument = rm.open_resource(
        resource_string,
        read_termination='\n',
        write_termination='\n',
        timeout=60000,
        chunk_size=1024*1024,  # 1MB chunks
        send_end=True
    )
    
    # Set instrument network settings if supported
    try:
        # Increase network buffer sizes
        instrument.write('SYST:COMM:LAN:MCON 1')  # Multiple connections
        instrument.write('SYST:COMM:LAN:BUFF:SIZE 65536')  # 64KB buffer
    except:
        pass  # Not all instruments support these
    
    return instrument

# Usage
fast_instrument = optimize_tcpip_instrument('TCPIP::192.168.1.100::INSTR')

Parallel Data Acquisition

import concurrent.futures
import threading

class ParallelDataAcquisition:
    def __init__(self, resource_strings):
        self.instruments = {}
        for i, resource in enumerate(resource_strings):
            self.instruments[f'instr_{i}'] = pyvisa.ResourceManager().open_resource(resource)
    
    def parallel_measurements(self, measurement_func, *args):
        """Execute measurements in parallel across instruments"""
        
        results = {}
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            future_to_instr = {
                executor.submit(measurement_func, instr, *args): name 
                for name, instr in self.instruments.items()
            }
            
            for future in concurrent.futures.as_completed(future_to_instr):
                instr_name = future_to_instr[future]
                try:
                    results[instr_name] = future.result()
                    print(f"{instr_name}: measurement complete")
                except Exception as e:
                    print(f"{instr_name}: error - {e}")
                    
        return results
    
    def close_all(self):
        for instr in self.instruments.values():
            instr.close()

def capture_waveform(instrument):
    """Waveform capture function for parallel execution"""
    instrument.write('*RST')
    instrument.write('DATA:ENC RIB')
    waveform = instrument.query_binary_values('CURVE?', datatype='h')
    return np.array(waveform)

# Usage with multiple scopes
scope_resources = [
    'TCPIP::192.168.1.100::INSTR',
    'TCPIP::192.168.1.101::INSTR', 
    'TCPIP::192.168.1.102::INSTR',
    'TCPIP::192.168.1.103::INSTR'
]

parallel_acq = ParallelDataAcquisition(scope_resources)
all_waveforms = parallel_acq.parallel_measurements(capture_waveform)
parallel_acq.close_all()

print(f"Captured {len(all_waveforms)} waveforms simultaneously")

Performance Monitoring and Benchmarking

Transfer Rate Measurement

import time
import psutil
import threading

class TransferMonitor:
    def __init__(self):
        self.monitoring = False
        self.start_time = None
        self.bytes_transferred = 0
        
    def start_monitoring(self):
        """Start monitoring transfer rates"""
        self.monitoring = True
        self.start_time = time.time()
        self.bytes_transferred = 0
        
        # Monitor network usage in separate thread
        monitor_thread = threading.Thread(target=self._monitor_network)
        monitor_thread.daemon = True
        monitor_thread.start()
        
    def _monitor_network(self):
        """Monitor network stats during transfer"""
        while self.monitoring:
            stats = psutil.net_io_counters()
            time.sleep(1)
            
    def record_transfer(self, data_size_bytes):
        """Record completed transfer"""
        self.bytes_transferred += data_size_bytes
        
    def get_statistics(self):
        """Get transfer statistics"""
        if self.start_time is None:
            return None
            
        elapsed = time.time() - self.start_time
        rate_mbps = (self.bytes_transferred * 8) / (elapsed * 1e6)  # Mbps
        rate_mbs = self.bytes_transferred / (elapsed * 1e6)  # MB/s
        
        return {
            'elapsed_time': elapsed,
            'bytes_transferred': self.bytes_transferred,
            'rate_mbps': rate_mbps,
            'rate_mbs': rate_mbs
        }
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False
        stats = self.get_statistics()
        
        if stats:
            print(f"\nTransfer Statistics:")
            print(f"Time: {stats['elapsed_time']:.2f} seconds")
            print(f"Data: {stats['bytes_transferred']/1e6:.1f} MB")
            print(f"Rate: {stats['rate_mbs']:.1f} MB/s ({stats['rate_mbps']:.1f} Mbps)")
            
        return stats

# Usage
monitor = TransferMonitor()
monitor.start_monitoring()

# Perform large data transfer
waveform = scope.query_binary_values('CURVE?', datatype='h') 
monitor.record_transfer(len(waveform) * 2)  # 2 bytes per sample

stats = monitor.stop_monitoring()

Best Practices Summary

Configuration

  • Use binary formats: REAL,32 or INTEGER,16 vs ASCII
  • Optimize timeouts: Set based on expected transfer time
  • Minimize SCPI overhead: Combine commands when possible
  • Use appropriate chunk sizes: 1MB chunks for network transfers

Memory Management

  • Use appropriate data types: float32 vs float64
  • Process in chunks: For datasets > 100MB
  • Enable compression: For long-term storage
  • Clean up resources: Explicit memory management

Network Optimization

  • Direct Ethernet connection: Avoid WiFi for large transfers
  • Increase buffer sizes: Both instrument and PC
  • Use dedicated network: Isolate instrument traffic
  • Monitor network utilization: Identify bottlenecks

Troubleshooting

  • Profile transfer rates: Use monitoring tools
  • Check network configuration: MTU size, duplex settings
  • Verify instrument settings: Buffer sizes, data formats
  • Monitor system resources: CPU, memory, network usage

Next Steps

How is this guide?