
/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
let span = tracing::trace_span!("async_io::main_loop");
let _enter = span.enter();
// The last observed reactor tick.
let mut last_tick = 0;
// Number of sleeps since this thread has called `react()`.
let mut sleeps = 0u64;
loop {
let tick = Reactor::get().ticker();
if last_tick == tick {
let reactor_lock = if sleeps >= 10 {
// If no new ticks have occurred for a while, stop sleeping and spinning in
// this loop and just block on the reactor lock.
Some(Reactor::get().lock())
} else {
Reactor::get().try_lock()
};
if let Some(mut reactor_lock) = reactor_lock {
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();
last_tick = Reactor::get().ticker();
sleeps = 0;
}
} else {
last_tick = tick;
}
if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
// Exponential backoff from 50us to 10ms.
let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
.get(sleeps as usize)
.unwrap_or(&10_000);
tracing::trace!("sleeping for {} us", delay_us);
if parker.park_timeout(Duration::from_micros(*delay_us)) {
tracing::trace!("notified");
// If notified before timeout, reset the last tick and the sleep counter.
last_tick = Reactor::get().ticker();
sleeps = 0;
} else {
sleeps += 1;
}
}
}
}
核心的block_on函数