diff --git a/docs/api-guide.md b/docs/api-guide.md index 63594f3d..9d4621f0 100644 --- a/docs/api-guide.md +++ b/docs/api-guide.md @@ -188,7 +188,7 @@ See [Pagination](#pagination) below for how to page through results. | RPC | Description | |-------------------|-------------------------------------------------------------| -| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment events | +| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment and channel events | `SubscribeEvents` returns a stream of `EventEnvelope` messages. Each envelope contains one of: @@ -199,6 +199,7 @@ See [Pagination](#pagination) below for how to page through results. | `PaymentFailed` | An outbound payment failed | | `PaymentClaimable` | A hodl invoice payment arrived and is waiting to be claimed or failed | | `PaymentForwarded` | A payment was routed through this node | +| `ChannelStateChanged` | A channel changed state (pending, ready, open failed, closed) | Events are broadcast to all connected subscribers. The server uses a bounded broadcast channel (capacity 1024). A slow subscriber that falls behind will miss events. diff --git a/e2e-tests/tests/e2e.rs b/e2e-tests/tests/e2e.rs index 8d0ef7c4..c22f1bc7 100644 --- a/e2e-tests/tests/e2e.rs +++ b/e2e-tests/tests/e2e.rs @@ -12,7 +12,8 @@ use std::time::Duration; use e2e_tests::{ find_available_port, mine_and_sync, run_cli, run_cli_raw, setup_funded_channel, - wait_for_onchain_balance, LdkServerConfig, LdkServerHandle, TestBitcoind, + wait_for_onchain_balance, wait_for_usable_channel, LdkServerConfig, LdkServerHandle, + TestBitcoind, }; use hex_conservative::{DisplayHex, FromHex}; use ldk_node::bitcoin::hashes::{sha256, Hash}; @@ -21,10 +22,10 @@ use ldk_node::lightning::offers::offer::Offer; use ldk_node::lightning_invoice::Bolt11Invoice; use ldk_server_client::client::EventStream; use ldk_server_client::ldk_server_grpc::api::{ - Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest, + Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest, OpenChannelRequest, }; use ldk_server_client::ldk_server_grpc::events::event_envelope::Event; -use ldk_server_client::ldk_server_grpc::events::EventEnvelope; +use ldk_server_client::ldk_server_grpc::events::{ChannelState, EventEnvelope}; use ldk_server_client::ldk_server_grpc::types::{ bolt11_invoice_description, Bolt11InvoiceDescription, }; @@ -410,6 +411,160 @@ async fn test_cli_open_channel() { assert!(!output["user_channel_id"].as_str().unwrap().is_empty()); } +#[tokio::test] +async fn test_subscribe_events_channel_state_lifecycle_pending_ready_closed() { + let bitcoind = TestBitcoind::new(); + let server_a = LdkServerHandle::start(&bitcoind).await; + let server_b = LdkServerHandle::start(&bitcoind).await; + + let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address; + let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address; + bitcoind.fund_address(&addr_a, 1.0); + bitcoind.fund_address(&addr_b, 0.1); + mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await; + wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await; + wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await; + + let mut events_a = server_a.client().subscribe_events().await.unwrap(); + + let open_resp = server_a + .client() + .open_channel(OpenChannelRequest { + node_pubkey: server_b.node_id().to_string(), + address: format!("127.0.0.1:{}", server_b.p2p_port), + channel_amount_sats: 100_000, + push_to_counterparty_msat: None, + channel_config: None, + announce_channel: true, + disable_counterparty_reserve: false, + }) + .await + .unwrap(); + + let pending = wait_for_event(&mut events_a, |e| { + matches!( + e, + Event::ChannelStateChanged(channel_event) + if channel_event.user_channel_id == open_resp.user_channel_id + && channel_event.state == ChannelState::Pending as i32 + ) + }) + .await; + assert!(matches!(pending.event, Some(Event::ChannelStateChanged(_)))); + + mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await; + wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await; + + let ready = wait_for_event(&mut events_a, |e| { + matches!( + e, + Event::ChannelStateChanged(channel_event) + if channel_event.user_channel_id == open_resp.user_channel_id + && channel_event.state == ChannelState::Ready as i32 + ) + }) + .await; + assert!(matches!(ready.event, Some(Event::ChannelStateChanged(_)))); + + run_cli(&server_a, &["close-channel", &open_resp.user_channel_id, server_b.node_id()]); + mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await; + + let closed = wait_for_event(&mut events_a, |e| { + matches!( + e, + Event::ChannelStateChanged(channel_event) + if channel_event.user_channel_id == open_resp.user_channel_id + && channel_event.state == ChannelState::Closed as i32 + ) + }) + .await; + + match closed.event { + Some(Event::ChannelStateChanged(channel_event)) => { + assert_eq!(channel_event.user_channel_id, open_resp.user_channel_id); + assert_eq!(channel_event.state, ChannelState::Closed as i32); + }, + other => panic!("expected ChannelStateChanged event, got {other:?}"), + } +} + +#[tokio::test] +async fn test_subscribe_events_channel_state_lifecycle_pending_ready_force_closed() { + let bitcoind = TestBitcoind::new(); + let server_a = LdkServerHandle::start(&bitcoind).await; + let server_b = LdkServerHandle::start(&bitcoind).await; + + let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address; + let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address; + bitcoind.fund_address(&addr_a, 1.0); + bitcoind.fund_address(&addr_b, 0.1); + mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await; + wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await; + wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await; + + let mut events_a = server_a.client().subscribe_events().await.unwrap(); + + let open_resp = server_a + .client() + .open_channel(OpenChannelRequest { + node_pubkey: server_b.node_id().to_string(), + address: format!("127.0.0.1:{}", server_b.p2p_port), + channel_amount_sats: 100_000, + push_to_counterparty_msat: None, + channel_config: None, + announce_channel: true, + disable_counterparty_reserve: false, + }) + .await + .unwrap(); + + let pending = wait_for_event(&mut events_a, |e| { + matches!( + e, + Event::ChannelStateChanged(channel_event) + if channel_event.user_channel_id == open_resp.user_channel_id + && channel_event.state == ChannelState::Pending as i32 + ) + }) + .await; + assert!(matches!(pending.event, Some(Event::ChannelStateChanged(_)))); + + mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await; + wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await; + + let ready = wait_for_event(&mut events_a, |e| { + matches!( + e, + Event::ChannelStateChanged(channel_event) + if channel_event.user_channel_id == open_resp.user_channel_id + && channel_event.state == ChannelState::Ready as i32 + ) + }) + .await; + assert!(matches!(ready.event, Some(Event::ChannelStateChanged(_)))); + + run_cli(&server_a, &["force-close-channel", &open_resp.user_channel_id, server_b.node_id()]); + mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await; + + let closed = wait_for_event(&mut events_a, |e| { + matches!( + e, + Event::ChannelStateChanged(channel_event) + if channel_event.user_channel_id == open_resp.user_channel_id + && channel_event.state == ChannelState::Closed as i32 + ) + }) + .await; + + match closed.event { + Some(Event::ChannelStateChanged(channel_event)) => { + assert_eq!(channel_event.user_channel_id, open_resp.user_channel_id); + assert_eq!(channel_event.state, ChannelState::Closed as i32); + }, + other => panic!("expected ChannelStateChanged event, got {other:?}"), + } +} + #[tokio::test] async fn test_cli_list_channels() { let bitcoind = TestBitcoind::new(); diff --git a/ldk-server-client/README.md b/ldk-server-client/README.md index 65648a5b..41eaeddd 100644 --- a/ldk-server-client/README.md +++ b/ldk-server-client/README.md @@ -34,7 +34,7 @@ The client handles HMAC-SHA256 authentication automatically. Pass the hex-encode ## Event Streaming -Subscribe to real-time payment events: +Subscribe to real-time payment and channel events: ```rust,no_run # use ldk_server_client::client::LdkServerClient; @@ -52,6 +52,39 @@ while let Some(result) = stream.next_message().await { # } ``` +Pattern-match channel state changes: + +```rust,no_run +# use ldk_server_client::client::LdkServerClient; +# use ldk_server_client::ldk_server_grpc::events::{event_envelope, ChannelState}; +# #[tokio::main] +# async fn main() { +# let cert_pem = std::fs::read("/path/to/tls.crt").unwrap(); +# let client = LdkServerClient::new("localhost:3536".to_string(), "key".to_string(), &cert_pem).unwrap(); +let mut stream = client.subscribe_events().await.unwrap(); +while let Some(result) = stream.next_message().await { + match result { + Ok(event) => { + if let Some(event_envelope::Event::ChannelStateChanged(channel_event)) = event.event { + let state = ChannelState::from_i32(channel_event.state) + .unwrap_or(ChannelState::Unspecified); + println!( + "channel {} -> {}", + channel_event.channel_id, + state.as_str_name() + ); + + if let Some(reason) = channel_event.reason { + println!("reason: {}", reason.message); + } + } + } + Err(e) => eprintln!("Error: {}", e), + } +} +# } +``` + ## Features - **`serde`**: Enables `serde::Serialize` and `serde::Deserialize` on all proto types diff --git a/ldk-server-grpc/src/events.rs b/ldk-server-grpc/src/events.rs index c2e74fcc..6582d423 100644 --- a/ldk-server-grpc/src/events.rs +++ b/ldk-server-grpc/src/events.rs @@ -13,7 +13,7 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct EventEnvelope { - #[prost(oneof = "event_envelope::Event", tags = "2, 3, 4, 6, 7")] + #[prost(oneof = "event_envelope::Event", tags = "2, 3, 4, 6, 7, 8")] pub event: ::core::option::Option, } /// Nested message and enum types in `EventEnvelope`. @@ -33,8 +33,105 @@ pub mod event_envelope { PaymentForwarded(super::PaymentForwarded), #[prost(message, tag = "7")] PaymentClaimable(super::PaymentClaimable), + #[prost(message, tag = "8")] + ChannelStateChanged(super::ChannelStateChanged), } } +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CounterpartyForceClosedDetails { + #[prost(string, tag = "1")] + pub peer_msg: ::prost::alloc::string::String, +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HolderForceClosedDetails { + #[prost(bool, optional, tag = "1")] + pub broadcasted_latest_txn: ::core::option::Option, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ProcessingErrorDetails { + #[prost(string, tag = "1")] + pub err: ::prost::alloc::string::String, +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HtlcsTimedOutDetails { + #[prost(string, optional, tag = "1")] + pub payment_hash: ::core::option::Option<::prost::alloc::string::String>, +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PeerFeerateTooLowDetails { + #[prost(uint32, tag = "1")] + pub peer_feerate_sat_per_kw: u32, + #[prost(uint32, tag = "2")] + pub required_feerate_sat_per_kw: u32, +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChannelStateChangeReason { + #[prost(enumeration = "ChannelStateChangeReasonKind", tag = "1")] + pub kind: i32, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + #[prost(oneof = "channel_state_change_reason::Details", tags = "3, 4, 5, 6, 7")] + pub details: ::core::option::Option, +} +/// Nested message and enum types in `ChannelStateChangeReason`. +pub mod channel_state_change_reason { + #[cfg_attr(feature = "serde", derive(serde::Serialize))] + #[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Details { + #[prost(message, tag = "3")] + CounterpartyForceClosed(super::CounterpartyForceClosedDetails), + #[prost(message, tag = "4")] + HolderForceClosed(super::HolderForceClosedDetails), + #[prost(message, tag = "5")] + ProcessingError(super::ProcessingErrorDetails), + #[prost(message, tag = "6")] + HtlcsTimedOut(super::HtlcsTimedOutDetails), + #[prost(message, tag = "7")] + PeerFeerateTooLow(super::PeerFeerateTooLowDetails), + } +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ChannelStateChanged { + #[prost(string, tag = "1")] + pub channel_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub user_channel_id: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub counterparty_node_id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(enumeration = "ChannelState", tag = "4")] + pub state: i32, + #[prost(string, optional, tag = "5")] + pub funding_txo: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "6")] + pub reason: ::core::option::Option, + #[prost(enumeration = "ChannelClosureInitiator", tag = "7")] + pub closure_initiator: i32, +} /// PaymentReceived indicates a payment has been received. #[cfg_attr(feature = "serde", derive(serde::Serialize))] #[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] @@ -85,3 +182,196 @@ pub struct PaymentForwarded { #[prost(message, optional, tag = "1")] pub forwarded_payment: ::core::option::Option, } +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ChannelState { + Unspecified = 0, + Pending = 1, + Ready = 2, + OpenFailed = 3, + Closed = 4, +} +impl ChannelState { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ChannelState::Unspecified => "CHANNEL_STATE_UNSPECIFIED", + ChannelState::Pending => "CHANNEL_STATE_PENDING", + ChannelState::Ready => "CHANNEL_STATE_READY", + ChannelState::OpenFailed => "CHANNEL_STATE_OPEN_FAILED", + ChannelState::Closed => "CHANNEL_STATE_CLOSED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CHANNEL_STATE_UNSPECIFIED" => Some(Self::Unspecified), + "CHANNEL_STATE_PENDING" => Some(Self::Pending), + "CHANNEL_STATE_READY" => Some(Self::Ready), + "CHANNEL_STATE_OPEN_FAILED" => Some(Self::OpenFailed), + "CHANNEL_STATE_CLOSED" => Some(Self::Closed), + _ => None, + } + } +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ChannelClosureInitiator { + Unspecified = 0, + Local = 1, + Remote = 2, + Unknown = 3, +} +impl ChannelClosureInitiator { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ChannelClosureInitiator::Unspecified => "CHANNEL_CLOSURE_INITIATOR_UNSPECIFIED", + ChannelClosureInitiator::Local => "CHANNEL_CLOSURE_INITIATOR_LOCAL", + ChannelClosureInitiator::Remote => "CHANNEL_CLOSURE_INITIATOR_REMOTE", + ChannelClosureInitiator::Unknown => "CHANNEL_CLOSURE_INITIATOR_UNKNOWN", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CHANNEL_CLOSURE_INITIATOR_UNSPECIFIED" => Some(Self::Unspecified), + "CHANNEL_CLOSURE_INITIATOR_LOCAL" => Some(Self::Local), + "CHANNEL_CLOSURE_INITIATOR_REMOTE" => Some(Self::Remote), + "CHANNEL_CLOSURE_INITIATOR_UNKNOWN" => Some(Self::Unknown), + _ => None, + } + } +} +#[cfg_attr(feature = "serde", derive(serde::Serialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ChannelStateChangeReasonKind { + Unspecified = 0, + CounterpartyForceClosed = 1, + HolderForceClosed = 2, + LegacyCooperativeClosure = 3, + CounterpartyInitiatedCooperativeClosure = 4, + LocallyInitiatedCooperativeClosure = 5, + CommitmentTxConfirmed = 6, + FundingTimedOut = 7, + ProcessingError = 8, + DisconnectedPeer = 9, + OutdatedChannelManager = 10, + CounterpartyCoopClosedUnfundedChannel = 11, + LocallyCoopClosedUnfundedChannel = 12, + FundingBatchClosure = 13, + HtlcsTimedOut = 14, + PeerFeerateTooLow = 15, +} +impl ChannelStateChangeReasonKind { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ChannelStateChangeReasonKind::Unspecified => { + "CHANNEL_STATE_CHANGE_REASON_KIND_UNSPECIFIED" + }, + ChannelStateChangeReasonKind::CounterpartyForceClosed => { + "CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_FORCE_CLOSED" + }, + ChannelStateChangeReasonKind::HolderForceClosed => { + "CHANNEL_STATE_CHANGE_REASON_KIND_HOLDER_FORCE_CLOSED" + }, + ChannelStateChangeReasonKind::LegacyCooperativeClosure => { + "CHANNEL_STATE_CHANGE_REASON_KIND_LEGACY_COOPERATIVE_CLOSURE" + }, + ChannelStateChangeReasonKind::CounterpartyInitiatedCooperativeClosure => { + "CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_INITIATED_COOPERATIVE_CLOSURE" + }, + ChannelStateChangeReasonKind::LocallyInitiatedCooperativeClosure => { + "CHANNEL_STATE_CHANGE_REASON_KIND_LOCALLY_INITIATED_COOPERATIVE_CLOSURE" + }, + ChannelStateChangeReasonKind::CommitmentTxConfirmed => { + "CHANNEL_STATE_CHANGE_REASON_KIND_COMMITMENT_TX_CONFIRMED" + }, + ChannelStateChangeReasonKind::FundingTimedOut => { + "CHANNEL_STATE_CHANGE_REASON_KIND_FUNDING_TIMED_OUT" + }, + ChannelStateChangeReasonKind::ProcessingError => { + "CHANNEL_STATE_CHANGE_REASON_KIND_PROCESSING_ERROR" + }, + ChannelStateChangeReasonKind::DisconnectedPeer => { + "CHANNEL_STATE_CHANGE_REASON_KIND_DISCONNECTED_PEER" + }, + ChannelStateChangeReasonKind::OutdatedChannelManager => { + "CHANNEL_STATE_CHANGE_REASON_KIND_OUTDATED_CHANNEL_MANAGER" + }, + ChannelStateChangeReasonKind::CounterpartyCoopClosedUnfundedChannel => { + "CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_COOP_CLOSED_UNFUNDED_CHANNEL" + }, + ChannelStateChangeReasonKind::LocallyCoopClosedUnfundedChannel => { + "CHANNEL_STATE_CHANGE_REASON_KIND_LOCALLY_COOP_CLOSED_UNFUNDED_CHANNEL" + }, + ChannelStateChangeReasonKind::FundingBatchClosure => { + "CHANNEL_STATE_CHANGE_REASON_KIND_FUNDING_BATCH_CLOSURE" + }, + ChannelStateChangeReasonKind::HtlcsTimedOut => { + "CHANNEL_STATE_CHANGE_REASON_KIND_HTLCS_TIMED_OUT" + }, + ChannelStateChangeReasonKind::PeerFeerateTooLow => { + "CHANNEL_STATE_CHANGE_REASON_KIND_PEER_FEERATE_TOO_LOW" + }, + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CHANNEL_STATE_CHANGE_REASON_KIND_UNSPECIFIED" => Some(Self::Unspecified), + "CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_FORCE_CLOSED" => { + Some(Self::CounterpartyForceClosed) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_HOLDER_FORCE_CLOSED" => Some(Self::HolderForceClosed), + "CHANNEL_STATE_CHANGE_REASON_KIND_LEGACY_COOPERATIVE_CLOSURE" => { + Some(Self::LegacyCooperativeClosure) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_INITIATED_COOPERATIVE_CLOSURE" => { + Some(Self::CounterpartyInitiatedCooperativeClosure) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_LOCALLY_INITIATED_COOPERATIVE_CLOSURE" => { + Some(Self::LocallyInitiatedCooperativeClosure) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_COMMITMENT_TX_CONFIRMED" => { + Some(Self::CommitmentTxConfirmed) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_FUNDING_TIMED_OUT" => Some(Self::FundingTimedOut), + "CHANNEL_STATE_CHANGE_REASON_KIND_PROCESSING_ERROR" => Some(Self::ProcessingError), + "CHANNEL_STATE_CHANGE_REASON_KIND_DISCONNECTED_PEER" => Some(Self::DisconnectedPeer), + "CHANNEL_STATE_CHANGE_REASON_KIND_OUTDATED_CHANNEL_MANAGER" => { + Some(Self::OutdatedChannelManager) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_COOP_CLOSED_UNFUNDED_CHANNEL" => { + Some(Self::CounterpartyCoopClosedUnfundedChannel) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_LOCALLY_COOP_CLOSED_UNFUNDED_CHANNEL" => { + Some(Self::LocallyCoopClosedUnfundedChannel) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_FUNDING_BATCH_CLOSURE" => { + Some(Self::FundingBatchClosure) + }, + "CHANNEL_STATE_CHANGE_REASON_KIND_HTLCS_TIMED_OUT" => Some(Self::HtlcsTimedOut), + "CHANNEL_STATE_CHANGE_REASON_KIND_PEER_FEERATE_TOO_LOW" => { + Some(Self::PeerFeerateTooLow) + }, + _ => None, + } + } +} diff --git a/ldk-server-grpc/src/proto/events.proto b/ldk-server-grpc/src/proto/events.proto index 5c5ce2c3..97524fc4 100644 --- a/ldk-server-grpc/src/proto/events.proto +++ b/ldk-server-grpc/src/proto/events.proto @@ -10,9 +10,88 @@ message EventEnvelope { PaymentFailed payment_failed = 4; PaymentForwarded payment_forwarded = 6; PaymentClaimable payment_claimable = 7; + ChannelStateChanged channel_state_changed = 8; } } +enum ChannelState { + CHANNEL_STATE_UNSPECIFIED = 0; + CHANNEL_STATE_PENDING = 1; + CHANNEL_STATE_READY = 2; + CHANNEL_STATE_OPEN_FAILED = 3; + CHANNEL_STATE_CLOSED = 4; +} + +enum ChannelClosureInitiator { + CHANNEL_CLOSURE_INITIATOR_UNSPECIFIED = 0; + CHANNEL_CLOSURE_INITIATOR_LOCAL = 1; + CHANNEL_CLOSURE_INITIATOR_REMOTE = 2; + CHANNEL_CLOSURE_INITIATOR_UNKNOWN = 3; +} + +enum ChannelStateChangeReasonKind { + CHANNEL_STATE_CHANGE_REASON_KIND_UNSPECIFIED = 0; + CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_FORCE_CLOSED = 1; + CHANNEL_STATE_CHANGE_REASON_KIND_HOLDER_FORCE_CLOSED = 2; + CHANNEL_STATE_CHANGE_REASON_KIND_LEGACY_COOPERATIVE_CLOSURE = 3; + CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_INITIATED_COOPERATIVE_CLOSURE = 4; + CHANNEL_STATE_CHANGE_REASON_KIND_LOCALLY_INITIATED_COOPERATIVE_CLOSURE = 5; + CHANNEL_STATE_CHANGE_REASON_KIND_COMMITMENT_TX_CONFIRMED = 6; + CHANNEL_STATE_CHANGE_REASON_KIND_FUNDING_TIMED_OUT = 7; + CHANNEL_STATE_CHANGE_REASON_KIND_PROCESSING_ERROR = 8; + CHANNEL_STATE_CHANGE_REASON_KIND_DISCONNECTED_PEER = 9; + CHANNEL_STATE_CHANGE_REASON_KIND_OUTDATED_CHANNEL_MANAGER = 10; + CHANNEL_STATE_CHANGE_REASON_KIND_COUNTERPARTY_COOP_CLOSED_UNFUNDED_CHANNEL = 11; + CHANNEL_STATE_CHANGE_REASON_KIND_LOCALLY_COOP_CLOSED_UNFUNDED_CHANNEL = 12; + CHANNEL_STATE_CHANGE_REASON_KIND_FUNDING_BATCH_CLOSURE = 13; + CHANNEL_STATE_CHANGE_REASON_KIND_HTLCS_TIMED_OUT = 14; + CHANNEL_STATE_CHANGE_REASON_KIND_PEER_FEERATE_TOO_LOW = 15; +} + +message CounterpartyForceClosedDetails { + string peer_msg = 1; +} + +message HolderForceClosedDetails { + optional bool broadcasted_latest_txn = 1; + string message = 2; +} + +message ProcessingErrorDetails { + string err = 1; +} + +message HtlcsTimedOutDetails { + optional string payment_hash = 1; +} + +message PeerFeerateTooLowDetails { + uint32 peer_feerate_sat_per_kw = 1; + uint32 required_feerate_sat_per_kw = 2; +} + +message ChannelStateChangeReason { + ChannelStateChangeReasonKind kind = 1; + string message = 2; + oneof details { + CounterpartyForceClosedDetails counterparty_force_closed = 3; + HolderForceClosedDetails holder_force_closed = 4; + ProcessingErrorDetails processing_error = 5; + HtlcsTimedOutDetails htlcs_timed_out = 6; + PeerFeerateTooLowDetails peer_feerate_too_low = 7; + } +} + +message ChannelStateChanged { + string channel_id = 1; + string user_channel_id = 2; + optional string counterparty_node_id = 3; + ChannelState state = 4; + optional string funding_txo = 5; + optional ChannelStateChangeReason reason = 6; + ChannelClosureInitiator closure_initiator = 7; +} + // PaymentReceived indicates a payment has been received. message PaymentReceived { // The payment details for the payment in event. diff --git a/ldk-server/src/main.rs b/ldk-server/src/main.rs index 30807541..9088dbdd 100644 --- a/ldk-server/src/main.rs +++ b/ldk-server/src/main.rs @@ -12,6 +12,7 @@ mod io; mod service; mod util; +use std::collections::HashSet; use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; @@ -27,6 +28,7 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use ldk_node::bitcoin::Network; use ldk_node::config::Config; use ldk_node::entropy::NodeEntropy; +use ldk_node::lightning::events::ClosureReason; use ldk_node::lightning::ln::channelmanager::PaymentId; use ldk_node::{Builder, Event, Node}; use ldk_server_grpc::events; @@ -271,6 +273,11 @@ fn main() { } }; let event_node = Arc::clone(&node); + let mut ready_channel_ids: HashSet = event_node + .list_channels() + .into_iter() + .map(|channel| channel.channel_id.0.to_lower_hex_string()) + .collect(); let metrics: Option> = if config_file.metrics_enabled { let poll_metrics_interval = Duration::from_secs(config_file.poll_metrics_interval.unwrap_or(60)); @@ -323,38 +330,110 @@ fn main() { loop { select! { - event = event_node.next_event_async() => { - match event { - Event::ChannelPending { channel_id, counterparty_node_id, .. } => { - info!( - "CHANNEL_PENDING: {} from counterparty {}", - channel_id, counterparty_node_id - ); - if let Err(e) = event_node.event_handled() { - error!("Failed to mark event as handled: {e}"); - } - }, - Event::ChannelReady { channel_id, counterparty_node_id, .. } => { - info!( - "CHANNEL_READY: {} from counterparty {:?}", - channel_id, counterparty_node_id - ); - if let Err(e) = event_node.event_handled() { - error!("Failed to mark event as handled: {e}"); - } + event = event_node.next_event_async() => { + match event { + Event::ChannelPending { + channel_id, + user_channel_id, + counterparty_node_id, + funding_txo, + .. + } => { + info!( + "CHANNEL_PENDING: {} from counterparty {}", + channel_id, counterparty_node_id + ); + + send_channel_state_event( + event_envelope::Event::ChannelStateChanged(events::ChannelStateChanged { + channel_id: channel_id.0.to_lower_hex_string(), + user_channel_id: user_channel_id.0.to_string(), + counterparty_node_id: Some(counterparty_node_id.to_string()), + state: events::ChannelState::Pending.into(), + funding_txo: Some(funding_txo.to_string()), + reason: None, + closure_initiator: events::ChannelClosureInitiator::Unspecified.into(), + }), + &event_sender, + ); + + if let Err(e) = event_node.event_handled() { + error!("Failed to mark event as handled: {e}"); + } + }, + Event::ChannelReady { + channel_id, + user_channel_id, + counterparty_node_id, + funding_txo, + } => { + info!( + "CHANNEL_READY: {} from counterparty {:?}", + channel_id, counterparty_node_id + ); + + let channel_id_hex = channel_id.0.to_lower_hex_string(); + ready_channel_ids.insert(channel_id_hex.clone()); + + send_channel_state_event( + event_envelope::Event::ChannelStateChanged(events::ChannelStateChanged { + channel_id: channel_id_hex, + user_channel_id: user_channel_id.0.to_string(), + counterparty_node_id: counterparty_node_id + .map(|node_id| node_id.to_string()), + state: events::ChannelState::Ready.into(), + funding_txo: funding_txo.map(|outpoint| outpoint.to_string()), + reason: None, + closure_initiator: events::ChannelClosureInitiator::Unspecified.into(), + }), + &event_sender, + ); + + if let Err(e) = event_node.event_handled() { + error!("Failed to mark event as handled: {e}"); + } if let Some(metrics) = &metrics { metrics.update_channels_count(false); } }, - Event::ChannelClosed { channel_id, counterparty_node_id, .. } => { - info!( - "CHANNEL_CLOSED: {} from counterparty {:?}", - channel_id, counterparty_node_id - ); - if let Err(e) = event_node.event_handled() { - error!("Failed to mark event as handled: {e}"); - } + Event::ChannelClosed { + channel_id, + user_channel_id, + counterparty_node_id, + reason, + } => { + info!( + "CHANNEL_CLOSED: {} from counterparty {:?}", + channel_id, counterparty_node_id + ); + + let channel_id_hex = channel_id.0.to_lower_hex_string(); + let was_ready = ready_channel_ids.remove(&channel_id_hex); + let reason_ref = reason.as_ref(); + let is_open_failure = !was_ready && is_channel_open_failure(reason_ref); + + send_channel_state_event( + event_envelope::Event::ChannelStateChanged(events::ChannelStateChanged { + channel_id: channel_id_hex, + user_channel_id: user_channel_id.0.to_string(), + counterparty_node_id: counterparty_node_id + .map(|node_id| node_id.to_string()), + state: if is_open_failure { + events::ChannelState::OpenFailed.into() + } else { + events::ChannelState::Closed.into() + }, + funding_txo: None, + reason: reason_ref.map(closure_reason_to_proto), + closure_initiator: closure_initiator_from_reason(reason_ref).into(), + }), + &event_sender, + ); + + if let Err(e) = event_node.event_handled() { + error!("Failed to mark event as handled: {e}"); + } if let Some(metrics) = &metrics { metrics.update_channels_count(true); @@ -555,6 +634,131 @@ fn send_event_and_upsert_payment( } } +fn send_channel_state_event( + event: event_envelope::Event, event_sender: &broadcast::Sender, +) { + if let Err(e) = event_sender.send(EventEnvelope { event: Some(event) }) { + debug!("No event subscribers connected, skipping event: {e}"); + } +} + +fn is_channel_open_failure(reason: Option<&ClosureReason>) -> bool { + matches!( + reason, + Some(ClosureReason::FundingTimedOut) + | Some(ClosureReason::DisconnectedPeer) + | Some(ClosureReason::CounterpartyCoopClosedUnfundedChannel) + | Some(ClosureReason::LocallyCoopClosedUnfundedChannel) + | Some(ClosureReason::FundingBatchClosure) + ) +} + +fn closure_initiator_from_reason( + reason: Option<&ClosureReason>, +) -> events::ChannelClosureInitiator { + match reason { + Some(ClosureReason::HolderForceClosed { .. }) + | Some(ClosureReason::LocallyInitiatedCooperativeClosure) + | Some(ClosureReason::LocallyCoopClosedUnfundedChannel) => events::ChannelClosureInitiator::Local, + Some(ClosureReason::CounterpartyForceClosed { .. }) + | Some(ClosureReason::CounterpartyInitiatedCooperativeClosure) + | Some(ClosureReason::CounterpartyCoopClosedUnfundedChannel) => { + events::ChannelClosureInitiator::Remote + }, + Some(_) => events::ChannelClosureInitiator::Unknown, + None => events::ChannelClosureInitiator::Unspecified, + } +} + +fn closure_reason_to_proto(reason: &ClosureReason) -> events::ChannelStateChangeReason { + events::ChannelStateChangeReason { + kind: closure_reason_kind(reason).into(), + message: reason.to_string(), + details: closure_reason_details(reason), + } +} + +fn closure_reason_kind(reason: &ClosureReason) -> events::ChannelStateChangeReasonKind { + match reason { + ClosureReason::CounterpartyForceClosed { .. } => { + events::ChannelStateChangeReasonKind::CounterpartyForceClosed + }, + ClosureReason::HolderForceClosed { .. } => { + events::ChannelStateChangeReasonKind::HolderForceClosed + }, + ClosureReason::LegacyCooperativeClosure => { + events::ChannelStateChangeReasonKind::LegacyCooperativeClosure + }, + ClosureReason::CounterpartyInitiatedCooperativeClosure => { + events::ChannelStateChangeReasonKind::CounterpartyInitiatedCooperativeClosure + }, + ClosureReason::LocallyInitiatedCooperativeClosure => { + events::ChannelStateChangeReasonKind::LocallyInitiatedCooperativeClosure + }, + ClosureReason::CommitmentTxConfirmed => { + events::ChannelStateChangeReasonKind::CommitmentTxConfirmed + }, + ClosureReason::FundingTimedOut => events::ChannelStateChangeReasonKind::FundingTimedOut, + ClosureReason::ProcessingError { .. } => { + events::ChannelStateChangeReasonKind::ProcessingError + }, + ClosureReason::DisconnectedPeer => events::ChannelStateChangeReasonKind::DisconnectedPeer, + ClosureReason::OutdatedChannelManager => { + events::ChannelStateChangeReasonKind::OutdatedChannelManager + }, + ClosureReason::CounterpartyCoopClosedUnfundedChannel => { + events::ChannelStateChangeReasonKind::CounterpartyCoopClosedUnfundedChannel + }, + ClosureReason::LocallyCoopClosedUnfundedChannel => { + events::ChannelStateChangeReasonKind::LocallyCoopClosedUnfundedChannel + }, + ClosureReason::FundingBatchClosure => { + events::ChannelStateChangeReasonKind::FundingBatchClosure + }, + ClosureReason::HTLCsTimedOut { .. } => events::ChannelStateChangeReasonKind::HtlcsTimedOut, + ClosureReason::PeerFeerateTooLow { .. } => { + events::ChannelStateChangeReasonKind::PeerFeerateTooLow + }, + } +} + +fn closure_reason_details( + reason: &ClosureReason, +) -> Option { + use events::channel_state_change_reason::Details; + + match reason { + ClosureReason::CounterpartyForceClosed { peer_msg } => { + Some(Details::CounterpartyForceClosed(events::CounterpartyForceClosedDetails { + peer_msg: peer_msg.to_string(), + })) + }, + ClosureReason::HolderForceClosed { + broadcasted_latest_txn, + message: force_close_message, + } => Some(Details::HolderForceClosed(events::HolderForceClosedDetails { + broadcasted_latest_txn: *broadcasted_latest_txn, + message: force_close_message.clone(), + })), + ClosureReason::ProcessingError { err } => { + Some(Details::ProcessingError(events::ProcessingErrorDetails { err: err.clone() })) + }, + ClosureReason::HTLCsTimedOut { payment_hash } => { + Some(Details::HtlcsTimedOut(events::HtlcsTimedOutDetails { + payment_hash: payment_hash.map(|hash| hash.to_string()), + })) + }, + ClosureReason::PeerFeerateTooLow { + peer_feerate_sat_per_kw, + required_feerate_sat_per_kw, + } => Some(Details::PeerFeerateTooLow(events::PeerFeerateTooLowDetails { + peer_feerate_sat_per_kw: *peer_feerate_sat_per_kw, + required_feerate_sat_per_kw: *required_feerate_sat_per_kw, + })), + _ => None, + } +} + fn upsert_payment_details( event_node: &Node, paginated_store: Arc, payment: &Payment, ) { @@ -606,3 +810,104 @@ fn load_or_generate_api_key(storage_dir: &Path) -> std::io::Result { Ok(key_bytes.to_lower_hex_string()) } } + +#[cfg(test)] +mod tests { + use super::*; + use ldk_server_grpc::events::channel_state_change_reason::Details; + + #[test] + fn test_is_channel_open_failure_classification() { + assert!(is_channel_open_failure(Some(&ClosureReason::FundingTimedOut))); + assert!(is_channel_open_failure(Some(&ClosureReason::DisconnectedPeer))); + assert!(is_channel_open_failure(Some(&ClosureReason::FundingBatchClosure))); + assert!(is_channel_open_failure(Some( + &ClosureReason::CounterpartyCoopClosedUnfundedChannel, + ))); + assert!(is_channel_open_failure(Some(&ClosureReason::LocallyCoopClosedUnfundedChannel,))); + + assert!(!is_channel_open_failure(Some(&ClosureReason::CommitmentTxConfirmed))); + assert!(!is_channel_open_failure(None)); + } + + #[test] + fn test_closure_initiator_mapping() { + assert_eq!( + closure_initiator_from_reason(Some(&ClosureReason::HolderForceClosed { + broadcasted_latest_txn: Some(true), + message: "local close".to_string(), + })), + events::ChannelClosureInitiator::Local + ); + assert_eq!( + closure_initiator_from_reason( + Some(&ClosureReason::LocallyInitiatedCooperativeClosure,) + ), + events::ChannelClosureInitiator::Local + ); + + assert_eq!( + closure_initiator_from_reason(Some( + &ClosureReason::CounterpartyInitiatedCooperativeClosure, + )), + events::ChannelClosureInitiator::Remote + ); + assert_eq!( + closure_initiator_from_reason(Some( + &ClosureReason::CounterpartyCoopClosedUnfundedChannel, + )), + events::ChannelClosureInitiator::Remote + ); + + assert_eq!( + closure_initiator_from_reason(Some(&ClosureReason::CommitmentTxConfirmed)), + events::ChannelClosureInitiator::Unknown + ); + assert_eq!( + closure_initiator_from_reason(None), + events::ChannelClosureInitiator::Unspecified + ); + } + + #[test] + fn test_closure_reason_to_proto_holder_force_closed_details() { + let proto = closure_reason_to_proto(&ClosureReason::HolderForceClosed { + broadcasted_latest_txn: Some(false), + message: "manual force close".to_string(), + }); + + assert_eq!(proto.kind, events::ChannelStateChangeReasonKind::HolderForceClosed as i32); + assert!(proto.message.contains("manual force close")); + match proto.details { + Some(Details::HolderForceClosed(details)) => { + assert_eq!(details.broadcasted_latest_txn, Some(false)); + assert_eq!(details.message, "manual force close"); + }, + other => panic!("expected HolderForceClosed details, got {other:?}"), + } + } + + #[test] + fn test_closure_reason_to_proto_peer_feerate_details() { + let proto = closure_reason_to_proto(&ClosureReason::PeerFeerateTooLow { + peer_feerate_sat_per_kw: 100, + required_feerate_sat_per_kw: 250, + }); + + assert_eq!(proto.kind, events::ChannelStateChangeReasonKind::PeerFeerateTooLow as i32); + match proto.details { + Some(Details::PeerFeerateTooLow(details)) => { + assert_eq!(details.peer_feerate_sat_per_kw, 100); + assert_eq!(details.required_feerate_sat_per_kw, 250); + }, + other => panic!("expected PeerFeerateTooLow details, got {other:?}"), + } + } + + #[test] + fn test_closure_reason_to_proto_without_details() { + let proto = closure_reason_to_proto(&ClosureReason::FundingTimedOut); + assert_eq!(proto.kind, events::ChannelStateChangeReasonKind::FundingTimedOut as i32); + assert!(proto.details.is_none()); + } +}