Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Generator-Server/sender/sender.go
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
161 lines (129 sloc)
3.86 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"generator-server/data-types" | |
"github.com/gorilla/websocket" | |
"math/rand" | |
"strconv" | |
"sync" | |
"time" | |
) | |
/* This file tries to make a websocket connection to your server, and then sends it 100 messages at varying intervals | |
* that request a response after a certain duration. Your server will read the messages, wait that amount of time, | |
* and then send a message back with the IDs of the messages it has waited for. | |
*/ | |
var globalwait sync.WaitGroup | |
func main() { | |
// Initialize the waitgroup that will keep this running until the senders are done | |
globalwait = sync.WaitGroup{} | |
// Create and start some senders | |
for i := 0; i < 5; i++ { | |
// Try to get a websocket connection | |
c, _, err := websocket.DefaultDialer.Dial("ws://localhost:3000/v1/ws", nil) | |
if err != nil { | |
fmt.Println(err.Error()) | |
return | |
} | |
// Name the senders 0-4 | |
sender := NewSender(strconv.Itoa(i), c) | |
sender.run() | |
time.Sleep(3 * time.Second) | |
} | |
// Wait on the waitgroup | |
globalwait.Wait() | |
} | |
// Struct to manage our data senders | |
type sender struct { | |
name string | |
conn *websocket.Conn | |
lock sync.RWMutex | |
check *checker | |
} | |
// Initialize and return a sender | |
func NewSender(n string, conn *websocket.Conn) *sender { | |
return &sender{name: n, conn: conn, lock: sync.RWMutex{}, check: NewChecker()} | |
} | |
// Wrapper around the async run routine that tells globalwait there's something going on and starts the sender | |
func (s *sender) run() { | |
globalwait.Add(1) | |
go s._run() | |
} | |
// main sender routine | |
func (s *sender) _run() { | |
// Need to wait on checker, then close the connection and tell globalwait we're done | |
defer func() { | |
s.check.wg.Wait() | |
s.conn.WriteMessage(websocket.CloseMessage, nil) | |
s.conn.Close() | |
globalwait.Done() | |
}() | |
// Start the listener for this sender | |
go s.listen() | |
// Change the seed so we don't get the same thing every time | |
rand.Seed(int64(time.Now().Unix())) | |
// Loop and send packets | |
for i := 0; i < 100; i++ { | |
// Get a random duration | |
d := rand.Intn(3) | |
// Send ID, duration, timestamp | |
ds := &data_types.DataStruct{ID: i, Duration: d, Stamp: int(time.Now().Unix())} | |
// Tell the sender to send the packet | |
go s.send(ds) | |
// Wait a random amount of time | |
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) | |
} | |
} | |
// Main thread for listening for responses | |
func (s *sender) listen() { | |
// initialize a response struct to unpack response into | |
var r data_types.ResponseStruct | |
// while loop -- repeat indefinitely | |
for { | |
err := s.conn.ReadJSON(&r) | |
if err != nil { | |
if websocket.IsUnexpectedCloseError(err) { | |
fmt.Printf("Sender %v lost reading connection: %v\n", s.name, err.Error()) | |
} | |
break | |
} | |
// Tell the checker we got a response | |
s.received(r.ID) | |
} | |
} | |
// type for our response checking management | |
type checker struct { | |
m map[int]*data_types.DataStruct | |
lock sync.RWMutex | |
wg sync.WaitGroup | |
} | |
// Method to get a new, empty checker | |
func NewChecker() *checker { | |
return &checker{m: make(map[int]*data_types.DataStruct), lock: sync.RWMutex{}, wg: sync.WaitGroup{}} | |
} | |
// Tell the checker we sent something | |
func (s *sender) send(ds *data_types.DataStruct) { | |
// Add to wait group | |
s.check.wg.Add(1) | |
// Send the message, lock to be safe | |
s.lock.Lock() | |
if err := s.conn.WriteJSON(ds); err != nil { | |
fmt.Printf("Sender %v error writing: %v", s.name, err.Error()) | |
} | |
s.lock.Unlock() | |
// Add to checker map, lock to be safe | |
s.check.lock.Lock() | |
s.check.m[ds.ID] = ds | |
s.check.lock.Unlock() | |
} | |
// Tell the checker we got something | |
func (s *sender) received(i int) { | |
// Grab the data entry and then delete it, locking for safe map access | |
s.check.lock.Lock() | |
ds := s.check.m[i] | |
delete(s.check.m, i) | |
s.check.lock.Unlock() | |
s.check.wg.Done() | |
// Check how long ago we sent this | |
diff := int(time.Now().Unix()) - ds.Stamp | |
fmt.Printf("sender %v received response for %v at %v seconds later than expected\n", s.name, i, diff-ds.Duration) | |
} |