Acquire stack in uManager, reslice and display z-section live in napari

This is a follow-up to the I2K2020 course.

For alignment in a lightsheet microscope, I want to display live xz (or yz) sections.

I use uManager to control the camera, and thought about using pycro-manager to grab the images and relay them to napari for display. Data could be sliced using napari’s built-in “roll dimensions”, or by using numpy to swap the axes.
No need to save the data, this is really just for alignment purposes.
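For reference, the reslicing itself is just an axis swap in numpy. A minimal sketch with a simulated stack (all names here are mine, just for illustration):

```python
import numpy as np

# simulated z-stack with axes (z, y, x)
stack = np.random.randint(0, 5000, size=(10, 512, 512)).astype(np.uint16)

# xz view: swap z and y so that indexing the first axis selects a y-position
xz_stack = np.swapaxes(stack, 0, 1)   # axes now (y, z, x)
xz_section = xz_stack[256]            # one xz section, shape (10, 512)

# yz view: bring x to the front instead
yz_stack = np.moveaxis(stack, 2, 0)   # axes now (x, z, y)
```

napari’s “roll dimensions” button does the equivalent reordering on the displayed layer without copying any data.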

Here is the documentation of my failures :smiley:
(code is run from a jupyter-lab notebook)

First attempt: run multi_d_acquisition in a loop and send each stack to napari.

from pycromanager import Dataset, Acquisition, multi_d_acquisition_events
import napari
import numpy as np

v = napari.Viewer()
v.add_image(np.zeros([512,512]), name = 'uManager', contrast_limits=[0,5000])

save_dir = r'temp'             # any writable directory
save_name = r'Acquisition_test'

for ii in range(2):
    with Acquisition(directory=save_dir, name=save_name, show_display=False) as acq:
        events = multi_d_acquisition_events(num_time_points = 10)
        # timepoints are z-slices since z-stage control is currently from outside of uManager
        acq.acquire(events)

    # read the data that was just acquired
    dataset = Dataset(acq.get_disk_location())

    dask_array = dataset.as_array()
    v.layers[0].data = dask_array

Works in principle, but has some issues:

  • saves the data to the hard drive, then reads it back from the hard drive to display in napari - so it will fill up storage quickly, with plenty of overhead
  • updates the napari display only at the end of the loop, so it's useless for my goal

@henrypinkard suggested using an image_process_fn:

  • should relay data to napari
  • suppress saving to disk

from pycromanager import Dataset, Acquisition, multi_d_acquisition_events
import napari
import numpy as np

def stream_napari(image, metadata):
    global v
    v.layers[0].data = image

v = napari.Viewer()
v.add_image(np.zeros([512,512]), name = 'uManager', contrast_limits=[0,5000])

for ii in range(2):
    with Acquisition(directory=None, name=None, show_display=False, image_process_fn = stream_napari) as acq:
        events = multi_d_acquisition_events(num_time_points = 10)
        # timepoints are z-slices since z-stage control is currently from outside of uManager
        acq.acquire(events)

Still doesn’t work:

  • napari display is only updated at the end of the for-loop
  • with the image_process_fn, I’m only sending the active z-slice to napari, so I cannot display z-sections

I then tried to use a dummy image_process_fn, purely to avoid saving the data, and the Acquisition.get_dataset() function to pass the data to napari.

from pycromanager import Dataset, Acquisition, multi_d_acquisition_events
import napari
import numpy as np

def stream_napari(image, metadata):
    return image, metadata

v = napari.Viewer()
dataset = np.zeros([10, 512,512])
v.add_image(dataset, name = 'uManager', contrast_limits=[0,5000])

save_dir = r'temp'             # any writable directory
save_name = r'Acquisition_test'

for ii in range(2):
    with Acquisition(directory=save_dir, name=save_name, show_display=False, image_process_fn = stream_napari) as acq:
        events = multi_d_acquisition_events(num_time_points = 10)
        # timepoints are z-slices since z-stage control is currently from outside of uManager
        acq.acquire(events)
    dataset = acq.get_dataset()

    dask_array = dataset.as_array()
    v.layers[0].data = dask_array

Seems I found a not-yet-implemented function?

I’m struggling to understand how/if I can pass more arguments into the image_process_fn, and where I would grab the return values. Could I use an image_process_fn to assemble the stack, and only relay it to napari once the stack is complete?

I’m now out of ideas and any help would be greatly appreciated!

Yes, I think this is exactly what you want to do. Take a look at this example, specifically the image processor function. It accumulates images from a z-stack until the z-stack finishes, then does something with all of the data.

Something like this:


import numpy as np

planes_per_z_stack = 5

def img_process_fn(image, metadata):
    # accumulate images as they come
    if not hasattr(img_process_fn, "images"):
        img_process_fn.images = []

    # add the pixel data
    img_process_fn.images.append(image)

    if metadata["Axes"]["z"] == planes_per_z_stack - 1:
        # it's the final one in the z-stack
        images = np.stack(img_process_fn.images, axis=2)
        img_process_fn.images = []  # reset for the next stack

        # now do something with the full stack

    return image, metadata

This won't work, because if you don't save the data there's no dataset to get (RAM storage is not implemented at the moment).

I'm not sure how the dynamic updates for napari work. There may be some complication in that the image processor runs on a dedicated thread.

I don’t have time to reply in detail right now, but you may want to have a look at my demo here in which a volume is continuously updated with new slices as they are coming in:

I am simulating a microscope by reading from files, but you could have a file listener or any other code that gets the image from pycromanager in the worker thread.


Here’s a different example which does live image viewing by connecting a directory watcher with a function that adds images to a dask array which is displayed in napari. It assumes the files are written to disk so you’d need to modify that, but I think it could be very helpful for you to look at.

This is a little different from Volker’s example - if I understand things right, he’s putting image content directly into the VisPy texture, whereas this is adding image slices onto a dask array (which napari takes and renders with VisPy).
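For illustration, the core of that dask-based approach might look like the following sketch (the `load_slice` stand-in and all names are mine, not taken from the linked example):

```python
import numpy as np
import dask.array as da
from dask import delayed

def load_slice():
    # stand-in for reading one freshly written tiff from the watched directory
    return np.zeros((512, 512), dtype=np.uint16)

# wrap each slice lazily and grow the stack as new files appear
stack = da.from_delayed(delayed(load_slice)(), shape=(512, 512), dtype=np.uint16)[None]
new_slice = da.from_delayed(delayed(load_slice)(), shape=(512, 512), dtype=np.uint16)[None]
stack = da.concatenate([stack, new_slice], axis=0)

# napari picks up the change when the layer data is reassigned:
# viewer.layers[0].data = stack
```

Because each slice is wrapped with `delayed`, nothing is read from disk until napari actually renders that part of the array.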

Here are the two files you’d need to run the example:



Yes, that is correct. In my example, the volume-rendered view can be updated as soon as a new slice is available; it doesn’t require waiting for the whole volume acquisition to finish and then updating the whole volume at once. The emphasis is on low latency and instantaneous feedback, which may be useful for microscope adjustments. As it bypasses the .data attribute of the layer it is a bit of a hack, and you wouldn’t be able to go back to earlier timepoints or even to switch to a 2D view.

If low latency and partial volume updates are not a priority, the approach suggested by @GenevieveBuckley is cleaner.


Thanks to everyone who helped so far!

I’m still utterly stuck on getting the communication between micromanager and napari working. I’ve tried some things, but nothing worked, and I’m not even at a point yet where I can formulate a question for help. It also doesn’t help that I’m dealing with threads for the first time :slight_smile:

As a workaround, I started from @GenevieveBuckley’s suggestion that saves to disk - only tiny changes to overwrite the whole stack etc. Also only tiny changes to the uManager acquisition routine, as it was saving to disk anyway.
There are issues:

  • cluttering the disk
  • releasing resources: I need to close everything before executing the code again (?)

I’ll put my attempts here, maybe someone else finds them helpful. To run, uManager has to be started, and then both of the files have to be executed at the same time.

Source code
"""
Loads and Displays tiffs as they get generated in the specific directory.
Trying to simulate the live display of data as it gets acquired by microscope.
This script should be run together with a routine saving files to hard drive.
Adapted from:
https://github.com/napari/napari/blob/1fccbdcdd5c9ca05b4d6e670407e56f3fa305738/examples/live_tiffs.py
"""

import os
import sys
import time
import numpy as np
from skimage.io.collection import alphanumeric_key
from dask import delayed
import dask.array as da
from tifffile import imread
import napari
from napari.qt import thread_worker

post_file =  "/Full Resolution/Acquisition_test_MagellanStack.tif"

with napari.gui_qt():
    """Starts Napari viewer with in which to display the tiffs.
    Parameters
    ----------
    input_dir : str, optional
        directory to monitor and load tiffs as they start appearing.
        If no argument is given the then it starts monitoring currect dir.
    """
    viewer = napari.Viewer(ndisplay=3)
    # pass a directory to monitor or it will monitor current directory.
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    path = os.path.abspath(path)
    end_of_experiment = 'final.log'

    def append(delayed_image):
        """Appends the image to viewer.

        Parameters
        ----------
        delayed_image : dask.delayed function object
        """
        if delayed_image is None:
            return

        if viewer.layers:
            # layer is present, append to its data
            layer = viewer.layers[0]
            image_shape = layer.data.shape[1:]
            image_dtype = layer.data.dtype
            image = da.from_delayed(
                delayed_image, shape=image_shape, dtype=image_dtype,
            ).reshape((1,) + image_shape)
            #layer.data = da.concatenate((layer.data, image), axis=0)
            layer.data = image
        else:
            # first run, no layer added yet
            image = delayed_image.compute()
            image = da.from_delayed(
                delayed_image, shape=image.shape, dtype=image.dtype,
            ).reshape((1,) + image.shape)
            #layer = viewer.add_image(image, rendering='attenuated_mip')
            layer = viewer.add_image(image, name = "uManager",
                                     contrast_limits = [0,10000])

        # we want to show the last file added in the viewer to do so we want to
        # put the slider at the very end. But, sometimes when user is scrolling
        # through the previous slide then it is annoying to jump to last
        # stack as it gets added. To avoid that jump we 1st check where
        # the scroll is and if its not at the last slide then don't move the slider.
        if viewer.dims.point[0] >= layer.data.shape[0] - 2:
            viewer.dims.set_point(0, layer.data.shape[0] - 1)

    @thread_worker(connect={'yielded': append})
    def load_data(path):
        """Watches the path for new files and yields it once file is ready.
        Notes
        -----
        Currently, there is no proper way to know if the file has written 
        entirely. So the workaround is we assume that files are generating 
        serially (in most microscopes it common), and files are name in 
        alphanumeric sequence We start loading the total number of minus the 
        last file (`total__files - last`). In other words, once we see the new 
        file in the directory, it means the file before it has completed so load
        that file. For this example, we also assume that the microscope is 
        generating a `final.log` file at the end of the acquisition, this file 
        is an indicator to stop monitoring the directory.
        Parameters
        ----------
        path : str
        directory to monitor and load tiffs as they start appearing.
        """
        current_files = set()
        processed_files = set()
        end_of_acquisition = False
        while not end_of_acquisition:
            files_to_process = set()
            # Get the all files in the directory at this time
            current_files = set(os.listdir(path))

            # Check if the end of acquisition has reached
            # if yes then remove it from the files_to_process set
            # and send it to display

            if end_of_experiment in current_files:
                #executed if there's an 'end of experiment' mark ... here: final.log file
                files_to_process = current_files - processed_files
                files_to_process.remove(end_of_experiment)
                end_of_acquisition = True

            elif len(current_files):
                #only executed if there are files 
                # get the last file from the current files based on the file names
                last_file = sorted(current_files, key=alphanumeric_key)[-1]
                current_files.remove(last_file)
                files_to_process = current_files - processed_files

            # yield every file to process as a dask.delayed function object.
            for p in sorted(files_to_process, key=alphanumeric_key):
                p_name = p + post_file
                print(p, p_name)
                yield delayed(imread)(os.path.join(path, p_name))
            else:
                yield

            # add the files which we have yielded to the processed list.
            processed_files.update(files_to_process)
            time.sleep(0.1)

    worker = load_data(path)
from pycromanager import Dataset, Acquisition, multi_d_acquisition_events
import numpy as np
import os, shutil

save_dir = r'temp' #r'C:/Users/wjahr/tmp'
save_name = r'Acquisition_test'

def stream_napari(image, metadata):
    # image_process_fn currently not needed; data is streamed to hdd and then read from hdd
    # ideally, this would relay data to napari w/o saving in between
    return image, metadata

planes_per_z_stack = 10

for ii in range(50):
    with Acquisition(directory=save_dir, name=save_name, show_display=False, image_process_fn = stream_napari) as acq:
        events = []
        for index, z_um in enumerate(np.linspace(0, 10, planes_per_z_stack)):
            evt = {"axes": {"z": index}, "z": z_um}
            events.append(evt)
        acq.acquire(events)

# dump a file when done so that the napari while-loop stops
with open(os.path.join(save_dir, 'final.log'), 'w'):
    pass

#deleting data doesn't work because it's still blocked by uManager
for filename in os.listdir(save_dir):
    file_path = os.path.join(save_dir, filename)
    try:
        if os.path.isfile(file_path) or os.path.islink(file_path):
            os.unlink(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
    except Exception as e:
        print('Failed to delete %s. Reason: %s' % (file_path, e))

I think there may be simpler mechanisms I could add on the pycromanager side to improve the Dataset class, if you have a clear idea of what napari will need as input.

It might be worth just trying to generate random data with numpy or dask to dynamically update napari viewer, then we can figure out how to make the same work with pycromanager.

I guess it makes more sense to do this all in RAM? and you want to throw away the previous data as soon as collect new data so as not to fill up memory. Is that right?


Yes, that’s exactly right, and I think I can do it with the tools already there. At least I do have something working off @GenevieveBuckley’s code: starting another thread creating the data (either simulated, or uManager). Code is currently a mess, but I will paste it here after cleaning up and fixing the last remaining issue.

I’ve tracked it down to the image_process_fn; there is some weird behaviour.
First image: the familiar uManager demo camera, as displayed by uManager and as saved onto the HDD.
Second image: the img_process_fn only plots the data using matplotlib; this is what I get.

MWE to reproduce attached.
Micro-Manager-2.0.0-gamma1-20201204
pycromanager 0.7.5
Currently developing on Mac.

Please let me know whether/where I should open a bug report!


import time
import numpy as np
import matplotlib.pyplot as plt
from pycromanager import Dataset, Acquisition, multi_d_acquisition_events

N = 1
z_range = [0, 512, 1, 0]
sleep_time = 0.2

save_dir = r'temp' #r'C:/Users/wjahr/tmp'
save_name = r'Acquisition_test'

def stream_napari(image, metadata):
    """img_process_fn for the MWE: plots the data grabbed from
        uManager with matplotlib and prints its dtype and range."""

    plt.figure()
    plt.imshow(image)
    print('stream_napari: ', np.dtype(image[0,0]), np.min(image), np.max(image))

    return image, metadata

for ii in range(N):
    with Acquisition(directory=save_dir, name=save_name, show_display=True, image_process_fn = stream_napari) as acq:
        events = []
        for index, z_um in enumerate(np.linspace(z_range[0], z_range[1], z_range[2])):
            evt = {"axes": {"z": index}, "z": z_um}
            events.append(evt)
        acq.acquire(events)
        time.sleep(sleep_time)

I think I ran into the same thing 2 days ago and pushed a fix. Try upgrading to the latest nightly build and the latest pycro-manager.

By the way, when I ran into it I was making an example of doing a maximum intensity projection with an image processor. The code might be useful for you, since there are some similarities:
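A rough sketch of such a processor (my own reconstruction, not the actual example; the metadata layout follows the snippet earlier in this thread, and `mip_process_fn` / `planes_per_z_stack` are illustrative names):

```python
import numpy as np

planes_per_z_stack = 10

def mip_process_fn(image, metadata):
    """Hypothetical image_process_fn keeping a running maximum intensity projection."""
    if not hasattr(mip_process_fn, "mip"):
        mip_process_fn.mip = image.copy()
    else:
        mip_process_fn.mip = np.maximum(mip_process_fn.mip, image)

    if metadata["Axes"]["z"] == planes_per_z_stack - 1:
        # stack complete: keep the finished projection and reset for the next stack
        mip_process_fn.last_mip = mip_process_fn.mip
        del mip_process_fn.mip
    return image, metadata
```

Updating only a running projection keeps the memory footprint at one image, regardless of stack depth.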


I also played with this a bit more on Saturday, out of curiosity (note that I had not worked with micro-manager/pycro-manager before).
So it seems like you found a solution already, but in case it is useful I will put my experiments out there.

Not currently having access to a microscope, I used opencv-python to access my webcam; code is here.

This can be ported straightforwardly to pymmcore, which I learned is a different way of accessing the micromanager libraries; code is here, using the simulated camera.
I have only started to become familiar with the pycromanager interface, which is conceptually a bit different from the others. What I started trying there was an image processor that saves every slice with a time stamp:

import time
import tifffile

def img_process_fn(img, meta):
    # save each slice with a timestamp in the filename
    tifffile.imwrite(f"./images/img_{time.time()}.tif", img)
    return img, meta


with Acquisition(directory=save_dir, name=save_name, image_process_fn=img_process_fn) as acq:
    acq.acquire(events)

The idea here was to save each camera image (slice) as soon as it is available, and then add a worker thread in napari that adds these images to the dataset. The worker thread could then also delete the intermediate images if they would otherwise clutter up the harddrive. The metadata could be embedded in the filename (or in a separate file). Going through the file system is just a simple form of inter-process communication here; other forms that bypass the disk drive would obviously be faster, e.g. an img_process_fn that pushes each frame and its metadata to an interprocess queue, so that the napari/qt thread_worker would only have to retrieve from that queue and yield a new image.
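That queue idea could be sketched like this (plain threading stands in for the napari thread_worker here, and all names are illustrative):

```python
import queue
import threading
import numpy as np

img_queue = queue.Queue()

def queue_img_process_fn(image, metadata):
    """Hypothetical image_process_fn: push each frame into the queue and
    return it unchanged so the acquisition pipeline is not disturbed."""
    img_queue.put((image, metadata))
    return image, metadata

def consume(displayed):
    """Stand-in for a napari worker that would yield each frame for display."""
    while True:
        image, metadata = img_queue.get()
        if image is None:          # sentinel marks end of acquisition
            break
        displayed.append(image)

displayed = []
consumer = threading.Thread(target=consume, args=(displayed,))
consumer.start()
for z in range(3):
    queue_img_process_fn(np.full((4, 4), z), {"axes": {"z": z}})
img_queue.put((None, None))        # tell the consumer to stop
consumer.join()
```

queue.Queue is thread-safe, so the acquisition thread and the display thread never touch the same data structure unsynchronized, and nothing ever hits the disk.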

Edit to add:
The advantage of this is that you can already see the xz or yz while you're scanning. If the data isn't too large, this may also be possible using the “proper” way that Genevieve used (i.e. not bypassing the napari data structures). You could then simply roll dimensions etc. I probably won't have time to work on this in the coming weeks, but maybe it will show some possibilities.

@henrypinkard While playing with this I noticed that passing an img_process_fn causes an error if the camera frame is not grayscale but RGB or RGBA. It complains about incompatible array shapes. I discovered this when I created a micromanager configuration with the OpenCV camera driver (which returned RGBA for my webcam) rather than the simulated camera (which is grayscale). I didn't log the error message at the time.

Good to know, I made an issue to track (https://github.com/micro-manager/pycro-manager/issues/202)

You can also control the core through pycromanager (and it has almost the exact same api as pymmcore)


Here’s my cleaned up code.

I have one thread acquiring / simulating images and adding them to a list, another thread reading / deleting from the list and pushing to napari.

Display is updated for each “slice”, so each volume rebuilds from top to bottom. I keep track of the z-position with a counter and modulo division; it’s written to and read from the same list as the images.

Tested with simulated data (each image of constant brightness, increasing over the acquisition, which makes it easy to see glitches … there are some, but I’m not sure whether they would be an issue on an actual setup) and the uManager demo camera. Didn’t test with a real setup yet.

Interestingly, the code seems to run a bit more stably when executed from a jupyter-lab notebook than from a .py file.

Thanks everyone for the help!

Code
import time
import numpy as np

import napari
from napari.qt import thread_worker

from dask import delayed
import dask.array as da

from pycromanager import Dataset, Acquisition, multi_d_acquisition_events

##################################################
#                 define constants               #
##################################################

# number of iterations. Ideally would be while loop and stop condition later
N = 10
# image size. Ideally would be read from experiment
size = [512,512]
# um / px for scaling in napari
size_um = [1, 1]
# start in um, end in um, number of slices, active slice
z_range = [0, 512, 50, 0]
# sleep time for testing with simulated data
sleep_time = 0.2
# contrast limits for display. read from data?
clim = [0, 6000]
# color map for display
cmap = 'gray'
# flag whether acquisition finished
acq_done = False
# initialize empty list for images
img_list = []


# custom numpy data type to hold z-position and image data 
img_pos_dtype = np.dtype([('pos', np.uint16), ('img', np.float64, size)])
img_pos = np.empty((1), dtype = img_pos_dtype)


##################################################
#             fnc's appending images             #
##################################################

def create_image(b, size = [128,128]):
    """fnc to simulate an image of constant brightness.
        Keeps track of z-position in stacks and appends
        both image and z-pos to list with custom dtype.
        Can be replaced with uManager img_process_fn.
        Inputs: int b: brightness
                np.array size: # of px in image in xy.
        """
    image = np.ones(size) * b
    global img_list   
    global z_range
    
    img_pos["pos"] = z_range[3]
    img_pos["img"] = image
    img_list.append(img_pos)
    
    z_range[3]= (z_range[3]+1) % z_range[2]
    
    #return image
        

def grab_image(image, metadata):
    """image_process_fn to grab images from uManager.
        Keeps track of the z-position in the stack and appends
        both image and z-pos to a list with the custom dtype.
        Inputs: np.array image: the acquired frame
                dict metadata: the uManager metadata.
        """
    global img_list   
    global z_range
    
    img_pos["pos"] = z_range[3]
    img_pos["img"] = np.float64(image)
    img_list.append(img_pos)
    
    z_range[3]= (z_range[3]+1) % z_range[2]
   
    return image, metadata


##################################################
# "main" napari & threads to create / write imgs #
##################################################

with napari.gui_qt():
    """Starts Napari viewer with in which to display the tiffs.    
    Original code copied from:
    https://github.com/napari/napari/blob/1fccbdcdd5c9ca05b4d6e670407e56f3fa305738/examples/live_tiffs_generator.py
    and:
    https://github.com/napari/napari/blob/1fccbdcdd5c9ca05b4d6e670407e56f3fa305738/examples/live_tiffs.py
    """
        
    viewer = napari.Viewer(ndisplay=2)

    print("starting threads...")
 
    def display_napari(img_pos):
        """Writes the image into the correct z-slice of the napari layer.
        Parameters
        ----------
        img_pos : np.ndarray with the custom dtype holding z-position and image data
        """
        if img_pos is None:
            return
        
        delayed_image = delayed(img_pos["img"])
        z_pos = img_pos["pos"]
        image_dtype = np.float64
      
        if viewer.layers:
            # layer is present, append to its data
            layer = viewer.layers[0]
            image_shape = layer.data.shape[1:]
            image_dtype = layer.data.dtype
            image = da.from_delayed(delayed_image, shape=image_shape,
                                    dtype=image_dtype).reshape((1,) + image_shape)
        else:
            # first run, no layer added yet
            image = delayed_image.compute()
            img_dtype = image.dtype
            image = da.from_delayed(delayed_image, shape=image.shape, 
                                    dtype=img_dtype,).reshape((1,) + image.shape)  
            layer = viewer.add_image(np.zeros([z_range[2], size[0], size[1]]), 
                                     name = 'uManager',
                                     colormap = cmap,
                                     interpolation = 'nearest',
                                     blending = 'additive',
                                     rendering = 'attenuated_mip',
                                     scale = [z_range[1]/z_range[2], size_um[1], size_um[0]],
                                     contrast_limits = clim )
            viewer.dims._roll()
            
        # overwriting whole data and not just the slice seems necessary 
        # for napari to update dynamically
        data = layer.data
        data[z_pos] = np.squeeze(image)
        layer.data = data

        # we want to show the last file added in the viewer to do so we want to
        # put the slider at the very end. But, sometimes when user is scrolling
        # through the previous slide then it is annoying to jump to last
        # stack as it gets added. To avoid that jump we 1st check where
        # the scroll is and if its not at the last slide then don't move the slider.
        if viewer.dims.point[0] >= layer.data.shape[0] - 2:
            viewer.dims.set_point(0, layer.data.shape[0] - 1)
                        
    @thread_worker(connect={'yielded': display_napari})
    def yield_img(img_list):
        global acq_done

        current_imgs = []
        end_of_acquisition = False
        while not end_of_acquisition:
            imgs_to_process = []
            current_imgs = img_list

            # Check if the end of acquisition has been reached
            # stop while loop and process remaining image(s)
            if acq_done:
                print("acquisition done")
                imgs_to_process = current_imgs.copy()
                end_of_acquisition = True

            elif len(current_imgs) > 1:
                # if there's more than one image in the list"
                # enroll all but the last image for processing
                # remove all but the last image from list
                imgs_to_process = current_imgs.copy()[:-1]
                del current_imgs[:len(imgs_to_process)]    
            # yield every file to process
            for p in imgs_to_process:
                yield p
            else:
                yield
            
    @thread_worker
    def append_img(img_list, N):
        """worker thread that adds images to a list"""
        for ii in range(N):
            # uncomment when working with dummy data
            # comment when working with uManager
            #for zz in range(z_range[2]):
            #    brightness = (ii+1) * (zz+1) / ((N+1) * (z_range[2]+1)) * clim[1]
            #    create_image(brightness, size)
            
            # uncomment when working with uManager
            # comment when working with dummy data
            with Acquisition(directory=None, name=None, 
                             show_display=False, 
                             image_process_fn = grab_image) as acq:
                events = []
                for index, z_um in enumerate(np.linspace(z_range[0], z_range[1], z_range[2])):
                    evt = {"axes": {"z": index}, "z": z_um}
                    events.append(evt)
                acq.acquire(events)
                time.sleep(sleep_time)
                
            
        global acq_done 
        acq_done = True

    worker1 = append_img(img_list, N)
    worker2 = yield_img(img_list)

    worker1.start()
    #worker2.start()

Awesome!

You should pull request the notebook to pycro-manager so it can go on the website. I'm sure there are others who could really benefit from this.