Optimization Guide
PyVISA performance optimization - fast data transfers, memory management, connection pooling, async operations, and benchmarking for high-speed instrument control.
This guide covers tuning PyVISA for high-speed applications, from millisecond response times to gigabyte data transfers.
Performance Quick Reference
Operation | Typical Time | Optimized Time | Key Technique |
---|---|---|---|
Single measurement | 10-50ms | 2-5ms | Reduce NPLC, disable autozero |
1000 measurements | 10-50s | 0.5-2s | Burst mode, binary transfer |
Large waveform (1MB) | 5-30s | 0.5-2s | Increase chunk_size, binary format |
Connection setup | 1-5s | 0.1-0.5s | Connection pooling |
1. Fast Data Transfer Techniques
Binary vs ASCII Transfer
ASCII transfer (slow):
import pyvisa
import time
rm = pyvisa.ResourceManager()
scope = rm.open_resource("USB0::0x0699::0x0363::C065089::INSTR")

# ASCII transfer - human readable but slow.
# Select ASCII encoding explicitly: the scope may still be configured for a
# binary encoding from an earlier session, and read() cannot decode that.
scope.write("DATA:ENCDG ASCII")

# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals; time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
scope.write("CURVE?")
ascii_data = scope.read()  # Returns string like "1.23,4.56,7.89,..."
ascii_time = time.perf_counter() - start_time
print(f"ASCII transfer: {ascii_time:.3f}s for {len(ascii_data)} chars")
Binary transfer (fast):
# Binary transfer - much faster for large datasets
import numpy as np

# Configure the encoding before starting the clock so that only the transfer
# itself is timed, as in the ASCII example.
scope.write("DATA:ENCDG RIBINARY")  # Signed integers, most significant byte first
scope.write("DATA:WIDTH 2")  # Two bytes per sample, matching the '>i2' dtype below

start_time = time.perf_counter()
scope.write("CURVE?")
binary_data = scope.read_raw()  # Returns bytes
binary_time = time.perf_counter() - start_time
print(f"Binary transfer: {binary_time:.3f}s for {len(binary_data)} bytes")
print(f"Speed improvement: {ascii_time / binary_time:.1f}x faster")

# Convert binary to numpy array.
# The reply is an IEEE 488.2 definite-length block: b"#", one digit N, then N
# digits giving the payload length in bytes, then the payload, then a
# terminator. The header is therefore 2 + N bytes long, not a fixed 2 bytes.
if binary_data[:1] != b"#":
    raise ValueError("CURVE? response does not start with an IEEE 488.2 block header")
header_digits = int(binary_data[1:2])
payload_start = 2 + header_digits
payload_length = int(binary_data[2:payload_start])

