polars_arrow/storage.rs

use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};

use bytemuck::Pod;

// Allows us to transmute between types while also keeping the original
// element size/alignment and drop method of the Vec around.
struct VecVTable {
    size: usize,
    align: usize,
    drop_buffer: unsafe fn(*mut (), usize),
}

impl VecVTable {
    const fn new<T>() -> Self {
        unsafe fn drop_buffer<T>(ptr: *mut (), cap: usize) {
            unsafe { drop(Vec::from_raw_parts(ptr.cast::<T>(), 0, cap)) }
        }

        Self {
            size: size_of::<T>(),
            align: align_of::<T>(),
            drop_buffer: drop_buffer::<T>,
        }
    }

    fn new_static<T>() -> &'static Self {
        const { &Self::new::<T>() }
    }
}
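
// Illustrative sketch (not part of the API): a buffer created as a `Vec<u32>`
// but later viewed as bytes must still be freed with `u32`'s layout. The
// vtable captures that layout once, at the original element type:
//
//     let vt = VecVTable::new_static::<u32>();
//     // vt.size == 4 and vt.align == align_of::<u32>(); vt.drop_buffer
//     // reconstructs and drops a `Vec<u32>` from the raw pointer/capacity.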

use crate::ffi::InternalArrowArray;

enum BackingStorage {
    Vec {
        original_capacity: usize, // Elements, not bytes.
        vtable: &'static VecVTable,
    },
    InternalArrowArray(InternalArrowArray),
}

struct SharedStorageInner<T> {
    ref_count: AtomicU64,
    ptr: *mut T,
    length_in_bytes: usize,
    backing: Option<BackingStorage>,
    // https://github.com/rust-lang/rfcs/blob/master/text/0769-sound-generic-drop.md#phantom-data
    phantom: PhantomData<T>,
}

impl<T> SharedStorageInner<T> {
    pub fn from_vec(mut v: Vec<T>) -> Self {
        let length_in_bytes = v.len() * size_of::<T>();
        let original_capacity = v.capacity();
        let ptr = v.as_mut_ptr();
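        // The buffer is now owned by the SharedStorageInner we are about to
        // build; forget `v` so it doesn't free it.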
        core::mem::forget(v);
        Self {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: Some(BackingStorage::Vec {
                original_capacity,
                vtable: VecVTable::new_static::<T>(),
            }),
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for SharedStorageInner<T> {
    fn drop(&mut self) {
        match self.backing.take() {
            Some(BackingStorage::InternalArrowArray(a)) => drop(a),
            Some(BackingStorage::Vec {
                original_capacity,
                vtable,
            }) => unsafe {
                // Drop the elements in our slice.
                if std::mem::needs_drop::<T>() {
                    core::ptr::drop_in_place(core::ptr::slice_from_raw_parts_mut(
                        self.ptr,
                        self.length_in_bytes / size_of::<T>(),
                    ));
                }

                // Free the buffer.
                (vtable.drop_buffer)(self.ptr.cast(), original_capacity);
            },
            None => {},
        }
    }
}

pub struct SharedStorage<T> {
    inner: NonNull<SharedStorageInner<T>>,
    phantom: PhantomData<SharedStorageInner<T>>,
}

unsafe impl<T: Sync + Send> Send for SharedStorage<T> {}
unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}
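// Like Arc<T>, a clone of a SharedStorage may hand out &T from any thread and
// the last clone may drop the elements on any thread, hence both bounds.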

impl<T> SharedStorage<T> {
    pub fn from_static(slice: &'static [T]) -> Self {
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(2), // Never used, but 2 so it won't pass exclusivity tests.
            ptr,
            length_in_bytes,
            backing: None,
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

    pub fn from_vec(v: Vec<T>) -> Self {
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(SharedStorageInner::from_vec(v)))).unwrap(),
            phantom: PhantomData,
        }
    }

    pub fn from_internal_arrow_array(ptr: *const T, len: usize, arr: InternalArrowArray) -> Self {
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr: ptr.cast_mut(),
            length_in_bytes: len * size_of::<T>(),
            backing: Some(BackingStorage::InternalArrowArray(arr)),
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }
}
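
// Construction sketch (hypothetical caller, crate-internal):
//
//     let owned = SharedStorage::from_vec(vec![1u32, 2, 3]);
//     assert_eq!(owned.len(), 3);
//
//     static WORDS: [u32; 3] = [4, 5, 6];
//     let borrowed = SharedStorage::from_static(&WORDS);
//     assert_eq!(borrowed.len(), 3);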

pub struct SharedStorageAsVecMut<'a, T> {
    ss: &'a mut SharedStorage<T>,
    vec: ManuallyDrop<Vec<T>>,
}

impl<T> Deref for SharedStorageAsVecMut<'_, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Self::Target {
        &self.vec
    }
}

impl<T> DerefMut for SharedStorageAsVecMut<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.vec
    }
}

impl<T> Drop for SharedStorageAsVecMut<'_, T> {
    fn drop(&mut self) {
        unsafe {
            // Restore the SharedStorage.
            let vec = ManuallyDrop::take(&mut self.vec);
            let inner = self.ss.inner.as_ptr();
            inner.write(SharedStorageInner::from_vec(vec));
        }
    }
}

impl<T> SharedStorage<T> {
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.inner().length_in_bytes / size_of::<T>()
    }

    #[inline(always)]
    pub fn as_ptr(&self) -> *const T {
        self.inner().ptr
    }

    #[inline(always)]
    pub fn is_exclusive(&mut self) -> bool {
        // Ordering semantics copied from Arc<T>.
        self.inner().ref_count.load(Ordering::Acquire) == 1
    }

    /// Gets the reference count of this storage.
    ///
    /// Because this function takes a shared reference, it should not be used
    /// to check that the refcount is one for safety purposes: someone else
    /// could increment it in the meantime.
    #[inline(always)]
    pub fn refcount(&self) -> u64 {
        // Ordering semantics copied from Arc<T>.
        self.inner().ref_count.load(Ordering::Acquire)
    }

    pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
        self.is_exclusive().then(|| {
            let inner = self.inner();
            let len = inner.length_in_bytes / size_of::<T>();
            unsafe { core::slice::from_raw_parts_mut(inner.ptr, len) }
        })
    }

    /// Try to take the vec backing this SharedStorage, leaving this as an empty slice.
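    ///
    /// A minimal sketch (hypothetical caller):
    ///
    /// ```ignore
    /// let mut ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
    /// let v = ss.try_take_vec().unwrap(); // Fails if `ss` is not exclusive.
    /// assert_eq!(v, vec![1, 2, 3]);
    /// assert_eq!(ss.len(), 0); // `ss` is left as an empty slice.
    /// ```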
    pub fn try_take_vec(&mut self) -> Option<Vec<T>> {
        // We may only go back to a Vec if we originally came from a Vec
        // where the desired size/align matches the original.
        let Some(BackingStorage::Vec {
            original_capacity,
            vtable,
        }) = self.inner().backing
        else {
            return None;
        };

        if vtable.size != size_of::<T>() || vtable.align != align_of::<T>() {
            return None;
        }

        // If there are other references we can't get an exclusive reference.
        if !self.is_exclusive() {
            return None;
        }

        let ret;
        unsafe {
            let inner = &mut *self.inner.as_ptr();
            let len = inner.length_in_bytes / size_of::<T>();
            ret = Vec::from_raw_parts(inner.ptr, len, original_capacity);
            inner.length_in_bytes = 0;
            inner.backing = None;
        }
        Some(ret)
    }

    /// Attempts to view this SharedStorage as a mutable Vec through a guard
    /// object. Returns None if this SharedStorage can't be converted to a
    /// Vec; when the guard is dropped, the (possibly reallocated) Vec is
    /// written back into this SharedStorage.
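    ///
    /// A minimal sketch (hypothetical caller):
    ///
    /// ```ignore
    /// let mut ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
    /// if let Some(mut guard) = ss.try_as_mut_vec() {
    ///     guard.push(4); // May reallocate; the guard restores `ss` on drop.
    /// }
    /// assert_eq!(ss.len(), 4);
    /// ```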
    pub fn try_as_mut_vec(&mut self) -> Option<SharedStorageAsVecMut<'_, T>> {
        Some(SharedStorageAsVecMut {
            vec: ManuallyDrop::new(self.try_take_vec()?),
            ss: self,
        })
    }

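    /// Tries to convert this SharedStorage into the Vec backing it. On
    /// failure (not backed by a Vec, layout mismatch, or not exclusive),
    /// returns the original SharedStorage unchanged.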
    pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
        self.try_take_vec().ok_or(self)
    }

    #[inline(always)]
    fn inner(&self) -> &SharedStorageInner<T> {
        unsafe { &*self.inner.as_ptr() }
    }

    /// # Safety
    /// May only be called once.
    #[cold]
    unsafe fn drop_slow(&mut self) {
        unsafe { drop(Box::from_raw(self.inner.as_ptr())) }
    }
}

impl<T: Pod> SharedStorage<T> {
    fn try_transmute<U: Pod>(self) -> Result<SharedStorage<U>, Self> {
        let inner = self.inner();

        // The length of the array in bytes must be a multiple of the target size.
        // We can skip this check if the size of U divides the size of T.
        if size_of::<T>() % size_of::<U>() != 0 && inner.length_in_bytes % size_of::<U>() != 0 {
            return Err(self);
        }

        // The pointer must be properly aligned for U.
        // We can skip this check if the alignment of U divides the alignment of T.
        if align_of::<T>() % align_of::<U>() != 0 && !inner.ptr.cast::<U>().is_aligned() {
            return Err(self);
        }

        let storage = SharedStorage {
            inner: self.inner.cast(),
            phantom: PhantomData,
        };
        std::mem::forget(self);
        Ok(storage)
    }
}
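
// Illustrative: with the checks above, a `SharedStorage<u32>` can always be
// transmuted to `u16` (u16's size and alignment divide u32's), while a
// `u16 -> u32` transmute must verify at runtime that the byte length is a
// multiple of `size_of::<u32>()` and that the pointer is aligned for `u32`.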

impl SharedStorage<u8> {
    /// Create a [`SharedStorage<u8>`][SharedStorage] from a [`Vec`] of [`Pod`].
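    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let bytes = SharedStorage::bytes_from_pod_vec(vec![1u16, 2u16]);
    /// assert_eq!(bytes.len(), 4); // Two u16s viewed as four bytes.
    /// ```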
    pub fn bytes_from_pod_vec<T: Pod>(v: Vec<T>) -> Self {
        // This can't fail: u8's size and alignment (both 1) are compatible
        // with any Pod type.
        SharedStorage::from_vec(v)
            .try_transmute::<u8>()
            .unwrap_or_else(|_| unreachable!())
    }
}

impl<T> Deref for SharedStorage<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe {
            let inner = self.inner();
            let len = inner.length_in_bytes / size_of::<T>();
            core::slice::from_raw_parts(inner.ptr, len)
        }
    }
}

impl<T> Clone for SharedStorage<T> {
    fn clone(&self) -> Self {
        let inner = self.inner();
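        // Storage without backing (e.g. statics) skips refcounting entirely.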
        if inner.backing.is_some() {
            // Ordering semantics copied from Arc<T>.
            inner.ref_count.fetch_add(1, Ordering::Relaxed);
        }
        Self {
            inner: self.inner,
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for SharedStorage<T> {
    fn drop(&mut self) {
        let inner = self.inner();
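        // Storage without backing (statics, or storage whose Vec was taken
        // out) is not refcounted, so there is nothing to do here.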
        if inner.backing.is_none() {
            return;
        }

        // Ordering semantics copied from Arc<T>.
        if inner.ref_count.fetch_sub(1, Ordering::Release) == 1 {
            std::sync::atomic::fence(Ordering::Acquire);
            unsafe {
                self.drop_slow();
            }
        }
    }
}
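
#[cfg(test)]
mod tests {
    use super::*;

    // Sketch of the refcounting contract above: a clone blocks exclusive
    // access, and dropping it restores exclusivity. Not exhaustive.
    #[test]
    fn clone_blocks_exclusive_access() {
        let mut ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
        assert!(ss.is_exclusive());

        let ss2 = ss.clone();
        assert!(!ss.is_exclusive());
        assert!(ss.try_take_vec().is_none()); // Shared: can't take the Vec.

        drop(ss2);
        assert_eq!(ss.try_take_vec().unwrap(), vec![1, 2, 3]);
        assert_eq!(ss.len(), 0); // Left as an empty slice.
    }

    // Static storage is never considered exclusive (refcount is fixed at 2).
    #[test]
    fn static_storage_is_never_exclusive() {
        static WORDS: [u32; 2] = [4, 5];
        let mut ss = SharedStorage::from_static(&WORDS);
        assert!(!ss.is_exclusive());
        assert!(ss.try_as_mut_slice().is_none());
    }
}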