PyVISA
PyVISADocs

Multi-threading

Multi-threading techniques for PyVISA applications - parallel instrument control, thread-safe resource management, and async/await patterns.

Use multi-threading to control multiple instruments simultaneously, improve application responsiveness, and maximize measurement throughput.

Threading Fundamentals for Instrument Control

Why Threading Matters for PyVISA

  • Multiple instruments: Control many instruments simultaneously
  • Non-blocking UI: Keep interfaces responsive during slow measurements
  • Pipeline processing: Overlap data acquisition and analysis
  • I/O waiting: Keep the CPU busy with other work while a thread is blocked waiting for an instrument response

Thread Safety Considerations

import threading
import queue
import time
import pyvisa
from concurrent.futures import ThreadPoolExecutor, as_completed

class ThreadSafeInstrumentManager:
    """Share one VISA resource manager and cache instrument sessions per thread.

    Sessions are cached under a key built from the resource string and a
    thread id, so every thread gets its own session to a given resource.
    All cache access is serialized by a re-entrant lock.
    """

    def __init__(self):
        # Re-entrant: open_instrument() calls get_resource_manager() while
        # it already holds the lock.
        self._lock = threading.RLock()
        self._instruments = {}
        self._resource_manager = None

    @staticmethod
    def _make_key(resource_string, thread_id):
        """Return the cache key for a (resource, thread) pair.

        A ``thread_id`` of None means "the calling thread".
        """
        if thread_id is None:
            thread_id = threading.current_thread().ident
        return f"{resource_string}_{thread_id}"

    def get_resource_manager(self):
        """Return the shared resource manager, creating it on first use."""
        with self._lock:
            if self._resource_manager is None:
                self._resource_manager = pyvisa.ResourceManager()
            return self._resource_manager

    def open_instrument(self, resource_string, thread_id=None):
        """Return the session for ``resource_string`` owned by ``thread_id``.

        The session is opened on the first call and reused afterwards.
        ``thread_id`` defaults to the calling thread's identifier.
        """
        key = self._make_key(resource_string, thread_id)

        with self._lock:
            if key not in self._instruments:
                rm = self.get_resource_manager()
                self._instruments[key] = rm.open_resource(resource_string)
            return self._instruments[key]

    def close_instrument(self, resource_string, thread_id=None):
        """Close and forget the session for ``resource_string``/``thread_id``.

        Does nothing when no such session is cached. An error raised by the
        session's ``close()`` propagates to the caller, but the entry is
        removed from the cache first so a broken handle is never handed out
        again by open_instrument().
        """
        key = self._make_key(resource_string, thread_id)

        with self._lock:
            instrument = self._instruments.pop(key, None)
            if instrument is not None:
                instrument.close()

    def close_all(self):
        """Close every cached session on a best-effort basis.

        A failure to close one session is reported and does not prevent the
        remaining sessions from being closed.
        """
        with self._lock:
            instruments = list(self._instruments.values())
            self._instruments.clear()
            for instrument in instruments:
                try:
                    instrument.close()
                except Exception as e:
                    # Only ordinary errors are swallowed; KeyboardInterrupt
                    # and SystemExit still propagate.
                    print(f"Warning: failed to close instrument: {e}")

# Global thread-safe manager: one shared instance used by the examples below.
# Constructing it opens nothing; the VISA resource manager is only created on
# the first get_resource_manager() call.
thread_safe_manager = ThreadSafeInstrumentManager()

Producer-Consumer Pattern for Data Acquisition

High-Speed Data Collection Pipeline

import threading
import queue
import numpy as np
from collections import namedtuple
import time

# Data structures for pipeline
MeasurementData = namedtuple('MeasurementData', ['timestamp', 'instrument', 'data'])
ProcessedData = namedtuple('ProcessedData', ['timestamp', 'instrument', 'result'])