# 16-bit signed integers, big-endian. `count` stops before the trailing
# terminator so it is not mistaken for sample data.
waveform = np.frombuffer(
    binary_data,
    dtype='>i2',
    count=payload_length // 2,
    offset=payload_start,
)
Optimized Binary Data Acquisition
class FastWaveformCapture:
    """Optimized waveform capture class.

    Wraps an already-open message-based instrument session and configures it
    for 8-bit binary curve transfers.
    """

    def __init__(self, instrument):
        """Store the instrument session and configure it immediately."""
        self.inst = instrument
        self.setup_fast_acquisition()

    def setup_fast_acquisition(self):
        """Configure instrument for maximum speed."""
        # Increase buffer sizes for large transfers
        self.inst.chunk_size = 1024 * 1024  # 1MB chunks
        self.inst.timeout = 30000  # 30s timeout for large transfers

        # Configure instrument for binary transfer
        self.inst.write("DATA:ENCDG RIBINARY")  # Binary encoding
        self.inst.write("DATA:WIDTH 1")  # 1 byte per sample (if sufficient resolution)
        self.inst.write("HEADER OFF")  # Disable headers for speed

        # Optimize acquisition settings
        self.inst.write("ACQ:STOPAFTER SEQUENCE")  # Single sequence mode
        self.inst.write("ACQ:STATE OFF")  # Stop acquisition

    @staticmethod
    def _parse_ieee_block(raw):
        """Return the payload bytes of an IEEE 488.2 block response.

        A definite-length block is ``#<N><N length digits><payload>``, usually
        followed by a terminator; ``#0`` introduces an indefinite-length block
        that runs to the terminator.

        Raises:
            ValueError: if the response has no block header or is shorter
                than the length its header announces.
        """
        if raw[:1] != b"#":
            raise ValueError("response does not start with an IEEE 488.2 block header")
        digit_count = int(raw[1:2])
        if digit_count == 0:
            # Indefinite length: everything up to a single trailing newline.
            return raw[2:-1] if raw.endswith(b"\n") else raw[2:]
        payload_start = 2 + digit_count
        payload_length = int(raw[2:payload_start])
        payload = raw[payload_start:payload_start + payload_length]
        if len(payload) != payload_length:
            raise ValueError(
                f"truncated block: expected {payload_length} bytes, got {len(payload)}"
            )
        return payload

    def capture_waveform_fast(self, channel=1):
        """Capture waveform with maximum speed.

        Args:
            channel: Channel number used to build the ``CH<n>`` data source.

        Returns:
            Tuple ``(waveform_data, transfer_time)``: an ``int8`` numpy array
            of samples and the transfer duration in seconds.
        """
        # Configure channel
        self.inst.write(f"DATA:SOURCE CH{channel}")

        # Start acquisition and block until the instrument answers *OPC?
        self.inst.write("ACQ:STATE ON")
        self.inst.query("*OPC?")

        # Fast binary transfer, timed with a monotonic clock
        start_time = time.perf_counter()
        self.inst.write("CURVE?")
        raw_data = self.inst.read_raw()
        transfer_time = time.perf_counter() - start_time

        # Strip the variable-length block header and the terminator before
        # interpreting the bytes as samples.
        waveform_data = np.frombuffer(self._parse_ieee_block(raw_data), dtype=np.int8)

        # Guard the rate against a zero-length interval on coarse clocks.
        if transfer_time > 0:
            rate_mb_s = len(raw_data) / transfer_time / 1024 / 1024
        else:
            rate_mb_s = float("inf")
        print(f"Captured {len(waveform_data)} points in {transfer_time:.3f}s")
        print(f"Transfer rate: {rate_mb_s:.1f} MB/s")
        return waveform_data, transfer_time
# Example usage: open the scope, let the helper configure it, then grab one
# waveform together with the time the transfer took.
SCOPE_RESOURCE = "USB0::0x0699::0x0363::C065089::INSTR"
scope = rm.open_resource(SCOPE_RESOURCE)
fast_capture = FastWaveformCapture(scope)
capture_result = fast_capture.capture_waveform_fast()
waveform, time_taken = capture_result
2. High-Speed Measurement Techniques
Burst Measurements
def burst_measurements(instrument, measurement_type="VOLT:DC", count=1000):
    """Perform burst measurements for maximum speed.

    Args:
        instrument: Open message-based session providing ``write`` and ``read``.
        measurement_type: SCPI function path, e.g. ``"VOLT:DC"``.
        count: Number of samples requested via ``SAMP:COUN``.

    Returns:
        Tuple ``(values, measurement_rate)``: the parsed readings as floats
        and the achieved rate in readings per second.
    """
    # Configure for speed over accuracy
    instrument.write(f"CONF:{measurement_type}")
    if "VOLT" in measurement_type:
        instrument.write(f"{measurement_type}:NPLC 0.02")  # Minimum integration time
        instrument.write(f"{measurement_type}:ZERO:AUTO OFF")  # Disable autozero
        instrument.write(f"{measurement_type}:RANGE:AUTO OFF")  # Fixed range

    # Configure for burst
    instrument.write(f"SAMP:COUN {count}")
    instrument.write("TRIG:SOUR IMM")  # Immediate trigger

    # Perform burst measurement and read all data at once; a monotonic clock
    # keeps the interval valid even if the system clock is adjusted.
    started = time.perf_counter()
    instrument.write("READ?")
    response = instrument.read()
    elapsed = time.perf_counter() - started

    # Skip blank tokens so a trailing comma or an empty reply does not raise.
    values = [float(token) for token in response.split(',') if token.strip()]

    # A zero-length interval (coarse clock, tiny burst) must not divide by zero.
    measurement_rate = len(values) / elapsed if elapsed > 0 else float("inf")
    print(f"Burst measurement: {measurement_rate:.0f} measurements/second")
    return values, measurement_rate
