tokio/task/local.rs
1//! Runs `!Send` futures on the current thread.
2use crate::loom::cell::UnsafeCell;
3use crate::loom::sync::{Arc, Mutex};
4#[cfg(tokio_unstable)]
5use crate::runtime;
6use crate::runtime::task::{
7 self, JoinHandle, LocalOwnedTasks, SpawnLocation, Task, TaskHarnessScheduleHooks,
8};
9use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD};
10use crate::sync::AtomicWaker;
11use crate::util::trace::SpawnMeta;
12use crate::util::RcCell;
13
14use std::cell::Cell;
15use std::collections::VecDeque;
16use std::fmt;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::mem;
20use std::pin::Pin;
21use std::rc::Rc;
22use std::task::Poll;
23
24use pin_project_lite::pin_project;
25
26cfg_rt! {
27 /// A set of tasks which are executed on the same thread.
28 ///
29 /// In some cases, it is necessary to run one or more futures that do not
30 /// implement [`Send`] and thus are unsafe to send between threads. In these
31 /// cases, a [local task set] may be used to schedule one or more `!Send`
32 /// futures to run together on the same thread.
33 ///
34 /// For example, the following code will not compile:
35 ///
36 /// ```rust,compile_fail
37 /// use std::rc::Rc;
38 ///
39 /// #[tokio::main]
40 /// async fn main() {
41 /// // `Rc` does not implement `Send`, and thus may not be sent between
42 /// // threads safely.
43 /// let nonsend_data = Rc::new("my nonsend data...");
44 ///
45 /// let nonsend_data = nonsend_data.clone();
46 /// // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
47 /// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
48 /// // will not compile.
49 /// tokio::spawn(async move {
50 /// println!("{}", nonsend_data);
51 /// // ...
52 /// }).await.unwrap();
53 /// }
54 /// ```
55 ///
56 /// # Use with `run_until`
57 ///
58 /// To spawn `!Send` futures, we can use a local task set to schedule them
59 /// on the thread calling [`Runtime::block_on`]. When running inside of the
60 /// local task set, we can use [`task::spawn_local`], which can spawn
61 /// `!Send` futures. For example:
62 ///
63 /// ```rust
64 /// use std::rc::Rc;
65 /// use tokio::task;
66 ///
67 /// # #[tokio::main(flavor = "current_thread")]
68 /// # async fn main() {
69 /// let nonsend_data = Rc::new("my nonsend data...");
70 ///
71 /// // Construct a local task set that can run `!Send` futures.
72 /// let local = task::LocalSet::new();
73 ///
74 /// // Run the local task set.
75 /// local.run_until(async move {
76 /// let nonsend_data = nonsend_data.clone();
77 /// // `spawn_local` ensures that the future is spawned on the local
78 /// // task set.
79 /// task::spawn_local(async move {
80 /// println!("{}", nonsend_data);
81 /// // ...
82 /// }).await.unwrap();
83 /// }).await;
84 /// # }
85 /// ```
86 /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
87 /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
88 /// cannot be used inside a task spawned with `tokio::spawn`.
89 ///
90 /// ## Awaiting a `LocalSet`
91 ///
92 /// Additionally, a `LocalSet` itself implements `Future`, completing when
93 /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
94 /// several futures on a `LocalSet` and drive the whole set until they
95 /// complete. For example,
96 ///
97 /// ```rust
98 /// use tokio::{task, time};
99 /// use std::rc::Rc;
100 ///
101 /// # #[tokio::main(flavor = "current_thread")]
102 /// # async fn main() {
103 /// let nonsend_data = Rc::new("world");
104 /// let local = task::LocalSet::new();
105 ///
106 /// let nonsend_data2 = nonsend_data.clone();
107 /// local.spawn_local(async move {
108 /// // ...
109 /// println!("hello {}", nonsend_data2)
110 /// });
111 ///
112 /// local.spawn_local(async move {
113 /// time::sleep(time::Duration::from_millis(100)).await;
114 /// println!("goodbye {}", nonsend_data)
115 /// });
116 ///
117 /// // ...
118 ///
119 /// local.await;
120 /// # }
121 /// ```
122 /// **Note:** Awaiting a `LocalSet` can only be done inside
123 /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
124 /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
125 /// `tokio::spawn`.
126 ///
127 /// ## Use inside `tokio::spawn`
128 ///
129 /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
130 /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
131 /// something else. The solution is to create the `LocalSet` somewhere else,
132 /// and communicate with it using an [`mpsc`] channel.
133 ///
134 /// The following example puts the `LocalSet` inside a new thread.
135 /// ```
136 /// # #[cfg(not(target_family = "wasm"))]
137 /// # {
138 /// use tokio::runtime::Builder;
139 /// use tokio::sync::{mpsc, oneshot};
140 /// use tokio::task::LocalSet;
141 ///
142 /// // This struct describes the task you want to spawn. Here we include
143 /// // some simple examples. The oneshot channel allows sending a response
144 /// // to the spawner.
145 /// #[derive(Debug)]
146 /// enum Task {
147 /// PrintNumber(u32),
148 /// AddOne(u32, oneshot::Sender<u32>),
149 /// }
150 ///
151 /// #[derive(Clone)]
152 /// struct LocalSpawner {
153 /// send: mpsc::UnboundedSender<Task>,
154 /// }
155 ///
156 /// impl LocalSpawner {
157 /// pub fn new() -> Self {
158 /// let (send, mut recv) = mpsc::unbounded_channel();
159 ///
160 /// let rt = Builder::new_current_thread()
161 /// .enable_all()
162 /// .build()
163 /// .unwrap();
164 ///
165 /// std::thread::spawn(move || {
166 /// let local = LocalSet::new();
167 ///
168 /// local.spawn_local(async move {
169 /// while let Some(new_task) = recv.recv().await {
170 /// tokio::task::spawn_local(run_task(new_task));
171 /// }
172 /// // If the while loop returns, then all the LocalSpawner
173 /// // objects have been dropped.
174 /// });
175 ///
176 /// // This will return once all senders are dropped and all
177 /// // spawned tasks have returned.
178 /// rt.block_on(local);
179 /// });
180 ///
181 /// Self {
182 /// send,
183 /// }
184 /// }
185 ///
186 /// pub fn spawn(&self, task: Task) {
187 /// self.send.send(task).expect("Thread with LocalSet has shut down.");
188 /// }
189 /// }
190 ///
191 /// // This task may do !Send stuff. We use printing a number as an example,
192 /// // but it could be anything.
193 /// //
194 /// // The Task struct is an enum to support spawning many different kinds
195 /// // of operations.
196 /// async fn run_task(task: Task) {
197 /// match task {
198 /// Task::PrintNumber(n) => {
199 /// println!("{}", n);
200 /// },
201 /// Task::AddOne(n, response) => {
202 /// // We ignore failures to send the response.
203 /// let _ = response.send(n + 1);
204 /// },
205 /// }
206 /// }
207 ///
208 /// #[tokio::main]
209 /// async fn main() {
210 /// let spawner = LocalSpawner::new();
211 ///
212 /// let (send, response) = oneshot::channel();
213 /// spawner.spawn(Task::AddOne(10, send));
214 /// let eleven = response.await.unwrap();
215 /// assert_eq!(eleven, 11);
216 /// }
217 /// # }
218 /// ```
219 ///
220 /// [`Send`]: trait@std::marker::Send
221 /// [local task set]: struct@LocalSet
222 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
223 /// [`task::spawn_local`]: fn@spawn_local
224 /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Current scheduler tick.
        ///
        /// Advanced (with wrap-around) every time the next task is picked and
        /// used to decide when the remote queue is consulted before the local
        /// one.
        tick: Cell<u8>,

        /// State available from thread-local.
        ///
        /// A clone of this `Rc` is installed into the `CURRENT` thread-local
        /// while the set is entered or polled.
        context: Rc<Context>,

        /// This type should not be Send.
        ///
        /// `*const ()` is neither `Send` nor `Sync`, so this marker opts the
        /// whole struct out of both auto traits.
        _not_send: PhantomData<*const ()>,
    }
