Skip to content
Permalink
21efbfd7c0
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
273 lines (223 sloc) 8.63 KB
#!/usr/bin/python
"CS244 Assignment 2: Buffer Sizing"
from mininet.topo import Topo
from mininet.node import CPULimitedHost
from mininet.link import TCLink
from mininet.net import Mininet
from mininet.log import lg
from mininet.util import dumpNodeConnections
from mininet.cli import CLI
import threading
import subprocess
from subprocess import Popen, PIPE
import time
from multiprocessing import Process
from argparse import ArgumentParser
from extras import exec_and_monitor, cprint
import sys
import os
from util.monitor import monitor_qlen
# Sampling cadence, in seconds (both values are floats).
SAMPLE_PERIOD_SEC = 1.0  # time to wait between samples
SAMPLE_WAIT_SEC = 3.0  # time to wait for the first sample

# Command-line interface of the experiment.  Every option stores its value
# (argparse's default action) under the name derived from its long flag.
parser = ArgumentParser(description="Buffer sizing tests")

# Default maxq = 120 * 10^3 bit / (1500 bytes * 8 bits/byte) = 10 packets
parser.add_argument('--maxq', type=int, default=10,
                    help="Max buffer size of network interface in packets")

parser.add_argument('--time', '-t', type=int, default=400,
                    help="Time to run experiment")

parser.add_argument('--competing-flow-wait-time', '-c', type=int, default=200,
                    help="Time to wait before starting competing flow")

parser.add_argument('--competing-flow-run-time', '-r', type=int, default=200,
                    help="Time to wait before stopping competing flow")

parser.add_argument('--force', type=int, default=200,
                    help="Time to force the playback rate")

parser.add_argument('--playback-buffer-size', '-p', type=int, default=240,
                    help="Playback buffer size")

parser.add_argument('--dir', '-d', required=True,
                    help="Directory to store output")

parser.add_argument('--test', required=True,
                    help="Test to run")

# Expt parameters
args = parser.parse_args()

# Create the output directory up front so later writers can rely on it.
if not os.path.exists(args.dir):
    os.makedirs(args.dir)

lg.setLogLevel('info')
# Topology to be instantiated in Mininet
class StarTopo(Topo):
    """Star topology: hosts h1..h(n-1) and host h<n> all attached to switch s1.

    Settings applied to the links (units as given in the original notes):
      bw_host = bandwidth of long-lived TCP video and competing streams
              = 100 Mb/s
      delay   = RTT = 4 ms; each link is configured with delay '1ms'
                (presumably the per-link values add up to that RTT -- TODO
                confirm against Mininet's TCLink delay semantics)
      bw_net  = bottleneck bandwidth = 5 Mb/s
      maxq    = default 10 pkts, taken from the --maxq command-line option
    """

    def __init__(self, n=3, cpu=None, bw_host=None, bw_net=None,
                 delay=None, maxq=None):
        """Store the link settings and build the topology.

        NOTE: the bw_host, bw_net, delay and maxq arguments are accepted but
        never read; the fixed values below (and args.maxq) are used instead.
        """
        super(StarTopo, self).__init__()
        self.n, self.cpu = n, cpu
        self.bw_host, self.bw_net = 100, 5
        self.delay = '1ms'
        self.maxq = args.maxq
        self.create_topology()

    def create_topology(self):
        """Add h<n>, switch s1 and hosts h1..h(n-1), then wire them to s1.

        Only the switch <-> h<n> link gets the bw_net rate and a
        max_queue_size; the other links use bw_host and no explicit queue
        limit.
        """
        server = self.addHost('h%d' % self.n)
        switch = self.addSwitch('s1')

        # Host links are added first, in increasing host number.
        host_link_opts = dict(bw=self.bw_host, delay=self.delay, use_htb=True)
        for idx in range(1, self.n):
            self.addLink(self.addHost('h%d' % idx), switch, **host_link_opts)

        self.addLink(switch, server, bw=self.bw_net, delay=self.delay,
                     max_queue_size=self.maxq, use_htb=True)
def start_tcpprobe():
    """Reload the tcp_probe module and start dumping its output to a file.

    The module is removed (errors silenced) and loaded again with port=8000;
    a background ``cat`` then copies /proc/net/tcpprobe into
    <args.dir>/tcp_probe.txt.
    """
    reload_cmd = "rmmod tcp_probe 2>/dev/null; modprobe tcp_probe port=8000;"
    os.system(reload_cmd)

    dump_cmd = "cat /proc/net/tcpprobe > %s/tcp_probe.txt" % args.dir
    # Not waited on: the dump keeps running after this function returns.
    Popen(dump_cmd, shell=True)
