AICSImageIO and multiprocessing

Hi,

I am wondering how to use AICSImageIO correctly / most efficiently in my image analysis pipeline. I have to read and analyze hundreds of gigabytes of CZI files per week. I am using skimage and OpenCV for the image processing part and Keras for object classification. My current pipeline looks like this (a rough code sketch follows the list):

With multiprocessing:

  1. Read about 10 images, which fit in memory (with czifile.py)
  2. Segmentation and object detection with skimage and openCV
  3. Classification of ROIs with Keras.
  4. Continue with the next 10 images…
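
In code, the loop is roughly like this (segment_and_detect and classify_rois just stand in for my actual skimage/OpenCV and Keras steps):

from multiprocessing import Pool
import czifile

def analyse_one(path):
    img = czifile.imread(path)        # 1. read one CZI file
    rois = segment_and_detect(img)    # 2. skimage / OpenCV segmentation and detection (placeholder)
    return classify_rois(rois)        # 3. Keras classification (placeholder)

def run(paths, batch_size=10):
    results = []
    with Pool() as pool:
        for i in range(0, len(paths), batch_size):                     # 4. next ~10 images
            results += pool.map(analyse_one, paths[i:i + batch_size])
    return results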

I am looking for an example or tutorial of a similar task, but using AICSImageIO. I am a bit confused about whether to use multiprocessing, dask or dask-image for steps 2 and 3.

Any tips would be very much appreciated,
thanks in advance

For large scale batch processing, I strongly suggest using dask with the dask-distributed client. It makes it very easy to migrate between local machines and clusters, just by changing one line of code (using LocalCluster to work on a single machine). I have a long script here that uses dask-jobqueue to submit jobs to my university compute cluster — but you could do exactly the same with LocalCluster.
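
As a simplified sketch of that setup (the worker counts, memory limits and walltimes below are just illustrative):

from dask.distributed import Client, LocalCluster
# from dask_jobqueue import SLURMCluster   # the only substantive change on the cluster

cluster = LocalCluster(n_workers=4, memory_limit="16GB")                # single machine
# cluster = SLURMCluster(cores=4, memory="16GB", walltime="00:15:00")   # compute cluster
client = Client(cluster)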

There’s a lot of “irrelevant” information in the script but the important bit is futures = client.map(my_longrunning_func, my_data_list) followed by

from dask.distributed import as_completed
from tqdm import tqdm

for future in tqdm(as_completed(futures), total=len(my_data_list)):
    result = future.result()
    do_something_with(result)

The client will manage RAM for you, and run as many parallel jobs as it can while staying within the RAM limits. In short, dask has done a lot of this legwork for you so you don’t have to!

My script is organised a bit differently from yours — I process all images (500GB) in each step before moving on to the next step, because I want the intermediate outputs. But either approach would work.

@jni it’s really interesting to see how people use dask in the imaging context! I have been using dask a lot recently to process large datasets on a cluster, but usually I end up using delayed on image processing functions that run in parallel e.g.

dask.compute([dask.delayed(my_analysis_fun)(im) for im in image_list])

For me this has the advantage that I can run the same code entirely without dask by changing that single line to

[my_analysis_fun(im) for im in image_list]

It works fine, but seeing that you use client.map, client.submit etc. I’m wondering whether you know of specific pros/cons of these different approaches. I know that if you move large data between workers and the local process it’s better to use map, submit etc., but that doesn’t seem to be the case in your example.


Hi @guiwitz!

it’s really interesting to see how people use dask in the imaging context!

Agreed! To be honest I don’t think we have quite converged on “best practices”, but there are some interesting patterns emerging!

For me this has the advantage that I can run the same code entirely without dask by changing that single line to

But the problem is that this requires changing code in many places — every function call. I prefer solutions that only require one line of change. I used to worry about making dask an optional dependency, but I think it is now mature enough that I am comfortable depending on it, and using its built-in functions to control execution.

I know that if you move large data between workers and local process

For me, in this case, it was a matter of using dask-jobqueue with the cluster. Clusters have always demanded a significant cognitive overhead because you had to do some mental calculations about how many resources you needed. Usually this meant oversubscribing by a lot (say, 10x), and consequently getting stuck in the queue for a long time. dask-jobqueue changes the game because it submits jobs for you, and those jobs can be small, say, 4 cores for 15min, so they just sneak in whenever there is space available. Once a 15min job finishes, it spins up another to replace it. And so on. It’s really quite magical.
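
Roughly, the setup looks like this (again, the resource numbers are just placeholders):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

# each job is a small, short-lived worker that slips into the queue whenever there is space
cluster = SLURMCluster(cores=4, memory="16GB", walltime="00:15:00")
cluster.adapt(minimum=0, maximum=20)   # submit new jobs as old ones expire, up to 20 workers
client = Client(cluster)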

And, as mentioned above, I can, with two line changes, replace the SLURMCluster with a LocalCluster to test that my code runs locally on a subset of the data. This makes debugging large jobs much easier. (There are of course additional issues when running on a cluster — see this discussion for example, though don’t miss the resolution in this comment — but the bulk of the work can be done locally with minimal transition pain to the cluster.)

So, in summary, it was not about whether or not I was moving data, but about using the advanced schedulers that are available now with dask.distributed. I expect that things will continue to get better on this front. Also, the distributed schedulers come with a lot of diagnostics, though tbh I have used those less. But I am sure they would be great to identify performance issues.


One thing I would add about your solution @guiwitz: it can be made into a one-line solution if you don’t mind depending on dask. Then you can write:

import dask

# delayed = dask.delayed   # uncomment to build a task graph and run in parallel
delayed = lambda x: x      # pass-through: everything runs eagerly

result = dask.compute([delayed(my_analysis_fun)(im) for im in image_list])

This will work because dask.compute does nothing to non-dask arguments in a list.

Just curious:

I like the pattern of using map as in map(function, images).
So during initial development I would just use Python’s built-in map for debugging.
Then, if I want to speed things up, I would monkey-patch map to use the map of a ThreadPoolExecutor or ProcessPoolExecutor. Am I correct that dask’s client.map could also be used as a drop-in replacement, or is the handling of the futures slightly different from the other ones?

Usually I chain the different functions of the pipeline together using multiple map(...) calls, and in places where I need the results I turn things into a list.
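
In code, the pattern looks something like this (segment, classify and image_list are just placeholder names):

from concurrent.futures import ProcessPoolExecutor

def pipeline(images, mapper=map):            # built-in map while debugging
    segmented = mapper(segment, images)      # segment and classify are placeholders
    classified = mapper(classify, segmented)
    return list(classified)                  # materialise only where the results are needed

with ProcessPoolExecutor() as executor:      # "monkey-patched" map for speed
    results = pipeline(image_list, mapper=executor.map)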

This is probably equivalent to your list comprehensions, just with more of a functional programming approach.

Your suspicions that there are differences lurking are well-founded. Since your results are Futures, and not whatever the original return value of your function call was, you would need either all your functions to be Future-aware (i.e. “is this a Future? If so, grab the .result” for every input), or you need to unpack the Futures in an intermediate step.

This is the price you pay for client.map returning instantly.

You can look at the dask.distributed.Client documentation for details. The easiest intermediate step is results = client.gather(list_of_futures). That will block until all the futures have finished and give you a list of your results.
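
For example, reusing the placeholder names from the snippets above:

futures = client.map(my_longrunning_func, my_data_list)   # returns Futures immediately
results = client.gather(futures)                          # blocks, then gives plain results in input order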

Anyway, that documentation page has a lot of interesting discussion about whether this is a good idea. :joy:


That’s a great idea! At the moment I’m stupidly redefining all my functions as myfun = delayed(myfun) if dask is used. That will make my code waaaaay simpler. Thanks!