Memory Management
Memory optimization techniques for PyVISA applications handling large datasets, preventing memory leaks, and managing instrument resources.
Optimize memory usage in PyVISA applications for better performance, stability, and resource efficiency.
Memory Challenges in Instrument Control
PyVISA applications often face unique memory challenges:
- Large datasets: Multi-megabyte waveforms and spectra
- Resource leaks: Unclosed instruments and connections
- Buffer accumulation: Data buffers that grow over time
- Long-running applications: Memory growth over hours/days
Resource Management Best Practices
Proper Instrument Lifecycle Management
import pyvisa
import weakref
import atexit
from contextlib import contextmanager
class InstrumentManager:
    """Centralized instrument resource management.

    Opened instruments are tracked by resource string in a
    ``weakref.WeakValueDictionary``, so the tracking table never keeps an
    instrument alive on its own: an entry disappears once the last outside
    reference to the instrument is dropped. ``cleanup_all`` is registered
    with ``atexit`` when the manager is created.
    """

    def __init__(self):
        self.resource_manager = pyvisa.ResourceManager()
        self.open_instruments = weakref.WeakValueDictionary()
        # Register cleanup on exit
        atexit.register(self.cleanup_all)

    def open_instrument(self, resource_string, **kwargs):
        """Return the tracked instrument for ``resource_string``, opening it if needed.

        ``kwargs`` are forwarded to ``open_resource`` only when a new
        connection is opened; they are ignored when a tracked one is reused.
        """
        # One .get() instead of "in" followed by [...]: a weakly referenced
        # entry may be collected between the two lookups and raise KeyError.
        instrument = self.open_instruments.get(resource_string)
        if instrument is None:
            instrument = self.resource_manager.open_resource(resource_string, **kwargs)
            self.open_instruments[resource_string] = instrument
        return instrument

    def close_instrument(self, resource_string):
        """Close the tracked instrument; unknown resource strings are ignored."""
        instrument = self.open_instruments.get(resource_string)
        if instrument is None:
            return
        try:
            instrument.close()
        finally:
            # Stop tracking even when close() raised, so the handle is not reused
            self.open_instruments.pop(resource_string, None)

    def cleanup_all(self):
        """Close every tracked instrument, then the resource manager itself."""
        try:
            # Iterate over a snapshot of the keys: closing mutates the mapping
            for resource_string in list(self.open_instruments.keys()):
                self.close_instrument(resource_string)
        finally:
            # Always release the manager, even if an instrument failed to close
            self.resource_manager.close()
# Module-wide manager shared by every managed_instrument() call
instrument_manager = InstrumentManager()


@contextmanager
def managed_instrument(resource_string, **kwargs):
    """Yield the manager's instrument for ``resource_string`` inside a ``with`` block.

    The instrument comes from ``instrument_manager.open_instrument`` and is
    NOT closed when the block exits; closing is left to the manager.
    """
    pooled = instrument_manager.open_instrument(resource_string, **kwargs)
    # Nothing to release on exit; exceptions from the with-body simply propagate
    yield pooled
# Example: read one waveform through the shared manager
with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
    waveform = scope.query_binary_values('CURVE?', datatype='h')
