Skip to content

Event-Driven Acquisition#

Important

This page assumes you have a basic understanding of how the default MDA acquisition engine works to execute a sequence of useq.MDAEvent objects. If you haven't already done so, please read the Acquisition Engine guide first.

You may not always know the exact sequence of events that you want to execute ahead of time. For example, you may want to start acquiring images at a certain frequency, but then take a burst of images at a faster frame rate or in a specific region of interest when a specific (possibly rare) event occurs. This is sometimes referred to as event-driven acquisition, or "smart-microscopy".

In publications

For two compelling examples of this type of event-driven microscopy, see:

  1. Mahecic D, Stepp WL, Zhang C, GriffiΓ© J, Weigert M, Manley S. Event-driven acquisition for content-enriched microscopy. Nat Methods 19, 1262–1267 (2022). https://doi.org/10.1038/s41592-022-01589-x

  2. Shi Y, Tabet JS, Milkie DE, Daugird TA, Yang CQ, Giovannucci A, Legant WR. Smart Lattice Light Sheet Microscopy for imaging rare and complex cellular events. bioRxiv. 2023 Mar 9 https://doi.org/10.1101/2023.03.07.531517.

Obviously, in this case, you can't just create a list of useq.MDAEvent objects and pass them to the acquisition engine, since that list needs to change based on the results of previous events.

Fortunately, the MDARunner.run() method is designed to handle this case.

Iterable[MDAEvent]#

The key thing to observe here is the signature of the MDARunner.run() method:

from typing import Iterable
import useq

class MDARunner:
    def run(self, events: Iterable[useq.MDAEvent]) -> None: ...

πŸ‘€ The run method expects an iterable of useq.MDAEvent objects. πŸ‘€

Iterable

An Iterable is any object that implements an __iter__() method that returns an iterator object. This includes sequences of known length, like list, tuple, but also many other types of objects, such as generators, deque, and more. Other types such as Queue can easily be converted to an iterator as well, as we'll see below.

Useful Iterables#

Many python objects are iterable. Let's look at a few types of iterables that can be used to implement event-driven acquisition in pymmcore-plus.

Generators#

Generator functions are functions that contain yield statements. When called, they return a generator iterator that can be used to iterate over the values yielded by the generator function.

Say what?

That may sound a bit confusing, but it's actually quite simple. It just means that you can use the output of a generator function in a for loop:

from typing import Iterator

# a generator function, which contains "yield" statements
def my_generator_func() -> Iterator[int]:
    yield 1
    yield 2

# calling the function returns an iterator
gen_iterator = my_generator_func()

# which we can iterate over (e.g. in a for loop)
for value in gen_iterator:
    print(value)  # prints 1, then 2

Let's create a generator that yields useq.MDAEvent objects, but simulate a "burst" of events when a certain condition is met:

import random
import time
from typing import Iterator

import useq

def some_condition_is_met() -> bool:
    # Return True 20% of the time ...
    # Just an example of some probabilistic condition
    # This could be anything, the results of analysis, etc.
    return random.random() < 0.2

# generator function that yields events
def my_events() -> Iterator[useq.MDAEvent]:
    i = 0
    while True:
        if some_condition_is_met():
            # yield a burst of events
            for _ in range(5):
                yield useq.MDAEvent(metadata={'bursting': True})
        elif i > 5:
            # stop after 5 events
            # (just an example of some stop condition)
            return
        else:
            # yield a regular single event
            yield useq.MDAEvent()

        # wait a bit before yielding the next event (1)
        time.sleep(0.1)
        i += 1
  1. Note, we could also take advantage of the min_start_time field in MDAEvent, but this demonstrates that the generator can also control the timing of events.
example output of list(my_events())

We can use the list() function to iterate over the generator and collect the yielded events:

list(my_events())

Because of the random condition, the output will be different each time, but it might look something like this:

[
    MDAEvent(),
    MDAEvent(metadata={'bursting': True}),  # (1)!
    MDAEvent(metadata={'bursting': True}),
    MDAEvent(metadata={'bursting': True}),
    MDAEvent(metadata={'bursting': True}),
    MDAEvent(metadata={'bursting': True}),
    MDAEvent(),
    MDAEvent(),
    MDAEvent(),
    MDAEvent() # (2)!
]
  1. some_condition_is_met returned True on the second iteration, so the generator yielded a burst of events.
  2. Our "stop condition" of i > 5 was met, so the generator returned and stopped yielding events.

To run this "experiment" using pymmcore-plus, we can pass the output of the generator to the MDARunner.run() method:

from pymmcore_plus import CMMCorePlus

core = CMMCorePlus()
core.loadSystemConfiguration()

core.run_mda(my_events())

Queues#

Python's Queue class is useful for managing and synchronizing data between multiple threads or processes. It ensures orderly execution and prevents race conditions. Generally, a Queue is passed between threads or processes, and one thread or process puts data (such as an MDAEvent to execute) into the queue, while another thread or process gets data out of the queue.

A Queue instance itself is not an iterable...

πŸ‘Ž

>>> from queue import Queue
>>> list(Queue())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'Queue' object is not iterable

