Skip to content

Commit

Permalink
Merge pull request #1962 from blockstack/fix/1960
Browse files Browse the repository at this point in the history
Fix/1960
  • Loading branch information
jcnelson authored Oct 13, 2020
2 parents 9b62cc3 + 1a93c0f commit 7c0b252
Show file tree
Hide file tree
Showing 8 changed files with 599 additions and 62 deletions.
78 changes: 55 additions & 23 deletions src/net/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ impl NeighborStats {
}
}

pub fn add_relayer(&mut self, addr: NeighborAddress, num_bytes: u64) -> () {
if let Some(stats) = self.relayed_messages.get_mut(&addr) {
pub fn add_relayer(&mut self, addr: &NeighborAddress, num_bytes: u64) -> () {
if let Some(stats) = self.relayed_messages.get_mut(addr) {
stats.num_messages += 1;
stats.num_bytes += num_bytes;
stats.last_seen = get_epoch_time_secs();
Expand All @@ -219,7 +219,7 @@ impl NeighborStats {
num_bytes: num_bytes,
last_seen: get_epoch_time_secs(),
};
self.relayed_messages.insert(addr, info);
self.relayed_messages.insert(addr.clone(), info);
}
}

Expand Down Expand Up @@ -835,7 +835,7 @@ impl ConversationP2P {
&mut self,
msg: StacksMessage,
) -> Result<ReplyHandleP2P, net_error> {
let _name = msg.get_message_name();
let _name = msg.payload.get_message_description();
let _seq = msg.request_id();

let mut handle = self.connection.make_relay_handle(self.conn_id)?;
Expand Down Expand Up @@ -1241,6 +1241,7 @@ impl ConversationP2P {
chainstate: &mut StacksChainState,
header_cache: &mut BlockHeaderCache,
get_blocks_inv: &GetBlocksInv,
connection_opts: &ConnectionOptions,
) -> Result<StacksMessageType, net_error> {
// must not ask for more than a reasonable number of blocks
if get_blocks_inv.num_blocks == 0
Expand Down Expand Up @@ -1367,7 +1368,7 @@ impl ConversationP2P {
// update cache
SortitionDB::merge_block_header_cache(header_cache, &block_hashes);

let blocks_inv_data: BlocksInvData = chainstate
let mut blocks_inv_data: BlocksInvData = chainstate
.get_blocks_inventory(&block_hashes)
.map_err(|e| net_error::from(e))?;

Expand All @@ -1376,6 +1377,20 @@ impl ConversationP2P {
&local_peer, &blocks_inv_data, get_blocks_inv
);

if connection_opts.disable_inv_chat {
// never reply that we have blocks
test_debug!(
"{:?}: Disable inv chat -- pretend like we have nothing",
local_peer
);
for i in 0..blocks_inv_data.block_bitvec.len() {
blocks_inv_data.block_bitvec[i] = 0;
}
for i in 0..blocks_inv_data.microblocks_bitvec.len() {
blocks_inv_data.microblocks_bitvec[i] = 0;
}
}

Ok(StacksMessageType::BlocksInv(blocks_inv_data))
}

Expand All @@ -1399,6 +1414,7 @@ impl ConversationP2P {
chainstate,
header_cache,
get_blocks_inv,
&self.connection.options,
)?;
self.sign_and_reply(local_peer, burnchain_view, preamble, response)
}
Expand Down Expand Up @@ -1534,7 +1550,7 @@ impl ConversationP2P {
fn check_relayers_remote(local_peer: &LocalPeer, relayers: &Vec<RelayData>) -> bool {
let addr = local_peer.to_neighbor_addr();
for r in relayers.iter() {
if r.peer == addr {
if r.peer.public_key_hash == addr.public_key_hash {
return false;
}
}
Expand All @@ -1549,24 +1565,27 @@ impl ConversationP2P {
&mut self,
local_peer: &LocalPeer,
preamble: &Preamble,
mut relayers: Vec<RelayData>,
relayers: &Vec<RelayData>,
) -> bool {
if !ConversationP2P::check_relayer_cycles(&relayers) {
debug!("Message from {:?} contains a cycle", self.to_neighbor_key());
if !ConversationP2P::check_relayer_cycles(relayers) {
debug!(
"Invalid relayers -- message from {:?} contains a cycle",
self.to_neighbor_key()
);
return false;
}

if !ConversationP2P::check_relayers_remote(local_peer, &relayers) {
if !ConversationP2P::check_relayers_remote(local_peer, relayers) {
debug!(
"Message originates from us ({})",
"Invalid relayers -- message originates from us ({})",
local_peer.to_neighbor_addr()
);
return false;
}

for relayer in relayers.drain(..) {
for relayer in relayers.iter() {
self.stats
.add_relayer(relayer.peer, (preamble.payload_len - 1) as u64);
.add_relayer(&relayer.peer, (preamble.payload_len - 1) as u64);
}

return true;
Expand All @@ -1583,7 +1602,8 @@ impl ConversationP2P {
) -> Result<Option<ReplyHandleP2P>, net_error> {
assert!(preamble.payload_len > 5); // don't count 1-byte type prefix + 4 byte vector length

if !self.process_relayers(local_peer, preamble, relayers) {
if !self.process_relayers(local_peer, preamble, &relayers) {
debug!("Drop pushed blocks -- invalid relayers {:?}", &relayers);
self.stats.msgs_err += 1;
return Err(net_error::InvalidMessage);
}
Expand Down Expand Up @@ -1619,7 +1639,11 @@ impl ConversationP2P {
) -> Result<Option<ReplyHandleP2P>, net_error> {
assert!(preamble.payload_len > 5); // don't count 1-byte type prefix + 4 byte vector length

if !self.process_relayers(local_peer, preamble, relayers) {
if !self.process_relayers(local_peer, preamble, &relayers) {
debug!(
"Drop pushed microblocks -- invalid relayers {:?}",
&relayers
);
self.stats.msgs_err += 1;
return Err(net_error::InvalidMessage);
}
Expand Down Expand Up @@ -1650,7 +1674,11 @@ impl ConversationP2P {
) -> Result<Option<ReplyHandleP2P>, net_error> {
assert!(preamble.payload_len > 1); // don't count 1-byte type prefix

if !self.process_relayers(local_peer, preamble, relayers) {
if !self.process_relayers(local_peer, preamble, &relayers) {
debug!(
"Drop pushed transaction -- invalid relayers {:?}",
&relayers
);
self.stats.msgs_err += 1;
return Err(net_error::InvalidMessage);
}
Expand Down Expand Up @@ -1797,7 +1825,7 @@ impl ConversationP2P {
}
}
}
debug!("{:?}: received {} bytes", self, total_recved);
test_debug!("{:?}: received {} bytes", self, total_recved);
Ok(total_recved)
}

Expand Down Expand Up @@ -1825,7 +1853,7 @@ impl ConversationP2P {
}
}
}
debug!("{:?}: sent {} bytes", self, total_sent);
test_debug!("{:?}: sent {} bytes", self, total_sent);
Ok(total_sent)
}

Expand Down Expand Up @@ -2154,7 +2182,8 @@ impl ConversationP2P {
}

let now = get_epoch_time_secs();
let _msgtype = msg.payload.get_message_name().to_owned();
let _msgtype = msg.payload.get_message_description().to_owned();
let _relayers = format!("{:?}", &msg.relayers);
let _seq = msg.request_id();

if update_stats {
Expand Down Expand Up @@ -2201,7 +2230,10 @@ impl ConversationP2P {
self.stats.msgs_rx_unsolicited += 1;
}

debug!("{:?}: Received message {}", &self, _msgtype);
debug!(
"{:?}: Received message {}, relayed by {}",
&self, &_msgtype, &_relayers
);

// Is there someone else waiting for this message? If so, pass it along.
let fulfill_opt = self.connection.fulfill_request(msg);
Expand Down Expand Up @@ -4111,10 +4143,10 @@ mod test {
},
];

assert!(!convo.process_relayers(&local_peer, &msg.preamble, relay_cycles));
assert!(!convo.process_relayers(&local_peer, &msg.preamble, self_sent));
assert!(!convo.process_relayers(&local_peer, &msg.preamble, &relay_cycles));
assert!(!convo.process_relayers(&local_peer, &msg.preamble, &self_sent));

assert!(convo.process_relayers(&local_peer, &msg.preamble, relayers.clone()));
assert!(convo.process_relayers(&local_peer, &msg.preamble, &relayers));

// stats updated
assert_eq!(convo.stats.relayed_messages.len(), 2);
Expand Down
14 changes: 11 additions & 3 deletions src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,14 +1249,22 @@ impl StacksMessage {
our_seq: u32,
our_addr: &NeighborAddress,
) -> Result<(), net_error> {
while self.relayers.len() >= (MAX_RELAYERS_LEN as usize) {
// remove (old) nodes at the front
self.relayers.remove(0);
if self.relayers.len() >= MAX_RELAYERS_LEN as usize {
warn!(
"Message {:?} has too many relayers; will not sign",
self.payload.get_message_description()
);
return Err(net_error::InvalidMessage);
}

// don't sign if signed more than once
for relayer in &self.relayers {
if relayer.peer.public_key_hash == our_addr.public_key_hash {
warn!(
"Message {:?} already signed by {}",
self.payload.get_message_description(),
&our_addr.public_key_hash
);
return Err(net_error::InvalidMessage);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ pub struct ConnectionOptions {
pub disable_neighbor_walk: bool,
pub disable_chat_neighbors: bool,
pub disable_inv_sync: bool,
pub disable_inv_chat: bool,
pub disable_block_download: bool,
pub disable_network_prune: bool,
pub disable_network_bans: bool,
Expand Down Expand Up @@ -417,6 +418,7 @@ impl std::default::Default for ConnectionOptions {
disable_neighbor_walk: false,
disable_chat_neighbors: false,
disable_inv_sync: false,
disable_inv_chat: false,
disable_block_download: false,
disable_network_prune: false,
disable_network_bans: false,
Expand Down
2 changes: 1 addition & 1 deletion src/net/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ impl BlockDownloader {
pending_microblock_requests.insert(block_key, event_id);
} else {
debug!(
"Event {} ({:?}, {:?} for microblocks built by ({}) failed to connect",
"Event {} ({:?}, {:?} for microblocks built by ({}) failed to connect. Temporarily blocking URL.",
&block_key.neighbor,
&block_key.data_url,
&block_key.index_block_hash,
Expand Down
18 changes: 14 additions & 4 deletions src/net/inv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,11 +950,16 @@ impl InvState {
self.sync_peers.clear();
self.sync_peers = peers;

for peer in self.sync_peers.iter() {
if !self.block_stats.contains_key(peer) {
self.block_stats.remove(peer);
// clear out block_stats for peers we aren't talking to anymore
let mut to_remove = HashSet::new();
for (nk, _) in self.block_stats.iter() {
if !self.sync_peers.contains(&nk) {
to_remove.insert(nk.clone());
}
}
for nk in to_remove.into_iter() {
self.block_stats.remove(&nk);
}

for (_, stats) in self.block_stats.iter_mut() {
if stats.status != NodeStatus::Online {
Expand Down Expand Up @@ -1275,7 +1280,7 @@ impl PeerNetwork {
target_pox_reward_cycle: u64,
) -> Result<Option<GetPoxInv>, net_error> {
if target_pox_reward_cycle >= self.pox_id.len() as u64 {
debug!("{:?}: target reward cycle for neighbor {:?} is {}, which is higher than our PoX bit vector length {}", &self.local_peer, nk, target_pox_reward_cycle, self.pox_id.len());
debug!("{:?}: target reward cycle for neighbor {:?} is {}, which is equal to or higher than our PoX bit vector length {}", &self.local_peer, nk, target_pox_reward_cycle, self.pox_id.len());
return Ok(None);
}

Expand Down Expand Up @@ -3015,6 +3020,7 @@ mod test {
chainstate,
&mut network.header_cache,
&getblocksinv_request,
&network.connection_opts,
)
})
.unwrap();
Expand Down Expand Up @@ -3069,6 +3075,7 @@ mod test {
chainstate,
&mut network.header_cache,
&getblocksinv_request,
&network.connection_opts,
)
})
.unwrap();
Expand Down Expand Up @@ -3124,6 +3131,7 @@ mod test {
chainstate,
&mut network.header_cache,
&getblocksinv_request,
&network.connection_opts,
)
})
.unwrap();
Expand Down Expand Up @@ -3177,6 +3185,7 @@ mod test {
chainstate,
&mut network.header_cache,
&getblocksinv_request,
&network.connection_opts,
)
})
.unwrap();
Expand Down Expand Up @@ -3215,6 +3224,7 @@ mod test {
chainstate,
&mut network.header_cache,
&getblocksinv_request,
&network.connection_opts,
)
})
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/net/neighbors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2725,7 +2725,7 @@ impl PeerNetwork {
self.walk_deadline =
self.connection_opts.walk_interval + get_epoch_time_secs();

if self.walk_count > NUM_INITIAL_WALKS
if self.walk_count > self.connection_opts.num_initial_walks
&& self.prune_deadline < get_epoch_time_secs()
{
// clean up
Expand All @@ -2747,7 +2747,7 @@ impl PeerNetwork {
);

if walk_opt.is_some()
&& self.walk_count > NUM_INITIAL_WALKS
&& self.walk_count > self.connection_opts.num_initial_walks
&& walk.walk_step_count >= walk.walk_min_duration
{
// consider re-setting the walk state, now that we completed a walk.
Expand Down
Loading

0 comments on commit 7c0b252

Please sign in to comment.