
I am trying to assign a Dask array to a Dask DataFrame column, but came across this weird issue.

Example:

import dask
import dask.dataframe as dd
import dask.array as da

# create a random dataframe like the official documentation does
ts_df = dask.datasets.timeseries(
    '2000', '2003', freq='2H', partition_freq='2Y'
)

print(ts_df.head())

"""
                      id    name         x         y         z
timestamp                                                     
2000-12-31 00:00:00  962     Tim -0.562319 -0.331690 -0.894009
2000-12-31 02:00:00  995   Frank -0.952083 -0.725073 -1.677156
2000-12-31 04:00:00  932     Bob  0.579130  0.923146  1.502276
2000-12-31 06:00:00  997  Ursula  0.120449 -0.059130  0.061319
2000-12-31 08:00:00  975  Victor -0.947120  0.647456 -0.299664
"""

# assign a new column using dask Series works fine
ts_df['z'] = ts_df.x + ts_df.y

# da.where() returns dask.array.core.Array
new_col = da.where(
    (ts_df.x >= 0) | (ts_df.y >= 0),
    'N/A',
    ts_df.x.astype(str) + 'x' + ts_df.y.astype(str)
)

# assign a new column using dask array runs into error
ts_df['test'] = new_col
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-224-392b07690d97> in <module>
      2     ((ts_df.x >= 0) | (ts_df.y >= 0)),
      3     'N/A',
----> 4     ts_df.x.astype(str) + 'x' + ts_df.y.astype(str)
      5 )

~/venvs/se/lib/python3.7/site-packages/dask/dataframe/core.py in __setitem__(self, key, value)
   3373             df = self.assign(**{k: value for k in key})
   3374         else:
-> 3375             df = self.assign(**{key: value})
   3376 
   3377         self.dask = df.dask

~/venvs/se/lib/python3.7/site-packages/dask/dataframe/core.py in assign(self, **kwargs)
   3633                         )
   3634                     )
-> 3635                 kwargs[k] = from_dask_array(v, index=self.index)
   3636 
   3637         pairs = list(sum(kwargs.items(), ()))

~/venvs/se/lib/python3.7/site-packages/dask/dataframe/io/io.py in from_dask_array(x, columns, index)
    416     dask.dataframe._Frame.to_records: Reverse conversion
    417     """
--> 418     meta = _meta_from_array(x, columns, index)
    419 
    420     if x.ndim == 2 and len(x.chunks[1]) > 1:

~/venvs/se/lib/python3.7/site-packages/dask/dataframe/io/io.py in _meta_from_array(x, columns, index)
     57         )
     58     else:
---> 59         if np.isnan(x.shape[1]):
     60             raise ValueError("Shape along axis 1 must be known")
     61         if columns is None:

IndexError: tuple index out of range

How should I assign a new column to a Dask DataFrame correctly? If I do ts_df['test'] = new_col.compute() it returns TypeError: Column assignment doesn't support type numpy.ndarray, which makes sense since it's a distributed dataframe; we should still be assigning a distributed array to it, not a materialized NumPy one. I also tried ts_df.assign(test=new_col) and got the same IndexError.

Environment:
-- Python 3.7.6
-- dask 2.9.2

1 Answer

After trial and error, I found a workaround:

x_y = ts_df.x.astype(str) + 'x' + ts_df.y.astype(str)
ts_df['test'] = x_y.mask((ts_df.x >= 0) | (ts_df.y >= 0), 'N/A')
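The mask call follows the pandas semantics: where the condition is True the value is replaced with 'N/A', otherwise the original string is kept. A minimal pandas-only sketch of the same logic (the tiny frame here is made up for illustration):

```python
import pandas as pd

# toy data: only the first row has both coordinates negative
df = pd.DataFrame({'x': [-0.5, 0.6, -0.9], 'y': [-0.3, -0.1, 0.4]})

# build the concatenated string column, then mask rows where either coord is >= 0
x_y = df.x.astype(str) + 'x' + df.y.astype(str)
test = x_y.mask((df.x >= 0) | (df.y >= 0), 'N/A')

print(test.tolist())  # ['-0.5x-0.3', 'N/A', 'N/A']
```

Because dask.dataframe mirrors this pandas API, the same expression works partition-by-partition on the distributed frame without ever leaving Series-land.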

Also, based on the discussion here, it seems there's an issue with the Dask array that da.where returns when given Dask Series inputs: dask.array<where, shape=(), dtype=int64, chunksize=(), chunktype=numpy.ndarray>. The shape and chunksize are empty.
If the inputs are Dask arrays instead, it works:

new_col = da.where(
    ((ts_df.x >= 0) | (ts_df.y >= 0)).values,
    'N/A',
    ts_df.x.astype(str) + 'x' + ts_df.y.astype(str)
)
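Another way to sidestep da.where entirely is to push the row-wise np.where down to each pandas partition with map_partitions; the helper name add_test below is my own, not from the question. A sketch of the per-partition logic, checked here on a plain pandas frame since each partition is just a pandas DataFrame:

```python
import numpy as np
import pandas as pd

def add_test(df):
    """Per-partition function: receives and returns a plain pandas DataFrame."""
    out = df.copy()
    out['test'] = np.where(
        (df.x >= 0) | (df.y >= 0),
        'N/A',
        df.x.astype(str) + 'x' + df.y.astype(str),
    )
    return out

# with dask, this would be applied as: ts_df = ts_df.map_partitions(add_test)
# pandas-only check of the partition logic:
df = pd.DataFrame({'x': [-0.5, 0.6], 'y': [-0.3, -0.1]})
print(add_test(df)['test'].tolist())  # ['-0.5x-0.3', 'N/A']
```

Since np.where runs inside each partition on concrete NumPy-backed data, the unknown-shape problem from the lazy da.where path never arises.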