syzbot |
sign-in | mailing list | source | docs | 🏰 |
| ID | Workflow | Result | Correct | Bug | Created | Started | Finished | Revision | Error |
|---|---|---|---|---|---|---|---|---|---|
| 1dfc13c0-b4b0-412c-8d24-057d3d782c29 | assessment-kcsan | Benign: ✅ Confident: ✅ | ❓ | KCSAN: data-race in _prb_read_valid / prb_reserve | 2026/02/24 17:55 | 2026/02/24 17:55 | 2026/02/24 17:57 | 305c0ec5cd886e2d13738e28e1b2df9b0ec20fc9 |
The data race occurs in the `printk` ringbuffer between a writer reserving a new descriptor and a reader attempting to read a record. ### Analysis **Writer Side:** In `prb_reserve()`, a writer reserves a new descriptor slot by atomically setting its state to `desc_reserved`. After successful reservation, it initializes the associated `printk_info` structure: 1. It calls `memset(info, 0, sizeof(*info));` to clear the meta-data (this is the 88-byte write reported). 2. It then calculates and assigns the new sequence number to `info->seq`. **Reader Side:** In `prb_read()`, a reader attempts to read a record with a specific sequence number. It calls `desc_read()`, which follows a "read-check-recheck" pattern: 1. It first reads the descriptor's `state_var` and checks if it's in a consistent state (e.g., `desc_finalized`). 2. If consistent, it proceeds to copy the meta-data, including `info->seq` (this is the 8-byte read reported). 3. After copying, it re-reads the `state_var` and re-checks the state and ID. ### Why the race is benign: 1. **Protocol Design:** The `printk_ringbuffer` is designed to be lockless and explicitly handles concurrent accesses. The reader's re-check of the state variable ensures that if a writer modified the `info` structure during the read, the reader will detect the state change (e.g., seeing `desc_reserved` or a different ID) and discard the potentially inconsistent data. 2. **Memory Barriers:** The ringbuffer implementation uses appropriate memory barriers (`smp_rmb()` in the reader and atomic operations with barrier semantics in the writer) to ensure that the state transitions and data updates are seen in the correct order. Specifically, the writer's update to `info->seq` is guaranteed to be visible before it transitions the state to `desc_committed` or `desc_finalized`. 3. **Safety against Garbage:** The reader code is robust against reading garbage values during a race. For instance, in `copy_data()`, the length used for `memcpy` is clamped to the actual buffer sizes, preventing overflows even if a garbage `text_len` is read. 4. **Atomic Accesses:** On the reported x86_64 system, 8-byte aligned loads and stores of `u64` (like `info->seq`) are atomic at the machine level. The reader will see either the old value, the zeroed value from `memset`, or the new value, but never a torn write. Regardless of which value is seen, the subsequent state re-check will invalidate the read if it happened concurrently with a write. In conclusion, the data race is a known and handled consequence of the lockless design of the printk ringbuffer and does not lead to any functional issues, memory corruption, or crashes. Final Verdict: Benign.
================================================================== BUG: KCSAN: data-race in _prb_read_valid / prb_reserve write to 0xffffffff86cd92e0 of 88 bytes by task 28565 on cpu 0: prb_reserve+0x6e6/0xae0 kernel/printk/printk_ringbuffer.c:1690 vprintk_store+0x54a/0x910 kernel/printk/printk.c:2354 vprintk_emit+0x1a4/0x600 kernel/printk/printk.c:2455 vprintk_default+0x26/0x30 kernel/printk/printk.c:2494 vprintk+0x1d/0x30 kernel/printk/printk_safe.c:82 _printk+0x79/0xa0 kernel/printk/printk.c:2504 fail_dump lib/fault-inject.c:66 [inline] should_fail_ex+0x24b/0x280 lib/fault-inject.c:174 should_failslab+0x8c/0xb0 mm/failslab.c:46 slab_pre_alloc_hook mm/slub.c:4454 [inline] slab_alloc_node mm/slub.c:4807 [inline] kmem_cache_alloc_lru_noprof+0x6c/0x410 mm/slub.c:4849 sock_alloc_inode+0x34/0xa0 net/socket.c:322 alloc_inode+0x40/0x170 fs/inode.c:347 new_inode_pseudo include/linux/fs.h:3003 [inline] sock_alloc net/socket.c:637 [inline] __sock_create+0x120/0x580 net/socket.c:1569 sock_create_kern+0x38/0x50 net/socket.c:1681 udp_sock_create4+0x68/0x2e0 net/ipv4/udp_tunnel_core.c:19 udp_sock_create include/net/udp_tunnel.h:64 [inline] fou_create net/ipv4/fou_core.c:579 [inline] fou_nl_add_doit+0xf1/0x6b0 net/ipv4/fou_core.c:764 genl_family_rcv_msg_doit+0x187/0x1f0 net/netlink/genetlink.c:1114 genl_family_rcv_msg net/netlink/genetlink.c:1194 [inline] genl_rcv_msg+0x432/0x470 net/netlink/genetlink.c:1209 netlink_rcv_skb+0x123/0x220 net/netlink/af_netlink.c:2550 genl_rcv+0x28/0x40 net/netlink/genetlink.c:1218 netlink_unicast_kernel net/netlink/af_netlink.c:1318 [inline] netlink_unicast+0x5c0/0x690 net/netlink/af_netlink.c:1344 netlink_sendmsg+0x5c8/0x6f0 net/netlink/af_netlink.c:1894 sock_sendmsg_nosec net/socket.c:727 [inline] __sock_sendmsg net/socket.c:742 [inline] ____sys_sendmsg+0x5af/0x600 net/socket.c:2592 ___sys_sendmsg+0x195/0x1e0 net/socket.c:2646 __sys_sendmsg net/socket.c:2678 [inline] __do_sys_sendmsg net/socket.c:2683 [inline] __se_sys_sendmsg net/socket.c:2681 [inline] __x64_sys_sendmsg+0xd4/0x160 net/socket.c:2681 x64_sys_call+0x194c/0x3020 arch/x86/include/generated/asm/syscalls_64.h:47 do_syscall_x64 arch/x86/entry/syscall_64.c:63 [inline] do_syscall_64+0x12c/0x370 arch/x86/entry/syscall_64.c:94 entry_SYSCALL_64_after_hwframe+0x77/0x7f read to 0xffffffff86cd92e0 of 8 bytes by task 28566 on cpu 1: desc_read kernel/printk/printk_ringbuffer.c:500 [inline] desc_read_finalized_seq kernel/printk/printk_ringbuffer.c:1931 [inline] prb_read kernel/printk/printk_ringbuffer.c:1979 [inline] _prb_read_valid+0x1c4/0x950 kernel/printk/printk_ringbuffer.c:2170 prb_read_valid+0x3c/0x60 kernel/printk/printk_ringbuffer.c:2242 __console_flush_and_unlock kernel/printk/printk.c:3392 [inline] console_unlock+0x146/0x280 kernel/printk/printk.c:3413 class_console_lock_destructor include/linux/console.h:736 [inline] vcs_open+0x90/0xa0 drivers/tty/vt/vc_screen.c:752 chrdev_open+0x2eb/0x3a0 fs/char_dev.c:411 do_dentry_open+0x4ca/0xa90 fs/open.c:949 vfs_open+0x37/0x1e0 fs/open.c:1081 do_open fs/namei.c:4671 [inline] path_openat+0x1b70/0x2050 fs/namei.c:4830 do_file_open+0x16c/0x290 fs/namei.c:4859 do_sys_openat2+0x94/0x130 fs/open.c:1366 do_sys_open fs/open.c:1372 [inline] __do_sys_openat fs/open.c:1388 [inline] __se_sys_openat fs/open.c:1383 [inline] __x64_sys_openat+0xf2/0x120 fs/open.c:1383 x64_sys_call+0x1e39/0x3020 arch/x86/include/generated/asm/syscalls_64.h:258 do_syscall_x64 arch/x86/entry/syscall_64.c:63 [inline] do_syscall_64+0x12c/0x370 arch/x86/entry/syscall_64.c:94 entry_SYSCALL_64_after_hwframe+0x77/0x7f value changed: 0x0000000000002b74 -> 0x0000000000004b74 Reported by Kernel Concurrency Sanitizer on: CPU: 1 UID: 0 PID: 28566 Comm: syz.2.11955 Tainted: G W syzkaller #0 PREEMPT(full) Tainted: [W]=WARN Hardware name: Google Google Compute Engine/Google Compute Engine, BIOS Google 02/12/2026 ================================================================== bond1: entered promiscuous mode bond1: entered allmulticast mode 8021q: adding VLAN 0 to HW filter on device bond1 A link change request failed with some changes committed already. Interface bond1 may have been left with an inconsistent configuration, please check.
| Seq | Timestamp | Type | Name | Duration |
|---|---|---|---|---|
| 0/0 | 2026/02/24 17:55 | flow | assessment-kcsan |
2mResults:map[Benign:true Confident:true Explanation:The data race occurs in the `printk` ringbuffer between a writer reserving a new descriptor and a reader attempting to read a record. ### Analysis **Writer Side:** In `prb_reserve()`, a writer reserves a new descriptor slot by atomically setting its state to `desc_reserved`. After successful reservation, it initializes the associated `printk_info` structure: 1. It calls `memset(info, 0, sizeof(*info));` to clear the meta-data (this is the 88-byte write reported). 2. It then calculates and assigns the new sequence number to `info->seq`. **Reader Side:** In `prb_read()`, a reader attempts to read a record with a specific sequence number. It calls `desc_read()`, which follows a "read-check-recheck" pattern: 1. It first reads the descriptor's `state_var` and checks if it's in a consistent state (e.g., `desc_finalized`). 2. If consistent, it proceeds to copy the meta-data, including `info->seq` (this is the 8-byte read reported). 3. After copying, it re-reads the `state_var` and re-checks the state and ID. ### Why the race is benign: 1. **Protocol Design:** The `printk_ringbuffer` is designed to be lockless and explicitly handles concurrent accesses. The reader's re-check of the state variable ensures that if a writer modified the `info` structure during the read, the reader will detect the state change (e.g., seeing `desc_reserved` or a different ID) and discard the potentially inconsistent data. 2. **Memory Barriers:** The ringbuffer implementation uses appropriate memory barriers (`smp_rmb()` in the reader and atomic operations with barrier semantics in the writer) to ensure that the state transitions and data updates are seen in the correct order. Specifically, the writer's update to `info->seq` is guaranteed to be visible before it transitions the state to `desc_committed` or `desc_finalized`. 3. **Safety against Garbage:** The reader code is robust against reading garbage values during a race. For instance, in `copy_data()`, the length used for `memcpy` is clamped to the actual buffer sizes, preventing overflows even if a garbage `text_len` is read. 4. **Atomic Accesses:** On the reported x86_64 system, 8-byte aligned loads and stores of `u64` (like `info->seq`) are atomic at the machine level. The reader will see either the old value, the zeroed value from `memset`, or the new value, but never a torn write. Regardless of which value is seen, the subsequent state re-check will invalidate the read if it happened concurrently with a write. In conclusion, the data race is a known and handled consequence of the lockless design of the printk ringbuffer and does not lead to any functional issues, memory corruption, or crashes. Final Verdict: Benign.] |
| 1/1 | 2026/02/24 17:55 | action | kernel-checkouter |
0mResults:map[KernelSrc:/usr/local/google/home/dvyukov/syzkaller/agent/workdir/cache/src/6d58cef964dfa0a62b5f1a85798409a9a17de2fd] |
| 2/1 | 2026/02/24 17:55 | action | kernel-builder |
0mResults:map[KernelObj:/usr/local/google/home/dvyukov/syzkaller/agent/workdir/cache/build/31496bfd96da020ec120e5ff617b45f3c5347922] |
| 3/1 | 2026/02/24 17:55 | action | codesearch-prepare |
0mResults:map[Index:codesearch-index] |
| 4/1 | 2026/02/24 17:55 | agent | expert |
2mModel:gemini-3-flash-preview Results: map[Benign:true Confident:true] Instruction: You are an experienced Linux kernel developer tasked with determining if the given kernel data race is benign or not. The data race report is from KCSAN tool. It contains 2 stack traces of the memory accesses that constitute a data race. A "benign" data races are on a simple int/bool variable or similar field, and the accesses are not supposed to be protected by any mutual exclusion primitives. Common examples of such "benign" data races are accesses to various flags fields, statistics counters, and similar. A "benign" data race does not lead to memory corruption/crash with a conservative compiler that compiles memory accesses to primitive types effectively as atomic. A non-benign (or "harmful" data race) can lead to corruption/crash even with a conservative compiler that compiles memory accesses to primitive types effectively as atomic. A common example of a "harmful" data races is race on a complex container (list/hashmap/etc), where accesses are supposed to be protected by a mutual exclusion primitive. Also consider races that happen at the same time with the given one. If there is no synchronization in between, other memory accesses in the involved threads race with each other if they access the same memory. For example, if both threads execute: some_struct->have_elements = true; list_add(new_node, &some_struct->list_head); the race on some_struct->have_elements may appear benign, however it also implies there is a race on some_struct->list_head which is not benign, since the list is not thread-safe. Take into account that on 32-bit systems 64-bit memory accesses may be split into two accesses, and thus even with a conservative compiler may not be fully atomic. However, such races may still be benign depending on how writes are done, and how read data is used. In the final reply explain why you think the given data race is benign or is harmful. Use the provided tools to confirm any assumptions, variables/fields being accessed, etc. In particular, don't make assumptions about the kernel source code, use codesearch tools to read the actual source code. Prefer calling several tools at the same time to save round-trips. Use set-results tool to provide results of the analysis. It must be called exactly once before the final reply. Ignore results of this tool. Prompt: The data race report is: ================================================================== BUG: KCSAN: data-race in _prb_read_valid / prb_reserve write to 0xffffffff86cd92e0 of 88 bytes by task 28565 on cpu 0: prb_reserve+0x6e6/0xae0 kernel/printk/printk_ringbuffer.c:1690 vprintk_store+0x54a/0x910 kernel/printk/printk.c:2354 vprintk_emit+0x1a4/0x600 kernel/printk/printk.c:2455 vprintk_default+0x26/0x30 kernel/printk/printk.c:2494 vprintk+0x1d/0x30 kernel/printk/printk_safe.c:82 _printk+0x79/0xa0 kernel/printk/printk.c:2504 fail_dump lib/fault-inject.c:66 [inline] should_fail_ex+0x24b/0x280 lib/fault-inject.c:174 should_failslab+0x8c/0xb0 mm/failslab.c:46 slab_pre_alloc_hook mm/slub.c:4454 [inline] slab_alloc_node mm/slub.c:4807 [inline] kmem_cache_alloc_lru_noprof+0x6c/0x410 mm/slub.c:4849 sock_alloc_inode+0x34/0xa0 net/socket.c:322 alloc_inode+0x40/0x170 fs/inode.c:347 new_inode_pseudo include/linux/fs.h:3003 [inline] sock_alloc net/socket.c:637 [inline] __sock_create+0x120/0x580 net/socket.c:1569 sock_create_kern+0x38/0x50 net/socket.c:1681 udp_sock_create4+0x68/0x2e0 net/ipv4/udp_tunnel_core.c:19 udp_sock_create include/net/udp_tunnel.h:64 [inline] fou_create net/ipv4/fou_core.c:579 [inline] fou_nl_add_doit+0xf1/0x6b0 net/ipv4/fou_core.c:764 genl_family_rcv_msg_doit+0x187/0x1f0 net/netlink/genetlink.c:1114 genl_family_rcv_msg net/netlink/genetlink.c:1194 [inline] genl_rcv_msg+0x432/0x470 net/netlink/genetlink.c:1209 netlink_rcv_skb+0x123/0x220 net/netlink/af_netlink.c:2550 genl_rcv+0x28/0x40 net/netlink/genetlink.c:1218 netlink_unicast_kernel net/netlink/af_netlink.c:1318 [inline] netlink_unicast+0x5c0/0x690 net/netlink/af_netlink.c:1344 netlink_sendmsg+0x5c8/0x6f0 net/netlink/af_netlink.c:1894 sock_sendmsg_nosec net/socket.c:727 [inline] __sock_sendmsg net/socket.c:742 [inline] ____sys_sendmsg+0x5af/0x600 net/socket.c:2592 ___sys_sendmsg+0x195/0x1e0 net/socket.c:2646 __sys_sendmsg net/socket.c:2678 [inline] __do_sys_sendmsg net/socket.c:2683 [inline] __se_sys_sendmsg net/socket.c:2681 [inline] __x64_sys_sendmsg+0xd4/0x160 net/socket.c:2681 x64_sys_call+0x194c/0x3020 arch/x86/include/generated/asm/syscalls_64.h:47 do_syscall_x64 arch/x86/entry/syscall_64.c:63 [inline] do_syscall_64+0x12c/0x370 arch/x86/entry/syscall_64.c:94 entry_SYSCALL_64_after_hwframe+0x77/0x7f read to 0xffffffff86cd92e0 of 8 bytes by task 28566 on cpu 1: desc_read kernel/printk/printk_ringbuffer.c:500 [inline] desc_read_finalized_seq kernel/printk/printk_ringbuffer.c:1931 [inline] prb_read kernel/printk/printk_ringbuffer.c:1979 [inline] _prb_read_valid+0x1c4/0x950 kernel/printk/printk_ringbuffer.c:2170 prb_read_valid+0x3c/0x60 kernel/printk/printk_ringbuffer.c:2242 __console_flush_and_unlock kernel/printk/printk.c:3392 [inline] console_unlock+0x146/0x280 kernel/printk/printk.c:3413 class_console_lock_destructor include/linux/console.h:736 [inline] vcs_open+0x90/0xa0 drivers/tty/vt/vc_screen.c:752 chrdev_open+0x2eb/0x3a0 fs/char_dev.c:411 do_dentry_open+0x4ca/0xa90 fs/open.c:949 vfs_open+0x37/0x1e0 fs/open.c:1081 do_open fs/namei.c:4671 [inline] path_openat+0x1b70/0x2050 fs/namei.c:4830 do_file_open+0x16c/0x290 fs/namei.c:4859 do_sys_openat2+0x94/0x130 fs/open.c:1366 do_sys_open fs/open.c:1372 [inline] __do_sys_openat fs/open.c:1388 [inline] __se_sys_openat fs/open.c:1383 [inline] __x64_sys_openat+0xf2/0x120 fs/open.c:1383 x64_sys_call+0x1e39/0x3020 arch/x86/include/generated/asm/syscalls_64.h:258 do_syscall_x64 arch/x86/entry/syscall_64.c:63 [inline] do_syscall_64+0x12c/0x370 arch/x86/entry/syscall_64.c:94 entry_SYSCALL_64_after_hwframe+0x77/0x7f value changed: 0x0000000000002b74 -> 0x0000000000004b74 Reported by Kernel Concurrency Sanitizer on: CPU: 1 UID: 0 PID: 28566 Comm: syz.2.11955 Tainted: G W syzkaller #0 PREEMPT(full) Tainted: [W]=WARN Hardware name: Google Google Compute Engine/Google Compute Engine, BIOS Google 02/12/2026 ================================================================== bond1: entered promiscuous mode bond1: entered allmulticast mode 8021q: adding VLAN 0 to HW filter on device bond1 A link change request failed with some changes committed already. Interface bond1 may have been left with an inconsistent configuration, please check. Reply: The data race occurs in the `printk` ringbuffer between a writer reserving a new descriptor and a reader attempting to read a record. ### Analysis **Writer Side:** In `prb_reserve()`, a writer reserves a new descriptor slot by atomically setting its state to `desc_reserved`. After successful reservation, it initializes the associated `printk_info` structure: 1. It calls `memset(info, 0, sizeof(*info));` to clear the meta-data (this is the 88-byte write reported). 2. It then calculates and assigns the new sequence number to `info->seq`. **Reader Side:** In `prb_read()`, a reader attempts to read a record with a specific sequence number. It calls `desc_read()`, which follows a "read-check-recheck" pattern: 1. It first reads the descriptor's `state_var` and checks if it's in a consistent state (e.g., `desc_finalized`). 2. If consistent, it proceeds to copy the meta-data, including `info->seq` (this is the 8-byte read reported). 3. After copying, it re-reads the `state_var` and re-checks the state and ID. ### Why the race is benign: 1. **Protocol Design:** The `printk_ringbuffer` is designed to be lockless and explicitly handles concurrent accesses. The reader's re-check of the state variable ensures that if a writer modified the `info` structure during the read, the reader will detect the state change (e.g., seeing `desc_reserved` or a different ID) and discard the potentially inconsistent data. 2. **Memory Barriers:** The ringbuffer implementation uses appropriate memory barriers (`smp_rmb()` in the reader and atomic operations with barrier semantics in the writer) to ensure that the state transitions and data updates are seen in the correct order. Specifically, the writer's update to `info->seq` is guaranteed to be visible before it transitions the state to `desc_committed` or `desc_finalized`. 3. **Safety against Garbage:** The reader code is robust against reading garbage values during a race. For instance, in `copy_data()`, the length used for `memcpy` is clamped to the actual buffer sizes, preventing overflows even if a garbage `text_len` is read. 4. **Atomic Accesses:** On the reported x86_64 system, 8-byte aligned loads and stores of `u64` (like `info->seq`) are atomic at the machine level. The reader will see either the old value, the zeroed value from `memset`, or the new value, but never a torn write. Regardless of which value is seen, the subsequent state re-check will invalidate the read if it happened concurrently with a write. In conclusion, the data race is a known and handled consequence of the lockless design of the printk ringbuffer and does not lead to any functional issues, memory corruption, or crashes. Final Verdict: Benign. |
| 5/2 | 2026/02/24 17:55 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 5150 output: 22 thoughts: 302 Thoughts: **Analyzing a Data Race in the Kernel's Printk Ringbuffer** Okay, so the report flags a data race, and it seems like it's between `prb_reserve` (the write) and `_prb_read_valid` (the read) within `kernel/printk/printk_ringbuffer.c`. I need to dig into this. The key information is the specific address: `0xffffffff86cd92e0`. The write is substantial at 88 bytes, while the read is just 8 bytes. I'm noting the value change too: `0x0000000000002b74` to `0x0000000000004b74`. Let's get the exact function calls to confirm what I'm looking at. The write is `prb_reserve+0x6e6/0xae0` at line 1690, and the read is `_prb_read_valid+0x1c4/0x950` at line 2170, and this call chain includes `prb_read`, then `desc_read_finalized_seq` finally ending in `desc_read`. Eighty-eight bytes for a write is pretty significant. This implies a structure or larger data being copied or initialized. I should examine the code to see what that 88-byte chunk represents. This could give me some crucial context. I'll focus on what's being written during `prb_reserve` and then what that same data is used for during the `_prb_read_valid` path. Understanding the data's meaning will illuminate the race's potential severity and how to mitigate it. |
| 6/2 | 2026/02/24 17:55 | tool | codesearch-dir-index |
0mArgs:map[Dir:kernel/printk/] Results: map[Files:[braille.c braille.h console_cmdline.h index.c internal.h nbcon.c printk.c printk_ringbuffer.c printk_ringbuffer.h printk_ringbuffer_kunit_test.c printk_safe.c sysctl.c] Subdirs:<nil>] |
| 7/2 | 2026/02/24 17:55 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 5973 output: 25 thoughts: 30 Thoughts: Here's my analysis of the provided text, rendered as though I'm thinking it: **Digging into the Printk Ring Buffer** Alright, looks like we're troubleshooting something related to `printk`. My first instinct is to zero in on the ring buffer implementation, specifically the core logic. I'm going to jump straight into `kernel/printk/printk_ringbuffer.c`. I know that's where the heart of the ring buffer lives. Let's see... based on the suggestion, lines 1690 and 2170 seem like potentially interesting spots to start digging. Those line numbers probably represent specific function calls or sections within that file, likely dealing with how data is written to, or read from, the buffer. Let's see what's happening there. I'll need to understand the data structures and control flow around those lines to figure out the issue. This could involve examining lock usage, memory management, or potential race conditions, depending on what I find. |
| 8/2 | 2026/02/24 17:55 | tool | read-file |
0mArgs:map[File:kernel/printk/printk_ringbuffer.c] Results: map[Contents:// SPDX-License-Identifier: GPL-2.0
#include <kunit/visibility.h>
#include <linux/kernel.h>
#include <linux/irqflags.h>
#include <linux/string.h>
#include <linux/errno.h>
#include <linux/bug.h>
#include "printk_ringbuffer.h"
#include "internal.h"
/**
* DOC: printk_ringbuffer overview
*
* Data Structure
* --------------
* The printk_ringbuffer is made up of 3 internal ringbuffers:
*
* desc_ring
* A ring of descriptors and their meta data (such as sequence number,
* timestamp, loglevel, etc.) as well as internal state information about
* the record and logical positions specifying where in the other
* ringbuffer the text strings are located.
*
* text_data_ring
* A ring of data blocks. A data block consists of an unsigned long
* integer (ID) that maps to a desc_ring index followed by the text
* string of the record.
*
* The internal state information of a descriptor is the key element to allow
* readers and writers to locklessly synchronize access to the data.
*
* Implementation
* --------------
*
* Descriptor Ring
* ~~~~~~~~~~~~~~~
* The descriptor ring is an array of descriptors. A descriptor contains
* essential meta data to track the data of a printk record using
* blk_lpos structs pointing to associated text data blocks (see
* "Data Rings" below). Each descriptor is assigned an ID that maps
* directly to index values of the descriptor array and has a state. The ID
* and the state are bitwise combined into a single descriptor field named
* @state_var, allowing ID and state to be synchronously and atomically
* updated.
*
* Descriptors have four states:
*
* reserved
* A writer is modifying the record.
*
* committed
* The record and all its data are written. A writer can reopen the
* descriptor (transitioning it back to reserved), but in the committed
* state the data is consistent.
*
* finalized
* The record and all its data are complete and available for reading. A
* writer cannot reopen the descriptor.
*
* reusable
* The record exists, but its text and/or meta data may no longer be
* available.
*
* Querying the @state_var of a record requires providing the ID of the
* descriptor to query. This can yield a possible fifth (pseudo) state:
*
* miss
* The descriptor being queried has an unexpected ID.
*
* The descriptor ring has a @tail_id that contains the ID of the oldest
* descriptor and @head_id that contains the ID of the newest descriptor.
*
* When a new descriptor should be created (and the ring is full), the tail
* descriptor is invalidated by first transitioning to the reusable state and
* then invalidating all tail data blocks up to and including the data blocks
* associated with the tail descriptor (for the text ring). Then
* @tail_id is advanced, followed by advancing @head_id. And finally the
* @state_var of the new descriptor is initialized to the new ID and reserved
* state.
*
* The @tail_id can only be advanced if the new @tail_id would be in the
* committed or reusable queried state. This makes it possible that a valid
* sequence number of the tail is always available.
*
* Descriptor Finalization
* ~~~~~~~~~~~~~~~~~~~~~~~
* When a writer calls the commit function prb_commit(), record data is
* fully stored and is consistent within the ringbuffer. However, a writer can
* reopen that record, claiming exclusive access (as with prb_reserve()), and
* modify that record. When finished, the writer must again commit the record.
*
* In order for a record to be made available to readers (and also become
* recyclable for writers), it must be finalized. A finalized record cannot be
* reopened and can never become "unfinalized". Record finalization can occur
* in three different scenarios:
*
* 1) A writer can simultaneously commit and finalize its record by calling
* prb_final_commit() instead of prb_commit().
*
* 2) When a new record is reserved and the previous record has been
* committed via prb_commit(), that previous record is automatically
* finalized.
*
* 3) When a record is committed via prb_commit() and a newer record
* already exists, the record being committed is automatically finalized.
*
* Data Ring
* ~~~~~~~~~
* The text data ring is a byte array composed of data blocks. Data blocks are
* referenced by blk_lpos structs that point to the logical position of the
* beginning of a data block and the beginning of the next adjacent data
* block. Logical positions are mapped directly to index values of the byte
* array ringbuffer.
*
* Each data block consists of an ID followed by the writer data. The ID is
* the identifier of a descriptor that is associated with the data block. A
* given data block is considered valid if all of the following conditions
* are met:
*
* 1) The descriptor associated with the data block is in the committed
* or finalized queried state.
*
* 2) The blk_lpos struct within the descriptor associated with the data
* block references back to the same data block.
*
* 3) The data block is within the head/tail logical position range.
*
* If the writer data of a data block would extend beyond the end of the
* byte array, only the ID of the data block is stored at the logical
* position and the full data block (ID and writer data) is stored at the
* beginning of the byte array. The referencing blk_lpos will point to the
* ID before the wrap and the next data block will be at the logical
* position adjacent the full data block after the wrap.
*
* Data rings have a @tail_lpos that points to the beginning of the oldest
* data block and a @head_lpos that points to the logical position of the
* next (not yet existing) data block.
*
* When a new data block should be created (and the ring is full), tail data
* blocks will first be invalidated by putting their associated descriptors
* into the reusable state and then pushing the @tail_lpos forward beyond
* them. Then the @head_lpos is pushed forward and is associated with a new
* descriptor. If a data block is not valid, the @tail_lpos cannot be
* advanced beyond it.
*
* Info Array
* ~~~~~~~~~~
* The general meta data of printk records are stored in printk_info structs,
* stored in an array with the same number of elements as the descriptor ring.
* Each info corresponds to the descriptor of the same index in the
* descriptor ring. Info validity is confirmed by evaluating the corresponding
* descriptor before and after loading the info.
*
* Usage
* -----
* Here are some simple examples demonstrating writers and readers. For the
* examples a global ringbuffer (test_rb) is available (which is not the
* actual ringbuffer used by printk)::
*
* DEFINE_PRINTKRB(test_rb, 15, 5);
*
* This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of
* 1 MiB (2 ^ (15 + 5)) for text data.
*
* Sample writer code::
*
* const char *textstr = "message text";
* struct prb_reserved_entry e;
* struct printk_record r;
*
* // specify how much to allocate
* prb_rec_init_wr(&r, strlen(textstr) + 1);
*
* if (prb_reserve(&e, &test_rb, &r)) {
* snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
*
* r.info->text_len = strlen(textstr);
* r.info->ts_nsec = local_clock();
* r.info->caller_id = printk_caller_id();
*
* // commit and finalize the record
* prb_final_commit(&e);
* }
*
* Note that additional writer functions are available to extend a record
* after it has been committed but not yet finalized. This can be done as
* long as no new records have been reserved and the caller is the same.
*
* Sample writer code (record extending)::
*
* // alternate rest of previous example
*
* r.info->text_len = strlen(textstr);
* r.info->ts_nsec = local_clock();
* r.info->caller_id = printk_caller_id();
*
* // commit the record (but do not finalize yet)
* prb_commit(&e);
* }
*
* ...
*
* // specify additional 5 bytes text space to extend
* prb_rec_init_wr(&r, 5);
*
* // try to extend, but only if it does not exceed 32 bytes
* if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id(), 32)) {
* snprintf(&r.text_buf[r.info->text_len],
* r.text_buf_size - r.info->text_len, "hello");
*
* r.info->text_len += 5;
*
* // commit and finalize the record
* prb_final_commit(&e);
* }
*
* Sample reader code::
*
* struct printk_info info;
* struct printk_record r;
* char text_buf[32];
* u64 seq;
*
* prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf));
*
* prb_for_each_record(0, &test_rb, &seq, &r) {
* if (info.seq != seq)
* pr_warn("lost %llu records\n", info.seq - seq);
*
* if (info.text_len > r.text_buf_size) {
* pr_warn("record %llu text truncated\n", info.seq);
* text_buf[r.text_buf_size - 1] = 0;
* }
*
* pr_info("%llu: %llu: %s\n", info.seq, info.ts_nsec,
* &text_buf[0]);
* }
*
* Note that additional less convenient reader functions are available to
* allow complex record access.
*
* ABA Issues
* ~~~~~~~~~~
* To help avoid ABA issues, descriptors are referenced by IDs (array index
* values combined with tagged bits counting array wraps) and data blocks are
* referenced by logical positions (array index values combined with tagged
* bits counting array wraps). However, on 32-bit systems the number of
* tagged bits is relatively small such that an ABA incident is (at least
* theoretically) possible. For example, if 4 million maximally sized (1KiB)
* printk messages were to occur in NMI context on a 32-bit system, the
* interrupted context would not be able to recognize that the 32-bit integer
* completely wrapped and thus represents a different data block than the one
* the interrupted context expects.
*
* To help combat this possibility, additional state checking is performed
* (such as using cmpxchg() even though set() would suffice). These extra
* checks are commented as such and will hopefully catch any ABA issue that
* a 32-bit system might experience.
*
* Memory Barriers
* ~~~~~~~~~~~~~~~
* Multiple memory barriers are used. To simplify proving correctness and
* generating litmus tests, lines of code related to memory barriers
* (loads, stores, and the associated memory barriers) are labeled::
*
* LMM(function:letter)
*
* Comments reference the labels using only the "function:letter" part.
*
* The memory barrier pairs and their ordering are:
*
* desc_reserve:D / desc_reserve:B
* push descriptor tail (id), then push descriptor head (id)
*
* desc_reserve:D / data_push_tail:B
* push data tail (lpos), then set new descriptor reserved (state)
*
* desc_reserve:D / desc_push_tail:C
* push descriptor tail (id), then set new descriptor reserved (state)
*
* desc_reserve:D / prb_first_seq:C
* push descriptor tail (id), then set new descriptor reserved (state)
*
* desc_reserve:F / desc_read:D
* set new descriptor id and reserved (state), then allow writer changes
*
* data_alloc:A (or data_realloc:A) / desc_read:D
* set old descriptor reusable (state), then modify new data block area
*
* data_alloc:A (or data_realloc:A) / data_push_tail:B
* push data tail (lpos), then modify new data block area
*
* _prb_commit:B / desc_read:B
* store writer changes, then set new descriptor committed (state)
*
* desc_reopen_last:A / _prb_commit:B
* set descriptor reserved (state), then read descriptor data
*
* _prb_commit:B / desc_reserve:D
* set new descriptor committed (state), then check descriptor head (id)
*
* data_push_tail:D / data_push_tail:A
* set descriptor reusable (state), then push data tail (lpos)
*
* desc_push_tail:B / desc_reserve:D
* set descriptor reusable (state), then push descriptor tail (id)
*
* desc_update_last_finalized:A / desc_last_finalized_seq:A
* store finalized record, then set new highest finalized sequence number
*/
#define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits)
#define DATA_SIZE_MASK(data_ring) (DATA_SIZE(data_ring) - 1)
#define DESCS_COUNT(desc_ring) _DESCS_COUNT((desc_ring)->count_bits)
#define DESCS_COUNT_MASK(desc_ring) (DESCS_COUNT(desc_ring) - 1)
/* Determine the data array index from a logical position. */
#define DATA_INDEX(data_ring, lpos) ((lpos) & DATA_SIZE_MASK(data_ring))
/* Determine the desc array index from an ID or sequence number. */
#define DESC_INDEX(desc_ring, n) ((n) & DESCS_COUNT_MASK(desc_ring))
/* Determine how many times the data array has wrapped. */
#define DATA_WRAPS(data_ring, lpos) ((lpos) >> (data_ring)->size_bits)
/* Determine if a logical position refers to a data-less block. */
#define LPOS_DATALESS(lpos) ((lpos) & 1UL)
#define BLK_DATALESS(blk) (LPOS_DATALESS((blk)->begin) && \
LPOS_DATALESS((blk)->next))
/* Get the logical position at index 0 of the current wrap. */
#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
((lpos) & ~DATA_SIZE_MASK(data_ring))
/* Get the ID for the same index of the previous wrap as the given ID. */
#define DESC_ID_PREV_WRAP(desc_ring, id) \
DESC_ID((id) - DESCS_COUNT(desc_ring))
/*
* A data block: mapped directly to the beginning of the data block area
* specified as a logical position within the data ring.
*
* @id: the ID of the associated descriptor
* @data: the writer data
*
* Note that the size of a data block is only known by its associated
* descriptor.
*/
struct prb_data_block {
unsigned long id;
char data[];
};
/*
* Return the descriptor associated with @n. @n can be either a
* descriptor ID or a sequence number.
*/
static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
{
return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
}
/*
* Return the printk_info associated with @n. @n can be either a
* descriptor ID or a sequence number.
*/
static struct printk_info *to_info(struct prb_desc_ring *desc_ring, u64 n)
{
return &desc_ring->infos[DESC_INDEX(desc_ring, n)];
}
static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
unsigned long begin_lpos)
{
return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
}
/*
* Increase the data size to account for data block meta data plus any
* padding so that the adjacent data block is aligned on the ID size.
*/
static unsigned int to_blk_size(unsigned int size)
{
struct prb_data_block *db = NULL;
size += sizeof(*db);
size = ALIGN(size, sizeof(db->id));
return size;
}
/*
* Sanity checker for reserve size. The ringbuffer code assumes that a data
* block does not exceed the maximum possible size that could fit within the
* ringbuffer. This function provides that basic size check so that the
* assumption is safe. In particular, it guarantees that data_push_tail() will
* never attempt to push the tail beyond the head.
*/
static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
{
/* Data-less blocks take no space. */
if (size == 0)
return true;
/*
* If data blocks were allowed to be larger than half the data ring
* size, a wrapping data block could require more space than the full
* ringbuffer.
*/
return to_blk_size(size) <= DATA_SIZE(data_ring) / 2;
}
/*
* Compare the current and requested logical position and decide
* whether more space is needed.
*
* Return false when @lpos_current is already at or beyond @lpos_target.
*
* Also return false when the difference between the positions is bigger
* than the size of the data buffer. It might happen only when the caller
* raced with another CPU(s) which already made and used the space.
*/
static bool need_more_space(struct prb_data_ring *data_ring,
unsigned long lpos_current,
unsigned long lpos_target)
{
return lpos_target - lpos_current - 1 < DATA_SIZE(data_ring);
}
/* Query the state of a descriptor. */
static enum desc_state get_desc_state(unsigned long id,
unsigned long state_val)
{
if (id != DESC_ID(state_val))
return desc_miss;
return DESC_STATE(state_val);
}
/*
* Get a copy of a specified descriptor and return its queried state. If the
* descriptor is in an inconsistent state (miss or reserved), the caller can
* only expect the descriptor's @state_var field to be valid.
*
* The sequence number and caller_id can be optionally retrieved. Like all
* non-state_var data, they are only valid if the descriptor is in a
* consistent state.
*/
static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
unsigned long id, struct prb_desc *desc_out,
u64 *seq_out, u32 *caller_id_out)
{
struct printk_info *info = to_info(desc_ring, id);
struct prb_desc *desc = to_desc(desc_ring, id);
atomic_long_t *state_var = &desc->state_var;
enum desc_state d_state;
unsigned long state_val;
/* Check the descriptor state. */
state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
d_state = get_desc_state(id, state_val);
if (d_state == desc_miss || d_state == desc_reserved) {
/*
* The descriptor is in an inconsistent state. Set at least
* @state_var so that the caller can see the details of
* the inconsistent state.
*/
goto out;
}
/*
* Guarantee the state is loaded before copying the descriptor
* content. This avoids copying obsolete descriptor content that might
* not apply to the descriptor state. This pairs with _prb_commit:B.
*
* Memory barrier involvement:
*
* If desc_read:A reads from _prb_commit:B, then desc_read:C reads
* from _prb_commit:A.
*
* Relies on:
*
* WMB from _prb_commit:A to _prb_commit:B
* matching
* RMB from desc_read:A to desc_read:C
*/
smp_rmb(); /* LMM(desc_read:B) */
/*
* Copy the descriptor data. The data is not valid until the
* state has been re-checked. A memcpy() for all of @desc
* cannot be used because of the atomic_t @state_var field.
*/
if (desc_out) {
memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos,
sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */
}
if (seq_out)
*seq_out = info->seq; /* also part of desc_read:C */
if (caller_id_out)
*caller_id_out = info->caller_id; /* also part of desc_read:C */
/*
* 1. Guarantee the descriptor content is loaded before re-checking
* the state. This avoids reading an obsolete descriptor state
* that may not apply to the copied content. This pairs with
* desc_reserve:F.
*
* Memory barrier involvement:
*
* If desc_read:C reads from desc_reserve:G, then desc_read:E
* reads from desc_reserve:F.
*
* Relies on:
*
* WMB from desc_reserve:F to desc_reserve:G
* matching
* RMB from desc_read:C to desc_read:E
*
* 2. Guarantee the record data is loaded before re-checking the
* state. This avoids reading an obsolete descriptor state that may
* not apply to the copied data. This pairs with data_alloc:A and
* data_realloc:A.
*
* Memory barrier involvement:
*
* If copy_data:A reads from data_alloc:B, then desc_read:E
* reads from desc_make_reusable:A.
*
* Relies on:
*
* MB from desc_make_reusable:A to data_alloc:B
* matching
* RMB from desc_read:C to desc_read:E
*
* Note: desc_make_reusable:A and data_alloc:B can be different
* CPUs. However, the data_alloc:B CPU (which performs the
* full memory barrier) must have previously seen
* desc_make_reusable:A.
*/
smp_rmb(); /* LMM(desc_read:D) */
/*
* The data has been copied. Return the current descriptor state,
* which may have changed since the load above.
*/
state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
d_state = get_desc_state(id, state_val);
out:
if (desc_out)
atomic_long_set(&desc_out->state_var, state_val);
return d_state;
}
/*
* Take a specified descriptor out of the finalized state by attempting
* the transition from finalized to reusable. Either this context or some
* other context will have been successful.
*/
static void desc_make_reusable(struct prb_desc_ring *desc_ring,
unsigned long id)
{
unsigned long val_finalized = DESC_SV(id, desc_finalized);
unsigned long val_reusable = DESC_SV(id, desc_reusable);
struct prb_desc *desc = to_desc(desc_ring, id);
atomic_long_t *state_var = &desc->state_var;
atomic_long_cmpxchg_relaxed(state_var, val_finalized,
val_reusable); /* LMM(desc_make_reusable:A) */
}
/*
* Given the text data ring, put the associated descriptor of each
* data block from @lpos_begin until @lpos_end into the reusable state.
*
* If there is any problem making the associated descriptor reusable, either
* the descriptor has not yet been finalized or another writer context has
* already pushed the tail lpos past the problematic data block. Regardless,
* on error the caller can re-load the tail lpos to determine the situation.
*/
static bool data_make_reusable(struct printk_ringbuffer *rb,
unsigned long lpos_begin,
unsigned long lpos_end,
unsigned long *lpos_out)
{
struct prb_data_ring *data_ring = &rb->text_data_ring;
struct prb_desc_ring *desc_ring = &rb->desc_ring;
struct prb_data_block *blk;
enum desc_state d_state;
struct prb_desc desc;
struct prb_data_blk_lpos *blk_lpos = &desc.text_blk_lpos;
unsigned long id;
/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
while (need_more_space(data_ring, lpos_begin, lpos_end)) {
blk = to_block(data_ring, lpos_begin);
/*
* Load the block ID from the data block. This is a data race
* against a writer that may have newly reserved this data
* area. If the loaded value matches a valid descriptor ID,
* the blk_lpos of that descriptor will be checked to make
* sure it points back to this data block. If the check fails,
* the data area has been recycled by another writer.
*/
id = blk->id; /* LMM(data_make_reusable:A) */
d_state = desc_read(desc_ring, id, &desc,
NULL, NULL); /* LMM(data_make_reusable:B) */
switch (d_state) {
case desc_miss:
case desc_reserved:
case desc_committed:
return false;
case desc_finalized:
/*
* This data block is invalid if the descriptor
* does not point back to it.
*/
if (blk_lpos->begin != lpos_begin)
return false;
desc_make_reusable(desc_ring, id);
break;
case desc_reusable:
/*
* This data block is invalid if the descriptor
* does not point back to it.
*/
if (blk_lpos->begin != lpos_begin)
return false;
break;
}
/* Advance @lpos_begin to the next data block. */
lpos_begin = blk_lpos->next;
}
*lpos_out = lpos_begin;
return true;
}
/*
* Advance the data ring tail to at least @lpos. This function puts
* descriptors into the reusable state if the tail is pushed beyond
* their associated data block.
*/
static bool data_push_tail(struct printk_ringbuffer *rb, unsigned long lpos)
{
struct prb_data_ring *data_ring = &rb->text_data_ring;
unsigned long tail_lpos_new;
unsigned long tail_lpos;
unsigned long next_lpos;
/* If @lpos is from a data-less block, there is nothing to do. */
if (LPOS_DATALESS(lpos))
return true;
/*
* Any descriptor states that have transitioned to reusable due to the
* data tail being pushed to this loaded value will be visible to this
* CPU. This pairs with data_push_tail:D.
*
* Memory barrier involvement:
*
* If data_push_tail:A reads from data_push_tail:D, then this CPU can
* see desc_make_reusable:A.
*
* Relies on:
*
* MB from desc_make_reusable:A to data_push_tail:D
* matches
* READFROM from data_push_tail:D to data_push_tail:A
* thus
* READFROM from desc_make_reusable:A to this CPU
*/
tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:A) */
/*
* Loop until the tail lpos is at or beyond @lpos. This condition
* may already be satisfied, resulting in no full memory barrier
* from data_push_tail:D being performed. However, since this CPU
* sees the new tail lpos, any descriptor states that transitioned to
* the reusable state must already be visible.
*/
while (need_more_space(data_ring, tail_lpos, lpos)) {
/*
* Make all descriptors reusable that are associated with
* data blocks before @lpos.
*/
if (!data_make_reusable(rb, tail_lpos, lpos, &next_lpos)) {
/*
* 1. Guarantee the block ID loaded in
* data_make_reusable() is performed before
* reloading the tail lpos. The failed
* data_make_reusable() may be due to a newly
* recycled data area causing the tail lpos to
* have been previously pushed. This pairs with
* data_alloc:A and data_realloc:A.
*
* Memory barrier involvement:
*
* If data_make_reusable:A reads from data_alloc:B,
* then data_push_tail:C reads from
* data_push_tail:D.
*
* Relies on:
*
* MB from data_push_tail:D to data_alloc:B
* matching
* RMB from data_make_reusable:A to
* data_push_tail:C
*
* Note: data_push_tail:D and data_alloc:B can be
* different CPUs. However, the data_alloc:B
* CPU (which performs the full memory
* barrier) must have previously seen
* data_push_tail:D.
*
* 2. Guarantee the descriptor state loaded in
* data_make_reusable() is performed before
* reloading the tail lpos. The failed
* data_make_reusable() may be due to a newly
* recycled descriptor causing the tail lpos to
* have been previously pushed. This pairs with
* desc_reserve:D.
*
* Memory barrier involvement:
*
* If data_make_reusable:B reads from
* desc_reserve:F, then data_push_tail:C reads
* from data_push_tail:D.
*
* Relies on:
*
* MB from data_push_tail:D to desc_reserve:F
* matching
* RMB from data_make_reusable:B to
* data_push_tail:C
*
* Note: data_push_tail:D and desc_reserve:F can
* be different CPUs. However, the
* desc_reserve:F CPU (which performs the
* full memory barrier) must have previously
* seen data_push_tail:D.
*/
smp_rmb(); /* LMM(data_push_tail:B) */
tail_lpos_new = atomic_long_read(&data_ring->tail_lpos
); /* LMM(data_push_tail:C) */
if (tail_lpos_new == tail_lpos)
return false;
/* Another CPU pushed the tail. Try again. */
tail_lpos = tail_lpos_new;
continue;
}
/*
* Guarantee any descriptor states that have transitioned to
* reusable are stored before pushing the tail lpos. A full
* memory barrier is needed since other CPUs may have made
* the descriptor states reusable. This pairs with
* data_push_tail:A.
*/
if (atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
next_lpos)) { /* LMM(data_push_tail:D) */
break;
}
}
return true;
}
/*
* Advance the desc ring tail. This function advances the tail by one
* descriptor, thus invalidating the oldest descriptor. Before advancing
* the tail, the tail descriptor is made reusable and all data blocks up to
* and including the descriptor's data block are invalidated (i.e. the data
* ring tail is pushed past the data block of the descriptor being made
* reusable).
*/
static bool desc_push_tail(struct printk_ringbuffer *rb,
unsigned long tail_id)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
enum desc_state d_state;
struct prb_desc desc;
d_state = desc_read(desc_ring, tail_id, &desc, NULL, NULL);
switch (d_state) {
case desc_miss:
/*
* If the ID is exactly 1 wrap behind the expected, it is
* in the process of being reserved by another writer and
* must be considered reserved.
*/
if (DESC_ID(atomic_long_read(&desc.state_var)) ==
DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
return false;
}
/*
* The ID has changed. Another writer must have pushed the
* tail and recycled the descriptor already. Success is
* returned because the caller is only interested in the
* specified tail being pushed, which it was.
*/
return true;
case desc_reserved:
case desc_committed:
return false;
case desc_finalized:
desc_make_reusable(desc_ring, tail_id);
break;
case desc_reusable:
break;
}
/*
* Data blocks must be invalidated before their associated
* descriptor can be made available for recycling. Invalidating
* them later is not possible because there is no way to trust
* data blocks once their associated descriptor is gone.
*/
if (!data_push_tail(rb, desc.text_blk_lpos.next))
return false;
/*
* Check the next descriptor after @tail_id before pushing the tail
* to it because the tail must always be in a finalized or reusable
* state. The implementation of prb_first_seq() relies on this.
*
* A successful read implies that the next descriptor is less than or
* equal to @head_id so there is no risk of pushing the tail past the
* head.
*/
d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc,
NULL, NULL); /* LMM(desc_push_tail:A) */
if (d_state == desc_finalized || d_state == desc_reusable) {
/*
* Guarantee any descriptor states that have transitioned to
* reusable are stored before pushing the tail ID. This allows
* verifying the recycled descriptor state. A full memory
* barrier is needed since other CPUs may have made the
* descriptor states reusable. This pairs with desc_reserve:D.
*/
atomic_long_cmpxchg(&desc_ring->tail_id, tail_id,
DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
} else {
/*
* Guarantee the last state load from desc_read() is before
* reloading @tail_id in order to see a new tail ID in the
* case that the descriptor has been recycled. This pairs
* with desc_reserve:D.
*
* Memory barrier involvement:
*
* If desc_push_tail:A reads from desc_reserve:F, then
* desc_push_tail:D reads from desc_push_tail:B.
*
* Relies on:
*
* MB from desc_push_tail:B to desc_reserve:F
* matching
* RMB from desc_push_tail:A to desc_push_tail:D
*
* Note: desc_push_tail:B and desc_reserve:F can be different
* CPUs. However, the desc_reserve:F CPU (which performs
* the full memory barrier) must have previously seen
* desc_push_tail:B.
*/
smp_rmb(); /* LMM(desc_push_tail:C) */
/*
* Re-check the tail ID. The descriptor following @tail_id is
* not in an allowed tail state. But if the tail has since
* been moved by another CPU, then it does not matter.
*/
if (atomic_long_read(&desc_ring->tail_id) == tail_id) /* LMM(desc_push_tail:D) */
return false;
}
return true;
}
/* Reserve a new descriptor, invalidating the oldest if necessary. */
static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long prev_state_val;
unsigned long id_prev_wrap;
struct prb_desc *desc;
unsigned long head_id;
unsigned long id;
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
do {
id = DESC_ID(head_id + 1);
id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
/*
* Guarantee the head ID is read before reading the tail ID.
* Since the tail ID is updated before the head ID, this
* guarantees that @id_prev_wrap is never ahead of the tail
* ID. This pairs with desc_reserve:D.
*
* Memory barrier involvement:
*
* If desc_reserve:A reads from desc_reserve:D, then
* desc_reserve:C reads from desc_push_tail:B.
*
* Relies on:
*
* MB from desc_push_tail:B to desc_reserve:D
* matching
* RMB from desc_reserve:A to desc_reserve:C
*
* Note: desc_push_tail:B and desc_reserve:D can be different
* CPUs. However, the desc_reserve:D CPU (which performs
* the full memory barrier) must have previously seen
* desc_push_tail:B.
*/
smp_rmb(); /* LMM(desc_reserve:B) */
if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
)) { /* LMM(desc_reserve:C) */
/*
* Make space for the new descriptor by
* advancing the tail.
*/
if (!desc_push_tail(rb, id_prev_wrap))
return false;
}
/*
* 1. Guarantee the tail ID is read before validating the
* recycled descriptor state. A read memory barrier is
* sufficient for this. This pairs with desc_push_tail:B.
*
* Memory barrier involvement:
*
* If desc_reserve:C reads from desc_push_tail:B, then
* desc_reserve:E reads from desc_make_reusable:A.
*
* Relies on:
*
* MB from desc_make_reusable:A to desc_push_tail:B
* matching
* RMB from desc_reserve:C to desc_reserve:E
*
* Note: desc_make_reusable:A and desc_push_tail:B can be
* different CPUs. However, the desc_push_tail:B CPU
* (which performs the full memory barrier) must have
* previously seen desc_make_reusable:A.
*
* 2. Guarantee the tail ID is stored before storing the head
* ID. This pairs with desc_reserve:B.
*
* 3. Guarantee any data ring tail changes are stored before
* recycling the descriptor. Data ring tail changes can
* happen via desc_push_tail()->data_push_tail(). A full
* memory barrier is needed since another CPU may have
* pushed the data ring tails. This pairs with
* data_push_tail:B.
*
* 4. Guarantee a new tail ID is stored before recycling the
* descriptor. A full memory barrier is needed since
* another CPU may have pushed the tail ID. This pairs
* with desc_push_tail:C and this also pairs with
* prb_first_seq:C.
*
* 5. Guarantee the head ID is stored before trying to
* finalize the previous descriptor. This pairs with
* _prb_commit:B.
*/
} while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id,
id)); /* LMM(desc_reserve:D) */
desc = to_desc(desc_ring, id);
/*
* If the descriptor has been recycled, verify the old state val.
* See "ABA Issues" about why this verification is performed.
*/
prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
if (prev_state_val &&
get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) {
WARN_ON_ONCE(1);
return false;
}
/*
* Assign the descriptor a new ID and set its state to reserved.
* See "ABA Issues" about why cmpxchg() instead of set() is used.
*
* Guarantee the new descriptor ID and state is stored before making
* any other changes. A write memory barrier is sufficient for this.
* This pairs with desc_read:D.
*/
if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
DESC_SV(id, desc_reserved))) { /* LMM(desc_reserve:F) */
WARN_ON_ONCE(1);
return false;
}
/* Now data in @desc can be modified: LMM(desc_reserve:G) */
*id_out = id;
return true;
}
static bool is_blk_wrapped(struct prb_data_ring *data_ring,
unsigned long begin_lpos, unsigned long next_lpos)
{
/*
* Subtract one from next_lpos since it's not actually part of this data
* block. This allows perfectly fitting records to not wrap.
*/
return DATA_WRAPS(data_ring, begin_lpos) !=
DATA_WRAPS(data_ring, next_lpos - 1);
}
/* Determine the end of a data block. */
static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
unsigned long lpos, unsigned int size)
{
unsigned long begin_lpos;
unsigned long next_lpos;
begin_lpos = lpos;
next_lpos = lpos + size;
/* First check if the data block does not wrap. */
if (!is_blk_wrapped(data_ring, begin_lpos, next_lpos))
return next_lpos;
/* Wrapping data blocks store their data at the beginning. */
return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
}
/*
* Allocate a new data block, invalidating the oldest data block(s)
* if necessary. This function also associates the data block with
* a specified descriptor.
*/
static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size,
struct prb_data_blk_lpos *blk_lpos, unsigned long id)
{
struct prb_data_ring *data_ring = &rb->text_data_ring;
struct prb_data_block *blk;
unsigned long begin_lpos;
unsigned long next_lpos;
if (size == 0) {
/*
* Data blocks are not created for empty lines. Instead, the
* reader will recognize these special lpos values and handle
* it appropriately.
*/
blk_lpos->begin = EMPTY_LINE_LPOS;
blk_lpos->next = EMPTY_LINE_LPOS;
return NULL;
}
size = to_blk_size(size);
begin_lpos = atomic_long_read(&data_ring->head_lpos);
do {
next_lpos = get_next_lpos(data_ring, begin_lpos, size);
/*
* data_check_size() prevents data block allocation that could
* cause illegal ringbuffer states. But double check that the
* used space will not be bigger than the ring buffer. Wrapped
* messages need to reserve more space, see get_next_lpos().
*
* Specify a data-less block when the check or the allocation
* fails.
*/
if (WARN_ON_ONCE(next_lpos - begin_lpos > DATA_SIZE(data_ring)) ||
!data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) {
blk_lpos->begin = FAILED_LPOS;
blk_lpos->next = FAILED_LPOS;
return NULL;
}
/*
* 1. Guarantee any descriptor states that have transitioned
* to reusable are stored before modifying the newly
* allocated data area. A full memory barrier is needed
* since other CPUs may have made the descriptor states
* reusable. See data_push_tail:A about why the reusable
* states are visible. This pairs with desc_read:D.
*
* 2. Guarantee any updated tail lpos is stored before
* modifying the newly allocated data area. Another CPU may
* be in data_make_reusable() and is reading a block ID
* from this area. data_make_reusable() can handle reading
* a garbage block ID value, but then it must be able to
* load a new tail lpos. A full memory barrier is needed
* since other CPUs may have updated the tail lpos. This
* pairs with data_push_tail:B.
*/
} while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
next_lpos)); /* LMM(data_alloc:A) */
blk = to_block(data_ring, begin_lpos);
blk->id = id; /* LMM(data_alloc:B) */
if (is_blk_wrapped(data_ring, begin_lpos, next_lpos)) {
/* Wrapping data blocks store their data at the beginning. */
blk = to_block(data_ring, 0);
/*
* Store the ID on the wrapped block for consistency.
* The printk_ringbuffer does not actually use it.
*/
blk->id = id;
}
blk_lpos->begin = begin_lpos;
blk_lpos->next = next_lpos;
return &blk->data[0];
}
/*
* Try to resize an existing data block associated with the descriptor
* specified by @id. If the resized data block should become wrapped, it
* copies the old data to the new data block. If @size yields a data block
* with the same or less size, the data block is left as is.
*
* Fail if this is not the last allocated data block or if there is not
* enough space or it is not possible make enough space.
*
* Return a pointer to the beginning of the entire data buffer or NULL on
* failure.
*/
static char *data_realloc(struct printk_ringbuffer *rb, unsigned int size,
struct prb_data_blk_lpos *blk_lpos, unsigned long id)
{
struct prb_data_ring *data_ring = &rb->text_data_ring;
struct prb_data_block *blk;
unsigned long head_lpos;
unsigned long next_lpos;
bool wrapped;
/* Reallocation only works if @blk_lpos is the newest data block. */
head_lpos = atomic_long_read(&data_ring->head_lpos);
if (head_lpos != blk_lpos->next)
return NULL;
/* Keep track if @blk_lpos was a wrapping data block. */
wrapped = is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next);
size = to_blk_size(size);
next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
/*
* Use the current data block when the size does not increase, i.e.
* when @head_lpos is already able to accommodate the new @next_lpos.
*
* Note that need_more_space() could never return false here because
* the difference between the positions was bigger than the data
* buffer size. The data block is reopened and can't get reused.
*/
if (!need_more_space(data_ring, head_lpos, next_lpos)) {
if (wrapped)
blk = to_block(data_ring, 0);
else
blk = to_block(data_ring, blk_lpos->begin);
return &blk->data[0];
}
/*
* data_check_size() prevents data block reallocation that could
* cause illegal ringbuffer states. But double check that the
* new used space will not be bigger than the ring buffer. Wrapped
* messages need to reserve more space, see get_next_lpos().
*
* Specify failure when the check or the allocation fails.
*/
if (WARN_ON_ONCE(next_lpos - blk_lpos->begin > DATA_SIZE(data_ring)) ||
!data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) {
return NULL;
}
/* The memory barrier involvement is the same as data_alloc:A. */
if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
next_lpos)) { /* LMM(data_realloc:A) */
return NULL;
}
blk = to_block(data_ring, blk_lpos->begin);
if (is_blk_wrapped(data_ring, blk_lpos->begin, next_lpos)) {
struct prb_data_block *old_blk = blk;
/* Wrapping data blocks store their data at the beginning. */
blk = to_block(data_ring, 0);
/*
* Store the ID on the wrapped block for consistency.
* The printk_ringbuffer does not actually use it.
*/
blk->id = id;
if (!wrapped) {
/*
* Since the allocated space is now in the newly
* created wrapping data block, copy the content
* from the old data block.
*/
memcpy(&blk->data[0], &old_blk->data[0],
(blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
}
}
blk_lpos->next = next_lpos;
return &blk->data[0];
}
/* Return the number of bytes used by a data block. */
static unsigned int space_used(struct prb_data_ring *data_ring,
struct prb_data_blk_lpos *blk_lpos)
{
/* Data-less blocks take no space. */
if (BLK_DATALESS(blk_lpos))
return 0;
if (!is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next)) {
/* Data block does not wrap. */
return (DATA_INDEX(data_ring, blk_lpos->next) -
DATA_INDEX(data_ring, blk_lpos->begin));
}
/*
* For wrapping data blocks, the trailing (wasted) space is
* also counted.
*/
return (DATA_INDEX(data_ring, blk_lpos->next) +
DATA_SIZE(data_ring) - DATA_INDEX(data_ring, blk_lpos->begin));
}
/*
* Given @blk_lpos, return a pointer to the writer data from the data block
* and calculate the size of the data part. A NULL pointer is returned if
* @blk_lpos specifies values that could never be legal.
*
* This function (used by readers) performs strict validation on the lpos
* values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
* triggered if an internal error is detected.
*/
static const char *get_data(struct prb_data_ring *data_ring,
struct prb_data_blk_lpos *blk_lpos,
unsigned int *data_size)
{
struct prb_data_block *db;
/* Data-less data block description. */
if (BLK_DATALESS(blk_lpos)) {
/*
* Records that are just empty lines are also valid, even
* though they do not have a data block. For such records
* explicitly return empty string data to signify success.
*/
if (blk_lpos->begin == EMPTY_LINE_LPOS &&
blk_lpos->next == EMPTY_LINE_LPOS) {
*data_size = 0;
return "";
}
/* Data lost, invalid, or otherwise unavailable. */
return NULL;
}
/* Regular data block: @begin and @next in the same wrap. */
if (!is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next)) {
db = to_block(data_ring, blk_lpos->begin);
*data_size = blk_lpos->next - blk_lpos->begin;
/* Wrapping data block: @begin is one wrap behind @next. */
} else if (!is_blk_wrapped(data_ring,
blk_lpos->begin + DATA_SIZE(data_ring),
blk_lpos->next)) {
db = to_block(data_ring, 0);
*data_size = DATA_INDEX(data_ring, blk_lpos->next);
/* Illegal block description. */
} else {
WARN_ON_ONCE(1);
return NULL;
}
/* Sanity check. Data-less blocks were handled earlier. */
if (WARN_ON_ONCE(!data_check_size(data_ring, *data_size) || !*data_size))
return NULL;
/* A valid data block will always be aligned to the ID size. */
if (WARN_ON_ONCE(blk_lpos->begin != ALIGN(blk_lpos->begin, sizeof(db->id))) ||
WARN_ON_ONCE(blk_lpos->next != ALIGN(blk_lpos->next, sizeof(db->id)))) {
return NULL;
}
/* A valid data block will always have at least an ID. */
if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
return NULL;
/* Subtract block ID space from size to reflect data size. */
*data_size -= sizeof(db->id);
return &db->data[0];
}
/*
* Attempt to transition the newest descriptor from committed back to reserved
* so that the record can be modified by a writer again. This is only possible
* if the descriptor is not yet finalized and the provided @caller_id matches.
*/
static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
u32 caller_id, unsigned long *id_out)
{
unsigned long prev_state_val;
enum desc_state d_state;
struct prb_desc desc;
struct prb_desc *d;
unsigned long id;
u32 cid;
id = atomic_long_read(&desc_ring->head_id);
/*
* To reduce unnecessarily reopening, first check if the descriptor
* state and caller ID are correct.
*/
d_state = desc_read(desc_ring, id, &desc, NULL, &cid);
if (d_state != desc_committed || cid != caller_id)
return NULL;
d = to_desc(desc_ring, id);
prev_state_val = DESC_SV(id, desc_committed);
/*
* Guarantee the reserved state is stored before reading any
* record data. A full memory barrier is needed because @state_var
* modification is followed by reading. This pairs with _prb_commit:B.
*
* Memory barrier involvement:
*
* If desc_reopen_last:A reads from _prb_commit:B, then
* prb_reserve_in_last:A reads from _prb_commit:A.
*
* Relies on:
*
* WMB from _prb_commit:A to _prb_commit:B
* matching
* MB If desc_reopen_last:A to prb_reserve_in_last:A
*/
if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */
return NULL;
}
*id_out = id;
return d;
}
/**
* prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer
* used by the newest record.
*
* @e: The entry structure to setup.
* @rb: The ringbuffer to re-reserve and extend data in.
* @r: The record structure to allocate buffers for.
* @caller_id: The caller ID of the caller (reserving writer).
* @max_size: Fail if the extended size would be greater than this.
*
* This is the public function available to writers to re-reserve and extend
* data.
*
* The writer specifies the text size to extend (not the new total size) by
* setting the @text_buf_size field of @r. To ensure proper initialization
* of @r, prb_rec_init_wr() should be used.
*
* This function will fail if @caller_id does not match the caller ID of the
* newest record. In that case the caller must reserve new data using
* prb_reserve().
*
* Context: Any context. Disables local interrupts on success.
* Return: true if text data could be extended, otherwise false.
*
* On success:
*
* - @r->text_buf points to the beginning of the entire text buffer.
*
* - @r->text_buf_size is set to the new total size of the buffer.
*
* - @r->info is not touched so that @r->info->text_len could be used
* to append the text.
*
* - prb_record_text_space() can be used on @e to query the new
* actually used space.
*
* Important: All @r->info fields will already be set with the current values
* for the record. I.e. @r->info->text_len will be less than
* @text_buf_size. Writers can use @r->info->text_len to know
* where concatenation begins and writers should update
* @r->info->text_len after concatenating.
*/
bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r, u32 caller_id, unsigned int max_size)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
struct printk_info *info;
unsigned int data_size;
struct prb_desc *d;
unsigned long id;
local_irq_save(e->irqflags);
/* Transition the newest descriptor back to the reserved state. */
d = desc_reopen_last(desc_ring, caller_id, &id);
if (!d) {
local_irq_restore(e->irqflags);
goto fail_reopen;
}
/* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */
info = to_info(desc_ring, id);
/*
* Set the @e fields here so that prb_commit() can be used if
* anything fails from now on.
*/
e->rb = rb;
e->id = id;
/*
* desc_reopen_last() checked the caller_id, but there was no
* exclusive access at that point. The descriptor may have
* changed since then.
*/
if (caller_id != info->caller_id)
goto fail;
if (BLK_DATALESS(&d->text_blk_lpos)) {
if (WARN_ON_ONCE(info->text_len != 0)) {
pr_warn_once("wrong text_len value (%hu, expecting 0)\n",
info->text_len);
info->text_len = 0;
}
if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
goto fail;
if (r->text_buf_size > max_size)
goto fail;
r->text_buf = data_alloc(rb, r->text_buf_size,
&d->text_blk_lpos, id);
} else {
if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size))
goto fail;
/*
* Increase the buffer size to include the original size. If
* the meta data (@text_len) is not sane, use the full data
* block size.
*/
if (WARN_ON_ONCE(info->text_len > data_size)) {
pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n",
info->text_len, data_size);
info->text_len = data_size;
}
r->text_buf_size += info->text_len;
if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
goto fail;
if (r->text_buf_size > max_size)
goto fail;
r->text_buf = data_realloc(rb, r->text_buf_size,
&d->text_blk_lpos, id);
}
if (r->text_buf_size && !r->text_buf)
goto fail;
r->info = info;
e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
return true;
fail:
prb_commit(e);
/* prb_commit() re-enabled interrupts. */
fail_reopen:
/* Make it clear to the caller that the re-reserve failed. */
memset(r, 0, sizeof(*r));
return false;
}
/*
* @last_finalized_seq value guarantees that all records up to and including
* this sequence number are finalized and can be read. The only exception are
* too old records which have already been overwritten.
*
* It is also guaranteed that @last_finalized_seq only increases.
*
* Be aware that finalized records following non-finalized records are not
* reported because they are not yet available to the reader. For example,
* a new record stored via printk() will not be available to a printer if
* it follows a record that has not been finalized yet. However, once that
* non-finalized record becomes finalized, @last_finalized_seq will be
* appropriately updated and the full set of finalized records will be
* available to the printer. And since each printk() caller will either
* directly print or trigger deferred printing of all available unprinted
* records, all printk() messages will get printed.
*/
static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long ulseq;
/*
* Guarantee the sequence number is loaded before loading the
* associated record in order to guarantee that the record can be
* seen by this CPU. This pairs with desc_update_last_finalized:A.
*/
ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
); /* LMM(desc_last_finalized_seq:A) */
return __ulseq_to_u64seq(rb, ulseq);
}
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
struct printk_record *r, unsigned int *line_count);
/*
* Check if there are records directly following @last_finalized_seq that are
* finalized. If so, update @last_finalized_seq to the latest of these
* records. It is not allowed to skip over records that are not yet finalized.
*/
static void desc_update_last_finalized(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
u64 old_seq = desc_last_finalized_seq(rb);
unsigned long oldval;
unsigned long newval;
u64 finalized_seq;
u64 try_seq;
try_again:
finalized_seq = old_seq;
try_seq = finalized_seq + 1;
/* Try to find later finalized records. */
while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
finalized_seq = try_seq;
try_seq++;
}
/* No update needed if no later finalized record was found. */
if (finalized_seq == old_seq)
return;
oldval = __u64seq_to_ulseq(old_seq);
newval = __u64seq_to_ulseq(finalized_seq);
/*
* Set the sequence number of a later finalized record that has been
* seen.
*
* Guarantee the record data is visible to other CPUs before storing
* its sequence number. This pairs with desc_last_finalized_seq:A.
*
* Memory barrier involvement:
*
* If desc_last_finalized_seq:A reads from
* desc_update_last_finalized:A, then desc_read:A reads from
* _prb_commit:B.
*
* Relies on:
*
* RELEASE from _prb_commit:B to desc_update_last_finalized:A
* matching
* ACQUIRE from desc_last_finalized_seq:A to desc_read:A
*
* Note: _prb_commit:B and desc_update_last_finalized:A can be
* different CPUs. However, the desc_update_last_finalized:A
* CPU (which performs the release) must have previously seen
* _prb_commit:B.
*/
if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
old_seq = __ulseq_to_u64seq(rb, oldval);
goto try_again;
}
}
/*
* Attempt to finalize a specified descriptor. If this fails, the descriptor
* is either already final or it will finalize itself when the writer commits.
*/
static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long prev_state_val = DESC_SV(id, desc_committed);
struct prb_desc *d = to_desc(desc_ring, id);
if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
desc_update_last_finalized(rb);
}
}
/**
* prb_reserve() - Reserve space in the ringbuffer.
*
* @e: The entry structure to setup.
* @rb: The ringbuffer to reserve data in.
* @r: The record structure to allocate buffers for.
*
* This is the public function available to writers to reserve data.
*
* The writer specifies the text size to reserve by setting the
* @text_buf_size field of @r. To ensure proper initialization of @r,
* prb_rec_init_wr() should be used.
*
* Context: Any context. Disables local interrupts on success.
* Return: true if at least text data could be allocated, otherwise false.
*
* On success, the fields @info and @text_buf of @r will be set by this
* function and should be filled in by the writer before committing. Also
* on success, prb_record_text_space() can be used on @e to query the actual
* space used for the text data block.
*
* Important: @info->text_len needs to be set correctly by the writer in
* order for data to be readable and/or extended. Its value
* is initialized to 0.
*/
bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
struct printk_info *info;
struct prb_desc *d;
unsigned long id;
u64 seq;
if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
goto fail;
/*
* Descriptors in the reserved state act as blockers to all further
* reservations once the desc_ring has fully wrapped. Disable
* interrupts during the reserve/commit window in order to minimize
* the likelihood of this happening.
*/
local_irq_save(e->irqflags);
if (!desc_reserve(rb, &id)) {
/* Descriptor reservation failures are tracked. */
atomic_long_inc(&rb->fail);
local_irq_restore(e->irqflags);
goto fail;
}
d = to_desc(desc_ring, id);
info = to_info(desc_ring, id);
/*
* All @info fields (except @seq) are cleared and must be filled in
* by the writer. Save @seq before clearing because it is used to
* determine the new sequence number.
*/
seq = info->seq;
memset(info, 0, sizeof(*info));
/*
* Set the @e fields here so that prb_commit() can be used if
* text data allocation fails.
*/
e->rb = rb;
e->id = id;
/*
* Initialize the sequence number if it has "never been set".
* Otherwise just increment it by a full wrap.
*
* @seq is considered "never been set" if it has a value of 0,
* _except_ for @infos[0], which was specially setup by the ringbuffer
* initializer and therefore is always considered as set.
*
* See the "Bootstrap" comment block in printk_ringbuffer.h for
* details about how the initializer bootstraps the descriptors.
*/
if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
info->seq = DESC_INDEX(desc_ring, id);
else
info->seq = seq + DESCS_COUNT(desc_ring);
/*
* New data is about to be reserved. Once that happens, previous
* descriptors are no longer able to be extended. Finalize the
* previous descriptor now so that it can be made available to
* readers. (For seq==0 there is no previous descriptor.)
*/
if (info->seq > 0)
desc_make_final(rb, DESC_ID(id - 1));
r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
/* If text data allocation fails, a data-less record is committed. */
if (r->text_buf_size && !r->text_buf) {
prb_commit(e);
/* prb_commit() re-enabled interrupts. */
goto fail;
}
r->info = info;
/* Record full text space used by record. */
e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
return true;
fail:
/* Make it clear to the caller that the reserve failed. */
memset(r, 0, sizeof(*r));
return false;
}
EXPORT_SYMBOL_IF_KUNIT(prb_reserve);
/* Commit the data (possibly finalizing it) and restore interrupts. */
static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
{
struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
struct prb_desc *d = to_desc(desc_ring, e->id);
unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);
/* Now the writer has finished all writing: LMM(_prb_commit:A) */
/*
* Set the descriptor as committed. See "ABA Issues" about why
* cmpxchg() instead of set() is used.
*
* 1 Guarantee all record data is stored before the descriptor state
* is stored as committed. A write memory barrier is sufficient
* for this. This pairs with desc_read:B and desc_reopen_last:A.
*
* 2. Guarantee the descriptor state is stored as committed before
* re-checking the head ID in order to possibly finalize this
* descriptor. This pairs with desc_reserve:D.
*
* Memory barrier involvement:
*
* If prb_commit:A reads from desc_reserve:D, then
* desc_make_final:A reads from _prb_commit:B.
*
* Relies on:
*
* MB _prb_commit:B to prb_commit:A
* matching
* MB desc_reserve:D to desc_make_final:A
*/
if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
WARN_ON_ONCE(1);
}
/* Restore interrupts, the reserve/commit window is finished. */
local_irq_restore(e->irqflags);
}
/**
* prb_commit() - Commit (previously reserved) data to the ringbuffer.
*
* @e: The entry containing the reserved data information.
*
* This is the public function available to writers to commit data.
*
* Note that the data is not yet available to readers until it is finalized.
* Finalizing happens automatically when space for the next record is
* reserved.
*
* See prb_final_commit() for a version of this function that finalizes
* immediately.
*
* Context: Any context. Enables local interrupts.
*/
void prb_commit(struct prb_reserved_entry *e)
{
struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
unsigned long head_id;
_prb_commit(e, desc_committed);
/*
* If this descriptor is no longer the head (i.e. a new record has
* been allocated), extending the data for this record is no longer
* allowed and therefore it must be finalized.
*/
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
if (head_id != e->id)
desc_make_final(e->rb, e->id);
}
EXPORT_SYMBOL_IF_KUNIT(prb_commit);
/**
* prb_final_commit() - Commit and finalize (previously reserved) data to
* the ringbuffer.
*
* @e: The entry containing the reserved data information.
*
* This is the public function available to writers to commit+finalize data.
*
* By finalizing, the data is made immediately available to readers.
*
* This function should only be used if there are no intentions of extending
* this data using prb_reserve_in_last().
*
* Context: Any context. Enables local interrupts.
*/
void prb_final_commit(struct prb_reserved_entry *e)
{
_prb_commit(e, desc_finalized);
desc_update_last_finalized(e->rb);
}
/*
* Count the number of lines in provided text. All text has at least 1 line
* (even if @text_size is 0). Each '\n' processed is counted as an additional
* line.
*/
static unsigned int count_lines(const char *text, unsigned int text_size)
{
unsigned int next_size = text_size;
unsigned int line_count = 1;
const char *next = text;
while (next_size) {
next = memchr(next, '\n', next_size);
if (!next)
break;
line_count++;
next++;
next_size = text_size - (next - text);
}
return line_count;
}
/*
* Given @blk_lpos, copy an expected @len of data into the provided buffer.
* If @line_count is provided, count the number of lines in the data.
*
* This function (used by readers) performs strict validation on the data
* size to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
* triggered if an internal error is detected.
*/
static bool copy_data(struct prb_data_ring *data_ring,
struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
unsigned int buf_size, unsigned int *line_count)
{
unsigned int data_size;
const char *data;
/* Caller might not want any data. */
if ((!buf || !buf_size) && !line_count)
return true;
data = get_data(data_ring, blk_lpos, &data_size);
if (!data)
return false;
/*
* Actual cannot be less than expected. It can be more than expected
* because of the trailing alignment padding.
*
* Note that invalid @len values can occur because the caller loads
* the value during an allowed data race.
*/
if (data_size < (unsigned int)len)
return false;
/* Caller interested in the line count? */
if (line_count)
*line_count = count_lines(data, len);
/* Caller interested in the data content? */
if (!buf || !buf_size)
return true;
data_size = min_t(unsigned int, buf_size, len);
memcpy(&buf[0], data, data_size); /* LMM(copy_data:A) */
return true;
}
/*
* This is an extended version of desc_read(). It gets a copy of a specified
* descriptor. However, it also verifies that the record is finalized and has
* the sequence number @seq. On success, 0 is returned.
*
* Error return values:
* -EINVAL: A finalized record with sequence number @seq does not exist.
* -ENOENT: A finalized record with sequence number @seq exists, but its data
* is not available. This is a valid record, so readers should
* continue with the next record.
*/
static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
unsigned long id, u64 seq,
struct prb_desc *desc_out)
{
struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
enum desc_state d_state;
u64 s;
d_state = desc_read(desc_ring, id, desc_out, &s, NULL);
/*
* An unexpected @id (desc_miss) or @seq mismatch means the record
* does not exist. A descriptor in the reserved or committed state
* means the record does not yet exist for the reader.
*/
if (d_state == desc_miss ||
d_state == desc_reserved ||
d_state == desc_committed ||
s != seq) {
return -EINVAL;
}
/*
* A descriptor in the reusable state may no longer have its data
* available; report it as existing but with lost data. Or the record
* may actually be a record with lost data.
*/
if (d_state == desc_reusable ||
(blk_lpos->begin == FAILED_LPOS && blk_lpos->next == FAILED_LPOS)) {
return -ENOENT;
}
return 0;
}
/*
* Copy the ringbuffer data from the record with @seq to the provided
* @r buffer. On success, 0 is returned.
*
* See desc_read_finalized_seq() for error return values.
*/
static int prb_read(struct printk_ringbuffer *rb, u64 seq,
struct printk_record *r, unsigned int *line_count)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
struct printk_info *info = to_info(desc_ring, seq);
struct prb_desc *rdesc = to_desc(desc_ring, seq);
atomic_long_t *state_var = &rdesc->state_var;
struct prb_desc desc;
unsigned long id;
int err;
/* Extract the ID, used to specify the descriptor to read. */
id = DESC_ID(atomic_long_read(state_var));
/* Get a local copy of the correct descriptor (if available). */
err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
/*
* If @r is NULL, the caller is only interested in the availability
* of the record.
*/
if (err || !r)
return err;
/* If requested, copy meta data. */
if (r->info)
memcpy(r->info, info, sizeof(*(r->info)));
/* Copy text data. If it fails, this is a data-less record. */
if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
r->text_buf, r->text_buf_size, line_count)) {
return -ENOENT;
}
/* Ensure the record is still finalized and has the same @seq. */
return desc_read_finalized_seq(desc_ring, id, seq, &desc);
}
/* Get the sequence number of the tail descriptor. */
u64 prb_first_seq(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
enum desc_state d_state;
struct prb_desc desc;
unsigned long id;
u64 seq;
for (;;) {
id = atomic_long_read(&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */
d_state = desc_read(desc_ring, id, &desc, &seq, NULL); /* LMM(prb_first_seq:B) */
/*
* This loop will not be infinite because the tail is
* _always_ in the finalized or reusable state.
*/
if (d_state == desc_finalized || d_state == desc_reusable)
break;
/*
* Guarantee the last state load from desc_read() is before
* reloading @tail_id in order to see a new tail in the case
* that the descriptor has been recycled. This pairs with
* desc_reserve:D.
*
* Memory barrier involvement:
*
* If prb_first_seq:B reads from desc_reserve:F, then
* prb_first_seq:A reads from desc_push_tail:B.
*
* Relies on:
*
* MB from desc_push_tail:B to desc_reserve:F
* matching
* RMB prb_first_seq:B to prb_first_seq:A
*/
smp_rmb(); /* LMM(prb_first_seq:C) */
}
return seq;
}
/**
* prb_next_reserve_seq() - Get the sequence number after the most recently
* reserved record.
*
* @rb: The ringbuffer to get the sequence number from.
*
* This is the public function available to readers to see what sequence
* number will be assigned to the next reserved record.
*
* Note that depending on the situation, this value can be equal to or
* higher than the sequence number returned by prb_next_seq().
*
* Context: Any context.
* Return: The sequence number that will be assigned to the next record
* reserved.
*/
u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long last_finalized_id;
atomic_long_t *state_var;
u64 last_finalized_seq;
unsigned long head_id;
struct prb_desc desc;
unsigned long diff;
struct prb_desc *d;
int err;
/*
* It may not be possible to read a sequence number for @head_id.
* So the ID of @last_finailzed_seq is used to calculate what the
* sequence number of @head_id will be.
*/
try_again:
last_finalized_seq = desc_last_finalized_seq(rb);
/*
* @head_id is loaded after @last_finalized_seq to ensure that
* it points to the record with @last_finalized_seq or newer.
*
* Memory barrier involvement:
*
* If desc_last_finalized_seq:A reads from
* desc_update_last_finalized:A, then
* prb_next_reserve_seq:A reads from desc_reserve:D.
*
* Relies on:
*
* RELEASE from desc_reserve:D to desc_update_last_finalized:A
* matching
* ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
*
* Note: desc_reserve:D and desc_update_last_finalized:A can be
* different CPUs. However, the desc_update_last_finalized:A CPU
* (which performs the release) must have previously seen
* desc_read:C, which implies desc_reserve:D can be seen.
*/
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */
d = to_desc(desc_ring, last_finalized_seq);
state_var = &d->state_var;
/* Extract the ID, used to specify the descriptor to read. */
last_finalized_id = DESC_ID(atomic_long_read(state_var));
/* Ensure @last_finalized_id is correct. */
err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);
if (err == -EINVAL) {
if (last_finalized_seq == 0) {
/*
* No record has been finalized or even reserved yet.
*
* The @head_id is initialized such that the first
* increment will yield the first record (seq=0).
* Handle it separately to avoid a negative @diff
* below.
*/
if (head_id == DESC0_ID(desc_ring->count_bits))
return 0;
/*
* One or more descriptors are already reserved. Use
* the descriptor ID of the first one (@seq=0) for
* the @diff below.
*/
last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
} else {
/* Record must have been overwritten. Try again. */
goto try_again;
}
}
/* Diff of known descriptor IDs to compute related sequence numbers. */
diff = head_id - last_finalized_id;
/*
* @head_id points to the most recently reserved record, but this
* function returns the sequence number that will be assigned to the
* next (not yet reserved) record. Thus +1 is needed.
*/
return (last_finalized_seq + diff + 1);
}
/*
* Non-blocking read of a record.
*
* On success @seq is updated to the record that was read and (if provided)
* @r and @line_count will contain the read/calculated data.
*
* On failure @seq is updated to a record that is not yet available to the
* reader, but it will be the next record available to the reader.
*
* Note: When the current CPU is in panic, this function will skip over any
* non-existent/non-finalized records in order to allow the panic CPU
* to print any and all records that have been finalized.
*/
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
struct printk_record *r, unsigned int *line_count)
{
u64 tail_seq;
int err;
while ((err = prb_read(rb, *seq, r, line_count))) {
tail_seq = prb_first_seq(rb);
if (*seq < tail_seq) {
/*
* Behind the tail. Catch up and try again. This
* can happen for -ENOENT and -EINVAL cases.
*/
*seq = tail_seq;
} else if (err == -ENOENT) {
/* Record exists, but the data was lost. Skip. */
(*seq)++;
} else {
/*
* Non-existent/non-finalized record. Must stop.
*
* For panic situations it cannot be expected that
* non-finalized records will become finalized. But
* there may be other finalized records beyond that
* need to be printed for a panic situation. If this
* is the panic CPU, skip this
* non-existent/non-finalized record unless non-panic
* CPUs are still running and their debugging is
* explicitly enabled.
*
* Note that new messages printed on panic CPU are
* finalized when we are here. The only exception
* might be the last message without trailing newline.
* But it would have the sequence number returned
* by "prb_next_reserve_seq() - 1".
*/
if (panic_on_this_cpu() &&
(!debug_non_panic_cpus || legacy_allow_panic_sync) &&
((*seq + 1) < prb_next_reserve_seq(rb))) {
(*seq)++;
} else {
return false;
}
}
}
return true;
}
/**
* prb_read_valid() - Non-blocking read of a requested record or (if gone)
* the next available record.
*
* @rb: The ringbuffer to read from.
* @seq: The sequence number of the record to read.
* @r: A record data buffer to store the read record to.
*
* This is the public function available to readers to read a record.
*
* The reader provides the @info and @text_buf buffers of @r to be
* filled in. Any of the buffer pointers can be set to NULL if the reader
* is not interested in that data. To ensure proper initialization of @r,
* prb_rec_init_rd() should be used.
*
* Context: Any context.
* Return: true if a record was read, otherwise false.
*
* On success, the reader must check r->info.seq to see which record was
* actually read. This allows the reader to detect dropped records.
*
* Failure means @seq refers to a record not yet available to the reader.
*/
bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
struct printk_record *r)
{
return _prb_read_valid(rb, &seq, r, NULL);
}
EXPORT_SYMBOL_IF_KUNIT(prb_read_valid);
/**
* prb_read_valid_info() - Non-blocking read of meta data for a requested
* record or (if gone) the next available record.
*
* @rb: The ringbuffer to read from.
* @seq: The sequence number of the record to read.
* @info: A buffer to store the read record meta data to.
* @line_count: A buffer to store the number of lines in the record text.
*
* This is the public function available to readers to read only the
* meta data of a record.
*
* The reader provides the @info, @line_count buffers to be filled in.
* Either of the buffer pointers can be set to NULL if the reader is not
* interested in that data.
*
* Context: Any context.
* Return: true if a record's meta data was read, otherwise false.
*
* On success, the reader must check info->seq to see which record meta data
* was actually read. This allows the reader to detect dropped records.
*
* Failure means @seq refers to a record not yet available to the reader.
*/
bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
struct printk_info *info, unsigned int *line_count)
{
struct printk_record r;
prb_rec_init_rd(&r, info, NULL, 0);
return _prb_read_valid(rb, &seq, &r, line_count);
}
/**
* prb_first_valid_seq() - Get the sequence number of the oldest available
* record.
*
* @rb: The ringbuffer to get the sequence number from.
*
* This is the public function available to readers to see what the
* first/oldest valid sequence number is.
*
* This provides readers a starting point to begin iterating the ringbuffer.
*
* Context: Any context.
* Return: The sequence number of the first/oldest record or, if the
* ringbuffer is empty, 0 is returned.
*/
u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
{
u64 seq = 0;
if (!_prb_read_valid(rb, &seq, NULL, NULL))
return 0;
return seq;
}
/**
* prb_next_seq() - Get the sequence number after the last available record.
*
* @rb: The ringbuffer to get the sequence number from.
*
* This is the public function available to readers to see what the next
* newest sequence number available to readers will be.
*
* This provides readers a sequence number to jump to if all currently
* available records should be skipped. It is guaranteed that all records
* previous to the returned value have been finalized and are (or were)
* available to the reader.
*
* Context: Any context.
* Return: The sequence number of the next newest (not yet available) record
* for readers.
*/
u64 prb_next_seq(struct printk_ringbuffer *rb)
{
u64 seq;
seq = desc_last_finalized_seq(rb);
/*
* Begin searching after the last finalized record.
*
* On 0, the search must begin at 0 because of hack#2
* of the bootstrapping phase it is not known if a
* record at index 0 exists.
*/
if (seq != 0)
seq++;
/*
* The information about the last finalized @seq might be inaccurate.
* Search forward to find the current one.
*/
while (_prb_read_valid(rb, &seq, NULL, NULL))
seq++;
return seq;
}
/**
* prb_init() - Initialize a ringbuffer to use provided external buffers.
*
* @rb: The ringbuffer to initialize.
* @text_buf: The data buffer for text data.
* @textbits: The size of @text_buf as a power-of-2 value.
* @descs: The descriptor buffer for ringbuffer records.
* @descbits: The count of @descs items as a power-of-2 value.
* @infos: The printk_info buffer for ringbuffer records.
*
* This is the public function available to writers to setup a ringbuffer
* during runtime using provided buffers.
*
* This must match the initialization of DEFINE_PRINTKRB().
*
* Context: Any context.
*/
void prb_init(struct printk_ringbuffer *rb,
char *text_buf, unsigned int textbits,
struct prb_desc *descs, unsigned int descbits,
struct printk_info *infos)
{
memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0]));
memset(infos, 0, _DESCS_COUNT(descbits) * sizeof(infos[0]));
rb->desc_ring.count_bits = descbits;
rb->desc_ring.descs = descs;
rb->desc_ring.infos = infos;
atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
rb->text_data_ring.size_bits = textbits;
rb->text_data_ring.data = text_buf;
atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits));
atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits));
atomic_long_set(&rb->fail, 0);
atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var), DESC0_SV(descbits));
descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = FAILED_LPOS;
descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = FAILED_LPOS;
infos[0].seq = -(u64)_DESCS_COUNT(descbits);
infos[_DESCS_COUNT(descbits) - 1].seq = 0;
}
EXPORT_SYMBOL_IF_KUNIT(prb_init);
/**
* prb_record_text_space() - Query the full actual used ringbuffer space for
* the text data of a reserved entry.
*
* @e: The successfully reserved entry to query.
*
* This is the public function available to writers to see how much actual
* space is used in the ringbuffer to store the text data of the specified
* entry.
*
* This function is only valid if @e has been successfully reserved using
* prb_reserve().
*
* Context: Any context.
* Return: The size in bytes used by the text data of the associated record.
*/
unsigned int prb_record_text_space(struct prb_reserved_entry *e)
{
return e->text_space;
}
] |
| 9/2 | 2026/02/24 17:55 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 31415 output: 21 thoughts: 254 Thoughts: **Analyzing a Suspected Race Condition in the Printk Ringbuffer**
Okay, so we have a potential race condition flagged at `0xffffffff86cd92e0`. The write is originating from `prb_reserve+0x6e6/0xae0` and the read from `_prb_read_valid+0x1c4/0x950`. We'll need to drill down into `prb_reserve` to understand the write path better.
Looking at the code, specifically the `prb_reserve` function, I see the following sequence:
```c
bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r)
{
...
d = to_desc(desc_ring, id);
info = to_info(desc_ring, id);
/*
* All @info fields (except @seq) are cleared and must be filled in
* by the writer. Save @seq before clearing because it is used to
* determine the new sequence number.
*/
seq = info->seq;
memset(info, 0, sizeof(*info)); // <--- This might be the 88-byte write?
...
```
The key line that jumps out is the `memset(info, 0, sizeof(*info));` call. This is where the write is happening. The comment suggests that this clears the `info` structure, and the sequence number is saved before the `memset` operation. The 88-byte write has to be coming from here.
Now, to properly assess the race, I'll need to determine the size of the `struct printk_info`. This is crucial for understanding the extent of the memory being accessed and if any overlaps with the read path.
|
| 10/2 | 2026/02/24 17:55 | tool | codesearch-struct-layout |
0mArgs:map[Name:printk_info] Results: map[Fields:[map[Name:seq OffsetBits:0 SizeBits:64] map[Name:ts_nsec OffsetBits:64 SizeBits:64] map[Name:text_len OffsetBits:128 SizeBits:16] map[Name:facility OffsetBits:144 SizeBits:8] map[Name:flags OffsetBits:152 SizeBits:5] map[Name:level OffsetBits:157 SizeBits:3] map[Name:caller_id OffsetBits:160 SizeBits:32] map[Name:dev_info OffsetBits:192 SizeBits:512]]] |
| 11/2 | 2026/02/24 17:55 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 32267 output: 29 thoughts: 7934 Thoughts: **Analyzing the Ringbuffer Race** Okay, here's what I've pieced together about this KCSAN report. The core issue seems to revolve around a race condition within the printk ringbuffer, specifically between a writer (likely the kernel's logging mechanism) and a reader. The report focuses on an 8-byte read from memory address `0xffffffff86cd92e0`, which corresponds to a `struct printk_info` instance, specifically the `seq` field. The writer is in `prb_reserve`, the function responsible for allocating and initializing a new entry in the ringbuffer. It performs these critical steps: 1. **Reservation:** Calls `desc_reserve` (setting descriptor state to `desc_reserved`). This uses a Compare-and-Swap (CAS) to ensure exclusive access. 2. **Zeroing:** It then calls `memset(info, 0, sizeof(*info));`, effectively clearing the `printk_info` structure. 3. **Sequence Assignment:** It assigns a sequence number (`info->seq`) to the entry. The reader, on the other hand, is executing within `_prb_read_valid`, which ultimately calls `prb_read`. The relevant sequence of events in `prb_read` is as follows: 1. **ID Extraction:** Reads `state_var` (descriptor state) from the relevant ringbuffer slot, extracting an ID which points to the current descriptor for a given sequence number. 2. **Validation:** It then calls `desc_read_finalized_seq`, which in turn, calls `desc_read`. 3. **Descriptor Read:** In `desc_read` (the crucial bit), it first reads the `state_var` from `state_var` and checks its state. If the state is "reserved" it returns. If it is not, it reads the sequence number `info->seq`. The KCSAN report suggests that the 8-byte read of `info->seq` in `desc_read` is happening concurrently with the writer's operations in `prb_reserve`. The fact that the value changes from `0x2b74` to `0x4b74` indicates that the reader is observing the sequence number during the writer's modification of the `info` structure. The sequence number is not set until *after* `memset(info, 0, 88)`, which makes a race here possible. The core of the problem lies in the fact that the reader might potentially read the value of `info->seq` from the old descriptor, and subsequently see the sequence number change because the writer is concurrently overwriting this slot. I see that `desc_reserve` sets the `state_var` to `desc_reserved` *before* the `memset` and sequence assignment. This is meant to prevent the race, but the race is still being triggered. The `state_var` check in `desc_read` should prevent this, so there is something missing. I need to confirm the sequence of operations with a multi-threaded execution model. The `state_var` must be read such that the reader misses the `desc_reserved` state and reads `info->seq` before the write of the new sequence number completes. It is critical that I confirm whether it is possible for the reader to read the value of `info->seq` at any time *other* than when the ID matches the actual `seq` being sought by the reader. |
| 12/2 | 2026/02/24 17:56 | tool | codesearch-file-index |
0mArgs:map[SourceFile:kernel/printk/printk_ringbuffer.c] Results: map[Entities:[map[Kind:function Name:_prb_commit] map[Kind:function Name:_prb_read_valid] map[Kind:function Name:copy_data] map[Kind:function Name:count_lines] map[Kind:function Name:data_alloc] map[Kind:function Name:data_check_size] map[Kind:function Name:data_make_reusable] map[Kind:function Name:data_push_tail] map[Kind:function Name:data_realloc] map[Kind:function Name:desc_last_finalized_seq] map[Kind:function Name:desc_make_final] map[Kind:function Name:desc_make_reusable] map[Kind:function Name:desc_push_tail] map[Kind:function Name:desc_read] map[Kind:function Name:desc_read_finalized_seq] map[Kind:function Name:desc_reopen_last] map[Kind:function Name:desc_reserve] map[Kind:function Name:desc_update_last_finalized] map[Kind:function Name:get_data] map[Kind:function Name:get_desc_state] map[Kind:function Name:get_next_lpos] map[Kind:function Name:is_blk_wrapped] map[Kind:function Name:need_more_space] map[Kind:function Name:prb_commit] map[Kind:function Name:prb_final_commit] map[Kind:function Name:prb_first_seq] map[Kind:function Name:prb_first_valid_seq] map[Kind:function Name:prb_init] map[Kind:function Name:prb_next_reserve_seq] map[Kind:function Name:prb_next_seq] map[Kind:function Name:prb_read] map[Kind:function Name:prb_read_valid] map[Kind:function Name:prb_read_valid_info] map[Kind:function Name:prb_record_text_space] map[Kind:function Name:prb_reserve] map[Kind:function Name:prb_reserve_in_last] map[Kind:function Name:space_used] map[Kind:function Name:to_blk_size] map[Kind:function Name:to_block] map[Kind:function Name:to_desc] map[Kind:function Name:to_info] map[Kind:struct Name:prb_data_block]]] |
| 13/2 | 2026/02/24 17:56 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 41679 output: 42 thoughts: 16 Thoughts: Okay, here's my take on those thoughts, tailored for an expert audience: **Analyzing the `desc_read` Source** Alright, time to dig in. My initial thought is to get the source code for the `desc_read` function, but I don't want just the function definition; I need the context. Specifically, I'm thinking line numbers. Jumping straight into the code without knowing where to look is a recipe for wasted time. Line numbers offer pinpoint accuracy. That way I can quickly determine how `desc_read` is called and, with that, the context it functions within. This should streamline my understanding of the larger system. |
| 14/2 | 2026/02/24 17:56 | tool | codesearch-definition-source |
0mArgs:map[ContextFile:kernel/printk/printk_ringbuffer.c IncludeLines:true Name:desc_read] Results: map[SourceCode: 441: /*
442: * Get a copy of a specified descriptor and return its queried state. If the
443: * descriptor is in an inconsistent state (miss or reserved), the caller can
444: * only expect the descriptor's @state_var field to be valid.
445: *
446: * The sequence number and caller_id can be optionally retrieved. Like all
447: * non-state_var data, they are only valid if the descriptor is in a
448: * consistent state.
449: */
450: static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
451: unsigned long id, struct prb_desc *desc_out,
452: u64 *seq_out, u32 *caller_id_out)
453: {
454: struct printk_info *info = to_info(desc_ring, id);
455: struct prb_desc *desc = to_desc(desc_ring, id);
456: atomic_long_t *state_var = &desc->state_var;
457: enum desc_state d_state;
458: unsigned long state_val;
459:
460: /* Check the descriptor state. */
461: state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
462: d_state = get_desc_state(id, state_val);
463: if (d_state == desc_miss || d_state == desc_reserved) {
464: /*
465: * The descriptor is in an inconsistent state. Set at least
466: * @state_var so that the caller can see the details of
467: * the inconsistent state.
468: */
469: goto out;
470: }
471:
472: /*
473: * Guarantee the state is loaded before copying the descriptor
474: * content. This avoids copying obsolete descriptor content that might
475: * not apply to the descriptor state. This pairs with _prb_commit:B.
476: *
477: * Memory barrier involvement:
478: *
479: * If desc_read:A reads from _prb_commit:B, then desc_read:C reads
480: * from _prb_commit:A.
481: *
482: * Relies on:
483: *
484: * WMB from _prb_commit:A to _prb_commit:B
485: * matching
486: * RMB from desc_read:A to desc_read:C
487: */
488: smp_rmb(); /* LMM(desc_read:B) */
489:
490: /*
491: * Copy the descriptor data. The data is not valid until the
492: * state has been re-checked. A memcpy() for all of @desc
493: * cannot be used because of the atomic_t @state_var field.
494: */
495: if (desc_out) {
496: memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos,
497: sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */
498: }
499: if (seq_out)
500: *seq_out = info->seq; /* also part of desc_read:C */
501: if (caller_id_out)
502: *caller_id_out = info->caller_id; /* also part of desc_read:C */
503:
504: /*
505: * 1. Guarantee the descriptor content is loaded before re-checking
506: * the state. This avoids reading an obsolete descriptor state
507: * that may not apply to the copied content. This pairs with
508: * desc_reserve:F.
509: *
510: * Memory barrier involvement:
511: *
512: * If desc_read:C reads from desc_reserve:G, then desc_read:E
513: * reads from desc_reserve:F.
514: *
515: * Relies on:
516: *
517: * WMB from desc_reserve:F to desc_reserve:G
518: * matching
519: * RMB from desc_read:C to desc_read:E
520: *
521: * 2. Guarantee the record data is loaded before re-checking the
522: * state. This avoids reading an obsolete descriptor state that may
523: * not apply to the copied data. This pairs with data_alloc:A and
524: * data_realloc:A.
525: *
526: * Memory barrier involvement:
527: *
528: * If copy_data:A reads from data_alloc:B, then desc_read:E
529: * reads from desc_make_reusable:A.
530: *
531: * Relies on:
532: *
533: * MB from desc_make_reusable:A to data_alloc:B
534: * matching
535: * RMB from desc_read:C to desc_read:E
536: *
537: * Note: desc_make_reusable:A and data_alloc:B can be different
538: * CPUs. However, the data_alloc:B CPU (which performs the
539: * full memory barrier) must have previously seen
540: * desc_make_reusable:A.
541: */
542: smp_rmb(); /* LMM(desc_read:D) */
543:
544: /*
545: * The data has been copied. Return the current descriptor state,
546: * which may have changed since the load above.
547: */
548: state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
549: d_state = get_desc_state(id, state_val);
550: out:
551: if (desc_out)
552: atomic_long_set(&desc_out->state_var, state_val);
553: return d_state;
554: }
SourceFile:kernel/printk/printk_ringbuffer.c] |
| 15/2 | 2026/02/24 17:56 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 43773 output: 43 thoughts: 64 Thoughts: Here's my analysis of the provided information, as if these were my internal thoughts: **Dissecting the Kernel Trace** Alright, the confirmation is in. Line 500, specifically `*seq_out = info->seq;`, aligns perfectly with the pinpointed `desc_read` call within `kernel/printk/printk_ringbuffer.c` as the report indicates. That's a solid data point. We've got our target. Now, with this confirmation in hand, I need to understand the context. Diving into `prb_reserve` is the logical next step. I'll meticulously trace it, paying close attention to the line numbers as I go. Let's see how that sequence assignment fits into the broader picture of buffer management. |
| 16/2 | 2026/02/24 17:56 | tool | codesearch-definition-source |
0mArgs:map[ContextFile:kernel/printk/printk_ringbuffer.c IncludeLines:true Name:prb_reserve] Results: map[SourceCode:1629: /**
1630: * prb_reserve() - Reserve space in the ringbuffer.
1631: *
1632: * @e: The entry structure to setup.
1633: * @rb: The ringbuffer to reserve data in.
1634: * @r: The record structure to allocate buffers for.
1635: *
1636: * This is the public function available to writers to reserve data.
1637: *
1638: * The writer specifies the text size to reserve by setting the
1639: * @text_buf_size field of @r. To ensure proper initialization of @r,
1640: * prb_rec_init_wr() should be used.
1641: *
1642: * Context: Any context. Disables local interrupts on success.
1643: * Return: true if at least text data could be allocated, otherwise false.
1644: *
1645: * On success, the fields @info and @text_buf of @r will be set by this
1646: * function and should be filled in by the writer before committing. Also
1647: * on success, prb_record_text_space() can be used on @e to query the actual
1648: * space used for the text data block.
1649: *
1650: * Important: @info->text_len needs to be set correctly by the writer in
1651: * order for data to be readable and/or extended. Its value
1652: * is initialized to 0.
1653: */
1654: bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
1655: struct printk_record *r)
1656: {
1657: struct prb_desc_ring *desc_ring = &rb->desc_ring;
1658: struct printk_info *info;
1659: struct prb_desc *d;
1660: unsigned long id;
1661: u64 seq;
1662:
1663: if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
1664: goto fail;
1665:
1666: /*
1667: * Descriptors in the reserved state act as blockers to all further
1668: * reservations once the desc_ring has fully wrapped. Disable
1669: * interrupts during the reserve/commit window in order to minimize
1670: * the likelihood of this happening.
1671: */
1672: local_irq_save(e->irqflags);
1673:
1674: if (!desc_reserve(rb, &id)) {
1675: /* Descriptor reservation failures are tracked. */
1676: atomic_long_inc(&rb->fail);
1677: local_irq_restore(e->irqflags);
1678: goto fail;
1679: }
1680:
1681: d = to_desc(desc_ring, id);
1682: info = to_info(desc_ring, id);
1683:
1684: /*
1685: * All @info fields (except @seq) are cleared and must be filled in
1686: * by the writer. Save @seq before clearing because it is used to
1687: * determine the new sequence number.
1688: */
1689: seq = info->seq;
1690: memset(info, 0, sizeof(*info));
1691:
1692: /*
1693: * Set the @e fields here so that prb_commit() can be used if
1694: * text data allocation fails.
1695: */
1696: e->rb = rb;
1697: e->id = id;
1698:
1699: /*
1700: * Initialize the sequence number if it has "never been set".
1701: * Otherwise just increment it by a full wrap.
1702: *
1703: * @seq is considered "never been set" if it has a value of 0,
1704: * _except_ for @infos[0], which was specially setup by the ringbuffer
1705: * initializer and therefore is always considered as set.
1706: *
1707: * See the "Bootstrap" comment block in printk_ringbuffer.h for
1708: * details about how the initializer bootstraps the descriptors.
1709: */
1710: if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
1711: info->seq = DESC_INDEX(desc_ring, id);
1712: else
1713: info->seq = seq + DESCS_COUNT(desc_ring);
1714:
1715: /*
1716: * New data is about to be reserved. Once that happens, previous
1717: * descriptors are no longer able to be extended. Finalize the
1718: * previous descriptor now so that it can be made available to
1719: * readers. (For seq==0 there is no previous descriptor.)
1720: */
1721: if (info->seq > 0)
1722: desc_make_final(rb, DESC_ID(id - 1));
1723:
1724: r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
1725: /* If text data allocation fails, a data-less record is committed. */
1726: if (r->text_buf_size && !r->text_buf) {
1727: prb_commit(e);
1728: /* prb_commit() re-enabled interrupts. */
1729: goto fail;
1730: }
1731:
1732: r->info = info;
1733:
1734: /* Record full text space used by record. */
1735: e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
1736:
1737: return true;
1738: fail:
1739: /* Make it clear to the caller that the reserve failed. */
1740: memset(r, 0, sizeof(*r));
1741: return false;
1742: }
SourceFile:kernel/printk/printk_ringbuffer.c] |
| 17/2 | 2026/02/24 17:56 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 45860 output: 25 thoughts: 2456 Thoughts: **Analysis of the Kernel Data Race Report** Okay, so KCSAN is flagging a potential data race here, specifically between a `memset` on line 1690 (inside `prb_reserve`) writing to `info`, and a read of `info->seq` on line 500 (inside `desc_read`). The report seems accurate, and the scenario as presented makes sense. The code sequence looks like this: a writer zeroes out a `printk_info` structure and writes to its `seq` field, while a reader tries to read the `seq` field. The key is in understanding how `desc_read` is designed. It's supposed to be lockless, using a read-check-recheck strategy based on the `state_var`. This variable tracks the state of the descriptor. The idea is that the reader first checks the descriptor's state using `atomic_long_read`. It then reads the `info->seq` value, and finally, re-checks the state. The writer, as I see it, first sets the state to `desc_reserved`, then clears the `info` struct (with the memset), and only *then* writes the new sequence number into the sequence number field. The reader, depending on the timing of these events, could encounter the following cases: * The reader correctly reads `desc_reserved` and aborts. * The reader sees a "valid" state (like `desc_committed`) after the sequence number has been written, reading a valid value. * **The problematic case:** The reader *thinks* the descriptor is valid after its first read, reads the sequence number while the writer is still in the middle of clearing the struct, and it does not pass the second state check. The reader gets a "garbage" value. The code *should* be designed to handle this, because `desc_read` re-checks the descriptor's state after reading. It uses `smp_rmb()` to enforce memory ordering. The second state check should catch the inconsistent read. However, KCSAN is still reporting a race because it sees concurrent memory accesses without explicit synchronization on the `info->seq` field itself. From KCSAN's perspective, this is a race. The real question is, is this a *harmful* race? The intent of this code is to be lockless and to use a read-check-recheck strategy. If the reader reads garbage but subsequently discards that data, then the race is probably "benign". However, I have to ensure that the garbage value does not cause a crash. I also have to consider the atomicity of the `info->seq` access, which is a 64-bit value on x86\_64 (as reported by the system call information). On 64-bit systems, unaligned and non-atomic accesses are possible. However, 64-bit loads and stores are usually atomic. So, the reader would either see the *old* sequence number, the result of the memset, or the *new* sequence number after the memset and update. I need to confirm that `info->seq` is properly aligned to be safe. Looking at `printk_ringbuffer.h` and the structure alignment will resolve this question. In addition, there are memory barriers within `desc_read` that need to ensure that the read of `info->seq` is done after the state check. The critical code in `_prb_commit` uses `atomic_long_try_cmpxchg`, which is a full barrier. So the sequence number update happens before the descriptor is committed. So the code sequence is correct and robust, if slightly complicated for a simple data race. However, the compiler may optimize the read/write sequences to violate this. In such cases, `data_race()` or `READ_ONCE`/`WRITE_ONCE` should be used to tell KCSAN that the code is intentional. I need to figure out if it is the case here. Otherwise, I may need to add it, so that the compiler is not confused, as this is a data race. |
| 18/2 | 2026/02/24 17:57 | tool | read-file |
0mArgs:map[File:kernel/printk/printk_ringbuffer.h] Results: map[Contents:/* SPDX-License-Identifier: GPL-2.0 */
#ifndef _KERNEL_PRINTK_RINGBUFFER_H
#define _KERNEL_PRINTK_RINGBUFFER_H
#include <linux/atomic.h>
#include <linux/bits.h>
#include <linux/dev_printk.h>
#include <linux/stddef.h>
#include <linux/types.h>
/*
* Meta information about each stored message.
*
* All fields are set by the printk code except for @seq, which is
* set by the ringbuffer code.
*/
struct printk_info {
u64 seq; /* sequence number */
u64 ts_nsec; /* timestamp in nanoseconds */
u16 text_len; /* length of text message */
u8 facility; /* syslog facility */
u8 flags:5; /* internal record flags */
u8 level:3; /* syslog level */
u32 caller_id; /* thread id or processor id */
#ifdef CONFIG_PRINTK_EXECUTION_CTX
u32 caller_id2; /* caller_id complement */
/* name of the task that generated the message */
char comm[TASK_COMM_LEN];
#endif
struct dev_printk_info dev_info;
};
/*
* A structure providing the buffers, used by writers and readers.
*
* Writers:
* Using prb_rec_init_wr(), a writer sets @text_buf_size before calling
* prb_reserve(). On success, prb_reserve() sets @info and @text_buf to
* buffers reserved for that writer.
*
* Readers:
* Using prb_rec_init_rd(), a reader sets all fields before calling
* prb_read_valid(). Note that the reader provides the @info and @text_buf,
* buffers. On success, the struct pointed to by @info will be filled and
* the char array pointed to by @text_buf will be filled with text data.
*/
struct printk_record {
struct printk_info *info;
char *text_buf;
unsigned int text_buf_size;
};
/* Specifies the logical position and span of a data block. */
struct prb_data_blk_lpos {
unsigned long begin;
unsigned long next;
};
/*
* A descriptor: the complete meta-data for a record.
*
* @state_var: A bitwise combination of descriptor ID and descriptor state.
*/
struct prb_desc {
atomic_long_t state_var;
struct prb_data_blk_lpos text_blk_lpos;
};
/* A ringbuffer of "ID + data" elements. */
struct prb_data_ring {
unsigned int size_bits;
char *data;
atomic_long_t head_lpos;
atomic_long_t tail_lpos;
};
/* A ringbuffer of "struct prb_desc" elements. */
struct prb_desc_ring {
unsigned int count_bits;
struct prb_desc *descs;
struct printk_info *infos;
atomic_long_t head_id;
atomic_long_t tail_id;
atomic_long_t last_finalized_seq;
};
/*
* The high level structure representing the printk ringbuffer.
*
* @fail: Count of failed prb_reserve() calls where not even a data-less
* record was created.
*/
struct printk_ringbuffer {
struct prb_desc_ring desc_ring;
struct prb_data_ring text_data_ring;
atomic_long_t fail;
};
/*
* Used by writers as a reserve/commit handle.
*
* @rb: Ringbuffer where the entry is reserved.
* @irqflags: Saved irq flags to restore on entry commit.
* @id: ID of the reserved descriptor.
* @text_space: Total occupied buffer space in the text data ring, including
* ID, alignment padding, and wrapping data blocks.
*
* This structure is an opaque handle for writers. Its contents are only
* to be used by the ringbuffer implementation.
*/
struct prb_reserved_entry {
struct printk_ringbuffer *rb;
unsigned long irqflags;
unsigned long id;
unsigned int text_space;
};
/* The possible responses of a descriptor state-query. */
enum desc_state {
desc_miss = -1, /* ID mismatch (pseudo state) */
desc_reserved = 0x0, /* reserved, in use by writer */
desc_committed = 0x1, /* committed by writer, could get reopened */
desc_finalized = 0x2, /* committed, no further modification allowed */
desc_reusable = 0x3, /* free, not yet used by any writer */
};
#define _DATA_SIZE(sz_bits) (1UL << (sz_bits))
#define _DESCS_COUNT(ct_bits) (1U << (ct_bits))
#define DESC_SV_BITS BITS_PER_LONG
#define DESC_FLAGS_SHIFT (DESC_SV_BITS - 2)
#define DESC_FLAGS_MASK (3UL << DESC_FLAGS_SHIFT)
#define DESC_STATE(sv) (3UL & (sv >> DESC_FLAGS_SHIFT))
#define DESC_SV(id, state) (((unsigned long)state << DESC_FLAGS_SHIFT) | id)
#define DESC_ID_MASK (~DESC_FLAGS_MASK)
#define DESC_ID(sv) ((sv) & DESC_ID_MASK)
/*
* Special data block logical position values (for fields of
* @prb_desc.text_blk_lpos).
*
* - Bit0 is used to identify if the record has no data block. (Implemented in
* the LPOS_DATALESS() macro.)
*
* - Bit1 specifies the reason for not having a data block.
*
* These special values could never be real lpos values because of the
* meta data and alignment padding of data blocks. (See to_blk_size() for
* details.)
*/
#define FAILED_LPOS 0x1
#define EMPTY_LINE_LPOS 0x3
#define FAILED_BLK_LPOS \
{ \
.begin = FAILED_LPOS, \
.next = FAILED_LPOS, \
}
/*
* Descriptor Bootstrap
*
* The descriptor array is minimally initialized to allow immediate usage
* by readers and writers. The requirements that the descriptor array
* initialization must satisfy:
*
* Req1
* The tail must point to an existing (committed or reusable) descriptor.
* This is required by the implementation of prb_first_seq().
*
* Req2
* Readers must see that the ringbuffer is initially empty.
*
* Req3
* The first record reserved by a writer is assigned sequence number 0.
*
* To satisfy Req1, the tail initially points to a descriptor that is
* minimally initialized (having no data block, i.e. data-less with the
* data block's lpos @begin and @next values set to FAILED_LPOS).
*
* To satisfy Req2, the initial tail descriptor is initialized to the
* reusable state. Readers recognize reusable descriptors as existing
* records, but skip over them.
*
* To satisfy Req3, the last descriptor in the array is used as the initial
* head (and tail) descriptor. This allows the first record reserved by a
* writer (head + 1) to be the first descriptor in the array. (Only the first
* descriptor in the array could have a valid sequence number of 0.)
*
* The first time a descriptor is reserved, it is assigned a sequence number
* with the value of the array index. A "first time reserved" descriptor can
* be recognized because it has a sequence number of 0 but does not have an
* index of 0. (Only the first descriptor in the array could have a valid
* sequence number of 0.) After the first reservation, all future reservations
* (recycling) simply involve incrementing the sequence number by the array
* count.
*
* Hack #1
* Only the first descriptor in the array is allowed to have the sequence
* number 0. In this case it is not possible to recognize if it is being
* reserved the first time (set to index value) or has been reserved
* previously (increment by the array count). This is handled by _always_
* incrementing the sequence number by the array count when reserving the
* first descriptor in the array. In order to satisfy Req3, the sequence
* number of the first descriptor in the array is initialized to minus
* the array count. Then, upon the first reservation, it is incremented
* to 0, thus satisfying Req3.
*
* Hack #2
* prb_first_seq() can be called at any time by readers to retrieve the
* sequence number of the tail descriptor. However, due to Req2 and Req3,
* initially there are no records to report the sequence number of
* (sequence numbers are u64 and there is nothing less than 0). To handle
* this, the sequence number of the initial tail descriptor is initialized
* to 0. Technically this is incorrect, because there is no record with
* sequence number 0 (yet) and the tail descriptor is not the first
* descriptor in the array. But it allows prb_read_valid() to correctly
* report the existence of a record for _any_ given sequence number at all
* times. Bootstrapping is complete when the tail is pushed the first
* time, thus finally pointing to the first descriptor reserved by a
* writer, which has the assigned sequence number 0.
*/
/*
* Initiating Logical Value Overflows
*
* Both logical position (lpos) and ID values can be mapped to array indexes
* but may experience overflows during the lifetime of the system. To ensure
* that printk_ringbuffer can handle the overflows for these types, initial
* values are chosen that map to the correct initial array indexes, but will
* result in overflows soon.
*
* BLK0_LPOS
* The initial @head_lpos and @tail_lpos for data rings. It is at index
* 0 and the lpos value is such that it will overflow on the first wrap.
*
* DESC0_ID
* The initial @head_id and @tail_id for the desc ring. It is at the last
* index of the descriptor array (see Req3 above) and the ID value is such
* that it will overflow on the second wrap.
*/
#define BLK0_LPOS(sz_bits) (-(_DATA_SIZE(sz_bits)))
#define DESC0_ID(ct_bits) DESC_ID(-(_DESCS_COUNT(ct_bits) + 1))
#define DESC0_SV(ct_bits) DESC_SV(DESC0_ID(ct_bits), desc_reusable)
/*
* Define a ringbuffer with an external text data buffer. The same as
* DEFINE_PRINTKRB() but requires specifying an external buffer for the
* text data.
*
* Note: The specified external buffer must be of the size:
* 2 ^ (descbits + avgtextbits)
*/
#define _DEFINE_PRINTKRB(name, descbits, avgtextbits, text_buf) \
static struct prb_desc _##name##_descs[_DESCS_COUNT(descbits)] = { \
/* the initial head and tail */ \
[_DESCS_COUNT(descbits) - 1] = { \
/* reusable */ \
.state_var = ATOMIC_INIT(DESC0_SV(descbits)), \
/* no associated data block */ \
.text_blk_lpos = FAILED_BLK_LPOS, \
}, \
}; \
static struct printk_info _##name##_infos[_DESCS_COUNT(descbits)] = { \
/* this will be the first record reserved by a writer */ \
[0] = { \
/* will be incremented to 0 on the first reservation */ \
.seq = -(u64)_DESCS_COUNT(descbits), \
}, \
/* the initial head and tail */ \
[_DESCS_COUNT(descbits) - 1] = { \
/* reports the first seq value during the bootstrap phase */ \
.seq = 0, \
}, \
}; \
static struct printk_ringbuffer name = { \
.desc_ring = { \
.count_bits = descbits, \
.descs = &_##name##_descs[0], \
.infos = &_##name##_infos[0], \
.head_id = ATOMIC_INIT(DESC0_ID(descbits)), \
.tail_id = ATOMIC_INIT(DESC0_ID(descbits)), \
.last_finalized_seq = ATOMIC_INIT(0), \
}, \
.text_data_ring = { \
.size_bits = (avgtextbits) + (descbits), \
.data = text_buf, \
.head_lpos = ATOMIC_LONG_INIT(BLK0_LPOS((avgtextbits) + (descbits))), \
.tail_lpos = ATOMIC_LONG_INIT(BLK0_LPOS((avgtextbits) + (descbits))), \
}, \
.fail = ATOMIC_LONG_INIT(0), \
}
/**
* DEFINE_PRINTKRB() - Define a ringbuffer.
*
* @name: The name of the ringbuffer variable.
* @descbits: The number of descriptors as a power-of-2 value.
* @avgtextbits: The average text data size per record as a power-of-2 value.
*
* This is a macro for defining a ringbuffer and all internal structures
* such that it is ready for immediate use. See _DEFINE_PRINTKRB() for a
* variant where the text data buffer can be specified externally.
*/
#define DEFINE_PRINTKRB(name, descbits, avgtextbits) \
static char _##name##_text[1U << ((avgtextbits) + (descbits))] \
__aligned(__alignof__(unsigned long)); \
_DEFINE_PRINTKRB(name, descbits, avgtextbits, &_##name##_text[0])
/* Writer Interface */
/**
* prb_rec_init_wr() - Initialize a buffer for writing records.
*
* @r: The record to initialize.
* @text_buf_size: The needed text buffer size.
*/
static inline void prb_rec_init_wr(struct printk_record *r,
unsigned int text_buf_size)
{
r->info = NULL;
r->text_buf = NULL;
r->text_buf_size = text_buf_size;
}
bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r);
bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
struct printk_record *r, u32 caller_id, unsigned int max_size);
void prb_commit(struct prb_reserved_entry *e);
void prb_final_commit(struct prb_reserved_entry *e);
void prb_init(struct printk_ringbuffer *rb,
char *text_buf, unsigned int text_buf_size,
struct prb_desc *descs, unsigned int descs_count_bits,
struct printk_info *infos);
unsigned int prb_record_text_space(struct prb_reserved_entry *e);
/* Reader Interface */
/**
* prb_rec_init_rd() - Initialize a buffer for reading records.
*
* @r: The record to initialize.
* @info: A buffer to store record meta-data.
* @text_buf: A buffer to store text data.
* @text_buf_size: The size of @text_buf.
*
* Initialize all the fields that a reader is interested in. All arguments
* (except @r) are optional. Only record data for arguments that are
* non-NULL or non-zero will be read.
*/
static inline void prb_rec_init_rd(struct printk_record *r,
struct printk_info *info,
char *text_buf, unsigned int text_buf_size)
{
r->info = info;
r->text_buf = text_buf;
r->text_buf_size = text_buf_size;
}
/**
* prb_for_each_record() - Iterate over the records of a ringbuffer.
*
* @from: The sequence number to begin with.
* @rb: The ringbuffer to iterate over.
* @s: A u64 to store the sequence number on each iteration.
* @r: A printk_record to store the record on each iteration.
*
* This is a macro for conveniently iterating over a ringbuffer.
* Note that @s may not be the sequence number of the record on each
* iteration. For the sequence number, @r->info->seq should be checked.
*
* Context: Any context.
*/
#define prb_for_each_record(from, rb, s, r) \
for ((s) = from; prb_read_valid(rb, s, r); (s) = (r)->info->seq + 1)
/**
* prb_for_each_info() - Iterate over the meta data of a ringbuffer.
*
* @from: The sequence number to begin with.
* @rb: The ringbuffer to iterate over.
* @s: A u64 to store the sequence number on each iteration.
* @i: A printk_info to store the record meta data on each iteration.
* @lc: An unsigned int to store the text line count of each record.
*
* This is a macro for conveniently iterating over a ringbuffer.
* Note that @s may not be the sequence number of the record on each
* iteration. For the sequence number, @r->info->seq should be checked.
*
* Context: Any context.
*/
#define prb_for_each_info(from, rb, s, i, lc) \
for ((s) = from; prb_read_valid_info(rb, s, i, lc); (s) = (i)->seq + 1)
bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
struct printk_record *r);
bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
struct printk_info *info, unsigned int *line_count);
u64 prb_first_seq(struct printk_ringbuffer *rb);
u64 prb_first_valid_seq(struct printk_ringbuffer *rb);
u64 prb_next_seq(struct printk_ringbuffer *rb);
u64 prb_next_reserve_seq(struct printk_ringbuffer *rb);
#ifdef CONFIG_64BIT
#define __u64seq_to_ulseq(u64seq) (u64seq)
#define __ulseq_to_u64seq(rb, ulseq) (ulseq)
#define ULSEQ_MAX(rb) (-1)
#else /* CONFIG_64BIT */
#define __u64seq_to_ulseq(u64seq) ((u32)u64seq)
#define ULSEQ_MAX(rb) __u64seq_to_ulseq(prb_first_seq(rb) + 0x80000000UL)
static inline u64 __ulseq_to_u64seq(struct printk_ringbuffer *rb, u32 ulseq)
{
u64 rb_first_seq = prb_first_seq(rb);
u64 seq;
/*
* The provided sequence is only the lower 32 bits of the ringbuffer
* sequence. It needs to be expanded to 64bit. Get the first sequence
* number from the ringbuffer and fold it.
*
* Having a 32bit representation in the console is sufficient.
* If a console ever gets more than 2^31 records behind
* the ringbuffer then this is the least of the problems.
*
* Also the access to the ring buffer is always safe.
*/
seq = rb_first_seq - (s32)((u32)rb_first_seq - ulseq);
return seq;
}
#endif /* CONFIG_64BIT */
#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
] |
| 19/2 | 2026/02/24 17:57 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 54327 output: 19 thoughts: 5770 Thoughts: **Analysis of a Potential Data Race in the Printk Ringbuffer** Okay, so I'm looking at this KCSAN report. The gist is, there's a data race in the printk ringbuffer between writing to `info->seq` and reading it. The `printk_info` struct is 8-byte aligned because it starts with a `u64`, so that's good. In `_DEFINE_PRINTKRB`, the `_##name##_infos` array is naturally aligned because it is defined as static array. Also 8-byte aligned reads and writes are atomic on x86_64. The KCSAN report shows a race between `memset(info, 0, 88)` and the read of `info->seq`. The `memset` zeros `info->seq`, the reader might see zero but the re-check of state will catch it. The interesting thing is, this whole thing seems… *benign*. This read/check/recheck pattern, it's all part of the printk ringbuffer's design. The writer reserves a descriptor, does a `memset`, then updates `info->seq`. The reader, potentially running concurrently, *reads* `info->seq`, but it *also* checks the state of the descriptor before and after the read. So if the state isn't what it expects (finalized), or if it detects a change in the ID during the process, it discards the value. The writer uses atomic barriers, which guarantees writes to `info` happen before the state update. The barrier ensures the read sees a consistent value, but that doesn't mean it's necessarily valid data. The compiler could optimize the `memset` and `info->seq`, but the sequence assignment depends on the previous sequence (`seq`) and the compiler can’t eliminate it because it's *possibly* being read by another thread (even though the compiler doesn't "know" this because there's no volatile). The `smp_rmb()` in `desc_read` acts as a compiler barrier as well, so it can't move the load of `info->seq` after the load of `state_var`, which would make the "read-check-recheck" useless. The `atomic_long_try_cmpxchg` in `_prb_commit` provides a full barrier, ensuring all writes to `info`, including `info->seq`, are complete before the `state_var` update. So, the protocol *is* correct. KCSAN just flags the concurrent access as a data race because there is no formal synchronization. The printk ringbuffer is a hot path; it's designed to be fast and lockless. The devs *know* about these races and have designed the system to *handle* them. There's a comment in `desc_read` that states as much; the data isn't valid until the state is re-checked. Even the `copy_data` function, which copies the text data, has a comment acknowledging the possibility of invalid `len` values due to the race. It's safe though, because it uses `min_t` to avoid overflowing the buffer. There are data races, but the system's design incorporates the race into the design, and the state checks make sure all is well. I see that `READ_ONCE` might resolve KCSAN's complaints. But, as I'm observing the code it's not present (at least in this version), which is fine given the error protocol. So, is this *harmful*? No. The printk ringbuffer is carefully designed to handle these races. The code checks the state *before* and *after* reading `info->seq`. If the state is not finalized (or reusable, depending on the call site), the value is discarded, and the ringbuffer recovers. It is expected and is handled. The value changed, the reader completed its read. The second check will then detect the data race. The checks handle that. The KCSAN report is flagging a race on the first 8 bytes of the `info` struct, which is the 88-byte `memset`, which is not a simple boolean/integer, but the core of the problem is just on the `seq` which has this explicit "handle this race" code. It is designed to work. It's a key part of how the ringbuffer synchronizes, so it doesn't lead to any corruption or crash. I should verify that. The code reads `info->text_len` during the race, but `copy_data` checks the `info->text_len` to avoid overflow. `desc_read` just does an assignment to a local variable. No crash here. The writer does a `memset` and |
| 20/2 | 2026/02/24 17:57 | tool | set-results |
0mArgs:map[Benign:true Confident:true] Results: map[Benign:true Confident:true] |
| 21/2 | 2026/02/24 17:57 | llm | expert |
0mModel:gemini-3-flash-preview Tokens: input: 61158 output: 656 thoughts: 89 Thoughts: Okay, here's my interpretation of that analysis, from my perspective as someone well-versed in this area: **Data Race Analysis Summary** Alright, the investigation is done. Looks like the data race we found isn't actually a problem. It's part of the `printk` ringbuffer's lockless synchronization, and a well-known one at that. That's a relief! The "read-check-recheck" pattern is the key. The reader's designed to grab the descriptor state, check it, and then recheck it before actually using the data. This handles the concurrency issue because any metadata like `info->seq` that the reader grabs while a concurrent write is in progress will be flagged as inconsistent. Essentially, the "recheck" will detect that change and the read will be discarded; preventing the reader from using invalid or mixed data. Plus, the code is clearly written to handle any garbage values that might creep in during the race. So, even if something odd gets read, the system is robust enough to deal with it without crashing or corrupting data. Good stuff! This looks like a solid, albeit optimized, solution to the problem. |
| Total Calls | Total Tokens | Avg Tokens | Total Duration (Seconds) | Avg Duration (Seconds) |
|---|
| Total Calls | Total Duration (Seconds) | Avg Duration (Seconds) |
|---|