New Dask + ITK blogpost #138
Link to the data: https://drive.google.com/drive/folders/13mpIfqspKTIINkfoWbFsVtFF8D7jbTqJ (linked to from this earlier blogpost about image loading)
Link to the PSF: https://drive.google.com/drive/folders/13udO-h9epItG5MNWBp0VxBkKCllYBLQF (discussed here)
Nope, it doesn't work. Previously, we had hoped that the simpler application of ITK's Richardson-Lucy deconvolution would now be possible with map_blocks. Unfortunately, it looks like there is still some kind of problem blocking it from happening:

```python
deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter,
                            imgs, kernel=psf, iterations=1, dtype=np.float32)
```

I wish I had a good error message or something to add here. However, whatever the problem is only happens during the compute call, so I'm not getting anything useful out to help with debugging. I'm pretty sure the problem is not related to the leading dimensions, because I can use np.expand_dims on regular numpy array input to

I don't think I have more time to play with this, sadly. cc @thewtex
@GenevieveBuckley yes, absolutely, great plan! Will you be at SciPy US this year to hash this out? 🚶♀️ 🤝

@PranjalSahu built on your work and made a few fixes: https://www.youtube.com/watch?v=CMmoa8pP_eo

These are now published in
Well, I don't know if there is a plan anymore, since the current situation for me is "it doesn't work and I can't summarize why". I guess having this issue thread discussion is better than nothing, though.
Nope! (I was recently in the US, but didn't realize at the time that it would have meant a better opportunity to videochat while in the same timezone. Oh well, we could still videochat, it's just a lot more inconvenient.)
Very cool! I will have to check the talk out 😄
We'll have to check, this could be promising!
Update: Unfortunately it looks like there are still problems. I see this error when I try to compute the deconvolution with itk:

```python
import itk
import numpy as np
import dask.array as da
from dask.array.image import imread

imgs = imread('raw-488nm/*.tif').astype(np.float32)
psf = imread('psfs_z0p1/PSF_488nm_dz100nm.tif').astype(np.float32)

deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter,
                            imgs,
                            kernel_image=psf,
                            number_of_iterations=1,
                            dtype=np.float32)
output = deconvolved[100, ...].compute()
```

Traceback:

```
2022-07-12 17:55:14,883 - distributed.nanny - WARNING - Restarting worker
2022-07-12 17:55:16,364 - distributed.nanny - WARNING - Restarting worker
2022-07-12 17:55:17,415 - distributed.nanny - WARNING - Restarting worker
2022-07-12 17:55:20,493 - distributed.nanny - WARNING - Restarting worker
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
Input In [16], in <cell line: 1>()
----> 1 output = deconvolved[100, ...].compute()
2 print(output.shape)
File ~\.conda\envs\itk-dask\lib\site-packages\dask\base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File ~\.conda\envs\itk-dask\lib\site-packages\dask\base.py:602, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
599 keys.append(x.__dask_keys__())
600 postcomputes.append(x.__dask_postcompute__())
--> 602 results = schedule(dsk, keys, **kwargs)
603 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~\.conda\envs\itk-dask\lib\site-packages\distributed\client.py:3000, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2998 should_rejoin = False
2999 try:
-> 3000 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3001 finally:
3002 for f in futures.values():
File ~\.conda\envs\itk-dask\lib\site-packages\distributed\client.py:2174, in Client.gather(self, futures, errors, direct, asynchronous)
2172 else:
2173 local_worker = None
-> 2174 return self.sync(
2175 self._gather,
2176 futures,
2177 errors=errors,
2178 direct=direct,
2179 local_worker=local_worker,
2180 asynchronous=asynchronous,
2181 )
File ~\.conda\envs\itk-dask\lib\site-packages\distributed\utils.py:336, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
334 return future
335 else:
--> 336 return sync(
337 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
338 )
File ~\.conda\envs\itk-dask\lib\site-packages\distributed\utils.py:403, in sync(loop, func, callback_timeout, *args, **kwargs)
401 if error:
402 typ, exc, tb = error
--> 403 raise exc.with_traceback(tb)
404 else:
405 return result
File ~\.conda\envs\itk-dask\lib\site-packages\distributed\utils.py:376, in sync.<locals>.f()
374 future = asyncio.wait_for(future, callback_timeout)
375 future = asyncio.ensure_future(future)
--> 376 result = yield future
377 except Exception:
378 error = sys.exc_info()
File ~\.conda\envs\itk-dask\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
766 exc_info = None
768 try:
--> 769 value = future.result()
770 except Exception:
771 exc_info = sys.exc_info()
File ~\.conda\envs\itk-dask\lib\site-packages\distributed\client.py:2037, in Client._gather(self, futures, errors, direct, local_worker)
2035 exc = CancelledError(key)
2036 else:
-> 2037 raise exception.with_traceback(traceback)
2038 raise exc
2039 if errors == "skip":
KilledWorker: ("('richardson_lucy_deconvolution_image_filter-getitem-21573412bfdc1b613ca3e9d5e79d94bb', 0, 0, 0)", <WorkerState 'tcp://127.0.0.1:58059', name: 1, status: closed, memory: 0, processing: 1>)
```
I will also test it on my system.
I tested it on Linux and the computation could finish for this code. I tested it locally. Did you perform the computation on some remote server?

Ok, I can reproduce the error when using a LocalCluster with Dask.
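For reference, the failure shows up with a process-based local cluster, unlike Dask's default threaded scheduler. A minimal setup that reproduces that environment (default arguments assumed; the same pattern appears in the mesh example later in this thread):

```python
from dask.distributed import Client, LocalCluster

# LocalCluster runs tasks in separate worker processes,
# so each worker has to import ITK on its own
cluster = LocalCluster()
client = Client(cluster)
```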
@GenevieveBuckley Ok, I got it to work. I still don't know the root cause, but I did something similar before.
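A minimal sketch of what that workaround looks like (a hypothetical reconstruction; the key detail, confirmed below, is importing `itk` inside the function handed to `map_blocks` rather than at module level — `imgs` and `psf` are the dask arrays loaded earlier in this thread):

```python
import dask.array as da
import numpy as np

def deconvolve(block, kernel_image, number_of_iterations):
    import itk  # imported inside the mapped function, so each worker imports ITK itself
    return itk.richardson_lucy_deconvolution_image_filter(
        block,
        kernel_image=kernel_image,
        number_of_iterations=number_of_iterations)

deconvolved = da.map_blocks(deconvolve, imgs, kernel_image=psf,
                            number_of_iterations=1, dtype=np.float32)
```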
Oh, that's weird - if you import itk within the function it works for you, but not if itk is imported outside of it? That's very odd. I'll give that a go (tomorrow, probably).
I have tested it on the entire dataset using 2 workers, each with a 32 GB memory limit.

Also, I see some "ConnectionResetError: [Errno 104] Connection reset by peer" errors, but no computations fail.
Confirming this: I also see it working if itk is imported inside the function passed to dask's map_blocks, but not for the same thing with the import located outside the function.
This PR by Matt McCormick could fix the problem: InsightSoftwareConsortium/ITK#3494
Status: ITK PR #3494 has fixed the problem! We will wait for the next ITK release candidate (higher than the current v5.3rc03) to become available, and then write the blogpost. It'll be much better to release the blogpost when readers can install the ITK pre-release and try things themselves (plus it'll be easier to write then). Ideally, the blogpost should have two examples:
```python
import itk
from dask import delayed
from dask.distributed import Client, LocalCluster

@delayed
def performsum(m):
    return m.GetNumberOfPoints()

if __name__ == "__main__":
    # The main guard is needed because LocalCluster starts worker processes
    cluster = LocalCluster()
    client = Client(cluster)

    m = itk.Mesh[itk.F, 3].New()
    a = performsum(m).compute()
    print(a)
```
@thewtex emailed me recently, and I'm copying my reply into this thread so the information is shared publicly here too :)

Hi Matt,

Thanks for sending that feedback. Sorry for my slightly delayed reply, I saw your email and needed to dig into my old files to answer properly. I did do a little bit more work for the ITK + Dask blogpost example. It seems the last time I touched this was in August last year, so I'm a little hazy on the details now. It mostly worked, but with some problems, and I found it very time consuming to work on, so I put it down again. Github issue link: #138

NOTEBOOKS

EDIT: I've uploaded the notebooks to this public gist: https://gist.github.com/GenevieveBuckley/4ad4282038a9ec49e548898c78d3b590
EXAMPLE DATA

Link to the data: https://drive.google.com/drive/folders/13mpIfqspKTIINkfoWbFsVtFF8D7jbTqJ (linked to from this earlier blogpost about image loading)
Link to the PSF: https://drive.google.com/drive/folders/13udO-h9epItG5MNWBp0VxBkKCllYBLQF (discussed here)

Here's a tree view of what data files I have on disk next to the notebooks:
@GenevieveBuckley amazing work!! I am excited to share our lessons learned and progress on the blog!

Regarding the error on the second notebook:

There is a call:

But the input turned out to not be an

Regarding the second / third notebook:

This is because the kernel argument needs to be `kernel_image`. I also found that we needed to explicitly extract the time dimension for the function in map_blocks:

```python
import dask.array as da
import itk
import numpy as np
iterations = 1
def deconvolve_timepoint(img, kernel_image, number_of_iterations):
    # Each chunk holds a single timepoint; drop the leading time axis for ITK
    timepoint = np.squeeze(img, axis=0)
    deconvolved = itk.richardson_lucy_deconvolution_image_filter(
        timepoint,
        kernel_image=kernel_image,
        number_of_iterations=number_of_iterations)
    # Restore the time axis so the output chunks match the input chunking
    return np.expand_dims(deconvolved, 0)

deconvolved = da.map_blocks(
    deconvolve_timepoint,
    imgs[:4, :, :, :],
    kernel_image=psf,
    number_of_iterations=iterations,
    dtype=np.float32,
)
deconvolved
```

However, if

So, maybe we should write a post or two as intermediaries to demonstrate / discuss writing out the original data as OME-Zarr? And the processed result as OME-Zarr? This would facilitate processing, memory usage, and visualization. What do you think?
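For what that could look like, here is a minimal sketch using the ome-zarr-py package (an assumption — the thread doesn't specify tooling; `deconvolved` is the dask array from above and the axis order is assumed to be tzyx):

```python
import zarr
from ome_zarr.io import parse_url
from ome_zarr.writer import write_image

# Write the (lazy) dask array chunk-by-chunk into an OME-Zarr store
store = parse_url("deconvolved.ome.zarr", mode="w").store
root = zarr.group(store=store)
write_image(image=deconvolved, group=root, axes="tzyx")
```

Writing to a chunked store like this avoids materializing the whole result in memory, which is the memory-usage benefit mentioned above.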
a) Thank you! I did not realize I made a mistake and needed `kernel_image`.

b) We still have a problem. Yes, relying on

No, I can't run

Full error message:

```
>>> deconvolved_result = deconvolved.compute()
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
TiffTag 32781: coercing invalid ASCII to bytes
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [21], in <cell line: 1>()
----> 1 deconvolved_result = deconvolved.compute()
2 print(deconvolved_result.shape)
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
291 def compute(self, **kwargs):
292 """Compute this dask collection
293
294 This turns a lazy Dask collection into its in-memory equivalent.
(...)
313 dask.base.compute
314 """
--> 315 (result,) = compute(self, traverse=False, **kwargs)
316 return result
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/base.py:603, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
600 keys.append(x.__dask_keys__())
601 postcomputes.append(x.__dask_postcompute__())
--> 603 results = schedule(dsk, keys, **kwargs)
604 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86 elif isinstance(pool, multiprocessing.pool.Pool):
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509 _execute_task(task, data) # Re-execute locally
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
317 if exc.__traceback__ is not tb:
318 raise exc.with_traceback(tb)
--> 319 raise exc
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/dask/utils.py:41, in apply(func, args, kwargs)
39 def apply(func, args, kwargs=None):
40 if kwargs:
---> 41 return func(*args, **kwargs)
42 else:
43 return func(*args)
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/itk/support/helpers.py:139, in accept_array_like_xarray_torch.<locals>.image_filter_wrapper(*args, **kwargs)
135 kwargs[key] = image
137 if have_xarray_input or have_torch_input or have_array_input:
138 # Convert output itk.Image's to numpy.ndarray's
--> 139 output = image_filter(*tuple(args_list), **kwargs)
140 if isinstance(output, tuple):
141 output_list = list(output)
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/itk/itkRichardsonLucyDeconvolutionImageFilterPython.py:876, in richardson_lucy_deconvolution_image_filter(number_of_iterations, stop_iteration, size_greatest_prime_factor, boundary_condition, kernel_image, normalize, output_region_mode, *args, **kwargs)
873 specified_kwarg_typehints = { k:v for (k,v) in kwarg_typehints.items() if kwarg_typehints[k] is not ... }
874 kwargs.update(specified_kwarg_typehints)
--> 876 instance = itk.RichardsonLucyDeconvolutionImageFilter.New(*args, **kwargs)
877 return instance.__internal_call__()
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/itk/support/template_class.py:735, in itkTemplate.New(self, *args, **kwargs)
732 import itk
734 raise itk.TemplateTypeError(self, input_type)
--> 735 return self[list(keys)[0]].New(*args, **kwargs)
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/itk/itkRichardsonLucyDeconvolutionImageFilterPython.py:252, in itkRichardsonLucyDeconvolutionImageFilterID4ID4.New(*args, **kargs)
250 obj = itkRichardsonLucyDeconvolutionImageFilterID4ID4.__New_orig__()
251 from itk.support import template_class
--> 252 template_class.New(obj, *args, **kargs)
253 return obj
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/itk/support/template_class.py:800, in New(self, *args, **kargs)
797 def New(self, *args, **kargs):
798 import itk
--> 800 itk.set_inputs(self, args, kargs)
802 # now, try to add observer to display progress
803 if "auto_progress" in kargs.keys():
File ~/em95/miniconda/conda/envs/itk-dask/lib/python3.10/site-packages/itk/support/extras.py:1523, in set_inputs(new_itk_object, inargs, inkargs)
1521 attrib(itk.output(value))
1522 else:
-> 1523 attrib(itk.output(value))
TypeError: Expecting argument of type itkImageD4 or itkImageSourceID4.
```

Do you have any idea why the ITK class of the
c) I'm supportive, but not entirely sure what you have in mind when you say this:
I should dig deeper, but this may be expected.
This can be addressed with the function that operates on a timepoint explicitly:

```python
def deconvolve_timepoint(img, kernel_image, number_of_iterations):
    timepoint = np.squeeze(img, axis=0)
    deconvolved = itk.richardson_lucy_deconvolution_image_filter(
        timepoint,
        kernel_image=kernel_image,
        number_of_iterations=number_of_iterations)
    return np.expand_dims(deconvolved, 0)

deconvolved = da.map_blocks(
    deconvolve_timepoint,
    imgs[:4, :, :, :],
    kernel_image=psf,
    number_of_iterations=iterations,
    dtype=np.float32,
)
deconvolved
```
👍 I'll work on more details.
Hi there, I know this is a bit off topic and I'm sorry for that, but there has been a topic on the Dask Discourse asking for advice about using ITK with Dask. I thought you might be able to give some insights there too. In this topic, there is a link to an ITK blog post announcing Dask compatibility, but I've not been able to really find out what that meant.
@guillaumeeb thanks for the note. I followed up on the Dask Discourse thread. |
Now that release v5.3rc03 of ITK is available (which should include this PR), it would be good to do a follow-up blogpost to this one about using Dask + ITK together.
The purpose of this would be:
The first step is re-running the code from the earlier blogpost with ITK v5.3rc03 or above and seeing whether that works or not. Then we write up whatever we find.
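One quick way to confirm which ITK build is active before re-running the code (a sketch; `itk.Version.GetITKVersion()` is the standard ITK API for querying the version):

```python
import itk

# Should report a v5.3 release candidate (or newer) before re-running the example
print(itk.Version.GetITKVersion())
```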
Here are the comments specifically discussing what should be included in a follow-up blogpost:
Links: