From 7ddbee693350dedf46bdb2d5d5858d07a5732e62 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Thu, 16 Jan 2020 17:14:31 -0500 Subject: [PATCH] Finishing basic attribute update framework --- breadcrumbs/include/Algorithm.hpp | 2 + breadcrumbs/include/AlgorithmServer.hpp | 8 ++- breadcrumbs/include/DataSyncThread.hpp | 5 +- breadcrumbs/src/Breadcrumbs.cpp | 3 +- breadcrumbs/src/algos/AlgoBreadcrumbs.cpp | 9 ++- breadcrumbs/src/algos/Algorithm.cpp | 5 ++ breadcrumbs/src/comms/AlgorithmServer.cpp | 68 +++++++++++++++++------ breadcrumbs/src/comms/DataSyncThread.cpp | 21 ++++++- 8 files changed, 97 insertions(+), 24 deletions(-) diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 8edd655..2d2b5e2 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -12,6 +12,8 @@ class Algorithm virtual void loop() = 0; virtual bool loopCondition() { return false; }; + + vector* pollForAttributes(); private: size_t numIoProcs = 0; AlgorithmServer* server; diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index 67c3731..ba48992 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -36,10 +36,13 @@ class AlgorithmServer } // Updates Algorithm key/value store with IO key/value updates - void pollForUpdates(); + vector* getAllIncomingAttributes(); void startServer(); int stopServer(); + DWORD lockClientThreadsMutex(); + void unlockClientThreadsMutex(); + private: SOCKET *ServerSocket = NULL; @@ -48,7 +51,8 @@ class AlgorithmServer bool continueThread = true; size_t numClients; - std::vector clientThreads; + HANDLE clientThreadsMutex = NULL; + vector clientThreads; }; #endif diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index 4c428da..c8ed1b9 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -31,12 +31,13 @@ class DataSyncThread DWORD dwThreadId; bool continueThread = false; - HANDLE attribMutex; std::vector incomingAttributes; + HANDLE attribMutex; public: DataSyncThread(SOCKET s) { sock = s; + attribMutex = CreateMutex(NULL, false, NULL); }; // Synchronous functions (only called from thread) @@ -56,7 +57,9 @@ class DataSyncThread bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; int connectToAlgorithm(char* serverName); + bool areIncomingAttributesAvailable(); std::vector *getIncomingAttributes(); + bool sendAttribute(Attribute attrib); }; #endif \ No newline at end of file diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 7d20d0f..060accf 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -13,9 +13,8 @@ int main() while (algorithm->loopCondition()) { algorithm->loop(); + _sleep(1000); } - while (1); - return 0; } diff --git a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp index 7278eb5..04ef348 100644 --- a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp +++ b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp @@ -4,10 +4,15 @@ void AlgoBreadcrumbs::loop() { - printf("Breadcrumbs algorithm loop!\n"); + vector* attribs = pollForAttributes(); + // printf("Attrib length: %d\n", attribs->size()); + if (attribs->size() > 0) + { + printf("Attrib length: %d\n", attribs->size()); + } } bool AlgoBreadcrumbs::loopCondition() { - return --iterations >= 0; + return true; } diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 7ae56b8..5f2be18 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -13,3 +13,8 @@ Algorithm::~Algorithm() { delete server; } + +vector* Algorithm::pollForAttributes() +{ + return server->getAllIncomingAttributes(); +} diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index 7023659..84df34e 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -18,6 +18,7 @@ AlgorithmServer::~AlgorithmServer() void AlgorithmServer::serverThreadRuntime() { int iResult; + clientThreadsMutex = CreateMutex(NULL, false, NULL); SOCKET ListenSocket = INVALID_SOCKET; SOCKET ClientSocket = INVALID_SOCKET; @@ -88,26 +89,33 @@ void AlgorithmServer::serverThreadRuntime() } // Reaping any old data sync threads - auto it = clientThreads.begin(); - while (clientThreads.size() > 0 && it != clientThreads.end()) + if (lockClientThreadsMutex() == STATUS_WAIT_0) { - if (!(*it).isClientConnected()) + auto it = clientThreads.begin(); + while (clientThreads.size() > 0 && it != clientThreads.end()) { - clientThreads.erase(it); + if (!(**it).isClientConnected()) + { + clientThreads.erase(it); + } + else { + ++it; + } + } + + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread* client = new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client->startComms(); } else { - ++it; + printf("Client attempted connection when (%d) clients are already connected", numClients); } - } - - // Creating a new Data sync thread - if (clientThreads.size() < numClients) { - DataSyncThread client = *new DataSyncThread(ClientSocket); - clientThreads.push_back(client); - client.startComms(); + unlockClientThreadsMutex(); } else { - printf("Client attempted connection when (%d) clients are already connected", numClients); + printf("Could not acquire mutex to add new client thread\n"); } } @@ -116,19 +124,30 @@ void AlgorithmServer::serverThreadRuntime() WSACleanup(); ServerSocket = NULL; continueThread = false; + CloseHandle(clientThreadsMutex); return; } -void AlgorithmServer::pollForUpdates() +vector* AlgorithmServer::getAllIncomingAttributes() { /* Polls DataSyncThreads for new updates Updates master storage accordingly */ - for (DataSyncThread dst : clientThreads) + vector* newAttribs = new vector; + if (lockClientThreadsMutex() == STATUS_WAIT_0) { - // TODO: Implement this! + for (DataSyncThread* dst : clientThreads) + { + vector* toAdd = dst->getIncomingAttributes(); + newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end()); + } + unlockClientThreadsMutex(); + } + else { + printf("Could not acquire mutex to get incoming attributes.\n"); } + return newAttribs; } void AlgorithmServer::startServer() @@ -161,3 +180,20 @@ int AlgorithmServer::stopServer() } return 0; } + +DWORD AlgorithmServer::lockClientThreadsMutex() +{ + if (clientThreadsMutex != NULL) + { + return WaitForSingleObject(clientThreadsMutex, INFINITE); + } + return -1; +} + +void AlgorithmServer::unlockClientThreadsMutex() +{ + if (clientThreadsMutex != NULL) + { + ReleaseMutex(clientThreadsMutex); + } +} diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index a2144ac..851fb94 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,7 +4,8 @@ void DataSyncThread::threadRuntime() { - attribMutex = CreateMutex(NULL, false, NULL); + + printf("Thread runtime: %d\n", incomingAttributes); /* Handles the data sync between the other end of the socket @@ -83,6 +84,9 @@ void DataSyncThread::addIncomingAttribute(Attribute attrib) incomingAttributes.push_back(attrib); ReleaseMutex(attribMutex); } + else { + printf("1 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } } void DataSyncThread::startComms() @@ -90,6 +94,7 @@ void DataSyncThread::startComms() /* Initializes the communication thread */ + printf("Start comms: %d\n", incomingAttributes); continueThread = true; hThread = CreateThread( NULL, // default security attributes @@ -177,6 +182,20 @@ std::vector *DataSyncThread::getIncomingAttributes() (*newVector).push_back(attrib); ReleaseMutex(attribMutex); } + else { + printf("2 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } return newVector; } + +bool DataSyncThread::areIncomingAttributesAvailable() +{ + return incomingAttributes.size() > 0; +} + +bool sendAttribute(Attribute attrib) +{ + // TODO: Implement this to send attribute in the format described in the recieve section. + return false; +}