def stop_tcpprobe():
    """Stop the tcp_probe dump and unload the tcp_probe module.

    Kills every running ``cat`` process with SIGKILL, then removes the
    tcp_probe module while discarding rmmod's stdout and stderr.
    """
    # os.system hands the command to /bin/sh.  The bash-only "&>" form is
    # not a redirection in a plain POSIX shell: "cmd &>/dev/null" is read as
    # "cmd &" followed by an empty redirect, so rmmod would run in the
    # background with its output still visible.  Use the portable spelling.
    os.system("killall -9 cat; rmmod tcp_probe >/dev/null 2>&1;")
def count_connections():
    """Return the number of lines containing "connected" in the iperf log.

    The log read is <args.dir>/iperf_server.txt; counting is delegated to a
    ``grep | wc -l`` shell pipeline whose output is converted to int.
    """
    server_log = args.dir + "/iperf_server.txt"
    count_cmd = "grep connected %s | wc -l" % server_log
    pipeline = Popen(count_cmd, shell=True, stdout=PIPE)
    stdout_data, _ = pipeline.communicate()
    return int(stdout_data)
def set_q(iface, q):
    """Change the netem queue size limit of interface `iface` to `q`.

    Runs ``tc qdisc change`` through the shell; a non-zero exit status
    raises subprocess.CalledProcessError.
    """
    template = "tc qdisc change dev %s parent 5:1 handle 10: netem limit %s"
    subprocess.check_output(template % (iface, q), shell=True)
def set_speed(iface, spd):
    """Change the htb maximum rate of interface `iface` to `spd`.

    Runs ``tc class change`` via os.system; the exit status is not checked.
    """
    template = ("tc class change dev %s parent 5:0 classid 5:1 "
                "htb rate %s burst 15k")
    os.system(template % (iface, spd))
def start_competing_flow(net):
    """Start an iperf flow from h3 to h2 and monitor throughput on h2.

    h2 runs the iperf server (port 5001), h3 the client; their outputs go to
    iperf_server.txt and competing_flow.txt under args.dir.  A
    monitor_throughput.py process is then launched on h2 for h2-eth0.
    """
    receiver, sender = net.getNodeByName('h2', 'h3')
    out_dir = args.dir

    server_cmd = "iperf -s -p 5001 -w 16m > %s/iperf_server.txt" % out_dir
    receiver.popen(server_cmd, shell=True)

    # NOTE(review): the "-y -Z cong" option sequence looks unusual -- confirm
    # it against the options of the installed iperf.
    client_cmd = ("iperf -c %s -p 5001 -t 3600 -i 1 -y -Z cong > %s/competing_flow.txt" %
                  (receiver.IP(), out_dir))
    sender.popen(client_cmd, shell=True)

    throughput_log = '%s/competing_throughput.txt' % out_dir
    monitor_cmd = ('python monitor_throughput.py --interface h2-eth0 --fname %s --field-name competing-throughput' %
                   throughput_log)
    exec_and_monitor(receiver, monitor_cmd)
def stop_competing_flow(net):
    """Kill all iperf processes with SIGKILL; `net` is accepted but unused."""
    kill_cmd = 'killall -9 iperf'
    os.system(kill_cmd)
def force_test(net):
    """Placeholder test: ignores `net`, does nothing and returns None."""
    return None
def download_rates_test(net):
    """Run server.py on h3 and a --check-download-rates client on h1.

    This function never returns: once both processes are launched it sleeps
    in an endless loop, so the run has to be interrupted from outside.
    """
    client_host, _unused_host, server_host = net.getNodeByName('h1', 'h2', 'h3')

    print('starting server')
    # Handles are kept in locals for the lifetime of the (endless) call.
    server_handle = exec_and_monitor(server_host, 'python server.py')
    time.sleep(0.5)

    print('starting client')
    client_cmd = ('python client.py --server-ip %s --dir %s --buffer-size %d --check-download-rates' %
                  (server_host.IP(), args.dir, args.playback_buffer_size))
    client_handle = exec_and_monitor(client_host, client_cmd)

    while True:
        time.sleep(5)
