1/*
  2 * SPDX-License-Identifier: Apache-2.0
  3 * SPDX-FileCopyrightText: 2026 The Contributors to Eclipse OpenSOVD (see CONTRIBUTORS)
  4 *
  5 * See the NOTICE file(s) distributed with this work for additional
  6 * information regarding copyright ownership.
  7 *
  8 * This program and the accompanying materials are made available under the
  9 * terms of the Apache License Version 2.0 which is available at
 10 * https://www.apache.org/licenses/LICENSE-2.0
 11 */
 12
 13use std::time::Duration;
 14
 15use cda_health::{HealthState, Status as HealthStatus};
 16use cda_interfaces::spawn_named;
 17use tokio::task::JoinHandle;
 18
 19/// Creates a background task that integrates with the systemd watchdog.
 20///
[docs] 21/// [[ dimpl~system-sd-notify-watchdog-integration, Systemd Watchdog Integration ]]
 22///
 23/// If the process was booted via systemd and the watchdog is enabled,
 24/// this function spawns a task that periodically checks the health state
 25/// and sends the appropriate `sd_notify` notification:
 26///
 27/// - `Ready` when transitioning from `Starting` to `Up`
 28/// - `Watchdog` while healthy (`Up` -> `Up`)
 29/// - `WatchdogTrigger` when health degrades (`Up` -> `Failed`)
 30///
 31/// The notification interval is the systemd-configured watchdog timeout
 32/// minus 5 seconds (to avoid timing issues).
 33///
 34/// If systemd is not detected or the watchdog is not enabled, returns `None`
 35/// and the CDA runs without `sd_notify` behavior.
 36pub fn create_sd_notify_task<F>(
 37    health_state: Option<HealthState>,
 38    shutdown_signal: F,
 39) -> Option<JoinHandle<()>>
 40where
 41    F: Future<Output = ()> + Clone + Send + 'static,
 42{
 43    // map std_notify::booted Err(_) to false and treat as not systemd booted
 44    if !sd_notify::booted().unwrap_or(false) {
 45        tracing::info!("Systemd not detected, skipping sd_notify initialization");
 46        return None;
 47    }
 48    let Some(interval) = determine_interval() else {
 49        tracing::info!("Systemd watchdog not enabled, skipping sd_notify initialization");
 50        return None;
 51    };
 52    // init sd notify task
 53    let watchdog_future = async move {
 54        let mut interval_timer = tokio::time::interval(interval);
 55        let mut state = cda_health::Status::Starting;
 56        loop {
 57            interval_timer.tick().await;
 58            state = trigger_watchdog(health_state.as_ref(), state).await;
 59        }
 60    };
 61    let sd_notify_future = spawn_named!("sd_notify", async move {
 62        tokio::select! {
 63            _ = watchdog_future => {},
 64            () = shutdown_signal => {},
 65        }
 66    });
 67    tracing::info!(
 68        "Systemd detected and watchdog enabled, initialized sd_notify task with interval of {:?} \
 69         seconds",
 70        interval.as_secs()
 71    );
 72    Some(sd_notify_future)
 73}
 74
 75fn determine_interval() -> Option<Duration> {
 76    sd_notify::watchdog_enabled().map(|duration| {
 77        // ensure we are sending a bit more often than the watchdog expects,
 78        // to avoid any timing issues
 79        if let Some(interval) = duration.checked_sub(Duration::from_secs(5)) {
 80            interval.min(Duration::from_secs(1))
 81        } else {
 82            duration
 83        }
 84    })
 85}
 86
 87async fn trigger_watchdog(
 88    health_state: Option<&HealthState>,
 89    prev_state: cda_health::Status,
 90) -> cda_health::Status {
 91    let new_status = fold_health_state(health_state).await;
 92    let notify = match (prev_state, new_status) {
 93        (cda_health::Status::Starting, cda_health::Status::Up) => sd_notify::NotifyState::Ready,
 94        (cda_health::Status::Up, cda_health::Status::Failed) => {
 95            sd_notify::NotifyState::WatchdogTrigger
 96        }
 97        (cda_health::Status::Up, cda_health::Status::Up) => sd_notify::NotifyState::Watchdog,
 98        // this would be Failure -> Starting or Up -> Starting. so makes no sense
 99        _ => {
100            tracing::warn!(
101                "Unexpected health status transition from {prev_state:?} to {new_status:?} when \
102                 triggering sd_notify watchdog"
103            );
104            sd_notify::NotifyState::Watchdog
105        }
106    };
107    tracing::debug!("Triggering sd_notify watchdog with status {notify:?}");
108    if let Err(e) = sd_notify::notify(&[notify]) {
109        tracing::warn!(error = %e, "Failed to send sd_notify watchdog notification");
110    }
111    new_status
112}
113
114fn fold_status(acc: HealthStatus, status: HealthStatus) -> HealthStatus {
115    match (acc, status) {
116        (
117            HealthStatus::Up | HealthStatus::Starting | HealthStatus::Pending,
118            HealthStatus::Failed,
119        )
120        | (HealthStatus::Failed, _) => HealthStatus::Failed,
121        (HealthStatus::Starting | HealthStatus::Pending, HealthStatus::Pending) => {
122            HealthStatus::Starting
123        }
124        (HealthStatus::Starting | HealthStatus::Pending, HealthStatus::Up) => HealthStatus::Up,
125        (_, status) => status,
126    }
127}
128
129async fn fold_health_state(health_state: Option<&HealthState>) -> HealthStatus {
130    let Some(health_state) = health_state else {
131        return cda_health::Status::Up;
132    };
133    health_state
134        .query_all_providers()
135        .await
136        .into_values()
137        .fold(cda_health::Status::Starting, fold_status)
138}
139
/// [[ test~system-sd-notify-watchdog-integration, Systemd Watchdog Health Aggregation Tests ]]
#[cfg(test)]
mod tests {
    use cda_health::Status;