# Example usage: take a 500-reading DC-voltage burst from the DMM.
dmm = rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR")
data, rate = burst_measurements(dmm, measurement_type="VOLT:DC", count=500)
Asynchronous Operations
import asyncio
import concurrent.futures
class AsyncInstrumentController:
    """Asynchronous instrument control for parallel operations.

    Blocking write/read calls are pushed onto a thread pool so that several
    instruments can be queried concurrently from asyncio code.
    """

    def __init__(self, instruments):
        """Keep the instrument list and create one worker thread per instrument."""
        self.instruments = instruments
        # ThreadPoolExecutor rejects max_workers=0, so an empty instrument
        # list still gets a single (idle) worker.
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, len(instruments))
        )

    def sync_measurement(self, instrument, command):
        """Synchronous measurement function.

        Returns:
            The reply as ``float`` when it is numeric, the stripped reply text
            when it is not (e.g. ``*IDN?``), or an ``"Error: ..."`` string
            when the write/read itself fails.
        """
        try:
            instrument.write(command)
            response = instrument.read()
        except Exception as e:
            # Best effort: report the failure as a value so that one bad
            # instrument does not abort the whole gather().
            return f"Error: {e}"
        try:
            return float(response)
        except (TypeError, ValueError):
            # Non-numeric replies are valid answers, not errors.
            return response.strip() if isinstance(response, str) else response

    async def async_measurement(self, instrument, command):
        """Asynchronous measurement wrapper."""
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self.sync_measurement,
            instrument,
            command,
        )

    async def parallel_measurements(self, commands):
        """Perform measurements on multiple instruments in parallel.

        Commands are paired with instruments by position; commands without a
        matching instrument are ignored.

        Returns:
            List of results in instrument order.
        """
        tasks = [
            self.async_measurement(instrument, command)
            for instrument, command in zip(self.instruments, commands)
        ]

        # Wait for all measurements to complete
        start_time = time.perf_counter()
        results = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start_time
        print(f"Parallel measurements completed in {elapsed:.3f}s")
        return results

    def close(self):
        """Clean up resources."""
        self.executor.shutdown(wait=True)
# Example usage with multiple instruments
async def demo_parallel_measurements():
    """Open two instruments, query them in parallel, and always clean up."""
    # Assume we have multiple instruments
    resource_names = [
        "USB0::0x2A8D::0x0101::MY53220001::INSTR",  # DMM
        "USB0::0x0699::0x0363::C065089::INSTR",  # Scope
        # Add more instruments as needed
    ]
    instruments = []
    controller = None
    try:
        # Open inside the try block so that a failure on a later resource
        # still closes the sessions that were already opened.
        for resource_name in resource_names:
            instruments.append(rm.open_resource(resource_name))
        controller = AsyncInstrumentController(instruments)

        # Perform parallel measurements
        commands = ["*IDN?", "MEAS:VOLT:DC?"]
        results = await controller.parallel_measurements(commands)
        for i, result in enumerate(results):
            print(f"Instrument {i}: {result}")
    finally:
        # Release the worker threads first, then the instrument sessions.
        if controller is not None:
            controller.close()
        for instrument in instruments:
            instrument.close()

# Run async demo
# asyncio.run(demo_parallel_measurements())
3. Memory Management and Resource Optimization
Connection Pooling
class InstrumentPool:
    """Connection pool for efficient resource management.

    Keeps up to ``pool_size`` instrument sessions open and evicts the least
    recently used one when a new resource is requested while the pool is full.
    """

    def __init__(self, resource_strings, pool_size=5, resource_manager=None):
        """Create an empty pool.

        Args:
            resource_strings: Resource names the caller intends to use
                (stored for reference only).
            pool_size: Maximum number of simultaneously open sessions.
            resource_manager: Optional existing resource manager to open
                sessions with. When omitted, the pool creates its own and
                closes it in ``close_all``.

        Raises:
            ValueError: if ``pool_size`` is smaller than 1.
        """
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        self.resource_strings = resource_strings
        self.pool_size = pool_size
        # Insertion order doubles as recency order: the first key is always
        # the least recently used connection, independent of clock resolution.
        self.connections = collections.OrderedDict()
        self._owns_rm = resource_manager is None
        self.rm = pyvisa.ResourceManager() if self._owns_rm else resource_manager

    def get_instrument(self, resource_string):
        """Get instrument connection from pool, opening it if necessary."""
        if resource_string not in self.connections:
            if len(self.connections) >= self.pool_size:
                # Remove least recently used connection
                _, evicted = self.connections.popitem(last=False)
                evicted['instrument'].close()
            # Create new connection
            self.connections[resource_string] = {
                'instrument': self.rm.open_resource(resource_string),
                'last_used': time.time(),
            }
        entry = self.connections[resource_string]
        # Update last used time and mark the entry as most recently used
        entry['last_used'] = time.time()
        self.connections.move_to_end(resource_string)
        return entry['instrument']

    def close_all(self):
        """Close all pooled connections.

        Every session is attempted even if an earlier close fails; the first
        failure is re-raised once cleanup has finished.
        """
        first_error = None
        while self.connections:
            _, entry = self.connections.popitem()
            try:
                entry['instrument'].close()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        # Only close a resource manager this pool created itself.
        if self._owns_rm:
            try:
                self.rm.close()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
