PyVISA
PyVISADocs

Memory Management

Memory optimization techniques for PyVISA applications handling large datasets, preventing memory leaks, and managing instrument resources.

Optimize memory usage in PyVISA applications for better performance, stability, and resource efficiency.

Memory Challenges in Instrument Control

PyVISA applications often face unique memory challenges:

  • Large datasets: Multi-megabyte waveforms and spectra
  • Resource leaks: Unclosed instruments and connections
  • Buffer accumulation: Data buffers that grow over time
  • Long-running applications: Memory growth over hours/days

Resource Management Best Practices

Proper Instrument Lifecycle Management

import pyvisa
import weakref
import atexit
from contextlib import contextmanager

class InstrumentManager:
    """Centralized instrument resource management"""
    
    def __init__(self):
        self.resource_manager = pyvisa.ResourceManager()
        self.open_instruments = weakref.WeakValueDictionary()
        
        # Register cleanup on exit
        atexit.register(self.cleanup_all)
    
    def open_instrument(self, resource_string, **kwargs):
        """Open instrument with automatic cleanup tracking"""
        
        if resource_string in self.open_instruments:
            # Reuse existing connection
            return self.open_instruments[resource_string]
        
        instrument = self.resource_manager.open_resource(resource_string, **kwargs)
        self.open_instruments[resource_string] = instrument
        
        return instrument
    
    def close_instrument(self, resource_string):
        """Close specific instrument"""
        if resource_string in self.open_instruments:
            instrument = self.open_instruments[resource_string]
            instrument.close()
            del self.open_instruments[resource_string]
    
    def cleanup_all(self):
        """Close all open instruments"""
        for resource_string in list(self.open_instruments.keys()):
            self.close_instrument(resource_string)
        
        self.resource_manager.close()

# Global instance
instrument_manager = InstrumentManager()

@contextmanager
def managed_instrument(resource_string, **kwargs):
    """Context manager for automatic instrument cleanup"""
    instrument = instrument_manager.open_instrument(resource_string, **kwargs)
    try:
        yield instrument
    finally:
        # Instrument stays open in manager pool
        pass

# Usage
with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
    waveform = scope.query_binary_values('CURVE?', datatype='h')
    # Instrument automatically managed

Memory-Aware Data Acquisition

import numpy as np
import gc
from collections import deque
import psutil

class MemoryAwareDataCollector:
    """Data collector with built-in memory management"""
    
    def __init__(self, max_memory_mb=1000):
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.data_buffer = deque()
        self.total_bytes = 0
        
    def add_measurement(self, data):
        """Add measurement with memory monitoring"""
        
        # Convert to efficient numpy array
        if not isinstance(data, np.ndarray):
            data = np.array(data, dtype=np.float32)  # Use float32 to save memory
        
        data_bytes = data.nbytes
        
        # Check memory limits
        while (self.total_bytes + data_bytes > self.max_memory_bytes and 
               len(self.data_buffer) > 0):
            # Remove oldest data
            old_data = self.data_buffer.popleft()
            self.total_bytes -= old_data.nbytes
            del old_data
            gc.collect()
        
        # Add new data
        self.data_buffer.append(data)
        self.total_bytes += data_bytes
        
        # Memory usage report
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        print(f"Buffer: {len(self.data_buffer)} arrays, "
              f"Memory: {memory_mb:.1f}MB")
    
    def get_recent_data(self, count=None):
        """Get most recent data without copying"""
        if count is None:
            return list(self.data_buffer)
        else:
            return list(self.data_buffer)[-count:]
    
    def clear_all(self):
        """Clear all data and free memory"""
        while self.data_buffer:
            old_data = self.data_buffer.popleft()
            del old_data
        
        self.total_bytes = 0
        gc.collect()

# Usage
collector = MemoryAwareDataCollector(max_memory_mb=500)  # 500MB limit