def normal_test(net, force_time):
    """Run the client/server experiment with a timed competing flow.

    Starts server.py on h3, tcp_probe, and client.py on h1 (with
    ``--force <force_time>`` when force_time is not None), records the queue
    length of s1-eth3, and polls every 5 seconds for args.time seconds.  The
    competing flow is started after args.competing_flow_wait_time seconds
    and stopped args.competing_flow_run_time seconds later.  Finally the
    server, client and queue monitor are killed and the network is stopped.
    """
    client_host, _unused_host, server_host = net.getNodeByName('h1', 'h2', 'h3')

    print('starting server')
    server_proc = exec_and_monitor(server_host, 'python server.py')
    time.sleep(0.5)
    start_tcpprobe()

    print('starting client')
    client_cmd = ('python client.py --server-ip %s --dir %s --buffer-size %d' %
                  (server_host.IP(), args.dir, args.playback_buffer_size))
    if force_time is not None:
        client_cmd += ' --force %d' % force_time
    client_proc = exec_and_monitor(client_host, client_cmd)

    # Queue monitoring start (sampling interval 0.01 passed to monitor_qlen)
    iface = 's1-eth3'
    qlen_log = '%s/qlen_%s.txt' % (args.dir, iface)
    monitor = Process(target=monitor_qlen, args=(iface, 0.01, qlen_log))
    monitor.start()

    start_time = time.time()
    end_time = start_time + args.time
    flow_start_at = start_time + args.competing_flow_wait_time
    flow_stop_at = flow_start_at + args.competing_flow_run_time
    flow_started = False
    flow_stopped = False

    now = time.time()
    while now <= end_time:
        # Each transition fires at most once, on the first poll past its mark.
        if not flow_started and now > flow_start_at:
            start_competing_flow(net)
            flow_started = True
        if not flow_stopped and now > flow_stop_at:
            stop_competing_flow(net)
            flow_stopped = True
        cprint("Time remaining: %f" % (end_time - now), "blue")
        time.sleep(5)
        now = time.time()

    server_proc.kill()
    client_proc.kill()

    # Queue monitoring stop
    monitor.terminate()
    net.stop()
def main():
    """Create network and run Buffer Sizing experiment.

    Builds the StarTopo network, starts it, runs the test selected by
    --test ('none', 'dl' or 'force') and then tears down helper processes
    and the tcp_probe module.  An unrecognised --test value is reported and
    the network is stopped without running anything.
    """
    start = time.time()
    # Reset to known state
    # NOTE: StarTopo as written applies its own fixed link settings and does
    # not read the bw_host/delay/bw_net/maxq values passed here.
    topo = StarTopo(n=3, bw_host=100, delay='4ms', bw_net=5, maxq=args.maxq)
    net = Mininet(topo=topo, host=CPULimitedHost, link=TCLink)
    net.start()
    dumpNodeConnections(net.hosts)
    net.pingAll()
    cprint("Starting experiment", "green")
    print("Directory logs: %s" % args.dir)
    if args.test == 'none':
        normal_test(net, None)
    elif args.test == 'dl':
        download_rates_test(net)
    elif args.test == 'force':
        normal_test(net, args.force)
    else:
        # Previously a typo in --test did nothing silently and left the
        # Mininet network running; report it and shut the network down.
        print("Unknown test %r; expected 'none', 'dl' or 'force'" % args.test)
        net.stop()
    # Unload tcp_probe and report the duration BEFORE the broad killall
    # below: that command also targets processes named "python", which can
    # include this interpreter, in which case nothing after it would run.
    stop_tcpprobe()
    end = time.time()
    cprint("Sweep took %.3f seconds" % (end - start), "yellow")
    # Shut down iperf processes
    os.system('killall -9 iperf')
    Popen("killall -9 top bwm-ng tcpdump cat mnexec iperf python", shell=True).wait()
if __name__ == '__main__':
    try:
        main()
    except:
        # Deliberately a bare except: cleanup must also run when the
        # experiment is interrupted (e.g. KeyboardInterrupt).
        banner = "-" * 80
        print(banner)
        print("Caught exception. Cleaning up...")
        print(banner)
        import traceback
        traceback.print_exc()
        # Kill leftover helper processes and let Mininet clean its state.
        cleanup_cmd = "killall -9 top bwm-ng tcpdump cat mnexec iperf python; mn -c"
        os.system(cleanup_cmd)