Skip to content
Permalink
master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
#!/usr/bin/python
import socket
import sys
import os
import threading
import time
import subprocess
from mysocket import send_message, receive_message
from playback_buffer import PlaybackBuffer
from extras import exec_and_monitor, start_tcpprobe, MovingRecord
from argparse import ArgumentParser
# Sample counts for the moving median / moving average filters.
MOVING_MEDIAN_FILTER_SAMPLE_SIZE = 10
MOVING_AVG_FILTER_SAMPLE_SIZE = 10

# Selectable video rates, stored in BYTES PER SECOND.  The literals below are
# kilobits per second and are converted on the spot:
#     bytes/sec = kbit/s * (1000 bits per kilobit) / (8 bits per byte)
#
#     kbit/s          :    235     375     560     750    1050    1400    1750
#     bytes per second:  29375   46875   70000   93750  131250  175000  218750
#     bytes per 4 sec : 117500  187500  280000  375000  525000  700000  875000
RATES = [x * (1000 / 8) for x in [235, 375, 560, 750, 1050, 1400, 1750]]
class Client:
    """Video client that downloads 4-second segments at an adaptive rate.

    The client connects to a server on TCP port 8000, repeatedly asks for a
    segment sized for the currently selected video rate, and records the
    measured download throughput in a moving record that drives the next
    rate decision.  Rate decisions and request intervals are appended to CSV
    files inside the output directory.

    NOTE: ``run_forever`` reads the module-level ``args`` namespace
    (``args.dl_rates`` and ``args.force``), which is only created when this
    file is run as a script.
    """

    # (minimum average throughput in kbit/s, index into RATES), checked from
    # the highest threshold down; the first threshold exceeded wins.
    _RATE_THRESHOLDS = (
        (2500, 6),  # 1750 kbit/s
        (2150, 5),  # 1400 kbit/s
        (1300, 4),  # 1050 kbit/s
        (1100, 3),  # 750 kbit/s
        (740, 2),   # 560 kbit/s
        (500, 1),   # 375 kbit/s
    )

    def __init__(self, server_ip, dir, buffer_size, debug=False):
        """Set up output paths, the playback buffer and the throughput monitor.

        Args:
            server_ip: address of the server to connect to (port 8000).
            dir: directory in which the output CSV files are written.
            buffer_size: size handed to ``PlaybackBuffer``.
            debug: when true, ``debug_print`` messages are emitted.
        """
        self.server_ip = server_ip
        self.video_rate_filename = '%s/video_rate.txt' % dir
        self.tcp_rate_filename = '%s/tcp_rate.txt' % dir
        self.request_interval_filename = '%s/request_interval.txt' % dir
        self.debug = debug
        # Moving record of per-segment download throughput (bits per second).
        self.download_record = MovingRecord(MOVING_AVG_FILTER_SAMPLE_SIZE)
        self.playback_buffer = PlaybackBuffer(buffer_size, debug=False)
        # Launch the external throughput monitor on h1-eth0; it is told to
        # write its samples to tcp_rate_filename.
        exec_and_monitor(host=None, cmd=[
            'python', 'monitor_throughput.py', '--interface', 'h1-eth0',
            '--fname', self.tcp_rate_filename, '--field-name',
            'instantaneous-throughput', '--gran'])

    def test_rates(self):
        """Measure the download bandwidth obtained at every video rate.

        Each rate in RATES is requested ``nsamples`` times; the averaged
        bandwidth per rate (keyed by the rate in kbit/s) is printed.
        """
        nsamples = 3
        results = [0] * len(RATES)
        for i in range(nsamples):
            for idx, r in enumerate(RATES):
                self.real_print('checking rate %d for the %dth time'
                                % (r * 8 // 1000, i))
                bytes_needed = r * 4
                size, elapsed = self.request_segment(bytes_needed)
                results[idx] += size * 8 / elapsed
                # Wait one segment duration before the next request.
                time.sleep(4)
        results = [total / nsamples for total in results]
        result_dict = dict((r * 8 // 1000, avg)
                           for r, avg in zip(RATES, results))
        self.real_print('-' * 20)
        self.real_print('Results {rate : bandwidth}')
        self.real_print('-' * 20)
        self.real_print(result_dict)
        self.real_print('Hit Ctrl-C to exit (Sorry)')

    def run_forever(self):
        """Connect, start the playback buffer and stream segments forever.

        With ``args.dl_rates`` set, only ``test_rates`` is run and the method
        returns.  Otherwise it loops without end: a rate is picked on every
        iteration, and a segment is requested whenever the playback buffer
        reports available bytes.
        """
        self.set_up_connection()
        self.playback_buffer.start()
        if args.dl_rates:
            self.test_rates()
            return
        self.start_time = time.time()
        last_request_time = self.start_time
        while True:
            # Force the top rate once args.force seconds have elapsed since
            # start; otherwise pick from the moving throughput average.
            force = (args.force is not None
                     and time.time() > self.start_time + args.force)
            rate = self.pick_rate(force)
            bytes_needed = rate * 4
            if self.playback_buffer.available_bytes() == 0:
                # NOTE(review): presumably "no room left in the buffer"
                # despite the message text -- confirm against PlaybackBuffer.
                self.debug_print('buffer too small')
                time.sleep(1)
            else:
                next_request_time = time.time()
                request_interval = next_request_time - last_request_time
                last_request_time = next_request_time
                self.debug_print('buffer big enough')
                size, elapsed = self.request_segment(bytes_needed)
                self.download_record.add_sample(size * 8 / elapsed)
                # `with` guarantees the file is closed even if write fails.
                with open(self.request_interval_filename, 'a') as interval_file:
                    interval_file.write(
                        "%f,request-interval,%f,0,0,0,0,0,0\n"
                        % (last_request_time, request_interval))
                self.debug_print('rate: %d\n' % (rate * 8 // 1000))

    def set_up_connection(self):
        """Open the TCP connection to the server on port 8000."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.server_ip, 8000,))

    def request_segment(self, bytes_four_second):
        """Request one 4-second segment and time its download.

        Args:
            bytes_four_second: number of bytes to ask the server for.

        Returns:
            ``[received_length_in_bytes, download_time]``.
        """
        send_message(self.socket, "%s" % bytes_four_second)
        (message, download_time) = receive_message(self.socket, timed=True)
        self.debug_print(
            'download time %f message length (bytes) %d bw %d'
            % (download_time, len(message), (len(message) * 8) / download_time))
        # The buffer is credited with the requested size, not len(message).
        self.playback_buffer.add_four_second_segment(bytes_four_second)
        return [len(message), download_time]

    def current_bandwidth_estimate(self):
        """Return the average of the recorded download throughput samples."""
        return self.download_record.average()

    def pick_rate(self, force):
        """Choose the next video rate and log the decision.

        The rate is selected from the average throughput (bits per second)
        using the thresholds in ``_RATE_THRESHOLDS`` ("According to
        Figure 9", per the original author).  The throughput and the chosen
        rate are appended to the video-rate file.

        Args:
            force: when true, the highest rate is used regardless of
                throughput.

        Returns:
            The chosen rate, in bytes per second (an element of RATES).
        """
        throughput = self.current_bandwidth_estimate()
        rate = RATES[0]
        for min_kbps, idx in self._RATE_THRESHOLDS:
            # Thresholds are kbit/s; the estimate is bits per second.
            if throughput > (min_kbps * 1000):
                rate = RATES[idx]
                break
        if force:
            rate = RATES[-1]
        # Column layout noted by the original author:
        # unix_timestamp;iface_name;bytes_out;bytes_in;bytes_total;
        # packets_out;packets_in;packets_total;errors_out;errors_in
        current_time = time.time()
        with open(self.video_rate_filename, 'a') as rate_file:
            # Video flow throughput is the running average bandwidth.
            rate_file.write("%f,video-flow-throughput,0,%d,0,0,0,0,0\n"
                            % (current_time, throughput))
            # Translate the rate to bits per second.
            rate_file.write("%f,video-playback-rate,0,%d,0,0,0,0,0\n"
                            % (current_time, (rate * 8)))
        return rate

    def real_print(self, msg):
        """Print ``msg`` and flush stdout immediately."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(msg)
        sys.stdout.flush()

    def debug_print(self, msg):
        """Print ``msg`` only when debugging is enabled."""
        if self.debug:
            self.real_print(msg)
def parse_args(argv=None):
    """Parse the client's command-line options.

    Args:
        argv: optional list of argument strings; when None, argparse reads
            ``sys.argv[1:]`` as before.

    Returns:
        An ``argparse.Namespace`` with ``server_ip``, ``dir``,
        ``buffer_size`` (int), ``force`` (int or None) and ``dl_rates``
        (bool).
    """
    parser = ArgumentParser(description="Rate-adaptive video streaming client")
    parser.add_argument('--server-ip',
                        action="store",
                        help="IP address of the server to connect to",
                        required=True)
    parser.add_argument('--dir', '-d',
                        dest="dir",
                        action="store",
                        help="Directory to store output",
                        required=True)
    parser.add_argument('--buffer-size', '-b',
                        action="store",
                        help="Buffer size",
                        required=True,
                        type=int)
    parser.add_argument('--force', '-f',
                        action="store",
                        help="Seconds after start at which the video rate "
                             "is forced to the maximum",
                        required=False,
                        type=int)
    parser.add_argument('--check-download-rates',
                        help="Measure the download bandwidth at every video "
                             "rate instead of streaming",
                        action='store_true',
                        dest="dl_rates",
                        default=False)
    return parser.parse_args(argv)
if __name__ == '__main__':
    # `args` is intentionally a module-level global: Client.run_forever reads
    # args.dl_rates and args.force directly.
    try:
        args = parse_args()
        Client(server_ip=args.server_ip, dir=args.dir,
               buffer_size=args.buffer_size, debug=False).run_forever()
    except Exception as e:
        import traceback
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('----- error in client -----')
        print(e)
        # The message alone rarely locates the failure; keep the stack trace
        # (written to stderr) as well.
        traceback.print_exc()
    sys.stdout.flush()
    sys.stderr.flush()