Skip to content

Commit

Permalink
Merge branch 'main' into feature/create-disk-memory-health-checker
Browse files Browse the repository at this point in the history
  • Loading branch information
ikhoon authored Jul 25, 2024
2 parents 8ce5606 + c7b2a44 commit f4c53c1
Show file tree
Hide file tree
Showing 207 changed files with 2,424 additions and 662 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private static ServiceConfig newServiceConfig(Route route) {
final Path multipartUploadsLocation = Flags.defaultMultipartUploadsLocation();
final ServiceErrorHandler serviceErrorHandler = ServerErrorHandler.ofDefault().asServiceErrorHandler();
return new ServiceConfig(route, route,
SERVICE, defaultLogName, defaultServiceName, defaultServiceNaming, 0, 0,
SERVICE, defaultServiceName, defaultServiceNaming, defaultLogName, 0, 0,
false, AccessLogWriter.disabled(), CommonPools.blockingTaskExecutor(),
SuccessFunction.always(), 0, multipartUploadsLocation,
MultipartRemovalStrategy.ON_RESPONSE_COMPLETION,
Expand Down
22 changes: 22 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ plugins {
alias libs.plugins.kotlin apply false
alias libs.plugins.ktlint apply false
alias libs.plugins.errorprone apply false
alias libs.plugins.nullaway apply false
}

allprojects {
Expand Down Expand Up @@ -171,13 +172,34 @@ configure(projectsWithFlags('java')) {
// Error Prone compiler
if (!rootProject.hasProperty('noLint')) {
apply plugin: 'net.ltgt.errorprone'
apply plugin: 'net.ltgt.nullaway'

dependencies {
errorprone libs.errorprone.core
errorprone libs.nullaway
}

nullaway {
annotatedPackages.add("com.linecorp.armeria")
}

tasks.withType(JavaCompile) {
options.errorprone.excludedPaths = '.*/gen-src/.*'
options.errorprone.nullaway {
if (name.toLowerCase().contains("test")) {
// Disable NullAway for tests for now.
disable()
} else if (name.matches(/compileJava[0-9]+.*/)) {
// Disable MR-JAR classes which seem to confuse NullAway and break the build.
disable()
} else if (project != project(':core')) {
// TODO(trustin): Enable NullAway for all projects once we fix all violations.
warn()
} else {
error()
assertsEnabled = true
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.proxy.ProxyConnectException;

abstract class AbstractHttpRequestHandler implements ChannelFutureListener {

Expand Down Expand Up @@ -216,6 +215,7 @@ RequestHeaders mergedRequestHeaders(RequestHeaders headers) {
* {@link Channel#flush()} when each write unit is done.
*/
final void writeHeaders(RequestHeaders headers, boolean needs100Continue) {
assert session != null;
final SessionProtocol protocol = session.protocol();
assert protocol != null;
if (needs100Continue) {
Expand Down Expand Up @@ -377,8 +377,7 @@ final void failAndReset(Throwable cause) {
session.markUnacquirable();
}

if (cause instanceof ProxyConnectException || cause instanceof ResponseCompleteException) {
// - ProxyConnectException is handled by HttpSessionHandler.exceptionCaught().
if (cause instanceof ResponseCompleteException) {
// - ResponseCompleteException means the response is successfully received.
state = State.DONE;
cancel();
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/com/linecorp/armeria/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.linecorp.armeria.common.Response;
import com.linecorp.armeria.common.RpcRequest;
import com.linecorp.armeria.common.RpcResponse;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Unwrappable;

/**
Expand Down Expand Up @@ -71,6 +72,7 @@ public interface Client<I extends Request, O extends Response> extends Unwrappab
* @see ClientFactory#unwrap(Object, Class)
* @see Unwrappable
*/
@Nullable
@Override
default <T> T as(Class<T> type) {
requireNonNull(type, "type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,31 @@ public ClientRequestContext newDerivedContext(RequestId id, @Nullable HttpReques
return unwrap().newDerivedContext(id, req, rpcReq, endpoint);
}

@Nullable
@Override
public EndpointGroup endpointGroup() {
return unwrap().endpointGroup();
}

@Nullable
@Override
public Endpoint endpoint() {
return unwrap().endpoint();
}

@Nullable
@Override
public String fragment() {
return unwrap().fragment();
}

@Nullable
@Override
public String authority() {
return unwrap().authority();
}

@Nullable
@Override
public String host() {
return unwrap().host();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ public Object newClient(ClientBuilderParams params) {
return unwrap().newClient(params);
}

@Nullable
@Override
public <T> ClientBuilderParams clientBuilderParams(T client) {
return unwrap().clientBuilderParams(client);
}

@Nullable
@Override
public <T> T unwrap(Object client, Class<T> type) {
return unwrap().unwrap(client, type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public Object newClient(ClientBuilderParams params) {
"No ClientFactory for scheme: " + scheme + " matched clientType: " + clientType);
}

@Nullable
@Override
public <T> T unwrap(Object client, Class<T> type) {
final T params = ClientFactory.super.unwrap(client, type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void cache(DnsQuestion question, UnknownHostException cause) {
}
}

@Nullable
@Override
public List<DnsRecord> get(DnsQuestion question) throws UnknownHostException {
requireNonNull(question, "question");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public long maxResponseLength() {
return maxResponseLength;
}

@Nullable
@Override
public Long requestAutoAbortDelayMillis() {
return requestAutoAbortDelayMillis;
Expand All @@ -78,6 +79,7 @@ public Map<AttributeKey<?>, Object> attrs() {
return attributeMap;
}

@Nullable
@Override
public ExchangeType exchangeType() {
return exchangeType;
Expand Down
33 changes: 30 additions & 3 deletions core/src/main/java/com/linecorp/armeria/client/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import javax.annotation.Nonnull;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -323,6 +325,7 @@ public EndpointSelectionStrategy selectionStrategy() {
return EndpointSelectionStrategy.weightedRoundRobin();
}

@Nonnull
@Override
public Endpoint selectNow(ClientRequestContext ctx) {
return this;
Expand Down Expand Up @@ -693,16 +696,40 @@ public <T> Endpoint withAttr(AttributeKey<T> key, @Nullable T value) {
if (value == null) {
return this;
}
return withAttrs(Attributes.of(key, value));
return replaceAttrs(Attributes.of(key, value));
}

if (attributes.attr(key) == value) {
return this;
} else {
final AttributesBuilder attributesBuilder = attributes.toBuilder();
attributesBuilder.set(key, value);
return withAttrs(attributesBuilder.build());
return replaceAttrs(attributesBuilder.build());
}
}

/**
* Returns a new {@link Endpoint} with the specified {@link Attributes}.
* Note that the {@link #attrs()} of this {@link Endpoint} is merged with the specified
* {@link Attributes}. For attributes with the same {@link AttributeKey}, the attribute
* in {@param newAttributes} has higher precedence.
*/
@UnstableApi
@SuppressWarnings("unchecked")
public Endpoint withAttrs(Attributes newAttributes) {
requireNonNull(newAttributes, "newAttributes");
if (newAttributes.isEmpty()) {
return this;
}
if (attrs().isEmpty()) {
return replaceAttrs(newAttributes);
}
final AttributesBuilder builder = attrs().toBuilder();
newAttributes.attrs().forEachRemaining(entry -> {
final AttributeKey<Object> key = (AttributeKey<Object>) entry.getKey();
builder.set(key, entry.getValue());
});
return new Endpoint(type, host, ipAddr, port, weight, builder.build());
}

/**
Expand All @@ -711,7 +738,7 @@ public <T> Endpoint withAttr(AttributeKey<T> key, @Nullable T value) {
* {@link Attributes}.
*/
@UnstableApi
public Endpoint withAttrs(Attributes newAttributes) {
public Endpoint replaceAttrs(Attributes newAttributes) {
requireNonNull(newAttributes, "newAttributes");
if (attrs().isEmpty() && newAttributes.isEmpty()) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public CompletableFuture<T> execute() {

return response.exceptionally(cause -> {
cause = Exceptions.peel(cause);
assert errorHandler != null;
final Object maybeRecovered = errorHandler.apply(cause);
if (maybeRecovered instanceof Throwable) {
// The cause was translated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
final InetSocketAddress proxyAddress = haProxyConfig.proxyAddress();
assert proxyAddress != null;
promise.addListener(f -> {
final ChannelPromise connectionPromise = ctx.newPromise();
ctx.connect(proxyAddress, localAddress, connectionPromise);
connectionPromise.addListener(f -> {
if (!f.isSuccess()) {
promise.tryFailure(wrapException(f.cause()));
ctx.close();
return;
}
try {
Expand All @@ -71,20 +75,20 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
ctx.pipeline().remove(HAProxyMessageEncoder.INSTANCE);
final ProxyConnectionEvent event = new ProxyConnectionEvent(
PROTOCOL, AUTH, proxyAddress, remoteAddress);
promise.trySuccess();
ctx.pipeline().fireUserEventTriggered(event);
} else {
ctx.fireExceptionCaught(wrapException(f0.cause()));
promise.tryFailure(wrapException(f0.cause()));
ctx.close();
}
});
} catch (Exception e) {
ctx.pipeline().fireUserEventTriggered(wrapException(e));
promise.tryFailure(wrapException(e));
ctx.close();
} finally {
ctx.pipeline().remove(this);
}
});
super.connect(ctx, proxyAddress, localAddress, promise);
}

private static ProxyConnectException wrapException(Throwable e) {
Expand Down Expand Up @@ -123,4 +127,3 @@ private static Inet6Address translateToInet6(InetAddress inetAddress) {
return NetUtil.getByName(inetAddress.getHostAddress());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ final class HttpChannelPool implements AsyncCloseable {
SessionProtocol.H2, SessionProtocol.H2C));
pendingAcquisitions = newEnumMap(httpAndHttpsValues());
allChannels = new IdentityHashMap<>();
connectTimeoutMillis = (Integer) clientFactory.options()
.channelOptions()
.get(ChannelOption.CONNECT_TIMEOUT_MILLIS);
final Integer connectTimeoutMillisBoxed =
(Integer) clientFactory.options()
.channelOptions()
.get(ChannelOption.CONNECT_TIMEOUT_MILLIS);
assert connectTimeoutMillisBoxed != null;
connectTimeoutMillis = connectTimeoutMillisBoxed;
bootstraps = new Bootstraps(clientFactory, eventLoop, sslCtxHttp1Or2, sslCtxHttp1Only);
}

Expand Down Expand Up @@ -828,6 +831,7 @@ private void handlePiggyback(SessionProtocol desiredProtocol,

switch (result) {
case SUCCESS:
assert pch != null;
timingsBuilder.pendingAcquisitionEnd();
childPromise.complete(pch);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ final class HttpClientPipelineConfigurator extends ChannelDuplexHandler {
.responseTimeoutMillis(0)
.maxResponseLength(UPGRADE_RESPONSE_MAX_LENGTH).build();

private static final RequestTarget REQ_TARGET_ASTERISK;

static {
final RequestTarget asterisk = RequestTarget.forClient("*");
assert asterisk != null;
REQ_TARGET_ASTERISK = asterisk;
}

private enum HttpPreference {
HTTP1_REQUIRED,
HTTP2_PREFERRED,
Expand Down Expand Up @@ -215,7 +223,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
* See <a href="https://http2.github.io/http2-spec/#discover-https">HTTP/2 specification</a>.
*/
private void configureAsHttps(Channel ch, SocketAddress remoteAddr) {
assert isHttps();
assert sslCtx != null;

final ChannelPipeline p = ch.pipeline();
final SSLEngine sslEngine;
Expand Down Expand Up @@ -564,7 +572,7 @@ public void onComplete() {}
final DefaultClientRequestContext reqCtx = new DefaultClientRequestContext(
ctx.channel().eventLoop(), Flags.meterRegistry(), H1C, RequestId.random(),
com.linecorp.armeria.common.HttpMethod.OPTIONS,
RequestTarget.forClient("*"), ClientOptions.of(),
REQ_TARGET_ASTERISK, ClientOptions.of(),
HttpRequest.of(com.linecorp.armeria.common.HttpMethod.OPTIONS, "*"),
null, REQUEST_OPTIONS_FOR_UPGRADE_REQUEST, CancellationScheduler.noop(),
System.nanoTime(), SystemInfo.currentTimeMicros());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ private void cancelTimeoutOrLog(@Nullable Throwable cause, boolean cancel) {
}

final StringBuilder logMsg = new StringBuilder("Unexpected exception while closing a request");
final String authority = ctx.request().authority();
final HttpRequest request = ctx.request();
assert request != null;
final String authority = request.authority();
if (authority != null) {
logMsg.append(" to ").append(authority);
}
Expand All @@ -311,7 +313,9 @@ public boolean canSchedule() {
@Override
public void run(Throwable cause) {
delegate.close(cause);
ctx.request().abort(cause);
final HttpRequest request = ctx.request();
assert request != null;
request.abort(cause);
ctx.logBuilder().endResponse(cause);
}
};
Expand Down
Loading

0 comments on commit f4c53c1

Please sign in to comment.