Fix research tool UI: remove duplicate header, add footer spacing, remove spinner, widen command display

- Remove duplicate tool header (lib.rs already prints it)
- Add newline before timing footer for visual separation
- Remove spinner animation (incompatible with update_tool_output_line)
- Change shell command format to " > `cmd` ..." with 60 char width
This commit is contained in:
Dhanji R. Prasanna
2026-01-10 15:20:40 +11:00
parent 0aa1287ca6
commit 68c9135913
22 changed files with 20843 additions and 39 deletions

View File

@@ -0,0 +1,590 @@
<html lang="en-us" dir="ltr"><head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="
Async Rust with Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics
#
Last updated: Oct 2025
Contents
1 Async Rust with Tokio IO Streams: Backpressure, Concurrency, and Ergonomics
Backpressure
Cancellation
2. I/O loop
Backpressure propagation
Concurrency
3. Tokio I/O Patterns
TCP split stream
Split generic AsyncRead+AsyncWrite stream
Bidirectional driver for I/O without split
Framed I/O
Bidirectional driver for framed I/O
There are many excellent, straightforward guides for getting started with Async Rust and Tokio. Most focus on the core building blocks: Tokio primitives, Rust futures, and concepts such as Pin/Unpin, and often finish with a simple TCP client-server sample. In fact, the typical tutorial example can usually be reduced to something as simple as:">
<meta name="theme-color" media="(prefers-color-scheme: light)" content="#ffffff">
<meta name="theme-color" media="(prefers-color-scheme: dark)" content="#343a40">
<meta name="color-scheme" content="light dark"><meta property="og:url" content="https://biriukov.dev/docs/async-rust-tokio-io/1-async-rust-with-tokio-io-streams-backpressure-concurrency-and-ergonomics/">
<meta property="og:site_name" content="Viacheslav Biriukov">
<meta property="og:title" content="Async Rust with Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics">
<meta property="og:description" content="Async Rust with Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics # Last updated: Oct 2025 Contents
1 Async Rust with Tokio IO Streams: Backpressure, Concurrency, and Ergonomics Backpressure Cancellation 2. I/O loop Backpressure propagation Concurrency 3. Tokio I/O Patterns TCP split stream Split generic AsyncRead+AsyncWrite stream Bidirectional driver for I/O without split Framed I/O Bidirectional driver for framed I/O There are many excellent, straightforward guides for getting started with Async Rust and Tokio. Most focus on the core building blocks: Tokio primitives, Rust futures, and concepts such as Pin/Unpin, and often finish with a simple TCP client-server sample. In fact, the typical tutorial example can usually be reduced to something as simple as:">
<meta property="og:locale" content="en_us">
<meta property="og:type" content="article">
<meta property="article:section" content="docs">
<title>Async Rust with Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics | Viacheslav Biriukov</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png">
<link rel="stylesheet" href="/book.min.1e13c2d8521416f2409b2e0e47b515c4ee90f2718cddd31ea96d2f739bc9d7e1.css" integrity="sha256-HhPC2FIUFvJAmy4OR7UVxO6Q8nGM3dMeqW0vc5vJ1+E=" crossorigin="anonymous">
<script defer="" src="/flexsearch.min.js"></script>
<script defer="" src="/en.search.min.dcb69b7f72cf406432b553b1dca142c9cc7ac0f7ea624f296af7cfa987b92fe5.js" integrity="sha256-3Labf3LPQGQytVOx3KFCycx6wPfqYk8pavfPqYe5L+U=" crossorigin="anonymous"></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<script async="" src="https://www.googletagmanager.com/gtag/js?id=G-599VSLESJL"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag("js", new Date());
gtag("config", "G-599VSLESJL");
</script>
<link rel="stylesheet" href="/my_css/cookie.css">
<link rel="stylesheet" href="/my_css/copy-code.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.5.0/font/bootstrap-icons.css">
</head>
<body dir="ltr">
<input type="checkbox" class="hidden toggle" id="menu-control">
<input type="checkbox" class="hidden toggle" id="toc-control">
<main class="container flex">
<aside class="book-menu">
<div class="book-menu-content">
<nav>
<h2 class="book-brand">
<a class="flex align-center" href="/"><span>Viacheslav Biriukov</span>
</a>
</h2>
<div class="book-search">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
<ul>
<li class="book-section-flat">
<span>Async Rust &amp; Tokio I/O Streams</span>
<ul>
<li>
<a href="/docs/async-rust-tokio-io/1-async-rust-with-tokio-io-streams-backpressure-concurrency-and-ergonomics/" class="active">
1. Backpressure and Concurrency
</a>
</li>
<li>
<a href="/docs/async-rust-tokio-io/2-io-loop/" class="">
2. I/O loop
</a>
</li>
<li>
<a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/" class="">
3. Tokio I/O patterns
</a>
</li>
</ul>
</li>
</ul>
<div style="margin-top: 30px; margin-bottom: 30px">
<b>More recent series:</b>
<ul>
<li><a href="/rust-tokio-io/">1. Async Rust &amp; Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics&nbsp;<span style="padding:0 2px;border-radius:2px ;background-color:#e84118;color: aliceblue;">new</span></a></li><a href="/rust-tokio-io/">
</a><li><a href="/rust-tokio-io/"></a><a href="/docs/resolver-dual-stack-application/0-sre-should-know-about-gnu-linux-resolvers-and-dual-stack-applications/">2. Resolvers and Dual-Stack applications</a></li><a href="/docs/resolver-dual-stack-application/0-sre-should-know-about-gnu-linux-resolvers-and-dual-stack-applications/">
</a><li><a href="/docs/resolver-dual-stack-application/0-sre-should-know-about-gnu-linux-resolvers-and-dual-stack-applications/"></a><a href="/docs/page-cache/0-linux-page-cache-for-sre/">3. Linux Page Cache mini book</a></li><a href="/docs/page-cache/0-linux-page-cache-for-sre/">
</a><li><a href="/docs/page-cache/0-linux-page-cache-for-sre/"></a><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">4. File descriptors, pipes, terminals, user sessions, process groups and daemons</a></li><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">
</a></ul><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">
</a></div><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">
</a><div style="margin-top: 30px; margin-bottom: 30px"><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">
<b>Open Source Projects</b>
</a><ul><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">
</a><li><a href="/docs/fd-pipe-session-terminal/0-sre-should-know-about-gnu-linux-shell-related-internals-file-descriptors-pipes-terminals-user-sessions-process-groups-and-daemons">
</a><a href="/posts/trixter-chaos-proxy/"> • trixter chaos proxy</a>
</li>
<li><a href="https://crates.io/crates/tokio-netem"> • tokio-netem</a></li>
</ul>
</div>
<ul>
<li>
<a href="https://twitter.com/brk0v/" target="_blank" rel="noopener"><i class="bi bi-twitter"></i>
Twitter
</a>
</li>
<li>
<a href="https://www.linkedin.com/in/biriukov/" target="_blank" rel="noopener"><i class="bi bi-linkedin"></i>
Linkedin
</a>
</li>
<li>
<a href="https://github.com/brk0v/" target="_blank" rel="noopener"><i class="bi bi-github"></i>
Github
</a>
</li>
</ul>
<div style="margin-top: 30px;">
<p xmlns:cc="http://creativecommons.org/ns#">
This content is licensed under
<a href="http://creativecommons.org/licenses/by-nc/4.0/?ref=chooser-v1" target="_blank" rel="license noopener noreferrer" style="display:inline-block; ">CC BY-NC 4.0<img style="height:22px!important;margin-left:3px;vertical-align:text-bottom;" src="https://mirrors.creativecommons.org/presskit/icons/cc.svg?ref=chooser-v1"><img style="height:22px!important;margin-left:3px;vertical-align:text-bottom;" src="https://mirrors.creativecommons.org/presskit/icons/by.svg?ref=chooser-v1"><img style="height:22px!important;margin-left:3px;vertical-align:text-bottom;" src="https://mirrors.creativecommons.org/presskit/icons/nc.svg?ref=chooser-v1">
</a>
</p>
</div>
</nav>
<script>(function(){var e=document.querySelector("aside .book-menu-content");addEventListener("beforeunload",function(){localStorage.setItem("menu.scrollTop",e.scrollTop)}),e.scrollTop=localStorage.getItem("menu.scrollTop")})()</script>
</div>
</aside>
<div class="book-page">
<header class="book-header">
<div class="flex align-center justify-between">
<label for="menu-control">
<img src="/svg/menu.svg" class="book-icon" alt="Menu">
</label>
<strong>Async Rust with Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics</strong>
<label for="toc-control">
</label>
</div>
</header>
<article class="markdown book-article"><h1 id="async-rust-with-tokio-io-streams-backpressure-concurrency-and-ergonomics">
Async Rust with Tokio I/O Streams: Backpressure, Concurrency, and Ergonomics
<a class="anchor" href="#async-rust-with-tokio-io-streams-backpressure-concurrency-and-ergonomics">#</a>
</h1>
<p class="updated-right">
<i>
<time datetime="2025-10">Last updated: Oct 2025</time>
</i>
</p>
<p><strong>Contents</strong></p>
<ul>
<li><a href="/rust-tokio-io/">1 Async Rust with Tokio IO Streams: Backpressure, Concurrency, and Ergonomics</a>
<ul>
<li><a href="/docs/async-rust-tokio-io/1-async-rust-with-tokio-io-streams-backpressure-concurrency-and-ergonomics/#backpressure">Backpressure</a></li>
<li><a href="/docs/async-rust-tokio-io/1-async-rust-with-tokio-io-streams-backpressure-concurrency-and-ergonomics/#cancellation">Cancellation</a></li>
</ul>
</li>
<li><a href="/docs/async-rust-tokio-io/2-io-loop/">2. I/O loop</a>
<ul>
<li><a href="/docs/async-rust-tokio-io/2-io-loop/#backpressure-propagation">Backpressure propagation</a></li>
<li><a href="/docs/async-rust-tokio-io/2-io-loop/#concurrency">Concurrency</a></li>
</ul>
</li>
<li><a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/">3. Tokio I/O Patterns</a>
<ul>
<li><a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/#tcp-split-stream">TCP split stream</a></li>
<li><a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/#split-generic-asyncreadasyncwrite-stream">Split generic <code>AsyncRead+AsyncWrite</code> stream</a></li>
<li><a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/#bidirectional-driver-for-io-without-split">Bidirectional driver for I/O without split</a></li>
<li><a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/#framed-io">Framed I/O</a></li>
<li><a href="/docs/async-rust-tokio-io/3-tokio-io-patterns/#35-bidirectional-driver-for-framed-io">Bidirectional driver for framed I/O</a></li>
</ul>
</li>
</ul>
<hr>
<p>There are many excellent, straightforward guides for getting started with Async Rust and Tokio. Most focus on the core building blocks: Tokio primitives, Rust futures, and concepts such as <code>Pin</code>/<code>Unpin</code>, and often finish with a simple TCP client-server sample. In fact, the typical tutorial example can usually be reduced to something as simple as:</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-rust" data-lang="rust"><span style="display:flex;"><span><span style="color:#66d9ef">loop</span> {
</span></span><span style="display:flex;"><span> tokio::<span style="color:#a6e22e">select!</span> {
</span></span><span style="display:flex;"><span> Some(msg) <span style="color:#f92672">=</span> rx.recv() <span style="color:#f92672">=&gt;</span> {
</span></span><span style="display:flex;"><span> stream.write_all(<span style="color:#f92672">&amp;</span>msg).<span style="color:#66d9ef">await</span><span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> stream.flush().<span style="color:#66d9ef">await</span><span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> res <span style="color:#f92672">=</span> stream.read(<span style="color:#f92672">&amp;</span><span style="color:#66d9ef">mut</span> read_buf) <span style="color:#f92672">=&gt;</span> {
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">let</span> n <span style="color:#f92672">=</span> res<span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">if</span> n <span style="color:#f92672">==</span> <span style="color:#ae81ff">0</span> {
</span></span><span style="display:flex;"><span> <span style="color:#75715e">// EOF - server closed connection
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#a6e22e">eprintln!</span>(<span style="color:#e6db74">"Server closed the connection."</span>);
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">break</span>; <span style="color:#75715e">//exit
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> }
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">print!</span>(<span style="color:#e6db74">"</span><span style="color:#e6db74">{}</span><span style="color:#e6db74">"</span>, String::from_utf8_lossy(<span style="color:#f92672">&amp;</span>read_buf[<span style="color:#f92672">..</span>n]));
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">else</span> <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>}
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>The code above is <strong>easy to read, straightforward, and idiomatic</strong>.</p>
<p>It uses a Tokio <a href="https://docs.rs/tokio/latest/tokio/sync/mpsc/index.html" target="_blank" rel="noopener"><code>mpsc</code></a> receiver <code>rx</code> whose incoming messages are written to the I/O stream (the remote peer in TCP terms).
On the other branch of the <code>select!</code>, it reads data into <code>read_buf</code> until reaching the end of file (EOF).
Im sure youve seen many examples that follow exactly this pattern.</p>
<p>But if we think not only about the functionality of the above code, but also about the mechanics and design behind it, some crucial details emerge.
With the loop-select pattern we have concurrent reads and writes.
This means the Tokio runtime scheduler multiplexes them and runs <strong>sequentially</strong> one after another, and never truly in parallel on two OS threads or CPU cores (assuming the default multithreaded runtime with <code>#[tokio::main]</code>).
All branches of the <code>select!</code> are awaited for readiness and then run one by one in a pseudo-random order, unless the <code>biased;</code> argument is set, in which case they run in the provided order.
In our example, we have two branches:</p>
<ul>
<li>read from <code>rx</code> channel;</li>
<li>read from the I/O stream.</li>
</ul>
<p>Async Rust implicitly generates a state machine to make the polling of futures easier. This allows <code>select!</code> to run its branches concurrently. But when we fall inside the first branch and explicitly await inside it, we ask Rust to move the tasks state machine into a state where the only possible wake signal for the task is a wake notification from the <code>write_all()</code> future (or <code>flush()</code> later). The code logic doesnt allow us to await on <code>write_all()</code>/<code>flush()</code> and <code>read()</code> (from the second branch of the <code>select!</code>) at the same time .</p>
<p>Code written in this way naturally introduces <strong>backpressure</strong>. This side effect deserves careful attention and a clear understanding.</p>
<h2 id="backpressure">
Backpressure
<a class="anchor" href="#backpressure">#</a>
</h2>
<img style="float:right;" alt="This is fine" src="../images/ok.png" width="40%" class="img-center">
<p>If a <code>write()</code> or <code>flush()</code> call becomes blocked for any reason, the entire loop-select is effectively blocked as well, meaning no reads will occur.
This can lead to <strong>read starvation</strong> and, if the remote peer continues writing, to an inflated receive socket buffer and eventual <strong>backpressure</strong> on the remote peer side.</p>
<p>Such backpressure can actually be useful when the code should avoid reading more data if it cant send a timely response.
In client-server communication, this makes sense: your server might not want to start processing work, or buffer it in memory, if the client isnt ready to read the response to its request.</p>
<p>But as usual, theres no one-size-fits-all solution, it isnt always the desired behavior. For example, if your code acts as a proxy or bridge, youll likely still want to continue draining the reader buffer, because, for example, the decision logic lives outside your module.</p>
<p>Another use case is when <strong>producing data doesnt fully depend on what you read</strong>.
This often applies on the client side: a client may want to receive the response to a previously sent request as quickly as possible while concurrently sending a new, unrelated one.</p>
<p>More examples are protocols where <strong>reads unblock writes</strong>, where reads are crucial and should always be handled. For instance, TCP itself is such a protocol. TCP flow control sends window size updates with ACKs to notify the remote peer to continue sending and/or increase the amount of in-flight data. HTTP/2 also has a <code>WINDOW_UPDATE</code> frame used for the same purpose the consumer controls the producers pace.</p>
<h3 id="experiment">
Experiment
<a class="anchor" href="#experiment">#</a>
</h3>
<p>Lets run a small experiment to reproduce TCP backpressure and explore a few related GNU/Linux tools.</p>
<p>We can write a TCP server that intentionally blocks its read calls. Also in order to trigger backpressure earlier, server sets its socket receive buffer to the minimum allowed value with <code>setsockopt</code> and <code>SO_RCVBUF</code> (the OS rounds it up to a small default value):</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-rust" data-lang="rust"><span style="display:flex;"><span><span style="color:#f92672">..</span>.
</span></span><span style="display:flex;"><span><span style="color:#66d9ef">let</span> s2 <span style="color:#f92672">=</span> Socket::from(std_stream);
</span></span><span style="display:flex;"><span>s2.set_recv_buffer_size(<span style="color:#ae81ff">1</span>)<span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span><span style="color:#f92672">..</span>.
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>If server doesnt read data from buffer, its receive buffer fills up and pushes back on the TCP sender.
This effectively simulates slow request processing, for example, when the next request hits a slow database while the previous response is still being flushed to the socket to deliver to the client.</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-rust" data-lang="rust"><span style="display:flex;"><span><span style="color:#66d9ef">async</span> <span style="color:#66d9ef">fn</span> <span style="color:#a6e22e">handle_client</span>(<span style="color:#66d9ef">mut</span> stream: <span style="color:#a6e22e">TcpStream</span>) -&gt; <span style="color:#a6e22e">std</span>::io::Result<span style="color:#f92672">&lt;</span>()<span style="color:#f92672">&gt;</span> {
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">println!</span>(<span style="color:#e6db74">"start serving"</span>);
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">let</span> <span style="color:#66d9ef">mut</span> i <span style="color:#f92672">=</span> <span style="color:#ae81ff">0</span>;
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">loop</span> {
</span></span><span style="display:flex;"><span> <span style="color:#75715e">// Don't read any data to emulate backpressure.
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#75715e">// let mut buf = [0u8; 1024];
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#75715e">// let n = stream.read(&amp;mut buf).await?;
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#75715e">// if n == 0 {
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#75715e">// break;
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#75715e">// }
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#75715e">// println!("Received {n} bytes");
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span>
</span></span><span style="display:flex;"><span> i <span style="color:#f92672">+=</span> <span style="color:#ae81ff">1</span>;
</span></span><span style="display:flex;"><span> sleep(Duration::from_millis(<span style="color:#ae81ff">10</span>)).<span style="color:#66d9ef">await</span>;
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> stream
</span></span><span style="display:flex;"><span> .write_all(<span style="color:#a6e22e">format!</span>(<span style="color:#e6db74">"message from server: </span><span style="color:#e6db74">{i}</span><span style="color:#ae81ff">\n</span><span style="color:#e6db74">"</span>).as_bytes())
</span></span><span style="display:flex;"><span> .<span style="color:#66d9ef">await</span><span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> stream.flush().<span style="color:#66d9ef">await</span><span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">println!</span>(<span style="color:#e6db74">"written: </span><span style="color:#e6db74">{i}</span><span style="color:#e6db74">"</span>);
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>}
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>The full code can be found on <a href="https://github.com/brk0v/blog-async-rust-tokio-io/tree/main/src" target="_blank" rel="noopener">github</a>.</p>
<p>And run it:</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-bash" data-lang="bash"><span style="display:flex;"><span>$ cargo run --bin 1_server_recvbuf_set
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>In the other console window run the client:</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-bash" data-lang="bash"><span style="display:flex;"><span>$ cargo run --bin 1_client_simple
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>As you can see from the output logs, the client stops reading from the server even while the server keeps writing. Packets pile up in the clients receive buffer, eventually filling it and forcing TCP to apply backpressure in the opposite direction.</p>
<p>In the <code>tcpdump</code> output you can see the server on port 8080 sending a zero-window (win 0) ACKs, notifying the client that its receive buffer is full.</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-bash" data-lang="bash"><span style="display:flex;"><span>$ sudo tcpdump -i any -n -s0 port <span style="color:#ae81ff">8080</span>
</span></span><span style="display:flex;"><span>...
</span></span><span style="display:flex;"><span>21:36:00.095027 lo In IP 127.0.0.1.8080 &gt; 127.0.0.1.58090: Flags <span style="color:#f92672">[</span>P.<span style="color:#f92672">]</span>, seq 5668:5693, ack 102401, win 0, options <span style="color:#f92672">[</span>nop,nop,TS val <span style="color:#ae81ff">2038288450</span> ecr 2038288438<span style="color:#f92672">]</span>, length 25: HTTP
</span></span><span style="display:flex;"><span>...
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>The <code>ss</code> can help us too to find the stalled send buffer:</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-bash" data-lang="bash"><span style="display:flex;"><span>$ ss -tan | grep 127.0.0.1:58090
</span></span><span style="display:flex;"><span>State Recv-Q Send-Q Local Address:Port Peer Address:Port
</span></span><span style="display:flex;"><span>ESTAB <span style="color:#ae81ff">102400</span> <span style="color:#ae81ff">0</span> 127.0.0.1:8080 127.0.0.1:58090
</span></span><span style="display:flex;"><span>ESTAB <span style="color:#ae81ff">869937</span> <span style="color:#ae81ff">2628271</span> 127.0.0.1:58090 127.0.0.1:8080
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>In simplified terms, the TCP connection behaves like a network of queues. Congestion control and flow control cooperate to size those queues, balance throughput, and protect each endpoint from overload.</p>
<img style="margin: 50px 0 20px 0" alt="TCP buffers" src="../images/tcp-buffers.png" width="100%" class="img-center">
<div class="text-center">
Figure 1. TCP buffers for simplex communication
</div>
<p>Usually write calls to a socket complete almost instantly (0.5-5 µs depending on size) because they land in memory (userspace buffers such as <code><a href="https://docs.rs/tokio/latest/tokio/io/struct.BufWriter.html" target="_blank" rel="noopener">BufWriter</a></code> or the kernels TCP send buffer) without waiting for remote acknowledgements.</p>
<p>This asynchronous nature of writes can hide subtle problems: <strong>background write failures, timeouts, and uncertainty of delivery often surface only under congestion</strong>. In our toy setup, <strong>small writes and generous buffers mask pending backpressure</strong>.</p>
<blockquote class="book-hint info">
<p><strong>Note:</strong></p>
<p>A useful per-socket/system-wide setting to improve responsiveness and backpressure behavior is <code><a href="https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt" target="_blank" rel="noopener">TCP_NOTSENT_LOWAT</a></code>. Cloudflare has a good <a href="https://blog.cloudflare.com/http-2-prioritization-with-nginx/" target="_blank" rel="noopener">write-up</a>.</p>
</blockquote>
<h2 id="cancellation">
Cancellation
<a class="anchor" href="#cancellation">#</a>
</h2>
<p>Another interesting part of the code is <strong>how to perform cancellation while backpressure is applied</strong>. We might need to stop processing for many reasons: restart, upstream/downstream abort, timeouts, or simply because the caller no longer needs the result.</p>
<p>In the code above, simply adding a cancellation branch to <code>select!</code> will not work if the task is blocked on <code>write_all()</code>:</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-rust" data-lang="rust"><span style="display:flex;"><span><span style="color:#66d9ef">let</span> cancel <span style="color:#f92672">=</span> CancellationToken::new();
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span><span style="color:#66d9ef">loop</span> {
</span></span><span style="display:flex;"><span> tokio::<span style="color:#a6e22e">select!</span> {
</span></span><span style="display:flex;"><span> Some(msg) <span style="color:#f92672">=</span> rx.recv() <span style="color:#f92672">=&gt;</span> {
</span></span><span style="display:flex;"><span> stream.write_all(<span style="color:#f92672">&amp;</span>msg).<span style="color:#66d9ef">await</span><span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> stream.flush().<span style="color:#66d9ef">await</span><span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">println!</span>(<span style="color:#e6db74">"client's written"</span>);
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> res <span style="color:#f92672">=</span> stream.read(<span style="color:#f92672">&amp;</span><span style="color:#66d9ef">mut</span> read_buf) <span style="color:#f92672">=&gt;</span> {
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">let</span> n <span style="color:#f92672">=</span> res<span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">if</span> n <span style="color:#f92672">==</span> <span style="color:#ae81ff">0</span> {
</span></span><span style="display:flex;"><span> <span style="color:#75715e">// EOF - server closed connection
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#a6e22e">eprintln!</span>(<span style="color:#e6db74">"Server closed the connection."</span>);
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">break</span>; <span style="color:#75715e">//exit
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> }
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">print!</span>(<span style="color:#e6db74">"</span><span style="color:#e6db74">{}</span><span style="color:#e6db74">"</span>, String::from_utf8_lossy(<span style="color:#f92672">&amp;</span>read_buf[<span style="color:#f92672">..</span>n]));
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> _ <span style="color:#f92672">=</span> cancel.cancelled() <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>, <span style="color:#75715e">// &lt;---------------- cancellation
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span>
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">else</span> <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>}
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>At that point the state machine awaits only <code>write_all()</code>, remaining stuck in <code>Poll::Pending</code> inside the first branch.</p>
<p>The first idea is to apply the cancellation token through every <code>await</code> calls:</p>
<div class="highlight"><pre tabindex="0" style="color:#f8f8f2;background-color:#272822;-moz-tab-size:4;-o-tab-size:4;tab-size:4;"><code class="language-rust" data-lang="rust"><span style="display:flex;"><span><span style="color:#66d9ef">let</span> cancel <span style="color:#f92672">=</span> CancellationToken::new();
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span><span style="color:#66d9ef">loop</span> {
</span></span><span style="display:flex;"><span> tokio::<span style="color:#a6e22e">select!</span> {
</span></span><span style="display:flex;"><span> Some(msg) <span style="color:#f92672">=</span> rx.recv() <span style="color:#f92672">=&gt;</span> {
</span></span><span style="display:flex;"><span> tokio::<span style="color:#a6e22e">select!</span> {
</span></span><span style="display:flex;"><span> res <span style="color:#f92672">=</span> stream.write_all(<span style="color:#f92672">&amp;</span>msg) <span style="color:#f92672">=&gt;</span> res,
</span></span><span style="display:flex;"><span> _ <span style="color:#f92672">=</span> cancel.cancelled() <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>
</span></span><span style="display:flex;"><span> }<span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> tokio::<span style="color:#a6e22e">select!</span> {
</span></span><span style="display:flex;"><span> res <span style="color:#f92672">=</span> stream.flush() <span style="color:#f92672">=&gt;</span> res,
</span></span><span style="display:flex;"><span> _ <span style="color:#f92672">=</span> cancel.cancelled() <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>
</span></span><span style="display:flex;"><span> }<span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">println!</span>(<span style="color:#e6db74">"client's written"</span>);
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> res <span style="color:#f92672">=</span> stream.read(<span style="color:#f92672">&amp;</span><span style="color:#66d9ef">mut</span> read_buf) <span style="color:#f92672">=&gt;</span> {
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">let</span> n <span style="color:#f92672">=</span> res<span style="color:#f92672">?</span>;
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">if</span> n <span style="color:#f92672">==</span> <span style="color:#ae81ff">0</span> {
</span></span><span style="display:flex;"><span> <span style="color:#75715e">// EOF - server closed connection
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> <span style="color:#a6e22e">eprintln!</span>(<span style="color:#e6db74">"Server closed the connection."</span>);
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">break</span>; <span style="color:#75715e">//exit
</span></span></span><span style="display:flex;"><span><span style="color:#75715e"></span> }
</span></span><span style="display:flex;"><span> <span style="color:#a6e22e">print!</span>(<span style="color:#e6db74">"</span><span style="color:#e6db74">{}</span><span style="color:#e6db74">"</span>, String::from_utf8_lossy(<span style="color:#f92672">&amp;</span>read_buf[<span style="color:#f92672">..</span>n]));
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> _ <span style="color:#f92672">=</span> cancel.cancelled() <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>,
</span></span><span style="display:flex;"><span>
</span></span><span style="display:flex;"><span> <span style="color:#66d9ef">else</span> <span style="color:#f92672">=&gt;</span> <span style="color:#66d9ef">break</span>
</span></span><span style="display:flex;"><span> }
</span></span><span style="display:flex;"><span>}
</span></span></code><button type="button" class="copy-button" aria-label="Copy code to clipboard" title="Copy">Copy</button></pre></div><p>This works, but it scales poorly: every new await point needs another nested <code>select!</code>, and readability drops fast.</p>
<p>Later in the post Ill show a tidier cancellation pattern, especially if you need a simple “short-circuit” solution.</p>
<a href="/docs/async-rust-tokio-io/2-io-loop/" class="book-btn right-button">
Read next chapter →
</a>
</article>
<footer class="book-footer">
<div class="flex flex-wrap justify-between">
</div>
<script defer="" src="/my_js/copy-code.js"></script>
</footer>
<div class="book-comments">
</div>
<label for="menu-control" class="hidden book-menu-overlay"></label>
</div>
</main>
<div class="cookie-container active">
<p>
This website uses "<b>cookies</b>".
Using this website means you're OK with this.
If you are <b>NOT</b>, please close the site page.
</p>
<button class="cookie-btn">
ACCEPT AND CLOSE
</button>
</div>
<script src="/my_js/cookie.js"></script>
</body></html>