tokio/runtime/task/trace/
mod.rs1use crate::loom::sync::Arc;
2use crate::runtime::context;
3use crate::runtime::scheduler::{self, current_thread, Inject};
4use crate::task::Id;
5
6use backtrace::BacktraceFrame;
7use std::cell::Cell;
8use std::collections::VecDeque;
9use std::ffi::c_void;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::ptr::{self, NonNull};
14use std::task::{self, Poll};
15
16mod symbol;
17mod tree;
18
19use symbol::Symbol;
20use tree::Tree;
21
22use super::{Notified, OwnedTasks, Schedule};
23
24type Backtrace = Vec<BacktraceFrame>;
25type SymbolTrace = Vec<Symbol>;
26
27pub(crate) struct Context {
29 active_frame: Cell<Option<NonNull<Frame>>>,
32 collector: Cell<Option<Trace>>,
34}
35
36struct Frame {
38 inner_addr: *const c_void,
40
41 parent: Option<NonNull<Frame>>,
43}
44
45#[derive(Clone, Debug)]
50pub(crate) struct Trace {
51 backtraces: Vec<Backtrace>,
54}
55
56pin_project_lite::pin_project! {
57 #[derive(Debug, Clone)]
58 #[must_use = "futures do nothing unless you `.await` or poll them"]
59 pub struct Root<T> {
61 #[pin]
62 future: T,
63 }
64}
65
66const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
67 as part of shutting down the current \
68 thread, so collecting a taskdump is not \
69 possible.";
70
71impl Context {
72 pub(crate) const fn new() -> Self {
73 Context {
74 active_frame: Cell::new(None),
75 collector: Cell::new(None),
76 }
77 }
78
79 unsafe fn try_with_current<F, R>(f: F) -> Option<R>
82 where
83 F: FnOnce(&Self) -> R,
84 {
85 crate::runtime::context::with_trace(f)
86 }
87
88 unsafe fn with_current_frame<F, R>(f: F) -> R
89 where
90 F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
91 {
92 Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
93 }
94
95 fn with_current_collector<F, R>(f: F) -> R
96 where
97 F: FnOnce(&Cell<Option<Trace>>) -> R,
98 {
99 unsafe {
102 Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
103 }
104 }
105
106 pub(crate) fn is_tracing() -> bool {
108 Self::with_current_collector(|maybe_collector| {
109 let collector = maybe_collector.take();
110 let result = collector.is_some();
111 maybe_collector.set(collector);
112 result
113 })
114 }
115}
116
117impl Trace {
118 #[inline(never)]
121 pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
122 where
123 F: FnOnce() -> R,
124 {
125 let collector = Trace { backtraces: vec![] };
126
127 let previous = Context::with_current_collector(|current| current.replace(Some(collector)));
128
129 let result = f();
130
131 let collector =
132 Context::with_current_collector(|current| current.replace(previous)).unwrap();
133
134 (result, collector)
135 }
136
137 #[inline(never)]
139 pub(crate) fn root<F>(future: F) -> Root<F> {
140 Root { future }
141 }
142
143 pub(crate) fn backtraces(&self) -> &[Backtrace] {
144 &self.backtraces
145 }
146}
147
148#[inline(never)]
158pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
159 let did_trace = unsafe {
161 Context::try_with_current(|context_cell| {
162 if let Some(mut collector) = context_cell.collector.take() {
163 let mut frames = vec![];
164 let mut above_leaf = false;
165
166 if let Some(active_frame) = context_cell.active_frame.get() {
167 let active_frame = active_frame.as_ref();
168
169 backtrace::trace(|frame| {
170 let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);
171
172 if above_leaf && below_root {
175 frames.push(frame.to_owned().into());
176 }
177
178 if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
179 above_leaf = true;
180 }
181
182 below_root
184 });
185 }
186 collector.backtraces.push(frames);
187 context_cell.collector.set(Some(collector));
188 true
189 } else {
190 false
191 }
192 })
193 .unwrap_or(false)
194 };
195
196 if did_trace {
197 context::with_scheduler(|scheduler| {
200 if let Some(scheduler) = scheduler {
201 match scheduler {
202 scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
203 #[cfg(feature = "rt-multi-thread")]
204 scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
205 }
206 }
207 });
208
209 Poll::Pending
210 } else {
211 Poll::Ready(())
212 }
213}
214
215impl fmt::Display for Trace {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 Tree::from_trace(self.clone()).fmt(f)
218 }
219}
220
221fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
222 use std::mem::ManuallyDrop;
223
224 struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);
225
226 impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
227 #[inline(always)]
228 fn drop(&mut self) {
229 unsafe {
230 ManuallyDrop::take(&mut self.0)();
231 }
232 }
233 }
234
235 Defer(ManuallyDrop::new(f))
236}
237
238impl<T: Future> Future for Root<T> {
239 type Output = T::Output;
240
241 #[inline(never)]
242 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
243 unsafe {
246 let mut frame = Frame {
247 inner_addr: Self::poll as *const c_void,
248 parent: None,
249 };
250
251 Context::with_current_frame(|current| {
252 frame.parent = current.take();
253 current.set(Some(NonNull::from(&frame)));
254 });
255
256 let _restore = defer(|| {
257 Context::with_current_frame(|current| {
258 current.set(frame.parent);
259 });
260 });
261
262 let this = self.project();
263 this.future.poll(cx)
264 }
265 }
266}
267
268pub(in crate::runtime) fn trace_current_thread(
270 owned: &OwnedTasks<Arc<current_thread::Handle>>,
271 local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
272 injection: &Inject<Arc<current_thread::Handle>>,
273) -> Vec<(Id, Trace)> {
274 let mut dequeued = Vec::new();
277
278 while let Some(task) = local.pop_back() {
279 dequeued.push(task);
280 }
281
282 while let Some(task) = injection.pop() {
283 dequeued.push(task);
284 }
285
286 trace_owned(owned, dequeued)
288}
289
290cfg_rt_multi_thread! {
291 use crate::loom::sync::Mutex;
292 use crate::runtime::scheduler::multi_thread;
293 use crate::runtime::scheduler::multi_thread::Synced;
294 use crate::runtime::scheduler::inject::Shared;
295
296 pub(in crate::runtime) unsafe fn trace_multi_thread(
302 owned: &OwnedTasks<Arc<multi_thread::Handle>>,
303 local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
304 synced: &Mutex<Synced>,
305 injection: &Shared<Arc<multi_thread::Handle>>,
306 ) -> Vec<(Id, Trace)> {
307 let mut dequeued = Vec::new();
308
309 while let Some(notified) = local.pop() {
311 dequeued.push(notified);
312 }
313
314 let mut synced = synced.lock();
316 while let Some(notified) = injection.pop(&mut synced.inject) {
317 dequeued.push(notified);
318 }
319
320 drop(synced);
321
322 trace_owned(owned, dequeued)
325 }
326}
327
328fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)> {
335 let mut tasks = dequeued;
336 owned.for_each(|task| {
339 if let Some(notified) = task.notify_for_tracing() {
343 tasks.push(notified);
344 }
345 });
349
350 tasks
351 .into_iter()
352 .map(|task| {
353 let local_notified = owned.assert_owner(task);
354 let id = local_notified.task.id();
355 let ((), trace) = Trace::capture(|| local_notified.run());
356 (id, trace)
357 })
358 .collect()
359}