# Collect data with automatic memory management
for i in range(1000):
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        waveform = scope.query_binary_values('CURVE?', datatype='h')
        collector.add_measurement(waveform)

Efficient Data Types and Structures

Choosing Optimal Data Types

import numpy as np
import sys

def compare_data_types():
    """Compare memory usage of different data types"""
    
    # Create test data
    size = 1000000  # 1M samples
    
    # Different data types
    int16_data = np.random.randint(-32768, 32767, size, dtype=np.int16)
    int32_data = int16_data.astype(np.int32)
    float32_data = int16_data.astype(np.float32)  
    float64_data = int16_data.astype(np.float64)
    
    print("Memory usage comparison for 1M samples:")
    print(f"int16:   {int16_data.nbytes / 1024 / 1024:.1f} MB")
    print(f"int32:   {int32_data.nbytes / 1024 / 1024:.1f} MB") 
    print(f"float32: {float32_data.nbytes / 1024 / 1024:.1f} MB")
    print(f"float64: {float64_data.nbytes / 1024 / 1024:.1f} MB")
    
    # Typical output:
    # int16:   1.9 MB
    # int32:   3.8 MB  
    # float32: 3.8 MB
    # float64: 7.6 MB

compare_data_types()

class OptimizedMeasurement:
    """Measurement class with optimized data types"""
    
    def __init__(self, instrument):
        self.instrument = instrument
        
        # Pre-allocate arrays with optimal types
        self.voltage_scale = None
        self.voltage_offset = None
        
    def configure_for_efficiency(self):
        """Configure instrument for memory-efficient acquisition"""
        
        # Get scaling factors once
        preamble = self.instrument.query('WFMPRE?').split(',')
        self.voltage_scale = np.float32(preamble[13])
        self.voltage_offset = np.float32(preamble[14])
        
        # Set 16-bit mode for smaller transfers
        self.instrument.write('DATA:WIDTH 2')
        self.instrument.write('DATA:ENC RIBINARY')
    
    def get_waveform_efficient(self):
        """Get waveform with minimal memory usage"""
        
        # Get raw data as int16 (smallest possible)
        raw_data = self.instrument.query_binary_values('CURVE?', datatype='h')
        
        # Convert directly to desired format without intermediate copies
        # Using float32 instead of float64 saves 50% memory
        voltages = (np.array(raw_data, dtype=np.float32) * 
                   self.voltage_scale + self.voltage_offset)
        
        # Clean up raw data immediately
        del raw_data
        
        return voltages

# Usage
measurement = OptimizedMeasurement(scope)
measurement.configure_for_efficiency()
waveform = measurement.get_waveform_efficient()  # Uses minimal memory

Memory-Mapped Data Storage

import numpy as np
import tempfile
import os

class MemoryMappedDataLogger:
    """Data logger using memory-mapped files for huge datasets"""
    
    def __init__(self, max_samples=100000000, dtype=np.float32):
        self.max_samples = max_samples
        self.dtype = dtype
        self.current_index = 0
        
        # Create temporary memory-mapped file
        self.temp_file = tempfile.NamedTemporaryFile(delete=False)
        self.data_array = np.memmap(
            self.temp_file.name, 
            dtype=dtype, 
            mode='w+', 
            shape=(max_samples,)
        )
        
        print(f"Created memory-mapped storage for {max_samples} samples")
    
    def add_samples(self, samples):
        """Add samples to memory-mapped storage"""
        
        samples_array = np.array(samples, dtype=self.dtype)
        num_samples = len(samples_array)
        
        if self.current_index + num_samples > self.max_samples:
            # Wrap around (circular buffer behavior)
            remaining = self.max_samples - self.current_index
            self.data_array[self.current_index:] = samples_array[:remaining]
            self.data_array[:num_samples - remaining] = samples_array[remaining:]
            self.current_index = num_samples - remaining
        else:
            # Normal append
            end_index = self.current_index + num_samples
            self.data_array[self.current_index:end_index] = samples_array
            self.current_index = end_index
        
        # Force write to disk
        self.data_array.flush()
    
    def get_recent_samples(self, count):
        """Get most recent samples"""
        if count > self.current_index:
            return self.data_array[:self.current_index]
        else:
            start_index = self.current_index - count
            return self.data_array[start_index:self.current_index]
    
    def close(self):
        """Close and clean up memory-mapped file"""
        del self.data_array
        self.temp_file.close()
        os.unlink(self.temp_file.name)