# No explicit close here; the handle is left to instrument_manager
Memory-Aware Data Acquisition
import numpy as np
import gc
from collections import deque
import psutil
class MemoryAwareDataCollector:
    """Data collector with built-in memory management.

    Measurements are kept oldest-first in a deque. ``total_bytes`` tracks the
    summed ``nbytes`` of the stored arrays, and the oldest arrays are evicted
    when adding a new one would exceed ``max_memory_bytes``.
    """

    def __init__(self, max_memory_mb=1000):
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.data_buffer = deque()
        self.total_bytes = 0

    def add_measurement(self, data):
        """Store one measurement, evicting the oldest ones if over budget.

        Non-array input is converted to a float32 array. A single measurement
        larger than the whole budget is still stored (after the buffer has
        been emptied). Prints the buffer length and process RSS afterwards.
        """
        # Convert to efficient numpy array
        if not isinstance(data, np.ndarray):
            data = np.array(data, dtype=np.float32)  # Use float32 to save memory
        data_bytes = data.nbytes

        # Evict oldest data until the new array fits (or nothing is left)
        evicted = False
        while self.data_buffer and self.total_bytes + data_bytes > self.max_memory_bytes:
            self.total_bytes -= self.data_buffer.popleft().nbytes
            evicted = True
        if evicted:
            # One collection after the eviction pass, not one per evicted array
            gc.collect()

        # Add new data
        self.data_buffer.append(data)
        self.total_bytes += data_bytes

        # Memory usage report
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        print(f"Buffer: {len(self.data_buffer)} arrays, "
              f"Memory: {memory_mb:.1f}MB")

    def get_recent_data(self, count=None):
        """Return the newest ``count`` arrays (all of them when ``count`` is None).

        A new list is returned, oldest first; the arrays in it are the stored
        objects, not copies. ``count <= 0`` yields an empty list.
        """
        if count is None:
            return list(self.data_buffer)
        if count <= 0:
            # Guard: list[-0:] would be the whole list, not an empty one
            return []
        return list(self.data_buffer)[-count:]

    def clear_all(self):
        """Clear all data and free memory."""
        self.data_buffer.clear()
        self.total_bytes = 0
        gc.collect()
# Example: stream 1000 acquisitions into a collector capped at 500 MB
collector = MemoryAwareDataCollector(max_memory_mb=500)

# The collector evicts its oldest arrays once the cap would be exceeded
for i in range(1000):
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        waveform = scope.query_binary_values('CURVE?', datatype='h')
    collector.add_measurement(waveform)
Efficient Data Types and Structures
Choosing Optimal Data Types
import numpy as np
import sys
def compare_data_types():
    """Print the memory footprint of 1M samples stored as four numeric dtypes."""
    sample_count = 1000000  # 1M samples

    # One random int16 source array, re-encoded into each wider type
    base = np.random.randint(-32768, 32767, sample_count, dtype=np.int16)
    variants = {
        "int16": base,
        "int32": base.astype(np.int32),
        "float32": base.astype(np.float32),
        "float64": base.astype(np.float64),
    }

    print("Memory usage comparison for 1M samples:")
    for label, values in variants.items():
        print(f"{label}: {values.nbytes / 1024 / 1024:.1f} MB")
    # Typical output:
    # int16: 1.9 MB
    # int32: 3.8 MB
    # float32: 3.8 MB
    # float64: 7.6 MB


compare_data_types()
class OptimizedMeasurement:
    """Measurement helper that keeps waveform data in compact numeric types."""

    def __init__(self, instrument):
        self.instrument = instrument
        # Scaling factors; filled in by configure_for_efficiency()
        self.voltage_scale = None
        self.voltage_offset = None

    def configure_for_efficiency(self):
        """Read the scaling factors once and switch the instrument to 16-bit transfers."""
        # Get scaling factors once
        # NOTE(review): assumes a comma-separated preamble with scale/offset at
        # fields 13 and 14 - confirm against the target instrument's manual.
        preamble = self.instrument.query('WFMPRE?').split(',')
        self.voltage_scale = np.float32(preamble[13])
        self.voltage_offset = np.float32(preamble[14])

        # Set 16-bit mode for smaller transfers
        self.instrument.write('DATA:WIDTH 2')
        self.instrument.write('DATA:ENC RIBINARY')

    def get_waveform_efficient(self):
        """Return the current waveform as a float32 array of scaled values.

        Raises:
            RuntimeError: if configure_for_efficiency() has not been called.
        """
        if self.voltage_scale is None or self.voltage_offset is None:
            raise RuntimeError(
                "configure_for_efficiency() must be called before acquiring data"
            )

        # RIBINARY sends signed integers most-significant byte first, so the
        # 16-bit samples must be decoded as big-endian. Requesting an ndarray
        # container avoids building a Python list of boxed ints first.
        raw_data = self.instrument.query_binary_values(
            'CURVE?', datatype='h', is_big_endian=True, container=np.ndarray
        )

        # One float32 copy, then scale and offset in place (no further temporaries)
        # Using float32 instead of float64 saves 50% memory
        voltages = np.array(raw_data, dtype=np.float32)
        voltages *= self.voltage_scale
        voltages += self.voltage_offset

        # Clean up raw data immediately
        del raw_data
        return voltages