# Example usage: name each resource once and reuse the name for every lookup.
DMM_RESOURCE = "USB0::0x2A8D::0x0101::MY53220001::INSTR"
SCOPE_RESOURCE = "USB0::0x0699::0x0363::C065089::INSTR"
pool = InstrumentPool([DMM_RESOURCE, SCOPE_RESOURCE])

# Get instruments from pool (reuses connections)
dmm = pool.get_instrument(DMM_RESOURCE)
scope = pool.get_instrument(SCOPE_RESOURCE)

# Use instruments...
# pool.close_all() # Clean up when done
Memory-Efficient Data Handling
def _ieee_block_payload(raw):
    """Return the payload of an IEEE 488.2 block (``#<N><length digits><data>``)."""
    if raw[:1] != b"#":
        raise ValueError("response does not start with an IEEE 488.2 block header")
    digit_count = int(raw[1:2])
    if digit_count == 0:
        # Indefinite-length block: data runs up to a single trailing newline.
        return raw[2:-1] if raw.endswith(b"\n") else raw[2:]
    payload_start = 2 + digit_count
    payload_length = int(raw[2:payload_start])
    return raw[payload_start:payload_start + payload_length]


def memory_efficient_data_capture(instrument, total_points=1000000, points_per_chunk=10000):
    """Capture large datasets without overwhelming memory.

    The record is fetched in windows of ``points_per_chunk`` samples. Each
    window is decoded straight into a numpy array, so no per-sample Python
    objects are created.

    Args:
        instrument: Open session providing ``write`` and ``read_raw``.
        total_points: Number of samples to fetch.
        points_per_chunk: Samples requested per ``CURVE?`` query.

    Returns:
        One-dimensional ``int16`` numpy array with all samples in order.

    Raises:
        ValueError: if ``points_per_chunk`` is smaller than 1 or a response
            lacks a block header.
    """
    if points_per_chunk < 1:
        raise ValueError("points_per_chunk must be at least 1")

    # Configure instrument: big-endian signed integers, two bytes per sample,
    # so that the '>i2' decoding below matches what is actually sent.
    instrument.write("DATA:ENCDG RIBINARY")
    instrument.write("DATA:WIDTH 2")
    instrument.chunk_size = 1024 * 1024  # 1MB transfer chunks

    chunks = []
    for offset in range(0, total_points, points_per_chunk):
        # DATA:START / DATA:STOP are treated as 1-based and inclusive (as on
        # Tektronix scopes); starting each window at offset + 1 prevents
        # neighbouring windows from sharing a sample.
        first_point = offset + 1
        last_point = min(offset + points_per_chunk, total_points)
        instrument.write(f"DATA:START {first_point}")
        instrument.write(f"DATA:STOP {last_point}")

        # Capture chunk
        instrument.write("CURVE?")
        chunk_data = instrument.read_raw()

        # Strip the variable-length block header and terminator, then decode.
        processed_chunk = np.frombuffer(_ieee_block_payload(chunk_data), dtype='>i2')

        # Could save to file instead of keeping in memory
        # np.save(f'chunk_{first_point}.npy', processed_chunk)
        chunks.append(processed_chunk)
        print(f"Processed chunk {first_point}-{last_point} ({len(processed_chunk)} points)")

    if not chunks:
        return np.array([], dtype=np.int16)
    # Single allocation for the result, converted to native byte order.
    return np.concatenate(chunks).astype(np.int16)