# Usage for very large datasets
logger = MemoryMappedDataLogger(max_samples=1000000000, dtype=np.float32)  # 1B samples

# Log data continuously without running out of memory
for i in range(10000):
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        waveform = scope.query_binary_values('CURVE?', datatype='h')
        logger.add_samples(waveform)

# Get recent data for analysis
recent_data = logger.get_recent_samples(1000000)  # Last 1M samples
logger.close()

Garbage Collection Optimization

Manual Memory Management

import gc
import weakref
import time
from memory_profiler import profile

class GarbageCollectionManager:
    """Advanced garbage collection control"""
    
    def __init__(self):
        self.measurement_count = 0
        self.gc_frequency = 100  # Force GC every N measurements
        
    @profile  # Memory profiler decorator
    def collect_with_monitoring(self, force=False):
        """Garbage collect with memory monitoring"""
        
        if not force and self.measurement_count % self.gc_frequency != 0:
            return
        
        # Get memory before cleanup
        process = psutil.Process()
        memory_before = process.memory_info().rss / 1024 / 1024
        
        # Force garbage collection
        collected = gc.collect()
        
        # Get memory after cleanup  
        memory_after = process.memory_info().rss / 1024 / 1024
        memory_freed = memory_before - memory_after
        
        print(f"GC: Collected {collected} objects, "
              f"freed {memory_freed:.1f}MB")
    
    def measurement_complete(self):
        """Call after each measurement"""
        self.measurement_count += 1
        self.collect_with_monitoring()

# Usage with automatic garbage collection
gc_manager = GarbageCollectionManager()

for i in range(1000):
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        # Large data acquisition
        waveform = scope.query_binary_values('CURVE?', datatype='h')
        
        # Process data (creating temporary arrays)
        processed = np.fft.fft(waveform)
        power_spectrum = np.abs(processed) ** 2
        
        # Store only what's needed
        results = np.mean(power_spectrum)
        
        # Clear large temporary arrays
        del waveform, processed, power_spectrum
        
        # Trigger garbage collection periodically
        gc_manager.measurement_complete()

Weak References for Callbacks

import weakref
from typing import Callable, Any

class MemoryEfficientCallbackManager:
    """Callback manager that doesn't prevent garbage collection"""
    
    def __init__(self):
        self.callbacks = []  # Store weak references
    
    def add_callback(self, callback: Callable, obj: Any = None):
        """Add callback with weak reference to prevent memory leaks"""
        
        if obj is not None:
            # Method callback - use weak reference to object
            weak_obj = weakref.ref(obj)
            weak_callback = lambda *args, **kwargs: (
                callback(weak_obj(), *args, **kwargs) 
                if weak_obj() is not None else None
            )
            self.callbacks.append(weak_callback)
        else:
            # Function callback - direct storage
            self.callbacks.append(callback)
    
    def trigger_callbacks(self, *args, **kwargs):
        """Trigger all valid callbacks"""
        
        valid_callbacks = []
        
        for callback in self.callbacks:
            try:
                result = callback(*args, **kwargs)
                if result is not None:  # Callback still valid
                    valid_callbacks.append(callback)
            except TypeError:
                # Weak reference became invalid
                pass
        
        # Update list with only valid callbacks
        self.callbacks = valid_callbacks

# Usage
callback_manager = MemoryEfficientCallbackManager()