    use super::{fold_health_state, fold_status};

    /// Folds `statuses` in order, seeded with `Starting` exactly like `fold_health_state`.
    fn aggregate(statuses: impl IntoIterator<Item = Status>) -> Status {
        statuses.into_iter().fold(Status::Starting, fold_status)
    }

    // --- `Failed` as the incoming status always wins ---

    #[test]
    fn fold_up_then_failed_yields_failed() {
        let folded = fold_status(Status::Up, Status::Failed);
        assert_eq!(folded, Status::Failed);
    }

    #[test]
    fn fold_starting_then_failed_yields_failed() {
        let folded = fold_status(Status::Starting, Status::Failed);
        assert_eq!(folded, Status::Failed);
    }

    #[test]
    fn fold_pending_then_failed_yields_failed() {
        let folded = fold_status(Status::Pending, Status::Failed);
        assert_eq!(folded, Status::Failed);
    }

    // --- `Failed` as the accumulator is sticky ---

    #[test]
    fn fold_failed_then_up_yields_failed() {
        let folded = fold_status(Status::Failed, Status::Up);
        assert_eq!(folded, Status::Failed);
    }

    #[test]
    fn fold_failed_then_starting_yields_failed() {
        let folded = fold_status(Status::Failed, Status::Starting);
        assert_eq!(folded, Status::Failed);
    }

    #[test]
    fn fold_failed_then_pending_yields_failed() {
        let folded = fold_status(Status::Failed, Status::Pending);
        assert_eq!(folded, Status::Failed);
    }

    #[test]
    fn fold_failed_then_failed_yields_failed() {
        let folded = fold_status(Status::Failed, Status::Failed);
        assert_eq!(folded, Status::Failed);
    }

    // --- `Pending` while not yet up keeps the aggregate in `Starting` ---

    #[test]
    fn fold_starting_then_pending_yields_starting() {
        let folded = fold_status(Status::Starting, Status::Pending);
        assert_eq!(folded, Status::Starting);
    }

    #[test]
    fn fold_pending_then_pending_yields_starting() {
        let folded = fold_status(Status::Pending, Status::Pending);
        assert_eq!(folded, Status::Starting);
    }

    // --- `Up` promotes a starting/pending aggregate ---

    #[test]
    fn fold_starting_then_up_yields_up() {
        let folded = fold_status(Status::Starting, Status::Up);
        assert_eq!(folded, Status::Up);
    }

    #[test]
    fn fold_pending_then_up_yields_up() {
        let folded = fold_status(Status::Pending, Status::Up);
        assert_eq!(folded, Status::Up);
    }

    // --- with `Up` as the accumulator the incoming status is taken over ---

    #[test]
    fn fold_up_then_up_yields_up() {
        let folded = fold_status(Status::Up, Status::Up);
        assert_eq!(folded, Status::Up);
    }

    #[test]
    fn fold_up_then_starting_yields_starting() {
        let folded = fold_status(Status::Up, Status::Starting);
        assert_eq!(folded, Status::Starting);
    }

    #[test]
    fn fold_up_then_pending_yields_pending() {
        let folded = fold_status(Status::Up, Status::Pending);
        assert_eq!(folded, Status::Pending);
    }

    // --- whole provider sequences, seeded with `Starting` ---

    #[test]
    fn fold_sequence_all_up_from_starting() {
        // every provider reports `Up`
        let aggregated = aggregate([Status::Up, Status::Up, Status::Up]);
        assert_eq!(aggregated, Status::Up);
    }

    #[test]
    fn fold_sequence_any_failed_dominates() {
        let aggregated = aggregate([Status::Up, Status::Failed, Status::Up]);
        assert_eq!(aggregated, Status::Failed);
    }

    #[test]
    fn fold_sequence_failed_at_start_stays_failed() {
        let aggregated = aggregate([Status::Failed, Status::Up, Status::Up]);
        assert_eq!(aggregated, Status::Failed);
    }

    #[test]
    fn fold_sequence_all_starting_stays_starting() {
        let aggregated = aggregate([Status::Starting, Status::Starting]);
        assert_eq!(aggregated, Status::Starting);
    }

    #[test]
    fn fold_sequence_mix_starting_pending_yields_starting() {
        let aggregated = aggregate([Status::Pending, Status::Starting, Status::Pending]);
        assert_eq!(aggregated, Status::Starting);
    }

    #[test]
    fn fold_sequence_empty_providers_yields_starting() {
        // no providers -> nothing to fold -> the `Starting` seed is returned unchanged
        let aggregated = aggregate(std::iter::empty::<Status>());
        assert_eq!(aggregated, Status::Starting);
    }

    // --- no health state configured ---

    #[tokio::test]
    async fn fold_health_state_none_yields_up() {
        let aggregated = fold_health_state(None).await;
        assert_eq!(aggregated, Status::Up);
    }
}