Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.asynchttpclient.channel.ChannelPool;
Expand All @@ -37,6 +38,7 @@
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -375,6 +377,26 @@ default boolean isHttp2CleartextEnabled() {
@Nullable
EventLoopGroup getEventLoopGroup();

/**
* Return the {@link AddressResolverGroup} used for asynchronous DNS resolution.
* <p>
* When non-null, this resolver group is used for hostname resolution instead of
* the per-request {@link io.netty.resolver.NameResolver}. For example,
* Netty's {@link io.netty.resolver.dns.DnsAddressResolverGroup} provides
* non-blocking DNS lookups with inflight coalescing across concurrent requests for
* the same hostname.
* <p>
* By default this returns {@code null}, preserving the legacy per-request name
* resolver behavior. Set via
* {@link DefaultAsyncHttpClientConfig.Builder#setAddressResolverGroup(AddressResolverGroup)}.
*
* @return the {@link AddressResolverGroup} or {@code null} to use per-request resolvers
*/
@Nullable
default AddressResolverGroup<InetSocketAddress> getAddressResolverGroup() {
return null;
}

boolean isUseNativeTransport();

boolean isUseOnlyEpollNativeTransport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timer;
import org.asynchttpclient.channel.ChannelPool;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
Expand All @@ -36,6 +37,7 @@
import org.asynchttpclient.util.ProxyUtils;
import org.jetbrains.annotations.Nullable;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -200,6 +202,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final int chunkedFileChunkSize;
private final Map<ChannelOption<Object>, Object> channelOptions;
private final @Nullable EventLoopGroup eventLoopGroup;
private final @Nullable AddressResolverGroup<InetSocketAddress> addressResolverGroup;
private final boolean useNativeTransport;
private final boolean useOnlyEpollNativeTransport;
private final @Nullable ByteBufAllocator allocator;
Expand Down Expand Up @@ -305,6 +308,7 @@ private DefaultAsyncHttpClientConfig(// http
int webSocketMaxFrameSize,
Map<ChannelOption<Object>, Object> channelOptions,
@Nullable EventLoopGroup eventLoopGroup,
@Nullable AddressResolverGroup<InetSocketAddress> addressResolverGroup,
boolean useNativeTransport,
boolean useOnlyEpollNativeTransport,
@Nullable ByteBufAllocator allocator,
Expand Down Expand Up @@ -406,6 +410,7 @@ private DefaultAsyncHttpClientConfig(// http
this.chunkedFileChunkSize = chunkedFileChunkSize;
this.channelOptions = channelOptions;
this.eventLoopGroup = eventLoopGroup;
this.addressResolverGroup = addressResolverGroup;
this.useNativeTransport = useNativeTransport;
this.useOnlyEpollNativeTransport = useOnlyEpollNativeTransport;

Expand Down Expand Up @@ -806,6 +811,11 @@ public Map<ChannelOption<Object>, Object> getChannelOptions() {
return eventLoopGroup;
}

@Override
public @Nullable AddressResolverGroup<InetSocketAddress> getAddressResolverGroup() {
return addressResolverGroup;
}

@Override
public boolean isUseNativeTransport() {
return useNativeTransport;
Expand Down Expand Up @@ -959,6 +969,7 @@ public static class Builder {
private @Nullable ByteBufAllocator allocator;
private final Map<ChannelOption<Object>, Object> channelOptions = new HashMap<>();
private @Nullable EventLoopGroup eventLoopGroup;
private @Nullable AddressResolverGroup<InetSocketAddress> addressResolverGroup;
private @Nullable Timer nettyTimer;
private @Nullable ThreadFactory threadFactory;
private @Nullable Consumer<Channel> httpAdditionalChannelInitializer;
Expand Down Expand Up @@ -1061,6 +1072,7 @@ public Builder(AsyncHttpClientConfig config) {
chunkedFileChunkSize = config.getChunkedFileChunkSize();
channelOptions.putAll(config.getChannelOptions());
eventLoopGroup = config.getEventLoopGroup();
addressResolverGroup = config.getAddressResolverGroup();
useNativeTransport = config.isUseNativeTransport();
useOnlyEpollNativeTransport = config.isUseOnlyEpollNativeTransport();

Expand Down Expand Up @@ -1514,6 +1526,25 @@ public Builder setEventLoopGroup(EventLoopGroup eventLoopGroup) {
return this;
}

/**
* Set a custom {@link AddressResolverGroup} for asynchronous DNS resolution.
* <p>
* When set, this resolver group is used instead of the per-request {@link io.netty.resolver.NameResolver}.
* Pass {@code null} (the default) to use per-request resolvers (legacy behavior).
* <p>
* <b>Lifecycle:</b> The client takes ownership of the provided resolver group and will
* {@linkplain AddressResolverGroup#close() close} it when the client is shut down.
* Do not pass a shared resolver group that is used by other clients unless you manage
* its lifecycle independently.
*
* @param addressResolverGroup the resolver group, or {@code null} to use per-request resolvers
* @return the same builder instance
*/
public Builder setAddressResolverGroup(@Nullable AddressResolverGroup<InetSocketAddress> addressResolverGroup) {
this.addressResolverGroup = addressResolverGroup;
return this;
}

public Builder setUseNativeTransport(boolean useNativeTransport) {
this.useNativeTransport = useNativeTransport;
return this;
Expand Down Expand Up @@ -1650,6 +1681,7 @@ public DefaultAsyncHttpClientConfig build() {
webSocketMaxFrameSize,
channelOptions.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(channelOptions),
eventLoopGroup,
addressResolverGroup,
useNativeTransport,
useOnlyEpollNativeTransport,
allocator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.NameResolver;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -82,6 +84,7 @@
import org.asynchttpclient.proxy.ProxyServer;
import org.asynchttpclient.proxy.ProxyType;
import org.asynchttpclient.uri.Uri;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -122,6 +125,7 @@ public class ChannelManager {
private final Bootstrap httpBootstrap;
private final Bootstrap wsBootstrap;
private final long handshakeTimeout;
private final @Nullable AddressResolverGroup<InetSocketAddress> addressResolverGroup;

private final ChannelPool channelPool;
private final ChannelGroup openChannels;
Expand Down Expand Up @@ -193,6 +197,9 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {

httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);

// Use the address resolver group from config if provided; otherwise null (legacy per-request resolution)
addressResolverGroup = config.getAddressResolverGroup();
}

private static TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory(AsyncHttpClientConfig config) {
Expand Down Expand Up @@ -412,6 +419,11 @@ private void doClose() {
}

public void close() {
// Close the resolver group first while the EventLoopGroup is still active,
// since Netty DNS resolvers may need a live EventLoop for clean shutdown.
if (addressResolverGroup != null) {
addressResolverGroup.close();
}
if (allowReleaseEventLoopGroup) {
final long shutdownQuietPeriod = config.getShutdownQuietPeriod().toMillis();
final long shutdownTimeout = config.getShutdownTimeout().toMillis();
Expand Down Expand Up @@ -579,39 +591,27 @@ public Future<Bootstrap> getBootstrap(Uri uri, NameResolver<InetAddress> nameRes
Bootstrap socksBootstrap = httpBootstrap.clone();
ChannelHandler httpBootstrapHandler = socksBootstrap.config().handler();

nameResolver.resolve(proxy.getHost()).addListener((Future<InetAddress> whenProxyAddress) -> {
if (whenProxyAddress.isSuccess()) {
socksBootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(httpBootstrapHandler);

InetSocketAddress proxyAddress = new InetSocketAddress(whenProxyAddress.get(), proxy.getPort());
Realm realm = proxy.getRealm();
String username = realm != null ? realm.getPrincipal() : null;
String password = realm != null ? realm.getPassword() : null;
ProxyHandler socksProxyHandler;
switch (proxy.getProxyType()) {
case SOCKS_V4:
socksProxyHandler = new Socks4ProxyHandler(proxyAddress, username);
break;

case SOCKS_V5:
socksProxyHandler = new Socks5ProxyHandler(proxyAddress, username, password);
break;

default:
throw new IllegalArgumentException("Only SOCKS4 and SOCKS5 supported at the moment.");
}
channel.pipeline().addFirst(SOCKS_HANDLER, socksProxyHandler);
}
});
promise.setSuccess(socksBootstrap);

} else {
promise.setFailure(whenProxyAddress.cause());
}
});
if (addressResolverGroup != null) {
// Use the address resolver group for async, non-blocking proxy host resolution
InetSocketAddress unresolvedProxyAddress = InetSocketAddress.createUnresolved(proxy.getHost(), proxy.getPort());
AddressResolver<InetSocketAddress> resolver = addressResolverGroup.getResolver(eventLoopGroup.next());
resolver.resolve(unresolvedProxyAddress).addListener((Future<InetSocketAddress> whenProxyAddress) -> {
if (whenProxyAddress.isSuccess()) {
configureSocksBootstrap(socksBootstrap, httpBootstrapHandler, whenProxyAddress.get(), proxy, promise);
} else {
promise.setFailure(whenProxyAddress.cause());
}
});
} else {
nameResolver.resolve(proxy.getHost()).addListener((Future<InetAddress> whenProxyAddress) -> {
if (whenProxyAddress.isSuccess()) {
InetSocketAddress proxyAddress = new InetSocketAddress(whenProxyAddress.get(), proxy.getPort());
configureSocksBootstrap(socksBootstrap, httpBootstrapHandler, proxyAddress, proxy, promise);
} else {
promise.setFailure(whenProxyAddress.cause());
}
});
}

} else if (proxy != null && ProxyType.HTTPS.equals(proxy.getProxyType())) {
// For HTTPS proxies, use HTTP bootstrap but ensure SSL connection to proxy
Expand All @@ -624,6 +624,35 @@ protected void initChannel(Channel channel) throws Exception {
return promise;
}

private void configureSocksBootstrap(Bootstrap socksBootstrap, ChannelHandler httpBootstrapHandler,
InetSocketAddress proxyAddress, ProxyServer proxy, Promise<Bootstrap> promise) {
socksBootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(httpBootstrapHandler);

Realm realm = proxy.getRealm();
String username = realm != null ? realm.getPrincipal() : null;
String password = realm != null ? realm.getPassword() : null;
ProxyHandler socksProxyHandler;
switch (proxy.getProxyType()) {
case SOCKS_V4:
socksProxyHandler = new Socks4ProxyHandler(proxyAddress, username);
break;

case SOCKS_V5:
socksProxyHandler = new Socks5ProxyHandler(proxyAddress, username, password);
break;

default:
throw new IllegalArgumentException("Only SOCKS4 and SOCKS5 supported at the moment.");
}
channel.pipeline().addFirst(SOCKS_HANDLER, socksProxyHandler);
}
});
promise.setSuccess(socksBootstrap);
}

/**
* Checks whether the given channel is an HTTP/2 connection (i.e. has the HTTP/2 multiplex handler installed).
*/
Expand Down Expand Up @@ -790,6 +819,14 @@ public EventLoopGroup getEventLoopGroup() {
return eventLoopGroup;
}

/**
* Return the {@link AddressResolverGroup} used for async DNS resolution, or {@code null}
* if per-request name resolvers should be used (legacy behavior).
*/
public @Nullable AddressResolverGroup<InetSocketAddress> getAddressResolverGroup() {
return addressResolverGroup;
}

public ClientStats getClientStats() {
Map<String, Long> totalConnectionsPerHost = openChannels.stream()
.map(Channel::remoteAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -72,6 +74,7 @@
import org.asynchttpclient.resolver.RequestHostnameResolver;
import org.asynchttpclient.uri.Uri;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -374,7 +377,7 @@ private <T> Future<List<InetSocketAddress>> resolveAddresses(Request request, Pr
int port = ProxyType.HTTPS.equals(proxy.getProxyType()) || uri.isSecured() ? proxy.getSecuredPort() : proxy.getPort();
InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(proxy.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);
return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, asyncHandler);
return resolveHostname(request, unresolvedRemoteAddress, asyncHandler);
} else {
int port = uri.getExplicitPort();

Expand All @@ -385,10 +388,18 @@ private <T> Future<List<InetSocketAddress>> resolveAddresses(Request request, Pr
// bypass resolution
InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getAddress(), port);
return promise.setSuccess(singletonList(inetSocketAddress));
} else {
return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, asyncHandler);
}
return resolveHostname(request, unresolvedRemoteAddress, asyncHandler);
}
}

private Future<List<InetSocketAddress>> resolveHostname(Request request, InetSocketAddress unresolvedRemoteAddress, AsyncHandler<?> asyncHandler) {
AddressResolverGroup<InetSocketAddress> group = channelManager.getAddressResolverGroup();
if (group != null) {
AddressResolver<InetSocketAddress> resolver = group.getResolver(channelManager.getEventLoopGroup().next());
return RequestHostnameResolver.INSTANCE.resolve(resolver, unresolvedRemoteAddress, asyncHandler);
}
return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, asyncHandler);
}

private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request, AsyncHandler<T> asyncHandler, NettyRequest nettyRequest, ProxyServer proxyServer) {
Expand Down
Loading