class DataProcessor:
    def __init__(self, name):
        self.name = name
        
    def process_data(self, data):
        print(f"{self.name} processing {len(data)} samples")

# Add processor with weak reference
processor = DataProcessor("Processor1")
callback_manager.add_callback(DataProcessor.process_data, processor)

# When processor goes out of scope, callback automatically invalidates
del processor
gc.collect()

# Callback manager automatically cleans up invalid references
callback_manager.trigger_callbacks([1, 2, 3, 4, 5])

Memory Profiling and Monitoring

Real-Time Memory Monitoring

import psutil
import matplotlib.pyplot as plt
from collections import deque
import threading
import time

class MemoryMonitor:
    """Real-time memory usage monitoring"""
    
    def __init__(self, history_size=1000):
        self.history_size = history_size
        self.memory_history = deque(maxlen=history_size)
        self.time_history = deque(maxlen=history_size)
        self.monitoring = False
        self.monitor_thread = None
        
    def start_monitoring(self, interval=1.0):
        """Start memory monitoring in background thread"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, 
            args=(interval,)
        )
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
    def _monitor_loop(self, interval):
        """Monitoring loop"""
        process = psutil.Process()
        start_time = time.time()
        
        while self.monitoring:
            current_time = time.time() - start_time
            memory_mb = process.memory_info().rss / 1024 / 1024
            
            self.time_history.append(current_time)
            self.memory_history.append(memory_mb)
            
            time.sleep(interval)
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def get_statistics(self):
        """Get memory usage statistics"""
        if not self.memory_history:
            return None
            
        memory_array = np.array(self.memory_history)
        
        return {
            'current_mb': memory_array[-1],
            'max_mb': np.max(memory_array),
            'min_mb': np.min(memory_array), 
            'avg_mb': np.mean(memory_array),
            'std_mb': np.std(memory_array)
        }
    
    def plot_usage(self):
        """Plot memory usage over time"""
        plt.figure(figsize=(12, 6))
        plt.plot(list(self.time_history), list(self.memory_history))
        plt.xlabel('Time (seconds)')
        plt.ylabel('Memory Usage (MB)')
        plt.title('Memory Usage Over Time')
        plt.grid(True)
        plt.show()

# Usage
monitor = MemoryMonitor()
monitor.start_monitoring(interval=0.5)  # Monitor every 500ms

# Run memory-intensive operations
for i in range(100):
    large_array = np.random.rand(1000000)  # 8MB array
    processed = np.fft.fft(large_array)
    result = np.mean(processed)
    
    # Cleanup
    del large_array, processed
    
    if i % 10 == 0:
        gc.collect()  # Periodic garbage collection

stats = monitor.get_statistics()
monitor.stop_monitoring()

print(f"Memory Statistics:")
print(f"Current: {stats['current_mb']:.1f} MB")
print(f"Maximum: {stats['max_mb']:.1f} MB")  
print(f"Average: {stats['avg_mb']:.1f} MB")

monitor.plot_usage()

Memory Leak Detection

import tracemalloc
import linecache

class MemoryLeakDetector:
    """Detect and analyze memory leaks"""
    
    def __init__(self):
        self.snapshots = []
        
    def start_tracing(self):
        """Start memory tracing"""
        tracemalloc.start()
        
    def take_snapshot(self, label=""):
        """Take memory snapshot"""
        snapshot = tracemalloc.take_snapshot()
        self.snapshots.append((label, snapshot))
        
    def analyze_growth(self, snapshot1_idx=0, snapshot2_idx=-1):
        """Analyze memory growth between snapshots"""
        
        if len(self.snapshots) < 2:
            print("Need at least 2 snapshots to analyze growth")
            return
            
        label1, snap1 = self.snapshots[snapshot1_idx]
        label2, snap2 = self.snapshots[snapshot2_idx]
        
        # Compare snapshots
        top_stats = snap2.compare_to(snap1, 'lineno')
        
        print(f"\nMemory growth from '{label1}' to '{label2}':")
        print("=" * 60)
        
        for index, stat in enumerate(top_stats[:10], 1):
            print(f"{index}. {stat}")
            
            # Show code context
            frame = stat.traceback.format()[-1]
            print(f"   {frame}")
    
    def get_current_top_allocations(self, limit=10):
        """Get current top memory allocations"""
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')
        
        print(f"\nTop {limit} memory allocations:")
        print("=" * 60)
        
        for index, stat in enumerate(top_stats[:limit], 1):
            print(f"{index}. {stat}")

# Usage for leak detection
leak_detector = MemoryLeakDetector()
leak_detector.start_tracing()

# Baseline snapshot
leak_detector.take_snapshot("Baseline")

# Simulate potential memory leak
data_accumulator = []

for i in range(1000):
    # Data acquisition that might leak
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        waveform = scope.query_binary_values('CURVE?', datatype='h')
        
        # Potential leak: accumulating data without cleanup
        processed = np.array(waveform, dtype=np.float64)  # Expensive type
        data_accumulator.append(processed[:1000])  # Keep partial data
        
        # Missing cleanup of 'waveform' and full 'processed' array

# Take snapshot after operations
leak_detector.take_snapshot("After 1000 measurements")

# Analyze what grew
leak_detector.analyze_growth()
leak_detector.get_current_top_allocations()

Advanced Optimization Techniques

Object Pooling

from collections import deque
import numpy as np

class NumpyArrayPool:
    """Pool of reusable numpy arrays to reduce allocation overhead"""
    
    def __init__(self, shape, dtype=np.float32, pool_size=10):
        self.shape = shape
        self.dtype = dtype
        self.available_arrays = deque()
        
        # Pre-allocate arrays
        for _ in range(pool_size):
            array = np.zeros(shape, dtype=dtype)
            self.available_arrays.append(array)
    
    def get_array(self):
        """Get array from pool or create new one"""
        if self.available_arrays:
            return self.available_arrays.popleft()
        else:
            # Pool exhausted, create new array
            return np.zeros(self.shape, dtype=self.dtype)
    
    def return_array(self, array):
        """Return array to pool for reuse"""
        if array.shape == self.shape and array.dtype == self.dtype:
            # Clear data and return to pool
            array.fill(0)  # Or use array[:] = 0 
            self.available_arrays.append(array)
        # If wrong shape/type, just let it be garbage collected

# Usage with pooled arrays
waveform_pool = NumpyArrayPool(shape=(1000000,), dtype=np.float32, pool_size=5)

for i in range(1000):
    # Get reusable array instead of allocating new one
    work_array = waveform_pool.get_array()
    
    # Use array for computation
    with managed_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        raw_data = scope.query_binary_values('CURVE?', datatype='h')
        work_array[:len(raw_data)] = raw_data
        
        # Process data in-place when possible
        np.multiply(work_array, 0.001, out=work_array)  # Scale in-place
        
        # Do computations...
        result = np.mean(work_array[:len(raw_data)])
        
    # Return array to pool for reuse
    waveform_pool.return_array(work_array)

Copy-on-Write Data Structures

import numpy as np
from copy import deepcopy

class CopyOnWriteArray:
    """Array wrapper that only copies data when modified"""
    
    def __init__(self, data):
        self._data = data
        self._is_copy = False
        self._original_id = id(data)
        
    def _ensure_copy(self):
        """Make sure we have our own copy before modifying"""
        if not self._is_copy:
            self._data = self._data.copy()
            self._is_copy = True
    
    @property 
    def data(self):
        """Get read-only access to data"""
        return self._data
    
    def modify(self, operation):
        """Modify data, copying only if necessary"""
        self._ensure_copy()
        return operation(self._data)
    
    def __len__(self):
        return len(self._data)
    
    def __getitem__(self, key):
        return self._data[key]
    
    def __setitem__(self, key, value):
        self._ensure_copy()
        self._data[key] = value

# Usage - multiple references share same data until modified
original_waveform = np.random.rand(1000000)

# Create multiple views without copying
view1 = CopyOnWriteArray(original_waveform)  
view2 = CopyOnWriteArray(original_waveform)
view3 = CopyOnWriteArray(original_waveform)

print(f"Memory usage: {view1.data.nbytes / 1024 / 1024:.1f} MB (shared)")

# Only copy when one view is modified
view1[100] = 999.0  # This triggers a copy for view1 only

print(f"After modification:")
print(f"View1 is copy: {view1._is_copy}")  # True
print(f"View2 is copy: {view2._is_copy}")  # False (still shared)
print(f"View3 is copy: {view3._is_copy}")  # False (still shared)

Error Recovery and Resource Cleanup

Exception-Safe Resource Management

import contextlib
import logging

class RobustInstrumentManager:
    """Instrument manager with comprehensive error handling"""
    
    def __init__(self):
        self.active_resources = {}
        self.logger = logging.getLogger(__name__)
        
    @contextlib.contextmanager
    def safe_instrument(self, resource_string, **kwargs):
        """Context manager with guaranteed cleanup"""
        
        instrument = None
        try:
            # Open with error handling
            instrument = pyvisa.ResourceManager().open_resource(
                resource_string, **kwargs
            )
            self.active_resources[resource_string] = instrument
            
            yield instrument
            
        except pyvisa.VisaIOError as e:
            self.logger.error(f"VISA error for {resource_string}: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Unexpected error for {resource_string}: {e}")
            raise
        finally:
            # Guaranteed cleanup
            if instrument is not None:
                try:
                    instrument.close()
                    self.active_resources.pop(resource_string, None)
                except:
                    self.logger.warning(f"Error closing {resource_string}")
    
    def emergency_cleanup(self):
        """Force cleanup of all resources"""
        for resource_string, instrument in list(self.active_resources.items()):
            try:
                instrument.close()
                self.logger.info(f"Emergency close: {resource_string}")
            except:
                self.logger.error(f"Failed emergency close: {resource_string}")
        
        self.active_resources.clear()

# Usage with automatic error handling
robust_manager = RobustInstrumentManager()

try:
    with robust_manager.safe_instrument('TCPIP::192.168.1.100::INSTR') as scope:
        # Operations that might fail
        data = scope.query_binary_values('CURVE?', datatype='h')
        
        # Simulated error condition
        if len(data) == 0:
            raise ValueError("No data received")
            
except Exception as e:
    print(f"Handled error: {e}")
    # Resources still cleaned up automatically

# Emergency cleanup if needed
robust_manager.emergency_cleanup()

Best Practices Summary

Memory Efficiency

  • Use appropriate data types: int16 vs float64 can save 75% memory
  • Enable garbage collection: Force periodic cleanup in long-running apps
  • Memory-map large datasets: For data larger than available RAM
  • Pool reusable objects: Avoid repeated allocation/deallocation

Resource Management

  • Always use context managers: Ensure cleanup even with exceptions
  • Track active resources: Monitor and manage instrument connections
  • Implement weak references: Prevent circular reference leaks
  • Set resource limits: Prevent unbounded memory growth

Monitoring and Debugging

  • Profile memory usage: Use monitoring tools to identify bottlenecks
  • Detect leaks early: Regular snapshot comparison
  • Log resource operations: Track open/close operations
  • Monitor system resources: Watch overall system impact

Common Pitfalls

  • Forgetting to close instruments: Leads to resource exhaustion
  • Accumulating large arrays: Without clearing old data
  • Using inefficient data types: float64 when float32 suffices
  • Circular references: Objects that prevent garbage collection

Next Steps

How is this guide?