1/*
2 * SPDX-License-Identifier: Apache-2.0
3 * SPDX-FileCopyrightText: 2026 The Contributors to Eclipse OpenSOVD (see CONTRIBUTORS)
4 *
5 * See the NOTICE file(s) distributed with this work for additional
6 * information regarding copyright ownership.
7 *
8 * This program and the accompanying materials are made available under the
9 * terms of the Apache License Version 2.0 which is available at
10 * https://www.apache.org/licenses/LICENSE-2.0
11 */
12
13use std::time::Duration;
14
15use cda_health::{HealthState, Status as HealthStatus};
16use cda_interfaces::spawn_named;
17use tokio::task::JoinHandle;
18
19/// Creates a background task that integrates with the systemd watchdog.
20///
[docs] 21/// [[ dimpl~system-sd-notify-watchdog-integration, Systemd Watchdog Integration ]]
22///
23/// If the process was booted via systemd and the watchdog is enabled,
24/// this function spawns a task that periodically checks the health state
25/// and sends the appropriate `sd_notify` notification:
26///
27/// - `Ready` when transitioning from `Starting` to `Up`
28/// - `Watchdog` while healthy (`Up` -> `Up`)
29/// - `WatchdogTrigger` when health degrades (`Up` -> `Failed`)
30///
31/// The notification interval is the systemd-configured watchdog timeout
32/// minus 5 seconds (to avoid timing issues).
33///
34/// If systemd is not detected or the watchdog is not enabled, returns `None`
35/// and the CDA runs without `sd_notify` behavior.
36pub fn create_sd_notify_task<F>(
37 health_state: Option<HealthState>,
38 shutdown_signal: F,
39) -> Option<JoinHandle<()>>
40where
41 F: Future<Output = ()> + Clone + Send + 'static,
42{
43 // map std_notify::booted Err(_) to false and treat as not systemd booted
44 if !sd_notify::booted().unwrap_or(false) {
45 tracing::info!("Systemd not detected, skipping sd_notify initialization");
46 return None;
47 }
48 let Some(interval) = determine_interval() else {
49 tracing::info!("Systemd watchdog not enabled, skipping sd_notify initialization");
50 return None;
51 };
52 // init sd notify task
53 let watchdog_future = async move {
54 let mut interval_timer = tokio::time::interval(interval);
55 let mut state = cda_health::Status::Starting;
56 loop {
57 interval_timer.tick().await;
58 state = trigger_watchdog(health_state.as_ref(), state).await;
59 }
60 };
61 let sd_notify_future = spawn_named!("sd_notify", async move {
62 tokio::select! {
63 _ = watchdog_future => {},
64 () = shutdown_signal => {},
65 }
66 });
67 tracing::info!(
68 "Systemd detected and watchdog enabled, initialized sd_notify task with interval of {:?} \
69 seconds",
70 interval.as_secs()
71 );
72 Some(sd_notify_future)
73}
74
75fn determine_interval() -> Option<Duration> {
76 sd_notify::watchdog_enabled().map(|duration| {
77 // ensure we are sending a bit more often than the watchdog expects,
78 // to avoid any timing issues
79 if let Some(interval) = duration.checked_sub(Duration::from_secs(5)) {
80 interval.min(Duration::from_secs(1))
81 } else {
82 duration
83 }
84 })
85}
86
87async fn trigger_watchdog(
88 health_state: Option<&HealthState>,
89 prev_state: cda_health::Status,
90) -> cda_health::Status {
91 let new_status = fold_health_state(health_state).await;
92 let notify = match (prev_state, new_status) {
93 (cda_health::Status::Starting, cda_health::Status::Up) => sd_notify::NotifyState::Ready,
94 (cda_health::Status::Up, cda_health::Status::Failed) => {
95 sd_notify::NotifyState::WatchdogTrigger
96 }
97 (cda_health::Status::Up, cda_health::Status::Up) => sd_notify::NotifyState::Watchdog,
98 // this would be Failure -> Starting or Up -> Starting. so makes no sense
99 _ => {
100 tracing::warn!(
101 "Unexpected health status transition from {prev_state:?} to {new_status:?} when \
102 triggering sd_notify watchdog"
103 );
104 sd_notify::NotifyState::Watchdog
105 }
106 };
107 tracing::debug!("Triggering sd_notify watchdog with status {notify:?}");
108 if let Err(e) = sd_notify::notify(&[notify]) {
109 tracing::warn!(error = %e, "Failed to send sd_notify watchdog notification");
110 }
111 new_status
112}
113
/// Combines the status accumulated so far (`acc`) with the status of one more
/// health provider (`status`).
///
/// The arms are evaluated in order:
/// - `Failed` on either side yields `Failed`, so a single failed provider makes
///   the aggregate `Failed`.
/// - `Starting`/`Pending` followed by `Pending` yields `Starting`.
/// - `Starting`/`Pending` followed by `Up` yields `Up`; this is what lets a fold
///   seeded with `Starting` end up `Up`.
/// - Every other combination yields the provider's `status` unchanged
///   (e.g. `Up` then `Starting` is `Starting`, `Up` then `Pending` is `Pending`).
///
/// NOTE(review): the combination is not commutative. `fold_status(Starting, Up)`
/// is `Up`, but `fold_status(Up, Starting)` is `Starting`, so folding several
/// providers can give different results depending on the order they are visited.
fn fold_status(acc: HealthStatus, status: HealthStatus) -> HealthStatus {
    match (acc, status) {
        // The current provider failed, or an earlier one already did.
        (
            HealthStatus::Up | HealthStatus::Starting | HealthStatus::Pending,
            HealthStatus::Failed,
        )
        | (HealthStatus::Failed, _) => HealthStatus::Failed,
        // `Pending` is reported as `Starting` here.
        (HealthStatus::Starting | HealthStatus::Pending, HealthStatus::Pending) => {
            HealthStatus::Starting
        }
        (HealthStatus::Starting | HealthStatus::Pending, HealthStatus::Up) => HealthStatus::Up,
        // Otherwise the provider's own status becomes the aggregate.
        (_, status) => status,
    }
}
128
129async fn fold_health_state(health_state: Option<&HealthState>) -> HealthStatus {
130 let Some(health_state) = health_state else {
131 return cda_health::Status::Up;
132 };
133 health_state
134 .query_all_providers()
135 .await
136 .into_values()
137 .fold(cda_health::Status::Starting, fold_status)
138}
139
[docs]140/// [[ test~system-sd-notify-watchdog-integration, Systemd Watchdog Health Aggregation Tests ]]
// Unit tests pinning the pairwise behaviour of `fold_status` and a few whole
// folds seeded with `Starting`, the same seed `fold_health_state` uses.
#[cfg(test)]
mod tests {
    use cda_health::Status;

