Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Example exercise message sender and data types added
  • Loading branch information
wrr14001 committed Dec 7, 2018
1 parent 74ccb14 commit fa21dfe
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 9 deletions.
Binary file modified README.md
Binary file not shown.
15 changes: 15 additions & 0 deletions data-types/data-types.go
@@ -0,0 +1,15 @@
package data_types

// This file contains data types that are shared between the sender and the responder you will write.

// Struct to manage responses we get
type ResponseStruct struct {
ID int `json:"id"`
}

// Struct to manage data we're sending
type DataStruct struct {
ID int `json:"id"`
Duration int `json:"duration"`
Stamp int `json:"stamp"`
}
9 changes: 0 additions & 9 deletions hello.go

This file was deleted.

161 changes: 161 additions & 0 deletions sender/sender.go
@@ -0,0 +1,161 @@
package main

import (
"fmt"
"generator-server/data-types"
"github.com/gorilla/websocket"
"math/rand"
"strconv"
"sync"
"time"
)

/* This file tries to make a websocket connection to your server, and then sends it 100 messages at varying intervals
* that request a response after a certain duration. Your server will read the messages, wait that amount of time,
* and then send a message back with the IDs of the messages it has waited for.
*/

var globalwait sync.WaitGroup

func main() {
// Initialize the waitgroup that will keep this running until the senders are done
globalwait = sync.WaitGroup{}

// Create and start some senders
for i := 0; i < 5; i++ {
// Try to get a websocket connection
c, _, err := websocket.DefaultDialer.Dial("ws://localhost:3000/v1/ws", nil)
if err != nil {
fmt.Println(err.Error())
return
}

// Name the senders 0-4
sender := NewSender(strconv.Itoa(i), c)
sender.run()
time.Sleep(3 * time.Second)

}

// Wait on the waitgroup
globalwait.Wait()
}

// Struct to manage our data senders
type sender struct {
name string
conn *websocket.Conn
lock sync.RWMutex
check *checker
}

// Initialize and return a sender
func NewSender(n string, conn *websocket.Conn) *sender {
return &sender{name: n, conn: conn, lock: sync.RWMutex{}, check: NewChecker()}
}

// Wrapper around the async run routine that tells globalwait there's something going on and starts the sender
func (s *sender) run() {
globalwait.Add(1)
go s._run()
}

// main sender routine
func (s *sender) _run() {

// Need to wait on checker, then close the connection and tell globalwait we're done
defer func() {
s.check.wg.Wait()
s.conn.WriteMessage(websocket.CloseMessage, nil)
s.conn.Close()
globalwait.Done()
}()

// Start the listener for this sender
go s.listen()

// Change the seed so we don't get the same thing every time
rand.Seed(int64(time.Now().Unix()))

// Loop and send packets
for i := 0; i < 100; i++ {
// Get a random duration
d := rand.Intn(3)

// Send ID, duration, timestamp
ds := &data_types.DataStruct{ID: i, Duration: d, Stamp: int(time.Now().Unix())}

// Tell the sender to send the packet
go s.send(ds)

// Wait a random amount of time
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
}

// Main thread for listening for responses
func (s *sender) listen() {

// initialize a response struct to unpack response into
var r data_types.ResponseStruct

// while loop -- repeat indefinitely
for {
err := s.conn.ReadJSON(&r)
if err != nil {
if websocket.IsUnexpectedCloseError(err) {
fmt.Printf("Sender %v lost reading connection: %v\n", s.name, err.Error())
}
break
}

// Tell the checker we got a response
s.received(r.ID)
}
}

// type for our response checking management
type checker struct {
m map[int]*data_types.DataStruct
lock sync.RWMutex
wg sync.WaitGroup
}

// Method to get a new, empty checker
func NewChecker() *checker {
return &checker{m: make(map[int]*data_types.DataStruct), lock: sync.RWMutex{}, wg: sync.WaitGroup{}}
}

// Tell the checker we sent something
func (s *sender) send(ds *data_types.DataStruct) {

// Add to wait group
s.check.wg.Add(1)

// Send the message, lock to be safe
s.lock.Lock()
if err := s.conn.WriteJSON(ds); err != nil {
fmt.Printf("Sender %v error writing: %v", s.name, err.Error())
}
s.lock.Unlock()

// Add to checker map, lock to be safe
s.check.lock.Lock()
s.check.m[ds.ID] = ds
s.check.lock.Unlock()
}

// Tell the checker we got something
func (s *sender) received(i int) {

// Grab the data entry and then delete it, locking for safe map access
s.check.lock.Lock()
ds := s.check.m[i]
delete(s.check.m, i)
s.check.lock.Unlock()
s.check.wg.Done()

// Check how long ago we sent this
diff := int(time.Now().Unix()) - ds.Stamp
fmt.Printf("sender %v received response for %v at %v seconds later than expected\n", s.name, i, diff-ds.Duration)
}

0 comments on commit fa21dfe

Please sign in to comment.