1use arrow::bitmap::{Bitmap, BitmapBuilder};
2use arrow::buffer::Buffer;
3use arrow::datatypes::ArrowDataType;
4use arrow::offset::OffsetsBuffer;
5
6use self::encode::fixed_size;
7use self::row::{RowEncodingCategoricalContext, RowEncodingOptions};
8use self::variable::utf8::decode_str;
9use super::*;
10use crate::fixed::{boolean, decimal, numeric, packed_u32};
11use crate::variable::{binary, no_order, utf8};
12
13pub unsafe fn decode_rows_from_binary<'a>(
18 arr: &'a BinaryArray<i64>,
19 opts: &[RowEncodingOptions],
20 dicts: &[Option<RowEncodingContext>],
21 dtypes: &[ArrowDataType],
22 rows: &mut Vec<&'a [u8]>,
23) -> Vec<ArrayRef> {
24 assert_eq!(arr.null_count(), 0);
25 rows.clear();
26 rows.extend(arr.values_iter());
27 decode_rows(rows, opts, dicts, dtypes)
28}
29
30pub unsafe fn decode_rows(
35 rows: &mut [&[u8]],
37 opts: &[RowEncodingOptions],
38 dicts: &[Option<RowEncodingContext>],
39 dtypes: &[ArrowDataType],
40) -> Vec<ArrayRef> {
41 assert_eq!(opts.len(), dtypes.len());
42 assert_eq!(dicts.len(), dtypes.len());
43
44 dtypes
45 .iter()
46 .zip(opts)
47 .zip(dicts)
48 .map(|((dtype, opt), dict)| decode(rows, *opt, dict.as_ref(), dtype))
49 .collect()
50}
51
52unsafe fn decode_validity(rows: &mut [&[u8]], opt: RowEncodingOptions) -> Option<Bitmap> {
53 let null_sentinel = opt.null_sentinel();
56 let first_null = (0..rows.len()).find(|&i| {
57 let v;
58 (v, rows[i]) = rows[i].split_at_unchecked(1);
59 v[0] == null_sentinel
60 });
61
62 let first_null = first_null?;
64
65 let mut bm = BitmapBuilder::new();
66 bm.reserve(rows.len());
67 bm.extend_constant(first_null, true);
68 bm.push(false);
69 bm.extend_trusted_len_iter(rows[first_null + 1..].iter_mut().map(|row| {
70 let v;
71 (v, *row) = row.split_at_unchecked(1);
72 v[0] != null_sentinel
73 }));
74 bm.into_opt_validity()
75}
76
77#[inline(always)]
79fn dtype_and_data_to_encoded_item_len(
80 dtype: &ArrowDataType,
81 data: &[u8],
82 opt: RowEncodingOptions,
83 dict: Option<&RowEncodingContext>,
84) -> usize {
85 if let Some(size) = fixed_size(dtype, dict) {
87 return size;
88 }
89
90 use ArrowDataType as D;
91 match dtype {
92 D::Binary | D::LargeBinary | D::BinaryView | D::Utf8 | D::LargeUtf8 | D::Utf8View
93 if opt.contains(RowEncodingOptions::NO_ORDER) =>
94 unsafe { no_order::len_from_buffer(data, opt) },
95 D::Binary | D::LargeBinary | D::BinaryView => unsafe {
96 binary::encoded_item_len(data, opt)
97 },
98 D::Utf8 | D::LargeUtf8 | D::Utf8View => unsafe { utf8::len_from_buffer(data, opt) },
99
100 D::List(list_field) | D::LargeList(list_field) => {
101 let mut data = data;
102 let mut item_len = 0;
103
104 let list_continuation_token = opt.list_continuation_token();
105
106 while data[0] == list_continuation_token {
107 data = &data[1..];
108 let len = dtype_and_data_to_encoded_item_len(list_field.dtype(), data, opt, dict);
109 data = &data[len..];
110 item_len += 1 + len;
111 }
112 1 + item_len
113 },
114
115 D::FixedSizeBinary(_) => todo!(),
116 D::FixedSizeList(fsl_field, width) => {
117 let mut data = &data[1..];
118 let mut item_len = 1; for _ in 0..*width {
121 let len = dtype_and_data_to_encoded_item_len(fsl_field.dtype(), data, opt, dict);
122 data = &data[len..];
123 item_len += len;
124 }
125 item_len
126 },
127 D::Struct(struct_fields) => {
128 let mut data = &data[1..];
129 let mut item_len = 1; for struct_field in struct_fields {
132 let len = dtype_and_data_to_encoded_item_len(struct_field.dtype(), data, opt, dict);
133 data = &data[len..];
134 item_len += len;
135 }
136 item_len
137 },
138
139 D::Union(_) => todo!(),
140 D::Map(_, _) => todo!(),
141 D::Decimal256(_, _) => todo!(),
142 D::Extension(_) => todo!(),
143 D::Unknown => todo!(),
144
145 _ => unreachable!(),
146 }
147}
148
149fn rows_for_fixed_size_list<'a>(
150 dtype: &ArrowDataType,
151 opt: RowEncodingOptions,
152 dict: Option<&RowEncodingContext>,
153 width: usize,
154 rows: &mut [&'a [u8]],
155 nested_rows: &mut Vec<&'a [u8]>,
156) {
157 nested_rows.clear();
158 nested_rows.reserve(rows.len() * width);
159
160 if let Some(size) = fixed_size(dtype, dict) {
162 for row in rows.iter_mut() {
163 for i in 0..width {
164 nested_rows.push(&row[(i * size)..][..size]);
165 }
166 *row = &row[size * width..];
167 }
168 return;
169 }
170
171 for row in rows.iter_mut() {
173 for _ in 0..width {
174 let length = dtype_and_data_to_encoded_item_len(dtype, row, opt, dict);
175 let v;
176 (v, *row) = row.split_at(length);
177 nested_rows.push(v);
178 }
179 }
180}
181
182unsafe fn decode_lexical_cat(
183 rows: &mut [&[u8]],
184 opt: RowEncodingOptions,
185 _values: &RowEncodingCategoricalContext,
186) -> PrimitiveArray<u32> {
187 let mut s = numeric::decode_primitive::<u32>(rows, opt);
188 numeric::decode_primitive::<u32>(rows, opt).with_validity(s.take_validity())
189}
190
191unsafe fn decode(
192 rows: &mut [&[u8]],
193 opt: RowEncodingOptions,
194 dict: Option<&RowEncodingContext>,
195 dtype: &ArrowDataType,
196) -> ArrayRef {
197 use ArrowDataType as D;
198 match dtype {
199 D::Null => NullArray::new(D::Null, rows.len()).to_boxed(),
200 D::Boolean => boolean::decode_bool(rows, opt).to_boxed(),
201 D::Binary | D::LargeBinary | D::BinaryView | D::Utf8 | D::LargeUtf8 | D::Utf8View
202 if opt.contains(RowEncodingOptions::NO_ORDER) =>
203 {
204 let array = no_order::decode_variable_no_order(rows, opt);
205
206 if matches!(dtype, D::Utf8 | D::LargeUtf8 | D::Utf8View) {
207 unsafe { array.to_utf8view_unchecked() }.to_boxed()
208 } else {
209 array.to_boxed()
210 }
211 },
212 D::Binary | D::LargeBinary | D::BinaryView => binary::decode_binview(rows, opt).to_boxed(),
213 D::Utf8 | D::LargeUtf8 | D::Utf8View => decode_str(rows, opt).boxed(),
214
215 D::Struct(fields) => {
216 let validity = decode_validity(rows, opt);
217
218 let values = match dict {
219 None => fields
220 .iter()
221 .map(|struct_fld| decode(rows, opt, None, struct_fld.dtype()))
222 .collect(),
223 Some(RowEncodingContext::Struct(dicts)) => fields
224 .iter()
225 .zip(dicts)
226 .map(|(struct_fld, dict)| decode(rows, opt, dict.as_ref(), struct_fld.dtype()))
227 .collect(),
228 _ => unreachable!(),
229 };
230 StructArray::new(dtype.clone(), rows.len(), values, validity).to_boxed()
231 },
232 D::FixedSizeList(fsl_field, width) => {
233 let validity = decode_validity(rows, opt);
234
235 let mut nested_rows = Vec::new();
237 rows_for_fixed_size_list(fsl_field.dtype(), opt, dict, *width, rows, &mut nested_rows);
238 let values = decode(&mut nested_rows, opt, dict, fsl_field.dtype());
239
240 FixedSizeListArray::new(dtype.clone(), rows.len(), values, validity).to_boxed()
241 },
242 D::List(list_field) | D::LargeList(list_field) => {
243 let mut validity = BitmapBuilder::new();
244
245 let num_rows = rows.len();
247 let mut nested_rows = Vec::new();
248 let mut offsets = Vec::with_capacity(rows.len() + 1);
249 offsets.push(0);
250
251 let list_null_sentinel = opt.list_null_sentinel();
252 let list_continuation_token = opt.list_continuation_token();
253 let list_termination_token = opt.list_termination_token();
254
255 for (i, row) in rows.iter_mut().enumerate() {
257 while row[0] == list_continuation_token {
258 *row = &row[1..];
259 let len =
260 dtype_and_data_to_encoded_item_len(list_field.dtype(), row, opt, dict);
261 nested_rows.push(&row[..len]);
262 *row = &row[len..];
263 }
264
265 offsets.push(nested_rows.len() as i64);
266
267 if row[0] == list_null_sentinel {
269 *row = &row[1..];
270 validity.reserve(num_rows);
271 validity.extend_constant(i - validity.len(), true);
272 validity.push(false);
273 continue;
274 }
275
276 assert_eq!(row[0], list_termination_token);
277 *row = &row[1..];
278 }
279
280 let validity = if validity.is_empty() {
281 None
282 } else {
283 validity.extend_constant(num_rows - validity.len(), true);
284 validity.into_opt_validity()
285 };
286 assert_eq!(offsets.len(), rows.len() + 1);
287
288 let values = decode(&mut nested_rows, opt, dict, list_field.dtype());
289
290 ListArray::<i64>::new(
291 dtype.clone(),
292 unsafe { OffsetsBuffer::new_unchecked(Buffer::from(offsets)) },
293 values,
294 validity,
295 )
296 .to_boxed()
297 },
298
299 dt => {
300 if matches!(dt, D::UInt32) {
301 if let Some(dict) = dict {
302 return match dict {
303 RowEncodingContext::Categorical(ctx) => {
304 if ctx.is_enum {
305 packed_u32::decode(rows, opt, ctx.needed_num_bits()).to_boxed()
306 } else if ctx.lexical_sort_idxs.is_none() {
307 numeric::decode_primitive::<u32>(rows, opt).to_boxed()
308 } else {
309 decode_lexical_cat(rows, opt, ctx).to_boxed()
310 }
311 },
312 _ => unreachable!(),
313 };
314 }
315 }
316
317 if matches!(dt, D::Int128) {
318 if let Some(dict) = dict {
319 return match dict {
320 RowEncodingContext::Decimal(precision) => {
321 decimal::decode(rows, opt, *precision).to_boxed()
322 },
323 _ => unreachable!(),
324 };
325 }
326 }
327
328 with_match_arrow_primitive_type!(dt, |$T| {
329 numeric::decode_primitive::<$T>(rows, opt).to_boxed()
330 })
331 },
332 }
333}