polars_row/
decode.rs

1use arrow::bitmap::{Bitmap, BitmapBuilder};
2use arrow::buffer::Buffer;
3use arrow::datatypes::ArrowDataType;
4use arrow::offset::OffsetsBuffer;
5
6use self::encode::fixed_size;
7use self::row::{RowEncodingCategoricalContext, RowEncodingOptions};
8use self::variable::utf8::decode_str;
9use super::*;
10use crate::fixed::{boolean, decimal, numeric, packed_u32};
11use crate::variable::{binary, no_order, utf8};
12
13/// Decode `rows` into a arrow format
14/// # Safety
15/// This will not do any bound checks. Caller must ensure the `rows` are valid
16/// encodings.
17pub unsafe fn decode_rows_from_binary<'a>(
18    arr: &'a BinaryArray<i64>,
19    opts: &[RowEncodingOptions],
20    dicts: &[Option<RowEncodingContext>],
21    dtypes: &[ArrowDataType],
22    rows: &mut Vec<&'a [u8]>,
23) -> Vec<ArrayRef> {
24    assert_eq!(arr.null_count(), 0);
25    rows.clear();
26    rows.extend(arr.values_iter());
27    decode_rows(rows, opts, dicts, dtypes)
28}
29
30/// Decode `rows` into a arrow format
31/// # Safety
32/// This will not do any bound checks. Caller must ensure the `rows` are valid
33/// encodings.
34pub unsafe fn decode_rows(
35    // the rows will be updated while the data is decoded
36    rows: &mut [&[u8]],
37    opts: &[RowEncodingOptions],
38    dicts: &[Option<RowEncodingContext>],
39    dtypes: &[ArrowDataType],
40) -> Vec<ArrayRef> {
41    assert_eq!(opts.len(), dtypes.len());
42    assert_eq!(dicts.len(), dtypes.len());
43
44    dtypes
45        .iter()
46        .zip(opts)
47        .zip(dicts)
48        .map(|((dtype, opt), dict)| decode(rows, *opt, dict.as_ref(), dtype))
49        .collect()
50}
51
52unsafe fn decode_validity(rows: &mut [&[u8]], opt: RowEncodingOptions) -> Option<Bitmap> {
53    // 2 loop system to avoid the overhead of allocating the bitmap if all the elements are valid.
54
55    let null_sentinel = opt.null_sentinel();
56    let first_null = (0..rows.len()).find(|&i| {
57        let v;
58        (v, rows[i]) = rows[i].split_at_unchecked(1);
59        v[0] == null_sentinel
60    });
61
62    // No nulls just return None
63    let first_null = first_null?;
64
65    let mut bm = BitmapBuilder::new();
66    bm.reserve(rows.len());
67    bm.extend_constant(first_null, true);
68    bm.push(false);
69    bm.extend_trusted_len_iter(rows[first_null + 1..].iter_mut().map(|row| {
70        let v;
71        (v, *row) = row.split_at_unchecked(1);
72        v[0] != null_sentinel
73    }));
74    bm.into_opt_validity()
75}
76
77// We inline this in an attempt to avoid the dispatch cost.
78#[inline(always)]
79fn dtype_and_data_to_encoded_item_len(
80    dtype: &ArrowDataType,
81    data: &[u8],
82    opt: RowEncodingOptions,
83    dict: Option<&RowEncodingContext>,
84) -> usize {
85    // Fast path: if the size is fixed, we can just divide.
86    if let Some(size) = fixed_size(dtype, dict) {
87        return size;
88    }
89
90    use ArrowDataType as D;
91    match dtype {
92        D::Binary | D::LargeBinary | D::BinaryView | D::Utf8 | D::LargeUtf8 | D::Utf8View
93            if opt.contains(RowEncodingOptions::NO_ORDER) =>
94        unsafe { no_order::len_from_buffer(data, opt) },
95        D::Binary | D::LargeBinary | D::BinaryView => unsafe {
96            binary::encoded_item_len(data, opt)
97        },
98        D::Utf8 | D::LargeUtf8 | D::Utf8View => unsafe { utf8::len_from_buffer(data, opt) },
99
100        D::List(list_field) | D::LargeList(list_field) => {
101            let mut data = data;
102            let mut item_len = 0;
103
104            let list_continuation_token = opt.list_continuation_token();
105
106            while data[0] == list_continuation_token {
107                data = &data[1..];
108                let len = dtype_and_data_to_encoded_item_len(list_field.dtype(), data, opt, dict);
109                data = &data[len..];
110                item_len += 1 + len;
111            }
112            1 + item_len
113        },
114
115        D::FixedSizeBinary(_) => todo!(),
116        D::FixedSizeList(fsl_field, width) => {
117            let mut data = &data[1..];
118            let mut item_len = 1; // validity byte
119
120            for _ in 0..*width {
121                let len = dtype_and_data_to_encoded_item_len(fsl_field.dtype(), data, opt, dict);
122                data = &data[len..];
123                item_len += len;
124            }
125            item_len
126        },
127        D::Struct(struct_fields) => {
128            let mut data = &data[1..];
129            let mut item_len = 1; // validity byte
130
131            for struct_field in struct_fields {
132                let len = dtype_and_data_to_encoded_item_len(struct_field.dtype(), data, opt, dict);
133                data = &data[len..];
134                item_len += len;
135            }
136            item_len
137        },
138
139        D::Union(_) => todo!(),
140        D::Map(_, _) => todo!(),
141        D::Decimal256(_, _) => todo!(),
142        D::Extension(_) => todo!(),
143        D::Unknown => todo!(),
144
145        _ => unreachable!(),
146    }
147}
148
149fn rows_for_fixed_size_list<'a>(
150    dtype: &ArrowDataType,
151    opt: RowEncodingOptions,
152    dict: Option<&RowEncodingContext>,
153    width: usize,
154    rows: &mut [&'a [u8]],
155    nested_rows: &mut Vec<&'a [u8]>,
156) {
157    nested_rows.clear();
158    nested_rows.reserve(rows.len() * width);
159
160    // Fast path: if the size is fixed, we can just divide.
161    if let Some(size) = fixed_size(dtype, dict) {
162        for row in rows.iter_mut() {
163            for i in 0..width {
164                nested_rows.push(&row[(i * size)..][..size]);
165            }
166            *row = &row[size * width..];
167        }
168        return;
169    }
170
171    // @TODO: This is quite slow since we need to dispatch for possibly every nested type
172    for row in rows.iter_mut() {
173        for _ in 0..width {
174            let length = dtype_and_data_to_encoded_item_len(dtype, row, opt, dict);
175            let v;
176            (v, *row) = row.split_at(length);
177            nested_rows.push(v);
178        }
179    }
180}
181
182unsafe fn decode_lexical_cat(
183    rows: &mut [&[u8]],
184    opt: RowEncodingOptions,
185    _values: &RowEncodingCategoricalContext,
186) -> PrimitiveArray<u32> {
187    let mut s = numeric::decode_primitive::<u32>(rows, opt);
188    numeric::decode_primitive::<u32>(rows, opt).with_validity(s.take_validity())
189}
190
191unsafe fn decode(
192    rows: &mut [&[u8]],
193    opt: RowEncodingOptions,
194    dict: Option<&RowEncodingContext>,
195    dtype: &ArrowDataType,
196) -> ArrayRef {
197    use ArrowDataType as D;
198    match dtype {
199        D::Null => NullArray::new(D::Null, rows.len()).to_boxed(),
200        D::Boolean => boolean::decode_bool(rows, opt).to_boxed(),
201        D::Binary | D::LargeBinary | D::BinaryView | D::Utf8 | D::LargeUtf8 | D::Utf8View
202            if opt.contains(RowEncodingOptions::NO_ORDER) =>
203        {
204            let array = no_order::decode_variable_no_order(rows, opt);
205
206            if matches!(dtype, D::Utf8 | D::LargeUtf8 | D::Utf8View) {
207                unsafe { array.to_utf8view_unchecked() }.to_boxed()
208            } else {
209                array.to_boxed()
210            }
211        },
212        D::Binary | D::LargeBinary | D::BinaryView => binary::decode_binview(rows, opt).to_boxed(),
213        D::Utf8 | D::LargeUtf8 | D::Utf8View => decode_str(rows, opt).boxed(),
214
215        D::Struct(fields) => {
216            let validity = decode_validity(rows, opt);
217
218            let values = match dict {
219                None => fields
220                    .iter()
221                    .map(|struct_fld| decode(rows, opt, None, struct_fld.dtype()))
222                    .collect(),
223                Some(RowEncodingContext::Struct(dicts)) => fields
224                    .iter()
225                    .zip(dicts)
226                    .map(|(struct_fld, dict)| decode(rows, opt, dict.as_ref(), struct_fld.dtype()))
227                    .collect(),
228                _ => unreachable!(),
229            };
230            StructArray::new(dtype.clone(), rows.len(), values, validity).to_boxed()
231        },
232        D::FixedSizeList(fsl_field, width) => {
233            let validity = decode_validity(rows, opt);
234
235            // @TODO: we could consider making this into a scratchpad
236            let mut nested_rows = Vec::new();
237            rows_for_fixed_size_list(fsl_field.dtype(), opt, dict, *width, rows, &mut nested_rows);
238            let values = decode(&mut nested_rows, opt, dict, fsl_field.dtype());
239
240            FixedSizeListArray::new(dtype.clone(), rows.len(), values, validity).to_boxed()
241        },
242        D::List(list_field) | D::LargeList(list_field) => {
243            let mut validity = BitmapBuilder::new();
244
245            // @TODO: we could consider making this into a scratchpad
246            let num_rows = rows.len();
247            let mut nested_rows = Vec::new();
248            let mut offsets = Vec::with_capacity(rows.len() + 1);
249            offsets.push(0);
250
251            let list_null_sentinel = opt.list_null_sentinel();
252            let list_continuation_token = opt.list_continuation_token();
253            let list_termination_token = opt.list_termination_token();
254
255            // @TODO: make a specialized loop for fixed size list_field.dtype()
256            for (i, row) in rows.iter_mut().enumerate() {
257                while row[0] == list_continuation_token {
258                    *row = &row[1..];
259                    let len =
260                        dtype_and_data_to_encoded_item_len(list_field.dtype(), row, opt, dict);
261                    nested_rows.push(&row[..len]);
262                    *row = &row[len..];
263                }
264
265                offsets.push(nested_rows.len() as i64);
266
267                // @TODO: Might be better to make this a 2-loop system.
268                if row[0] == list_null_sentinel {
269                    *row = &row[1..];
270                    validity.reserve(num_rows);
271                    validity.extend_constant(i - validity.len(), true);
272                    validity.push(false);
273                    continue;
274                }
275
276                assert_eq!(row[0], list_termination_token);
277                *row = &row[1..];
278            }
279
280            let validity = if validity.is_empty() {
281                None
282            } else {
283                validity.extend_constant(num_rows - validity.len(), true);
284                validity.into_opt_validity()
285            };
286            assert_eq!(offsets.len(), rows.len() + 1);
287
288            let values = decode(&mut nested_rows, opt, dict, list_field.dtype());
289
290            ListArray::<i64>::new(
291                dtype.clone(),
292                unsafe { OffsetsBuffer::new_unchecked(Buffer::from(offsets)) },
293                values,
294                validity,
295            )
296            .to_boxed()
297        },
298
299        dt => {
300            if matches!(dt, D::UInt32) {
301                if let Some(dict) = dict {
302                    return match dict {
303                        RowEncodingContext::Categorical(ctx) => {
304                            if ctx.is_enum {
305                                packed_u32::decode(rows, opt, ctx.needed_num_bits()).to_boxed()
306                            } else if ctx.lexical_sort_idxs.is_none() {
307                                numeric::decode_primitive::<u32>(rows, opt).to_boxed()
308                            } else {
309                                decode_lexical_cat(rows, opt, ctx).to_boxed()
310                            }
311                        },
312                        _ => unreachable!(),
313                    };
314                }
315            }
316
317            if matches!(dt, D::Int128) {
318                if let Some(dict) = dict {
319                    return match dict {
320                        RowEncodingContext::Decimal(precision) => {
321                            decimal::decode(rows, opt, *precision).to_boxed()
322                        },
323                        _ => unreachable!(),
324                    };
325                }
326            }
327
328            with_match_arrow_primitive_type!(dt, |$T| {
329                numeric::decode_primitive::<$T>(rows, opt).to_boxed()
330            })
331        },
332    }
333}