From ebaa7edb839450ad117a01a14d0913e7c8e972f3 Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Tue, 31 Mar 2026 12:04:24 -0500 Subject: [PATCH 1/2] lsps1: Add `prune_orders` API to remove completed order state Add `prune_orders(counterparty_node_id, max_age: Duration)` to both `LSPS1ServiceHandler` and `LSPS1ServiceHandlerSync`. It removes all terminal orders (`CompletedAndChannelOpened` / `FailedAndRefunded`) for a given peer that are at least `max_age` old, persists the updated state, and returns the number of entries removed. Passing `Duration::ZERO` prunes all terminal orders immediately regardless of age, which is the recommended approach to unblock a client that has hit the per-peer request limit due to accumulated failed orders. On the `PeerState` layer, `prune_terminal_orders(now, max_age)` uses `retain` for a single-pass removal and sets `needs_persist` only when at least one entry is removed. --- lightning-liquidity/src/lsps1/peer_state.rs | 177 ++++++++++++++++++++ lightning-liquidity/src/lsps1/service.rs | 66 ++++++++ 2 files changed, 243 insertions(+) diff --git a/lightning-liquidity/src/lsps1/peer_state.rs b/lightning-liquidity/src/lsps1/peer_state.rs index 6e1889749ae..d814d6641ce 100644 --- a/lightning-liquidity/src/lsps1/peer_state.rs +++ b/lightning-liquidity/src/lsps1/peer_state.rs @@ -17,6 +17,8 @@ use super::msgs::{ use crate::lsps0::ser::{LSPSDateTime, LSPSRequestId}; use crate::prelude::HashMap; +use core::time::Duration; + use lightning::util::hash_tables::new_hash_map; use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; @@ -397,6 +399,36 @@ impl PeerState { }); } + /// Removes all terminal orders from state that are at least `max_age` old. + /// + /// Terminal orders are those in the [`ChannelOrderState::CompletedAndChannelOpened`] or + /// [`ChannelOrderState::FailedAndRefunded`] state. `max_age` is measured from the order's + /// `created_at` timestamp. Pass [`Duration::ZERO`] to prune all terminal orders regardless + /// of age, which is useful to immediately free per-peer quota when a client is blocked by + /// the request limit due to accumulated `FailedAndRefunded` entries. + /// + /// Returns the number of orders removed. + pub(super) fn prune_terminal_orders(&mut self, now: &LSPSDateTime, max_age: Duration) -> usize { + let mut pruned = 0usize; + self.outbound_channels_by_order_id.retain(|_order_id, order| { + let is_terminal = matches!( + order.state, + ChannelOrderState::CompletedAndChannelOpened { .. } + | ChannelOrderState::FailedAndRefunded { .. } + ); + if is_terminal && now.duration_since(&order.created_at) >= max_age { + pruned += 1; + false + } else { + true + } + }); + if pruned > 0 { + self.needs_persist |= true; + } + pruned + } + fn pending_requests_and_unpaid_orders(&self) -> usize { let pending_requests = self.pending_requests.len(); // We exclude paid and completed orders. @@ -778,4 +810,149 @@ mod tests { // Available in CompletedAndChannelOpened assert_eq!(state.channel_info(), Some(&channel_info)); } + + fn create_test_order_params() -> LSPS1OrderParams { + LSPS1OrderParams { + lsp_balance_sat: 100_000, + client_balance_sat: 0, + required_channel_confirmations: 0, + funding_confirms_within_blocks: 6, + channel_expiry_blocks: 144, + token: None, + announce_channel: false, + } + } + + #[test] + fn test_prune_terminal_orders_completed() { + let mut peer_state = PeerState::default(); + let order_id = LSPS1OrderId("order1".to_string()); + peer_state.new_order( + order_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + peer_state.order_payment_received(&order_id, PaymentMethod::Bolt11).unwrap(); + peer_state.order_channel_opened(&order_id, create_test_channel_info()).unwrap(); + + // max_age=0 prunes all terminal orders regardless of age. + let now = LSPSDateTime::from_str("2024-01-01T01:00:00Z").unwrap(); + assert_eq!(peer_state.prune_terminal_orders(&now, Duration::ZERO), 1); + assert!(peer_state.get_order(&order_id).is_err()); + } + + #[test] + fn test_prune_terminal_orders_failed_and_refunded() { + let mut peer_state = PeerState::default(); + let order_id = LSPS1OrderId("order2".to_string()); + // Non-expired invoice: verify we do not require invoice expiry before pruning. + peer_state.new_order( + order_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + peer_state.order_failed_and_refunded(&order_id).unwrap(); + + let now = LSPSDateTime::from_str("2024-01-01T01:00:00Z").unwrap(); + assert_eq!(peer_state.prune_terminal_orders(&now, Duration::ZERO), 1); + assert!(peer_state.get_order(&order_id).is_err()); + } + + #[test] + fn test_prune_terminal_orders_age_filter() { + let mut peer_state = PeerState::default(); + + // Old order (2 hours before now) — must be pruned when max_age = 1 hour. + let old_id = LSPS1OrderId("old".to_string()); + peer_state.new_order( + old_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + peer_state.order_failed_and_refunded(&old_id).unwrap(); + + // Recent order (10 minutes before now) — must NOT be pruned when max_age = 1 hour. + let recent_id = LSPS1OrderId("recent".to_string()); + peer_state.new_order( + recent_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T01:50:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + peer_state.order_failed_and_refunded(&recent_id).unwrap(); + + let now = LSPSDateTime::from_str("2024-01-01T02:00:00Z").unwrap(); + let pruned = peer_state.prune_terminal_orders(&now, Duration::from_secs(3600)); + assert_eq!(pruned, 1); + assert!(peer_state.get_order(&old_id).is_err()); + assert!(peer_state.get_order(&recent_id).is_ok()); + } + + #[test] + fn test_prune_terminal_orders_non_terminal_skipped() { + let mut peer_state = PeerState::default(); + + // ExpectingPayment is not a terminal state. + let expecting_id = LSPS1OrderId("expecting".to_string()); + peer_state.new_order( + expecting_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + + // OrderPaid is not a terminal state. + let paid_id = LSPS1OrderId("paid".to_string()); + peer_state.new_order( + paid_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + peer_state.order_payment_received(&paid_id, PaymentMethod::Bolt11).unwrap(); + + let now = LSPSDateTime::from_str("2024-01-01T02:00:00Z").unwrap(); + assert_eq!(peer_state.prune_terminal_orders(&now, Duration::ZERO), 0); + assert!(peer_state.get_order(&expecting_id).is_ok()); + assert!(peer_state.get_order(&paid_id).is_ok()); + } + + #[test] + fn test_prune_terminal_orders_frees_quota() { + let mut peer_state = PeerState::default(); + + // Fill up to the limit with FailedAndRefunded orders. + for i in 0..MAX_PENDING_REQUESTS_PER_PEER { + let order_id = LSPS1OrderId(format!("order{}", i)); + peer_state.new_order( + order_id.clone(), + create_test_order_params(), + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), + create_test_payment_info_bolt11_only(), + ); + peer_state.order_failed_and_refunded(&order_id).unwrap(); + } + + // Registering another request must fail: quota is exhausted. + let dummy_request = LSPS1Request::GetInfo(Default::default()); + assert!(matches!( + peer_state.register_request(LSPSRequestId("r0".to_string()), dummy_request.clone()), + Err(PeerStateError::TooManyPendingRequests) + )); + + // Prune all failed orders with max_age=0. + let now = LSPSDateTime::from_str("2024-01-01T01:00:00Z").unwrap(); + assert_eq!( + peer_state.prune_terminal_orders(&now, Duration::ZERO), + MAX_PENDING_REQUESTS_PER_PEER + ); + + // Now registering a new request must succeed. + assert!(peer_state + .register_request(LSPSRequestId("r1".to_string()), dummy_request) + .is_ok()); + } } diff --git a/lightning-liquidity/src/lsps1/service.rs b/lightning-liquidity/src/lsps1/service.rs index 0e139907589..c77329b508b 100644 --- a/lightning-liquidity/src/lsps1/service.rs +++ b/lightning-liquidity/src/lsps1/service.rs @@ -17,6 +17,7 @@ use core::ops::Deref; use core::pin::pin; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task; +use core::time::Duration; use super::event::LSPS1ServiceEvent; use super::msgs::{ @@ -752,6 +753,52 @@ where Ok(()) } + /// Prunes terminal orders for a peer that are at least `max_age` old, freeing memory and + /// per-peer quota. + /// + /// Terminal orders are those in the [`LSPS1OrderState::Completed`] or + /// [`LSPS1OrderState::Failed`] state. `max_age` is measured from each order's `created_at` + /// timestamp. Pass [`Duration::ZERO`] to prune all terminal orders regardless of age, + /// which is useful to immediately free per-peer quota when a client is blocked by the + /// per-peer request limit due to accumulated failed orders. + /// + /// Returns the number of orders removed, or an [`APIError::APIMisuseError`] if no state + /// exists for the given counterparty. + pub async fn prune_orders( + &self, counterparty_node_id: PublicKey, max_age: Duration, + ) -> Result { + let now = + LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch()); + let pruned; + { + let outer_state_lock = self.per_peer_state.read().unwrap(); + let inner_state_lock = + outer_state_lock.get(&counterparty_node_id).ok_or_else(|| { + APIError::APIMisuseError { + err: format!( + "No existing state with counterparty {}", + counterparty_node_id + ), + } + })?; + let mut peer_state = inner_state_lock.lock().unwrap(); + pruned = peer_state.prune_terminal_orders(&now, max_age); + } + + if pruned > 0 { + self.persist_peer_state(counterparty_node_id).await.map_err(|e| { + APIError::APIMisuseError { + err: format!( + "Failed to persist peer state for {}: {}", + counterparty_node_id, e + ), + } + })?; + } + + Ok(pruned) + } + fn generate_order_id(&self) -> LSPS1OrderId { let bytes = self.entropy_source.get_secure_random_bytes(); LSPS1OrderId(utils::hex_str(&bytes[0..16])) @@ -930,6 +977,25 @@ where }, } } + + /// Prunes terminal orders for a peer that are at least `max_age` old. + /// + /// Wraps [`LSPS1ServiceHandler::prune_orders`]. + pub fn prune_orders( + &self, counterparty_node_id: PublicKey, max_age: Duration, + ) -> Result { + let mut fut = pin!(self.inner.prune_orders(counterparty_node_id, max_age)); + + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match fut.as_mut().poll(&mut ctx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + // In a sync context, we can't wait for the future to complete. + unreachable!("Should not be pending in a sync context"); + }, + } + } } fn check_range(min: u64, max: u64, value: u64) -> bool { From 057fdc3d64b6b60754907bf48582641513b897f0 Mon Sep 17 00:00:00 2001 From: Fernando Ledesma Date: Mon, 13 Apr 2026 16:55:07 -0500 Subject: [PATCH 2/2] lsps2: Track channel creation time via `created_at` field Add a `created_at: LSPSDateTime` field to `OutboundJITChannel` to record when each JIT channel was created (i.e., when the buy request was accepted by the LSP). This timestamp is needed to implement time-based bulk pruning of completed channel state. The field is persisted as TLV type 10 with a `default_value` of Unix epoch, ensuring old serialized data (without TLV 10) is read back successfully with the epoch sentinel rather than failing deserialization. --- lightning-liquidity/src/lsps2/service.rs | 85 +++++++++++++++++++++--- lightning-liquidity/src/manager.rs | 9 +-- 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index b7f6f2fc64d..df17534c752 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -19,13 +19,15 @@ use core::ops::Deref; use core::pin::pin; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task; +use core::time::Duration; use crate::events::EventQueue; use crate::lsps0::ser::{ - LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError, + LSPSDateTime, LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError, JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE, LSPS0_CLIENT_REJECTED_ERROR_CODE, }; +use crate::utils::time::TimeProvider; use crate::lsps2::event::LSPS2ServiceEvent; use crate::lsps2::payment_queue::{InterceptedHTLC, PaymentQueue}; use crate::lsps2::utils::{ @@ -497,6 +499,8 @@ struct OutboundJITChannel { opening_fee_params: LSPS2OpeningFeeParams, payment_size_msat: Option, trust_model: TrustModel, + /// The time at which the JIT channel was created (i.e., the buy request was accepted). + created_at: LSPSDateTime, } impl_writeable_tlv_based!(OutboundJITChannel, { @@ -505,12 +509,13 @@ impl_writeable_tlv_based!(OutboundJITChannel, { (4, opening_fee_params, required), (6, payment_size_msat, option), (8, trust_model, required), + (10, created_at, (default_value, LSPSDateTime::new_from_duration_since_epoch(Duration::ZERO))), }); impl OutboundJITChannel { fn new( payment_size_msat: Option, opening_fee_params: LSPS2OpeningFeeParams, - user_channel_id: u128, client_trusts_lsp: bool, + user_channel_id: u128, client_trusts_lsp: bool, created_at: LSPSDateTime, ) -> Self { Self { user_channel_id, @@ -518,6 +523,7 @@ impl OutboundJITChannel { opening_fee_params, payment_size_msat, trust_model: TrustModel::new(client_trusts_lsp), + created_at, } } @@ -702,9 +708,10 @@ macro_rules! get_or_insert_peer_state_entry { } /// The main object allowing to send and receive bLIP-52 / LSPS2 messages. -pub struct LSPS2ServiceHandler +pub struct LSPS2ServiceHandler where CM::Target: AChannelManager, + TP::Target: TimeProvider, { channel_manager: CM, kv_store: K, @@ -717,17 +724,20 @@ where total_pending_requests: AtomicUsize, config: LSPS2ServiceConfig, persistence_in_flight: AtomicUsize, + time_provider: TP, } -impl LSPS2ServiceHandler +impl + LSPS2ServiceHandler where CM::Target: AChannelManager, + TP::Target: TimeProvider, { /// Constructs a `LSPS2ServiceHandler`. pub(crate) fn new( per_peer_state: HashMap>, pending_messages: Arc, pending_events: Arc>, channel_manager: CM, kv_store: K, tx_broadcaster: T, - config: LSPS2ServiceConfig, + config: LSPS2ServiceConfig, time_provider: TP, ) -> Result { let mut peer_by_intercept_scid = new_hash_map(); let mut peer_by_channel_id = new_hash_map(); @@ -768,6 +778,7 @@ where kv_store, tx_broadcaster, config, + time_provider, }) } @@ -921,11 +932,15 @@ where peer_by_intercept_scid.insert(intercept_scid, *counterparty_node_id); } + let created_at = LSPSDateTime::new_from_duration_since_epoch( + self.time_provider.duration_since_epoch(), + ); let outbound_jit_channel = OutboundJITChannel::new( buy_request.payment_size_msat, buy_request.opening_fee_params, user_channel_id, client_trusts_lsp, + created_at, ); peer_state_lock @@ -2050,10 +2065,11 @@ where } } -impl LSPSProtocolMessageHandler - for LSPS2ServiceHandler +impl + LSPSProtocolMessageHandler for LSPS2ServiceHandler where CM::Target: AChannelManager, + TP::Target: TimeProvider, { type ProtocolMessage = LSPS2Message; const PROTOCOL_NUMBER: Option = Some(2); @@ -2128,18 +2144,21 @@ pub struct LSPS2ServiceHandlerSync< CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone, + TP: Deref + Clone, > where CM::Target: AChannelManager, + TP::Target: TimeProvider, { - inner: &'a LSPS2ServiceHandler, + inner: &'a LSPS2ServiceHandler, } -impl<'a, CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone> - LSPS2ServiceHandlerSync<'a, CM, K, T> +impl<'a, CM: Deref, K: KVStore + Clone, T: BroadcasterInterface + Clone, TP: Deref + Clone> + LSPS2ServiceHandlerSync<'a, CM, K, T, TP> where CM::Target: AChannelManager, + TP::Target: TimeProvider, { - pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler) -> Self { + pub(crate) fn from_inner(inner: &'a LSPS2ServiceHandler) -> Self { Self { inner } } @@ -2785,6 +2804,7 @@ mod tests { opening_fee_params.clone(), user_channel_id, true, + LSPSDateTime::from_str("2024-01-01T00:00:00Z").unwrap(), ); let opening_payment_hash = PaymentHash([42; 32]); @@ -2864,4 +2884,47 @@ mod tests { "Broadcast was not allowed even though all the skimmed fees were collected" ); } + + #[test] + fn test_outbound_jit_channel_created_at_stored() { + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 1_000, + proportional: 0, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 144, + max_client_to_self_delay: 128, + min_payment_size_msat: 1, + max_payment_size_msat: 10_000_000_000, + promise: "ignore".to_string(), + }; + let created_at = LSPSDateTime::from_str("2024-06-15T12:00:00Z").unwrap(); + let channel = + OutboundJITChannel::new(Some(1_000_000), opening_fee_params, 1u128, true, created_at); + assert_eq!(channel.created_at, created_at); + } + + #[test] + fn test_outbound_jit_channel_created_at_round_trips() { + use lightning::util::ser::{Readable, Writeable}; + + let opening_fee_params = LSPS2OpeningFeeParams { + min_fee_msat: 1_000, + proportional: 0, + valid_until: LSPSDateTime::from_str("2035-05-20T08:30:45Z").unwrap(), + min_lifetime: 144, + max_client_to_self_delay: 128, + min_payment_size_msat: 1, + max_payment_size_msat: 10_000_000_000, + promise: "ignore".to_string(), + }; + let created_at = LSPSDateTime::from_str("2024-06-15T12:00:00Z").unwrap(); + let channel = + OutboundJITChannel::new(Some(1_000_000), opening_fee_params, 1u128, true, created_at); + + let mut buf = Vec::new(); + channel.write(&mut buf).unwrap(); + + let decoded = ::read(&mut &buf[..]).unwrap(); + assert_eq!(decoded.created_at, created_at); + } } diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index f1b098dbfaa..b34dc8d98b9 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -283,7 +283,7 @@ pub struct LiquidityManager< lsps0_service_handler: Option, lsps1_service_handler: Option>, lsps1_client_handler: Option>, - lsps2_service_handler: Option>, + lsps2_service_handler: Option>, lsps2_client_handler: Option>, lsps5_service_handler: Option>, lsps5_client_handler: Option>, @@ -377,7 +377,7 @@ where let lsps2_service_handler = if let Some(service_config) = service_config.as_ref() { if let Some(lsps2_service_config) = service_config.lsps2_service_config.as_ref() { if let Some(number) = - as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER + as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER { supported_protocols.push(number); } @@ -391,6 +391,7 @@ where kv_store.clone(), transaction_broadcaster.clone(), lsps2_service_config.clone(), + time_provider.clone(), )?) } else { None @@ -540,7 +541,7 @@ where /// Returns a reference to the LSPS2 server-side handler. /// /// The returned hendler allows to initiate the LSPS2 service-side flow. - pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler> { + pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler> { self.lsps2_service_handler.as_ref() } @@ -1053,7 +1054,7 @@ where /// Wraps [`LiquidityManager::lsps2_service_handler`]. pub fn lsps2_service_handler<'a>( &'a self, - ) -> Option, T>> { + ) -> Option, T, TP>> { self.inner.lsps2_service_handler.as_ref().map(|r| LSPS2ServiceHandlerSync::from_inner(r)) }