class DataAcquisitionPipeline:
    """Multi-threaded acquisition -> processing -> storage pipeline.

    Acquisition threads push MeasurementData into ``raw_data_queue``;
    processing threads turn each item into ProcessedData on
    ``processed_data_queue``; a single storage thread batches those and
    hands them to ``_save_data_batch``.

    Shutdown is staged by stop_pipeline(): acquisition stops first, then a
    stop marker is queued behind any pending data for every processing
    worker and finally for the storage worker, so data already queued is
    drained instead of being abandoned.
    """

    # Unique marker queued by stop_pipeline() to tell a worker to exit.
    _STOP = object()

    def __init__(self, max_queue_size=100):
        # Queues for pipeline stages
        self.raw_data_queue = queue.Queue(maxsize=max_queue_size)
        self.processed_data_queue = queue.Queue(maxsize=max_queue_size)

        # Threading controls
        self.running = False
        self.acquisition_threads = []
        self.processing_threads = []
        self.storage_thread = None

        # Statistics (guarded by _stats_lock)
        self.measurements_count = 0
        self.processing_count = 0
        self.storage_count = 0
        self._stats_lock = threading.Lock()

    def _sleep_while_running(self, seconds):
        """Sleep up to ``seconds`` but return early once ``running`` is False."""
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            # Short slices keep stop_pipeline() responsive for long intervals.
            time.sleep(min(remaining, 0.1))

    def start_acquisition_thread(self, resource_string, measurement_interval=1.0):
        """Start a daemon thread that polls one instrument while ``running``.

        Args:
            resource_string: VISA resource to open via ``thread_safe_manager``.
            measurement_interval: Pause in seconds between readings.

        Returns:
            The started thread.
        """

        def acquisition_worker():
            thread_id = threading.current_thread().ident

            try:
                instrument = thread_safe_manager.open_instrument(
                    resource_string, thread_id
                )

                # Configure for fast measurements
                instrument.write('*RST')
                instrument.write('CONF:VOLT:DC')  # Example: DC voltage

                while self.running:
                    try:
                        measurement = float(instrument.query('READ?'))

                        data = MeasurementData(
                            timestamp=time.time(),
                            instrument=resource_string,
                            data=measurement
                        )

                        self.raw_data_queue.put(data, timeout=1.0)

                        with self._stats_lock:
                            self.measurements_count += 1

                    except queue.Full:
                        # The reading is dropped; put() already waited 1 s,
                        # so retry without an extra pause.
                        print(f"Warning: Raw data queue full for {resource_string}")
                        continue
                    except Exception as e:
                        # Fall through to the pause below so a persistent
                        # fault does not turn into a busy error loop.
                        print(f"Acquisition error for {resource_string}: {e}")

                    self._sleep_while_running(measurement_interval)

            finally:
                thread_safe_manager.close_instrument(resource_string, thread_id)

        thread = threading.Thread(target=acquisition_worker, daemon=True)
        thread.start()
        self.acquisition_threads.append(thread)

        return thread

    def start_processing_threads(self, num_threads=2):
        """Start ``num_threads`` daemon workers that process raw measurements.

        Each worker runs until it receives the stop marker queued by
        stop_pipeline().
        """

        def processing_worker():
            while True:
                raw_data = self.raw_data_queue.get()
                try:
                    if raw_data is self._STOP:
                        return

                    processed_value = self._process_measurement(raw_data.data)

                    processed = ProcessedData(
                        timestamp=raw_data.timestamp,
                        instrument=raw_data.instrument,
                        result=processed_value
                    )

                    self.processed_data_queue.put(processed, timeout=1.0)

                    with self._stats_lock:
                        self.processing_count += 1

                except queue.Full:
                    print("Warning: Processed data queue full")
                except Exception as e:
                    print(f"Processing error: {e}")
                finally:
                    # Every get() is matched by exactly one task_done(),
                    # including on the error paths and for the stop marker.
                    self.raw_data_queue.task_done()

        for _ in range(num_threads):
            thread = threading.Thread(target=processing_worker, daemon=True)
            thread.start()
            self.processing_threads.append(thread)

    def _process_measurement(self, raw_value):
        """Process raw measurement (override for specific processing)"""
        # Example: Simple moving average or other analysis
        return raw_value  # Placeholder

    def start_storage_thread(self):
        """Start the daemon worker that batches processed data for saving.

        A batch is saved when 10 seconds have passed since the last save,
        when 1000 items are buffered, when the queue has been idle for one
        second, and once more when the worker is told to stop.
        """

        def storage_worker():
            storage_buffer = []
            last_save_time = time.time()
            save_interval = 10.0  # Save every 10 seconds

            def flush():
                nonlocal last_save_time
                if storage_buffer:
                    # Hand over a copy so clearing the buffer cannot affect
                    # a batch the saver decided to keep.
                    self._save_data_batch(list(storage_buffer))
                    storage_buffer.clear()
                last_save_time = time.time()

            while True:
                try:
                    processed_data = self.processed_data_queue.get(timeout=1.0)
                except queue.Empty:
                    # Idle: save whatever has accumulated.
                    try:
                        flush()
                    except Exception as e:
                        print(f"Storage error: {e}")
                    continue

                try:
                    if processed_data is self._STOP:
                        break

                    storage_buffer.append(processed_data)
                    with self._stats_lock:
                        self.storage_count += 1

                    if (time.time() - last_save_time > save_interval or
                            len(storage_buffer) >= 1000):
                        flush()

                except Exception as e:
                    print(f"Storage error: {e}")
                finally:
                    self.processed_data_queue.task_done()

            # Final flush so buffered data is not lost at shutdown.
            try:
                flush()
            except Exception as e:
                print(f"Storage error: {e}")

        self.storage_thread = threading.Thread(target=storage_worker, daemon=True)
        self.storage_thread.start()

    def _save_data_batch(self, data_batch):
        """Save batch of data (override for specific storage)"""
        print(f"Saving batch of {len(data_batch)} measurements")
        # Example: Save to file, database, etc.

    def start_pipeline(self, instrument_configs):
        """Start processing, storage and one acquisition thread per config.

        Args:
            instrument_configs: Iterable of dicts with a ``'resource'`` key
                and an optional ``'interval'`` key (seconds, default 1.0).
        """

        self.running = True

        # Consumers first, so acquired data always has somewhere to go.
        self.start_processing_threads(num_threads=4)
        self.start_storage_thread()

        for config in instrument_configs:
            self.start_acquisition_thread(
                resource_string=config['resource'],
                measurement_interval=config.get('interval', 1.0)
            )

        print(f"Pipeline started with {len(instrument_configs)} instruments")

    def stop_pipeline(self):
        """Stop acquisition, drain queued data through all stages, then return.

        Safe to call when the pipeline was never started or already stopped.
        """

        print("Stopping pipeline...")
        self.running = False

        # Stage 1: producers.
        for thread in self.acquisition_threads:
            thread.join(timeout=5.0)
            if thread.is_alive():
                print("Warning: acquisition thread did not stop within 5 seconds")

        # Stage 2: one stop marker per processing worker, queued behind any
        # data that is still waiting to be processed.
        workers = [t for t in self.processing_threads if t.is_alive()]
        for _ in workers:
            self.raw_data_queue.put(self._STOP)
        for thread in workers:
            thread.join()

        # Stage 3: storage, after every processed item has been queued.
        if self.storage_thread is not None and self.storage_thread.is_alive():
            self.processed_data_queue.put(self._STOP)
            self.storage_thread.join()

        self.acquisition_threads = []
        self.processing_threads = []
        self.storage_thread = None

        print("Pipeline stopped")

    def get_statistics(self):
        """Return a snapshot of the counters and current queue sizes."""
        with self._stats_lock:
            return {
                'measurements': self.measurements_count,
                'processed': self.processing_count,
                'stored': self.storage_count,
                'raw_queue_size': self.raw_data_queue.qsize(),
                'processed_queue_size': self.processed_data_queue.qsize()
            }