235}
236
/// State available from the thread-local.
///
/// A `LocalSet` owns one `Context` behind an `Rc`; a clone of that `Rc` is
/// placed in the `CURRENT` thread-local while the set is entered or polled.
struct Context {
    /// State shared between threads.
    shared: Arc<Shared>,

    /// True if a task panicked without being handled and the local set is
    /// configured to shutdown on unhandled panic.
    ///
    /// The scheduler asserts that this is `false` before picking each task.
    unhandled_panic: Cell<bool>,
}
246
/// `LocalSet` state shared between threads.
struct Shared {
    /// # Safety
    ///
    /// This field must *only* be accessed from the thread that owns the
    /// `LocalSet` (i.e., `Thread::current().id() == owner`).
    local_state: LocalState,

    /// Remote run queue, protected by a mutex.
    ///
    /// Holds `Some(queue)` for the lifetime of the `LocalSet`; the queue is
    /// taken (leaving `None`) when the `LocalSet` is dropped.
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

    /// Wake the `LocalSet` task.
    ///
    /// The waker is registered each time the `LocalSet` future is polled.
    waker: AtomicWaker,

    /// How to respond to unhandled task panics.
    #[cfg(tokio_unstable)]
    pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
265
/// Tracks the `LocalSet` state that must only be accessed from the thread that
/// created the `LocalSet`.
struct LocalState {
    /// The `ThreadId` of the thread that owns the `LocalSet`.
    ///
    /// Captured once, when the `LocalSet` is constructed.
    owner: ThreadId,

    /// Local run queue.
    ///
    /// Wrapped in an `UnsafeCell` rather than a lock; callers reach it through
    /// `unsafe` accessors and must be on the owner thread.
    local_queue: UnsafeCell<VecDeque<task::Notified<Arc<Shared>>>>,

