Realistic simulation scenarios #4

Open · wants to merge 3 commits into base: main
47 changes: 46 additions & 1 deletion README.md
@@ -2,8 +2,12 @@

* DST gossipsub test node
* incl shadow simulation setup
* incl awk scripts for detailed analysis

## Shadow example
Three simulation cases are supported:

## Scenario1 - Basic Simulation Scenario
### Shadow example

```sh
nimble install -dy
@@ -22,3 +26,44 @@ nim -d:release c plotter
```

The dependencies will be installed in the `nimbledeps` folder, which enables easy tweaking


## Scenario2 - (P2P-Research Branch)
* Homogeneous nodes/links (100 Mbps bandwidth and 100 ms latency)
* Supports variable network/message size, variable number of publishers and message fragments
* Automatically updates the shadow.yaml file to accommodate different network sizes
* awk scripts for detailed analysis

```sh
cd shadow
#The run.sh script is parameterized to meet different experiment needs: ./run.sh <num_runs> <num_peers> <msg_size> <num_fragments> <num_publishers>
#The example below runs the simulation twice on a 1000-node network: each publisher publishes a
#15000-byte message, every message is partitioned into 4 fragments, and 10 publishers are used.

./run.sh 2 1000 15000 4 10
# The number of nodes is maintained in the shadow.yaml file and automatically updated by run.sh.
# The output files latencies(x), stats(x) and shadowlog(x) carry the outputs for each simulation run.
# The summary_dontwant.awk, summary_latency.awk, summary_latency_large.awk, and summary_shadowlog.awk scripts parse the output files.
# run.sh automatically calls these scripts to display a summary.
# A temporary shadow.data folder is created for each simulation and removed by run.sh when the simulation is over.
```
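The per-message numbers in the latencies(x) files come from a timestamp the publisher embeds in the message itself: main.nim writes the send time as 8 little-endian bytes at the start of the payload, and each receiver subtracts that from its own clock. A minimal Python sketch of the same scheme (illustrative only; the function names are hypothetical, not part of the repo):

```python
import struct
import time

def encode_message(payload: bytes) -> bytes:
    # First 8 bytes: send time in nanoseconds, little-endian
    # (mirrors main.nim's toBytesLE(uint64(nowInt.nanoseconds)))
    return struct.pack("<Q", time.time_ns()) + payload

def decode_latency_ms(msg: bytes) -> float:
    # Receiver recovers the send timestamp and computes one-way latency
    sent_ns = struct.unpack("<Q", msg[:8])[0]
    return (time.time_ns() - sent_ns) / 1e6

msg = encode_message(b"\x00" * 100)
print(f"latency: {decode_latency_ms(msg):.3f} ms")
```

This only measures latency correctly because Shadow gives every simulated host the same virtual clock; on real networks the two clocks would need to be synchronized.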

## Scenario3 - (Realistic-Scenarios Branch)
* Heterogeneous nodes/links (bandwidth/latency/packet_loss_ratio controllable through run.sh)
* run.sh uses topogen.py to create a realistic gml file that emulates a real-world network scenario
* Supports variable network/message size, variable number of publishers and message fragments
* Automatically generates the network_topology.gml and shadow.yaml files to accommodate different network sizes
* awk scripts called from run.sh for detailed analysis

```sh
cd shadow
#The following sample command runs the simulation once for a 1000-node network. Each published message
#is 15 KB (no fragmentation), peer bandwidth varies between 50-130 Mbps, latency between 60-160 ms, and
#bandwidth/latency values are roughly distributed across five groups.

./run.sh 1 1000 15000 1 10 50 130 60 160 5 0.0
# The number of nodes is maintained in the shadow.yaml file, which is automatically generated by run.sh.
# The output files latencies(x), stats(x) and shadowlog(x) carry the outputs for each simulation run.
# The summary_dontwant.awk, summary_latency.awk, summary_latency_large.awk, and summary_shadowlog.awk scripts parse the output files.
# run.sh automatically calls these scripts to display a summary.
# A temporary shadow.data folder is created for each simulation and removed by run.sh when the simulation is over.
```
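The exact grouping logic lives in topogen.py (not shown in this diff). One plausible sketch, assuming an even split of the min..max range into `anchor_stages` groups with round-robin peer assignment (all names here are hypothetical illustrations, not the repo's actual code):

```python
def make_groups(min_val: int, max_val: int, steps: int) -> list:
    """Evenly spaced anchor values between min_val and max_val inclusive."""
    if steps <= 1:
        return [min_val]
    span = (max_val - min_val) / (steps - 1)
    return [round(min_val + i * span) for i in range(steps)]

def assign_peer(peer_id: int, groups: list) -> int:
    # Round-robin: consecutive peers land in consecutive groups
    return groups[peer_id % len(groups)]

# e.g. run.sh args min_bandwidth=50 max_bandwidth=130 anchor_stages=5
bw_groups = make_groups(50, 130, 5)
print(bw_groups)  # [50, 70, 90, 110, 130]
```

With the sample arguments above this yields bandwidth anchors 50/70/90/110/130 Mbps and latency anchors 60/85/110/135/160 ms; inspect the generated network_topology.gml to see what topogen.py actually produced.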
89 changes: 70 additions & 19 deletions shadow/main.nim
@@ -1,12 +1,22 @@
import stew/endians2, stew/byteutils, tables, strutils, os
import libp2p, libp2p/protocols/pubsub/rpc/messages
import libp2p/muxers/mplex/lpchannel, libp2p/protocols/ping
#import libp2p/protocols/pubsub/pubsubpeer

import chronos
import sequtils, hashes, math, metrics
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
from nativesockets import getHostname

const chunks = 1
#These parameters are passed from the yaml file; each defined peer may receive different parameters (e.g. message size)
var
publisherCount = parseInt(getEnv("PUBLISHERS"))
msg_size = parseInt(getEnv("MSG_SIZE"))
chunks = parseInt(getEnv("FRAGMENTS"))

#We experiment with up to 10 fragments; 1 means messages are not fragmented
if chunks < 1 or chunks > 10:
chunks = 1

proc msgIdProvider(m: Message): Result[MessageId, ValidationResult] =
return ok(($m.data.hash).toBytes())
@@ -15,9 +25,7 @@ proc main {.async.} =
let
hostname = getHostname()
myId = parseInt(hostname[4..^1])
#publisherCount = client.param(int, "publisher_count")
publisherCount = 10
isPublisher = myId <= publisherCount
isPublisher = myId <= publisherCount #needs adjusting if publishers don't start from peer1
#isAttacker = (not isPublisher) and myId - publisherCount <= client.param(int, "attacker_count")
isAttacker = false
rng = libp2p.newRng()
@@ -45,7 +53,7 @@ proc main {.async.} =
anonymize = true,
)
pingProtocol = Ping.new(rng=rng)
gossipSub.parameters.floodPublish = false
gossipSub.parameters.floodPublish = false
#gossipSub.parameters.lazyPushThreshold = 1_000_000_000
#gossipSub.parameters.lazyPushThreshold = 0
gossipSub.parameters.opportunisticGraftThreshold = -10000
@@ -79,6 +87,9 @@ proc main {.async.} =
sentNanosecs = nanoseconds(sentMoment - seconds(sentMoment.seconds))
sentDate = initTime(sentMoment.seconds, sentNanosecs)
diff = getTime() - sentDate

# pubId = byte(data[11])

echo sentUint, " milliseconds: ", diff.inMilliseconds()


@@ -121,7 +132,7 @@ proc main {.async.} =
let connectTo = parseInt(getEnv("CONNECTTO"))
var connected = 0
for peerInfo in peersInfo:
if connected >= connectTo: break
if connected >= connectTo+2: break
let tAddress = "peer" & $peerInfo & ":5000"
echo tAddress
let addrs = resolveTAddress(tAddress).mapIt(MultiAddress.init(it).tryGet())
@@ -137,24 +148,64 @@ proc main {.async.} =
# warmupMessages = client.param(int, "warmup_messages")
#startOfTest = Moment.now() + milliseconds(warmupMessages * maxMessageDelay div 2)

await sleepAsync(10.seconds)
echo "Mesh size: ", gossipSub.mesh.getOrDefault("test").len
await sleepAsync(12.seconds)
echo "Mesh size: ", gossipSub.mesh.getOrDefault("test").len,
", Total Peers Known : ", gossipSub.gossipsub.getOrDefault("test").len,
", Direct Peers : ", gossipSub.subscribedDirectPeers.getOrDefault("test").len,
", Fanout", gossipSub.fanout.getOrDefault("test").len,
", Heartbeat : ", gossipSub.parameters.heartbeatInterval.milliseconds

await sleepAsync(5.seconds)

for msg in 0 ..< 10:#client.param(int, "message_count"):
await sleepAsync(12.seconds)
if msg mod publisherCount == myId - 1:
#if myId == 1:
# Actual message publishing: one message published every 3 seconds.
# The first 1-2 messages take longer than expected due to a low cwnd;
# warmup_messages can raise cwnd to a desired level, or can alternatively be set to 0.
let
warmup_messages = 2
#shadow.yaml defines peers with varying latency/bandwidth. In the current arrangement all the publishers
#will get different latency/bandwidth
pubStart = 4
pubEnd = pubStart + publisherCount + warmup_messages


#We send warmup_messages to adjust the TCP cwnd
for i in pubStart..<(pubStart + warmup_messages):
await sleepAsync(2.seconds)
if i == myId:
#two warmup messages for cwnd raising
let
now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](msg_size)
doAssert((await gossipSub.publish("test", nowBytes)) > 0)
#Done sending warmup messages; wait for a short time
await sleepAsync(5.seconds)


#We now send publisher_count messages
for msg in (pubStart + warmup_messages) .. pubEnd:#client.param(int, "message_count"):
await sleepAsync(3.seconds)
if msg mod (pubEnd+1) == myId:
let
now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
#var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000 div chunks)
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](50)
#echo "sending ", uint64(nowInt.nanoseconds)
#[
if chunks == 1:
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](50000)
else:
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000 div chunks)
]#
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](msg_size div chunks)
for chunk in 0..<chunks:
nowBytes[10] = byte(chunk)
doAssert((await gossipSub.publish("test", nowBytes)) > 0)

#echo "BW: ", libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "in"]) + libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "out"])
#echo "DUPS: ", libp2p_gossipsub_duplicate.value(), " / ", libp2p_gossipsub_received.value()

echo "Done Publishing ", nowInt.nanoseconds

#we need to export these counters from gossipsub.nim
echo "statcounters: dup_during_validation ", libp2p_gossipsub_duplicate_during_validation.value(),
"\tidontwant_saves ", libp2p_gossipsub_idontwant_saved_messages.value(),
"\tdup_received ", libp2p_gossipsub_duplicate.value(),
"\tUnique_msg_received ", libp2p_gossipsub_received.value(),
"\tStaggered_Saves ", libp2p_gossipsub_staggerDontWantSave.value(),
"\tDontWant_IN_Stagger ", libp2p_gossipsub_staggerDontWantSave2.value()
waitFor(main())
67 changes: 62 additions & 5 deletions shadow/run.sh
@@ -1,8 +1,65 @@
#!/bin/sh

set -e

nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main
rm -rf shadow.data/
shadow shadow.yaml
grep -rne 'milliseconds\|BW' shadow.data/ > latencies
if [ $# -ne 11 ]; then
echo "Usage: $0 <runs> <nodes> <message_size> <num_fragments> <num_publishers>
<min_bandwidth> <max_bandwidth> <min_latency> <max_latency> <anchor_stages> <packet_loss>"

echo "The following sample command runs the simulation once for a 1000-node network. Each published message \
is 15 KB (no fragmentation), peer bandwidth varies between 50-130 Mbps, latency between 60-160 ms, and \
bandwidth/latency values are roughly distributed across five groups. \
See the generated network_topology.gml and shadow.yaml for peers/edges details"

echo "$0 1 1000 15000 1 10 50 130 60 160 5 0.0"
exit 1
fi

runs="$1" #number of simulation runs
nodes="$2" #number of nodes to simulate
msg_size="$3" #message size to use (in bytes)
num_frag="$4" #number of fragments per message (1 for no fragmentation)
num_publishers="$5" #number of publishers
min_bandwidth="$6"
max_bandwidth="$7"
min_latency="$8"
max_latency="$9"
steps="${10}"
pkt_loss="${11}"

connect_to=5 #number of peers we connect with to form full message mesh


#topogen.py uses the Python networkx module to generate the gml and yaml files
PYTHON=$(which python3 || which python)

if [ -z "$PYTHON" ]; then
echo "Error: Python with the networkx module is required to generate the topology files."
exit 1
fi

"$PYTHON" topogen.py $nodes $min_bandwidth $max_bandwidth $min_latency $max_latency $steps $pkt_loss $msg_size $num_frag $num_publishers



rm -f shadowlog* latencies* stats* main && rm -rf shadow.data/
nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main

for i in $(seq $runs); do
echo "Running for turn "$i
shadow shadow.yaml > shadowlog$i &&
grep -rne 'milliseconds\|BW' shadow.data/ > latencies$i &&
grep -rne 'statcounters:' shadow.data/ > stats$i
#Uncomment the rm below to clear shadow.data between runs; keep it commented to retain every node's log (only if runs == 1, or change the data directory in the yaml file)
#rm -rf shadow.data/
done

for i in $(seq $runs); do
echo "Summary for turn "$i
if [ "$msg_size" -lt 1000 ]; then
awk -f summary_latency.awk latencies$i #precise per hop coverage for short messages only
else
awk -f summary_latency_large.awk latencies$i #estimated coverage for large messages (TxTime adds to latency)
fi
awk -f summary_shadowlog.awk shadowlog$i
awk -f summary_dontwant.awk stats$i
done
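For reference, each latencies file holds grep output of lines main.nim echoes as `<sent_ns> milliseconds: <diff_ms>` (with a `file:line:` prefix added by `grep -rne`). A hedged Python sketch of summarizing such a file, similar in spirit to what the awk scripts do (the line format is assumed from main.nim's echo; not the repo's actual parser):

```python
import re
from statistics import mean

def summarize(lines):
    """Extract per-message latency (ms) from grep'd shadow stdout lines."""
    pat = re.compile(r"(\d+) milliseconds: (\d+)")
    lat = [int(m.group(2)) for line in lines if (m := pat.search(line))]
    if not lat:
        return None
    return {"count": len(lat), "mean_ms": mean(lat), "max_ms": max(lat)}

sample = [
    "shadow.data/hosts/peer3/stdout:42: 1700000000000 milliseconds: 125",
    "shadow.data/hosts/peer7/stdout:42: 1700000000000 milliseconds: 250",
]
print(summarize(sample))  # {'count': 2, 'mean_ms': 187.5, 'max_ms': 250}
```

Counting the matched lines per sent timestamp also gives a rough per-message coverage figure, which is what summary_latency.awk reports for short messages.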