4. Interface-Specific Optimizations
USB Optimization
def optimize_usb_connection(instrument):
    """Optimize USB connection parameters.

    Sets a 1 MB chunk size and a 30 s timeout, then sends two optional tuning
    commands on a best-effort basis.

    Args:
        instrument: Open session providing ``write`` and accepting the
            ``chunk_size`` / ``timeout`` attributes.
    """
    # Increase buffer sizes for USB
    instrument.chunk_size = 1024 * 1024  # 1MB chunks
    instrument.timeout = 30000  # 30s timeout

    # USB-specific optimizations
    try:
        # Some instruments support USB optimization
        instrument.write("SYST:COMM:USB:SPEED HIGH")  # If supported
        # Byte-order selection for binary blocks (SWAP = least significant
        # byte first), so the host need not byte-swap the data.
        instrument.write("FORM:BORD SWAP")
    except Exception as exc:
        # Still best effort, but no longer silent, and KeyboardInterrupt /
        # SystemExit are no longer swallowed as they were by a bare except.
        print(f"Optional USB tuning skipped: {exc}")
    print("USB connection optimized")
def usb_bulk_transfer_test(instrument, test_sizes=None):
    """Test USB bulk transfer performance.

    Writes a block of filler bytes of each size with ``write_raw`` and reports
    the achieved throughput.

    Args:
        instrument: Open session providing ``write_raw``.
        test_sizes: Iterable of payload sizes in bytes. Defaults to
            1024, 10240, 102400 and 1024000 bytes.

    Returns:
        Dict mapping each payload size to the measured write speed in MB/s.
    """
    if test_sizes is None:
        test_sizes = [1024, 10240, 102400, 1024000]  # 1KB to 1MB

    speeds = {}
    for size in test_sizes:
        # Generate test data
        test_data = b'A' * size

        # Measure write performance with a monotonic clock
        started = time.perf_counter()
        instrument.write_raw(test_data)
        write_time = time.perf_counter() - started

        # A write that completes within clock resolution must not divide by zero.
        if write_time > 0:
            write_speed = size / write_time / 1024 / 1024  # MB/s
        else:
            write_speed = float("inf")
        speeds[size] = write_speed
        print(f"USB Write {size//1024}KB: {write_speed:.1f} MB/s")
    return speeds
Ethernet/TCPIP Optimization
def optimize_ethernet_connection(instrument):
    """Optimize Ethernet connection for high-speed data transfer.

    Sets a 2 MB chunk size and a 60 s timeout, then asks the instrument for
    its TCP/IP control connection on a best-effort basis.

    Args:
        instrument: Open session providing ``write`` and ``read`` and
            accepting the ``chunk_size`` / ``timeout`` attributes.
    """
    # Large buffer sizes for network transfer
    instrument.chunk_size = 2 * 1024 * 1024  # 2MB chunks
    instrument.timeout = 60000  # 60s timeout for large transfers

    try:
        # Network-specific optimizations
        instrument.write("SYST:COMM:TCPIP:CONTROL?")
        control_conn = instrument.read().strip()
    except Exception as exc:
        # The query is optional; report the failure instead of hiding it, and
        # let KeyboardInterrupt / SystemExit propagate.
        print(f"TCP/IP control connection query skipped: {exc}")
        return

    # Use raw socket connection if available
    if 'RAW' in control_conn:
        print("Using raw socket connection for better performance")
def network_performance_test(ip_address, port=5025, resource_manager=None):
    """Test network performance with different connection types.

    Opens the instrument once as a raw SOCKET resource and once as an INSTR
    resource, timing the connection and an ``*IDN?`` query for each.

    Args:
        ip_address: Instrument address used to build the resource names.
        port: TCP port for the SOCKET resource.
        resource_manager: Resource manager used to open the sessions.
            Defaults to the module-level ``rm``.

    Returns:
        Dict mapping each resource name to ``(connection_time, query_time)``
        in seconds, or to ``None`` when that connection type failed.
    """
    manager = rm if resource_manager is None else resource_manager
    connection_types = [
        f"TCPIP0::{ip_address}::{port}::SOCKET",
        f"TCPIP0::{ip_address}::inst0::INSTR",
    ]
    timings = {}
    for conn_type in connection_types:
        inst = None
        try:
            start_time = time.perf_counter()
            inst = manager.open_resource(conn_type)
            if conn_type.endswith("::SOCKET"):
                # Raw sockets have no message framing; without a read
                # termination the query below would block until timeout.
                inst.read_termination = "\n"

            # Test identification query
            idn_start = time.perf_counter()
            inst.query("*IDN?")
            idn_time = time.perf_counter() - idn_start
            connection_time = idn_start - start_time

            print(f"{conn_type}:")
            print(f" Connection time: {connection_time:.3f}s")
            print(f" Query time: {idn_time:.3f}s")
            timings[conn_type] = (connection_time, idn_time)
        except Exception as e:
            print(f"{conn_type}: Failed - {e}")
            timings[conn_type] = None
        finally:
            # Close the session even when the query failed, so a timed-out
            # connection does not stay open.
            if inst is not None:
                try:
                    inst.close()
                except Exception as close_error:
                    print(f"{conn_type}: close failed - {close_error}")
    return timings