    /// Collection of all active tasks spawned onto this executor.
    owned: LocalOwnedTasks<Arc<Shared>>,
}
278
// Future built by `LocalSet::run_until`: it pairs the caller's future with the
// `LocalSet` it should run on.
pin_project! {
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set whose context is used while `future` is polled.
        local_set: &'a LocalSet,
        // The caller-supplied future; projected as a pinned field.
        #[pin]
        future: F,
    }
}
287
// Per-thread record of the `LocalSet` context that is currently entered or
// being polled on this thread. It is const-initialised with a fresh `RcCell`
// (presumably empty, since `ctx.get()` yields an `Option`) and with
// `wake_on_schedule` cleared.
tokio_thread_local!(static CURRENT: LocalData = const { LocalData {
    ctx: RcCell::new(),
    wake_on_schedule: Cell::new(false),
} });
292
/// Contents of the `CURRENT` thread-local.
struct LocalData {
    /// The context of the `LocalSet` that is currently entered or being
    /// polled on this thread, if any.
    ctx: RcCell<Context>,
    /// Set to `true` by `LocalSet::enter` and to `false` by `LocalData::enter`
    /// (the path used while polling). The previous value is restored when the
    /// corresponding guard is dropped.
    // NOTE(review): presumably consulted by the scheduling code to decide
    // whether to wake the `LocalSet`; that code is outside this view.
    wake_on_schedule: Cell<bool>,
}
297
298impl LocalData {
299 /// Should be called except when we call `LocalSet::enter`.
300 /// Especially when we poll a `LocalSet`.
301 #[must_use = "dropping this guard will reset the entered state"]
302 fn enter(&self, ctx: Rc<Context>) -> LocalDataEnterGuard<'_> {
303 let ctx = self.ctx.replace(Some(ctx));
304 let wake_on_schedule = self.wake_on_schedule.replace(false);
305 LocalDataEnterGuard {
306 local_data_ref: self,
307 ctx,
308 wake_on_schedule,
309 }
310 }
311}
312
/// A guard for `LocalData::enter()`
///
/// Dropping the guard writes the saved values back into the `LocalData` it
/// was created from.
struct LocalDataEnterGuard<'a> {
    /// The `LocalData` whose state is restored on drop.
    local_data_ref: &'a LocalData,
    /// The context that was installed before `enter` was called.
    ctx: Option<Rc<Context>>,
    /// The value `wake_on_schedule` had before `enter` was called.
    wake_on_schedule: bool,
}
319
320impl<'a> Drop for LocalDataEnterGuard<'a> {
321 fn drop(&mut self) {
322 self.local_data_ref.ctx.set(self.ctx.take());
323 self.local_data_ref
324 .wake_on_schedule
325 .set(self.wake_on_schedule)
326 }
327}
328
329cfg_rt! {
330 /// Spawns a `!Send` future on the current [`LocalSet`] or [`LocalRuntime`].
331 ///
332 /// This is possible when either using one of these types
333 /// explicitly, or (with `tokio_unstable`) by opting to use the
334 /// `"local"` runtime flavor in `tokio::main`:
335 ///
336 /// ```ignore
337 /// #[tokio::main(flavor = "local")]
338 /// ```
339 ///
340 /// The spawned future will run on the same thread that called `spawn_local`.
341 ///
342 /// The provided future will start running in the background immediately
343 /// when `spawn_local` is called, even if you don't await the returned
344 /// `JoinHandle`.
345 ///
346 /// # Panics
347 ///
348 /// This function panics if called outside of a [`LocalSet`] or [`LocalRuntime`].
349 ///
350 /// Note that if [`tokio::spawn`] is used from within a `LocalSet`, the
351 /// resulting new task will _not_ be inside the `LocalSet`, so you must use
352 /// `spawn_local` if you want to stay within the `LocalSet`.
353 ///
354 /// # Examples
355 ///
356 /// With `LocalSet`:
357 ///
358 /// ```rust
359 /// use std::rc::Rc;
360 /// use tokio::task;
361 ///
362 /// # #[tokio::main(flavor = "current_thread")]
363 /// # async fn main() {
364 /// let nonsend_data = Rc::new("my nonsend data...");
365 ///
366 /// let local = task::LocalSet::new();
367 ///
368 /// // Run the local task set.
369 /// local.run_until(async move {
370 /// let nonsend_data = nonsend_data.clone();
371 /// task::spawn_local(async move {
372 /// println!("{}", nonsend_data);
373 /// // ...
374 /// }).await.unwrap();
375 /// }).await;
376 /// # }
377 /// ```
378 /// With local runtime flavor ([Unstable API][unstable] only).
379 ///
380 /// ```rust
381 /// # #[cfg(tokio_unstable)]
382 /// #[tokio::main(flavor = "local")]
383 /// async fn main() {
384 /// let join = tokio::task::spawn_local(async {
385 /// println!("my nonsend data...")
386 /// });
387 ///
388 /// join.await.unwrap()
389 /// }
390 /// # #[cfg(not(tokio_unstable))]
391 /// # fn main() {}
392 ///
393 /// ```
394 ///
395 /// [`LocalSet`]: struct@crate::task::LocalSet
396 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
397 /// [`tokio::spawn`]: fn@crate::task::spawn
398 /// [unstable]: ../../tokio/index.html#unstable-features
399 #[track_caller]
400 pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
401 where
402 F: Future + 'static,
403 F::Output: 'static,
404 {
405 let fut_size = std::mem::size_of::<F>();
406 if fut_size > BOX_FUTURE_THRESHOLD {
407 spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
408 } else {
409 spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size))
410 }
411 }
412
413
    /// Shared implementation behind `spawn_local`.
    ///
    /// If the current runtime handle reports itself as local, the task is
    /// spawned on that runtime. Otherwise it is spawned on the `LocalSet`
    /// whose context is stored in the `CURRENT` thread-local.
    ///
    /// # Panics
    ///
    /// Panics if the handle is local but cannot spawn local tasks from this
    /// thread, or if no `LocalSet` context is set when one is needed.
    #[track_caller]
    pub(super) fn spawn_local_inner<F>(future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
    where F: Future + 'static,
          F::Output: 'static
    {
        use crate::runtime::{context, task};

        // Wrapped in an `Option` so that whichever code path ends up spawning
        // can take ownership of the future, including the fallback below that
        // runs after the closure has returned.
        let mut future = Some(future);

        let res = context::with_current(|handle| {
            Some(if handle.is_local() {
                // `None` is turned into a dedicated panic message below.
                if !handle.can_spawn_local_on_local_runtime() {
                    return None;
                }

                let future = future.take().unwrap();

                #[cfg(all(
                    tokio_unstable,
                    feature = "taskdump",
                    feature = "rt",
                    target_os = "linux",
                    any(
                        target_arch = "aarch64",
                        target_arch = "x86",
                        target_arch = "x86_64"
                    )
                ))]
                let future = task::trace::Trace::root(future);
                let id = task::Id::next();
                let task = crate::util::trace::task(future, "task", meta, id.as_u64());

                // safety: we have verified that this is a `LocalRuntime` owned by the current thread
                unsafe { handle.spawn_local(task, id, meta.spawned_at) }
            } else {
                // Not a local runtime: fall back to the `LocalSet` context
                // installed in the thread-local, if any.
                match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
                    None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
                    Some(cx) => cx.spawn(future.take().unwrap(), meta)
                }
            })
        });

        match res {
            Ok(None) => panic!("Local tasks can only be spawned on a LocalRuntime from the thread the runtime was created on"),
            Ok(Some(join_handle)) => join_handle,
            // `with_current` could not provide a handle (presumably there is
            // no runtime context), so the closure never ran and `future` is
            // still `Some`. A `LocalSet` context may still be entered.
            Err(_) => match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
                None => panic!("`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"),
                Some(cx) => cx.spawn(future.unwrap(), meta)
            }
        }
    }
