Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
first commit
  • Loading branch information
juw15008 committed Nov 10, 2016
0 parents commit dc117a8
Show file tree
Hide file tree
Showing 21 changed files with 2,111 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
*.pyc
video_files
rate-*
12 changes: 12 additions & 0 deletions TODO.txt
@@ -0,0 +1,12 @@
Make a streaming client
Socket Communication
Logic for when to request files
Get available bandwidth estimates
Video playback rate as a function of available bandwidth
see figure 9, should be easy given that
Also see text under Figure 20, suggests ten samples
Make a streaming server
Socket Communication
Generate files
Make topology
Bandwidth limited to 5Mbps, buffer size is 120kbit, 4-20 ms RTT
188 changes: 188 additions & 0 deletions client.py
@@ -0,0 +1,188 @@
#!/usr/bin/python
import socket
import sys
import os
import threading
import time
import subprocess
from mysocket import send_message, receive_message
from playback_buffer import PlaybackBuffer

from extras import exec_and_monitor, start_tcpprobe, MovingRecord

from argparse import ArgumentParser
MOVING_MEDIAN_FILTER_SAMPLE_SIZE =10
MOVING_AVG_FILTER_SAMPLE_SIZE = 10

# rate (bytes per second) = (rate in kilobits * 1000 bits/1 kilo) / 8 bits/byte;
RATES = [235, 375, 560, 750, 1050, 1400, 1750]
# bytes per sec
# [29375, 46875, 70000, 93750, 131250, 175000, 218750]
# bytes for 4 seconds
# [117500, 187500, 280000, 375000, 525000, 700000, 875000]
RATES[:] = [x* (1000 / 8) for x in RATES]

class Client:

def __init__(self, server_ip, dir, buffer_size, debug=False):
self.server_ip = server_ip
self.video_rate_filename = '%s/video_rate.txt' % dir
self.tcp_rate_filename = '%s/tcp_rate.txt' % dir
self.request_interval_filename = '%s/request_interval.txt' % dir
self.debug = debug
self.download_record = MovingRecord(10)
self.playback_buffer = PlaybackBuffer(buffer_size, debug=False)

exec_and_monitor(host=None, cmd=[
'python', 'monitor_throughput.py', '--interface', 'h1-eth0',
'--fname', self.tcp_rate_filename, '--field-name',
'instantaneous-throughput', '--gran'])

def test_rates(self):
nsamples = 3
results = [0] * len(RATES)
for i in range(nsamples):
for idx, r in enumerate(RATES):
self.real_print('checking rate %d for the %dth time' % (r * 8 / 1000, i))
bytes_needed = r * 4
download_segment = self.request_segment(bytes_needed)
bw = download_segment[0] * 8 / download_segment[1]
results[idx] += bw
time.sleep(4)
results = [x / nsamples for x in results]
result_dict = {}
for i in range(len(RATES)):
result_dict[RATES[i] * 8 / 1000] = results[i]
self.real_print('-' * 20)
self.real_print('Results {rate : bandwidth}')
self.real_print('-' * 20)
self.real_print(result_dict)
self.real_print('Hit Ctrl-C to exit (Sorry)')

def run_forever(self):
self.set_up_connection()
self.playback_buffer.start()
if args.dl_rates:
self.test_rates()
return
self.start_time = time.time()
last_request_time = self.start_time
while True:
# Pick rate based on current bandwidth, which is a function of last download size and download_time
rate = self.pick_rate(args.force is not None and time.time() > self.start_time + args.force)
bytes_needed = rate * 4
if self.playback_buffer.available_bytes() == 0:
self.debug_print('buffer too small')
time.sleep(1)
else:
next_request_time = time.time()
request_interval = next_request_time - last_request_time
last_request_time = next_request_time
self.debug_print('buffer big enough')
download_segment = self.request_segment(bytes_needed)
self.download_record.add_sample(download_segment[0] * 8 / download_segment[1])
request_interval_file = open(self.request_interval_filename, 'a')
request_interval_file.write("%f,request-interval,%f,0,0,0,0,0,0\n" % (last_request_time, request_interval))
request_interval_file.close()
self.debug_print('rate: %d\n' % (rate * 8 / 1000))

def set_up_connection(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.server_ip, 8000,))