# Usage example
pipeline = DataAcquisitionPipeline(max_queue_size=1000)

# Configure instruments
instruments = [
    {'resource': 'TCPIP::192.168.1.100::INSTR', 'interval': 0.5},  # 2 Hz
    {'resource': 'TCPIP::192.168.1.101::INSTR', 'interval': 1.0},  # 1 Hz
    {'resource': 'USB::0x0699::0x0363::C102912::INSTR', 'interval': 2.0}  # 0.5 Hz
]

# Start pipeline
pipeline.start_pipeline(instruments)

try:
    # Run for a while
    time.sleep(30)

    # Check statistics
    stats = pipeline.get_statistics()
    print(f"Statistics: {stats}")
finally:
    # Stop the pipeline even if the wait is interrupted (e.g. Ctrl+C), so
    # the acquisition threads get the chance to close their instruments.
    pipeline.stop_pipeline()

Parallel Instrument Control

Concurrent Operations with ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class ParallelInstrumentController:
    """Run the same operation on several instruments in parallel threads."""

    def __init__(self, max_workers=10):
        self.max_workers = max_workers
        self.instruments = {}  # name -> VISA resource string

    def add_instrument(self, name, resource_string):
        """Register ``resource_string`` under ``name`` in the control group."""
        self.instruments[name] = resource_string

    def parallel_measurement(self, measurement_func, *args, **kwargs):
        """Call ``measurement_func(instrument, *args, **kwargs)`` on every instrument.

        Returns:
            Dict mapping every registered instrument name, in registration
            order, to the function's return value. An instrument whose call
            raised, or that had not finished within the 60 second overall
            timeout, maps to None; results already collected are kept.
        """
        # Function-scope import: this is the exception as_completed() raises.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        # Pre-seed so the result order does not depend on completion order.
        results = {name: None for name in self.instruments}
        if not results:
            return results

        def record(future, name):
            try:
                results[name] = future.result()
                print(f"{name}: completed")
            except Exception as e:
                print(f"{name}: error - {e}")
                results[name] = None

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_instrument = {
                executor.submit(self._instrument_measurement,
                                name, resource, measurement_func, *args, **kwargs): name
                for name, resource in self.instruments.items()
            }
            pending = set(future_to_instrument)

            try:
                # Collect results as they complete
                for future in as_completed(future_to_instrument, timeout=60):
                    pending.discard(future)
                    record(future, future_to_instrument[future])
            except FuturesTimeoutError:
                for future in pending:
                    name = future_to_instrument[future]
                    if future.done():
                        # Finished between the timeout and this check.
                        record(future, name)
                    else:
                        # cancel() only stops tasks that have not started;
                        # a running task is still awaited when the
                        # executor's with-block exits.
                        future.cancel()
                        print(f"{name}: error - timed out after 60 s")

        return results

    def _instrument_measurement(self, name, resource_string, func, *args, **kwargs):
        """Open the instrument for this worker thread, run ``func``, close it."""

        thread_id = threading.current_thread().ident

        try:
            instrument = thread_safe_manager.open_instrument(resource_string, thread_id)
            return func(instrument, *args, **kwargs)
        finally:
            thread_safe_manager.close_instrument(resource_string, thread_id)

    def parallel_configuration(self, config_commands):
        """Write ``config_commands`` in order to every instrument, in parallel.

        Returns:
            Dict of instrument name -> number of commands written (None on error).
        """

        def configure_instrument(instrument, commands):
            for command in commands:
                instrument.write(command)
            return len(commands)

        return self.parallel_measurement(configure_instrument, config_commands)

    def parallel_sweep(self, parameter_name, values, measurement_func):
        """Set a parameter to each value in turn and measure all instruments.

        Returns:
            Dict of value -> ``{'settings': ..., 'measurements': ...}`` where
            both entries are parallel_measurement() result dicts.
        """

        # Defined once; it does not depend on the loop variable.
        def set_parameter(instrument, param_value):
            instrument.write(f"{parameter_name} {param_value}")
            time.sleep(0.1)  # Allow settling
            return param_value

        all_results = {}

        for value in values:
            print(f"Setting {parameter_name} = {value}")

            set_results = self.parallel_measurement(set_parameter, value)
            measure_results = self.parallel_measurement(measurement_func)

            all_results[value] = {
                'settings': set_results,
                'measurements': measure_results
            }

        return all_results