# Example: configure once, then acquire scaled float32 waveforms
measurement = OptimizedMeasurement(scope)
measurement.configure_for_efficiency()
waveform = measurement.get_waveform_efficient()  # Uses minimal memory
Memory-Mapped Data Storage
import numpy as np
import tempfile
import os
class MemoryMappedDataLogger:
    """Data logger using a memory-mapped temporary file as a circular buffer.

    Samples are written at ``current_index``; once ``max_samples`` is reached,
    writing wraps around and overwrites the oldest samples.
    """

    def __init__(self, max_samples=100000000, dtype=np.float32):
        self.max_samples = max_samples
        self.dtype = dtype
        self.current_index = 0
        # True once writing has wrapped, i.e. every slot holds valid data
        self._wrapped = False

        # Create the backing file; only its name is needed, so close the handle
        # right away instead of keeping a second descriptor open on the file.
        self.temp_file = tempfile.NamedTemporaryFile(delete=False)
        self.temp_file.close()
        self.data_array = np.memmap(
            self.temp_file.name,
            dtype=dtype,
            mode='w+',
            shape=(max_samples,)
        )
        print(f"Created memory-mapped storage for {max_samples} samples")

    def add_samples(self, samples):
        """Append ``samples``, wrapping around when the end of the buffer is reached.

        A batch longer than ``max_samples`` keeps only its newest
        ``max_samples`` values.
        """
        samples_array = np.asarray(samples, dtype=self.dtype).ravel()
        num_samples = samples_array.size
        if num_samples == 0:
            return

        if num_samples > self.max_samples:
            # Batch alone overflows the buffer: only its tail can be retained
            self.data_array[:] = samples_array[-self.max_samples:]
            self.current_index = 0
            self._wrapped = True
        elif self.current_index + num_samples > self.max_samples:
            # Wrap around (circular buffer behavior)
            remaining = self.max_samples - self.current_index
            self.data_array[self.current_index:] = samples_array[:remaining]
            self.data_array[:num_samples - remaining] = samples_array[remaining:]
            self.current_index = num_samples - remaining
            self._wrapped = True
        else:
            # Normal append
            end_index = self.current_index + num_samples
            self.data_array[self.current_index:end_index] = samples_array
            self.current_index = end_index

        # Force write to disk
        self.data_array.flush()

    def get_recent_samples(self, count):
        """Return up to ``count`` most recent samples, oldest first.

        The result is a slice of the memory map when the samples are
        contiguous, and a concatenated in-memory copy when they straddle the
        wrap-around point.
        """
        available = self.max_samples if self._wrapped else self.current_index
        count = min(count, available)
        if count <= 0:
            return self.data_array[:0]

        start_index = self.current_index - count
        if start_index >= 0:
            return self.data_array[start_index:self.current_index]
        # Negative start: the oldest requested samples sit at the buffer's end
        return np.concatenate(
            (self.data_array[start_index:], self.data_array[:self.current_index])
        )

    def close(self):
        """Flush, release the memory map and delete the backing file (idempotent)."""
        if not hasattr(self, 'data_array'):
            return
        self.data_array.flush()
        del self.data_array
        self.temp_file.close()
        try:
            os.unlink(self.temp_file.name)
        except FileNotFoundError:
            pass