465}
466
/// Initial capacity of both the local and the remote run queue.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often the remote queue is checked before the local queue: whenever the
/// scheduler tick is a multiple of this value.
const REMOTE_FIRST_INTERVAL: u8 = 31;
475
/// Context guard for `LocalSet`
///
/// Returned by `LocalSet::enter`. Dropping the guard restores the
/// thread-local state that was in place before `enter` was called.
pub struct LocalEnterGuard {
    /// The context that was installed before `enter` was called.
    ctx: Option<Rc<Context>>,

    /// The value the thread-local `wake_on_schedule` flag had before `enter`
    /// was called; it is written back on drop.
    ///
    /// The flag distinguishes a context that was merely entered (flag set by
    /// `LocalSet::enter`) from one that is currently being polled (flag
    /// cleared).
    wake_on_schedule: bool,
}
485
486impl Drop for LocalEnterGuard {
487 fn drop(&mut self) {
488 CURRENT.with(
489 |LocalData {
490 ctx,
491 wake_on_schedule,
492 }| {
493 ctx.set(self.ctx.take());
494 wake_on_schedule.set(self.wake_on_schedule);
495 },
496 );
497 }
498}
499
500impl fmt::Debug for LocalEnterGuard {
501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502 f.debug_struct("LocalEnterGuard").finish()
503 }
504}
505
506impl LocalSet {
507 /// Returns a new local task set.
508 pub fn new() -> LocalSet {
509 let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown");
510
511 LocalSet {
512 tick: Cell::new(0),
513 context: Rc::new(Context {
514 shared: Arc::new(Shared {
515 local_state: LocalState {
516 owner,
517 owned: LocalOwnedTasks::new(),
518 local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
519 },
520 queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
521 waker: AtomicWaker::new(),
522 #[cfg(tokio_unstable)]
523 unhandled_panic: crate::runtime::UnhandledPanic::Ignore,
524 }),
525 unhandled_panic: Cell::new(false),
526 }),
527 _not_send: PhantomData,
528 }
529 }
530
531 /// Enters the context of this `LocalSet`.
532 ///
533 /// The [`spawn_local`] method will spawn tasks on the `LocalSet` whose
534 /// context you are inside.
535 ///
536 /// [`spawn_local`]: fn@crate::task::spawn_local
537 pub fn enter(&self) -> LocalEnterGuard {
538 CURRENT.with(
539 |LocalData {
540 ctx,
541 wake_on_schedule,
542 ..
543 }| {
544 let ctx = ctx.replace(Some(self.context.clone()));
545 let wake_on_schedule = wake_on_schedule.replace(true);
546 LocalEnterGuard {
547 ctx,
548 wake_on_schedule,
549 }
550 },
551 )
552 }
553
554 /// Spawns a `!Send` task onto the local task set.
555 ///
556 /// This task is guaranteed to be run on the current thread.
557 ///
558 /// Unlike the free function [`spawn_local`], this method may be used to
559 /// spawn local tasks when the `LocalSet` is _not_ running. The provided
560 /// future will start running once the `LocalSet` is next started, even if
561 /// you don't await the returned `JoinHandle`.
562 ///
563 /// # Examples
564 ///
565 /// ```rust
566 /// use tokio::task;
567 ///
568 /// # #[tokio::main(flavor = "current_thread")]
569 /// # async fn main() {
570 /// let local = task::LocalSet::new();
571 ///
572 /// // Spawn a future on the local set. This future will be run when
573 /// // we call `run_until` to drive the task set.
574 /// local.spawn_local(async {
575 /// // ...
576 /// });
577 ///
578 /// // Run the local task set.
579 /// local.run_until(async move {
580 /// // ...
581 /// }).await;
582 ///
    /// // When `run_until` finishes, we can spawn _more_ futures, which will
584 /// // run in subsequent calls to `run_until`.
585 /// local.spawn_local(async {
586 /// // ...
587 /// });
588 ///
589 /// local.run_until(async move {
590 /// // ...
591 /// }).await;
592 /// # }
593 /// ```
594 /// [`spawn_local`]: fn@spawn_local
595 #[track_caller]
596 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
597 where
598 F: Future + 'static,
599 F::Output: 'static,
600 {
601 let fut_size = mem::size_of::<F>();
602 if fut_size > BOX_FUTURE_THRESHOLD {
603 self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
604 } else {
605 self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
606 }
607 }
608
609 /// Runs a future to completion on the provided runtime, driving any local
610 /// futures spawned on this task set on the current thread.
611 ///
612 /// This runs the given future on the runtime, blocking until it is
613 /// complete, and yielding its resolved result. Any tasks or timers which
614 /// the future spawns internally will be executed on the runtime. The future
615 /// may also call [`spawn_local`] to `spawn_local` additional local futures on the
616 /// current thread.
617 ///
618 /// This method should not be called from an asynchronous context.
619 ///
620 /// # Panics
621 ///
622 /// This function panics if the executor is at capacity, if the provided
623 /// future panics, or if called within an asynchronous execution context.
624 ///
625 /// # Notes
626 ///
627 /// Since this function internally calls [`Runtime::block_on`], and drives
628 /// futures in the local task set inside that call to `block_on`, the local
629 /// futures may not use [in-place blocking]. If a blocking call needs to be
630 /// issued from a local task, the [`spawn_blocking`] API may be used instead.
631 ///
632 /// For example, this will panic:
633 /// ```should_panic,ignore-wasm
634 /// use tokio::runtime::Runtime;
635 /// use tokio::task;
636 ///
637 /// let rt = Runtime::new().unwrap();
638 /// let local = task::LocalSet::new();
639 /// local.block_on(&rt, async {
640 /// let join = task::spawn_local(async {
641 /// let blocking_result = task::block_in_place(|| {
642 /// // ...
643 /// });
644 /// // ...
645 /// });
646 /// join.await.unwrap();
647 /// })
648 /// ```
649 /// This, however, will not panic:
650 /// ```
651 /// # #[cfg(not(target_family = "wasm"))]
652 /// # {
653 /// use tokio::runtime::Runtime;
654 /// use tokio::task;
655 ///
656 /// let rt = Runtime::new().unwrap();
657 /// let local = task::LocalSet::new();
658 /// local.block_on(&rt, async {
659 /// let join = task::spawn_local(async {
660 /// let blocking_result = task::spawn_blocking(|| {
661 /// // ...
662 /// }).await;
663 /// // ...
664 /// });
665 /// join.await.unwrap();
666 /// })
667 /// # }
668 /// ```
669 ///
670 /// [`spawn_local`]: fn@spawn_local
671 /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
672 /// [in-place blocking]: fn@crate::task::block_in_place
673 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
674 #[track_caller]
675 #[cfg(feature = "rt")]
676 #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
677 pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
678 where
679 F: Future,
680 {
681 rt.block_on(self.run_until(future))
682 }
683
684 /// Runs a future to completion on the local set, returning its output.
685 ///
686 /// This returns a future that runs the given future with a local set,
687 /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
688 /// Any local futures spawned on the local set will be driven in the
689 /// background until the future passed to `run_until` completes. When the future
690 /// passed to `run_until` finishes, any local futures which have not completed
691 /// will remain on the local set, and will be driven on subsequent calls to
692 /// `run_until` or when [awaiting the local set] itself.
693 ///
694 /// # Cancel safety
695 ///
696 /// This method is cancel safe when `future` is cancel safe.
697 ///
698 /// # Examples
699 ///
700 /// ```rust
701 /// use tokio::task;
702 ///
703 /// # #[tokio::main(flavor = "current_thread")]
704 /// # async fn main() {
705 /// task::LocalSet::new().run_until(async {
706 /// task::spawn_local(async move {
707 /// // ...
708 /// }).await.unwrap();
709 /// // ...
710 /// }).await;
711 /// # }
712 /// ```
713 ///
714 /// [`spawn_local`]: fn@spawn_local
715 /// [awaiting the local set]: #awaiting-a-localset
716 pub async fn run_until<F>(&self, future: F) -> F::Output
717 where
718 F: Future,
719 {
720 let run_until = RunUntil {
721 future,
722 local_set: self,
723 };
724 run_until.await
725 }
726
    /// Spawns `future` on this set with the given spawn metadata.
    ///
    /// Thin entry point, visible within `crate::task`, that forwards to
    /// `spawn_named_inner`.
    #[track_caller]
    pub(in crate::task) fn spawn_named<F>(
        &self,
        future: F,
        meta: SpawnMeta<'_>,
    ) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.spawn_named_inner(future, meta)
    }
