From 781ee5a48d41a88ba9244f8196c441db574d5433 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Wed, 11 Mar 2020 17:25:10 -0400 Subject: [PATCH] Adding graceful socket shutdown, fixing sent message clipping, fixing mutex error, reducing output message number --- .../breadcrumbs/algos/AlgoBreadcrumbs.cpp | 6 +- .../gen/VirtualOutputIOProcessor.cpp | 2 +- bfs/include/Algorithm.hpp | 2 +- bfs/include/AlgorithmServer.hpp | 2 +- bfs/include/DataSyncThread.hpp | 11 ++- bfs/scripts/log.txt | 78 +++++++++++++++++++ bfs/src/algos/Algorithm.cpp | 2 +- bfs/src/comms/AlgorithmServer.cpp | 9 +-- bfs/src/comms/DataSyncThread.cpp | 59 ++++++++------ bfs/src/io_procs/IOProcessor.cpp | 3 +- bfs/src/template/IOProcessorTemplate.cpp.in | 2 +- 11 files changed, 138 insertions(+), 38 deletions(-) diff --git a/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp b/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp index 7bdba8f..953593a 100644 --- a/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp +++ b/bfs/implementations/breadcrumbs/algos/AlgoBreadcrumbs.cpp @@ -4,10 +4,10 @@ void AlgoBreadcrumbs::loop() { - vector* attribs = pollForAttributes(); - if (attribs->size() > 0) + vector attribs = pollForAttributes(); + if (attribs.size() > 0) { - printf("Attrib length: %d\n", attribs->size()); + printf("Attrib length: %d\n", attribs.size()); } _sleep(1000); } diff --git a/bfs/implementations/breadcrumbs/gen/VirtualOutputIOProcessor.cpp b/bfs/implementations/breadcrumbs/gen/VirtualOutputIOProcessor.cpp index 526ba8a..3a81360 100644 --- a/bfs/implementations/breadcrumbs/gen/VirtualOutputIOProcessor.cpp +++ b/bfs/implementations/breadcrumbs/gen/VirtualOutputIOProcessor.cpp @@ -19,5 +19,5 @@ int main() return result; } - return 1; + return 0; } diff --git a/bfs/include/Algorithm.hpp b/bfs/include/Algorithm.hpp index 2d2b5e2..8745c27 100644 --- a/bfs/include/Algorithm.hpp +++ b/bfs/include/Algorithm.hpp @@ -13,7 +13,7 @@ class Algorithm virtual void loop() = 0; virtual bool loopCondition() { return false; }; - vector* pollForAttributes(); + vector pollForAttributes(); private: size_t numIoProcs = 0; AlgorithmServer* server; diff --git a/bfs/include/AlgorithmServer.hpp b/bfs/include/AlgorithmServer.hpp index ba48992..4928c52 100644 --- a/bfs/include/AlgorithmServer.hpp +++ b/bfs/include/AlgorithmServer.hpp @@ -36,7 +36,7 @@ class AlgorithmServer } // Updates Algorithm key/value store with IO key/value updates - vector* getAllIncomingAttributes(); + vector getAllIncomingAttributes(); void startServer(); int stopServer(); diff --git a/bfs/include/DataSyncThread.hpp b/bfs/include/DataSyncThread.hpp index 52ea144..d557f5f 100644 --- a/bfs/include/DataSyncThread.hpp +++ b/bfs/include/DataSyncThread.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include "Attribute.hpp" #include "CMakeConfig.h" @@ -19,6 +20,8 @@ #pragma comment (lib, "Mswsock.lib") #pragma comment (lib, "AdvApi32.lib") +using namespace std; + /* This class contains the thread for communicating back and forth along a socket to sync attributes @@ -29,7 +32,12 @@ class DataSyncThread SOCKET sock; HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; + + // Flag for signalling this thread to terminate bool continueThread = false; + + // Variable changed only by the data sync thread itself for other threads to determine + // if this one is still running bool threadRunning = false; std::vector incomingAttributes; @@ -41,7 +49,6 @@ class DataSyncThread attribMutex = CreateMutex(NULL, false, NULL); }; - // Synchronous functions (only called from thread) void threadRuntime(); bool readyToReceive(int interval = 1); @@ -65,7 +72,7 @@ class DataSyncThread bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; bool areIncomingAttributesAvailable(); - std::vector *getIncomingAttributes(); + vector getIncomingAttributes(); bool sendAttribute(Attribute attrib); }; diff --git a/bfs/scripts/log.txt b/bfs/scripts/log.txt index 2c3741a..2a1ea89 100644 --- a/bfs/scripts/log.txt +++ b/bfs/scripts/log.txt @@ -127,3 +127,81 @@ Could not acquire mutex to add new client thread Listening for clients... Hello new client! Listening for clients... +Could not acquire mutex to add new client thread +Listening for clients... +Hello new client! +Listening for clients... +Could not acquire mutex to add new client thread +Listening for clients... +Hello new client! +Listening for clients... +Could not acquire mutex to add new client thread +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Hello new client! +Listening for clients... +Hello new client! +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... +Listening for clients... +Hello new client! +Listening for clients... diff --git a/bfs/src/algos/Algorithm.cpp b/bfs/src/algos/Algorithm.cpp index 5f2be18..4e7991b 100644 --- a/bfs/src/algos/Algorithm.cpp +++ b/bfs/src/algos/Algorithm.cpp @@ -14,7 +14,7 @@ Algorithm::~Algorithm() delete server; } -vector* Algorithm::pollForAttributes() +vector Algorithm::pollForAttributes() { return server->getAllIncomingAttributes(); } diff --git a/bfs/src/comms/AlgorithmServer.cpp b/bfs/src/comms/AlgorithmServer.cpp index 3a81708..e6fb659 100644 --- a/bfs/src/comms/AlgorithmServer.cpp +++ b/bfs/src/comms/AlgorithmServer.cpp @@ -138,25 +138,24 @@ void AlgorithmServer::serverThreadRuntime() return; } -vector* AlgorithmServer::getAllIncomingAttributes() +vector AlgorithmServer::getAllIncomingAttributes() { /* Polls DataSyncThreads for new updates Updates master storage accordingly */ Logger* logger = getLogger(); - vector* newAttribs = new vector; + vector newAttribs; if (lockClientThreadsMutex() == STATUS_WAIT_0) { for (DataSyncThread* dst : clientThreads) { - vector* toAdd = dst->getIncomingAttributes(); - newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end()); + vector toAdd = dst->getIncomingAttributes(); + newAttribs.insert(newAttribs.end(), toAdd.begin(), toAdd.end()); } unlockClientThreadsMutex(); } else { - write_log("Could not acquire mutex to add new client thread\n", 1, logger->filename, logger); printf("Could not acquire mutex to get incoming attributes.\n"); } return newAttribs; diff --git a/bfs/src/comms/DataSyncThread.cpp b/bfs/src/comms/DataSyncThread.cpp index 2261bfa..9d22dec 100644 --- a/bfs/src/comms/DataSyncThread.cpp +++ b/bfs/src/comms/DataSyncThread.cpp @@ -5,7 +5,7 @@ void DataSyncThread::threadRuntime() { threadRunning = true; - printf("Thread runtime: %d\n", incomingAttributes); + cout << "Starting DataSyncThread: " << this << endl; /* Handles the data sync between the other end of the socket @@ -14,8 +14,6 @@ void DataSyncThread::threadRuntime() while (continueThread && iResult > 0) { - //write_log("Waiting to receive bytes\n", 1, logger->filename, logger); - printf("Waiting to receive bytes\n"); if (!readyToReceive()) continue; @@ -27,13 +25,11 @@ void DataSyncThread::threadRuntime() switch (commandByte) { case 0: - printf("Attribute update...\n"); bAttrib bAttr; iResult = recvBytes(&bAttr, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%x\n", bAttr.key, bAttr.length); if (bAttr.length <= 0) break; @@ -42,10 +38,13 @@ void DataSyncThread::threadRuntime() if (iResult <= 0) break; - // Storing the attrib update - printf("Storing attrib update...\n"); - addIncomingAttribute(*new Attribute(bAttr, value)); + Attribute attr(bAttr, value); + + // : AttributeUpdate(, ) + cout << this << ": AttributeUpdate(" << attr.getKey() << ", " << attr.getLength() << ")" << endl; + // Storing the attrib update + addIncomingAttribute(attr); } } @@ -55,10 +54,23 @@ void DataSyncThread::threadRuntime() attribMutex = NULL; } - printf("Done running data sync thread!\n"); + // Gracefully closing the socket: + shutdown(sock, SD_SEND); + char buf; + int result = recv(sock, &buf, 1, 0); + if (!result) + cout << "Graceful socket shutdown success!" << endl; + else if (result < 0) + cout << "Graceful socket shutdown failed! ERROR: " << result << endl; + else + cout << "Graceful socket shutdown failed! Still more data to read!" << endl; + closesocket(sock); + + cout << "Stopping DataSyncThread: " << this << endl; threadRunning = false; } +// Returns 1 if it is the socket file descriptor is waiting for a read call, 0 otherwise bool DataSyncThread::readyToReceive(int interval) { fd_set fds; @@ -75,6 +87,7 @@ bool DataSyncThread::readyToReceive(int interval) return result; } +// Garentees receiving the given number of bytes int DataSyncThread::recvBytes(void* buffer, size_t numBytes) { size_t bytesRecved = 0; @@ -98,6 +111,7 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes) return numBytes; } +// Garentees sending the given number of bytes int DataSyncThread::sendBytes(char* buffer, size_t numBytes) { size_t bytesSent = 0; @@ -129,7 +143,7 @@ void DataSyncThread::addIncomingAttribute(Attribute attrib) ReleaseMutex(attribMutex); } else { - printf("1 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + printf("Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); } } @@ -141,7 +155,6 @@ void DataSyncThread::startComms() //write_log("Start comms: ", 1, logger->filename, logger); //write_log(std::to_string(incomingAttributes), 1, logger->filename, logger); - printf("Start comms: %d\n", incomingAttributes); if (!threadRunning) { threadRunning = false; @@ -154,9 +167,8 @@ void DataSyncThread::startComms() 0, // use default creation flags &dwThreadId); // returns the thread identifier - while (1) - if (threadRunning) - break; + // Waiting for the thread to start + while (! threadRunning); } } @@ -198,7 +210,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName) iResult = getaddrinfo(serverName, ALGORITHM_SERVER_PORT, &hints, &result); if (iResult != 0) { printf("getaddrinfo failed with error: %d\n", iResult); - WSACleanup(); return 1; } @@ -210,7 +221,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName) ptr->ai_protocol); if (sock == INVALID_SOCKET) { printf("socket failed with error: %ld\n", WSAGetLastError()); - WSACleanup(); return 1; } @@ -227,7 +237,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName) if (sock == INVALID_SOCKET) { printf("Unable to connect to server!\n"); - WSACleanup(); return 1; } else @@ -236,25 +245,31 @@ int DataSyncThread::connectToAlgorithm(char* serverName) } } -std::vector *DataSyncThread::getIncomingAttributes() +vector DataSyncThread::getIncomingAttributes() { - vector* newVector = new vector; + vector newAttribVector; + if (! areIncomingAttributesAvailable()) + return newAttribVector; DWORD result = WaitForSingleObject(attribMutex, INFINITE); if (result == WAIT_OBJECT_0) { for (Attribute attrib : incomingAttributes) - (*newVector).push_back(attrib); + { + newAttribVector.push_back(attrib); + } ReleaseMutex(attribMutex); } else { - printf("2 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + printf("Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); } - return newVector; + return newAttribVector; } bool DataSyncThread::areIncomingAttributesAvailable() { + if (!threadRunning) + return 0; return incomingAttributes.size() > 0; } diff --git a/bfs/src/io_procs/IOProcessor.cpp b/bfs/src/io_procs/IOProcessor.cpp index 9c9c38c..771497b 100644 --- a/bfs/src/io_procs/IOProcessor.cpp +++ b/bfs/src/io_procs/IOProcessor.cpp @@ -17,12 +17,13 @@ int IOProcessor::init() int result = comms->connectToAlgorithm("localhost"); if (!result) comms->startComms(); + else + WSACleanup(); return result; } int IOProcessor::close() { int result = static_cast(comms->stopComms()); - WSACleanup(); return result; } diff --git a/bfs/src/template/IOProcessorTemplate.cpp.in b/bfs/src/template/IOProcessorTemplate.cpp.in index 0f7258d..6a3550f 100644 --- a/bfs/src/template/IOProcessorTemplate.cpp.in +++ b/bfs/src/template/IOProcessorTemplate.cpp.in @@ -19,5 +19,5 @@ int main() return result; } - return 1; + return 0; }