Skip to content

Commit

Permalink
Fix Conflict for NonBlocking Interface in Reactor Integration (#6037)
Browse files Browse the repository at this point in the history
Motivation:
The `reactor.core.scheduler.NonBlocking` interface was introduced to `armeria-core` in #1665 to make `Schedulers.isInNonBlockingThread()` return `true` for an Armeria `EventLoop`. However, a conflict arises when building Java modules because the `NonBlocking` interface clashes with Reactor's own definition.

Modifications:
- Moved `NonBlocking` to `com.linecorp.armeria.common` to resolve the module conflict while retaining its utility for identifying non-blocking threads.
- Updated to call `Schedulers.registerNonBlockingThreadPredicate` if Reactor is available in the classpath.
- Adjusted `CoreBlockHoundIntegration` to update the non-blocking thread predicate.

Result:
- Resolved the Java module conflict with the `NonBlocking` interface.
  • Loading branch information
minwoox authored Jan 2, 2025
1 parent d3ecb50 commit a7f09af
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 18 deletions.
5 changes: 3 additions & 2 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ dependencies {
// JUnit Pioneer
testImplementation libs.junit.pioneer

// Reactor for registering EventLoop as a non blocking thread.
optionalImplementation libs.reactor.core

// Reactive Streams
api libs.reactivestreams
testImplementation libs.reactivestreams.tck
Expand Down Expand Up @@ -217,8 +220,6 @@ if (tasks.findByName('trimShadedJar')) {
tasks.trimShadedJar.configure {
// Keep all classes under com.linecorp.armeria, except the internal ones.
keep "class !com.linecorp.armeria.internal.shaded.**,com.linecorp.armeria.** { *; }"
// Keep the 'NonBlocking' tag interface.
keep "class reactor.core.scheduler.NonBlocking { *; }"
// Do not optimize the dependencies that access some fields via sun.misc.Unsafe or reflection only.
keep "class com.linecorp.armeria.internal.shaded.caffeine.** { *; }"
keep "class com.linecorp.armeria.internal.shaded.jctools.** { *; }"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.FlagsProvider;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.SessionProtocol;
Expand All @@ -49,7 +50,6 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import reactor.core.scheduler.NonBlocking;

/**
* Creates and manages clients.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linecorp.armeria.client.proxy.Socks4ProxyConfig;
import com.linecorp.armeria.client.proxy.Socks5ProxyConfig;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.SerializationFormat;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
Expand Down Expand Up @@ -76,7 +77,6 @@
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import reactor.core.scheduler.NonBlocking;

final class HttpChannelPool implements AsyncCloseable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.linecorp.armeria.client.proxy.ProxyConfigSelector;
import com.linecorp.armeria.client.redirect.RedirectConfig;
import com.linecorp.armeria.common.Http1HeaderNaming;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.Scheme;
import com.linecorp.armeria.common.SerializationFormat;
Expand Down Expand Up @@ -71,7 +72,6 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.concurrent.FutureListener;
import reactor.core.scheduler.NonBlocking;

/**
* A {@link ClientFactory} that creates an HTTP client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public final class CommonPools {
MoreMeterBinders
.eventLoopMetrics(WORKER_GROUP, new MeterIdPrefix("armeria.netty.common"))
.bindTo(Flags.meterRegistry());

try {
Class.forName("reactor.core.scheduler.Schedulers",
true, CommonPools.class.getClassLoader());
ReactorNonBlockingUtil.registerEventLoopAsNonBlocking();
} catch (ClassNotFoundException e) {
// Do nothing.
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
public final class CoreBlockHoundIntegration implements BlockHoundIntegration {
@Override
public void applyTo(Builder builder) {
builder.nonBlockingThreadPredicate(predicate -> predicate.or(NonBlocking.class::isInstance));

// short locks
builder.allowBlockingCallsInside("com.linecorp.armeria.client.HttpClientFactory",
"pool");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package reactor.core.scheduler;
package com.linecorp.armeria.common;

/**
* A dummy interface that makes Project Reactor recognize Armeria's event loop threads as non-blocking.
* An interface that indicates a non-blocking thread. You can use this interface to check if the current
* thread is a non-blocking thread. For example:
* <pre>{@code
* if (Thread.currentThread() instanceof NonBlocking) {
* // Avoid blocking operations.
* closeable.closeAsync();
* } else {
* closeable.close();
* }
* }</pre>
*/
public interface NonBlocking {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 LINE Corporation
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
Expand All @@ -13,11 +13,15 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common;

/**
* Provides a dummy interface that makes Project Reactor recognize Armeria's event loop threads as non-blocking.
*/
@NonNullByDefault
package reactor.core.scheduler;
import reactor.core.scheduler.Schedulers;

final class ReactorNonBlockingUtil {

static void registerEventLoopAsNonBlocking() {
Schedulers.registerNonBlockingThreadPredicate(NonBlocking.class::isInstance);
}

import com.linecorp.armeria.common.annotation.NonNullByDefault;
private ReactorNonBlockingUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@
import com.google.common.collect.MapMaker;

import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.annotation.Nullable;

import reactor.core.scheduler.NonBlocking;

/**
* A {@link CompletableFuture} that warns the user if they call a method that blocks the event loop.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.linecorp.armeria.internal.common.util;

import com.linecorp.armeria.common.NonBlocking;
import com.linecorp.armeria.common.annotation.Nullable;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import reactor.core.scheduler.NonBlocking;

/**
* An event loop thread with support for {@link TemporaryThreadLocals}, Netty {@link FastThreadLocal} and
* Project Reactor {@link NonBlocking}.
* {@link NonBlocking} interface.
*/
public final class EventLoopThread extends FastThreadLocalThread implements NonBlocking {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common.reactor3;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

import com.linecorp.armeria.common.CommonPools;

import io.netty.util.concurrent.Future;
import reactor.core.scheduler.Schedulers;

final class EventLoopNonBlockingTest {

/**
* Verifies that the current thread is registered a non-blocking thread via {@code ReactorNonBlockingUtil}.
*/
@Test
void checkEventLoopNonBlocking() throws Exception {
final Future<Boolean> submit = CommonPools.workerGroup().submit(Schedulers::isInNonBlockingThread);
assertThat(submit.get()).isTrue();
}
}

0 comments on commit a7f09af

Please sign in to comment.