# Usage for very large datasets
# NOTE: 1B float32 samples map a ~4 GB temporary file on disk
logger = MemoryMappedDataLogger(max_samples=1000000000, dtype=np.float32)  # 1B samples
try:
    # Log data continuously without running out of memory
    for i in range(10000):
        with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
            waveform = scope.query_binary_values('CURVE?', datatype='h')
        logger.add_samples(waveform)

    # Get recent data for analysis; copy it so the result does not keep
    # referencing the memory map after the logger is closed
    recent_data = np.array(logger.get_recent_samples(1000000))  # Last 1M samples
finally:
    # Always remove the temporary file, even if an acquisition failed
    logger.close()
Garbage Collection Optimization
Manual Memory Management
import gc
import weakref
import time
from memory_profiler import profile
class GarbageCollectionManager:
    """Runs ``gc.collect()`` on a fixed measurement cadence and reports the RSS change."""

    def __init__(self):
        self.measurement_count = 0
        self.gc_frequency = 100  # Force GC every N measurements

    @profile  # Memory profiler decorator
    def collect_with_monitoring(self, force=False):
        """Collect garbage when forced or when the count is a multiple of ``gc_frequency``.

        Prints the number of objects collected and the difference in process
        RSS (in MB) measured before and after the collection.
        """
        if not (force or self.measurement_count % self.gc_frequency == 0):
            return

        proc = psutil.Process()
        rss_before_mb = proc.memory_info().rss / 1024 / 1024

        # Force garbage collection
        collected = gc.collect()

        rss_after_mb = proc.memory_info().rss / 1024 / 1024
        print(f"GC: Collected {collected} objects, "
              f"freed {rss_before_mb - rss_after_mb:.1f}MB")

    def measurement_complete(self):
        """Count one finished measurement and collect if one is due."""
        self.measurement_count += 1
        self.collect_with_monitoring()
# Example: periodic garbage collection in an acquisition loop
gc_manager = GarbageCollectionManager()

for i in range(1000):
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        # Large data acquisition
        waveform = scope.query_binary_values('CURVE?', datatype='h')

        # Spectrum computation allocates large temporaries
        processed = np.fft.fft(waveform)
        power_spectrum = np.abs(processed) ** 2

        # Keep only the scalar summary
        results = np.mean(power_spectrum)

        # Drop the references to the large arrays right away
        del waveform, processed, power_spectrum

    # Let the manager decide whether a collection is due
    gc_manager.measurement_complete()
Weak References for Callbacks
import weakref
from typing import Callable, Any
class MemoryEfficientCallbackManager:
    """Callback manager that does not keep callback target objects alive."""

    # Returned by a weak wrapper once its target object has been collected.
    # A dedicated sentinel is needed because a live callback may itself
    # legitimately return None.
    _DEAD = object()

    def __init__(self):
        self.callbacks = []  # Plain callables and weak wrappers

    def add_callback(self, callback: Callable, obj: Any = None):
        """Register ``callback``.

        With ``obj`` given, ``callback`` is invoked as ``callback(obj, *args,
        **kwargs)`` and only a weak reference to ``obj`` is stored. Pass the
        unbound function (e.g. ``Class.method``): a bound method would itself
        hold a strong reference to the object. Without ``obj`` the callable is
        stored directly.
        """
        if obj is None:
            # Function callback - direct storage
            self.callbacks.append(callback)
            return

        # Method callback - use weak reference to object
        weak_obj = weakref.ref(obj)
        dead = self._DEAD

        def weak_callback(*args, **kwargs):
            # Dereference once, so the target cannot vanish between check and call
            target = weak_obj()
            if target is None:
                return dead
            return callback(target, *args, **kwargs)

        self.callbacks.append(weak_callback)

    def trigger_callbacks(self, *args, **kwargs):
        """Call every registered callback and drop those whose target is gone.

        Exceptions raised by a callback propagate to the caller; in that case
        the callback list is left unchanged.
        """
        still_valid = []
        for callback in self.callbacks:
            if callback(*args, **kwargs) is not self._DEAD:
                still_valid.append(callback)
        # Update list with only valid callbacks
        self.callbacks = still_valid
