Restore "Implement IPC-enabled events. (#1145)" #1176
Conversation
This reverts commit bcd40ff.
/ok to test 9dd067e
Greptile Overview
Greptile Summary
This PR restores IPC (Inter-Process Communication) support for CUDA events, previously removed in commit bcd40ff. The implementation adds serialization capabilities to Event objects via IPCEventDescriptor, enabling events to be shared across process boundaries using CUDA's cuIpcOpenEventHandle API. Key changes include:
- adding _ipc_enabled and _ipc_descriptor fields to Event (cuda_core/cuda/core/experimental/_event.pxd)
- enabling IPC by default for events created via Device.create_event()
- implementing pickle support via multiprocessing reducer registration
- reorganizing test helpers into a cuda_core/tests/helpers/ package structure with new utilities (PatternGen, LatchKernel, TimestampedLogger)

The feature enforces constraints that IPC events must be bound (created via Stream.record), cannot enable timing, and cannot be re-recorded.
Critical Issues
1. Multiprocessing reducer breaks non-IPC event pickling (cuda_core/cuda/core/experimental/_event.pyx, lines 294-297)
The global multiprocessing reducer registered for Event unconditionally calls get_ipc_descriptor() when pickling any Event instance, but this method raises RuntimeError if the event is not IPC-enabled. This will break existing code that pickles regular (non-IPC) events through multiprocessing queues/pipes.
```python
def _reduce_event(event):
    return event.from_ipc_descriptor, (event.get_ipc_descriptor(),)  # RuntimeError if not IPC-enabled

multiprocessing.reduction.register(Event, _reduce_event)
```
Solution: Add a conditional check in the reducer to handle both IPC and non-IPC events.
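A minimal sketch of such a conditional reducer, assuming the `is_ipc_enabled` property referenced elsewhere in this PR and that regular events support default reduction:

```python
import multiprocessing.reduction

def _reduce_event(event):
    # IPC-enabled events cross process boundaries via their descriptor.
    if event.is_ipc_enabled:
        return event.from_ipc_descriptor, (event.get_ipc_descriptor(),)
    # Fall back to ordinary pickling for non-IPC events (assumes Event
    # defines or inherits a usable __reduce_ex__).
    return event.__reduce_ex__(2)

multiprocessing.reduction.register(Event, _reduce_event)
```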
2. Critical bug in LatchKernel instantiation (cuda_core/tests/helpers/latch.py, line 50)
The constructor accesses self.busy_wait_flag[0] before the busy_wait_flag property is defined (property defined at lines 61-63). This will cause an AttributeError when instantiating the class, breaking all tests that use this helper.
```python
# Line 50 - property doesn't exist yet
self.busy_wait_flag[0] = 0

# Lines 61-63 - property defined later
@property
def busy_wait_flag(self):
    return ctypes.cast(int(self.buffer.handle), ctypes.POINTER(ctypes.c_int32))
```
Solution: Move the property definition before the __init__ method or directly use the ctypes cast in __init__.
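A sketch of the second option, performing the cast directly in `__init__` (the constructor signature and surrounding class body are assumptions based on the quoted snippet):

```python
import ctypes

class LatchKernel:
    def __init__(self, buffer):
        self.buffer = buffer
        # Cast the buffer's device pointer to an int32 pointer and clear
        # the busy-wait flag without going through the property.
        flag = ctypes.cast(int(self.buffer.handle), ctypes.POINTER(ctypes.c_int32))
        flag[0] = 0
```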
3. Potential memory leak in PatternGen.verify_buffer (cuda_core/tests/helpers/buffers.py, lines 76-82)
The method allocates scratch_buffer via DummyUnifiedMemoryResource.allocate() but never explicitly frees it. Depending on the Buffer lifecycle management in the codebase, this could accumulate memory over repeated test runs.
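One way to make the release deterministic, assuming the codebase's `Buffer` objects expose the usual `close()` method (the constructor arguments here are illustrative):

```python
scratch_buffer = DummyUnifiedMemoryResource(device).allocate(size)
try:
    # ... copy and compare against the expected pattern ...
    pass
finally:
    scratch_buffer.close()  # free the scratch allocation explicitly
```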
4. Removed platform capability checks may cause test failures (cuda_core/tests/memory_ipc/test_errors.py, test_workerpool.py, test_send_buffers.py)
Multiple test files removed the supports_ipc_mempool() guard that previously skipped tests on platforms where the driver rejects IPC-enabled mempool creation (e.g., certain WSL configurations). Tests will now fail instead of skip on incompatible platforms.
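Restoring the guard might look like this (helper name taken from the text above; the exact call signature is an assumption):

```python
import pytest

def test_ipc_event_roundtrip(device):
    if not supports_ipc_mempool(device):
        pytest.skip("driver rejects IPC-enabled mempool creation on this platform")
    # ... rest of the test ...
```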
5. Inconsistent device_id handling for IPC-imported events (cuda_core/cuda/core/experimental/_event.pyx, lines 193-194)
IPC-imported events set device_id = -1 and ctx_handle = None with "??" comments, indicating uncertainty about the correct values. The .device and .context properties will return None for these events, which may break downstream code expecting valid objects.
Confidence Score: 2/5
The multiprocessing reducer bug and LatchKernel instantiation error are critical and will cause immediate failures. The IPC event implementation needs review of its pickle integration strategy and proper handling of device/context for imported events before merging.
20 files reviewed, 4 comments
/ok to test cbeebf3
65975ab to 7cbcd0c (force-push)
/ok to test dfc96c3
```python
if not IS_WINDOWS and not IS_WSL:
    # On any sort of Windows system, checking the memory before stream
    # sync results in a page error.
    log("checking target == 0")
    assert compare_equal_buffers(target, zeros)
```
This was the cause of the CI crash. To my surprise, accessing unified memory that is currently queued in a CUDA stream operation results in a page error on Windows. On Linux it works as expected (to me, anyway).
/ok to test 51e90e0
Important: Review skipped. Auto reviews are disabled on this repository; check the settings in the CodeRabbit UI or the repository's CodeRabbit configuration file.

Note: CodeRabbit detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
/ok to test 6bbfe18
```python
from cuda.core.experimental import Buffer, MemoryResource
from cuda.core.experimental._utils.cuda_utils import driver, handle_return

if sys.platform.startswith("win"):
```
We should migrate this to a common place so that this pattern doesn't get replicated in multiple files. It would be a maintenance headache if we had to update it in multiple places.
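For example, a hypothetical shared helper module (say, `tests/helpers/platform.py`) could own the flags used throughout the IPC tests; the `/proc/version` heuristic for WSL detection is an assumption about how detection would be done:

```python
import sys

IS_WINDOWS = sys.platform.startswith("win")

def _detect_wsl():
    # WSL kernels typically report "microsoft" in /proc/version.
    try:
        with open("/proc/version") as f:
            return "microsoft" in f.read().lower()
    except OSError:
        return False

IS_WSL = not IS_WINDOWS and _detect_wsl()
```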
```cuda
    }
    // Avoid 100% spin.
    __nanosleep(10000000);  // 10 ms
```
10 ms is a lot for a polling loop like this. You could probably get away with 1 ms, and as a nit, if you don't need the granularity of __nanosleep you could use std::this_thread::sleep_for as an alternative.
| log(f"got event ({hex(e.handle)})") | ||
| stream2.wait(e) | ||
| log("enqueuing copy on stream2") | ||
| buffer.copy_from(twos, stream=stream2) |
Shouldn't we verify that the buffer we got in the child process contains "1" before we copy "2" on top of it?
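A sketch of that check, reusing names from the quoted snippet (the `ones` reference buffer is an assumption about what the parent filled in):

```python
# Confirm the parent's "1" pattern is visible before overwriting it.
assert compare_equal_buffers(buffer, ones)
buffer.copy_from(twos, stream=stream2)
```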
```python
        return self._ipc_descriptor

    @classmethod
    def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
```
Q: it doesn't seem that this public API is tested?
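A round-trip test sketch for this public API (the single-process shortcut is an assumption; a real test would open the descriptor in a child process):

```python
from cuda.core.experimental import Device, Event

def test_from_ipc_descriptor_roundtrip():
    device = Device()
    device.set_current()
    stream = device.create_stream()
    event = stream.record()  # bound and IPC-enabled by default per this PR
    desc = event.get_ipc_descriptor()
    imported = Event.from_ipc_descriptor(desc)  # normally done in another process
    imported.sync()
```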
```python
self._busy_waited = ipc_descriptor._busy_waited
self._ipc_enabled = True
self._ipc_descriptor = ipc_descriptor
self._device_id = -1  # ??
```
I think when we send over the handle we should also send the device id
```python
self._ipc_enabled = True
self._ipc_descriptor = ipc_descriptor
self._device_id = -1  # ??
self._ctx_handle = None  # ??
```
This should be (lazily? not sure if it's possible) initialized in the child process (to the specified device's current context).
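A sketch combining the two suggestions (the `_device_id` descriptor field is an assumption): send the exporting device's id along with the handle, then resolve the context lazily in the importing process:

```python
# In from_ipc_descriptor (sketch):
self._device_id = ipc_descriptor._device_id  # shipped from the parent process
self._ctx_handle = None  # resolved lazily on first use

def _ensure_ctx_handle(self):
    # Bind to the specified device's current context on first access.
    if self._ctx_handle is None:
        from cuda.core.experimental._device import Device  # avoid circular import
        self._ctx_handle = Device(self._device_id).context._handle  # assumed accessor
```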
```python
from cuda.core.experimental._device import Device  # avoid circular import

return Device(self._device_id)
if self._device_id >= 0:
```
ditto, then this awkward check can be removed.
```python
def context(self) -> Context:
    """Return the :obj:`~_context.Context` associated with this event."""
    return Context._from_ctx(self._ctx_handle, self._device_id)
    if self._ctx_handle is not None and self._device_id >= 0:
```
ditto
```python
elif event.is_ipc_enabled:
    raise TypeError(
        "IPC-enabled events should not be re-recorded, instead create a "
        "new event by supplying options."
    )
```
Imagine we are doing ping-pong between two processes. We should be able to reuse the same event in the parent and the child. Does the driver actually disallow this?