Multi-threading
Multi-threading techniques for PyVISA applications - parallel instrument control, thread-safe resource management, and async/await patterns.
Use multi-threading to control multiple instruments simultaneously, improve application responsiveness, and maximize measurement throughput.
Threading Fundamentals for Instrument Control
Why Threading Matters for PyVISA
- Multiple instruments: Control many instruments simultaneously
- Non-blocking UI: Keep interfaces responsive during slow measurements
- Pipeline processing: Overlap data acquisition and analysis
- I/O waiting: Blocking VISA I/O releases Python's GIL, so other threads can run while one waits for an instrument response (see the sketch below)
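A minimal sketch of the payoff, assuming two reachable instruments at placeholder addresses: both *IDN? queries run concurrently, so the total time is roughly that of the slowest instrument rather than the sum of both.
import threading
import pyvisa

# Placeholder addresses - substitute your own instruments
RESOURCES = ['TCPIP::192.168.1.100::INSTR', 'TCPIP::192.168.1.101::INSTR']
results = {}

def read_idn(resource_string):
    """Open a dedicated session in this thread and query the identity"""
    rm = pyvisa.ResourceManager()
    instrument = rm.open_resource(resource_string)
    try:
        # Each thread writes to a distinct dict key, which is safe in CPython
        results[resource_string] = instrument.query('*IDN?')
    finally:
        instrument.close()

threads = [threading.Thread(target=read_idn, args=(r,)) for r in RESOURCES]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)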
Thread Safety Considerations
PyVISA's ResourceManager can be shared between threads, but a single instrument session should not be used by two threads at once: even when the underlying VISA library is thread-safe, concurrent query() calls on one session can interleave the write and read halves of each other's transactions. The manager below sidesteps this by giving each thread its own session and guarding the shared bookkeeping with a lock.
import threading
import queue
import time
import pyvisa
from concurrent.futures import ThreadPoolExecutor, as_completed
class ThreadSafeInstrumentManager:
"""Thread-safe instrument resource management"""
def __init__(self):
self._lock = threading.RLock() # Reentrant lock
self._instruments = {}
self._resource_manager = None
def get_resource_manager(self):
"""Get thread-safe resource manager"""
with self._lock:
if self._resource_manager is None:
self._resource_manager = pyvisa.ResourceManager()
return self._resource_manager
def open_instrument(self, resource_string, thread_id=None):
"""Open instrument with thread-safe access"""
if thread_id is None:
thread_id = threading.current_thread().ident
key = f"{resource_string}_{thread_id}"
with self._lock:
if key not in self._instruments:
rm = self.get_resource_manager()
instrument = rm.open_resource(resource_string)
self._instruments[key] = instrument
return self._instruments[key]
def close_instrument(self, resource_string, thread_id=None):
"""Close instrument thread-safely"""
if thread_id is None:
thread_id = threading.current_thread().ident
key = f"{resource_string}_{thread_id}"
with self._lock:
if key in self._instruments:
self._instruments[key].close()
del self._instruments[key]
def close_all(self):
"""Close all instruments"""
with self._lock:
for instrument in self._instruments.values():
try:
instrument.close()
                except Exception:
                    pass  # Best-effort cleanup; ignore errors from already-closed sessions
self._instruments.clear()
# Global thread-safe manager
thread_safe_manager = ThreadSafeInstrumentManager()
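A short usage sketch (the address is a placeholder): two threads request the same resource string, and each receives its own session because the manager keys sessions by thread id.
def worker(resource_string):
    # open_instrument keys the session by this thread's id
    instrument = thread_safe_manager.open_instrument(resource_string)
    try:
        print(instrument.query('*IDN?'))
    finally:
        thread_safe_manager.close_instrument(resource_string)

workers = [
    threading.Thread(target=worker, args=('TCPIP::192.168.1.100::INSTR',))
    for _ in range(2)
]
for t in workers:
    t.start()
for t in workers:
    t.join()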
Producer-Consumer Pattern for Data Acquisition
High-Speed Data Collection Pipeline
import threading
import queue
import numpy as np
from collections import namedtuple
import time
# Data structures for pipeline
MeasurementData = namedtuple('MeasurementData', ['timestamp', 'instrument', 'data'])
ProcessedData = namedtuple('ProcessedData', ['timestamp', 'instrument', 'result'])
class DataAcquisitionPipeline:
"""Multi-threaded data acquisition and processing pipeline"""
def __init__(self, max_queue_size=100):
# Queues for pipeline stages
self.raw_data_queue = queue.Queue(maxsize=max_queue_size)
self.processed_data_queue = queue.Queue(maxsize=max_queue_size)
# Threading controls
        self.running = False      # Controls consumer (processing/storage) threads
        self.acquiring = False    # Controls producer (acquisition) threads
self.acquisition_threads = []
self.processing_threads = []
self.storage_thread = None
# Statistics
self.measurements_count = 0
self.processing_count = 0
self.storage_count = 0
self._stats_lock = threading.Lock()
def start_acquisition_thread(self, resource_string, measurement_interval=1.0):
"""Start data acquisition thread for an instrument"""
def acquisition_worker():
thread_id = threading.current_thread().ident
try:
instrument = thread_safe_manager.open_instrument(
resource_string, thread_id
)
# Configure for fast measurements
instrument.write('*RST')
instrument.write('CONF:VOLT:DC') # Example: DC voltage
                while self.acquiring:
try:
# Fast measurement
measurement = float(instrument.query('READ?'))
# Package data
data = MeasurementData(
timestamp=time.time(),
instrument=resource_string,
data=measurement
)
# Add to processing queue
self.raw_data_queue.put(data, timeout=1.0)
with self._stats_lock:
self.measurements_count += 1
time.sleep(measurement_interval)
except queue.Full:
print(f"Warning: Raw data queue full for {resource_string}")
except Exception as e:
print(f"Acquisition error for {resource_string}: {e}")
finally:
thread_safe_manager.close_instrument(resource_string, thread_id)
thread = threading.Thread(target=acquisition_worker, daemon=True)
thread.start()
self.acquisition_threads.append(thread)
return thread
def start_processing_threads(self, num_threads=2):
"""Start data processing threads"""
def processing_worker():
while self.running:
try:
# Get raw data
raw_data = self.raw_data_queue.get(timeout=1.0)
# Process data (example: statistical analysis)
processed_value = self._process_measurement(raw_data.data)
# Package processed data
processed = ProcessedData(
timestamp=raw_data.timestamp,
instrument=raw_data.instrument,
result=processed_value
)
# Send to storage
self.processed_data_queue.put(processed, timeout=1.0)
with self._stats_lock:
self.processing_count += 1
# Mark task done
self.raw_data_queue.task_done()
except queue.Empty:
continue
except queue.Full:
print("Warning: Processed data queue full")
except Exception as e:
print(f"Processing error: {e}")
for _ in range(num_threads):
thread = threading.Thread(target=processing_worker, daemon=True)
thread.start()
self.processing_threads.append(thread)
def _process_measurement(self, raw_value):
"""Process raw measurement (override for specific processing)"""
# Example: Simple moving average or other analysis
return raw_value # Placeholder
def start_storage_thread(self):
"""Start data storage thread"""
def storage_worker():
storage_buffer = []
last_save_time = time.time()
save_interval = 10.0 # Save every 10 seconds
while self.running:
try:
# Get processed data
processed_data = self.processed_data_queue.get(timeout=1.0)
storage_buffer.append(processed_data)
# Periodic save
if (time.time() - last_save_time > save_interval or
len(storage_buffer) >= 1000):
self._save_data_batch(storage_buffer)
storage_buffer.clear()
last_save_time = time.time()
with self._stats_lock:
self.storage_count += 1
self.processed_data_queue.task_done()
except queue.Empty:
# Save any remaining data
if storage_buffer:
self._save_data_batch(storage_buffer)
storage_buffer.clear()
                except Exception as e:
                    print(f"Storage error: {e}")
            # Flush anything still buffered when the pipeline stops
            if storage_buffer:
                self._save_data_batch(storage_buffer)
self.storage_thread = threading.Thread(target=storage_worker, daemon=True)
self.storage_thread.start()
def _save_data_batch(self, data_batch):
"""Save batch of data (override for specific storage)"""
print(f"Saving batch of {len(data_batch)} measurements")
# Example: Save to file, database, etc.
def start_pipeline(self, instrument_configs):
"""Start complete data acquisition pipeline"""
        self.running = True
        self.acquiring = True
# Start processing and storage threads
self.start_processing_threads(num_threads=4)
self.start_storage_thread()
# Start acquisition threads for each instrument
for config in instrument_configs:
self.start_acquisition_thread(
resource_string=config['resource'],
measurement_interval=config.get('interval', 1.0)
)
print(f"Pipeline started with {len(instrument_configs)} instruments")
    def stop_pipeline(self):
        """Stop pipeline in phases so no queued data is lost"""
        print("Stopping pipeline...")
        # Phase 1: stop producers and wait for them to finish
        self.acquiring = False
        for thread in self.acquisition_threads:
            thread.join(timeout=5.0)
        # Phase 2: let the still-running consumers drain both queues
        # (stopping them first would leave items unprocessed and hang join())
        self.raw_data_queue.join()
        self.processed_data_queue.join()
        # Phase 3: stop the consumer threads
        self.running = False
        print("Pipeline stopped")
def get_statistics(self):
"""Get pipeline statistics"""
with self._stats_lock:
return {
'measurements': self.measurements_count,
'processed': self.processing_count,
'stored': self.storage_count,
'raw_queue_size': self.raw_data_queue.qsize(),
'processed_queue_size': self.processed_data_queue.qsize()
}
# Usage example
pipeline = DataAcquisitionPipeline(max_queue_size=1000)
# Configure instruments
instruments = [
{'resource': 'TCPIP::192.168.1.100::INSTR', 'interval': 0.5}, # 2 Hz
{'resource': 'TCPIP::192.168.1.101::INSTR', 'interval': 1.0}, # 1 Hz
{'resource': 'USB::0x0699::0x0363::C102912::INSTR', 'interval': 2.0} # 0.5 Hz
]
# Start pipeline
pipeline.start_pipeline(instruments)
# Run for a while
time.sleep(30)
# Check statistics
stats = pipeline.get_statistics()
print(f"Statistics: {stats}")
# Stop pipeline
pipeline.stop_pipeline()
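The _process_measurement and _save_data_batch methods are deliberately thin hooks meant to be overridden. A sketch of one possible specialization (the window size and filename are arbitrary choices):
import collections
import csv

class AveragingPipeline(DataAcquisitionPipeline):
    """Pipeline variant: moving-average processing and CSV storage"""
    def __init__(self, window=10, filename='pipeline_data.csv', **kwargs):
        super().__init__(**kwargs)
        self._window = collections.deque(maxlen=window)
        self._filename = filename

    def _process_measurement(self, raw_value):
        # Moving average over the last `window` samples; the deque is shared
        # across processing threads, so use num_threads=1 (or add a lock)
        # if exact sample ordering matters
        self._window.append(raw_value)
        return sum(self._window) / len(self._window)

    def _save_data_batch(self, data_batch):
        # Append each batch of ProcessedData tuples to a CSV file
        with open(self._filename, 'a', newline='') as f:
            writer = csv.writer(f)
            for item in data_batch:
                writer.writerow([item.timestamp, item.instrument, item.result])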
Parallel Instrument Control
Concurrent Operations with ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
class ParallelInstrumentController:
"""Control multiple instruments in parallel"""
def __init__(self, max_workers=10):
self.max_workers = max_workers
self.instruments = {}
def add_instrument(self, name, resource_string):
"""Add instrument to parallel control group"""
self.instruments[name] = resource_string
def parallel_measurement(self, measurement_func, *args, **kwargs):
"""Execute measurement function on all instruments in parallel"""
results = {}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all tasks
future_to_instrument = {
executor.submit(self._instrument_measurement,
name, resource, measurement_func, *args, **kwargs): name
for name, resource in self.instruments.items()
}
# Collect results as they complete
for future in as_completed(future_to_instrument, timeout=60):
instrument_name = future_to_instrument[future]
try:
result = future.result()
results[instrument_name] = result
print(f"{instrument_name}: completed")
except Exception as e:
print(f"{instrument_name}: error - {e}")
results[instrument_name] = None
return results
def _instrument_measurement(self, name, resource_string, func, *args, **kwargs):
"""Execute measurement function on single instrument"""
thread_id = threading.current_thread().ident
try:
instrument = thread_safe_manager.open_instrument(resource_string, thread_id)
return func(instrument, *args, **kwargs)
finally:
thread_safe_manager.close_instrument(resource_string, thread_id)
def parallel_configuration(self, config_commands):
"""Configure all instruments in parallel"""
def configure_instrument(instrument, commands):
"""Configure single instrument"""
for command in commands:
instrument.write(command)
return len(commands)
return self.parallel_measurement(configure_instrument, config_commands)
def parallel_sweep(self, parameter_name, values, measurement_func):
"""Perform parameter sweep across all instruments"""
all_results = {}
for value in values:
print(f"Setting {parameter_name} = {value}")
# Set parameter on all instruments
def set_parameter(instrument, param_value):
instrument.write(f"{parameter_name} {param_value}")
time.sleep(0.1) # Allow settling
return param_value
set_results = self.parallel_measurement(set_parameter, value)
# Take measurements
measure_results = self.parallel_measurement(measurement_func)
# Combine results
all_results[value] = {
'settings': set_results,
'measurements': measure_results
}
return all_results
# Example measurement functions
def measure_voltage(instrument):
"""Measure DC voltage"""
return float(instrument.query('MEAS:VOLT:DC?'))
def measure_current(instrument):
"""Measure DC current"""
return float(instrument.query('MEAS:CURR:DC?'))
def measure_resistance(instrument):
"""Measure resistance"""
return float(instrument.query('MEAS:RES?'))
# Usage example
controller = ParallelInstrumentController(max_workers=8)
# Add instruments
controller.add_instrument('DMM1', 'TCPIP::192.168.1.100::INSTR')
controller.add_instrument('DMM2', 'TCPIP::192.168.1.101::INSTR')
controller.add_instrument('DMM3', 'USB::0x2A8D::0x0318::MY12345678::INSTR')
controller.add_instrument('DMM4', 'GPIB::22::INSTR')
# Parallel configuration
config_commands = ['*RST', 'CONF:VOLT:DC 10,0.001', 'TRIG:SOUR IMM']
config_results = controller.parallel_configuration(config_commands)
# Parallel measurements
voltage_results = controller.parallel_measurement(measure_voltage)
current_results = controller.parallel_measurement(measure_current)
print(f"Voltage measurements: {voltage_results}")
print(f"Current measurements: {current_results}")
# Parameter sweep example
frequency_values = [1000, 2000, 5000, 10000] # Hz
sweep_results = controller.parallel_sweep('SOUR:FREQ', frequency_values, measure_voltage)
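When per-task error handling and completion order don't matter, executor.map is a lighter-weight alternative to the submit/as_completed pattern above. A sketch reusing the thread-safe manager and placeholder addresses (read_dc_voltage is a hypothetical helper, not part of the controller):
def read_dc_voltage(resource_string):
    """Open a per-thread session, take one reading, and clean up"""
    thread_id = threading.current_thread().ident
    try:
        instrument = thread_safe_manager.open_instrument(resource_string, thread_id)
        return float(instrument.query('MEAS:VOLT:DC?'))
    finally:
        thread_safe_manager.close_instrument(resource_string, thread_id)

resources = ['TCPIP::192.168.1.100::INSTR', 'TCPIP::192.168.1.101::INSTR']
with ThreadPoolExecutor(max_workers=len(resources)) as executor:
    # map preserves input order; an exception surfaces when its result is read
    voltages = list(executor.map(read_dc_voltage, resources))
print(voltages)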
Async/Await Patterns for Modern Python
AsyncIO with PyVISA
import asyncio
import aiofiles
import time
from typing import Dict, List, Optional
class AsyncInstrumentController:
"""Async/await instrument controller"""
def __init__(self):
self.instruments = {}
self.semaphore = asyncio.Semaphore(10) # Limit concurrent operations
async def add_instrument(self, name: str, resource_string: str):
"""Add instrument (thread-safe initialization)"""
        def sync_open():
            # Use a stable key: executor threads vary between calls, so
            # keying by their ids would open one session per pool thread
            return thread_safe_manager.open_instrument(resource_string,
                                                       thread_id='async')
        # Run the blocking open in the default thread pool
        loop = asyncio.get_running_loop()
        instrument = await loop.run_in_executor(None, sync_open)
self.instruments[name] = instrument
async def query_instrument(self, name: str, command: str) -> Optional[str]:
"""Async instrument query"""
if name not in self.instruments:
return None
async with self.semaphore: # Limit concurrent queries
instrument = self.instruments[name]
def sync_query():
return instrument.query(command)
            loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, sync_query)
async def write_instrument(self, name: str, command: str):
"""Async instrument write"""
if name not in self.instruments:
return False
async with self.semaphore:
instrument = self.instruments[name]
def sync_write():
instrument.write(command)
return True
            loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, sync_write)
async def measure_all_async(self, command: str) -> Dict[str, float]:
"""Measure all instruments concurrently"""
async def measure_single(name):
result = await self.query_instrument(name, command)
try:
return name, float(result) if result else None
except ValueError:
return name, None
# Create tasks for all instruments
tasks = [measure_single(name) for name in self.instruments.keys()]
        # Wait for all to complete; failed tasks come back as exception objects
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter exceptions out before unpacking the (name, value) pairs
        return dict(r for r in results if not isinstance(r, Exception))
async def timed_measurements(self, duration: float, interval: float, command: str):
"""Take measurements at regular intervals"""
start_time = time.time()
measurements = []
while time.time() - start_time < duration:
timestamp = time.time()
# Measure all instruments
results = await self.measure_all_async(command)
# Store with timestamp
measurements.append({
'timestamp': timestamp,
'data': results
})
# Wait for next measurement (non-blocking)
await asyncio.sleep(interval)
return measurements
async def save_measurements_async(self, measurements: List, filename: str):
"""Save measurements asynchronously"""
async with aiofiles.open(filename, 'w') as f:
            # Write header row
            await f.write('timestamp')
            if measurements:
instrument_names = measurements[0]['data'].keys()
for name in instrument_names:
await f.write(f',{name}')
await f.write('\n')
# Write data
for measurement in measurements:
await f.write(f"{measurement['timestamp']}")
for name in instrument_names:
value = measurement['data'].get(name, '')
await f.write(f',{value}')
await f.write('\n')
# Usage example
async def async_measurement_example():
controller = AsyncInstrumentController()
# Add instruments
await controller.add_instrument('DMM1', 'TCPIP::192.168.1.100::INSTR')
await controller.add_instrument('DMM2', 'TCPIP::192.168.1.101::INSTR')
await controller.add_instrument('DMM3', 'USB::0x2A8D::0x0318::MY12345678::INSTR')
# Configure all instruments concurrently
config_tasks = []
for name in controller.instruments.keys():
config_tasks.append(controller.write_instrument(name, '*RST'))
config_tasks.append(controller.write_instrument(name, 'CONF:VOLT:DC'))
await asyncio.gather(*config_tasks)
# Take measurements for 60 seconds at 0.5 Hz
measurements = await controller.timed_measurements(
duration=60.0,
interval=2.0,
command='READ?'
)
# Save results asynchronously
await controller.save_measurements_async(measurements, 'async_measurements.csv')
print(f"Collected {len(measurements)} measurement sets")
# Run async example
# asyncio.run(async_measurement_example())
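On Python 3.9 and later, asyncio.to_thread is a more concise way to push a blocking PyVISA call onto the default thread pool. A sketch equivalent to the run_in_executor calls above (the address is a placeholder):
import asyncio
import pyvisa

async def query_async(instrument, command: str) -> str:
    # Runs the blocking query in the default executor (Python 3.9+)
    return await asyncio.to_thread(instrument.query, command)

async def main():
    rm = pyvisa.ResourceManager()
    dmm = await asyncio.to_thread(rm.open_resource, 'TCPIP::192.168.1.100::INSTR')
    try:
        print(await query_async(dmm, '*IDN?'))
    finally:
        dmm.close()

# asyncio.run(main())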
Thread-Safe Data Structures
Concurrent Data Collection
import threading
import collections
from threading import RLock
import time
class ThreadSafeDataBuffer:
"""Thread-safe circular buffer for measurement data"""
def __init__(self, maxsize=10000):
self.maxsize = maxsize
self.buffer = collections.deque(maxlen=maxsize)
self.lock = RLock()
self.condition = threading.Condition(self.lock)
self.total_added = 0
def add(self, data):
"""Add data to buffer (thread-safe)"""
with self.condition:
self.buffer.append(data)
self.total_added += 1
self.condition.notify_all() # Wake up waiting consumers
def get_recent(self, count=None):
"""Get recent data (thread-safe)"""
with self.lock:
if count is None:
return list(self.buffer)
else:
return list(self.buffer)[-count:]
def wait_for_data(self, min_count=1, timeout=None):
"""Wait until buffer has minimum amount of data"""
with self.condition:
return self.condition.wait_for(
lambda: len(self.buffer) >= min_count,
timeout=timeout
)
def clear(self):
"""Clear buffer (thread-safe)"""
with self.lock:
self.buffer.clear()
def get_statistics(self):
"""Get buffer statistics"""
with self.lock:
return {
'current_size': len(self.buffer),
'max_size': self.maxsize,
'total_added': self.total_added,
'utilization': len(self.buffer) / self.maxsize
}
class ThreadSafeInstrumentResults:
"""Thread-safe storage for instrument measurement results"""
def __init__(self):
self.results = {}
self.lock = RLock()
self.last_update = {}
def update_result(self, instrument_name, measurement):
"""Update result for instrument"""
with self.lock:
self.results[instrument_name] = measurement
self.last_update[instrument_name] = time.time()
def get_all_results(self):
"""Get all current results"""
with self.lock:
return self.results.copy()
def get_result(self, instrument_name):
"""Get result for specific instrument"""
with self.lock:
return self.results.get(instrument_name)
def get_fresh_results(self, max_age_seconds=10):
"""Get results that are not too old"""
current_time = time.time()
fresh_results = {}
with self.lock:
for name, result in self.results.items():
last_update = self.last_update.get(name, 0)
if current_time - last_update <= max_age_seconds:
fresh_results[name] = result
return fresh_results
# Usage with threading
shared_buffer = ThreadSafeDataBuffer(maxsize=5000)
shared_results = ThreadSafeInstrumentResults()
def measurement_worker(instrument_name, resource_string):
"""Worker thread for continuous measurement"""
thread_id = threading.current_thread().ident
try:
instrument = thread_safe_manager.open_instrument(resource_string, thread_id)
while True: # Continuous measurement
try:
measurement = float(instrument.query('READ?'))
timestamp = time.time()
# Add to shared buffer
measurement_data = {
'instrument': instrument_name,
'timestamp': timestamp,
'value': measurement
}
shared_buffer.add(measurement_data)
# Update shared results
shared_results.update_result(instrument_name, measurement)
time.sleep(1.0) # 1 Hz measurement
except Exception as e:
print(f"Measurement error for {instrument_name}: {e}")
time.sleep(5.0) # Wait before retry
except Exception as e:
print(f"Worker error for {instrument_name}: {e}")
finally:
thread_safe_manager.close_instrument(resource_string, thread_id)
# Start measurement threads
instruments = [
('DMM1', 'TCPIP::192.168.1.100::INSTR'),
('DMM2', 'TCPIP::192.168.1.101::INSTR'),
('PSU1', 'USB::0x2A8D::0x0318::MY12345678::INSTR')
]
threads = []
for name, resource in instruments:
thread = threading.Thread(
target=measurement_worker,
args=(name, resource),
daemon=True
)
thread.start()
threads.append(thread)
# Monitor results
for _ in range(30): # Monitor for 30 seconds
time.sleep(1)
# Get fresh results
results = shared_results.get_fresh_results(max_age_seconds=5)
print(f"Current results: {results}")
# Get buffer statistics
stats = shared_buffer.get_statistics()
print(f"Buffer stats: {stats}")
Performance Optimization and Monitoring
Thread Pool Optimization
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import psutil
class OptimizedThreadPool:
"""Optimized thread pool for instrument operations"""
def __init__(self, max_workers=None):
if max_workers is None:
# Auto-detect optimal thread count
cpu_count = psutil.cpu_count(logical=True)
max_workers = min(cpu_count * 2, 20) # 2x CPU cores, max 20
self.max_workers = max_workers
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.active_tasks = 0
self.completed_tasks = 0
self.failed_tasks = 0
self.lock = threading.Lock()
print(f"Initialized thread pool with {max_workers} workers")
def submit_measurement(self, func, *args, **kwargs):
"""Submit measurement task with monitoring"""
def wrapper():
with self.lock:
self.active_tasks += 1
try:
start_time = time.time()
result = func(*args, **kwargs)
duration = time.time() - start_time
with self.lock:
self.completed_tasks += 1
return {
'result': result,
'duration': duration,
'success': True
}
except Exception as e:
with self.lock:
self.failed_tasks += 1
return {
'result': None,
'error': str(e),
'success': False
}
finally:
with self.lock:
self.active_tasks -= 1
return self.executor.submit(wrapper)
def get_statistics(self):
"""Get thread pool statistics"""
with self.lock:
return {
'max_workers': self.max_workers,
'active_tasks': self.active_tasks,
'completed_tasks': self.completed_tasks,
'failed_tasks': self.failed_tasks,
'success_rate': (self.completed_tasks /
max(self.completed_tasks + self.failed_tasks, 1) * 100)
}
def shutdown(self, wait=True):
"""Shutdown thread pool"""
self.executor.shutdown(wait=wait)
# Performance monitoring
class ThreadingPerformanceMonitor:
"""Monitor threading performance"""
def __init__(self):
self.start_time = None
self.measurements = []
self.monitoring = False
self.monitor_thread = None
def start_monitoring(self):
"""Start performance monitoring"""
self.monitoring = True
self.start_time = time.time()
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
def _monitor_loop(self):
"""Monitoring loop"""
while self.monitoring:
# System metrics
cpu_percent = psutil.cpu_percent(interval=1)
memory_info = psutil.virtual_memory()
thread_count = threading.active_count()
measurement = {
'timestamp': time.time() - self.start_time,
'cpu_percent': cpu_percent,
'memory_percent': memory_info.percent,
'memory_mb': memory_info.used / 1024 / 1024,
'thread_count': thread_count
}
self.measurements.append(measurement)
def stop_monitoring(self):
"""Stop monitoring and get results"""
self.monitoring = False
if self.monitor_thread:
self.monitor_thread.join()
return self.measurements
def get_summary(self):
"""Get performance summary"""
if not self.measurements:
return None
import numpy as np
cpu_values = [m['cpu_percent'] for m in self.measurements]
memory_values = [m['memory_mb'] for m in self.measurements]
thread_values = [m['thread_count'] for m in self.measurements]
return {
'duration': self.measurements[-1]['timestamp'],
'avg_cpu_percent': np.mean(cpu_values),
'max_cpu_percent': np.max(cpu_values),
'avg_memory_mb': np.mean(memory_values),
'max_memory_mb': np.max(memory_values),
'avg_threads': np.mean(thread_values),
'max_threads': np.max(thread_values)
}
# Usage example with performance monitoring
def benchmark_threaded_measurements():
"""Benchmark different threading approaches"""
monitor = ThreadingPerformanceMonitor()
pool = OptimizedThreadPool(max_workers=8)
monitor.start_monitoring()
    # measure_voltage expects an open session, so wrap it in a helper that
    # opens a per-thread session for the given resource string
    def read_voltage(resource_string):
        thread_id = threading.current_thread().ident
        try:
            instrument = thread_safe_manager.open_instrument(resource_string, thread_id)
            return measure_voltage(instrument)
        finally:
            thread_safe_manager.close_instrument(resource_string, thread_id)
    # Submit many measurement tasks (placeholder address)
    futures = []
    for i in range(100):
        future = pool.submit_measurement(read_voltage,
                                         'TCPIP::192.168.1.100::INSTR')
        futures.append(future)
# Wait for completion
results = []
for future in concurrent.futures.as_completed(futures, timeout=120):
result = future.result()
results.append(result)
# Stop monitoring
measurements = monitor.stop_monitoring()
summary = monitor.get_summary()
# Thread pool statistics
pool_stats = pool.get_statistics()
print(f"Performance Summary:")
print(f"Duration: {summary['duration']:.1f}s")
print(f"Average CPU: {summary['avg_cpu_percent']:.1f}%")
print(f"Average Memory: {summary['avg_memory_mb']:.1f}MB")
print(f"Average Threads: {summary['avg_threads']:.1f}")
print(f"Success Rate: {pool_stats['success_rate']:.1f}%")
pool.shutdown()
# Run benchmark
# benchmark_threaded_measurements()
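Keep in mind that instrument I/O is not CPU-bound: a blocking VISA call occupies a worker for the whole bus round-trip while using almost no CPU, so the CPU-based sizing in OptimizedThreadPool is only a fallback. When you know how many instruments you will talk to concurrently, sizing the pool to that number is usually the better starting point; a sketch (the cap of 32 is an arbitrary choice):
def workers_for_instruments(num_instruments, hard_cap=32):
    """One worker per concurrent instrument session, with a sanity cap"""
    return min(num_instruments, hard_cap)

pool = OptimizedThreadPool(max_workers=workers_for_instruments(4))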
Best Practices and Common Pitfalls
Threading Best Practices
# DO: Use context managers for resource cleanup
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(measure_func, instr) for instr in instruments]
results = [f.result() for f in futures]
# DON'T: Forget to close instruments in threads
def bad_measurement(resource_string):
instrument = pyvisa.ResourceManager().open_resource(resource_string)
result = instrument.query('READ?')
# Missing: instrument.close()
return result
# DO: Proper error handling in threads
def good_measurement(resource_string):
rm = pyvisa.ResourceManager()
try:
instrument = rm.open_resource(resource_string)
try:
result = instrument.query('READ?')
return float(result)
finally:
instrument.close()
finally:
rm.close()
# DO: Use locks for shared data
shared_data = {}
data_lock = threading.Lock()
def thread_safe_update(key, value):
with data_lock:
shared_data[key] = value
# DON'T: Create too many threads
# Bad: Creates 1000 threads
for i in range(1000):
threading.Thread(target=measurement_func).start()
# Good: Use thread pool
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(measurement_func) for _ in range(1000)]
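# DO: Serialize access when threads must share a single session
# (a sketch - without a lock, two threads calling query() on one session
# can interleave the write and read halves of each other's transactions)
instrument_lock = threading.Lock()

def locked_query(instrument, command):
    """Only one thread talks to this session at a time"""
    with instrument_lock:
        return instrument.query(command)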
Next Steps
- Large data optimization: Large Data Transfers
- Memory optimization: Memory Management
- General performance: Performance Guide