# Example: a callback whose target object is released before it fires
callback_manager = MemoryEfficientCallbackManager()


class DataProcessor:
    """Minimal callback target that reports how many samples it received."""

    def __init__(self, name):
        self.name = name

    def process_data(self, data):
        print(f"{self.name} processing {len(data)} samples")


# Register the unbound method together with the instance it should run on
processor = DataProcessor("Processor1")
callback_manager.add_callback(DataProcessor.process_data, processor)

# Drop the only strong reference to the processor
del processor
gc.collect()

# The processor is gone, so nothing is printed and its entry is discarded
callback_manager.trigger_callbacks([1, 2, 3, 4, 5])
Memory Profiling and Monitoring
Real-Time Memory Monitoring
import psutil
import matplotlib.pyplot as plt
from collections import deque
import threading
import time
class MemoryMonitor:
    """Real-time memory usage monitoring in a background thread.

    Samples of the process RSS (in MB) and their timestamps are kept in two
    bounded deques. A lock guards them because they are appended to by the
    sampler thread and read by the caller's thread.
    """

    def __init__(self, history_size=1000):
        self.history_size = history_size
        self.memory_history = deque(maxlen=history_size)
        self.time_history = deque(maxlen=history_size)
        self.monitoring = False
        self.monitor_thread = None
        self._lock = threading.Lock()
        # Lets stop_monitoring() interrupt the wait between two samples
        self._stop_event = threading.Event()

    def start_monitoring(self, interval=1.0):
        """Start sampling every ``interval`` seconds; no-op if already running."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,)
        )
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _monitor_loop(self, interval):
        """Sampler thread body: record (elapsed seconds, RSS in MB) until stopped."""
        process = psutil.Process()
        start_time = time.time()
        while self.monitoring:
            current_time = time.time() - start_time
            memory_mb = process.memory_info().rss / 1024 / 1024
            with self._lock:
                # Append both values together so the two histories stay paired
                self.time_history.append(current_time)
                self.memory_history.append(memory_mb)
            # Waits up to `interval`, but returns immediately once stop is requested
            if self._stop_event.wait(interval):
                break

    def stop_monitoring(self):
        """Stop the sampler thread and wait for it to finish."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()

    def _snapshot(self):
        """Return consistent list copies of (time_history, memory_history)."""
        with self._lock:
            return list(self.time_history), list(self.memory_history)

    def get_statistics(self):
        """Return current/max/min/avg/std of the recorded MB values, or None if empty."""
        _, memory_values = self._snapshot()
        if not memory_values:
            return None
        memory_array = np.array(memory_values)
        return {
            'current_mb': memory_array[-1],
            'max_mb': np.max(memory_array),
            'min_mb': np.min(memory_array),
            'avg_mb': np.mean(memory_array),
            'std_mb': np.std(memory_array)
        }

    def plot_usage(self):
        """Plot memory usage over time."""
        times, memory_values = self._snapshot()
        plt.figure(figsize=(12, 6))
        plt.plot(times, memory_values)
        plt.xlabel('Time (seconds)')
        plt.ylabel('Memory Usage (MB)')
        plt.title('Memory Usage Over Time')
        plt.grid(True)
        plt.show()
# Usage
monitor = MemoryMonitor()
monitor.start_monitoring(interval=0.5)  # Monitor every 500ms
try:
    # Run memory-intensive operations
    for i in range(100):
        large_array = np.random.rand(1000000)  # 8MB array
        processed = np.fft.fft(large_array)
        result = np.mean(processed)
        # Cleanup
        del large_array, processed
        if i % 10 == 0:
            gc.collect()  # Periodic garbage collection
finally:
    # Stop the sampler thread even if the workload raised
    monitor.stop_monitoring()

# Read the statistics only after the sampler thread has stopped
stats = monitor.get_statistics()
if stats is None:
    print("No memory samples were recorded")
