
Optimization Guide

This guide covers PyVISA performance optimization for high-speed instrument control: fast data transfers, memory management, connection pooling, async operations, and benchmarking, from millisecond response times to gigabyte data transfers.

Performance Quick Reference

Operation             | Typical Time | Optimized Time | Key Technique
Single measurement    | 10-50 ms     | 2-5 ms         | Reduce NPLC, disable autozero
1000 measurements     | 10-50 s      | 0.5-2 s        | Burst mode, binary transfer
Large waveform (1 MB) | 5-30 s       | 0.5-2 s        | Increase chunk_size, binary format
Connection setup      | 1-5 s        | 0.1-0.5 s      | Connection pooling
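
Actual numbers depend on your instrument, interface, and host, so it pays to measure them yourself. A small timing helper (a sketch, not part of PyVISA; `read_voltage` and the `MEAS:VOLT:DC?` query are illustrative) makes that easy:

```python
import time
from functools import wraps

def timed(label):
    """Decorator that reports how long the wrapped call took."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            print(f"{label}: {elapsed * 1000:.1f} ms")
            # Return both the result and the elapsed time
            return result, elapsed
        return wrapper
    return decorator

@timed("single measurement")
def read_voltage(inst):
    """Typical SCPI DMM voltage query (assumed command)."""
    return float(inst.query("MEAS:VOLT:DC?"))
```

Wrap any operation this way to compare your own "typical" and "optimized" times against the table above.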

1. Fast Data Transfer Techniques

Binary vs ASCII Transfer

ASCII transfer (slow):

import pyvisa
import time

rm = pyvisa.ResourceManager()
scope = rm.open_resource("USB0::0x0699::0x0363::C065089::INSTR")

# ASCII transfer - human readable but slow
start_time = time.time()
scope.write("CURVE?")
ascii_data = scope.read()  # Returns string like "1.23,4.56,7.89,..."
ascii_time = time.time() - start_time

print(f"ASCII transfer: {ascii_time:.3f}s for {len(ascii_data)} chars")

Binary transfer (fast):

# Binary transfer - much faster for large datasets
start_time = time.time()
scope.write("DATA:ENCDG RIBINARY")  # Set binary encoding
scope.write("CURVE?")
binary_data = scope.read_raw()  # Returns bytes
binary_time = time.time() - start_time

print(f"Binary transfer: {binary_time:.3f}s for {len(binary_data)} bytes")
print(f"Speed improvement: {ascii_time / binary_time:.1f}x faster")

# Convert binary to numpy array
import numpy as np

# The response is an IEEE 488.2 definite-length block: '#', one digit n,
# then n ASCII digits giving the payload length, then the payload
# (16-bit signed, big-endian in this example)
n_digits = int(binary_data[1:2])
payload_len = int(binary_data[2:2 + n_digits])
start = 2 + n_digits
waveform = np.frombuffer(binary_data[start:start + payload_len], dtype='>i2')
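
Binary responses arrive as IEEE 488.2 definite-length blocks: `#`, one digit n, then n ASCII digits giving the payload length, then the payload. A small helper (hypothetical, not part of PyVISA) centralizes that parsing so it is not repeated for every instrument:

```python
def ieee_block_payload(raw: bytes) -> bytes:
    """Extract the payload from an IEEE 488.2 definite-length block.

    Format: b'#' + one digit n + n ASCII digits giving the payload
    length in bytes + the payload itself (a trailing terminator such
    as b'\\n' is dropped by slicing to the declared length).
    """
    if not raw.startswith(b'#'):
        raise ValueError("not an IEEE 488.2 definite-length block")
    n_digits = int(raw[1:2])
    length = int(raw[2:2 + n_digits])
    start = 2 + n_digits
    return raw[start:start + length]

# Usage with the raw read above:
# waveform = np.frombuffer(ieee_block_payload(binary_data), dtype='>i2')
```

Note that PyVISA's `read_binary_values`/`query_binary_values` perform this header parsing internally and are often the simplest option for well-behaved instruments.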

Optimized Binary Data Acquisition

class FastWaveformCapture:
    """Optimized waveform capture class"""
    
    def __init__(self, instrument):
        self.inst = instrument
        self.setup_fast_acquisition()
    
    def setup_fast_acquisition(self):
        """Configure instrument for maximum speed"""
        
        # Increase buffer sizes for large transfers
        self.inst.chunk_size = 1024 * 1024  # 1MB chunks
        self.inst.timeout = 30000  # 30s timeout for large transfers
        
        # Configure instrument for binary transfer
        self.inst.write("DATA:ENCDG RIBINARY")  # Binary encoding
        self.inst.write("DATA:WIDTH 1")        # 1 byte per sample (if sufficient resolution)
        self.inst.write("HEADER OFF")          # Disable headers for speed
        
        # Optimize acquisition settings
        self.inst.write("ACQ:STOPAFTER SEQUENCE")  # Single sequence mode
        self.inst.write("ACQ:STATE OFF")           # Stop acquisition
        
    def capture_waveform_fast(self, channel=1):
        """Capture waveform with maximum speed"""
        
        # Configure channel
        self.inst.write(f"DATA:SOURCE CH{channel}")
        
        # Start acquisition
        self.inst.write("ACQ:STATE ON")
        
        # Wait for acquisition complete
        self.inst.query("*OPC?")  # Wait for operation complete
        
        # Fast binary transfer
        start_time = time.time()
        self.inst.write("CURVE?")
        raw_data = self.inst.read_raw()
        transfer_time = time.time() - start_time
        
        # Parse binary data: strip the IEEE 488.2 block header ('#' + n + n length digits)
        n_digits = int(raw_data[1:2])
        payload_len = int(raw_data[2:2 + n_digits])
        start = 2 + n_digits
        waveform_data = np.frombuffer(raw_data[start:start + payload_len], dtype=np.int8)
        
        print(f"Captured {len(waveform_data)} points in {transfer_time:.3f}s")
        print(f"Transfer rate: {len(raw_data) / transfer_time / 1024 / 1024:.1f} MB/s")
        
        return waveform_data, transfer_time

# Example usage
scope = rm.open_resource("USB0::0x0699::0x0363::C065089::INSTR")
fast_capture = FastWaveformCapture(scope)
waveform, time_taken = fast_capture.capture_waveform_fast()

2. High-Speed Measurement Techniques

Burst Measurements

def burst_measurements(instrument, measurement_type="VOLT:DC", count=1000):
    """Perform burst measurements for maximum speed"""
    
    # Configure for speed over accuracy
    instrument.write(f"CONF:{measurement_type}")
    
    if "VOLT" in measurement_type:
        instrument.write(f"{measurement_type}:NPLC 0.02")  # Minimum integration time
        instrument.write(f"{measurement_type}:ZERO:AUTO OFF")  # Disable autozero
        instrument.write(f"{measurement_type}:RANGE:AUTO OFF")  # Fixed range
    
    # Configure for burst
    instrument.write(f"SAMP:COUN {count}")
    instrument.write("TRIG:SOUR IMM")  # Immediate trigger
    
    # Perform burst measurement
    start_time = time.time()
    instrument.write("READ?")
    
    # Read all data at once
    response = instrument.read()
    end_time = time.time()
    
    # Parse results
    values = [float(x) for x in response.split(',')]
    
    measurement_rate = len(values) / (end_time - start_time)
    print(f"Burst measurement: {measurement_rate:.0f} measurements/second")
    
    return values, measurement_rate

# Example usage
dmm = rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR")
data, rate = burst_measurements(dmm, "VOLT:DC", 500)
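
Once a burst returns, the raw readings are usually summarized before logging. A stdlib-only sketch (the dictionary keys are illustrative, not a PyVISA convention):

```python
import statistics

def summarize_burst(values, elapsed_s):
    """Summarize a burst of readings: count, mean, spread, and rate."""
    return {
        'count': len(values),
        'mean': statistics.fmean(values),
        'stdev': statistics.stdev(values) if len(values) > 1 else 0.0,
        'rate_per_s': len(values) / elapsed_s if elapsed_s > 0 else float('inf'),
    }

# Example with synthetic readings
summary = summarize_burst([1.0, 2.0, 3.0], elapsed_s=0.5)
print(summary)
```

Feeding the values and timing from `burst_measurements` into this gives a compact record per burst instead of a million raw points.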

Asynchronous Operations

import asyncio
import concurrent.futures

class AsyncInstrumentController:
    """Asynchronous instrument control for parallel operations"""
    
    def __init__(self, instruments):
        self.instruments = instruments
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(instruments))
    
    def sync_measurement(self, instrument, command):
        """Synchronous query, executed in a worker thread"""
        try:
            # Return the raw response; callers parse as needed
            # (float() here would break non-numeric queries like *IDN?)
            return instrument.query(command).strip()
        except Exception as e:
            return f"Error: {e}"
    
    async def async_measurement(self, instrument, command):
        """Asynchronous measurement wrapper"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor, 
            self.sync_measurement, 
            instrument, 
            command
        )
    
    async def parallel_measurements(self, commands):
        """Perform measurements on multiple instruments in parallel"""
        
        tasks = []
        for i, command in enumerate(commands):
            if i < len(self.instruments):
                task = self.async_measurement(self.instruments[i], command)
                tasks.append(task)
        
        # Wait for all measurements to complete
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        
        print(f"Parallel measurements completed in {end_time - start_time:.3f}s")
        return results
    
    def close(self):
        """Clean up resources"""
        self.executor.shutdown(wait=True)

# Example usage with multiple instruments
async def demo_parallel_measurements():
    # Assume we have multiple instruments
    instruments = [
        rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR"),  # DMM
        rm.open_resource("USB0::0x0699::0x0363::C065089::INSTR"),     # Scope
        # Add more instruments as needed
    ]
    
    controller = AsyncInstrumentController(instruments)
    
    # Perform parallel measurements
    commands = ["*IDN?", "MEAS:VOLT:DC?"]
    results = await controller.parallel_measurements(commands)
    
    for i, result in enumerate(results):
        print(f"Instrument {i}: {result}")
    
    controller.close()

# Run async demo
# asyncio.run(demo_parallel_measurements())
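
To see the benefit without hardware, the same `gather` pattern can be demonstrated with simulated instruments, where `asyncio.sleep` stands in for bus I/O. Four 50 ms "queries" complete in roughly 50 ms rather than the 200 ms a serial loop would take:

```python
import asyncio
import time

async def fake_query(delay_s, reply):
    """Simulated instrument query: sleep stands in for bus I/O."""
    await asyncio.sleep(delay_s)
    return reply

async def demo():
    start = time.perf_counter()
    # Four simulated instruments queried concurrently
    replies = await asyncio.gather(
        *(fake_query(0.05, f"inst{i}") for i in range(4))
    )
    elapsed = time.perf_counter() - start
    print(f"4 queries in {elapsed:.3f}s (serial would take ~0.200s)")
    return replies, elapsed

replies, elapsed = asyncio.run(demo())
```

The same structure applies with real instruments; only `fake_query` is replaced by the thread-pool wrapper shown above.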

3. Memory Management and Resource Optimization

Connection Pooling

class InstrumentPool:
    """Connection pool for efficient resource management"""
    
    def __init__(self, resource_strings, pool_size=5):
        self.resource_strings = resource_strings
        self.pool_size = pool_size
        self.connections = {}
        self.rm = pyvisa.ResourceManager()
        
    def get_instrument(self, resource_string):
        """Get instrument connection from pool"""
        
        if resource_string not in self.connections:
            if len(self.connections) >= self.pool_size:
                # Remove least recently used connection
                oldest_key = min(self.connections.keys(), 
                               key=lambda k: self.connections[k]['last_used'])
                self.connections[oldest_key]['instrument'].close()
                del self.connections[oldest_key]
            
            # Create new connection
            instrument = self.rm.open_resource(resource_string)
            self.connections[resource_string] = {
                'instrument': instrument,
                'last_used': time.time()
            }
        
        # Update last used time
        self.connections[resource_string]['last_used'] = time.time()
        return self.connections[resource_string]['instrument']
    
    def close_all(self):
        """Close all pooled connections"""
        for conn_info in self.connections.values():
            conn_info['instrument'].close()
        self.connections.clear()
        self.rm.close()

# Example usage
pool = InstrumentPool([
    "USB0::0x2A8D::0x0101::MY53220001::INSTR",
    "USB0::0x0699::0x0363::C065089::INSTR"
])

# Get instruments from pool (reuses connections)
dmm = pool.get_instrument("USB0::0x2A8D::0x0101::MY53220001::INSTR")
scope = pool.get_instrument("USB0::0x0699::0x0363::C065089::INSTR")

# Use instruments...
# pool.close_all()  # Clean up when done
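
The eviction scan in `get_instrument` is linear in the pool size; `collections.OrderedDict` gives the same least-recently-used behavior with O(1) updates. A generic sketch (`open_func` is a stand-in for `rm.open_resource`; the resource only needs a `close()` method):

```python
from collections import OrderedDict

class LRUConnectionPool:
    """LRU pool: evicts and closes the least recently used resource."""

    def __init__(self, open_func, max_size=5):
        self.open_func = open_func      # e.g. rm.open_resource
        self.max_size = max_size
        self._pool = OrderedDict()

    def get(self, key):
        if key in self._pool:
            self._pool.move_to_end(key)  # mark as most recently used
            return self._pool[key]
        if len(self._pool) >= self.max_size:
            # Pop and close the least recently used entry
            _, oldest = self._pool.popitem(last=False)
            oldest.close()
        resource = self.open_func(key)
        self._pool[key] = resource
        return resource

    def close_all(self):
        for resource in self._pool.values():
            resource.close()
        self._pool.clear()
```

The `OrderedDict` keeps entries in access order, so eviction is a single `popitem(last=False)` instead of a scan over timestamps.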

Memory-Efficient Data Handling

def memory_efficient_data_capture(instrument, total_points=1000000):
    """Capture large datasets without overwhelming memory"""
    
    points_per_chunk = 10000  # Waveform points fetched per CURVE? request
    all_data = []
    
    # Configure instrument
    instrument.write("DATA:ENCDG RIBINARY")
    instrument.chunk_size = 1024 * 1024  # 1MB VISA transfer chunks
    
    for start_point in range(0, total_points, points_per_chunk):
        end_point = min(start_point + points_per_chunk, total_points)
        
        # Set the record range for this chunk (DATA:START/STOP are 1-indexed)
        instrument.write(f"DATA:START {start_point + 1}")
        instrument.write(f"DATA:STOP {end_point}")
        
        # Capture chunk
        instrument.write("CURVE?")
        chunk_data = instrument.read_raw()
        
        # Process chunk immediately to save memory (strip the IEEE block header first)
        n_digits = int(chunk_data[1:2])
        payload_len = int(chunk_data[2:2 + n_digits])
        header = 2 + n_digits
        processed_chunk = np.frombuffer(chunk_data[header:header + payload_len], dtype=np.int16)
        
        # Could save to file instead of keeping in memory
        # np.save(f'chunk_{start_point}.npy', processed_chunk)
        
        all_data.extend(processed_chunk)
        
        print(f"Processed chunk {start_point}-{end_point} ({len(processed_chunk)} points)")
        
        # Optional: garbage collect to free memory
        import gc
        gc.collect()
    
    return np.array(all_data)
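
The best-practices note about generators applies here: if only summary statistics are needed, the chunks can be consumed as a stream so the full record never lives in memory at once. A sketch with synthetic chunks standing in for successive `CURVE?` transfers:

```python
def stream_stats(chunks):
    """One-pass count/mean/min/max over an iterable of numeric chunks."""
    count, total = 0, 0.0
    lo, hi = float('inf'), float('-inf')
    for chunk in chunks:
        for v in chunk:
            count += 1
            total += v
            lo = min(lo, v)
            hi = max(hi, v)
    return {'count': count, 'mean': total / count, 'min': lo, 'max': hi}

# Synthetic data: each inner list stands in for one transferred chunk
stats = stream_stats([[1, 2, 3], [4, 5, 6]])
print(stats)
```

In the capture loop above, yield each `processed_chunk` from a generator instead of appending to `all_data`, and feed that generator to a consumer like this one.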

4. Interface-Specific Optimizations

USB Optimization

def optimize_usb_connection(instrument):
    """Optimize USB connection parameters"""
    
    # Increase buffer sizes for USB
    instrument.chunk_size = 1024 * 1024  # 1MB chunks
    instrument.timeout = 30000  # 30s timeout
    
    # USB-specific optimizations
    try:
        # Some instruments support USB optimization
        instrument.write("SYST:COMM:USB:SPEED HIGH")  # If supported
        
        # Minimize USB packet overhead
        instrument.write("FORM:BORD SWAP")  # Match host byte order
        
    except Exception:
        pass  # Command not supported on this instrument
    
    print("USB connection optimized")

def usb_bulk_transfer_test(instrument):
    """Test USB bulk transfer performance"""
    
    test_sizes = [1024, 10240, 102400, 1024000]  # 1KB to 1MB
    
    for size in test_sizes:
        # Generate test data
        test_data = b'A' * size
        
        # Measure write performance
        start_time = time.time()
        instrument.write_raw(test_data)
        write_time = time.time() - start_time
        
        write_speed = size / write_time / 1024 / 1024  # MB/s
        print(f"USB Write {size//1024}KB: {write_speed:.1f} MB/s")

Ethernet/TCPIP Optimization

def optimize_ethernet_connection(instrument):
    """Optimize Ethernet connection for high-speed data transfer"""
    
    # Large buffer sizes for network transfer
    instrument.chunk_size = 2 * 1024 * 1024  # 2MB chunks
    instrument.timeout = 60000  # 60s timeout for large transfers
    
    try:
        # Network-specific optimizations
        instrument.write("SYST:COMM:TCPIP:CONTROL?")
        control_conn = instrument.read().strip()
        
        # Use raw socket connection if available
        if 'RAW' in control_conn:
            print("Using raw socket connection for better performance")
    
    except Exception:
        pass  # Optimization query not supported on this instrument

def network_performance_test(ip_address, port=5025):
    """Test network performance with different connection types"""
    
    connection_types = [
        f"TCPIP0::{ip_address}::{port}::SOCKET",
        f"TCPIP0::{ip_address}::inst0::INSTR",
    ]
    
    for conn_type in connection_types:
        try:
            start_time = time.time()
            inst = rm.open_resource(conn_type)
            
            # Test identification query
            idn_start = time.time()
            idn = inst.query("*IDN?")
            idn_time = time.time() - idn_start
            
            connection_time = idn_start - start_time
            
            print(f"{conn_type}:")
            print(f"  Connection time: {connection_time:.3f}s")
            print(f"  Query time: {idn_time:.3f}s")
            
            inst.close()
            
        except Exception as e:
            print(f"{conn_type}: Failed - {e}")

5. Performance Monitoring and Benchmarking

Comprehensive Performance Profiler

import psutil
import threading
import time
from collections import defaultdict

class PyVISAProfiler:
    """Performance profiler for PyVISA applications"""
    
    def __init__(self):
        self.measurements = defaultdict(list)
        self.monitoring = False
        self.monitor_thread = None
    
    def start_monitoring(self):
        """Start system resource monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_resources)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """Stop monitoring and return results"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
        return dict(self.measurements)
    
    def _monitor_resources(self):
        """Monitor system resources in background"""
        process = psutil.Process()
        
        while self.monitoring:
            # CPU usage
            cpu_percent = process.cpu_percent()
            
            # Memory usage
            memory_info = process.memory_info()
            memory_mb = memory_info.rss / 1024 / 1024
            
            # Network I/O
            net_io = psutil.net_io_counters()
            
            self.measurements['cpu_percent'].append(cpu_percent)
            self.measurements['memory_mb'].append(memory_mb)
            self.measurements['bytes_sent'].append(net_io.bytes_sent)
            self.measurements['bytes_recv'].append(net_io.bytes_recv)
            self.measurements['timestamp'].append(time.time())
            
            time.sleep(0.1)  # Sample every 100ms
    
    def benchmark_operation(self, operation_func, *args, **kwargs):
        """Benchmark a PyVISA operation"""
        
        self.start_monitoring()
        
        start_time = time.time()
        try:
            result = operation_func(*args, **kwargs)
            success = True
            error = None
        except Exception as e:
            result = None
            success = False
            error = str(e)
        end_time = time.time()
        
        resource_usage = self.stop_monitoring()
        
        # Calculate statistics
        if resource_usage['cpu_percent']:
            avg_cpu = sum(resource_usage['cpu_percent']) / len(resource_usage['cpu_percent'])
            max_memory = max(resource_usage['memory_mb'])
        else:
            avg_cpu = 0
            max_memory = 0
        
        benchmark_result = {
            'execution_time': end_time - start_time,
            'success': success,
            'error': error,
            'avg_cpu_percent': avg_cpu,
            'max_memory_mb': max_memory,
            'result': result
        }
        
        return benchmark_result

# Example usage
profiler = PyVISAProfiler()

def test_operation():
    """Test operation to benchmark"""
    dmm = rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR")
    measurements = []
    for _ in range(100):
        value = float(dmm.query("MEAS:VOLT:DC?"))
        measurements.append(value)
    dmm.close()
    return measurements

# Benchmark the operation
result = profiler.benchmark_operation(test_operation)
print(f"Execution time: {result['execution_time']:.3f}s")
print(f"Average CPU: {result['avg_cpu_percent']:.1f}%")
print(f"Max memory: {result['max_memory_mb']:.1f} MB")

Performance Comparison Framework

def compare_performance_methods():
    """Compare different performance optimization techniques"""
    
    methods = {
        'standard': lambda inst: [float(inst.query("MEAS:VOLT:DC?")) for _ in range(100)],
        'burst': lambda inst: burst_measurements(inst, "VOLT:DC", 100)[0],
        # fast_binary_acquisition is a user-supplied binary-capture routine (not defined here)
        'binary': lambda inst: fast_binary_acquisition(inst, 100),
    }
    
    results = {}
    instrument = rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR")
    
    for method_name, method_func in methods.items():
        print(f"Testing {method_name} method...")
        
        times = []
        for trial in range(5):  # Run 5 trials
            start_time = time.time()
            try:
                data = method_func(instrument)
                end_time = time.time()
                times.append(end_time - start_time)
                print(f"  Trial {trial + 1}: {times[-1]:.3f}s")
            except Exception as e:
                print(f"  Trial {trial + 1}: Failed - {e}")
                times.append(float('inf'))
        
        finite_times = [t for t in times if t != float('inf')]
        avg_time = sum(finite_times) / len(finite_times) if finite_times else float('inf')
        results[method_name] = {
            'avg_time': avg_time,
            'times': times,
            'success_rate': len(finite_times) / len(times)
        }
    
    instrument.close()
    
    # Print comparison
    print("\n=== Performance Comparison ===")
    baseline = results['standard']['avg_time']
    for method, result in results.items():
        speedup = baseline / result['avg_time'] if result['avg_time'] > 0 else 0
        print(f"{method:10}: {result['avg_time']:.3f}s ({speedup:.1f}x) - {result['success_rate']:.0%} success")
    
    return results
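
Averaging a handful of trials is sensitive to outliers: a single one-off timeout skews the mean badly. The stdlib `statistics` module makes median and spread cheap to report alongside it. A sketch that ignores failed (infinite) trials:

```python
import statistics

def summarize_trials(times):
    """Median/min/spread over successful trials, ignoring failed (inf) ones."""
    ok = [t for t in times if t != float('inf')]
    if not ok:
        return {'median': float('inf'), 'min': float('inf'),
                'stdev': 0.0, 'success_rate': 0.0}
    return {
        'median': statistics.median(ok),
        'min': min(ok),
        'stdev': statistics.stdev(ok) if len(ok) > 1 else 0.0,
        'success_rate': len(ok) / len(times),
    }

summary = summarize_trials([0.1, 0.2, 0.3, float('inf')])
print(summary)
```

Reporting the median plus minimum (best case) gives a fairer comparison between methods than the mean alone.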

6. Best Practices Summary

Performance Best Practices

Connection Management:

  • Use connection pooling for repeated connections
  • Keep connections open when performing multiple operations
  • Close connections properly to free resources

Data Transfer:

  • Use binary format for large datasets
  • Increase chunk_size for large transfers
  • Use burst measurements when possible
  • Set appropriate timeout values

Measurement Optimization:

  • Reduce NPLC (integration time) when accuracy permits
  • Disable auto-zero for speed
  • Use fixed ranges instead of auto-ranging
  • Minimize measurement function switching

Memory Management:

  • Process data in chunks for very large datasets
  • Use generators instead of lists when possible
  • Explicitly close instruments and resource managers
  • Consider garbage collection for long-running applications

Performance Pitfalls

  • Opening/closing connections repeatedly
  • Using default small chunk sizes for large transfers
  • Ignoring timeout settings
  • Not handling errors properly (causes retries)
  • Using ASCII format for large numeric datasets
  • Leaving auto-zero enabled when not needed

Performance Targets

  • Simple measurement: < 10ms
  • Complex measurement: < 100ms
  • 1000 point dataset: < 1s
  • 1MB waveform: < 2s
  • Connection establishment: < 500ms
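
These targets can be wired into a simple regression check so that performance drift is caught early; the dictionary below just restates the list above (the key names are illustrative, not a standard):

```python
# Targets in seconds, taken from the list above
TARGETS = {
    'simple_measurement': 0.010,
    'complex_measurement': 0.100,
    'dataset_1000_points': 1.0,
    'waveform_1mb': 2.0,
    'connection_setup': 0.5,
}

def check_target(name, elapsed_s):
    """Return (ok, message) comparing a measured time against its target."""
    target = TARGETS[name]
    ok = elapsed_s <= target
    status = "OK" if ok else "SLOW"
    return ok, (f"{name}: {elapsed_s * 1000:.1f} ms "
                f"(target {target * 1000:.0f} ms) {status}")
```

Run the check after each benchmark pass and log or alert on any `SLOW` result.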

Next Steps

  • Implement async patterns: For multi-instrument setups
  • Profile your application: Use the provided profiling tools
  • Test with your hardware: Performance varies by instrument
  • Monitor in production: Watch for performance degradation over time

