Skip to content

Commit

Permalink
Move peer discovery from "services" to "peers-clear"
Browse files Browse the repository at this point in the history
This is necessary to implement TLS as service only report the clear service port.
It will be easy to expand the "services_name" to return peers-tls
  • Loading branch information
geobeau committed May 10, 2022
1 parent 6ac1a06 commit 65c88ef
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ error-chain = "0.12"
parking_lot = "0.9"
pwhash = "0.3"
serde = { version = "1.0", features = ["derive"], optional = true }
logos = "0.12.0"

[features]
serialization = ["serde"]
Expand Down
1 change: 1 addition & 0 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod node;
pub mod node_validator;
pub mod partition;
pub mod partition_tokenizer;
pub mod peers;

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
Expand Down
36 changes: 5 additions & 31 deletions src/cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::errors::{ErrorKind, Result, ResultExt};
use crate::net::{ConnectionPool, Host, PooledConnection};
use crate::policy::ClientPolicy;

use super::peers::parse_peers_info;

pub const PARTITIONS: usize = 4096;

#[derive(Debug)]
Expand Down Expand Up @@ -138,9 +140,9 @@ impl Node {

const fn services_name(&self) -> &'static str {
if self.client_policy.use_services_alternate {
"services-alternate"
"peers-clear-alt"
} else {
"services"
"peers-clear-std"
}
}

Expand Down Expand Up @@ -197,35 +199,7 @@ impl Node {
Some(friend_string) => friend_string,
};

let friend_names = friend_string.split(';');
for friend in friend_names {
let mut friend_info = friend.split(':');
if friend_info.clone().count() != 2 {
error!(
"Node info from asinfo:services is malformed. Expected HOST:PORT, but got \
'{}'",
friend
);
continue;
}

let host = friend_info.next().unwrap();
let port = u16::from_str(friend_info.next().unwrap())?;
let alias = match self.client_policy.ip_map {
Some(ref ip_map) if ip_map.contains_key(host) => {
Host::new(ip_map.get(host).unwrap(), port)
}
_ => Host::new(host, port),
};

if current_aliases.contains_key(&alias) {
self.reference_count.fetch_add(1, Ordering::Relaxed);
} else if !friends.contains(&alias) {
friends.push(alias);
}
}

Ok(friends)
parse_peers_info(friend_string)
}

fn update_partitions(&self, info_map: &HashMap<String, String>) -> Result<()> {
Expand Down
128 changes: 128 additions & 0 deletions src/cluster/peers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::errors::{ErrorKind, Result};
use crate::net::{Host, ToHosts};
use logos::{Lexer, Logos};

#[derive(Logos, Debug, PartialEq)]
enum Token {
#[token("[")]
OpenBracket,

#[token("]")]
CloseBracket,

#[regex("[0-9a-zA-Z-./_: ]+")]
Text,

#[error]
#[regex(r"[,]+", logos::skip)]
Error,
}

fn parse_error(lex: &Lexer<Token>, source: &str) -> String {
format!(
"Failed to parse peers: {}, at {:?} ({})",
source,
lex.span(),
lex.slice()
)
}

pub fn parse_peers_info(info_peers: &str) -> Result<Vec<Host>> {
let mut lex = Token::lexer(info_peers);

let _peer_gen = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};
let default_port_str = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};

let default_port = match default_port_str.parse::<u16>() {
Ok(port) => port,
Err(_) => bail!(ErrorKind::BadResponse(format!(
"Invalid default port: {}",
default_port_str
))),
};

match lex.next() {
Some(Token::OpenBracket) => parse_peers(info_peers, &mut lex, default_port),
_ => Ok(Vec::new()),
}
}

fn parse_peers(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
let mut peers = Vec::new();
loop {
match lex.next() {
Some(Token::OpenBracket) => peers.extend(parse_peer(info_peers, lex, default_port)?),
Some(Token::CloseBracket) => return Ok(peers),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
}
lex.next(); // Close brackets
}
}

fn parse_peer(info_peers: &str, lex: &mut Lexer<Token>, default_port: u16) -> Result<Vec<Host>> {
let _id = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};

let mut token = lex.next();
if Some(Token::Text) == token {
let _tls_hostname = lex.slice();
token = lex.next();
}

match token {
Some(Token::OpenBracket) => (),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
};

let hosts = match lex.next() {
Some(Token::Text) => lex.slice(),
_ => bail!(ErrorKind::BadResponse(parse_error(&lex, info_peers))),
}
.to_hosts_with_default_port(default_port)?;

lex.next(); // Close brackets
Ok(hosts)
}

#[cfg(test)]
mod tests {
use std::vec;

use super::*;

#[test]
fn parse_peers_works() {
let work = "6,3000,[[12A0,aerospike.com,[1.2.3.4:4333]],[BB9040011AC4202,,[10.11.12.13]],[11A1,,[localhost]]]";
let fail = "6,3foobar,[[12A0,aerospike.com,[1.2.3.4:4333]],[11A1,,[10.11.12.13:4333]]]";
let empty = "6,3000,[]";
assert!(parse_peers_info(fail).is_err());
let work = parse_peers_info(work).unwrap();
println!("{:?}", work);
assert!(
work == vec![
Host {
name: "1.2.3.4".to_string(),
port: 4333
},
Host {
name: "10.11.12.13".to_string(),
port: 3000
},
Host {
name: "localhost".to_string(),
port: 3000
}
]
);
let empty = parse_peers_info(empty).unwrap();
assert!(empty == vec![]);
}
}
12 changes: 11 additions & 1 deletion src/net/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,24 @@ pub trait ToHosts {
///
/// Any errors encountered during conversion will be returned as an `Err`.
fn to_hosts(&self) -> Result<Vec<Host>>;
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>>;
}

impl ToHosts for Vec<Host> {
fn to_hosts(&self) -> Result<Vec<Host>> {
Ok(self.clone())
}
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
Ok(self.clone())
}
}

impl ToHosts for String {
fn to_hosts(&self) -> Result<Vec<Host>> {
let mut parser = Parser::new(self, 3000);
self.to_hosts_with_default_port(3000)
}
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
let mut parser = Parser::new(self, default_port);
parser
.read_hosts()
.chain_err(|| ErrorKind::InvalidArgument(format!("Invalid hosts list: '{}'", self)))
Expand All @@ -88,6 +95,9 @@ impl<'a> ToHosts for &'a str {
fn to_hosts(&self) -> Result<Vec<Host>> {
(*self).to_string().to_hosts()
}
fn to_hosts_with_default_port(&self, default_port: u16) -> Result<Vec<Host>> {
(*self).to_string().to_hosts_with_default_port(default_port)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 65c88ef

Please sign in to comment.