Bolt threads not properly closing after graph.run or graph.execute #106
The following code should reproduce the issue almost instantly.
@DataFrogman, what system are you running on? Which versions of Neo4j and neo4rs are you using? How are you connecting to Neo4j? Are you running in release mode? So far, I cannot reproduce this on a Mac. I get a few … I tested a slightly modified version of your code:

use std::sync::{atomic::AtomicUsize, Arc};
use neo4rs::*;
use tokio::sync::Semaphore;

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn concurrency() -> Result<()> {
    let uri = "bolt://127.0.0.1:7687";
    let user = "neo4j";
    let password = "neo4j";
    let config = ConfigBuilder::default()
        .uri(uri)
        .user(user)
        .password(password)
        .max_connections(10240)
        .build()
        .unwrap();
    let graph = Arc::new(Graph::connect(config).await.unwrap());
    // At most 1024 queries are in flight at any moment.
    let semaphore = Arc::new(Semaphore::new(1024));
    let mut acc: usize = 0;
    // Counts in-flight tasks (printed as "live connections" below).
    let connections = Arc::new(AtomicUsize::new(0));
    let successes = Arc::new(AtomicUsize::new(0));
    let errors = Arc::new(AtomicUsize::new(0));

    // Background task: print the counters every 3 seconds.
    tokio::spawn({
        let c = connections.clone();
        let s = successes.clone();
        let e = errors.clone();
        async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
                println!(
                    "live connections: {} successes: {} errors: {}",
                    c.load(std::sync::atomic::Ordering::Relaxed),
                    s.load(std::sync::atomic::Ordering::Relaxed),
                    e.load(std::sync::atomic::Ordering::Relaxed),
                );
            }
        }
    });

    // Spawn queries forever; acquiring a permit waits once 1024 are in flight.
    loop {
        let permit = semaphore.clone();
        let _permit = permit.acquire_owned().await.unwrap();
        let connections = connections.clone();
        let successes = successes.clone();
        let errors = errors.clone();
        let cloned_graph = graph.clone();
        let cloned_acc = acc.to_string();
        tokio::spawn(async move {
            connections.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            let temp = cloned_graph
                .run(neo4rs::query(&format!(
                    "MERGE (n:Num {{num: '{cloned_acc}'}});"
                )))
                .await;
            match temp {
                Ok(_) => {
                    successes.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
                Err(err) => {
                    // Print the message for the error variants seen in this issue.
                    match err {
                        Error::UnexpectedMessage(msg)
                        | Error::UnknownMessage(msg)
                        | Error::AuthenticationError(msg) => {
                            println!("error: {}", msg);
                        }
                        _ => {}
                    };
                    errors.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            connections.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
            // Release the permit only after the query has completed.
            std::mem::drop(_permit);
        });
        acc += 1;
    }
}

I ran this from … and let it run for 10-something minutes, with the following output: …

Are you running Neo4j in Docker, perhaps? I can imagine that a different network stack, due to the OS or the network connection, could influence the behavior.
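To make the client-side reasoning concrete, here is a minimal std-only Rust sketch of the pattern both programs rely on: a counting semaphore whose permit is returned when it is dropped, plus an in-flight counter that records the peak. The names `Semaphore`, `Permit`, and `run_bounded` are hypothetical (they are not tokio's or neo4rs's API), and OS threads with `thread::sleep` stand in for async tasks and queries.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore built on a Mutex and a Condvar (hypothetical helper).
struct Semaphore {
    available: Mutex<usize>,
    freed: Condvar,
}

/// RAII permit: dropping it hands the slot back and wakes one waiter.
struct Permit<'a> {
    sem: &'a Semaphore,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore {
            available: Mutex::new(permits),
            freed: Condvar::new(),
        }
    }

    /// Blocks until a slot is free, then takes it.
    fn acquire(&self) -> Permit<'_> {
        let mut n = self.available.lock().unwrap();
        while *n == 0 {
            n = self.freed.wait(n).unwrap();
        }
        *n -= 1;
        Permit { sem: self }
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        *self.sem.available.lock().unwrap() += 1;
        self.sem.freed.notify_one();
    }
}

/// Runs `tasks` simulated queries with at most `limit` in flight and
/// returns the highest in-flight count observed.
fn run_bounded(limit: usize, tasks: usize) -> usize {
    let sem = Semaphore::new(limit);
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);
    thread::scope(|s| {
        for _ in 0..tasks {
            // Take the permit before spawning, as the harness above does.
            let permit = sem.acquire();
            let in_flight = &in_flight;
            let peak = &peak;
            s.spawn(move || {
                let _permit = permit; // released when this closure returns
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(2)); // stand-in for a query
                in_flight.fetch_sub(1, Ordering::SeqCst);
            });
        }
    });
    peak.load(Ordering::SeqCst)
}
```

Because the counter is decremented before the permit drops, the observed peak never exceeds `limit` (and with a limit of 1 it is always exactly 1). This only bounds what the client has in flight; it cannot show how long the server keeps a Bolt worker thread busy after a query returns, which is the delay the report suspects.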
I have an async program that makes graph.execute and graph.run calls. I am using a semaphore to limit the number of concurrent tasks. My Neo4j server is configured with a Bolt thread pool maximum of 100x the number of concurrent tasks, and I set max_connections on the ConfigBuilder to 10x the number of concurrent tasks. Whenever I call graph.execute or graph.run, I await the result, so each task should hold only one live connection at a time. Despite this, I very quickly hit
AuthenticationError("There are no available threads to serve this request at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).")
or
UnexpectedMessage("unexpected response for RUN: Ok(Failure(Failure { metadata: BoltMap { value: {BoltString { value: \"code\" }: String(BoltString { value: \"Neo.TransientError.Request.NoThreadsAvailable\" }), BoltString { value: \"message\" }: String(BoltString { value: \"There are no available threads to serve this request at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).\" })} } }))").

The more concurrent tasks I run, the faster I hit these errors, which leads me to believe that closing the connections lags behind the await statements. At 1024 concurrent tasks it takes about 15 seconds; at 512 concurrent tasks it happens within a couple of minutes.
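The error itself says "You can retry at a later time", and its code falls in Neo4j's `TransientError` classification (status codes have the form `Neo.<Classification>.<Category>.<Title>`). A minimal client-side mitigation, sketched below under stated assumptions, is to retry with exponential backoff only when the status code is transient. `is_transient` and `retry_transient` are hypothetical helpers, not part of neo4rs; errors are reduced to the bare status-code string, and the closure stands in for a `graph.run(...)` call.

```rust
use std::thread;
use std::time::Duration;

/// True for status codes in the `TransientError` classification,
/// which mark failures that may succeed if retried.
fn is_transient(code: &str) -> bool {
    code.starts_with("Neo.TransientError.")
}

/// Calls `op` up to `max_attempts` times, doubling the sleep between
/// attempts, but only while it fails with a transient status code.
/// Assumption: errors are simplified to the bare status-code string.
fn retry_transient<T, F>(max_attempts: u32, base_delay: Duration, mut op: F) -> Result<T, String>
where
    F: FnMut() -> Result<T, String>,
{
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Transient and attempts left: wait, double the delay, try again.
            Err(code) if is_transient(&code) && attempt < max_attempts => {
                thread::sleep(delay);
                delay *= 2;
                attempt += 1;
            }
            // Non-transient, or out of attempts: give up with the last error.
            Err(code) => return Err(code),
        }
    }
}
```

In the async harness above, the same loop would wrap the awaited `graph.run` call and use `tokio::time::sleep(delay).await` instead of `thread::sleep`. Since neo4rs surfaced this condition as `AuthenticationError` or `UnexpectedMessage` in this report, getting at the status code would mean inspecting the message text. Retrying eases the symptom; it does not explain why the server's Bolt threads stay busy.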