Multi-threading
How to control multiple PyVISA instruments in parallel. Thread-safe access, ThreadPoolExecutor, and producer-consumer patterns.
VISA I/O is blocking. While one instrument responds, your CPU sits idle. Threading lets you overlap waits across multiple instruments and pipeline acquisition with processing.
Thread-Safe Instrument Access
PyVISA's ResourceManager isn't thread-safe. Use a lock to serialize access to shared instruments, or open one connection per thread.
import threading
import pyvisa
class ThreadSafeInstrumentManager:
"""Opens one connection per (resource, thread) pair."""
def __init__(self):
self._lock = threading.RLock()
self._instruments = {}
self._rm = None
def _get_rm(self):
with self._lock:
if self._rm is None:
self._rm = pyvisa.ResourceManager()
return self._rm
def open(self, resource_string):
thread_id = threading.current_thread().ident
key = f"{resource_string}_{thread_id}"
with self._lock:
if key not in self._instruments:
rm = self._get_rm()
self._instruments[key] = rm.open_resource(resource_string)
return self._instruments[key]
def close(self, resource_string):
thread_id = threading.current_thread().ident
key = f"{resource_string}_{thread_id}"
with self._lock:
if key in self._instruments:
self._instruments[key].close()
del self._instruments[key]
def close_all(self):
with self._lock:
for inst in self._instruments.values():
try:
inst.close()
except Exception:
pass
self._instruments.clear()
manager = ThreadSafeInstrumentManager()Log measurements automatically
TofuPilot records test results from your PyVISA scripts, tracks pass/fail rates, and generates compliance reports. Free to start.
Parallel Measurements with ThreadPoolExecutor
The simplest way to measure multiple instruments at once.
from concurrent.futures import ThreadPoolExecutor, as_completed
import pyvisa
import time
resources = {
"DMM1": "TCPIP::192.168.1.100::INSTR",
"DMM2": "TCPIP::192.168.1.101::INSTR",
"DMM3": "USB0::0x2A8D::0x0318::MY12345678::INSTR",
"DMM4": "GPIB::22::INSTR",
}
def measure_voltage(name, resource_string):
"""Measure DC voltage on one instrument."""
rm = pyvisa.ResourceManager()
try:
inst = rm.open_resource(resource_string)
try:
inst.timeout = 5000
inst.write("CONF:VOLT:DC 10,0.001")
value = float(inst.query("READ?"))
return name, value
finally:
inst.close()
finally:
rm.close()
start = time.time()
with ThreadPoolExecutor(max_workers=len(resources)) as executor:
futures = {
executor.submit(measure_voltage, name, rs): name
for name, rs in resources.items()
}
for future in as_completed(futures):
name, voltage = future.result()
print(f"{name}: {voltage:.6f} V")
elapsed = time.time() - start
print(f"All {len(resources)} instruments measured in {elapsed:.2f}s")One ResourceManager per thread
Creating a separate ResourceManager per thread avoids subtle crashes. It costs
almost nothing compared to the I/O time.
Producer-Consumer Pipeline
When you need continuous acquisition on multiple instruments with background processing, use a queue to decouple acquisition from analysis.
import threading
import queue
import time
import pyvisa
data_queue = queue.Queue(maxsize=1000)
stop_event = threading.Event()
def acquisition_worker(resource_string, interval=1.0):
"""Producer: reads from an instrument and pushes to the queue."""
rm = pyvisa.ResourceManager()
inst = rm.open_resource(resource_string)
inst.timeout = 5000
inst.write("CONF:VOLT:DC")
try:
while not stop_event.is_set():
value = float(inst.query("READ?"))
data_queue.put({
"timestamp": time.time(),
"source": resource_string,
"value": value,
})
time.sleep(interval)
finally:
inst.close()
rm.close()
def processing_worker():
"""Consumer: pulls from the queue and processes."""
while not stop_event.is_set():
try:
item = data_queue.get(timeout=1.0)
# Process, log, write to file, etc.
print(f"{item['source']}: {item['value']:.6f} V")
data_queue.task_done()
except queue.Empty:
continue
# Start threads
producers = [
threading.Thread(target=acquisition_worker, args=("TCPIP::192.168.1.100::INSTR", 0.5), daemon=True),
threading.Thread(target=acquisition_worker, args=("TCPIP::192.168.1.101::INSTR", 1.0), daemon=True),
]
consumer = threading.Thread(target=processing_worker, daemon=True)
for t in producers:
t.start()
consumer.start()
# Run for 30 seconds
time.sleep(30)
stop_event.set()
for t in producers:
t.join(timeout=5)
consumer.join(timeout=5)
print(f"Queue remaining: {data_queue.qsize()} items")Do's and Don'ts
import threading
from concurrent.futures import ThreadPoolExecutor
import pyvisa
# DO: Use a thread pool (bounded number of threads)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(measure_func, r) for r in resources]
results = [f.result() for f in futures]
# DON'T: Spawn unbounded threads
# for r in resources:
# threading.Thread(target=measure_func, args=(r,)).start()
# DO: Close instruments in a finally block
def safe_measure(resource_string):
rm = pyvisa.ResourceManager()
try:
inst = rm.open_resource(resource_string)
try:
return float(inst.query("READ?"))
finally:
inst.close()
finally:
rm.close()
# DO: Use locks for shared state
results = {}
lock = threading.Lock()
def store_result(name, value):
with lock:
results[name] = valueThread Count Guidelines
| Scenario | Recommended Workers | Why |
|---|---|---|
| 2-4 instruments | 1 thread per instrument | No contention, simple |
| 5-20 instruments | 8-10 threads | Diminishing returns beyond this |
| 50+ instruments | 10-20 threads | OS thread overhead becomes significant |
| CPU-heavy processing | os.cpu_count() | Matches available cores |