Large Data Transfers
Optimize large data transfers from oscilloscopes, spectrum analyzers, data loggers, and other high-throughput instruments using PyVISA binary protocols and chunking techniques.
Overview
Modern instruments can generate massive datasets:
- Oscilloscopes: 10M+ sample waveforms (40+ MB)
- Spectrum analyzers: High-resolution FFTs (10+ MB)
- Data loggers: Hours of high-speed measurements (100+ MB)
Standard ASCII-based transfers with default settings can be 10-100x slower than optimized binary transfers.
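Before optimizing anything, it helps to estimate how large a capture actually is and how long it should take at a given link rate. The sketch below shows the arithmetic; the sample count, sample width, and link speed are illustrative assumptions, not instrument specifications.

def estimate_transfer(num_samples, bytes_per_sample, link_rate_mb_s):
    """Rough payload size and ideal transfer time for a capture."""
    payload_mb = num_samples * bytes_per_sample / 1e6
    ideal_seconds = payload_mb / link_rate_mb_s
    return payload_mb, ideal_seconds

# Example: 10M samples, 16-bit, over an assumed 100 MB/s link
size_mb, seconds = estimate_transfer(10000000, 2, 100)
print(f"{size_mb:.0f} MB payload, ~{seconds:.2f} s at the ideal link rate")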
Binary vs ASCII Transfer Comparison
import pyvisa
import time
import numpy as np
def compare_transfer_methods(instrument):
"""Compare ASCII vs binary transfer performance"""
# ASCII transfer (slow)
start = time.time()
instrument.write('FORM:DATA ASC')
ascii_data = instrument.query('CURV?')
ascii_values = [float(x) for x in ascii_data.split(',')]
ascii_time = time.time() - start
# Binary transfer (fast)
start = time.time()
instrument.write('FORM:DATA REAL,32') # 32-bit floats
binary_values = instrument.query_binary_values('CURV?', datatype='f')
binary_time = time.time() - start
print(f"ASCII transfer: {ascii_time:.2f}s ({len(ascii_values)} points)")
print(f"Binary transfer: {binary_time:.2f}s ({len(binary_values)} points)")
print(f"Speed improvement: {ascii_time/binary_time:.1f}x faster")
return binary_values
# Example results for 1M sample waveform:
# ASCII transfer: 12.3s (1000000 points)
# Binary transfer: 0.8s (1000000 points)
# Speed improvement: 15.4x faster
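PyVISA also ships a built-in parser for comma-separated ASCII responses, query_ascii_values(), which is tidier than splitting the string by hand but does not remove the underlying ASCII overhead, so binary formats remain the right choice for large records. A minimal sketch, reusing the instrument-specific CURV? command from the example above; set is_big_endian to match your instrument's byte order:

# ASCII, parsed by PyVISA (convenient, but still slow for large records)
ascii_values = instrument.query_ascii_values('CURV?', converter='f')

# Binary, with the byte order stated explicitly (instrument-dependent assumption)
binary_values = instrument.query_binary_values('CURV?', datatype='f', is_big_endian=False)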
Optimized Oscilloscope Data Acquisition
High-Speed Waveform Capture
class OptimizedScope:
def __init__(self, resource_string):
self.rm = pyvisa.ResourceManager()
self.scope = self.rm.open_resource(resource_string)
self.scope.timeout = 30000 # 30 second timeout
# Start from a known instrument state before configuring for speed
self.scope.write('*RST')
self.scope.write('*CLS')
def setup_for_speed(self):
"""Configure scope for fastest data transfer"""
# Use fastest data format
self.scope.write('DATA:ENC RIBINARY') # Big-endian binary
self.scope.write('DATA:WIDTH 2') # 16-bit samples
# Minimize data preprocessing
self.scope.write('DATA:STOP 10000000') # All available points
self.scope.write('DATA:START 1')
# Disable averaging/filtering for speed
self.scope.write('ACQ:MODE SAMPLE') # Sample mode (fastest)
# Set optimal record length
self.scope.write('HOR:RECORDLENGTH 10000000') # 10M samples
def capture_waveform_optimized(self, channel=1):
"""Capture waveform with maximum speed"""
# Single acquisition for consistency
self.scope.write('ACQ:STOPAFTER SEQUENCE') # Stop after one acquisition sequence
self.scope.write('ACQ:STATE ON')
# Wait for acquisition complete
self.scope.query('*OPC?')
# Get waveform preamble for scaling
self.scope.write(f'DATA:SOURCE CH{channel}')
# NOTE: WFMPRE field order varies by scope model; verify these indices for your instrument
preamble = self.scope.query('WFMPRE?').split(',')
y_scale = float(preamble[13]) # Vertical scale factor
y_offset = float(preamble[14]) # Vertical offset
x_scale = float(preamble[9]) # Horizontal sample interval
# Binary waveform transfer
start_time = time.time()
raw_data = self.scope.query_binary_values('CURVE?', datatype='h', is_big_endian=True) # Matches RIBINARY encoding
transfer_time = time.time() - start_time
# Convert to voltage values
voltages = np.array(raw_data) * y_scale + y_offset
time_base = np.arange(len(voltages)) * x_scale
print(f"Transferred {len(voltages)} points in {transfer_time:.2f}s")
print(f"Transfer rate: {len(voltages)/transfer_time/1e6:.1f} MSa/s")
return time_base, voltages
# Usage
scope = OptimizedScope('USB::0x0699::0x0363::C102912::INSTR')
scope.setup_for_speed()
time_data, voltage_data = scope.capture_waveform_optimized(1)
Chunked Data Transfer for Very Large Datasets
def chunked_waveform_transfer(instrument, total_points, chunk_size=100000):
"""Transfer large waveform in chunks to avoid memory issues"""
all_data = []
chunks_transferred = 0
for start_point in range(1, total_points, chunk_size):
end_point = min(start_point + chunk_size - 1, total_points)
# Set data range for this chunk
instrument.write(f'DATA:START {start_point}')
instrument.write(f'DATA:STOP {end_point}')
# Transfer chunk
chunk_data = instrument.query_binary_values('CURVE?', datatype='h')
all_data.extend(chunk_data)
chunks_transferred += 1
points_so_far = len(all_data)
print(f"Chunk {chunks_transferred}: {points_so_far}/{total_points} points")
return np.array(all_data)
# Usage for a 50M point waveform (pass the underlying VISA resource)
huge_waveform = chunked_waveform_transfer(scope.scope, 50000000, chunk_size=1000000)
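When even the assembled array is too large to keep in RAM, the chunks can be streamed straight into a disk-backed array instead of a Python list. The sketch below is one way to do that with numpy.memmap; the DATA:START/STOP and CURVE? commands follow the chunked example above and remain instrument-specific, and the output filename is arbitrary.

def chunked_transfer_to_disk(instrument, total_points, filename='waveform.dat',
                             chunk_size=1000000):
    """Stream waveform chunks into a disk-backed array to keep RAM usage flat."""
    out = np.memmap(filename, dtype=np.int16, mode='w+', shape=(total_points,))
    for start_point in range(1, total_points + 1, chunk_size):
        end_point = min(start_point + chunk_size - 1, total_points)
        instrument.write(f'DATA:START {start_point}')
        instrument.write(f'DATA:STOP {end_point}')
        chunk = instrument.query_binary_values('CURVE?', datatype='h')
        out[start_point - 1:end_point] = chunk
        out.flush()  # Push this chunk to disk before requesting the next one
    return out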
Spectrum Analyzer Optimization
Fast Trace Acquisition
class OptimizedSpectrumAnalyzer:
def __init__(self, resource_string):
self.rm = pyvisa.ResourceManager()
self.sa = self.rm.open_resource(resource_string)
self.sa.timeout = 60000 # 60 seconds for long sweeps
def fast_trace_setup(self):
"""Configure for fastest trace transfer"""
# Binary format for speed
self.sa.write('FORM:DATA REAL,32')
# Single sweep for consistency
self.sa.write('INIT:CONT OFF')
# Optimize sweep settings
self.sa.write('SWE:TYPE AUTO') # Auto sweep type
self.sa.write('AVER:COUN 1') # No averaging
self.sa.write('AVER:STAT OFF') # Disable averaging
def capture_trace(self, start_freq=1e6, stop_freq=1e9, points=10001):
"""Capture spectrum trace with optimized settings"""
# Configure frequency range
self.sa.write(f'FREQ:START {start_freq}')
self.sa.write(f'FREQ:STOP {stop_freq}')
self.sa.write(f'SWE:POIN {points}')
# Start measurement
self.sa.write('INIT:IMM')
self.sa.query('*OPC?') # Block until the sweep completes
# Fast binary transfer
start_time = time.time()
trace_data = self.sa.query_binary_values('TRAC? TRACE1', datatype='f')
transfer_time = time.time() - start_time
# Create frequency axis
frequencies = np.linspace(start_freq, stop_freq, len(trace_data))
print(f"Spectrum: {len(trace_data)} points in {transfer_time:.2f}s")
return frequencies, trace_data
# Usage
sa = OptimizedSpectrumAnalyzer('TCPIP::192.168.1.101::INSTR')
sa.fast_trace_setup()
freqs, spectrum = sa.capture_trace(1e6, 6e9, 50001) # 50k points, 1 MHz - 6 GHz
Memory Management for Large Datasets
Efficient Data Handling
import numpy as np
import gc
class EfficientDataHandler:
def __init__(self):
self.data_cache = {}
def process_large_waveform(self, instrument, decimate_factor=10):
"""Process large waveform with memory optimization"""
# Check the record length to choose a transfer strategy
total_points = int(instrument.query('HOR:RECORDLENGTH?'))
if total_points > 10000000: # > 10M points
return self._process_with_decimation(instrument, decimate_factor)
else:
return self._process_normal(instrument)
def _process_with_decimation(self, instrument, factor):
"""Process with on-the-fly decimation to save memory"""
# Ask the scope to return every Nth point (command support varies by model)
instrument.write(f'DATA:RESAMPLE {factor}')
raw_data = instrument.query_binary_values('CURVE?', datatype='h')
# Convert to float32 (half the memory of float64)
data = np.array(raw_data, dtype=np.float32)
# Clear raw data from memory
del raw_data
gc.collect()
return data
def _process_normal(self, instrument):
"""Normal processing for smaller datasets"""
raw_data = instrument.query_binary_values('CURVE?', datatype='h')
return np.array(raw_data, dtype=np.float32)
def save_efficiently(self, data, filename, compress=True):
"""Save data with optional compression"""
if compress:
# Use compression for large files
np.savez_compressed(f"{filename}.npz", data=data)
print(f"Saved compressed: {filename}.npz")
else:
# Binary format for speed
np.save(f"{filename}.npy", data)
print(f"Saved binary: {filename}.npy")
# Usage
handler = EfficientDataHandler()
waveform = handler.process_large_waveform(scope.scope, decimate_factor=5) # Pass the underlying VISA resource
handler.save_efficiently(waveform, "large_waveform", compress=True)
Streaming Data Processing
def streaming_fft_analysis(instrument, chunk_size=1000000):
"""Process FFT in chunks to handle very large waveforms"""
total_points = int(instrument.query('HOR:RECORDLENGTH?'))
sample_rate = float(instrument.query('HOR:SAMPLERATE?'))
# Initialize FFT accumulator
fft_accumulator = None
num_chunks = 0
for start_point in range(1, total_points, chunk_size):
end_point = min(start_point + chunk_size - 1, total_points)
# Get chunk
instrument.write(f'DATA:START {start_point}')
instrument.write(f'DATA:STOP {end_point}')
chunk_data = instrument.query_binary_values('CURVE?', datatype='h')
chunk_array = np.array(chunk_data, dtype=np.float32)
# Skip a trailing partial chunk so every spectrum has the same length
if fft_accumulator is not None and len(chunk_array) < chunk_size:
break
# Compute FFT of chunk
chunk_fft = np.fft.fft(chunk_array)
chunk_power = np.abs(chunk_fft) ** 2
# Accumulate power spectrum
if fft_accumulator is None:
fft_accumulator = chunk_power
else:
fft_accumulator += chunk_power
num_chunks += 1
# Free memory
del chunk_data, chunk_array, chunk_fft, chunk_power
gc.collect()
print(f"Processed chunk {num_chunks}")
# Average and create frequency axis
avg_power_spectrum = fft_accumulator / num_chunks
freqs = np.fft.fftfreq(len(avg_power_spectrum), 1/sample_rate)
return freqs[:len(freqs)//2], avg_power_spectrum[:len(avg_power_spectrum)//2]
# Usage for a 100M point waveform FFT (pass the underlying VISA resource)
freqs, power_spectrum = streaming_fft_analysis(scope.scope, chunk_size=2000000)
Network Optimization
TCP/IP Settings for High Throughput
def optimize_tcpip_instrument(resource_string):
"""Optimize TCP/IP instrument for high-speed transfers"""
rm = pyvisa.ResourceManager()
# Open with optimized settings
instrument = rm.open_resource(
resource_string,
read_termination='\n',
write_termination='\n',
timeout=60000,
chunk_size=1024*1024, # 1MB chunks
send_end=True
)
# Set instrument network settings if supported
try:
# Increase network buffer sizes
instrument.write('SYST:COMM:LAN:MCON 1') # Multiple connections
instrument.write('SYST:COMM:LAN:BUFF:SIZE 65536') # 64KB buffer
except pyvisa.errors.VisaIOError:
pass # Not all instruments support these commands
return instrument
# Usage
fast_instrument = optimize_tcpip_instrument('TCPIP::192.168.1.100::INSTR')
Parallel Data Acquisition
import concurrent.futures
class ParallelDataAcquisition:
def __init__(self, resource_strings):
self.rm = pyvisa.ResourceManager() # One shared ResourceManager for all sessions
self.instruments = {}
for i, resource in enumerate(resource_strings):
self.instruments[f'instr_{i}'] = self.rm.open_resource(resource)
def parallel_measurements(self, measurement_func, *args):
"""Execute measurements in parallel across instruments"""
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_to_instr = {
executor.submit(measurement_func, instr, *args): name
for name, instr in self.instruments.items()
}
for future in concurrent.futures.as_completed(future_to_instr):
instr_name = future_to_instr[future]
try:
results[instr_name] = future.result()
print(f"{instr_name}: measurement complete")
except Exception as e:
print(f"{instr_name}: error - {e}")
return results
def close_all(self):
for instr in self.instruments.values():
instr.close()
def capture_waveform(instrument):
"""Waveform capture function for parallel execution"""
instrument.write('*RST')
instrument.write('DATA:ENC RIB') # Signed-integer binary, big-endian
waveform = instrument.query_binary_values('CURVE?', datatype='h', is_big_endian=True)
return np.array(waveform)
# Usage with multiple scopes
scope_resources = [
'TCPIP::192.168.1.100::INSTR',
'TCPIP::192.168.1.101::INSTR',
'TCPIP::192.168.1.102::INSTR',
'TCPIP::192.168.1.103::INSTR'
]
parallel_acq = ParallelDataAcquisition(scope_resources)
all_waveforms = parallel_acq.parallel_measurements(capture_waveform)
parallel_acq.close_all()
print(f"Captured {len(all_waveforms)} waveforms simultaneously")
Performance Monitoring and Benchmarking
Transfer Rate Measurement
import time
import psutil
import threading
class TransferMonitor:
def __init__(self):
self.monitoring = False
self.start_time = None
self.bytes_transferred = 0
self.last_net_stats = None # Most recent psutil network counters
def start_monitoring(self):
"""Start monitoring transfer rates"""
self.monitoring = True
self.start_time = time.time()
self.bytes_transferred = 0
# Monitor network usage in separate thread
monitor_thread = threading.Thread(target=self._monitor_network)
monitor_thread.daemon = True
monitor_thread.start()
def _monitor_network(self):
"""Poll system-wide network counters while a transfer is in progress"""
while self.monitoring:
self.last_net_stats = psutil.net_io_counters() # Keep the latest snapshot for inspection
time.sleep(1)
def record_transfer(self, data_size_bytes):
"""Record completed transfer"""
self.bytes_transferred += data_size_bytes
def get_statistics(self):
"""Get transfer statistics"""
if self.start_time is None:
return None
elapsed = time.time() - self.start_time
rate_mbps = (self.bytes_transferred * 8) / (elapsed * 1e6) # Mbps
rate_mbs = self.bytes_transferred / (elapsed * 1e6) # MB/s
return {
'elapsed_time': elapsed,
'bytes_transferred': self.bytes_transferred,
'rate_mbps': rate_mbps,
'rate_mbs': rate_mbs
}
def stop_monitoring(self):
"""Stop monitoring"""
self.monitoring = False
stats = self.get_statistics()
if stats:
print(f"\nTransfer Statistics:")
print(f"Time: {stats['elapsed_time']:.2f} seconds")
print(f"Data: {stats['bytes_transferred']/1e6:.1f} MB")
print(f"Rate: {stats['rate_mbs']:.1f} MB/s ({stats['rate_mbps']:.1f} Mbps)")
return stats
# Usage
monitor = TransferMonitor()
monitor.start_monitoring()
# Perform a large data transfer using the underlying VISA resource
waveform = scope.scope.query_binary_values('CURVE?', datatype='h')
monitor.record_transfer(len(waveform) * 2) # 16-bit samples: 2 bytes per point
stats = monitor.stop_monitoring()
Best Practices Summary
Configuration
- Use binary formats: REAL,32 or INTEGER,16 instead of ASCII
- Optimize timeouts: Set based on expected transfer time
- Minimize SCPI overhead: Combine commands where possible
- Use appropriate chunk sizes: ~1MB chunks for network transfers (see the sketch below)
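As a rough illustration of the last two points, the sketch below chains several setup commands into one write using the standard SCPI ';' separator and raises PyVISA's transfer chunk size. The commands are the Tektronix-style ones used earlier in this guide and the address is a placeholder; treat both as assumptions to adapt.

import pyvisa

rm = pyvisa.ResourceManager()
scope = rm.open_resource('TCPIP::192.168.1.100::INSTR')  # example address

# One write instead of three separate round trips (commands are instrument-specific)
scope.write('DATA:ENC RIBINARY;:DATA:WIDTH 2;:DATA:START 1')

# A larger VISA chunk size reduces per-read overhead on big binary transfers
scope.chunk_size = 1024 * 1024   # 1 MB
scope.timeout = 30000            # 30 s, sized to the expected transfer time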
Memory Management
- Use appropriate data types: float32 instead of float64 where the extra precision is not needed (see the arithmetic below)
- Process in chunks: For datasets > 100MB
- Enable compression: For long-term storage
- Clean up resources: Explicit memory management (del + gc.collect())
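The arithmetic behind the data-type advice is easy to check; the record length below is purely illustrative.

import numpy as np

samples = 10000000  # illustrative record length
print(np.zeros(samples, dtype=np.float64).nbytes / 1e6)  # 80.0 MB
print(np.zeros(samples, dtype=np.float32).nbytes / 1e6)  # 40.0 MB
print(np.zeros(samples, dtype=np.int16).nbytes / 1e6)    # 20.0 MB (raw 16-bit samples)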
Network Optimization
- Direct Ethernet connection: Avoid WiFi for large transfers
- Increase buffer sizes: Both instrument and PC
- Use dedicated network: Isolate instrument traffic
- Monitor network utilization: Identify bottlenecks (see the sketch below)
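For the last point, system-wide network counters give a quick read on whether the link, rather than the instrument, is the bottleneck. A minimal sketch using psutil; the counters aggregate all interfaces, and the 5-second window is an arbitrary choice.

import time
import psutil

before = psutil.net_io_counters()
time.sleep(5)  # run the transfer of interest during this window
after = psutil.net_io_counters()

rx_mb_s = (after.bytes_recv - before.bytes_recv) / 5 / 1e6
print(f"Average receive rate: {rx_mb_s:.1f} MB/s")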
Troubleshooting
- Profile transfer rates: Use monitoring tools
- Check network configuration: MTU size, duplex settings
- Verify instrument settings: Buffer sizes, data formats
- Monitor system resources: CPU, memory, network usage
Next Steps
- Threading guide: Multi-threading Guide
- Memory optimization: Memory Management
- General performance: Performance Guide