# Not sure if the callback is necessary.
def request_segment(self, bytes_four_second):
# Request 4-second segments of video
send_message(self.socket, "%s" % bytes_four_second)
(message, download_time) = receive_message(self.socket, timed=True)
self.debug_print('download time %f message length (bytes) %d bw %d' % (download_time, len(message), (len(message)*8)/download_time))
self.playback_buffer.add_four_second_segment(bytes_four_second)
return [len(message), download_time]

# Return tuple of total download size and total download time
def current_bandwidth_estimate(self):
return self.download_record.average()

def pick_rate(self, force):
# According to Figure 9
throughput = self.current_bandwidth_estimate()

# print 'size (bits) %d time %f bw %f instantaneous bw %f' %(bw[0], bw[1], (bw[0]/bw[1]), ((download_segment[0] * 8)/download_segment[1]))

# bw is in kbps
if throughput > (2500 * 1000):
rate = RATES[6] # 1750
elif throughput > (2150 * 1000):
rate = RATES[5] # 1400
elif throughput > (1300 * 1000):
rate = RATES[4] # 1050
elif throughput > (1100 * 1000):
rate = RATES[3] # 750
elif throughput > (740 * 1000):
rate = RATES[2] # 560
elif throughput > (500 * 1000):
rate = RATES[1] # 375
else:
rate = RATES[0]

if force:
rate = RATES[-1]

# unix_timestamp;iface_name;bytes_out;bytes_in;bytes_total;packets_out;packets_in;packets_total;errors_out;errors_in
self.rate_file = open(self.video_rate_filename, 'a')
# video flow throughput is the running average bw
current_time = time.time()
self.rate_file.write("%f,video-flow-throughput,0,%d,0,0,0,0,0\n" % (current_time, throughput))
# Translate rate to bits per second
self.rate_file.write("%f,video-playback-rate,0,%d,0,0,0,0,0\n" % (current_time, (rate * 8)))
self.rate_file.close()

return rate

def real_print(self, msg):
print msg
sys.stdout.flush()

def debug_print(self, msg):
if self.debug:
self.real_print(msg)

def parse_args():
parser = ArgumentParser(description="Buffer sizing tests")

parser.add_argument('--server-ip',
action="store",
help="Max buffer size of network interface in packets",
required=True)

parser.add_argument('--dir', '-d',
dest="dir",
action="store",
help="Directory to store output",
required=True)

parser.add_argument('--buffer-size', '-b',
action="store",
help="Buffer size",
required=True,
type=int)

parser.add_argument('--force', '-f',
action="store",
help="Time to force the video rate to full",
required=False,
type=int)

parser.add_argument('--check-download-rates', help="Enable the source port filter (Default is dest port)", action='store_true', dest="dl_rates", default=False)

return parser.parse_args()

if __name__ == '__main__':
try:
args = parse_args()
Client(server_ip=args.server_ip, dir=args.dir, buffer_size=args.buffer_size, debug=False).run_forever()
except Exception as e:
print '----- error in client -----'
print e
sys.stdout.flush()
sys.stderr.flush()
104 changes: 104 additions & 0 deletions extras.py
@@ -0,0 +1,104 @@
import os
import subprocess
import sys
import threading

import termcolor as T

class MovingRecord:
def __init__(self, num_samples):
self.samples = []
self.num_samples = num_samples

def add_sample(self, sample):
if len(self.samples) == self.num_samples:
self.samples.pop(0)
self.samples.append(sample)

def average(self):
if len(self.samples) == 0:
return 0
return sum(self.samples) / len(self.samples)

def median(self):
n = len(self.samples)
if n == 0:
return 0
s = sorted(self.samples)
if n % 2 == 0:
return (s[n/2] + s[n/2 - 1]) / 2
else:
return s[n/2]

def start_tcpprobe(fname):
"Install tcp_probe module and dump to file"
os.system("rmmod tcp_probe 2>/dev/null; modprobe tcp_probe;")
subprocess.Popen("cat /proc/net/tcpprobe > %s" %
fname, shell=True)

def stop_tcpprobe():
os.system("killall -9 cat; rmmod tcp_probe &>/dev/null;")

def cprint(s, color, cr=True):
"""Print in color
s: string to print
color: color to use"""
if cr:
print T.colored(s, color)
else:
print T.colored(s, color),

def monitor_stderr(p):
lines_iterator = iter(p.stderr.readline, b"")
for line in lines_iterator:
print(line.strip())