    use super::fold_status;

    // `Failed` wins regardless of which side of the combination it is on.

    #[test]
    fn fold_up_then_failed_yields_failed() {
        assert_eq!(fold_status(Status::Up, Status::Failed), Status::Failed);
    }

    #[test]
    fn fold_starting_then_failed_yields_failed() {
        assert_eq!(
            fold_status(Status::Starting, Status::Failed),
            Status::Failed
        );
    }

    #[test]
    fn fold_pending_then_failed_yields_failed() {
        assert_eq!(fold_status(Status::Pending, Status::Failed), Status::Failed);
    }

    #[test]
    fn fold_failed_then_up_yields_failed() {
        assert_eq!(fold_status(Status::Failed, Status::Up), Status::Failed);
    }

    #[test]
    fn fold_failed_then_starting_yields_failed() {
        assert_eq!(
            fold_status(Status::Failed, Status::Starting),
            Status::Failed
        );
    }

    #[test]
    fn fold_failed_then_pending_yields_failed() {
        assert_eq!(fold_status(Status::Failed, Status::Pending), Status::Failed);
    }

    #[test]
    fn fold_failed_then_failed_yields_failed() {
        assert_eq!(fold_status(Status::Failed, Status::Failed), Status::Failed);
    }

    // `Starting`/`Pending` followed by `Pending` is reported as `Starting`.

    #[test]
    fn fold_starting_then_pending_yields_starting() {
        assert_eq!(
            fold_status(Status::Starting, Status::Pending),
            Status::Starting
        );
    }

    #[test]
    fn fold_pending_then_pending_yields_starting() {
        assert_eq!(
            fold_status(Status::Pending, Status::Pending),
            Status::Starting
        );
    }

    // Transitions into and out of `Up`. Note the asymmetry: `Starting` then `Up`
    // is `Up`, while `Up` then `Starting` is `Starting`.

    #[test]
    fn fold_starting_then_up_yields_up() {
        assert_eq!(fold_status(Status::Starting, Status::Up), Status::Up);
    }

    #[test]
    fn fold_pending_then_up_yields_up() {
        assert_eq!(fold_status(Status::Pending, Status::Up), Status::Up);
    }

    #[test]
    fn fold_up_then_up_yields_up() {
        assert_eq!(fold_status(Status::Up, Status::Up), Status::Up);
    }

    #[test]
    fn fold_up_then_starting_yields_starting() {
        assert_eq!(fold_status(Status::Up, Status::Starting), Status::Starting);
    }

    #[test]
    fn fold_up_then_pending_yields_pending() {
        assert_eq!(fold_status(Status::Up, Status::Pending), Status::Pending);
    }

    // Whole folds over several provider statuses, seeded with `Starting`.

    #[test]
    fn fold_sequence_all_up_from_starting() {
        // initial acc = Starting (as fold_health_state does), all providers Up
        let result = [Status::Up, Status::Up, Status::Up]
            .into_iter()
            .fold(Status::Starting, fold_status);
        assert_eq!(result, Status::Up);
    }

    #[test]
    fn fold_sequence_any_failed_dominates() {
        let result = [Status::Up, Status::Failed, Status::Up]
            .into_iter()
            .fold(Status::Starting, fold_status);
        assert_eq!(result, Status::Failed);
    }

    #[test]
    fn fold_sequence_failed_at_start_stays_failed() {
        let result = [Status::Failed, Status::Up, Status::Up]
            .into_iter()
            .fold(Status::Starting, fold_status);
        assert_eq!(result, Status::Failed);
    }

    #[test]
    fn fold_sequence_all_starting_stays_starting() {
        let result = [Status::Starting, Status::Starting]
            .into_iter()
            .fold(Status::Starting, fold_status);
        assert_eq!(result, Status::Starting);
    }

    #[test]
    fn fold_sequence_mix_starting_pending_yields_starting() {
        let result = [Status::Pending, Status::Starting, Status::Pending]
            .into_iter()
            .fold(Status::Starting, fold_status);
        assert_eq!(result, Status::Starting);
    }

    #[test]
    fn fold_sequence_empty_providers_yields_starting() {
        // no providers -> fold over empty iterator -> initial accumulator = Starting
        let result = std::iter::empty::<Status>().fold(Status::Starting, fold_status);
        assert_eq!(result, Status::Starting);
    }

    // Without any configured health state the CDA is reported as `Up`.
    #[tokio::test]
    async fn fold_health_state_none_yields_up() {
        let result = super::fold_health_state(None).await;
        assert_eq!(result, Status::Up);
    }
}