# Example measurement functions
def measure_voltage(instrument):
    """Query one DC voltage reading and return it as a float."""
    reply = instrument.query('MEAS:VOLT:DC?')
    return float(reply)

def measure_current(instrument):
    """Query one DC current reading and return it as a float."""
    reply = instrument.query('MEAS:CURR:DC?')
    return float(reply)

def measure_resistance(instrument):
    """Query one resistance reading and return it as a float."""
    reply = instrument.query('MEAS:RES?')
    return float(reply)

# Usage example
# Every parallel_* call below runs one task per registered instrument; each
# task opens its instrument, performs the operation and closes it again.
controller = ParallelInstrumentController(max_workers=8)

# Add instruments (name -> VISA resource string)
controller.add_instrument('DMM1', 'TCPIP::192.168.1.100::INSTR')
controller.add_instrument('DMM2', 'TCPIP::192.168.1.101::INSTR') 
controller.add_instrument('DMM3', 'USB::0x2A8D::0x0318::MY12345678::INSTR')
controller.add_instrument('DMM4', 'GPIB::22::INSTR')

# Parallel configuration: the commands are written in list order to each
# instrument; the result maps instrument name -> number of commands written.
config_commands = ['*RST', 'CONF:VOLT:DC 10,0.001', 'TRIG:SOUR IMM']
config_results = controller.parallel_configuration(config_commands)

# Parallel measurements: each result maps instrument name -> reading
# (None for an instrument whose measurement raised).
voltage_results = controller.parallel_measurement(measure_voltage)
current_results = controller.parallel_measurement(measure_current)

print(f"Voltage measurements: {voltage_results}")
print(f"Current measurements: {current_results}")

# Parameter sweep example: for every value, write "SOUR:FREQ <value>" to all
# instruments and then measure voltage on all of them.
frequency_values = [1000, 2000, 5000, 10000]  # Hz
sweep_results = controller.parallel_sweep('SOUR:FREQ', frequency_values, measure_voltage)

Async/Await Patterns for Modern Python

AsyncIO with PyVISA

import asyncio
import aiofiles
import time
from typing import Dict, List, Optional

