Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/api-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ See [Pagination](#pagination) below for how to page through results.

| RPC | Description |
|-------------------|-------------------------------------------------------------|
| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment events |
| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment and channel events |

`SubscribeEvents` returns a stream of `EventEnvelope` messages. Each envelope contains one of:

Expand All @@ -199,6 +199,7 @@ See [Pagination](#pagination) below for how to page through results.
| `PaymentFailed` | An outbound payment failed |
| `PaymentClaimable` | A hodl invoice payment arrived and is waiting to be claimed or failed |
| `PaymentForwarded` | A payment was routed through this node |
| `ChannelStateChanged` | A channel changed state (pending, ready, open failed, closed) |

Events are broadcast to all connected subscribers. The server uses a bounded broadcast channel
(capacity 1024). A slow subscriber that falls behind will miss events.
Expand Down
161 changes: 158 additions & 3 deletions e2e-tests/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::time::Duration;

use e2e_tests::{
find_available_port, mine_and_sync, run_cli, run_cli_raw, setup_funded_channel,
wait_for_onchain_balance, LdkServerConfig, LdkServerHandle, TestBitcoind,
wait_for_onchain_balance, wait_for_usable_channel, LdkServerConfig, LdkServerHandle,
TestBitcoind,
};
use hex_conservative::{DisplayHex, FromHex};
use ldk_node::bitcoin::hashes::{sha256, Hash};
Expand All @@ -21,10 +22,10 @@ use ldk_node::lightning::offers::offer::Offer;
use ldk_node::lightning_invoice::Bolt11Invoice;
use ldk_server_client::client::EventStream;
use ldk_server_client::ldk_server_grpc::api::{
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest,
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest, OpenChannelRequest,
};
use ldk_server_client::ldk_server_grpc::events::event_envelope::Event;
use ldk_server_client::ldk_server_grpc::events::EventEnvelope;
use ldk_server_client::ldk_server_grpc::events::{ChannelState, EventEnvelope};
use ldk_server_client::ldk_server_grpc::types::{
bolt11_invoice_description, Bolt11InvoiceDescription,
};
Expand Down Expand Up @@ -410,6 +411,160 @@ async fn test_cli_open_channel() {
assert!(!output["user_channel_id"].as_str().unwrap().is_empty());
}

#[tokio::test]
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_closed() {
let bitcoind = TestBitcoind::new();
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr_a, 1.0);
bitcoind.fund_address(&addr_b, 0.1);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;

let mut events_a = server_a.client().subscribe_events().await.unwrap();

let open_resp = server_a
.client()
.open_channel(OpenChannelRequest {
node_pubkey: server_b.node_id().to_string(),
address: format!("127.0.0.1:{}", server_b.p2p_port),
channel_amount_sats: 100_000,
push_to_counterparty_msat: None,
channel_config: None,
announce_channel: true,
disable_counterparty_reserve: false,
})
.await
.unwrap();

let pending = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
assert!(matches!(pending.event, Some(Event::ChannelStateChanged(_))));

mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await;

let ready = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Ready as i32
)
})
.await;
assert!(matches!(ready.event, Some(Event::ChannelStateChanged(_))));

run_cli(&server_a, &["close-channel", &open_resp.user_channel_id, server_b.node_id()]);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;

let closed = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Closed as i32
)
})
.await;

match closed.event {
Some(Event::ChannelStateChanged(channel_event)) => {
assert_eq!(channel_event.user_channel_id, open_resp.user_channel_id);
assert_eq!(channel_event.state, ChannelState::Closed as i32);
},
other => panic!("expected ChannelStateChanged event, got {other:?}"),
}
}

#[tokio::test]
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_force_closed() {
let bitcoind = TestBitcoind::new();
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr_a, 1.0);
bitcoind.fund_address(&addr_b, 0.1);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;

let mut events_a = server_a.client().subscribe_events().await.unwrap();

let open_resp = server_a
.client()
.open_channel(OpenChannelRequest {
node_pubkey: server_b.node_id().to_string(),
address: format!("127.0.0.1:{}", server_b.p2p_port),
channel_amount_sats: 100_000,
push_to_counterparty_msat: None,
channel_config: None,
announce_channel: true,
disable_counterparty_reserve: false,
})
.await
.unwrap();

let pending = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
assert!(matches!(pending.event, Some(Event::ChannelStateChanged(_))));

mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await;

let ready = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Ready as i32
)
})
.await;
assert!(matches!(ready.event, Some(Event::ChannelStateChanged(_))));

run_cli(&server_a, &["force-close-channel", &open_resp.user_channel_id, server_b.node_id()]);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;

let closed = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Closed as i32
)
})
.await;

match closed.event {
Some(Event::ChannelStateChanged(channel_event)) => {
assert_eq!(channel_event.user_channel_id, open_resp.user_channel_id);
assert_eq!(channel_event.state, ChannelState::Closed as i32);
},
other => panic!("expected ChannelStateChanged event, got {other:?}"),
}
}

#[tokio::test]
async fn test_cli_list_channels() {
let bitcoind = TestBitcoind::new();
Expand Down
35 changes: 34 additions & 1 deletion ldk-server-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ The client handles HMAC-SHA256 authentication automatically. Pass the hex-encode

## Event Streaming

Subscribe to real-time payment events:
Subscribe to real-time payment and channel events:

```rust,no_run
# use ldk_server_client::client::LdkServerClient;
Expand All @@ -52,6 +52,39 @@ while let Some(result) = stream.next_message().await {
# }
```

Pattern-match channel state changes:

```rust,no_run
# use ldk_server_client::client::LdkServerClient;
# use ldk_server_client::ldk_server_grpc::events::{event_envelope, ChannelState};
# #[tokio::main]
# async fn main() {
# let cert_pem = std::fs::read("/path/to/tls.crt").unwrap();
# let client = LdkServerClient::new("localhost:3536".to_string(), "key".to_string(), &cert_pem).unwrap();
let mut stream = client.subscribe_events().await.unwrap();
while let Some(result) = stream.next_message().await {
match result {
Ok(event) => {
if let Some(event_envelope::Event::ChannelStateChanged(channel_event)) = event.event {
let state = ChannelState::from_i32(channel_event.state)
.unwrap_or(ChannelState::Unspecified);
println!(
"channel {} -> {}",
channel_event.channel_id,
state.as_str_name()
);

if let Some(reason) = channel_event.reason {
println!("reason: {}", reason.message);
}
}
}
Err(e) => eprintln!("Error: {}", e),
}
}
# }
```

## Features

- **`serde`**: Enables `serde::Serialize` and `serde::Deserialize` on all proto types
Expand Down
Loading