739
740 #[track_caller]
741 fn spawn_named_inner<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
742 where
743 F: Future + 'static,
744 F::Output: 'static,
745 {
746 let handle = self.context.spawn(future, meta);
747
748 // Because a task was spawned from *outside* the `LocalSet`, wake the
749 // `LocalSet` future to execute the new task, if it hasn't been woken.
750 //
751 // Spawning via the free fn `spawn` does not require this, as it can
752 // only be called from *within* a future executing on the `LocalSet` —
753 // in that case, the `LocalSet` must already be awake.
754 self.context.shared.waker.wake();
755 handle
756 }
757
758 /// Ticks the scheduler, returning whether the local future needs to be
759 /// notified again.
760 fn tick(&self) -> bool {
761 for _ in 0..MAX_TASKS_PER_TICK {
762 // Make sure we didn't hit an unhandled panic
763 assert!(!self.context.unhandled_panic.get(), "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic");
764
765 match self.next_task() {
766 // Run the task
767 //
768 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
769 // used. We are responsible for maintaining the invariant that
770 // `run_unchecked` is only called on threads that spawned the
771 // task initially. Because `LocalSet` itself is `!Send`, and
772 // `spawn_local` spawns into the `LocalSet` on the current
773 // thread, the invariant is maintained.
774 Some(task) => crate::task::coop::budget(|| task.run()),
775 // We have fully drained the queue of notified tasks, so the
776 // local future doesn't need to be notified again — it can wait
777 // until something else wakes a task in the local set.
778 None => return false,
779 }
780 }
781
782 true
783 }
784
    /// Picks the next notified task to run, if any, and advances the tick
    /// counter.
    ///
    /// Normally the local queue is tried first and the remote queue second.
    /// Whenever the tick is a multiple of `REMOTE_FIRST_INTERVAL` the order is
    /// reversed, so the remote queue is not starved by a busy local queue.
    fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
        let tick = self.tick.get();
        // The counter is a `u8`; wrap around instead of overflowing.
        self.tick.set(tick.wrapping_add(1));

        let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
            // Remote queue first. The queue is `None` once the set has been
            // shut down, in which case only the local queue is consulted.
            self.context
                .shared
                .queue
                .lock()
                .as_mut()
                .and_then(|queue| queue.pop_front())
                .or_else(|| self.pop_local())
        } else {
            // Local queue first, remote queue as the fallback.
            self.pop_local().or_else(|| {
                self.context
                    .shared
                    .queue
                    .lock()
                    .as_mut()
                    .and_then(VecDeque::pop_front)
            })
        };

        task.map(|task| unsafe {
            // Safety: because the `LocalSet` itself is `!Send`, we know we are
            // on the same thread if we have access to the `LocalSet`, and can
            // therefore access the local run queue.
            self.context.shared.local_state.assert_owner(task)
        })
    }