class AsyncInstrumentController:
    """Drive blocking PyVISA calls from asyncio code.

    Every blocking instrument call runs in the event loop's default
    executor, and a semaphore caps the number of calls in flight at 10.
    """

    def __init__(self):
        self.instruments = {}
        self.semaphore = asyncio.Semaphore(10)  # Limit concurrent operations

    async def add_instrument(self, name: str, resource_string: str):
        """Open ``resource_string`` in a worker thread and register it as ``name``."""
        loop = asyncio.get_running_loop()
        instrument = await loop.run_in_executor(
            None, thread_safe_manager.open_instrument, resource_string
        )
        self.instruments[name] = instrument

    async def query_instrument(self, name: str, command: str) -> Optional[str]:
        """Send ``command`` and return the reply, or None for an unknown name."""
        if name not in self.instruments:
            return None

        async with self.semaphore:  # Limit concurrent queries
            instrument = self.instruments[name]
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, instrument.query, command)

    async def write_instrument(self, name: str, command: str):
        """Write ``command``; return True on success, False for an unknown name."""
        if name not in self.instruments:
            return False

        async with self.semaphore:
            instrument = self.instruments[name]
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, instrument.write, command)
            return True

    async def measure_all_async(self, command: str) -> Dict[str, Optional[float]]:
        """Query every instrument concurrently and parse the replies as floats.

        Returns:
            Dict of instrument name -> value. The value is None when the
            reply was empty or not a number. Instruments whose query raised
            are reported and left out of the dict.
        """

        async def measure_single(name):
            reply = await self.query_instrument(name, command)
            try:
                return float(reply) if reply else None
            except ValueError:
                return None

        names = list(self.instruments)

        # gather() keeps argument order, so outcomes line up with names.
        # With return_exceptions=True a failed query comes back as the
        # exception object itself and must be filtered before use.
        outcomes = await asyncio.gather(
            *(measure_single(name) for name in names),
            return_exceptions=True
        )

        results = {}
        for name, outcome in zip(names, outcomes):
            if isinstance(outcome, BaseException):
                print(f"{name}: error - {outcome}")
                continue
            results[name] = outcome
        return results

    async def timed_measurements(self, duration: float, interval: float, command: str):
        """Measure all instruments every ``interval`` seconds for ``duration`` seconds.

        Returns:
            List of ``{'timestamp': ..., 'data': ...}`` dicts, one per round.
        """

        start_time = time.time()
        measurements = []

        while time.time() - start_time < duration:
            timestamp = time.time()

            results = await self.measure_all_async(command)

            measurements.append({
                'timestamp': timestamp,
                'data': results
            })

            # Wait for next measurement (non-blocking)
            await asyncio.sleep(interval)

        return measurements

    @staticmethod
    def _format_csv(measurements: List) -> str:
        """Render measurement dicts as CSV text with a ``timestamp`` column first.

        Columns are the union of instrument names over all samples, in
        first-seen order, so an instrument missing from the first sample
        still gets a column. Missing and None values become empty cells.
        Values are written unquoted, as in a plain comma-joined line.
        """
        columns = list(dict.fromkeys(
            name for sample in measurements for name in sample['data']
        ))

        lines = [','.join(['timestamp', *columns])]
        for sample in measurements:
            cells = [str(sample['timestamp'])]
            for name in columns:
                value = sample['data'].get(name)
                cells.append('' if value is None else str(value))
            lines.append(','.join(cells))

        return '\n'.join(lines) + '\n'

    async def save_measurements_async(self, measurements: List, filename: str):
        """Write ``measurements`` to ``filename`` as CSV using aiofiles."""
        text = self._format_csv(measurements)

        # One write instead of one awaited call per cell.
        async with aiofiles.open(filename, 'w') as f:
            await f.write(text)

# Usage example
async def async_measurement_example():
    """Open three instruments, configure them, log readings for a minute and save a CSV."""
    controller = AsyncInstrumentController()

    # Add instruments
    await controller.add_instrument('DMM1', 'TCPIP::192.168.1.100::INSTR')
    await controller.add_instrument('DMM2', 'TCPIP::192.168.1.101::INSTR')
    await controller.add_instrument('DMM3', 'USB::0x2A8D::0x0318::MY12345678::INSTR')

    async def configure(name):
        # Commands for one instrument are awaited one after another so the
        # reset is always sent before the configuration.
        for command in ('*RST', 'CONF:VOLT:DC'):
            await controller.write_instrument(name, command)

    # Different instruments are still configured concurrently.
    await asyncio.gather(*(configure(name) for name in controller.instruments))

    # Take measurements for 60 seconds at 0.5 Hz
    measurements = await controller.timed_measurements(
        duration=60.0,
        interval=2.0,
        command='READ?'
    )

    # Save results asynchronously
    await controller.save_measurements_async(measurements, 'async_measurements.csv')

    print(f"Collected {len(measurements)} measurement sets")

# Run async example
# asyncio.run(async_measurement_example())

Thread-Safe Data Structures

Concurrent Data Collection

import threading
import collections
from threading import RLock
import time