else:
    print("Memory Statistics:")
    print(f"Current: {stats['current_mb']:.1f} MB")
    print(f"Maximum: {stats['max_mb']:.1f} MB")
    print(f"Average: {stats['avg_mb']:.1f} MB")
monitor.plot_usage()
Memory Leak Detection
import tracemalloc
import linecache
class MemoryLeakDetector:
    """Collects labelled ``tracemalloc`` snapshots and prints comparisons between them."""

    def __init__(self):
        self.snapshots = []

    def start_tracing(self):
        """Turn on ``tracemalloc`` allocation tracing."""
        tracemalloc.start()

    def take_snapshot(self, label=""):
        """Record a ``(label, snapshot)`` pair of the current allocations."""
        self.snapshots.append((label, tracemalloc.take_snapshot()))

    def analyze_growth(self, snapshot1_idx=0, snapshot2_idx=-1):
        """Print the first ten per-line differences between two stored snapshots.

        By default the first and the last snapshot are compared. Prints a
        notice and returns when fewer than two snapshots exist.
        """
        if len(self.snapshots) < 2:
            print("Need at least 2 snapshots to analyze growth")
            return

        older_label, older = self.snapshots[snapshot1_idx]
        newer_label, newer = self.snapshots[snapshot2_idx]

        differences = newer.compare_to(older, 'lineno')

        print(f"\nMemory growth from '{older_label}' to '{newer_label}':")
        print("=" * 60)
        for rank, entry in enumerate(differences[:10], start=1):
            print(f"{rank}. {entry}")
            # Last line of the formatted traceback for this entry
            frame = entry.traceback.format()[-1]
            print(f" {frame}")

    def get_current_top_allocations(self, limit=10):
        """Take a fresh snapshot and print its first ``limit`` per-line statistics."""
        per_line = tracemalloc.take_snapshot().statistics('lineno')

        print(f"\nTop {limit} memory allocations:")
        print("=" * 60)
        for rank, entry in enumerate(per_line[:limit], start=1):
            print(f"{rank}. {entry}")
# Example: compare allocations before and after a leaky acquisition loop
leak_detector = MemoryLeakDetector()
leak_detector.start_tracing()

# Reference point taken before any data is acquired
leak_detector.take_snapshot("Baseline")

# Deliberately leaky loop, used here to give the detector something to find
data_accumulator = []
for i in range(1000):
    # Data acquisition that might leak
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        waveform = scope.query_binary_values('CURVE?', datatype='h')
        # Leak source: float64 doubles the footprint compared with float32 ...
        processed = np.array(waveform, dtype=np.float64)  # Expensive type
        # ... and the stored slice is a view, so each full array stays referenced
        data_accumulator.append(processed[:1000])  # Keep partial data
        # Neither 'waveform' nor the full 'processed' array is released here

# Second snapshot once the loop has finished
leak_detector.take_snapshot("After 1000 measurements")

# Report what grew between the two snapshots, then the current top allocations
leak_detector.analyze_growth()
leak_detector.get_current_top_allocations()
Advanced Optimization Techniques
Object Pooling
from collections import deque
import numpy as np
class NumpyArrayPool:
    """Pool of reusable numpy arrays to reduce allocation overhead.

    At most ``pool_size`` idle arrays are retained; surplus arrays handed
    back to the pool are simply dropped.
    """

    def __init__(self, shape, dtype=np.float32, pool_size=10):
        self.shape = shape
        self.dtype = dtype
        self.pool_size = pool_size
        # Tuple form of `shape`, so an int shape such as 4 matches array.shape == (4,)
        self._array_shape = tuple(np.atleast_1d(shape).tolist())
        self.available_arrays = deque()
        # Pre-allocate arrays
        for _ in range(pool_size):
            self.available_arrays.append(np.zeros(shape, dtype=dtype))

    def get_array(self):
        """Return an idle array from the pool, or a new zeroed one if the pool is empty."""
        if self.available_arrays:
            return self.available_arrays.popleft()
        # Pool exhausted, create new array
        return np.zeros(self.shape, dtype=self.dtype)

    def return_array(self, array):
        """Zero ``array`` and put it back into the pool for reuse.

        The array is ignored (left to the garbage collector) when its shape or
        dtype does not match the pool, when the pool is already full, or when
        this very array is already idle in the pool.
        """
        if array.shape != self._array_shape or array.dtype != self.dtype:
            return
        if len(self.available_arrays) >= self.pool_size:
            # Cap the pool so returned overflow arrays do not accumulate
            return
        if any(pooled is array for pooled in self.available_arrays):
            # Returned twice: pooling it again would hand one buffer to two users
            return
        array.fill(0)
        self.available_arrays.append(array)
