diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 8edd655..2d2b5e2 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -12,6 +12,8 @@ class Algorithm virtual void loop() = 0; virtual bool loopCondition() { return false; }; + + vector* pollForAttributes(); private: size_t numIoProcs = 0; AlgorithmServer* server; diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index 85520d0..ba48992 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -17,7 +17,6 @@ #include "CMakeConfig.h" #include "DataSyncThread.hpp" -#define ALGORITHM_PORT "10101" #define MAX_ACCEPT_FAILURES 5 #pragma comment (lib, "Ws2_32.lib") @@ -37,17 +36,23 @@ class AlgorithmServer } // Updates Algorithm key/value store with IO key/value updates - void pollForUpdates(); + vector* getAllIncomingAttributes(); void startServer(); - void stopServer(); + int stopServer(); + + DWORD lockClientThreadsMutex(); + void unlockClientThreadsMutex(); private: + SOCKET *ServerSocket = NULL; + HANDLE hThread; DWORD dwThreadId; - bool continueThread = false; + bool continueThread = true; size_t numClients; - std::vector clientThreads; + HANDLE clientThreadsMutex = NULL; + vector clientThreads; }; #endif diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index 717ae02..399af93 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -2,9 +2,42 @@ #ifndef ATTRIBUTE_HPP #define ATTRIBUTE_HPP +#include +#include + +#define ATTRIB_KEY_SIZE 8 + +using namespace std; + typedef struct BinaryAttributeStructure { - char key[8]; - unsigned char length[2]; + char key[ATTRIB_KEY_SIZE]; + char length; } bAttrib; +class Attribute +{ +private: + string key; + size_t length; + void* value; +public: + Attribute(BinaryAttributeStructure bin, void* value) { + key = ""; + for (int i = 0; i < ATTRIB_KEY_SIZE; i++) + key += bin.key[i]; + length = bin.length; + this->value = value; + } + + ~Attribute() { + + } + + // Getters and setters + string getKey() { return key; }; + size_t getLength() { return length; }; + void* getValue() { return value; }; + void* setValue(void* newValue) { void* old = value; value = newValue; return old; }; +}; + #endif \ No newline at end of file diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index ba17598..c8ed1b9 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "Attribute.hpp" #include "CMakeConfig.h" @@ -26,13 +27,20 @@ class DataSyncThread { private: SOCKET sock; - HANDLE hThread; + HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; bool continueThread = false; + + std::vector incomingAttributes; + HANDLE attribMutex; public: - DataSyncThread(SOCKET s) { sock = s; }; + DataSyncThread(SOCKET s) + { + sock = s; + attribMutex = CreateMutex(NULL, false, NULL); + }; - // Synchronous functions + // Synchronous functions (only called from thread) void threadRuntime(); static DWORD threadInit(LPVOID pThreadArgs) { @@ -41,11 +49,17 @@ class DataSyncThread return NULL; } int recvBytes(void* buffer, size_t numBytes); + void addIncomingAttribute(Attribute attrib); - // Async control + // Async control (only called outside of thread) void startComms(); bool stopComms(); + bool isClientConnected() { return continueThread; }; + unsigned int getSocketNumber() { return (unsigned int) socket; }; int connectToAlgorithm(char* serverName); + bool areIncomingAttributesAvailable(); + std::vector *getIncomingAttributes(); + bool sendAttribute(Attribute attrib); }; #endif \ No newline at end of file diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index cec95a9..060accf 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -13,6 +13,7 @@ int main() while (algorithm->loopCondition()) { algorithm->loop(); + _sleep(1000); } return 0; diff --git a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp index 7278eb5..04ef348 100644 --- a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp +++ b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp @@ -4,10 +4,15 @@ void AlgoBreadcrumbs::loop() { - printf("Breadcrumbs algorithm loop!\n"); + vector* attribs = pollForAttributes(); + // printf("Attrib length: %d\n", attribs->size()); + if (attribs->size() > 0) + { + printf("Attrib length: %d\n", attribs->size()); + } } bool AlgoBreadcrumbs::loopCondition() { - return --iterations >= 0; + return true; } diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 7ae56b8..5f2be18 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -13,3 +13,8 @@ Algorithm::~Algorithm() { delete server; } + +vector* Algorithm::pollForAttributes() +{ + return server->getAllIncomingAttributes(); +} diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index 61f43db..84df34e 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -18,6 +18,7 @@ AlgorithmServer::~AlgorithmServer() void AlgorithmServer::serverThreadRuntime() { int iResult; + clientThreadsMutex = CreateMutex(NULL, false, NULL); SOCKET ListenSocket = INVALID_SOCKET; SOCKET ClientSocket = INVALID_SOCKET; @@ -49,6 +50,7 @@ void AlgorithmServer::serverThreadRuntime() WSACleanup(); return; } + ServerSocket = &ListenSocket; // Setup the TCP listening socket iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); @@ -69,66 +71,83 @@ void AlgorithmServer::serverThreadRuntime() return; } - // Select stuff - FD_SET readSet; - int AcceptFailures = 0; // Accept a client socket while (continueThread) { - int fds; - - FD_ZERO(&readSet); - FD_SET(ListenSocket, &readSet); - - if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR) - { - printf("select() returned with error %d\n", WSAGetLastError()); - break; + printf("Listening for clients...\n"); + ClientSocket = accept(ListenSocket, NULL, NULL); + printf("Hello new client!\n"); + if (ClientSocket == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + AcceptFailures++; + if (AcceptFailures >= MAX_ACCEPT_FAILURES) + break; + } + else { + AcceptFailures = 0; } - if (fds != 0) + // Reaping any old data sync threads + if (lockClientThreadsMutex() == STATUS_WAIT_0) { - ClientSocket = accept(ListenSocket, NULL, NULL); - if (ClientSocket == INVALID_SOCKET) { - printf("accept failed with error: %d\n", WSAGetLastError()); - AcceptFailures++; - if (AcceptFailures >= MAX_ACCEPT_FAILURES) - break; - } - else { - AcceptFailures = 0; + auto it = clientThreads.begin(); + while (clientThreads.size() > 0 && it != clientThreads.end()) + { + if (!(**it).isClientConnected()) + { + clientThreads.erase(it); + } + else { + ++it; + } } // Creating a new Data sync thread if (clientThreads.size() < numClients) { - DataSyncThread client = *new DataSyncThread(ClientSocket); + DataSyncThread* client = new DataSyncThread(ClientSocket); clientThreads.push_back(client); - client.startComms(); + client->startComms(); } else { printf("Client attempted connection when (%d) clients are already connected", numClients); } + unlockClientThreadsMutex(); + } + else { + printf("Could not acquire mutex to add new client thread\n"); } } // cleanup closesocket(ListenSocket); WSACleanup(); + ServerSocket = NULL; continueThread = false; + CloseHandle(clientThreadsMutex); return; } -void AlgorithmServer::pollForUpdates() +vector* AlgorithmServer::getAllIncomingAttributes() { /* Polls DataSyncThreads for new updates Updates master storage accordingly */ - for (DataSyncThread dst : clientThreads) + vector* newAttribs = new vector; + if (lockClientThreadsMutex() == STATUS_WAIT_0) { - // TODO: Implement this! + for (DataSyncThread* dst : clientThreads) + { + vector* toAdd = dst->getIncomingAttributes(); + newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end()); + } + unlockClientThreadsMutex(); + } + else { + printf("Could not acquire mutex to get incoming attributes.\n"); } + return newAttribs; } void AlgorithmServer::startServer() @@ -153,7 +172,28 @@ void AlgorithmServer::startServer() &dwThreadId); // returns the thread identifier } -void AlgorithmServer::stopServer() +int AlgorithmServer::stopServer() { - continueThread = false; + if (continueThread && ServerSocket != NULL) { + continueThread = false; + return shutdown(*ServerSocket, SD_BOTH); + } + return 0; +} + +DWORD AlgorithmServer::lockClientThreadsMutex() +{ + if (clientThreadsMutex != NULL) + { + return WaitForSingleObject(clientThreadsMutex, INFINITE); + } + return -1; +} + +void AlgorithmServer::unlockClientThreadsMutex() +{ + if (clientThreadsMutex != NULL) + { + ReleaseMutex(clientThreadsMutex); + } } diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index a5efa72..851fb94 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,6 +4,9 @@ void DataSyncThread::threadRuntime() { + + printf("Thread runtime: %d\n", incomingAttributes); + /* Handles the data sync between the other end of the socket */ @@ -11,6 +14,9 @@ void DataSyncThread::threadRuntime() while (continueThread && iResult > 0) { + + printf("Waiting to receive bytes\n"); + // Master while loop that will receive incoming messages char commandByte; iResult = recvBytes(&commandByte, 1); @@ -20,22 +26,32 @@ void DataSyncThread::threadRuntime() switch (commandByte) { case 0: - bAttrib attrib; - iResult = recvBytes(&attrib, sizeof(bAttrib)); + printf("Attribute update...\n"); + bAttrib bAttr; + iResult = recvBytes(&bAttr, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length); - void* value = malloc((unsigned short)attrib.length); - iResult = recvBytes(value, (unsigned short)attrib.length); + + printf("Attribute update received, key:%.8s len:%x\n", bAttr.key, bAttr.length); + if (bAttr.length <= 0) + break; + + void* value = malloc(bAttr.length); + iResult = recvBytes(value, bAttr.length); if (iResult <= 0) break; - // TODO: Storing the attrib update - break; + // Storing the attrib update + addIncomingAttribute(*new Attribute(bAttr, value)); + } } continueThread = false; + if (attribMutex != NULL) { + CloseHandle(attribMutex); + attribMutex = NULL; + } } int DataSyncThread::recvBytes(void* buffer, size_t numBytes) @@ -61,11 +77,24 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes) return numBytes; } +void DataSyncThread::addIncomingAttribute(Attribute attrib) +{ + DWORD result = WaitForSingleObject(attribMutex, INFINITE); + if (result == WAIT_OBJECT_0) { + incomingAttributes.push_back(attrib); + ReleaseMutex(attribMutex); + } + else { + printf("1 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } +} + void DataSyncThread::startComms() { /* Initializes the communication thread */ + printf("Start comms: %d\n", incomingAttributes); continueThread = true; hThread = CreateThread( NULL, // default security attributes @@ -142,3 +171,31 @@ int DataSyncThread::connectToAlgorithm(char* serverName) return 1; } } + +std::vector *DataSyncThread::getIncomingAttributes() +{ + vector* newVector = new vector; + + DWORD result = WaitForSingleObject(attribMutex, INFINITE); + if (result == WAIT_OBJECT_0) { + for (Attribute attrib : incomingAttributes) + (*newVector).push_back(attrib); + ReleaseMutex(attribMutex); + } + else { + printf("2 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } + + return newVector; +} + +bool DataSyncThread::areIncomingAttributesAvailable() +{ + return incomingAttributes.size() > 0; +} + +bool sendAttribute(Attribute attrib) +{ + // TODO: Implement this to send attribute in the format described in the recieve section. + return false; +}