class ThreadSafeDataBuffer:
    """Thread-safe circular buffer for measurement data.

    Holds at most ``maxsize`` items; once full, adding an item discards the
    oldest one (``collections.deque`` with ``maxlen``).
    """

    def __init__(self, maxsize=10000):
        self.maxsize = maxsize
        self.buffer = collections.deque(maxlen=maxsize)
        self.lock = RLock()
        # The condition shares the lock, so waiters and plain readers
        # exclude each other.
        self.condition = threading.Condition(self.lock)
        self.total_added = 0  # counts every add(), including evicted items

    def add(self, data):
        """Append ``data`` and wake any thread blocked in wait_for_data()."""
        with self.condition:
            self.buffer.append(data)
            self.total_added += 1
            self.condition.notify_all()

    def get_recent(self, count=None):
        """Return the newest ``count`` items, oldest first, as a new list.

        ``count=None`` returns everything; a count of zero or less returns
        an empty list.
        """
        with self.lock:
            snapshot = list(self.buffer)

        if count is None:
            return snapshot
        if count <= 0:
            # snapshot[-0:] would be the whole list, not "nothing".
            return []
        return snapshot[-count:]

    def wait_for_data(self, min_count=1, timeout=None):
        """Block until the buffer holds at least ``min_count`` items.

        Returns:
            True once the condition holds, False if ``timeout`` seconds
            elapsed first.
        """
        with self.condition:
            return self.condition.wait_for(
                lambda: len(self.buffer) >= min_count,
                timeout=timeout
            )

    def clear(self):
        """Remove all items; ``total_added`` is left unchanged."""
        with self.lock:
            self.buffer.clear()

    def get_statistics(self):
        """Return current size, capacity, lifetime add count and fill ratio.

        ``utilization`` is 0.0 when ``maxsize`` is 0 or None, since no
        meaningful ratio exists in either case.
        """
        with self.lock:
            current_size = len(self.buffer)
            return {
                'current_size': current_size,
                'max_size': self.maxsize,
                'total_added': self.total_added,
                'utilization': current_size / self.maxsize if self.maxsize else 0.0
            }

class ThreadSafeInstrumentResults:
    """Lock-protected map of the latest measurement per instrument.

    Alongside each result the time of its last update is recorded, so
    stale entries can be filtered out.
    """

    def __init__(self):
        self.results = {}
        self.lock = RLock()
        self.last_update = {}

    def update_result(self, instrument_name, measurement):
        """Store ``measurement`` for ``instrument_name`` and stamp the update time."""
        self.lock.acquire()
        try:
            self.results[instrument_name] = measurement
            self.last_update[instrument_name] = time.time()
        finally:
            self.lock.release()

    def get_all_results(self):
        """Return a shallow copy of all stored results."""
        with self.lock:
            snapshot = self.results.copy()
        return snapshot

    def get_result(self, instrument_name):
        """Return the stored result for ``instrument_name``, or None if absent."""
        with self.lock:
            value = self.results.get(instrument_name)
        return value

    def get_fresh_results(self, max_age_seconds=10):
        """Return results updated within the last ``max_age_seconds`` seconds.

        An entry without a recorded update time is treated as updated at
        time 0.
        """
        now = time.time()

        with self.lock:
            return {
                instrument: value
                for instrument, value in self.results.items()
                if now - self.last_update.get(instrument, 0) <= max_age_seconds
            }

# Usage with threading
# Module-level instances shared by every measurement_worker thread below.
shared_buffer = ThreadSafeDataBuffer(maxsize=5000)  # holds at most 5000 samples
shared_results = ThreadSafeInstrumentResults()  # latest reading per instrument

def measurement_worker(instrument_name, resource_string, stop_event=None):
    """Measure one instrument at about 1 Hz and publish every reading.

    Each reading is appended to ``shared_buffer`` and stored as the latest
    value in ``shared_results``. After a failed reading the worker waits
    5 seconds before trying again.

    Args:
        instrument_name: Label used in the published data and in messages.
        resource_string: VISA resource opened via ``thread_safe_manager``.
        stop_event: Optional ``threading.Event``. When given, the worker
            returns soon after the event is set, which lets the ``finally``
            clause close the instrument. When omitted, the worker loops
            until the thread is torn down.
    """

    thread_id = threading.current_thread().ident

    def should_run():
        return stop_event is None or not stop_event.is_set()

    def pause(seconds):
        # Event.wait() returns as soon as the event is set, so a stop
        # request is not delayed by the full pause.
        if stop_event is None:
            time.sleep(seconds)
        else:
            stop_event.wait(seconds)

    try:
        instrument = thread_safe_manager.open_instrument(resource_string, thread_id)

        while should_run():
            try:
                measurement = float(instrument.query('READ?'))
                timestamp = time.time()

                # Add to shared buffer
                measurement_data = {
                    'instrument': instrument_name,
                    'timestamp': timestamp,
                    'value': measurement
                }
                shared_buffer.add(measurement_data)

                # Update shared results
                shared_results.update_result(instrument_name, measurement)

                pause(1.0)  # 1 Hz measurement

            except Exception as e:
                print(f"Measurement error for {instrument_name}: {e}")
                pause(5.0)  # Wait before retry

    except Exception as e:
        print(f"Worker error for {instrument_name}: {e}")
    finally:
        thread_safe_manager.close_instrument(resource_string, thread_id)