# If wrong shape/type, just let it be garbage collected
# Usage with pooled arrays
waveform_pool = NumpyArrayPool(shape=(1000000,), dtype=np.float32, pool_size=5)
for i in range(1000):
    # Get reusable array instead of allocating new one
    work_array = waveform_pool.get_array()
    try:
        # Use array for computation
        with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
            raw_data = scope.query_binary_values('CURVE?', datatype='h')

        # Raises ValueError if the waveform is longer than the pooled buffer
        work_array[:len(raw_data)] = raw_data
        # View of the filled part only; the rest of the buffer holds no data
        samples = work_array[:len(raw_data)]

        # Process data in-place when possible
        np.multiply(samples, 0.001, out=samples)  # Scale in-place
        # Do computations...
        result = np.mean(samples)
    finally:
        # Return array to pool for reuse, even when acquisition or processing failed
        waveform_pool.return_array(work_array)
Copy-on-Write Data Structures
import numpy as np
from copy import deepcopy
class CopyOnWriteArray:
    """Array wrapper that copies the wrapped data on the first write through the wrapper.

    Reads are served from the wrapped object. The first ``__setitem__`` or
    ``modify`` call replaces it with ``data.copy()``, so the object passed to
    the constructor is never modified through this wrapper.
    """

    def __init__(self, data):
        self._data = data
        self._is_copy = False
        self._original_id = id(data)

    def _ensure_copy(self):
        """Make sure we have our own copy before modifying."""
        if not self._is_copy:
            self._data = self._data.copy()
            self._is_copy = True

    def _read_only(self):
        """Return the data as a non-writeable view (ndarray) or unchanged otherwise."""
        if isinstance(self._data, np.ndarray):
            # A view shares memory with the data but carries its own flags,
            # so the underlying array itself stays writeable.
            view = self._data.view()
            view.flags.writeable = False
            return view
        return self._data

    @property
    def data(self):
        """Read-only access to the data; writes must go through the wrapper."""
        return self._read_only()

    def modify(self, operation):
        """Call ``operation`` on a private copy of the data and return its result."""
        self._ensure_copy()
        return operation(self._data)

    def __len__(self):
        return len(self._data)

    def __getitem__(self, key):
        # Index the read-only view so returned slices cannot write to shared data
        return self._read_only()[key]

    def __setitem__(self, key, value):
        self._ensure_copy()
        self._data[key] = value
# Example: three wrappers around one array; only the one written to copies it
original_waveform = np.random.rand(1000000)

# Wrapping does not copy the array
view1 = CopyOnWriteArray(original_waveform)
view2 = CopyOnWriteArray(original_waveform)
view3 = CopyOnWriteArray(original_waveform)
print(f"Memory usage: {view1.data.nbytes / 1024 / 1024:.1f} MB (shared)")

