Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Wisp] remove steal lock #60

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/linux/classes/com/alibaba/wisp/engine/WispScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import sun.misc.SharedSecrets;
import sun.misc.UnsafeAccess;

import java.dyn.CoroutineSupport;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
Expand Down Expand Up @@ -138,6 +139,9 @@ public void run() {
WispScheduler.this.engine.carrierEngines.add(carrier);
WispEngine.registerPerfCounter(carrier);
WispSysmon.INSTANCE.register(carrier);
// For Wisp2, the carrier may not create any coroutine, but it can steal coroutine from
// other carrier. then there are some coroutine in this carrier.
CoroutineSupport.markAsCarrier();
runCarrier(carrier);
} finally {
WispEngine.carrierThreads.remove(thread);
Expand Down
2 changes: 1 addition & 1 deletion src/share/classes/java/dyn/Coroutine.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Coroutine(Runnable target, long stacksize) {

public static void yieldTo(Coroutine target) {
JavaLangAccess jla = SharedSecrets.getJavaLangAccess();
jla.getCoroutineSupport(jla.currentThread0()).symmetricYieldTo(target);
jla.getCoroutineSupport(jla.currentThread0()).unsafeSymmetricYieldTo(target);
}

/**
Expand Down
6 changes: 0 additions & 6 deletions src/share/classes/java/dyn/CoroutineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,13 @@ public abstract class CoroutineBase {
private final void startInternal() {
assert threadSupport.getThread() == SharedSecrets.getJavaLangAccess().currentThread0();
try {
// When we symmetricYieldTo a newly created coroutine,
// we'll expect the new coroutine release lock as soon as possible
threadSupport.beforeResume(this);
run();
} catch (Throwable t) {
if (!(t instanceof CoroutineExitException)) {
t.printStackTrace();
}
} finally {
finished = true;
// threadSupport is fixed by steal()
threadSupport.beforeResume(this);

threadSupport.terminateCoroutine(null);
}
assert threadSupport.getThread() == SharedSecrets.getJavaLangAccess().currentThread0();
Expand Down
185 changes: 24 additions & 161 deletions src/share/classes/java/dyn/CoroutineSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@
import sun.misc.Contended;
import sun.misc.JavaLangAccess;
import sun.misc.SharedSecrets;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

@Contended
public class CoroutineSupport {

private static final boolean CHECK_LOCK = true;
private static final int SPIN_BACKOFF_LIMIT = 2 << 8;

private final static AtomicInteger idGen = new AtomicInteger();

// The thread that this CoroutineSupport belongs to. There's only one CoroutineSupport per Thread
Expand All @@ -50,9 +46,6 @@ public class CoroutineSupport {
// The currently executing coroutine
private Coroutine currentCoroutine;

private volatile Thread lockOwner = null; // also protect double link list of JavaThread->coroutine_list()
private int lockRecursive; // volatile is not need

private final int id;
private boolean terminated = false;

Expand All @@ -78,12 +71,7 @@ public Coroutine threadCoroutine() {

void addCoroutine(Coroutine coroutine, long stacksize) {
assert currentCoroutine != null;
lock();
try {
coroutine.nativeCoroutine = createCoroutine(coroutine, stacksize);
} finally {
unlock();
}
coroutine.nativeCoroutine = createCoroutine(coroutine, stacksize);
}

Thread getThread() {
Expand All @@ -99,25 +87,16 @@ public void drain() {
throw new IllegalArgumentException("Cannot drain another threads CoroutineThreadSupport");
}

lock();
try {
// drain all coroutines
Coroutine next = null;
while ((next = getNextCoroutine(currentCoroutine.nativeCoroutine)) != currentCoroutine) {
symmetricExitInternal(next);
}

CoroutineBase coro;
while ((coro = cleanupCoroutine()) != null) {
System.out.println(coro);
throw new NotImplementedException();
}
} catch (Throwable t) {
t.printStackTrace();
} finally {
assert lockOwner == thread && lockRecursive == 0;
terminated = true;
unlock();
}
}

Expand All @@ -129,6 +108,7 @@ public void drain() {
* this function should only be called in
* {@link com.alibaba.wisp.engine.WispTask#switchTo(WispTask, WispTask, boolean)},
* we skipped unnecessary lock to improve performance.
*
* @param target
*/
public boolean unsafeSymmetricYieldTo(Coroutine target) {
Expand All @@ -138,37 +118,16 @@ public boolean unsafeSymmetricYieldTo(Coroutine target) {
final Coroutine current = currentCoroutine;
currentCoroutine = target;
switchTo(current, target);
//check if locked by exiting coroutine
beforeResume(current);
return true;
}

public void symmetricYieldTo(Coroutine target) {
lock();
if (target.threadSupport != this) {
unlock();
return;
}
moveCoroutine(currentCoroutine.nativeCoroutine, target.nativeCoroutine);
unlockLater(target);
unsafeSymmetricYieldTo(target);
}


public void symmetricStopCoroutine(Coroutine target) {
Coroutine current;
lock();
try {
if (target.threadSupport != this) {
unlock();
return;
}
current = currentCoroutine;
currentCoroutine = target;
moveCoroutine(current.nativeCoroutine, target.nativeCoroutine);
} finally {
unlock();
if (target.threadSupport != this) {
return;
}
current = currentCoroutine;
currentCoroutine = target;
switchToAndExit(current, target);
}

Expand All @@ -181,12 +140,9 @@ void symmetricExitInternal(Coroutine coroutine) {
assert coroutine.threadSupport == this;

if (!testDisposableAndTryReleaseStack(coroutine.nativeCoroutine)) {
moveCoroutine(currentCoroutine.nativeCoroutine, coroutine.nativeCoroutine);

final Coroutine current = currentCoroutine;
currentCoroutine = coroutine;
switchToAndExit(current, coroutine);
beforeResume(current);
}
}

Expand All @@ -198,14 +154,9 @@ public void terminateCoroutine(Coroutine target) {
assert currentCoroutine != getNextCoroutine(currentCoroutine.nativeCoroutine) :
"last coroutine shouldn't call coroutineexit";

lock();
Coroutine old = currentCoroutine;
Coroutine forward = target;
if (forward == null) {
forward = getNextCoroutine(old.nativeCoroutine);
}
Coroutine forward = threadCoroutine;
currentCoroutine = forward;
unlockLater(forward);
switchToAndTerminate(old, forward);

// should never run here.
Expand All @@ -216,8 +167,7 @@ public void terminateCoroutine(Coroutine target) {
* Steal coroutine from it's carrier thread to current thread.
*
* @param failOnContention steal fail if there's too much lock contention
*
* @param coroutine to be stolen
* @param coroutine to be stolen
*/
Coroutine.StealResult steal(Coroutine coroutine, boolean failOnContention) {
assert coroutine.threadSupport.threadCoroutine() != coroutine;
Expand All @@ -227,102 +177,17 @@ Coroutine.StealResult steal(Coroutine coroutine, boolean failOnContention) {

if (source == target) {
return Coroutine.StealResult.SUCCESS;
}

if (source.id < target.id) { // prevent dead lock
if (!source.lockInternal(failOnContention)) {
return Coroutine.StealResult.FAIL_BY_CONTENTION;
}
target.lock();
} else {
target.lock();
if (!source.lockInternal(failOnContention)) {
target.unlock();
return Coroutine.StealResult.FAIL_BY_CONTENTION;
}
}

try {
if (source.terminated || coroutine.finished ||
coroutine.threadSupport != source || // already been stolen
source.currentCoroutine == coroutine) {
return Coroutine.StealResult.FAIL_BY_STATUS;
}
if (!stealCoroutine(coroutine.nativeCoroutine)) { // native frame
return Coroutine.StealResult.FAIL_BY_NATIVE_FRAME;
}
coroutine.threadSupport = target;
} finally {
source.unlock();
target.unlock();
}

} else if (source.terminated || coroutine.finished ||
coroutine.threadSupport != source || // already been stolen
source.currentCoroutine == coroutine) {
return Coroutine.StealResult.FAIL_BY_STATUS;
} else if (!stealCoroutine(coroutine.nativeCoroutine)) { // native frame
return Coroutine.StealResult.FAIL_BY_NATIVE_FRAME;
}
coroutine.threadSupport = target;
return Coroutine.StealResult.SUCCESS;
}

/**
* Can not be stolen while executing this, because lock is held
*/
void beforeResume(CoroutineBase source) {
if (source.needsUnlock) {
source.needsUnlock = false;
source.threadSupport.unlock();
}
}

private void unlockLater(CoroutineBase next) {
if (CHECK_LOCK && next.needsUnlock) {
throw new InternalError("pending unlock");
}
next.needsUnlock = true;
}

private void lock() {
boolean success = lockInternal(false);
assert success;
}

private boolean lockInternal(boolean tryingLock) {
final Thread th = SharedSecrets.getJavaLangAccess().currentThread0();
if (lockOwner == th) {
lockRecursive++;
return true;
}
for (int spin = 1; ; ) {
if (lockOwner == null && LOCK_UPDATER.compareAndSet(this, null, th)) {
return true;
}
for (int i = 0; i < spin; ) {
i++;
}
if (spin == SPIN_BACKOFF_LIMIT) {
if (tryingLock) {
return false;
}
SharedSecrets.getJavaLangAccess().yield0(); // yield safepoint
} else { // back off
spin *= 2;
}
}
}

private void unlock() {
if (CHECK_LOCK && SharedSecrets.getJavaLangAccess().currentThread0() != lockOwner) {
throw new InternalError("unlock from non-owner thread");
}
if (lockRecursive > 0) {
lockRecursive--;
} else {
LOCK_UPDATER.lazySet(this, null);
}
}

private static final AtomicReferenceFieldUpdater<CoroutineSupport, Thread> LOCK_UPDATER;

static {
LOCK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(CoroutineSupport.class, Thread.class, "lockOwner");
}

public boolean isCurrent(CoroutineBase coroutine) {
return coroutine == currentCoroutine;
}
Expand All @@ -331,7 +196,6 @@ public CoroutineBase getCurrent() {
return currentCoroutine;
}


private static native void registerNatives();

private static native long getNativeThreadCoroutine();
Expand All @@ -350,21 +214,16 @@ public CoroutineBase getCurrent() {

/**
* get next {@link Coroutine} from current thread's doubly linked {@link Coroutine} list
*
* @param coroPtr hotspot coroutine
* @return java Coroutine
*/
private static native Coroutine getNextCoroutine(long coroPtr);

/**
* move coroPtr to targetPtr's next field in underlying hotspot coroutine list
* @param coroPtr current threadCoroutine
* @param targetPtr coroutine that is about to exit
*/
private static native void moveCoroutine(long coroPtr, long targetPtr);

/**
* track hotspot couroutine with java coroutine.
* @param coroPtr threadCoroutine in hotspot
*
* @param coroPtr threadCoroutine in hotspot
* @param threadCoroutine threadCoroutine in java
*/
private static native void markThreadCoroutine(long coroPtr, CoroutineBase threadCoroutine);
Expand All @@ -379,9 +238,13 @@ public CoroutineBase getCurrent() {

/**
* this will turn on a safepoint to stop all threads.
*
* @param coroPtr coroutine pointer used in VM.
* @return target coroutine's stack
*/
public static native StackTraceElement[] getCoroutineStack(long coroPtr);

private static native boolean shouldThrowException0(long coroPtr);}
private static native boolean shouldThrowException0(long coroPtr);

public static native void markAsCarrier();
}