# Start measurement threads
# (name, VISA resource string) pairs, one worker thread each.
# Note: this rebinds the module-level name `instruments` used by the earlier
# pipeline example.
instruments = [
    ('DMM1', 'TCPIP::192.168.1.100::INSTR'),
    ('DMM2', 'TCPIP::192.168.1.101::INSTR'),
    ('PSU1', 'USB::0x2A8D::0x0318::MY12345678::INSTR')
]

threads = []
for name, resource in instruments:
    # daemon=True: the workers loop forever and are never joined, so they
    # must not keep the interpreter alive once the main thread finishes.
    thread = threading.Thread(
        target=measurement_worker, 
        args=(name, resource),
        daemon=True
    )
    thread.start()
    threads.append(thread)

# Monitor results
for _ in range(30):  # Monitor for 30 seconds
    time.sleep(1)
    
    # Get fresh results (only instruments updated within the last 5 seconds)
    results = shared_results.get_fresh_results(max_age_seconds=5)
    print(f"Current results: {results}")
    
    # Get buffer statistics
    stats = shared_buffer.get_statistics()
    print(f"Buffer stats: {stats}")

Performance Optimization and Monitoring

Thread Pool Optimization

import concurrent.futures
import threading
import time
import psutil

class OptimizedThreadPool:
    """ThreadPoolExecutor wrapper that counts active, completed and failed tasks.

    Can be used as a context manager; leaving the ``with`` block shuts the
    pool down and waits for running tasks.
    """

    def __init__(self, max_workers=None):
        if max_workers is None:
            # cpu_count() can return None when the count is undetermined;
            # fall back to one core instead of failing on None * 2.
            cpu_count = psutil.cpu_count(logical=True) or 1
            max_workers = min(cpu_count * 2, 20)  # 2x CPU cores, max 20

        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.active_tasks = 0
        self.completed_tasks = 0
        self.failed_tasks = 0
        self.lock = threading.Lock()  # guards the three counters

        print(f"Initialized thread pool with {max_workers} workers")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.shutdown(wait=True)
        return False  # never suppress the caller's exception

    def submit_measurement(self, func, *args, **kwargs):
        """Schedule ``func(*args, **kwargs)`` and return its Future.

        The future never raises for a failing ``func``. It resolves to a
        dict with ``'success'``, ``'result'`` and ``'duration'`` (seconds),
        plus ``'error'`` (the exception text) when ``success`` is False.
        """

        def wrapper():
            with self.lock:
                self.active_tasks += 1

            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                duration = time.perf_counter() - started

                with self.lock:
                    self.completed_tasks += 1

                return {
                    'result': result,
                    'duration': duration,
                    'success': True
                }
            except Exception as e:
                duration = time.perf_counter() - started

                with self.lock:
                    self.failed_tasks += 1

                return {
                    'result': None,
                    'error': str(e),
                    'duration': duration,
                    'success': False
                }
            finally:
                with self.lock:
                    self.active_tasks -= 1

        return self.executor.submit(wrapper)

    def get_statistics(self):
        """Return the counters and the success rate in percent.

        The success rate is 0.0 while no task has finished.
        """
        with self.lock:
            finished = self.completed_tasks + self.failed_tasks
            return {
                'max_workers': self.max_workers,
                'active_tasks': self.active_tasks,
                'completed_tasks': self.completed_tasks,
                'failed_tasks': self.failed_tasks,
                'success_rate': self.completed_tasks / max(finished, 1) * 100
            }

    def shutdown(self, wait=True):
        """Shut down the underlying executor, waiting for tasks if ``wait``."""
        self.executor.shutdown(wait=wait)

# Performance monitoring
class ThreadingPerformanceMonitor:
    """Sample CPU, memory and thread count on a background thread."""

    def __init__(self):
        self.start_time = None
        self.measurements = []
        self.monitoring = False
        self.monitor_thread = None

    def start_monitoring(self):
        """Record the start time and launch the daemon sampling thread."""
        self.monitoring = True
        self.start_time = time.time()
        sampler = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread = sampler
        sampler.start()

    def _monitor_loop(self):
        """Append one sample per pass until ``monitoring`` is cleared.

        Each pass blocks for the one-second interval passed to
        ``psutil.cpu_percent``.
        """
        while self.monitoring:
            cpu = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            threads_alive = threading.active_count()
            elapsed = time.time() - self.start_time

            self.measurements.append({
                'timestamp': elapsed,
                'cpu_percent': cpu,
                'memory_percent': memory.percent,
                'memory_mb': memory.used / 1024 / 1024,
                'thread_count': threads_alive
            })

    def stop_monitoring(self):
        """Stop sampling, wait for the sampling thread and return the samples."""
        self.monitoring = False
        sampler = self.monitor_thread
        if sampler:
            sampler.join()

        return self.measurements

    def get_summary(self):
        """Return average and maximum values over the samples, or None if there are none."""
        if not self.measurements:
            return None

        import numpy as np

        cpu_series = [sample['cpu_percent'] for sample in self.measurements]
        memory_series = [sample['memory_mb'] for sample in self.measurements]
        thread_series = [sample['thread_count'] for sample in self.measurements]

        summary = {'duration': self.measurements[-1]['timestamp']}
        summary['avg_cpu_percent'] = np.mean(cpu_series)
        summary['max_cpu_percent'] = np.max(cpu_series)
        summary['avg_memory_mb'] = np.mean(memory_series)
        summary['max_memory_mb'] = np.max(memory_series)
        summary['avg_threads'] = np.mean(thread_series)
        summary['max_threads'] = np.max(thread_series)
        return summary

