I am trying to write some code using dask.distributed.Client and rioxarray to_raster that:
- Concatenates two rasters (dask arrays)
- Applies a function across all blocks in the concatenated array
- Writes the final array to a ".tif" file
However, the code fails with either a ValueError: Lock is not yet acquired or an AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper' error, depending on what type of lock I use.
I'm using a local cluster and want to run the code across multiple threads on a single worker. I've written the following minimal reproducible example:
def test():
    from osgeo import gdal
    import rasterio
    from rasterio.transform import Affine
    import xarray as xr
    import rioxarray
    from dask.distributed import Client, Lock
    import numpy as np

    # Create georeferenced example rasters
    x = np.linspace(-90, 90, 100)
    y = np.linspace(90, -90, 100)
    X, Y = np.meshgrid(x, y)
    Z1 = np.abs(((X - 10) ** 2 + (Y - 10) ** 2) / 1 ** 2)
    Z2 = np.abs(((X + 10) ** 2 + (Y + 10) ** 2) / 2.5 ** 2)
    Z = (Z1 - Z2)

    xres = (x[-1] - x[0]) / len(x)
    yres = (y[-1] - y[0]) / len(y)
    transform = Affine.translation(x[0] - xres / 2, y[0] - yres / 2) * Affine.scale(xres, yres)

    with rasterio.open(
        "example.tif",
        mode="w",
        driver="GTiff",
        height=Z.shape[0],
        width=Z.shape[1],
        count=1,
        dtype=Z.dtype,
        crs="+proj=latlong",
        transform=transform,
    ) as new_dataset:
        new_dataset.write(Z, 1)

    with rasterio.open(
        "example2.tif",
        mode="w",
        driver="GTiff",
        height=Z.shape[0],
        width=Z.shape[1],
        count=1,
        dtype=Z.dtype,
        crs="+proj=latlong",
        transform=transform,
    ) as new_dataset:
        new_dataset.write(Z, 1)

    # Use dask distributed and rioxarray to open the rasters
    chunkDimsX = 10
    chunkDimsY = 10

    with Client(threads_per_worker=6, n_workers=1, processes=False) as client:
        with rioxarray.open_rasterio("example.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds:
            with rioxarray.open_rasterio("example2.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds2:
                concatRaster = xr.concat([xds, xds2], dim='band').chunk({'band': 1, 'x': chunkDimsX, 'y': chunkDimsY})
                concatRaster = concatRaster.map_blocks(lambda x: x + 1, template=concatRaster)
                concatRaster.rio.write_nodata(0, inplace=True)
                concatRaster.rio.to_raster('concat.tif', tiled=True, lock=Lock(client=client), overwrite=True)

if __name__ == '__main__':
    test()
When I run the above, I get the following error message:
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "<stdin>", line 52, in test
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_array.py", line 1125, in to_raster
return RasterioWriter(raster_path=raster_path).to_raster(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_writer.py", line 295, in to_raster
return dask.array.store(
^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\array\core.py", line 1236, in store
compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\base.py", line 402, in compute_as_if_collection
return schedule(dsk2, keys, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3275, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 2372, in gather
return self.sync(
^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\lock.py", line 163, in release
raise ValueError("Lock is not yet acquired")
^^^^^^^^^^^^^^^
ValueError: Lock is not yet acquired
I tried changing the lock argument to True instead of a dask.distributed Lock:
concatRaster.rio.to_raster('concat.tif', tiled=True, lock=True, overwrite=True)
But I get the following error:
2024-03-22 10:21:45,634 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1a0162f93d0>
0. 1787078771392
>.
Traceback (most recent call last):
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
pickler.dump(x)
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
return super().dump(obj)
^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object
Traceback (most recent call last):
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
pickler.dump(x)
AttributeError: Can't pickle local object 'map_blocks.<locals>._wrapper'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 353, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 76, in pickle_dumps
frames[0] = pickle.dumps(
^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
return super().dump(obj)
^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "<stdin>", line 52, in test
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_array.py", line 1125, in to_raster
return RasterioWriter(raster_path=raster_path).to_raster(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\rioxarray\raster_writer.py", line 295, in to_raster
return dask.array.store(
^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\array\core.py", line 1236, in store
compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\dask\base.py", line 402, in compute_as_if_collection
return schedule(dsk2, keys, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3255, in get
futures = self._graph_to_futures(
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\client.py", line 3151, in _graph_to_futures
header, frames = serialize(ToPickle(dsk), on_error="raise")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\birch\miniconda3\envs\foresceStsimConnector\foresce-stsim-connector-py-conda\Lib\site-packages\distributed\protocol\serialize.py", line 379, in serialize
raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1a0162f93d0>\n 0. 1787078771392\n>')
It looks like it's related to the map_blocks call, since the _wrapper in the error is a local function defined inside map_blocks.
I'm fairly inexperienced with dask, and would appreciate help deciphering why these errors occur.
Edit: I was able to get this working by importing SerializableLock:
from dask.utils import SerializableLock
Then, when writing my raster, setting the lock argument in the following way:
concatRaster.rio.to_raster("concat.tif", tiled=True, lock=SerializableLock(), windowed=True, overwrite=True)
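For completeness, here is a sketch of how that change slots into the write step of the example above; everything before the final to_raster call is unchanged, and windowed=True is the extra argument I added alongside the lock:
from dask.utils import SerializableLock

with Client(threads_per_worker=6, n_workers=1, processes=False) as client:
    with rioxarray.open_rasterio("example.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds:
        with rioxarray.open_rasterio("example2.tif", chunks=(chunkDimsX, chunkDimsY), lock=False) as xds2:
            concatRaster = xr.concat([xds, xds2], dim='band').chunk({'band': 1, 'x': chunkDimsX, 'y': chunkDimsY})
            concatRaster = concatRaster.map_blocks(lambda x: x + 1, template=concatRaster)
            concatRaster.rio.write_nodata(0, inplace=True)
            # SerializableLock can be pickled and shipped with the task graph,
            # unlike the plain threading lock that appeared in the earlier traceback
            concatRaster.rio.to_raster("concat.tif", tiled=True, lock=SerializableLock(), windowed=True, overwrite=True)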