diff --git a/README.md b/README.md index 863462b..42732fc 100644 Binary files a/README.md and b/README.md differ diff --git a/data-types/data-types.go b/data-types/data-types.go new file mode 100644 index 0000000..5f0982a --- /dev/null +++ b/data-types/data-types.go @@ -0,0 +1,15 @@ +package data_types + +// This file contains data types that are shared between the sender and the responder you will write. + +// Struct to manage responses we get +type ResponseStruct struct { + ID int `json:"id"` +} + +// Struct to manage data we're sending +type DataStruct struct { + ID int `json:"id"` + Duration int `json:"duration"` + Stamp int `json:"stamp"` +} diff --git a/hello.go b/hello.go deleted file mode 100644 index 17957ee..0000000 --- a/hello.go +++ /dev/null @@ -1,9 +0,0 @@ -package main - -import ( - "fmt" -) - -func main() { - fmt.Printf("hello, world\n") -} diff --git a/sender/sender.go b/sender/sender.go new file mode 100644 index 0000000..d4e4fd2 --- /dev/null +++ b/sender/sender.go @@ -0,0 +1,161 @@ +package main + +import ( + "fmt" + "generator-server/data-types" + "github.com/gorilla/websocket" + "math/rand" + "strconv" + "sync" + "time" +) + +/* This file tries to make a websocket connection to your server, and then sends it 100 messages at varying intervals + * that request a response after a certain duration. Your server will read the messages, wait that amount of time, + * and then send a message back with the IDs of the messages it has waited for. + */ + +var globalwait sync.WaitGroup + +func main() { + // Initialize the waitgroup that will keep this running until the senders are done + globalwait = sync.WaitGroup{} + + // Create and start some senders + for i := 0; i < 5; i++ { + // Try to get a websocket connection + c, _, err := websocket.DefaultDialer.Dial("ws://localhost:3000/v1/ws", nil) + if err != nil { + fmt.Println(err.Error()) + return + } + + // Name the senders 0-4 + sender := NewSender(strconv.Itoa(i), c) + sender.run() + time.Sleep(3 * time.Second) + + } + + // Wait on the waitgroup + globalwait.Wait() +} + +// Struct to manage our data senders +type sender struct { + name string + conn *websocket.Conn + lock sync.RWMutex + check *checker +} + +// Initialize and return a sender +func NewSender(n string, conn *websocket.Conn) *sender { + return &sender{name: n, conn: conn, lock: sync.RWMutex{}, check: NewChecker()} +} + +// Wrapper around the async run routine that tells globalwait there's something going on and starts the sender +func (s *sender) run() { + globalwait.Add(1) + go s._run() +} + +// main sender routine +func (s *sender) _run() { + + // Need to wait on checker, then close the connection and tell globalwait we're done + defer func() { + s.check.wg.Wait() + s.conn.WriteMessage(websocket.CloseMessage, nil) + s.conn.Close() + globalwait.Done() + }() + + // Start the listener for this sender + go s.listen() + + // Change the seed so we don't get the same thing every time + rand.Seed(int64(time.Now().Unix())) + + // Loop and send packets + for i := 0; i < 100; i++ { + // Get a random duration + d := rand.Intn(3) + + // Send ID, duration, timestamp + ds := &data_types.DataStruct{ID: i, Duration: d, Stamp: int(time.Now().Unix())} + + // Tell the sender to send the packet + go s.send(ds) + + // Wait a random amount of time + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + } +} + +// Main thread for listening for responses +func (s *sender) listen() { + + // initialize a response struct to unpack response into + var r data_types.ResponseStruct + + // while loop -- repeat indefinitely + for { + err := s.conn.ReadJSON(&r) + if err != nil { + if websocket.IsUnexpectedCloseError(err) { + fmt.Printf("Sender %v lost reading connection: %v\n", s.name, err.Error()) + } + break + } + + // Tell the checker we got a response + s.received(r.ID) + } +} + +// type for our response checking management +type checker struct { + m map[int]*data_types.DataStruct + lock sync.RWMutex + wg sync.WaitGroup +} + +// Method to get a new, empty checker +func NewChecker() *checker { + return &checker{m: make(map[int]*data_types.DataStruct), lock: sync.RWMutex{}, wg: sync.WaitGroup{}} +} + +// Tell the checker we sent something +func (s *sender) send(ds *data_types.DataStruct) { + + // Add to wait group + s.check.wg.Add(1) + + // Send the message, lock to be safe + s.lock.Lock() + if err := s.conn.WriteJSON(ds); err != nil { + fmt.Printf("Sender %v error writing: %v", s.name, err.Error()) + } + s.lock.Unlock() + + // Add to checker map, lock to be safe + s.check.lock.Lock() + s.check.m[ds.ID] = ds + s.check.lock.Unlock() +} + +// Tell the checker we got something +func (s *sender) received(i int) { + + // Grab the data entry and then delete it, locking for safe map access + s.check.lock.Lock() + ds := s.check.m[i] + delete(s.check.m, i) + s.check.lock.Unlock() + s.check.wg.Done() + + // Check how long ago we sent this + diff := int(time.Now().Unix()) - ds.Stamp + fmt.Printf("sender %v received response for %v at %v seconds later than expected\n", s.name, i, diff-ds.Duration) +}