5. Performance Monitoring and Benchmarking
Comprehensive Performance Profiler
import psutil
import threading
import time
from collections import defaultdict
class PyVISAProfiler:
    """Performance profiler for PyVISA applications.

    Samples CPU, memory and network counters on a background thread while a
    callable runs, and reports timing plus aggregate resource figures.
    """

    def __init__(self):
        self.measurements = defaultdict(list)
        self.monitoring = False
        self.monitor_thread = None
        # Lets stop_monitoring() wake the sampler immediately instead of
        # waiting for a sleep to finish.
        self._stop_event = threading.Event()

    def start_monitoring(self):
        """Start system resource monitoring with a fresh set of samples."""
        if self.monitoring:
            return  # Already running; do not spawn a second sampler thread.
        # Drop samples from earlier runs so statistics describe this run only.
        self.measurements = defaultdict(list)
        self._stop_event.clear()
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_resources, daemon=True
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop monitoring and return the collected samples as a plain dict."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()
            self.monitor_thread = None
        return dict(self.measurements)

    def _monitor_resources(self):
        """Monitor system resources in background."""
        process = psutil.Process()
        # Per the psutil documentation, the first cpu_percent() call has no
        # reference interval and returns a meaningless 0.0, so discard it.
        process.cpu_percent()
        while True:
            # Sample every 100ms, or right away once a stop is requested, so
            # even very short operations yield one final sample.
            stopping = self._stop_event.wait(0.1)

            cpu_percent = process.cpu_percent()
            memory_mb = process.memory_info().rss / 1024 / 1024
            net_io = psutil.net_io_counters()

            self.measurements['cpu_percent'].append(cpu_percent)
            self.measurements['memory_mb'].append(memory_mb)
            self.measurements['bytes_sent'].append(net_io.bytes_sent)
            self.measurements['bytes_recv'].append(net_io.bytes_recv)
            self.measurements['timestamp'].append(time.time())

            if stopping:
                break

    def benchmark_operation(self, operation_func, *args, **kwargs):
        """Benchmark a PyVISA operation.

        Args:
            operation_func: Callable to run.
            *args, **kwargs: Passed through to ``operation_func``.

        Returns:
            Dict with ``execution_time`` (seconds), ``success``, ``error``
            (message string or ``None``), ``avg_cpu_percent``,
            ``max_memory_mb`` and ``result``.
        """
        self.start_monitoring()
        start_time = time.perf_counter()
        result = None
        success = False
        error = None
        try:
            result = operation_func(*args, **kwargs)
            success = True
        except Exception as e:
            error = str(e)
        finally:
            # Runs for BaseException too (e.g. Ctrl-C), so the sampler thread
            # is always stopped.
            execution_time = time.perf_counter() - start_time
            resource_usage = self.stop_monitoring()

        # Calculate statistics. The keys are absent when the sampler recorded
        # nothing, so look them up defensively.
        cpu_samples = resource_usage.get('cpu_percent', [])
        memory_samples = resource_usage.get('memory_mb', [])
        avg_cpu = sum(cpu_samples) / len(cpu_samples) if cpu_samples else 0
        max_memory = max(memory_samples) if memory_samples else 0

        return {
            'execution_time': execution_time,
            'success': success,
            'error': error,
            'avg_cpu_percent': avg_cpu,
            'max_memory_mb': max_memory,
            'result': result,
        }
# Example usage
profiler = PyVISAProfiler()


def test_operation():
    """Test operation to benchmark: 100 single DC-voltage readings."""
    dmm = rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR")
    try:
        measurements = []
        for _ in range(100):
            dmm.write("MEAS:VOLT:DC?")
            measurements.append(float(dmm.read()))
        return measurements
    finally:
        # Close the session even when a read or float conversion fails;
        # benchmark_operation() catches the exception, so a leaked session
        # would otherwise go unnoticed.
        dmm.close()


