.NET: fix: Add session support for Handoff-hosted Agents#5280
.NET: fix: Add session support for Handoff-hosted Agents#5280
Conversation
In order to better support using `Workflows` hosted as `AIAgents` inside of Handoff workflows, we need to make proper use of AgentSession. This causes potential issues around checkpointing and making sure that we properly compute only the new incoming messages for each agent invocation.
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR updates the .NET handoff orchestration to use shared state + bookmarking and to keep a single AgentSession per workflow session, improving checkpointing behavior and ensuring only new messages are sent to underlying agents.
Changes:
- Move handoff message accumulation into shared orchestration state (
HandoffSharedState+MultiPartyConversation). - Add bookmarking support so each agent processes only new messages per invocation.
- Introduce message filtering extraction and expand/adjust unit tests for handoff orchestration behavior.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs | Adds a StateManager to test contexts and wires state operations to it. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs | Updates Step12 expected-output computation to reflect bookmarking/session behavior. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/12_HandOff_HostAsAgent.cs | Minor formatting adjustment in the Step12 sample. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/HandoffAgentExecutorTests.cs | Initializes shared state for tests and adds a new HITL/subworkflow composition test. |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/MultiPartyConversation.cs | New shared conversation buffer with bookmarking support. |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffState.cs | Removes per-message payload from HandoffState (messages now live in shared state). |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffStartExecutor.cs | Initializes/updates shared orchestration state and emits bookmark-driven HandoffState. |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffMessagesFilter.cs | Extracts tool-call filtering logic into a dedicated type. |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffEndExecutor.cs | Reads from shared state and yields the accumulated conversation at workflow end. |
| dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs | Adds agent-session use, bookmarking, and shared-state persistence for conversation messages. |
| dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentsAbstractionsExtensions.cs | Broadens role-changing helper to accept IEnumerable<ChatMessage>. |
| private readonly List<ChatMessage> _history = []; | ||
| private readonly object _mutex = new(); | ||
|
|
||
| public IReadOnlyList<ChatMessage> AllMessages => this._history; |
There was a problem hiding this comment.
AllMessages returns the live backing List<ChatMessage> without holding a lock. Callers can enumerate it while another thread is mutating it (via AddMessage(s)), which can throw or produce torn reads. Return a snapshot under the lock (e.g., ChatMessage[]) or expose a method like GetAllMessagesSnapshot() that copies under lock.
| public IReadOnlyList<ChatMessage> AllMessages => this._history; | |
| public IReadOnlyList<ChatMessage> AllMessages | |
| { | |
| get | |
| { | |
| lock (this._mutex) | |
| { | |
| return this._history.ToArray(); | |
| } | |
| } | |
| } |
| public (ChatMessage[], int) CollectNewMessages(int bookmark) | ||
| { | ||
| lock (this._mutex) | ||
| { | ||
| int count = this._history.Count - bookmark; | ||
| if (count > 0) | ||
| { | ||
| return (this._history.Skip(bookmark).ToArray(), this.CurrentBookmark); | ||
| } | ||
|
|
||
| return ([], this.CurrentBookmark); | ||
| } | ||
| } |
There was a problem hiding this comment.
If bookmark is greater than _history.Count, count becomes negative and the method silently returns no messages. That can hide state corruption and cause missed messages. Consider explicitly handling out-of-range bookmarks (e.g., clamp bookmark into [0..Count] or throw an exception) so incorrect bookmarks are not silently ignored.
| // We haven't seen the corresponding function call yet, so add it as a candidate to be filtered later | ||
| filteringCandidates[frc.CallId] = new FilterCandidateState(frc.CallId) | ||
| { | ||
| FunctionCallResultLocation = (filteredMessages.Count, filteredMessage.Contents.Count), |
There was a problem hiding this comment.
When a FunctionResultContent is seen before its corresponding FunctionCallContent, this branch records FunctionCallResultLocation but does not add the tool-result content to filteredMessage.Contents. That usually means the tool message is never added to filteredMessages (contents count stays 0), so the stored location can point to a non-existent/wrong message index later and cause incorrect removal or an out-of-range access. Fix by preserving the tool result message until its call is classified (e.g., add the content/message and record the exact indices, then remove later if it becomes a handoff call), or buffer the pending tool result content in FilterCandidateState and insert/remove deterministically.
| // We haven't seen the corresponding function call yet, so add it as a candidate to be filtered later | |
| filteringCandidates[frc.CallId] = new FilterCandidateState(frc.CallId) | |
| { | |
| FunctionCallResultLocation = (filteredMessages.Count, filteredMessage.Contents.Count), | |
| // We haven't seen the corresponding function call yet, so preserve the tool result in the filtered | |
| // message and record its exact location so it can be removed later if the call is classified as a | |
| // handoff. | |
| filteredMessage.Contents.Add(content); | |
| filteringCandidates[frc.CallId] = new FilterCandidateState(frc.CallId) | |
| { | |
| FunctionCallResultLocation = (filteredMessages.Count, filteredMessage.Contents.Count - 1), |
| internal record HandoffAgentHostState( | ||
| HandoffState? IncomingState, | ||
| int ConversationBookmark, | ||
| AgentSession? AgentSession) |
There was a problem hiding this comment.
Storing AgentSession inside executor state is risky if that state is checkpointed/serialized (the PR motivation mentions checkpointing). AgentSession is likely not JSON-serializable and may capture non-serializable resources, causing checkpoint persistence or restore failures. Consider keeping AgentSession out of checkpointed state (e.g., a private in-memory cache keyed by workflow session id), or persist only a serializable session identifier and recreate the session on resume.
| AgentSession? AgentSession) | |
| [property: JsonIgnore] AgentSession? AgentSession) |
| state.TurnMessages.Add(new ChatMessage(ChatRole.User, [response]) | ||
| if (!state.IsTakingTurn) | ||
| { | ||
| throw new InvalidOperationException("Cannot process user responses in when not taking a turn in Handoff Orchestration."); |
There was a problem hiding this comment.
This exception message has a grammatical error ("in when") and is also reused for non-user-response paths (e.g., function results). Consider correcting it and tailoring it per handler, e.g. "Cannot process user responses when not taking a turn..." vs "Cannot process function results when not taking a turn...", to improve diagnosability.
| throw new InvalidOperationException("Cannot process user responses in when not taking a turn in Handoff Orchestration."); | |
| throw new InvalidOperationException("Cannot process user responses when not taking a turn in Handoff Orchestration."); |
| IEnumerable<ChatMessage> newConversationMessages = []; | ||
| int newConverationBookmark = 0; |
There was a problem hiding this comment.
Typo in local variable name: newConverationBookmark should be newConversationBookmark for clarity and consistency.
| cancellationToken: cancellationToken); | ||
|
|
||
| await foreach (AgentResponseUpdate update in agentStream.ConfigureAwait(false)) | ||
| { | ||
| await AddUpdateAsync(update, cancellationToken).ConfigureAwait(false); |
There was a problem hiding this comment.
Inside the InvokeWithStateAsync callback you receive ct, but RunStreamingAsync uses the outer cancellationToken variable. Use the callback token (ct) consistently for RunStreamingAsync (and other awaited operations inside the callback) to ensure cancellation is propagated correctly if the runtime supplies a different token.
| cancellationToken: cancellationToken); | |
| await foreach (AgentResponseUpdate update in agentStream.ConfigureAwait(false)) | |
| { | |
| await AddUpdateAsync(update, cancellationToken).ConfigureAwait(false); | |
| cancellationToken: ct); | |
| await foreach (AgentResponseUpdate update in agentStream.ConfigureAwait(false)) | |
| { | |
| await AddUpdateAsync(update, ct).ConfigureAwait(false); |
| sharedState.PreviousAgentId = handoff.PreviousAgentId; | ||
| } | ||
|
|
||
| await context.YieldOutputAsync(sharedState.Conversation.AllMessages, cancellationToken).ConfigureAwait(false); |
There was a problem hiding this comment.
ConfigureProtocol() declares .YieldsOutput<List<ChatMessage>>(), but this yields IReadOnlyList<ChatMessage>. Even if the underlying instance is a List<ChatMessage>, the static type mismatch is confusing and may break consumers expecting an actual List<ChatMessage>. Consider yielding a concrete List<ChatMessage> snapshot (also avoids concurrent enumeration issues from sharing the live backing list).
| await context.YieldOutputAsync(sharedState.Conversation.AllMessages, cancellationToken).ConfigureAwait(false); | |
| await context.YieldOutputAsync(new List<ChatMessage>(sharedState.Conversation.AllMessages), cancellationToken).ConfigureAwait(false); |
| private readonly HashSet<int> _skipIndicies = new(); | ||
|
|
||
| public IEnumerable<string> ExpectedOutputs => | ||
| this._history.Where((element, index) => !this._skipIndicies.Contains(index)); | ||
|
|
||
| public void ProcessInput(string newInput) | ||
| { | ||
| this._skipIndicies.Add(this._history.Count); |
There was a problem hiding this comment.
Typo in identifier: _skipIndicies should be _skipIndices.
| private readonly HashSet<int> _skipIndicies = new(); | |
| public IEnumerable<string> ExpectedOutputs => | |
| this._history.Where((element, index) => !this._skipIndicies.Contains(index)); | |
| public void ProcessInput(string newInput) | |
| { | |
| this._skipIndicies.Add(this._history.Count); | |
| private readonly HashSet<int> _skipIndices = new(); | |
| public IEnumerable<string> ExpectedOutputs => | |
| this._history.Where((element, index) => !this._skipIndices.Contains(index)); | |
| public void ProcessInput(string newInput) | |
| { | |
| this._skipIndices.Add(this._history.Count); |
| Debug.Fail($"Expected request data to be of type {typeof(Challenge).FullName}, but was {requestData?.GetType().FullName ?? "null"}"); | ||
| return; // Unreachable, but analysis cannot infer that Debug.Fail will throw/exit, and UnreachableException is not available on net472 |
There was a problem hiding this comment.
This test can incorrectly pass in non-debug runs: Debug.Fail(...) does not necessarily fail the test, and the subsequent return exits without asserting. Replace this with a hard test failure (e.g., Assert.True(false, ...) / Assert.Fail(...) / throwing an exception) so the test fails reliably when challenge is null.
| Debug.Fail($"Expected request data to be of type {typeof(Challenge).FullName}, but was {requestData?.GetType().FullName ?? "null"}"); | |
| return; // Unreachable, but analysis cannot infer that Debug.Fail will throw/exit, and UnreachableException is not available on net472 | |
| throw new InvalidOperationException($"Expected request data to be of type {typeof(Challenge).FullName}, but was {requestData?.GetType().FullName ?? "null"}"); |
Motivation and Context
In order to better support using
Workflowshosted asAIAgentsinside of Handoff workflows, we need to make proper use of AgentSession. This causes potential issues around checkpointing and making sure that we properly compute only the new incoming messages for each agent invocation.Description
HandoffAgentExecutorto properly subselect only new messages when invoking the underlying agentContribution Checklist
[ ] Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.