// polars_arrow/io/ipc/write/serialize/mod.rs

1#![allow(clippy::ptr_arg)] // false positive in clippy, see https://github.com/rust-lang/rust-clippy/issues/8463
2use arrow_format::ipc;
3
4use super::super::compression;
5use super::super::endianness::is_native_little_endian;
6use super::common::{pad_to_64, Compression};
7use crate::array::*;
8use crate::bitmap::Bitmap;
9use crate::datatypes::PhysicalType;
10use crate::offset::{Offset, OffsetsBuffer};
11use crate::trusted_len::TrustedLen;
12use crate::types::NativeType;
13use crate::{match_integer_type, with_match_primitive_type_full};
14mod binary;
15mod binview;
16mod boolean;
17mod dictionary;
18mod fixed_size_binary;
19mod fixed_sized_list;
20mod list;
21mod map;
22mod primitive;
23mod struct_;
24mod union;
25
26use binary::*;
27use binview::*;
28use boolean::*;
29pub(super) use dictionary::*;
30use fixed_size_binary::*;
31use fixed_sized_list::*;
32use list::*;
33use map::*;
34use primitive::*;
35use struct_::*;
36use union::*;
37
38/// Writes an [`Array`] to `arrow_data`
39pub fn write(
40    array: &dyn Array,
41    buffers: &mut Vec<ipc::Buffer>,
42    arrow_data: &mut Vec<u8>,
43    nodes: &mut Vec<ipc::FieldNode>,
44    offset: &mut i64,
45    is_little_endian: bool,
46    compression: Option<Compression>,
47) {
48    nodes.push(ipc::FieldNode {
49        length: array.len() as i64,
50        null_count: array.null_count() as i64,
51    });
52    use PhysicalType::*;
53    match array.dtype().to_physical_type() {
54        Null => (),
55        Boolean => write_boolean(
56            array.as_any().downcast_ref().unwrap(),
57            buffers,
58            arrow_data,
59            offset,
60            is_little_endian,
61            compression,
62        ),
63        Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
64            let array = array.as_any().downcast_ref().unwrap();
65            write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)
66        }),
67        Binary => write_binary::<i32>(
68            array.as_any().downcast_ref().unwrap(),
69            buffers,
70            arrow_data,
71            offset,
72            is_little_endian,
73            compression,
74        ),
75        LargeBinary => write_binary::<i64>(
76            array.as_any().downcast_ref().unwrap(),
77            buffers,
78            arrow_data,
79            offset,
80            is_little_endian,
81            compression,
82        ),
83        FixedSizeBinary => write_fixed_size_binary(
84            array.as_any().downcast_ref().unwrap(),
85            buffers,
86            arrow_data,
87            offset,
88            is_little_endian,
89            compression,
90        ),
91        Utf8 => write_utf8::<i32>(
92            array.as_any().downcast_ref().unwrap(),
93            buffers,
94            arrow_data,
95            offset,
96            is_little_endian,
97            compression,
98        ),
99        LargeUtf8 => write_utf8::<i64>(
100            array.as_any().downcast_ref().unwrap(),
101            buffers,
102            arrow_data,
103            offset,
104            is_little_endian,
105            compression,
106        ),
107        List => write_list::<i32>(
108            array.as_any().downcast_ref().unwrap(),
109            buffers,
110            arrow_data,
111            nodes,
112            offset,
113            is_little_endian,
114            compression,
115        ),
116        LargeList => write_list::<i64>(
117            array.as_any().downcast_ref().unwrap(),
118            buffers,
119            arrow_data,
120            nodes,
121            offset,
122            is_little_endian,
123            compression,
124        ),
125        FixedSizeList => write_fixed_size_list(
126            array.as_any().downcast_ref().unwrap(),
127            buffers,
128            arrow_data,
129            nodes,
130            offset,
131            is_little_endian,
132            compression,
133        ),
134        Struct => write_struct(
135            array.as_any().downcast_ref().unwrap(),
136            buffers,
137            arrow_data,
138            nodes,
139            offset,
140            is_little_endian,
141            compression,
142        ),
143        Dictionary(key_type) => match_integer_type!(key_type, |$T| {
144            write_dictionary::<$T>(
145                array.as_any().downcast_ref().unwrap(),
146                buffers,
147                arrow_data,
148                nodes,
149                offset,
150                is_little_endian,
151                compression,
152                true,
153            );
154        }),
155        Union => {
156            write_union(
157                array.as_any().downcast_ref().unwrap(),
158                buffers,
159                arrow_data,
160                nodes,
161                offset,
162                is_little_endian,
163                compression,
164            );
165        },
166        Map => {
167            write_map(
168                array.as_any().downcast_ref().unwrap(),
169                buffers,
170                arrow_data,
171                nodes,
172                offset,
173                is_little_endian,
174                compression,
175            );
176        },
177        Utf8View => write_binview(
178            array.as_any().downcast_ref::<Utf8ViewArray>().unwrap(),
179            buffers,
180            arrow_data,
181            offset,
182            is_little_endian,
183            compression,
184        ),
185        BinaryView => write_binview(
186            array.as_any().downcast_ref::<BinaryViewArray>().unwrap(),
187            buffers,
188            arrow_data,
189            offset,
190            is_little_endian,
191            compression,
192        ),
193    }
194}
195
196#[inline]
197fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {
198    let pad_len = pad_to_64(length);
199    for _ in 0..pad_len {
200        buffer.push(0u8);
201    }
202}
203
204/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
205fn write_bytes(
206    bytes: &[u8],
207    buffers: &mut Vec<ipc::Buffer>,
208    arrow_data: &mut Vec<u8>,
209    offset: &mut i64,
210    compression: Option<Compression>,
211) {
212    let start = arrow_data.len();
213    if let Some(compression) = compression {
214        arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
215        match compression {
216            Compression::LZ4 => {
217                compression::compress_lz4(bytes, arrow_data).unwrap();
218            },
219            Compression::ZSTD => {
220                compression::compress_zstd(bytes, arrow_data).unwrap();
221            },
222        }
223    } else {
224        arrow_data.extend_from_slice(bytes);
225    };
226
227    buffers.push(finish_buffer(arrow_data, start, offset));
228}
229
230fn write_bitmap(
231    bitmap: Option<&Bitmap>,
232    length: usize,
233    buffers: &mut Vec<ipc::Buffer>,
234    arrow_data: &mut Vec<u8>,
235    offset: &mut i64,
236    compression: Option<Compression>,
237) {
238    match bitmap {
239        Some(bitmap) => {
240            assert_eq!(bitmap.len(), length);
241            let (slice, slice_offset, _) = bitmap.as_slice();
242            if slice_offset != 0 {
243                // case where we can't slice the bitmap as the offsets are not multiple of 8
244                let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());
245                let (slice, _, _) = bytes.as_slice();
246                write_bytes(slice, buffers, arrow_data, offset, compression)
247            } else {
248                write_bytes(slice, buffers, arrow_data, offset, compression)
249            }
250        },
251        None => {
252            buffers.push(ipc::Buffer {
253                offset: *offset,
254                length: 0,
255            });
256        },
257    }
258}
259
260/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
261fn write_buffer<T: NativeType>(
262    buffer: &[T],
263    buffers: &mut Vec<ipc::Buffer>,
264    arrow_data: &mut Vec<u8>,
265    offset: &mut i64,
266    is_little_endian: bool,
267    compression: Option<Compression>,
268) {
269    let start = arrow_data.len();
270    if let Some(compression) = compression {
271        _write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
272    } else {
273        _write_buffer(buffer, arrow_data, is_little_endian);
274    };
275
276    buffers.push(finish_buffer(arrow_data, start, offset));
277}
278
279#[inline]
280fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
281    buffer: I,
282    arrow_data: &mut Vec<u8>,
283    is_little_endian: bool,
284) {
285    let len = buffer.size_hint().0;
286    arrow_data.reserve(len * size_of::<T>());
287    if is_little_endian {
288        buffer
289            .map(|x| T::to_le_bytes(&x))
290            .for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
291    } else {
292        buffer
293            .map(|x| T::to_be_bytes(&x))
294            .for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
295    }
296}
297
298#[inline]
299fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
300    buffer: I,
301    arrow_data: &mut Vec<u8>,
302    is_little_endian: bool,
303    compression: Compression,
304) {
305    let len = buffer.size_hint().0;
306    let mut swapped = Vec::with_capacity(len * size_of::<T>());
307    if is_little_endian {
308        buffer
309            .map(|x| T::to_le_bytes(&x))
310            .for_each(|x| swapped.extend_from_slice(x.as_ref()));
311    } else {
312        buffer
313            .map(|x| T::to_be_bytes(&x))
314            .for_each(|x| swapped.extend_from_slice(x.as_ref()))
315    };
316    arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());
317    match compression {
318        Compression::LZ4 => {
319            compression::compress_lz4(&swapped, arrow_data).unwrap();
320        },
321        Compression::ZSTD => {
322            compression::compress_zstd(&swapped, arrow_data).unwrap();
323        },
324    }
325}
326
327fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
328    if is_little_endian == is_native_little_endian() {
329        // in native endianness we can use the bytes directly.
330        let buffer = bytemuck::cast_slice(buffer);
331        arrow_data.extend_from_slice(buffer);
332    } else {
333        _write_buffer_from_iter(buffer.iter().copied(), arrow_data, is_little_endian)
334    }
335}
336
337fn _write_compressed_buffer<T: NativeType>(
338    buffer: &[T],
339    arrow_data: &mut Vec<u8>,
340    is_little_endian: bool,
341    compression: Compression,
342) {
343    if is_little_endian == is_native_little_endian() {
344        let bytes = bytemuck::cast_slice(buffer);
345        arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
346        match compression {
347            Compression::LZ4 => {
348                compression::compress_lz4(bytes, arrow_data).unwrap();
349            },
350            Compression::ZSTD => {
351                compression::compress_zstd(bytes, arrow_data).unwrap();
352            },
353        }
354    } else {
355        todo!()
356    }
357}
358
359/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary.
360#[inline]
361fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
362    buffer: I,
363    buffers: &mut Vec<ipc::Buffer>,
364    arrow_data: &mut Vec<u8>,
365    offset: &mut i64,
366    is_little_endian: bool,
367    compression: Option<Compression>,
368) {
369    let start = arrow_data.len();
370
371    if let Some(compression) = compression {
372        _write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
373    } else {
374        _write_buffer_from_iter(buffer, arrow_data, is_little_endian);
375    }
376
377    buffers.push(finish_buffer(arrow_data, start, offset));
378}
379
380fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {
381    let buffer_len = (arrow_data.len() - start) as i64;
382
383    pad_buffer_to_64(arrow_data, arrow_data.len() - start);
384    let total_len = (arrow_data.len() - start) as i64;
385
386    let buffer = ipc::Buffer {
387        offset: *offset,
388        length: buffer_len,
389    };
390    *offset += total_len;
391    buffer
392}