Skip to content

Commit

Permalink
Adding graceful socket shutdown, fixing sent message clipping, fixing…
Browse files Browse the repository at this point in the history
… mutex error, reducing output message number
  • Loading branch information
grf14003 committed Mar 11, 2020
1 parent 1c27399 commit 781ee5a
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 38 deletions.
6 changes: 3 additions & 3 deletions bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

void AlgoBreadcrumbs::loop()
{
vector<Attribute>* attribs = pollForAttributes();
if (attribs->size() > 0)
vector<Attribute> attribs = pollForAttributes();
if (attribs.size() > 0)
{
printf("Attrib length: %d\n", attribs->size());
printf("Attrib length: %d\n", attribs.size());
}
_sleep(1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ int main()
return result;
}

return 1;
return 0;
}
2 changes: 1 addition & 1 deletion bfs/include/Algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Algorithm
virtual void loop() = 0;
virtual bool loopCondition() { return false; };

vector<Attribute>* pollForAttributes();
vector<Attribute> pollForAttributes();
private:
size_t numIoProcs = 0;
AlgorithmServer* server;
Expand Down
2 changes: 1 addition & 1 deletion bfs/include/AlgorithmServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class AlgorithmServer
}

// Updates Algorithm key/value store with IO key/value updates
vector<Attribute>* getAllIncomingAttributes();
vector<Attribute> getAllIncomingAttributes();
void startServer();
int stopServer();

Expand Down
11 changes: 9 additions & 2 deletions bfs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <vector>
#include <iostream>

#include "Attribute.hpp"
#include "CMakeConfig.h"
Expand All @@ -19,6 +20,8 @@
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "AdvApi32.lib")

using namespace std;

