1use std::marker::PhantomData;
2use std::mem::ManuallyDrop;
3use std::ops::{Deref, DerefMut};
4use std::ptr::NonNull;
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use bytemuck::Pod;
8
9struct VecVTable {
12 size: usize,
13 align: usize,
14 drop_buffer: unsafe fn(*mut (), usize),
15}
16
17impl VecVTable {
18 const fn new<T>() -> Self {
19 unsafe fn drop_buffer<T>(ptr: *mut (), cap: usize) {
20 unsafe { drop(Vec::from_raw_parts(ptr.cast::<T>(), 0, cap)) }
21 }
22
23 Self {
24 size: size_of::<T>(),
25 align: align_of::<T>(),
26 drop_buffer: drop_buffer::<T>,
27 }
28 }
29
30 fn new_static<T>() -> &'static Self {
31 const { &Self::new::<T>() }
32 }
33}
34
35use crate::ffi::InternalArrowArray;
36
37enum BackingStorage {
38 Vec {
39 original_capacity: usize, vtable: &'static VecVTable,
41 },
42 InternalArrowArray(InternalArrowArray),
43}
44
45struct SharedStorageInner<T> {
46 ref_count: AtomicU64,
47 ptr: *mut T,
48 length_in_bytes: usize,
49 backing: Option<BackingStorage>,
50 phantom: PhantomData<T>,
52}
53
54impl<T> SharedStorageInner<T> {
55 pub fn from_vec(mut v: Vec<T>) -> Self {
56 let length_in_bytes = v.len() * size_of::<T>();
57 let original_capacity = v.capacity();
58 let ptr = v.as_mut_ptr();
59 core::mem::forget(v);
60 Self {
61 ref_count: AtomicU64::new(1),
62 ptr,
63 length_in_bytes,
64 backing: Some(BackingStorage::Vec {
65 original_capacity,
66 vtable: VecVTable::new_static::<T>(),
67 }),
68 phantom: PhantomData,
69 }
70 }
71}
72
73impl<T> Drop for SharedStorageInner<T> {
74 fn drop(&mut self) {
75 match self.backing.take() {
76 Some(BackingStorage::InternalArrowArray(a)) => drop(a),
77 Some(BackingStorage::Vec {
78 original_capacity,
79 vtable,
80 }) => unsafe {
81 if std::mem::needs_drop::<T>() {
83 core::ptr::drop_in_place(core::ptr::slice_from_raw_parts_mut(
84 self.ptr,
85 self.length_in_bytes / size_of::<T>(),
86 ));
87 }
88
89 (vtable.drop_buffer)(self.ptr.cast(), original_capacity);
91 },
92 None => {},
93 }
94 }
95}
96
97pub struct SharedStorage<T> {
98 inner: NonNull<SharedStorageInner<T>>,
99 phantom: PhantomData<SharedStorageInner<T>>,
100}
101
102unsafe impl<T: Sync + Send> Send for SharedStorage<T> {}
103unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}
104
105impl<T> SharedStorage<T> {
106 pub fn from_static(slice: &'static [T]) -> Self {
107 #[expect(clippy::manual_slice_size_calculation)]
108 let length_in_bytes = slice.len() * size_of::<T>();
109 let ptr = slice.as_ptr().cast_mut();
110 let inner = SharedStorageInner {
111 ref_count: AtomicU64::new(2), ptr,
113 length_in_bytes,
114 backing: None,
115 phantom: PhantomData,
116 };
117 Self {
118 inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
119 phantom: PhantomData,
120 }
121 }
122
123 pub fn from_vec(v: Vec<T>) -> Self {
124 Self {
125 inner: NonNull::new(Box::into_raw(Box::new(SharedStorageInner::from_vec(v)))).unwrap(),
126 phantom: PhantomData,
127 }
128 }
129
130 pub fn from_internal_arrow_array(ptr: *const T, len: usize, arr: InternalArrowArray) -> Self {
131 let inner = SharedStorageInner {
132 ref_count: AtomicU64::new(1),
133 ptr: ptr.cast_mut(),
134 length_in_bytes: len * size_of::<T>(),
135 backing: Some(BackingStorage::InternalArrowArray(arr)),
136 phantom: PhantomData,
137 };
138 Self {
139 inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
140 phantom: PhantomData,
141 }
142 }
143}
144
145pub struct SharedStorageAsVecMut<'a, T> {
146 ss: &'a mut SharedStorage<T>,
147 vec: ManuallyDrop<Vec<T>>,
148}
149
150impl<T> Deref for SharedStorageAsVecMut<'_, T> {
151 type Target = Vec<T>;
152
153 fn deref(&self) -> &Self::Target {
154 &self.vec
155 }
156}
157
158impl<T> DerefMut for SharedStorageAsVecMut<'_, T> {
159 fn deref_mut(&mut self) -> &mut Self::Target {
160 &mut self.vec
161 }
162}
163
164impl<T> Drop for SharedStorageAsVecMut<'_, T> {
165 fn drop(&mut self) {
166 unsafe {
167 let vec = ManuallyDrop::take(&mut self.vec);
169 let inner = self.ss.inner.as_ptr();
170 inner.write(SharedStorageInner::from_vec(vec));
171 }
172 }
173}
174
175impl<T> SharedStorage<T> {
176 #[inline(always)]
177 pub fn len(&self) -> usize {
178 self.inner().length_in_bytes / size_of::<T>()
179 }
180
181 #[inline(always)]
182 pub fn as_ptr(&self) -> *const T {
183 self.inner().ptr
184 }
185
186 #[inline(always)]
187 pub fn is_exclusive(&mut self) -> bool {
188 self.inner().ref_count.load(Ordering::Acquire) == 1
190 }
191
192 #[inline(always)]
198 pub fn refcount(&self) -> u64 {
199 self.inner().ref_count.load(Ordering::Acquire)
201 }
202
203 pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
204 self.is_exclusive().then(|| {
205 let inner = self.inner();
206 let len = inner.length_in_bytes / size_of::<T>();
207 unsafe { core::slice::from_raw_parts_mut(inner.ptr, len) }
208 })
209 }
210
211 pub fn try_take_vec(&mut self) -> Option<Vec<T>> {
213 let Some(BackingStorage::Vec {
216 original_capacity,
217 vtable,
218 }) = self.inner().backing
219 else {
220 return None;
221 };
222
223 if vtable.size != size_of::<T>() || vtable.align != align_of::<T>() {
224 return None;
225 }
226
227 if !self.is_exclusive() {
229 return None;
230 }
231
232 let ret;
233 unsafe {
234 let inner = &mut *self.inner.as_ptr();
235 let len = inner.length_in_bytes / size_of::<T>();
236 ret = Vec::from_raw_parts(inner.ptr, len, original_capacity);
237 inner.length_in_bytes = 0;
238 inner.backing = None;
239 }
240 Some(ret)
241 }
242
243 pub fn try_as_mut_vec(&mut self) -> Option<SharedStorageAsVecMut<'_, T>> {
247 Some(SharedStorageAsVecMut {
248 vec: ManuallyDrop::new(self.try_take_vec()?),
249 ss: self,
250 })
251 }
252
253 pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
254 self.try_take_vec().ok_or(self)
255 }
256
257 #[inline(always)]
258 fn inner(&self) -> &SharedStorageInner<T> {
259 unsafe { &*self.inner.as_ptr() }
260 }
261
262 #[cold]
265 unsafe fn drop_slow(&mut self) {
266 unsafe { drop(Box::from_raw(self.inner.as_ptr())) }
267 }
268}
269
270impl<T: Pod> SharedStorage<T> {
271 fn try_transmute<U: Pod>(self) -> Result<SharedStorage<U>, Self> {
272 let inner = self.inner();
273
274 if size_of::<T>() % size_of::<U>() != 0 && inner.length_in_bytes % size_of::<U>() != 0 {
277 return Err(self);
278 }
279
280 if align_of::<T>() % align_of::<U>() != 0 && !inner.ptr.cast::<U>().is_aligned() {
283 return Err(self);
284 }
285
286 let storage = SharedStorage {
287 inner: self.inner.cast(),
288 phantom: PhantomData,
289 };
290 std::mem::forget(self);
291 Ok(storage)
292 }
293}
294
295impl SharedStorage<u8> {
296 pub fn bytes_from_pod_vec<T: Pod>(v: Vec<T>) -> Self {
298 SharedStorage::from_vec(v)
300 .try_transmute::<u8>()
301 .unwrap_or_else(|_| unreachable!())
302 }
303}
304
305impl<T> Deref for SharedStorage<T> {
306 type Target = [T];
307
308 #[inline]
309 fn deref(&self) -> &Self::Target {
310 unsafe {
311 let inner = self.inner();
312 let len = inner.length_in_bytes / size_of::<T>();
313 core::slice::from_raw_parts(inner.ptr, len)
314 }
315 }
316}
317
318impl<T> Clone for SharedStorage<T> {
319 fn clone(&self) -> Self {
320 let inner = self.inner();
321 if inner.backing.is_some() {
322 inner.ref_count.fetch_add(1, Ordering::Relaxed);
324 }
325 Self {
326 inner: self.inner,
327 phantom: PhantomData,
328 }
329 }
330}
331
332impl<T> Drop for SharedStorage<T> {
333 fn drop(&mut self) {
334 let inner = self.inner();
335 if inner.backing.is_none() {
336 return;
337 }
338
339 if inner.ref_count.fetch_sub(1, Ordering::Release) == 1 {
341 std::sync::atomic::fence(Ordering::Acquire);
342 unsafe {
343 self.drop_slow();
344 }
345 }
346 }
347}