# Usage example with performance monitoring
def benchmark_threaded_measurements():
    """Run 100 voltage measurements on an 8-worker pool and print resource usage."""

    resource_string = 'TCPIP::192.168.1.100::INSTR'

    def measure_resource():
        # measure_voltage() needs an open instrument, not a resource string,
        # so each task opens a session for its worker thread and closes it
        # afterwards.
        thread_id = threading.current_thread().ident
        try:
            instrument = thread_safe_manager.open_instrument(resource_string, thread_id)
            return measure_voltage(instrument)
        finally:
            thread_safe_manager.close_instrument(resource_string, thread_id)

    monitor = ThreadingPerformanceMonitor()
    pool = OptimizedThreadPool(max_workers=8)

    monitor.start_monitoring()

    try:
        # Submit many measurement tasks
        futures = [pool.submit_measurement(measure_resource) for _ in range(100)]

        # Wait for completion
        for future in concurrent.futures.as_completed(futures, timeout=120):
            future.result()
    finally:
        # Always stop the sampler and the pool, also after a timeout.
        monitor.stop_monitoring()
        pool.shutdown()

    summary = monitor.get_summary()

    # Thread pool statistics
    pool_stats = pool.get_statistics()

    print(f"Performance Summary:")
    if summary is None:
        # get_summary() returns None when no sample was collected.
        print("No performance samples collected")
    else:
        print(f"Duration: {summary['duration']:.1f}s")
        print(f"Average CPU: {summary['avg_cpu_percent']:.1f}%")
        print(f"Average Memory: {summary['avg_memory_mb']:.1f}MB")
        print(f"Average Threads: {summary['avg_threads']:.1f}")
    print(f"Success Rate: {pool_stats['success_rate']:.1f}%")

# Run benchmark
# benchmark_threaded_measurements()

Best Practices and Common Pitfalls

Threading Best Practices

# DO: Use context managers for resource cleanup
# Illustrative fragment: `measure_func` is a placeholder for your own
# measurement function and is not defined on this page. Leaving the `with`
# block shuts the executor down and waits for its worker threads.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(measure_func, instr) for instr in instruments]
    # result() blocks until the task is done and re-raises any exception
    # the task raised.
    results = [f.result() for f in futures]

# DON'T: Forget to close instruments in threads
# Deliberate counter-example, kept unfixed: neither the instrument session
# nor the ResourceManager created here is ever closed.
def bad_measurement(resource_string):
    """Counter-example: query an instrument without releasing any resources."""
    instrument = pyvisa.ResourceManager().open_resource(resource_string)
    result = instrument.query('READ?')
    # Missing: instrument.close() (and closing the ResourceManager)
    return result

# DO: Proper error handling in threads
def good_measurement(resource_string):
    """Take one reading from a freshly opened instrument and always clean up.

    The instrument session and the resource manager are each closed in
    their own ``finally`` clause, so both are released whether or not the
    query or the float conversion raises.
    """
    manager = pyvisa.ResourceManager()
    try:
        session = manager.open_resource(resource_string)
        try:
            reading = session.query('READ?')
            value = float(reading)
        finally:
            session.close()
        return value
    finally:
        manager.close()

# DO: Use locks for shared data
shared_data = {}
data_lock = threading.Lock()

def thread_safe_update(key, value):
    """Store ``value`` under ``key`` in ``shared_data`` while holding ``data_lock``."""
    data_lock.acquire()
    try:
        shared_data[key] = value
    finally:
        data_lock.release()

# DON'T: Create too many threads
# Illustrative fragment: `measurement_func` is a placeholder for your own
# function and is not defined on this page.
# Bad: Creates 1000 threads
for i in range(1000):
    threading.Thread(target=measurement_func).start()

# Good: Use thread pool
# The same 1000 tasks are queued but at most 10 run at any one time.
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(measurement_func) for _ in range(1000)]

Next Steps

How is this guide?