however, a Queue can be easily converted to an iterator using the two-argument version of the builtin iter() function:

πŸ‘

>>> from queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.put('STOP')
>>> iterable_queue = iter(q.get, 'STOP') # !! (1)
>>> list(iterable_queue)
[1, 2]
  1. 🎩 Thanks Kyle Douglass for discovering this handy, if obscure, second argument to iter()!

We can use this iter(queue.get, sentinel) pattern to create a queue-backed iterable that can be passed to the run_mda() method. The acquisition engine will then execute events as they get put into the queue, until the stop sentinel is encountered.

from queue import Queue
from pymmcore_plus import CMMCorePlus
from useq import MDAEvent

core = CMMCorePlus()
core.loadSystemConfiguration()

q = Queue()                    # create the queue
STOP = object()                # any object can serve as the sentinel
q_iterator = iter(q.get, STOP) # create the queue-backed iterable

# start the acquisition in a separate thread
core.run_mda(q_iterator)

# (optional) connect some callback to the imageReady signal
@core.mda.events.frameReady.connect
def on_frame(img, event):
    print(f'Frame {event.index} received: {img.shape}')

# now we can put events into the queue
# according to whatever logic we want:
q.put(MDAEvent(index={'t': 0}, exposure=20))
q.put(MDAEvent(index={'t': 1}, exposure=40))

# ... and eventually stop the acquisition
q.put(STOP)
More complete event-driven acquisition example

The following example is modified from this gist by Kyle Douglass.

It simulates a typical event-driven acquisition, where an Analyzer object analyzes the results of each image and provides a dict of results. The Controller object then decides whether to continue or stop the acquisition (by placing the STOP_EVENT sentinel in the queue).

event_driven_acquisition.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""Simple simulator demonstrating event-driven acquisitions with pymmcore-plus."""

import random
import time
from queue import Queue

import numpy as np
from pymmcore_plus import CMMCorePlus
from useq import MDAEvent


class Analyzer:
    """Analyzes images and returns a dict of results."""

    def run(self, data) -> dict:
        # Fake analysis; randomly return a dict with a value of None 10% of the time
        if random.random() < 0.1:
            return {"result": "STOP"}
        else:
            return {"result": random.random()}


class Controller:
    STOP_EVENT = object()

    def __init__(self, analyzer: Analyzer, mmc: CMMCorePlus, queue: Queue):
        self._analyzer = analyzer  # analyzer of images
        self._queue = queue  # queue of MDAEvents
        self._results: dict = {}  # results of analysis

        self._mmc = mmc
        mmc.mda.events.frameReady.connect(self._on_frame_ready)

    def _on_frame_ready(self, img: np.ndarray, event: MDAEvent) -> None:
        # Analyze the image
        self._results = self._analyzer.run(img)

    def run(self) -> None:
        # convert the queue to an iterable
        queue_sequence = iter(self._queue.get, self.STOP_EVENT)

        # Start the acquisition (run_mda is non-blocking)
        self._mmc.run_mda(queue_sequence)

        # Queue the first image acquisition
        self._queue.put(MDAEvent(exposure=10))

        # loop until the analyzer returns "STOP"
        while True:
            # get the last results from the analyzer
            result = self._results.pop("result", None)

            # Decide what to do. This is the key part of the reactive loop.
            if result == "STOP":
                # Do nothing and return
                print("Analyzer returned no results. Stopping...")
                self._queue.put(self.STOP_EVENT)
                break
            elif result:
                # Adjust the exposure time based on the results and continue
                print("Analyzer returned results. Continuing...")
                next_event = MDAEvent(exposure=10 * result)
                self._queue.put(next_event)
            else:
                # No results yet, wait a bit and check again
                time.sleep(0.1)


def main():
    # Setup the MM Core
    mmc = CMMCorePlus()
    mmc.loadSystemConfiguration()

    # create the Queue that will hold the MDAEvents
    q = Queue()

    # Setup the controller and analyzer
    analyzer = Analyzer()
    controller = Controller(analyzer, mmc, q)

    # Start the acquisition
    controller.run()


if __name__ == "__main__":
    main()

MDASequence#

It's worth noting that the MDASequence class is itself an Iterable[MDAEvent]. It implements an __iter__ method that yields the events in the sequence, and it can be passed directly to the run_mda() method as we saw in the Acquisition engine guide. It is a deterministic sequence, so it wouldn't be used on its own to implement conditional event sequences; it can, however, be used in conjunction with other iterables to implement more complex sequences.

Take this simple sequence as an example:

my_sequence = useq.MDASequence(
    time_plan={'loops': 5, 'interval': 0.1},
    channels=["DAPI", "FITC"]
)

In the generator example above, we could yield the events in this sequence when the condition is met (saving us from constructing the events manually)

# example usage in the
def my_events() -> Iterator[useq.MDAEvent]:
    while True:
        if some_condition_is_met():
            yield from my_sequence  # yield the events in the sequence
        else:
            ...

In the Queue example above, we could put the events in the sequence into the queue:

# ... we can put events into the queue
# according to whatever logic we want:
for event in my_sequence:
    q.put(event)