# Writing through view1 makes view1 (and only view1) take a private copy
view1[100] = 999.0
print("After modification:")
print(f"View1 is copy: {view1._is_copy}")  # True
print(f"View2 is copy: {view2._is_copy}")  # False (still shared)
print(f"View3 is copy: {view3._is_copy}")  # False (still shared)
Error Recovery and Resource Cleanup
Exception-Safe Resource Management
import contextlib
import logging
class RobustInstrumentManager:
    """Instrument manager with comprehensive error handling.

    Instruments opened through ``safe_instrument`` are tracked in
    ``active_resources`` until they have been closed successfully.
    """

    def __init__(self):
        self.active_resources = {}
        self.logger = logging.getLogger(__name__)
        # Created on first use and shared by all safe_instrument() calls
        self._resource_manager = None

    def _get_resource_manager(self):
        """Return the shared ResourceManager, creating it on first use."""
        if self._resource_manager is None:
            self._resource_manager = pyvisa.ResourceManager()
        return self._resource_manager

    @contextlib.contextmanager
    def safe_instrument(self, resource_string, **kwargs):
        """Open ``resource_string``, yield the instrument, and close it on exit.

        Errors raised while opening or inside the ``with`` body are logged and
        re-raised. A failure while closing is logged as a warning only; the
        instrument then stays in ``active_resources`` for ``emergency_cleanup``.
        """
        instrument = None
        try:
            # Open with error handling; reuse one ResourceManager rather than
            # creating (and never closing) a new one on every call
            instrument = self._get_resource_manager().open_resource(
                resource_string, **kwargs
            )
            self.active_resources[resource_string] = instrument
            yield instrument
        except pyvisa.VisaIOError as e:
            self.logger.error(f"VISA error for {resource_string}: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error for {resource_string}: {e}")
            raise
        finally:
            # Guaranteed cleanup
            if instrument is not None:
                try:
                    instrument.close()
                except Exception:
                    # Exception, not a bare except: KeyboardInterrupt and
                    # GeneratorExit must not be swallowed here
                    self.logger.warning(f"Error closing {resource_string}")
                else:
                    self.active_resources.pop(resource_string, None)

    def emergency_cleanup(self):
        """Force-close every tracked instrument and the shared resource manager."""
        for resource_string, instrument in list(self.active_resources.items()):
            try:
                instrument.close()
                self.logger.info(f"Emergency close: {resource_string}")
            except Exception:
                self.logger.error(f"Failed emergency close: {resource_string}")
        self.active_resources.clear()

        if self._resource_manager is not None:
            try:
                self._resource_manager.close()
            except Exception:
                self.logger.error("Failed to close resource manager")
            # A later safe_instrument() call creates a fresh manager
            self._resource_manager = None
# Example: errors inside the with-block still leave the instrument closed
robust_manager = RobustInstrumentManager()

try:
    with robust_manager.safe_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        # Operations that might fail
        data = scope.query_binary_values('CURVE?', datatype='h')

        # Treat an empty transfer as a failure
        if len(data) == 0:
            raise ValueError("No data received")
except Exception as e:
    # The context manager has already run its close step by this point
    print(f"Handled error: {e}")

# Last-resort sweep over anything still tracked as open
robust_manager.emergency_cleanup()
Best Practices Summary
Memory Efficiency
- Use appropriate data types: int16 vs float64 can save 75% memory
- Trigger garbage collection deliberately: Python's collector is enabled by default; in long-running apps, call gc.collect() periodically after releasing large objects
- Memory-map large datasets: For data larger than available RAM
- Pool reusable objects: Avoid repeated allocation/deallocation
Resource Management
- Always use context managers: Ensure cleanup even with exceptions
- Track active resources: Monitor and manage instrument connections
- Use weak references: Keep callbacks and caches from holding objects alive after their owners have released them
- Set resource limits: Prevent unbounded memory growth
Monitoring and Debugging
- Profile memory usage: Use monitoring tools to identify bottlenecks
- Detect leaks early: Regular snapshot comparison
- Log resource operations: Track open/close operations
- Monitor system resources: Watch overall system impact
Common Pitfalls
- Forgetting to close instruments: Leads to resource exhaustion
- Accumulating large arrays: Without clearing old data
- Using inefficient data types: float64 when float32 suffices
- Circular references: Objects that prevent garbage collection
Next Steps
- Large data transfers: Large Data Transfers
- Multi-threading: Multi-threading Guide
- General optimization: Performance Guide
How is this guide?