# Benchmark the operation
result = profiler.benchmark_operation(test_operation)
print(f"Execution time: {result['execution_time']:.3f}s")
print(f"Average CPU: {result['avg_cpu_percent']:.1f}%")
print(f"Max memory: {result['max_memory_mb']:.1f} MB")
Performance Comparison Framework
def compare_performance_methods(instrument=None, trials=5):
    """Compare different performance optimization techniques.

    Args:
        instrument: Already-open session to benchmark. When omitted, the
            default DMM resource is opened via the module-level ``rm`` and
            closed again before returning.
        trials: Number of timed runs per method.

    Returns:
        Dict mapping method name to ``{'avg_time', 'times', 'success_rate'}``.
        Failed trials are recorded as ``float('inf')`` in ``times``;
        ``avg_time`` is ``float('inf')`` when no trial succeeded.
    """
    methods = {
        'standard': lambda inst: [float(inst.query("MEAS:VOLT:DC?")) for _ in range(100)],
        'burst': lambda inst: burst_measurements(inst, "VOLT:DC", 100)[0],
        # NOTE(review): fast_binary_acquisition is not defined in this guide;
        # unless it is provided elsewhere, every 'binary' trial is reported
        # as failed.
        'binary': lambda inst: fast_binary_acquisition(inst, 100),
    }
    failed = float('inf')
    results = {}

    owns_instrument = instrument is None
    if owns_instrument:
        instrument = rm.open_resource("USB0::0x2A8D::0x0101::MY53220001::INSTR")
    try:
        for method_name, method_func in methods.items():
            print(f"Testing {method_name} method...")
            times = []
            for trial in range(trials):
                started = time.perf_counter()
                try:
                    method_func(instrument)
                except Exception as e:
                    print(f" Trial {trial + 1}: Failed - {e}")
                    times.append(failed)
                else:
                    times.append(time.perf_counter() - started)
                    print(f" Trial {trial + 1}: {times[-1]:.3f}s")

            successful = [t for t in times if t != failed]
            # With no successful trial there is nothing to average; report
            # infinity instead of dividing by zero.
            avg_time = sum(successful) / len(successful) if successful else failed
            results[method_name] = {
                'avg_time': avg_time,
                'times': times,
                'success_rate': len(successful) / len(times) if times else 0.0,
            }
    finally:
        # Only close a session this function opened itself.
        if owns_instrument:
            instrument.close()

    # Print comparison
    print("\n=== Performance Comparison ===")
    baseline = results['standard']['avg_time']
    for method, result in results.items():
        avg_time = result['avg_time']
        # A speedup is only meaningful when both timings are real numbers.
        if baseline != failed and 0 < avg_time < failed:
            speedup = baseline / avg_time
        else:
            speedup = 0
        print(f"{method:10}: {avg_time:.3f}s ({speedup:.1f}x) - {result['success_rate']:.0%} success")
    return results
6. Best Practices Summary
Performance Best Practices
Connection Management:
- Use connection pooling for repeated connections
- Keep connections open when performing multiple operations
- Close connections properly to free resources
Data Transfer:
- Use binary format for large datasets
- Increase `chunk_size` for large transfers
- Use burst measurements when possible
- Set appropriate timeout values
Measurement Optimization:
- Reduce NPLC (integration time) when accuracy permits
- Disable auto-zero for speed
- Use fixed ranges instead of auto-ranging
- Minimize measurement function switching
Memory Management:
- Process data in chunks for very large datasets
- Use generators instead of lists when possible
- Explicitly close instruments and resource managers
- Consider garbage collection for long-running applications
Performance Pitfalls
- Opening/closing connections repeatedly
- Using default small chunk sizes for large transfers
- Ignoring timeout settings
- Not handling errors properly (causes retries)
- Using ASCII format for large numeric datasets
- Leaving auto-zero enabled when not needed
Performance Targets
- Simple measurement: < 10ms
- Complex measurement: < 100ms
- 1000 point dataset: < 1s
- 1MB waveform: < 2s
- Connection establishment: < 500ms
Next Steps
- Implement async patterns: For multi-instrument setups
- Profile your application: Use the provided profiling tools
- Test with your hardware: Performance varies by instrument
- Monitor in production: Watch for performance degradation over time
For more advanced topics, see:
How is this guide?