def monitor_stdout(p):
lines_iterator = iter(p.stdout.readline, b"")
for line in lines_iterator:
print(line.strip())

def exec_and_monitor(host, cmd):
if host is not None:
p = host.popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
p = subprocess.Popen(args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

t1 = threading.Thread(target=monitor_stdout, args=(p,))
t1.setDaemon(True)
t1.start()
t2 = threading.Thread(target=monitor_stderr, args=(p,))
t2.setDaemon(True)
t2.start()
return p

def verify_rtt_between_endpoints(e1, e2, expected_rtt, tolerance):
proc = e1.popen('ping -c 2 %s' % e2.IP())
output = proc.communicate()[0]
line = output.split('\n')[2]
# "64 bytes from 10.0.0.2: icmp_seq=2 ttl=64 time=174 ms"
rtt = float(line.split(' ')[-2][5:])
if rtt < expected_rtt*(1-tolerance) or rtt > expected_rtt*(1+tolerance):
print 'Latency verification failed'
print 'Got %f instead of %f' % (rtt, expected_rtt)
sys.exit()

def verify_latency(h1, h2, expected_rtt):
print 'Verifying client-server latency...'
verify_rtt_between_endpoints(h1, h2, expected_rtt, 0.05)
verify_rtt_between_endpoints(h2, h2, expected_rtt, 0.05)

def verify_bandwidth(h1, h2, expected_bandwidth):
print 'Verifying bandwidth of bottleneck link...'
server_proc = h2.popen('iperf -s -p 5001')
client_proc = h1.popen('iperf -c %s -t 20' % h2.IP());
output = client_proc.communicate()[0]
# "[ 15] 0.0- 5.1 sec 38.0 MBytes 61.9 Mbits/sec"
line = output.split('\n')[6]
bandwidth = float(line.split(' ')[-2])
tolerance = 0.1
if bandwidth < expected_bandwidth*(1-tolerance) or \
bandwidth > expected_bandwidth*(1+tolerance):
print 'Bandwidth verification failed'
print 'Got %f instead of %f' % (bandwidth, expected_bandwidth)
server_proc.kill()
85 changes: 85 additions & 0 deletions monitor_throughput.py
@@ -0,0 +1,85 @@
#!/usr/bin/python

import threading
import time
import sys

from extras import MovingRecord
from argparse import ArgumentParser

def debug_print(msg):
print msg
sys.stdout.flush()

def get_rxbytes(iface):
f = open('/proc/net/dev', 'r')
lines = f.readlines()
for line in lines:
if iface in line:
break
f.close()
if not line:
raise Exception("could not find iface %s in /proc/net/dev:%s" %
(iface, lines))
# Extract TX bytes from:
#Inter-| Receive | Transmit
# face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed
# lo: 6175728 53444 0 0 0 0 0 0 6175728 53444 0 0 0 0 0 0
return float(line.split()[1])

def monitor_tcp_throughput(iface, fname, field_name):
last_time = time.time()
first_time = last_time
last_rxbytes = get_rxbytes(iface)
record = MovingRecord(1)
while True:
if args.gran:
time.sleep(0.2)
else:
time.sleep(4.0)
rxbytes = get_rxbytes(iface)
current_time = time.time()
throughput = (rxbytes-last_rxbytes)/(current_time-last_time)
record.add_sample(throughput)
last_time = current_time
last_rxbytes = rxbytes
f = open(fname, 'a')
f.write("%f,%s,0,%d,0,0,0,0,0\n" %
(current_time, field_name, record.average()*8))
f.close()

def parse_args():
parser = ArgumentParser(description="Monitor TCP throughput")

parser.add_argument('--interface',
action='store',
help='Interface to monitor',
required=True)

parser.add_argument('--fname', '-f',
action='store',
help='File to write',
required=True)

parser.add_argument('--field-name',
action='store',
help='Field name for the graph',
required=True)

parser.add_argument('--gran',
action='store_true',
help='Field name for the graph',
default=False)

return parser.parse_args()


if __name__ == '__main__':
try:
args = parse_args()
monitor_tcp_throughput(args.interface, args.fname, args.field_name)
except Exception as e:
print '----- error in tcp monitoring -----'
print e
sys.stdout.flush()
sys.stderr.flush()

0 comments on commit dc117a8

Please sign in to comment.