815
816 fn pop_local(&self) -> Option<task::Notified<Arc<Shared>>> {
817 unsafe {
818 // Safety: because the `LocalSet` itself is `!Send`, we know we are
819 // on the same thread if we have access to the `LocalSet`, and can
820 // therefore access the local run queue.
821 self.context.shared.local_state.task_pop_front()
822 }
823 }
824
825 fn with<T>(&self, f: impl FnOnce() -> T) -> T {
826 CURRENT.with(|local_data| {
827 let _guard = local_data.enter(self.context.clone());
828 f()
829 })
830 }
831
832 /// This method is like `with`, but it just calls `f` without setting the thread-local if that
833 /// fails.
834 fn with_if_possible<T>(&self, f: impl FnOnce() -> T) -> T {
835 let mut f = Some(f);
836
837 let res = CURRENT.try_with(|local_data| {
838 let _guard = local_data.enter(self.context.clone());
839 (f.take().unwrap())()
840 });
841
842 match res {
843 Ok(res) => res,
844 Err(_access_error) => (f.take().unwrap())(),
845 }
846 }
847}
848
849cfg_unstable! {
850 impl LocalSet {
851 /// Configure how the `LocalSet` responds to an unhandled panic on a
852 /// spawned task.
853 ///
854 /// By default, an unhandled panic (i.e. a panic not caught by
855 /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s
        /// execution. The panic's error value is forwarded to the task's
857 /// [`JoinHandle`] and all other spawned tasks continue running.
858 ///
859 /// The `unhandled_panic` option enables configuring this behavior.
860 ///
861 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
862 /// spawned tasks have no impact on the `LocalSet`'s execution.
863 /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to
864 /// shutdown immediately when a spawned task panics even if that
865 /// task's `JoinHandle` has not been dropped. All other spawned tasks
866 /// will immediately terminate and further calls to
867 /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic.
868 ///
869 /// # Panics
870 ///
871 /// This method panics if called after the `LocalSet` has started
872 /// running.
873 ///
874 /// # Unstable
875 ///
876 /// This option is currently unstable and its implementation is
877 /// incomplete. The API may change or be removed in the future. See
878 /// tokio-rs/tokio#4516 for more details.
879 ///
880 /// # Examples
881 ///
882 /// The following demonstrates a `LocalSet` configured to shutdown on
883 /// panic. The first spawned task panics and results in the `LocalSet`
884 /// shutting down. The second spawned task never has a chance to
885 /// execute. The call to `run_until` will panic due to the runtime being
886 /// forcibly shutdown.
887 ///
888 /// ```should_panic
889 /// use tokio::runtime::UnhandledPanic;
890 ///
891 /// # #[tokio::main(flavor = "current_thread")]
892 /// # async fn main() {
893 /// tokio::task::LocalSet::new()
894 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
895 /// .run_until(async {
896 /// tokio::task::spawn_local(async { panic!("boom"); });
897 /// tokio::task::spawn_local(async {
898 /// // This task never completes
899 /// });
900 ///
901 /// // Do some work, but `run_until` will panic before it completes
902 /// # loop { tokio::task::yield_now().await; }
903 /// })
904 /// .await;
905 /// # }
906 /// ```
907 ///
908 /// [`JoinHandle`]: struct@crate::task::JoinHandle
909 pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self {
910 // TODO: This should be set as a builder
911 Rc::get_mut(&mut self.context)
912 .and_then(|ctx| Arc::get_mut(&mut ctx.shared))
913 .expect("Unhandled Panic behavior modified after starting LocalSet")
914 .unhandled_panic = behavior;
915 self
916 }
917
918 /// Returns the [`Id`] of the current `LocalSet` runtime.
919 ///
920 /// # Examples
921 ///
922 /// ```rust
923 /// use tokio::task;
924 ///
925 /// # #[tokio::main(flavor = "current_thread")]
926 /// # async fn main() {
927 /// let local_set = task::LocalSet::new();
928 /// println!("Local set id: {}", local_set.id());
929 /// # }
930 /// ```
931 ///
932 /// **Note**: This is an [unstable API][unstable]. The public API of this type
933 /// may break in 1.x releases. See [the documentation on unstable
934 /// features][unstable] for details.
935 ///
936 /// [unstable]: crate#unstable-features
937 /// [`Id`]: struct@crate::runtime::Id
938 pub fn id(&self) -> runtime::Id {
939 self.context.shared.local_state.owned.id.into()
940 }
941 }
942}
943
944impl fmt::Debug for LocalSet {
945 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
946 fmt.debug_struct("LocalSet").finish()
947 }
948}
949
950impl Future for LocalSet {
951 type Output = ();
952
953 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
954 let _no_blocking = crate::runtime::context::disallow_block_in_place();
955
956 // Register the waker before starting to work
957 self.context.shared.waker.register_by_ref(cx.waker());
958
959 if self.with(|| self.tick()) {
960 // If `tick` returns true, we need to notify the local future again:
961 // there are still tasks remaining in the run queue.
962 cx.waker().wake_by_ref();
963 Poll::Pending
964
965 // Safety: called from the thread that owns `LocalSet`. Because
966 // `LocalSet` is `!Send`, this is safe.
967 } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
968 // If the scheduler has no remaining futures, we're done!
969 Poll::Ready(())
970 } else {
971 // There are still futures in the local set, but we've polled all the
972 // futures in the run queue. Therefore, we can just return Pending
973 // since the remaining futures will be woken from somewhere else.
974 Poll::Pending
975 }
976 }
977}
978
979impl Default for LocalSet {
980 fn default() -> LocalSet {
981 LocalSet::new()
982 }
983}
984
impl Drop for LocalSet {
    /// Shuts down every task in the set and empties both run queues.
    ///
    /// The work is done with the set's context installed in the thread-local
    /// when that is still possible, and without it otherwise.
    fn drop(&mut self) {
        self.with_if_possible(|| {
            let _no_blocking = crate::runtime::context::disallow_block_in_place();

            // Shut down all tasks in the LocalOwnedTasks and close it to
            // prevent new tasks from ever being added.
            unsafe {
                // Safety: called from the thread that owns `LocalSet`
                self.context.shared.local_state.close_and_shutdown_all();
            }

            // Every task has already been shut down above, so the queued
            // notifications below only need to be dropped.

            // Safety: note that this *intentionally* bypasses the unsafe
            // `Shared::local_queue()` method. This is in order to avoid the
            // debug assertion that we are on the thread that owns the
            // `LocalSet`, because on some systems (e.g. at least some macOS
            // versions), attempting to get the current thread ID can panic due
            // to the thread's local data that stores the thread ID being
            // dropped *before* the `LocalSet`.
            //
            // Despite avoiding the assertion here, it is safe for us to access
            // the local queue in `Drop`, because the `LocalSet` itself is
            // `!Send`, so we can reasonably guarantee that it will not be
            // `Drop`ped from another thread.
            let local_queue = unsafe {
                // Safety: called from the thread that owns `LocalSet`
                self.context.shared.local_state.take_local_queue()
            };
            for task in local_queue {
                drop(task);
            }

            // Take the queue from the Shared object to prevent pushing
            // notifications to it in the future. The slot is left as `None`.
            let queue = self.context.shared.queue.lock().take().unwrap();
            for task in queue {
                drop(task);
            }

            // Safety: called from the thread that owns `LocalSet`
            assert!(unsafe { self.context.shared.local_state.owned_is_empty() });
        });
    }
}
1032
1033// === impl Context ===
1034
1035impl Context {
1036 #[track_caller]
1037 fn spawn<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
1038 where
1039 F: Future + 'static,
1040 F::Output: 'static,
1041 {
1042 let id = crate::runtime::task::Id::next();
1043 let future = crate::util::trace::task(future, "local", meta, id.as_u64());
1044
1045 // Safety: called from the thread that owns the `LocalSet`
1046 let (handle, notified) = {
1047 self.shared.local_state.assert_called_from_owner_thread();
1048 self.shared.local_state.owned.bind(
1049 future,
1050 self.shared.clone(),
1051 id,
1052 SpawnLocation::capture(),
1053 )
1054 };
1055
1056 if let Some(notified) = notified {
1057 self.shared.schedule(notified);
1058 }
1059
1060 handle
1061 }
1062}
1063
// === impl RunUntil ===
1065
1066impl<T: Future> Future for RunUntil<'_, T> {
1067 type Output = T::Output;
1068
1069 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1070 let me = self.project();
1071
1072 me.local_set.with(|| {
1073 me.local_set
1074 .context
1075 .shared
1076 .waker
1077 .register_by_ref(cx.waker());
1078
1079 let _no_blocking = crate::runtime::context::disallow_block_in_place();
1080 let f = me.future;
1081
1082 if let Poll::Ready(output) = f.poll(cx) {
1083 return Poll::Ready(output);
1084 }
1085
1086 if me.local_set.tick() {
1087 // If `tick` returns `true`, we need to notify the local future again:
1088 // there are still tasks remaining in the run queue.
1089 cx.waker().wake_by_ref();
1090 }
1091
1092 Poll::Pending
1093 })
1094 }
1095}
1096
1097impl Shared {
1098 /// Schedule the provided task on the scheduler.
1099 fn schedule(&self, task: task::Notified<Arc<Self>>) {
1100 CURRENT.with(|localdata| {
1101 match localdata.ctx.get() {
1102 // If the current `LocalSet` is being polled, we don't need to wake it.
1103 // When we `enter` it, then the value `wake_on_schedule` is set to be true.
1104 // In this case it is not being polled, so we need to wake it.
1105 Some(cx) if cx.shared.ptr_eq(self) && !localdata.wake_on_schedule.get() => unsafe {
1106 // Safety: if the current `LocalSet` context points to this
1107 // `LocalSet`, then we are on the thread that owns it.
1108 cx.shared.local_state.task_push_back(task);
1109 },
1110
1111 // We are on the thread that owns the `LocalSet`, so we can
1112 // wake to the local queue.
1113 _ if context::thread_id().ok() == Some(self.local_state.owner) => {
1114 unsafe {
1115 // Safety: we just checked that the thread ID matches
1116 // the localset's owner, so this is safe.
1117 self.local_state.task_push_back(task);
1118 }
1119 // We still have to wake the `LocalSet`, because it isn't
1120 // currently being polled.
1121 self.waker.wake();
1122 }
1123
1124 // We are *not* on the thread that owns the `LocalSet`, so we
1125 // have to wake to the remote queue.
1126 _ => {
1127 // First, check whether the queue is still there (if not, the
1128 // LocalSet is dropped). Then push to it if so, and if not,
1129 // do nothing.
1130 let mut lock = self.queue.lock();
1131
1132 if let Some(queue) = lock.as_mut() {
1133 queue.push_back(task);
1134 drop(lock);
1135 self.waker.wake();
1136 }
1137 }
1138 }
1139 });
1140 }
1141
1142 fn ptr_eq(&self, other: &Shared) -> bool {
1143 std::ptr::eq(self, other)
1144 }
1145}
1146
// SAFETY: sharing `Shared` across threads is sound because (and only
// because) the local run queue is never touched from any thread other than
// the one that owns the `LocalSet`. The compiler cannot verify this; every
// piece of code that accesses the local run queue has to uphold it.
unsafe impl Sync for Shared {}
1150
1151impl task::Schedule for Arc<Shared> {
1152 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
1153 // Safety, this is always called from the thread that owns `LocalSet`
1154 unsafe { self.local_state.task_remove(task) }
1155 }
1156
1157 fn schedule(&self, task: task::Notified<Self>) {
1158 Shared::schedule(self, task);
1159 }
1160
1161 // localset does not currently support task hooks
1162 fn hooks(&self) -> TaskHarnessScheduleHooks {
1163 TaskHarnessScheduleHooks {
1164 task_terminate_callback: None,
1165 }
1166 }
1167
1168 cfg_unstable! {
1169 fn unhandled_panic(&self) {
1170 use crate::runtime::UnhandledPanic;
1171
1172 match self.unhandled_panic {
1173 UnhandledPanic::Ignore => {
1174 // Do nothing
1175 }
1176 UnhandledPanic::ShutdownRuntime => {
1177 // This hook is only called from within the runtime, so
1178 // `CURRENT` should match with `&self`, i.e. there is no
1179 // opportunity for a nested scheduler to be called.
1180 CURRENT.with(|LocalData { ctx, .. }| match ctx.get() {
1181 Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
1182 cx.unhandled_panic.set(true);
1183 // Safety: this is always called from the thread that owns `LocalSet`
1184 unsafe { cx.shared.local_state.close_and_shutdown_all(); }
1185 }
1186 _ => unreachable!("runtime core not set in CURRENT thread-local"),
1187 })
1188 }
1189 }
1190 }
1191 }
1192}
1193
1194impl LocalState {
1195 unsafe fn task_pop_front(&self) -> Option<task::Notified<Arc<Shared>>> {
1196 // The caller ensures it is called from the same thread that owns
1197 // the LocalSet.
1198 self.assert_called_from_owner_thread();
1199
1200 self.local_queue.with_mut(|ptr| (*ptr).pop_front())
1201 }
1202
1203 unsafe fn task_push_back(&self, task: task::Notified<Arc<Shared>>) {
1204 // The caller ensures it is called from the same thread that owns
1205 // the LocalSet.
1206 self.assert_called_from_owner_thread();
1207
1208 self.local_queue.with_mut(|ptr| (*ptr).push_back(task));
1209 }
1210
1211 unsafe fn take_local_queue(&self) -> VecDeque<task::Notified<Arc<Shared>>> {
1212 // The caller ensures it is called from the same thread that owns
1213 // the LocalSet.
1214 self.assert_called_from_owner_thread();
1215
1216 self.local_queue.with_mut(|ptr| std::mem::take(&mut (*ptr)))
1217 }
1218
1219 unsafe fn task_remove(&self, task: &Task<Arc<Shared>>) -> Option<Task<Arc<Shared>>> {
1220 // The caller ensures it is called from the same thread that owns
1221 // the LocalSet.
1222 self.assert_called_from_owner_thread();
1223
1224 self.owned.remove(task)
1225 }
1226
1227 /// Returns true if the `LocalSet` does not have any spawned tasks
1228 unsafe fn owned_is_empty(&self) -> bool {
1229 // The caller ensures it is called from the same thread that owns
1230 // the LocalSet.
1231 self.assert_called_from_owner_thread();
1232
1233 self.owned.is_empty()
1234 }
1235
1236 unsafe fn assert_owner(
1237 &self,
1238 task: task::Notified<Arc<Shared>>,
1239 ) -> task::LocalNotified<Arc<Shared>> {
1240 // The caller ensures it is called from the same thread that owns
1241 // the LocalSet.
1242 self.assert_called_from_owner_thread();
1243
1244 self.owned.assert_owner(task)
1245 }
1246
1247 unsafe fn close_and_shutdown_all(&self) {
1248 // The caller ensures it is called from the same thread that owns
1249 // the LocalSet.
1250 self.assert_called_from_owner_thread();
1251
1252 self.owned.close_and_shutdown_all();
1253 }
1254
1255 #[track_caller]
1256 fn assert_called_from_owner_thread(&self) {
1257 // FreeBSD has some weirdness around thread-local destruction.
1258 // TODO: remove this hack when thread id is cleaned up
1259 #[cfg(not(any(target_os = "openbsd", target_os = "freebsd")))]
1260 debug_assert!(
1261 // if we couldn't get the thread ID because we're dropping the local
1262 // data, skip the assertion --- the `Drop` impl is not going to be
1263 // called from another thread, because `LocalSet` is `!Send`
1264 context::thread_id()
1265 .map(|id| id == self.owner)
1266 .unwrap_or(true),
1267 "`LocalSet`'s local run queue must not be accessed by another thread!"
1268 );
1269 }
1270}
1271
// SAFETY: `LocalState` is marked `Send` because it is stored in `Shared`.
// The type itself does not enforce thread affinity: it is up to the caller
// of its `unsafe` methods to ensure they are on the same thread that owns
// the `LocalSet`.
unsafe impl Send for LocalState {}
1275
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    /// Builds the current-thread runtime used by the tests in this module.
    fn current_thread_rt() -> crate::runtime::Runtime {
        crate::runtime::Builder::new_current_thread()
            .build()
            .expect("rt")
    }

    // Does a `LocalSet` running on a current-thread runtime...basically work?
    //
    // This duplicates a test in `tests/task_local_set.rs`, but because this is
    // a lib test, it will run under Miri, so this is necessary to catch stacked
    // borrows violations in the `LocalSet` implementation.
    #[test]
    fn local_current_thread_scheduler() {
        let rt = current_thread_rt();
        rt.block_on(async {
            let local = LocalSet::new();
            local
                .run_until(async {
                    spawn_local(async {}).await.unwrap();
                })
                .await;
        })
    }

    // Tests that when a task on a `LocalSet` is woken by an io driver on the
    // same thread, the task is woken to the localset's local queue rather than
    // its remote queue.
    //
    // This test has to be defined in the `local.rs` file as a lib test, rather
    // than in `tests/`, because it makes assertions about the local set's
    // internal state.
    #[test]
    fn wakes_to_local_queue() {
        use crate::sync::Notify;

        let rt = current_thread_rt();
        rt.block_on(async {
            let local = LocalSet::new();
            let notify = Arc::new(Notify::new());

            // A local task that parks until it is notified.
            let waiter = {
                let notify = notify.clone();
                local.spawn_local(async move {
                    notify.notified().await;
                })
            };

            let mut run_until = Box::pin(local.run_until(async move {
                waiter.await.unwrap();
            }));

            // Poll the `run_until` future exactly once, ignoring its result.
            std::future::poll_fn(|cx| {
                let _ = run_until.as_mut().poll(cx);
                Poll::Ready(())
            })
            .await;

            notify.notify_one();

            // SAFETY: `local` was created inside this same `block_on` future
            // and is only ever used from it.
            let woken = unsafe { local.context.shared.local_state.task_pop_front() };
            // TODO(eliza): it would be nice to be able to assert that this is
            // the local task.
            assert!(
                woken.is_some(),
                "task should have been notified to the LocalSet's local queue"
            );
        })
    }
}