/*
This class contains the thread for communicating back and forth along a socket to sync attributes
Expand All @@ -29,7 +32,12 @@ class DataSyncThread
SOCKET sock;
HANDLE hThread = INVALID_HANDLE_VALUE;
DWORD dwThreadId;

// Flag for signalling this thread to terminate
bool continueThread = false;

// Variable changed only by the data sync thread itself for other threads to determine
// if this one is still running
bool threadRunning = false;

std::vector<Attribute> incomingAttributes;
Expand All @@ -41,7 +49,6 @@ class DataSyncThread
attribMutex = CreateMutex(NULL, false, NULL);
};


// Synchronous functions (only called from thread)
void threadRuntime();
bool readyToReceive(int interval = 1);
Expand All @@ -65,7 +72,7 @@ class DataSyncThread
bool isClientConnected() { return continueThread; };
unsigned int getSocketNumber() { return (unsigned int) socket; };
bool areIncomingAttributesAvailable();
std::vector<Attribute> *getIncomingAttributes();
vector<Attribute> getIncomingAttributes();
bool sendAttribute(Attribute attrib);
};

Expand Down
78 changes: 78 additions & 0 deletions bfs/scripts/log.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,81 @@ Could not acquire mutex to add new client thread
Listening for clients...
Hello new client!
Listening for clients...
Could not acquire mutex to add new client thread
Listening for clients...
Hello new client!
Listening for clients...
Could not acquire mutex to add new client thread
Listening for clients...
Hello new client!
Listening for clients...
Could not acquire mutex to add new client thread
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Hello new client!
Listening for clients...
Hello new client!
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
Listening for clients...
Hello new client!
Listening for clients...
2 changes: 1 addition & 1 deletion bfs/src/algos/Algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Algorithm::~Algorithm()
delete server;
}

vector<Attribute>* Algorithm::pollForAttributes()
vector<Attribute> Algorithm::pollForAttributes()
{
return server->getAllIncomingAttributes();
}
9 changes: 4 additions & 5 deletions bfs/src/comms/AlgorithmServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,24 @@ void AlgorithmServer::serverThreadRuntime()
return;
}

vector<Attribute>* AlgorithmServer::getAllIncomingAttributes()
vector<Attribute> AlgorithmServer::getAllIncomingAttributes()
{
/*
Polls DataSyncThreads for new updates
Updates master storage accordingly
*/
Logger* logger = getLogger();
vector<Attribute>* newAttribs = new vector<Attribute>;
vector<Attribute> newAttribs;
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
for (DataSyncThread* dst : clientThreads)
{
vector<Attribute>* toAdd = dst->getIncomingAttributes();
newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end());
vector<Attribute> toAdd = dst->getIncomingAttributes();
newAttribs.insert(newAttribs.end(), toAdd.begin(), toAdd.end());
}
unlockClientThreadsMutex();
}
else {
write_log("Could not acquire mutex to add new client thread\n", 1, logger->filename, logger);
printf("Could not acquire mutex to get incoming attributes.\n");
}
return newAttribs;
Expand Down
59 changes: 37 additions & 22 deletions bfs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
void DataSyncThread::threadRuntime()
{
threadRunning = true;
printf("Thread runtime: %d\n", incomingAttributes);
cout << "Starting DataSyncThread: " << this << endl;

/*
Handles the data sync between the other end of the socket
Expand All @@ -14,8 +14,6 @@ void DataSyncThread::threadRuntime()

while (continueThread && iResult > 0)
{
//write_log("Waiting to receive bytes\n", 1, logger->filename, logger);
printf("Waiting to receive bytes\n");
if (!readyToReceive())
continue;

Expand All @@ -27,13 +25,11 @@ void DataSyncThread::threadRuntime()
switch (commandByte)
{
case 0:
printf("Attribute update...\n");
bAttrib bAttr;
iResult = recvBytes(&bAttr, sizeof(bAttrib));
if (iResult <= 0)
break;

printf("Attribute update received, key:%.8s len:%x\n", bAttr.key, bAttr.length);
if (bAttr.length <= 0)
break;

Expand All @@ -42,10 +38,13 @@ void DataSyncThread::threadRuntime()
if (iResult <= 0)
break;

// Storing the attrib update
printf("Storing attrib update...\n");
addIncomingAttribute(*new Attribute(bAttr, value));
Attribute attr(bAttr, value);

// <DataSyncThreadAddress>: AttributeUpdate(<key>, <size>)
cout << this << ": AttributeUpdate(" << attr.getKey() << ", " << attr.getLength() << ")" << endl;

// Storing the attrib update
addIncomingAttribute(attr);
}
}

Expand All @@ -55,10 +54,23 @@ void DataSyncThread::threadRuntime()
attribMutex = NULL;
}

printf("Done running data sync thread!\n");
// Gracefully closing the socket:
shutdown(sock, SD_SEND);
char buf;
int result = recv(sock, &buf, 1, 0);
if (!result)
cout << "Graceful socket shutdown success!" << endl;
else if (result < 0)
cout << "Graceful socket shutdown failed! ERROR: " << result << endl;
else
cout << "Graceful socket shutdown failed! Still more data to read!" << endl;
closesocket(sock);

cout << "Stopping DataSyncThread: " << this << endl;
threadRunning = false;
}

// Returns 1 if it is the socket file descriptor is waiting for a read call, 0 otherwise
bool DataSyncThread::readyToReceive(int interval)
{
fd_set fds;
Expand All @@ -75,6 +87,7 @@ bool DataSyncThread::readyToReceive(int interval)
return result;
}

// Garentees receiving the given number of bytes
int DataSyncThread::recvBytes(void* buffer, size_t numBytes)
{
size_t bytesRecved = 0;
Expand All @@ -98,6 +111,7 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes)
return numBytes;
}

// Garentees sending the given number of bytes
int DataSyncThread::sendBytes(char* buffer, size_t numBytes)
{
size_t bytesSent = 0;
Expand Down Expand Up @@ -129,7 +143,7 @@ void DataSyncThread::addIncomingAttribute(Attribute attrib)
ReleaseMutex(attribMutex);
}
else {
printf("1 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
printf("Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
}
}

Expand All @@ -141,7 +155,6 @@ void DataSyncThread::startComms()
//write_log("Start comms: ", 1, logger->filename, logger);
//write_log(std::to_string(incomingAttributes), 1, logger->filename, logger);

printf("Start comms: %d\n", incomingAttributes);
if (!threadRunning)
{
threadRunning = false;
Expand All @@ -154,9 +167,8 @@ void DataSyncThread::startComms()
0, // use default creation flags
&dwThreadId); // returns the thread identifier

while (1)
if (threadRunning)
break;
// Waiting for the thread to start
while (! threadRunning);
}
}

Expand Down Expand Up @@ -198,7 +210,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName)
iResult = getaddrinfo(serverName, ALGORITHM_SERVER_PORT, &hints, &result);
if (iResult != 0) {
printf("getaddrinfo failed with error: %d\n", iResult);
WSACleanup();
return 1;
}

Expand All @@ -210,7 +221,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName)
ptr->ai_protocol);
if (sock == INVALID_SOCKET) {
printf("socket failed with error: %ld\n", WSAGetLastError());
WSACleanup();
return 1;
}

Expand All @@ -227,7 +237,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName)

if (sock == INVALID_SOCKET) {
printf("Unable to connect to server!\n");
WSACleanup();
return 1;
}
else
Expand All @@ -236,25 +245,31 @@ int DataSyncThread::connectToAlgorithm(char* serverName)
}
}

std::vector<Attribute> *DataSyncThread::getIncomingAttributes()
vector<Attribute> DataSyncThread::getIncomingAttributes()
{
vector<Attribute>* newVector = new vector<Attribute>;
vector<Attribute> newAttribVector;
if (! areIncomingAttributesAvailable())
return newAttribVector;

DWORD result = WaitForSingleObject(attribMutex, INFINITE);
if (result == WAIT_OBJECT_0) {
for (Attribute attrib : incomingAttributes)
(*newVector).push_back(attrib);
{
newAttribVector.push_back(attrib);
}
ReleaseMutex(attribMutex);
}
else {
printf("2 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
printf("Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError());
}

return newVector;
return newAttribVector;
}

bool DataSyncThread::areIncomingAttributesAvailable()
{
if (!threadRunning)
return 0;
return incomingAttributes.size() > 0;
}

Expand Down
Loading

0 comments on commit 781ee5a

Please sign in to comment.