1#![allow(clippy::ptr_arg)] use arrow_format::ipc;
3
4use super::super::compression;
5use super::super::endianness::is_native_little_endian;
6use super::common::{pad_to_64, Compression};
7use crate::array::*;
8use crate::bitmap::Bitmap;
9use crate::datatypes::PhysicalType;
10use crate::offset::{Offset, OffsetsBuffer};
11use crate::trusted_len::TrustedLen;
12use crate::types::NativeType;
13use crate::{match_integer_type, with_match_primitive_type_full};
14mod binary;
15mod binview;
16mod boolean;
17mod dictionary;
18mod fixed_size_binary;
19mod fixed_sized_list;
20mod list;
21mod map;
22mod primitive;
23mod struct_;
24mod union;
25
26use binary::*;
27use binview::*;
28use boolean::*;
29pub(super) use dictionary::*;
30use fixed_size_binary::*;
31use fixed_sized_list::*;
32use list::*;
33use map::*;
34use primitive::*;
35use struct_::*;
36use union::*;
37
/// Serializes `array` by dispatching on its physical type.
///
/// A [`ipc::FieldNode`] holding the array's length and null count is pushed onto `nodes`
/// first. The array is then downcast to the concrete array type expected for its physical
/// type and handed to the matching `write_*` function, which receives `buffers`,
/// `arrow_data`, `offset`, `is_little_endian` and `compression` unchanged.
///
/// * `array` - the array to serialize.
/// * `buffers` - list of IPC buffer descriptors, forwarded to the type-specific writer.
/// * `arrow_data` - byte buffer, forwarded to the type-specific writer.
/// * `nodes` - list of field nodes; receives one entry for `array` here and is also
///   forwarded to the writers of list, fixed-size list, struct, dictionary, union and map
///   arrays.
/// * `offset` - running offset, forwarded to the type-specific writer.
/// * `is_little_endian` - requested byte order, forwarded to the type-specific writer.
/// * `compression` - optional compression codec, forwarded to the type-specific writer.
///
/// # Panics
///
/// Panics if `array` cannot be downcast to the concrete array type selected for its
/// physical type (every arm calls `downcast_ref().unwrap()`).
pub fn write(
    array: &dyn Array,
    buffers: &mut Vec<ipc::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<ipc::FieldNode>,
    offset: &mut i64,
    is_little_endian: bool,
    compression: Option<Compression>,
) {
    // Every array, including `Null`, contributes exactly one field node here.
    nodes.push(ipc::FieldNode {
        length: array.len() as i64,
        null_count: array.null_count() as i64,
    });
    use PhysicalType::*;
    match array.dtype().to_physical_type() {
        // Nothing besides the field node above is emitted for `Null`.
        Null => (),
        Boolean => write_boolean(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        // The macro binds `$T` according to `primitive`; the downcast target is inferred
        // from the `write_primitive::<$T>` call.
        Primitive(primitive) => with_match_primitive_type_full!(primitive, |$T| {
            let array = array.as_any().downcast_ref().unwrap();
            write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)
        }),
        // `i32` / `i64` select the offset type of the regular and `Large*` variants.
        Binary => write_binary::<i32>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        LargeBinary => write_binary::<i64>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        FixedSizeBinary => write_fixed_size_binary(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        Utf8 => write_utf8::<i32>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        LargeUtf8 => write_utf8::<i64>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        // From here on the writers additionally receive `nodes`
        // (presumably to register field nodes for child arrays; confirm in the callee).
        List => write_list::<i32>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        LargeList => write_list::<i64>(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        FixedSizeList => write_fixed_size_list(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        Struct => write_struct(
            array.as_any().downcast_ref().unwrap(),
            buffers,
            arrow_data,
            nodes,
            offset,
            is_little_endian,
            compression,
        ),
        // The macro binds `$T` according to `key_type`.
        // NOTE(review): the meaning of the trailing `true` flag is defined by
        // `write_dictionary`; confirm there.
        Dictionary(key_type) => match_integer_type!(key_type, |$T| {
            write_dictionary::<$T>(
                array.as_any().downcast_ref().unwrap(),
                buffers,
                arrow_data,
                nodes,
                offset,
                is_little_endian,
                compression,
                true,
            );
        }),
        Union => {
            write_union(
                array.as_any().downcast_ref().unwrap(),
                buffers,
                arrow_data,
                nodes,
                offset,
                is_little_endian,
                compression,
            );
        },
        Map => {
            write_map(
                array.as_any().downcast_ref().unwrap(),
                buffers,
                arrow_data,
                nodes,
                offset,
                is_little_endian,
                compression,
            );
        },
        // Both view types go through `write_binview`; the explicit turbofish picks the
        // concrete array type to downcast to.
        Utf8View => write_binview(
            array.as_any().downcast_ref::<Utf8ViewArray>().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
        BinaryView => write_binview(
            array.as_any().downcast_ref::<BinaryViewArray>().unwrap(),
            buffers,
            arrow_data,
            offset,
            is_little_endian,
            compression,
        ),
    }
}
195
196#[inline]
197fn pad_buffer_to_64(buffer: &mut Vec<u8>, length: usize) {
198 let pad_len = pad_to_64(length);
199 for _ in 0..pad_len {
200 buffer.push(0u8);
201 }
202}
203
204fn write_bytes(
206 bytes: &[u8],
207 buffers: &mut Vec<ipc::Buffer>,
208 arrow_data: &mut Vec<u8>,
209 offset: &mut i64,
210 compression: Option<Compression>,
211) {
212 let start = arrow_data.len();
213 if let Some(compression) = compression {
214 arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
215 match compression {
216 Compression::LZ4 => {
217 compression::compress_lz4(bytes, arrow_data).unwrap();
218 },
219 Compression::ZSTD => {
220 compression::compress_zstd(bytes, arrow_data).unwrap();
221 },
222 }
223 } else {
224 arrow_data.extend_from_slice(bytes);
225 };
226
227 buffers.push(finish_buffer(arrow_data, start, offset));
228}
229
230fn write_bitmap(
231 bitmap: Option<&Bitmap>,
232 length: usize,
233 buffers: &mut Vec<ipc::Buffer>,
234 arrow_data: &mut Vec<u8>,
235 offset: &mut i64,
236 compression: Option<Compression>,
237) {
238 match bitmap {
239 Some(bitmap) => {
240 assert_eq!(bitmap.len(), length);
241 let (slice, slice_offset, _) = bitmap.as_slice();
242 if slice_offset != 0 {
243 let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());
245 let (slice, _, _) = bytes.as_slice();
246 write_bytes(slice, buffers, arrow_data, offset, compression)
247 } else {
248 write_bytes(slice, buffers, arrow_data, offset, compression)
249 }
250 },
251 None => {
252 buffers.push(ipc::Buffer {
253 offset: *offset,
254 length: 0,
255 });
256 },
257 }
258}
259
260fn write_buffer<T: NativeType>(
262 buffer: &[T],
263 buffers: &mut Vec<ipc::Buffer>,
264 arrow_data: &mut Vec<u8>,
265 offset: &mut i64,
266 is_little_endian: bool,
267 compression: Option<Compression>,
268) {
269 let start = arrow_data.len();
270 if let Some(compression) = compression {
271 _write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
272 } else {
273 _write_buffer(buffer, arrow_data, is_little_endian);
274 };
275
276 buffers.push(finish_buffer(arrow_data, start, offset));
277}
278
279#[inline]
280fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
281 buffer: I,
282 arrow_data: &mut Vec<u8>,
283 is_little_endian: bool,
284) {
285 let len = buffer.size_hint().0;
286 arrow_data.reserve(len * size_of::<T>());
287 if is_little_endian {
288 buffer
289 .map(|x| T::to_le_bytes(&x))
290 .for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
291 } else {
292 buffer
293 .map(|x| T::to_be_bytes(&x))
294 .for_each(|x| arrow_data.extend_from_slice(x.as_ref()))
295 }
296}
297
298#[inline]
299fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
300 buffer: I,
301 arrow_data: &mut Vec<u8>,
302 is_little_endian: bool,
303 compression: Compression,
304) {
305 let len = buffer.size_hint().0;
306 let mut swapped = Vec::with_capacity(len * size_of::<T>());
307 if is_little_endian {
308 buffer
309 .map(|x| T::to_le_bytes(&x))
310 .for_each(|x| swapped.extend_from_slice(x.as_ref()));
311 } else {
312 buffer
313 .map(|x| T::to_be_bytes(&x))
314 .for_each(|x| swapped.extend_from_slice(x.as_ref()))
315 };
316 arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());
317 match compression {
318 Compression::LZ4 => {
319 compression::compress_lz4(&swapped, arrow_data).unwrap();
320 },
321 Compression::ZSTD => {
322 compression::compress_zstd(&swapped, arrow_data).unwrap();
323 },
324 }
325}
326
327fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
328 if is_little_endian == is_native_little_endian() {
329 let buffer = bytemuck::cast_slice(buffer);
331 arrow_data.extend_from_slice(buffer);
332 } else {
333 _write_buffer_from_iter(buffer.iter().copied(), arrow_data, is_little_endian)
334 }
335}
336
337fn _write_compressed_buffer<T: NativeType>(
338 buffer: &[T],
339 arrow_data: &mut Vec<u8>,
340 is_little_endian: bool,
341 compression: Compression,
342) {
343 if is_little_endian == is_native_little_endian() {
344 let bytes = bytemuck::cast_slice(buffer);
345 arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
346 match compression {
347 Compression::LZ4 => {
348 compression::compress_lz4(bytes, arrow_data).unwrap();
349 },
350 Compression::ZSTD => {
351 compression::compress_zstd(bytes, arrow_data).unwrap();
352 },
353 }
354 } else {
355 todo!()
356 }
357}
358
359#[inline]
361fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
362 buffer: I,
363 buffers: &mut Vec<ipc::Buffer>,
364 arrow_data: &mut Vec<u8>,
365 offset: &mut i64,
366 is_little_endian: bool,
367 compression: Option<Compression>,
368) {
369 let start = arrow_data.len();
370
371 if let Some(compression) = compression {
372 _write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
373 } else {
374 _write_buffer_from_iter(buffer, arrow_data, is_little_endian);
375 }
376
377 buffers.push(finish_buffer(arrow_data, start, offset));
378}
379
380fn finish_buffer(arrow_data: &mut Vec<u8>, start: usize, offset: &mut i64) -> ipc::Buffer {
381 let buffer_len = (arrow_data.len() - start) as i64;
382
383 pad_buffer_to_64(arrow_data, arrow_data.len() - start);
384 let total_len = (arrow_data.len() - start) as i64;
385
386 let buffer = ipc::Buffer {
387 offset: *offset,
388 length: buffer_len,
389 };
390 *offset += total_len;
391 buffer
392}