From 94cc09c9877289452d501a681955786ce83d259e Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Sun, 1 Dec 2019 17:39:56 -0500 Subject: [PATCH 1/5] First draft adding client code, skeleton of server code. --- breadcrumbs/include/Algorithm.hpp | 46 +++++-- breadcrumbs/include/AlgorithmServer.hpp | 7 ++ breadcrumbs/include/Attribute.hpp | 10 ++ breadcrumbs/include/DataSyncThread.hpp | 49 ++++++++ breadcrumbs/include/IOProcessor.hpp | 36 ++---- breadcrumbs/scripts/startbfs.py | 24 ++++ breadcrumbs/src/Breadcrumbs.cpp | 9 +- breadcrumbs/src/algos/Algorithm.cpp | 55 +++++---- breadcrumbs/src/comms/AlgorithmServer.cpp | 0 breadcrumbs/src/comms/DataSyncThread.cpp | 143 ++++++++++++++++++++++ breadcrumbs/src/io/IOProcessor.cpp | 39 ------ 11 files changed, 305 insertions(+), 113 deletions(-) create mode 100644 breadcrumbs/include/AlgorithmServer.hpp create mode 100644 breadcrumbs/include/Attribute.hpp create mode 100644 breadcrumbs/include/DataSyncThread.hpp create mode 100644 breadcrumbs/scripts/startbfs.py create mode 100644 breadcrumbs/src/comms/AlgorithmServer.cpp create mode 100644 breadcrumbs/src/comms/DataSyncThread.cpp diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 43b92ce..9cea37f 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -2,27 +2,47 @@ #ifndef ALGORITHM_HPP #define ALGORITHM_HPP -#include "IOProcessor.hpp" +#include +#include +#include +#include +#include +#include + +#include "DataSyncThread.hpp" + +// Need to link with Ws2_32.lib +#pragma comment (lib, "Ws2_32.lib") + +#define DEFAULT_PORT 10101 class Algorithm { public: - explicit Algorithm(IOProcessor* ins, SIZE_T num_inputs, IOProcessor* outs, SIZE_T num_outputs); + explicit Algorithm(size_t numProcs); - virtual VOID loop() = 0; - virtual BOOLEAN loopCondition() { return FALSE; }; + virtual void loop() = 0; + virtual bool loopCondition() { return false; }; - VOID startIOProcessors(); - VOID waitForIOProcessors(); - VOID stopIOProcessors(); + void serverThreadRuntime(); + static DWORD serverThreadInit(LPVOID pThreadArgs) + { + Algorithm* pAlgorithmThread = (Algorithm*)pThreadArgs; + pAlgorithmThread->serverThreadRuntime(); + return NULL; + } - // Updates Algorithm key/value store with IO key/value stores - VOID pollInputs(); - // Updates IO key/value stores with Algorithm key/value store - VOID pollOutputs(); + // Updates Algorithm key/value store with IO key/value updates + void pollForUpdates(); private: - SIZE_T numInputs, numOutputs; - IOProcessor* inputs, *outputs; + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; + + size_t numIO; + std::vector ioComms; + + void startServer(); }; #endif diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp new file mode 100644 index 0000000..b8fe319 --- /dev/null +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -0,0 +1,7 @@ + +#ifndef ALGORITHM_SERVER_HPP +#define ALGORITHM_SERVER_HPP + +#define ALGORITHM_PORT "10101" + +#endif diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp new file mode 100644 index 0000000..717ae02 --- /dev/null +++ b/breadcrumbs/include/Attribute.hpp @@ -0,0 +1,10 @@ + +#ifndef ATTRIBUTE_HPP +#define ATTRIBUTE_HPP + +typedef struct BinaryAttributeStructure { + char key[8]; + unsigned char length[2]; +} bAttrib; + +#endif \ No newline at end of file diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp new file mode 100644 index 0000000..da859f9 --- /dev/null +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -0,0 +1,49 @@ + +#ifndef DATA_SYNC_THREAD_HPP +#define DATA_SYNC_THREAD_HPP + +#include +#include +#include +#include +#include + +#include "Attribute.hpp" +#include "AlgorithmServer.hpp" + +// Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib +#pragma comment (lib, "Ws2_32.lib") +#pragma comment (lib, "Mswsock.lib") +#pragma comment (lib, "AdvApi32.lib") + +/* +This class contains the thread for communicating back and forth along a socket to sync attributes + +*/ +class DataSyncThread +{ +private: + SOCKET sock; + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; +public: + DataSyncThread(SOCKET s) { sock = s; }; + + // Synchronous functions + void threadRuntime(); + static DWORD threadInit(LPVOID pThreadArgs) + { + DataSyncThread* pDataSyncThread = (DataSyncThread*)pThreadArgs; + pDataSyncThread->threadRuntime(); + return NULL; + } + int recvBytes(void* buffer, size_t numBytes); + + // Async control + void startComms(); + bool stopComms(); + int connectToAlgorithm(char* serverName); +}; + +#endif \ No newline at end of file diff --git a/breadcrumbs/include/IOProcessor.hpp b/breadcrumbs/include/IOProcessor.hpp index f7a2cce..00fead2 100644 --- a/breadcrumbs/include/IOProcessor.hpp +++ b/breadcrumbs/include/IOProcessor.hpp @@ -1,41 +1,25 @@ -/* - * - * Stores the interface for building a sensor thread - * implementation. - * - */ - -#ifndef SENSOR_INTERFACE_HPP -#define SENSOR_INTERFACE_HPP +#ifndef IO_PROCESSOR_HPP +#define IO_PROCESSOR_HPP #include #include +/* +This class is the class that abstracts the io processor side of the data exchange between +IO processors and algorithms + +*/ class IOProcessor { public: - IOProcessor(LPCVOID pThreadArgs) { threadArgs = pThreadArgs; }; - - // Thread routine for implementation to override - virtual VOID threadRuntime(IOProcessor *ioProc) = 0; - // Thread initialization, should not be called directly! - static DWORD threadInit(LPVOID pThreadArgs); + IOProcessor() {}; // Async control - UINT8 startThread(); - BOOL waitForThread(); - - // Sync control - BOOLEAN bufferDataAvailable(); - SIZE_T getBufferData(LPCSTR* bufferKeyArray, LPCSTR* bufferValueArray); - VOID setDataStoreValue(LPCSTR key, LPCVOID value, SIZE_T valueSize); -protected: - LPCVOID threadArgs; - DWORD dwThreadId; - HANDLE hThread; + unsigned int startComms(); + bool stopComms(); }; diff --git a/breadcrumbs/scripts/startbfs.py b/breadcrumbs/scripts/startbfs.py new file mode 100644 index 0000000..618e877 --- /dev/null +++ b/breadcrumbs/scripts/startbfs.py @@ -0,0 +1,24 @@ + +import os +import multiprocessing as mp +import subprocess + + +def start_program(command): + cmd_list = command.split(" ") + return subprocess.call(cmd_list) + + +def start_program_no_hang(command): + print("Running %s from cwd %s" % (command, os.getcwd())) + proc = mp.Process(target=start_program, args=(command,)) + proc.start() + return proc + + +def main(): + start_program_no_hang("start cmd.exe /k \"..\\bin\\Breadcrumbs.exe\"") + + +if __name__ == "__main__": + main() diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 71f5809..97b94e9 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -7,14 +7,7 @@ int main() { - - // Initialization - int threadID = 0; - IOProcessor *processor = new VirtualOutputProcessor((LPCVOID) &threadID); - Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, processor, 1); - - // Starting - algorithm->startIOProcessors(); + Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, 1); // Loop while (algorithm->loopCondition()) diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 5487e55..1448ae0 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -1,41 +1,42 @@ #include "Algorithm.hpp" -Algorithm::Algorithm(IOProcessor* ins, SIZE_T numIns, IOProcessor* outs, SIZE_T numOuts) +Algorithm::Algorithm(size_t numProcs) { - numInputs = numIns; numOutputs = numOuts; - inputs = ins; outputs = outs; + numIO = numProcs; + ioComms.reserve(numIO); + + startServer(); } -VOID Algorithm::startIOProcessors() +void Algorithm::pollForUpdates() { - int i; - for (i = 0; i < numInputs; i++) - (inputs + i)->startThread(); - for (i = 0; i < numOutputs; i++) - (outputs + i)->startThread(); + /* + Polls DataSyncThreads for new updates + Updates master storage accordingly + */ + for (DataSyncThread dst : ioComms) + { + // TODO: Implement this! + } } -VOID Algorithm::waitForIOProcessors() +void Algorithm::startServer() { - int i; - for (i = 0; i < numInputs; i++) - (inputs + i)->waitForThread(); - for (i = 0; i < numOutputs; i++) - (outputs + i)->waitForThread(); + /* + Starts the server socket thread + */ + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + serverThreadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier } -VOID Algorithm::stopIOProcessors() +void Algorithm::serverThreadRuntime() { - waitForIOProcessors(); -} - -VOID Algorithm::pollInputs() -{ - -} - -VOID Algorithm::pollOutputs() -{ - + } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp new file mode 100644 index 0000000..e69de29 diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp new file mode 100644 index 0000000..d0eb563 --- /dev/null +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -0,0 +1,143 @@ + +#include "DataSyncThread.hpp" + + +void DataSyncThread::threadRuntime() +{ + /* + Handles the data sync between the other end of the socket + */ + int iResult; + + while (continueThread && iResult > 0) + { + // Master while loop that will receive incoming messages + char commandByte; + iResult = recvBytes(&commandByte, 1); + if (iResult <= 0) + break; + + switch (commandByte) + { + case 0: + bAttrib attrib; + iResult = recvBytes(&attrib, sizeof(bAttrib)); + if (iResult <= 0) + break; + printf("Attribute update received, key:%.8s len:%us\n", attrib.key, attrib.length); + void* value = malloc((unsigned short)attrib.length); + iResult = recvBytes(value, (unsigned short)attrib.length); + if (iResult <= 0) + break; + // TODO: Storing the attrib update + + break; + } + } + + continueThread = false; +} + +int DataSyncThread::recvBytes(void* buffer, size_t numBytes) +{ + size_t bytesRecved = 0; + int iResult; + + while (bytesRecved < numBytes) + { + iResult = recv(sock, ((char*) buffer) + bytesRecved, numBytes - bytesRecved, 0); + if (iResult < 0) + { + printf("recv failed with error: %d\n", WSAGetLastError()); + return iResult; + } + else if (iResult == 0) { + printf("recv returned 0, connection closed.\n"); + return iResult; + } + else + bytesRecved += iResult; + } + return numBytes; +} + +void DataSyncThread::startComms() +{ + /* + Initializes the communication thread + */ + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + threadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier +} + +bool DataSyncThread::stopComms() +{ + /* + Stops the communication thread + */ + continueThread = false; +} + +int DataSyncThread::connectToAlgorithm(char* serverName) +{ + sock = INVALID_SOCKET; + WSADATA wsaData; + struct addrinfo* result = NULL,* ptr = NULL, hints; + int iResult; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) { + printf("WSAStartup failed with error: %d\n", iResult); + return 1; + } + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + // Resolve the server address and port + iResult = getaddrinfo(serverName, ALGORITHM_PORT, &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + return 1; + } + + // Attempt to connect to an address until one succeeds + for (ptr = result; ptr != NULL; ptr = ptr->ai_next) { + + // Create a SOCKET for connecting to server + sock = socket(ptr->ai_family, ptr->ai_socktype, + ptr->ai_protocol); + if (sock == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + WSACleanup(); + return 1; + } + + // Connect to server. + iResult = connect(sock, ptr->ai_addr, (int)ptr->ai_addrlen); + if (iResult == SOCKET_ERROR) { + closesocket(sock); + sock = INVALID_SOCKET; + continue; + } + break; + } + + freeaddrinfo(result); + + if (sock == INVALID_SOCKET) { + printf("Unable to connect to server!\n"); + WSACleanup(); + return 1; + } +} diff --git a/breadcrumbs/src/io/IOProcessor.cpp b/breadcrumbs/src/io/IOProcessor.cpp index 24c70cd..437de23 100644 --- a/breadcrumbs/src/io/IOProcessor.cpp +++ b/breadcrumbs/src/io/IOProcessor.cpp @@ -2,42 +2,3 @@ #include "IOProcessor.hpp" -DWORD IOProcessor::threadInit(LPVOID pThreadArgs) -{ - IOProcessor* pIOProcessor = (IOProcessor*)pThreadArgs; - pIOProcessor->threadRuntime(pIOProcessor); - return 0; -} - -UINT8 IOProcessor::startThread() -{ - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - threadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier - return TRUE; -} - -BOOL IOProcessor::waitForThread() -{ - WaitForSingleObject(hThread, INFINITE); - return TRUE; -} - -BOOLEAN bufferDataAvailable() -{ - return FALSE; -} - -SIZE_T getBufferData(LPCSTR* bufferKeyArray, LPCSTR* bufferValueArray) -{ - return 0; -} - -VOID setDataStoreValue(LPCSTR key, LPCVOID value, SIZE_T valueSize) -{ - -} From 0c24eb91a8dd0af636a1bb35e74eb22fb2e80dd0 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Thu, 5 Dec 2019 12:48:34 -0500 Subject: [PATCH 2/5] Updating working branch with newest commit --- breadcrumbs/CMakeConfig.h.in | 6 +- breadcrumbs/CMakeLists.txt | 2 + breadcrumbs/include/AlgoBreadcrumbs.hpp | 4 +- breadcrumbs/include/Algorithm.hpp | 36 +--- breadcrumbs/include/AlgorithmServer.hpp | 46 +++++ breadcrumbs/include/CMakeConfig.h | 6 +- breadcrumbs/include/DataSyncThread.hpp | 4 +- .../include/VirtualOutputProcessor.hpp | 2 +- breadcrumbs/src/Breadcrumbs.cpp | 8 +- breadcrumbs/src/algos/AlgoBreadcrumbs.cpp | 4 +- breadcrumbs/src/algos/Algorithm.cpp | 37 +--- breadcrumbs/src/comms/AlgorithmServer.cpp | 159 ++++++++++++++++++ breadcrumbs/src/comms/DataSyncThread.cpp | 7 +- .../io/out_procs/VirtualOutputProcessor.cpp | 4 +- 14 files changed, 239 insertions(+), 86 deletions(-) diff --git a/breadcrumbs/CMakeConfig.h.in b/breadcrumbs/CMakeConfig.h.in index e47bb02..be22a44 100644 --- a/breadcrumbs/CMakeConfig.h.in +++ b/breadcrumbs/CMakeConfig.h.in @@ -1,4 +1,6 @@ // the configured options and settings for Tutorial -#define Tutorial_VERSION_MAJOR @Bfs_VERSION_MAJOR@ -#define Tutorial_VERSION_MINOR @Bfs_VERSION_MINOR@ \ No newline at end of file +#define VERSION_MAJOR @Bfs_VERSION_MAJOR@ +#define VERSION_MINOR @Bfs_VERSION_MINOR@ + +#define ALGORITHM_SERVER_PORT @Bfs_ALGORITHM_SERVER_PORT@ \ No newline at end of file diff --git a/breadcrumbs/CMakeLists.txt b/breadcrumbs/CMakeLists.txt index d214ccf..94f900a 100644 --- a/breadcrumbs/CMakeLists.txt +++ b/breadcrumbs/CMakeLists.txt @@ -26,6 +26,8 @@ message("Library directory: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}") message("Header file directory: ${INCLUDES_DIRECTORY}") # Configure a header file to pass some of the CMake settings to the source code +set (Bfs_ALGORITHM_SERVER_PORT \"27634\") + configure_file ( "${PROJECT_SOURCE_DIR}/CMakeConfig.h.in" "${CMAKE_INCLUDE_PATH}/CMakeConfig.h" diff --git a/breadcrumbs/include/AlgoBreadcrumbs.hpp b/breadcrumbs/include/AlgoBreadcrumbs.hpp index e9a0995..3502ff5 100644 --- a/breadcrumbs/include/AlgoBreadcrumbs.hpp +++ b/breadcrumbs/include/AlgoBreadcrumbs.hpp @@ -9,8 +9,8 @@ class AlgoBreadcrumbs : public Algorithm public: using Algorithm::Algorithm; - VOID loop(); - BOOLEAN loopCondition(); + void loop(); + bool loopCondition(); private: INT iterations = 1; }; diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 9cea37f..8edd655 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -2,47 +2,19 @@ #ifndef ALGORITHM_HPP #define ALGORITHM_HPP -#include -#include -#include -#include -#include -#include - -#include "DataSyncThread.hpp" - -// Need to link with Ws2_32.lib -#pragma comment (lib, "Ws2_32.lib") - -#define DEFAULT_PORT 10101 +#include "AlgorithmServer.hpp" class Algorithm { public: explicit Algorithm(size_t numProcs); + ~Algorithm(); virtual void loop() = 0; virtual bool loopCondition() { return false; }; - - void serverThreadRuntime(); - static DWORD serverThreadInit(LPVOID pThreadArgs) - { - Algorithm* pAlgorithmThread = (Algorithm*)pThreadArgs; - pAlgorithmThread->serverThreadRuntime(); - return NULL; - } - - // Updates Algorithm key/value store with IO key/value updates - void pollForUpdates(); private: - HANDLE hThread; - DWORD dwThreadId; - bool continueThread = false; - - size_t numIO; - std::vector ioComms; - - void startServer(); + size_t numIoProcs = 0; + AlgorithmServer* server; }; #endif diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index b8fe319..85520d0 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -2,6 +2,52 @@ #ifndef ALGORITHM_SERVER_HPP #define ALGORITHM_SERVER_HPP + +#undef UNICODE + +#define WIN32_LEAN_AND_MEAN + +#include +#include +#include +#include +#include +#include + +#include "CMakeConfig.h" +#include "DataSyncThread.hpp" + #define ALGORITHM_PORT "10101" +#define MAX_ACCEPT_FAILURES 5 + +#pragma comment (lib, "Ws2_32.lib") + +class AlgorithmServer +{ +public: + AlgorithmServer(size_t numClients); + ~AlgorithmServer(); + + void serverThreadRuntime(); + static DWORD serverThreadInit(LPVOID pThreadArgs) + { + AlgorithmServer* pAlgorithmServerThread = (AlgorithmServer*)pThreadArgs; + pAlgorithmServerThread->serverThreadRuntime(); + return NULL; + } + + // Updates Algorithm key/value store with IO key/value updates + void pollForUpdates(); + void startServer(); + void stopServer(); + +private: + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; + + size_t numClients; + std::vector clientThreads; +}; #endif diff --git a/breadcrumbs/include/CMakeConfig.h b/breadcrumbs/include/CMakeConfig.h index 520c12d..b625ea5 100644 --- a/breadcrumbs/include/CMakeConfig.h +++ b/breadcrumbs/include/CMakeConfig.h @@ -1,4 +1,6 @@ // the configured options and settings for Tutorial -#define Tutorial_VERSION_MAJOR 1 -#define Tutorial_VERSION_MINOR 0 +#define VERSION_MAJOR 1 +#define VERSION_MINOR 0 + +#define ALGORITHM_SERVER_PORT "27634" diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index da859f9..ba17598 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -2,6 +2,8 @@ #ifndef DATA_SYNC_THREAD_HPP #define DATA_SYNC_THREAD_HPP +#define WIN32_LEAN_AND_MEAN + #include #include #include @@ -9,7 +11,7 @@ #include #include "Attribute.hpp" -#include "AlgorithmServer.hpp" +#include "CMakeConfig.h" // Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib #pragma comment (lib, "Ws2_32.lib") diff --git a/breadcrumbs/include/VirtualOutputProcessor.hpp b/breadcrumbs/include/VirtualOutputProcessor.hpp index b8a8ac7..067fef4 100644 --- a/breadcrumbs/include/VirtualOutputProcessor.hpp +++ b/breadcrumbs/include/VirtualOutputProcessor.hpp @@ -13,7 +13,7 @@ class VirtualOutputProcessor : public IOProcessor public: using IOProcessor::IOProcessor; - VOID threadRuntime(IOProcessor* ioProc); + void threadRuntime(IOProcessor* ioProc); }; #endif diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 97b94e9..cec95a9 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -7,19 +7,13 @@ int main() { - Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, 1); + Algorithm* algorithm = new AlgoBreadcrumbs(1); // Loop while (algorithm->loopCondition()) { - algorithm->pollInputs(); algorithm->loop(); - algorithm->pollOutputs(); } - // Cleanup - processor->waitForThread(); - delete processor; - return 0; } diff --git a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp index 2aba742..7278eb5 100644 --- a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp +++ b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp @@ -2,12 +2,12 @@ #include "AlgoBreadcrumbs.hpp" -VOID AlgoBreadcrumbs::loop() +void AlgoBreadcrumbs::loop() { printf("Breadcrumbs algorithm loop!\n"); } -BOOLEAN AlgoBreadcrumbs::loopCondition() +bool AlgoBreadcrumbs::loopCondition() { return --iterations >= 0; } diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 1448ae0..7ae56b8 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -2,41 +2,14 @@ #include "Algorithm.hpp" Algorithm::Algorithm(size_t numProcs) -{ - numIO = numProcs; - ioComms.reserve(numIO); - - startServer(); -} - -void Algorithm::pollForUpdates() { - /* - Polls DataSyncThreads for new updates - Updates master storage accordingly - */ - for (DataSyncThread dst : ioComms) - { - // TODO: Implement this! - } -} + numIoProcs = numProcs; + server = new AlgorithmServer(numProcs); -void Algorithm::startServer() -{ - /* - Starts the server socket thread - */ - continueThread = true; - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - serverThreadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier + server->startServer(); } -void Algorithm::serverThreadRuntime() +Algorithm::~Algorithm() { - + delete server; } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index e69de29..61f43db 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -0,0 +1,159 @@ + +#include "AlgorithmServer.hpp" + +AlgorithmServer::AlgorithmServer(size_t numClients) +{ + this->hThread = NULL; + this->dwThreadId = 0; + this->numClients = numClients; + + clientThreads.reserve(numClients); +} + +AlgorithmServer::~AlgorithmServer() +{ + this->stopServer(); +} + +void AlgorithmServer::serverThreadRuntime() +{ + int iResult; + + SOCKET ListenSocket = INVALID_SOCKET; + SOCKET ClientSocket = INVALID_SOCKET; + + struct addrinfo* result = NULL; + struct addrinfo hints; + + int iSendResult; + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_PASSIVE; + + // Resolve the server address and port + iResult = getaddrinfo(NULL, ALGORITHM_SERVER_PORT, &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + return; + } + + // Create a SOCKET for connecting to server + ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (ListenSocket == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + freeaddrinfo(result); + WSACleanup(); + return; + } + + // Setup the TCP listening socket + iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); + if (iResult == SOCKET_ERROR) { + printf("bind failed with error: %d\n", WSAGetLastError()); + freeaddrinfo(result); + closesocket(ListenSocket); + WSACleanup(); + return; + } + freeaddrinfo(result); + + iResult = listen(ListenSocket, SOMAXCONN); + if (iResult == SOCKET_ERROR) { + printf("listen failed with error: %d\n", WSAGetLastError()); + closesocket(ListenSocket); + WSACleanup(); + return; + } + + // Select stuff + FD_SET readSet; + + int AcceptFailures = 0; + // Accept a client socket + while (continueThread) + { + int fds; + + FD_ZERO(&readSet); + FD_SET(ListenSocket, &readSet); + + if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR) + { + printf("select() returned with error %d\n", WSAGetLastError()); + break; + } + + if (fds != 0) + { + ClientSocket = accept(ListenSocket, NULL, NULL); + if (ClientSocket == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + AcceptFailures++; + if (AcceptFailures >= MAX_ACCEPT_FAILURES) + break; + } + else { + AcceptFailures = 0; + } + + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread client = *new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client.startComms(); + } + else { + printf("Client attempted connection when (%d) clients are already connected", numClients); + } + } + } + + // cleanup + closesocket(ListenSocket); + WSACleanup(); + continueThread = false; + return; +} + +void AlgorithmServer::pollForUpdates() +{ + /* + Polls DataSyncThreads for new updates + Updates master storage accordingly + */ + for (DataSyncThread dst : clientThreads) + { + // TODO: Implement this! + } +} + +void AlgorithmServer::startServer() +{ + + // Initialize Winsock + WSADATA wsaData; + int result; + if (result = WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + printf("WSAStartup failed with error: %d\n", result); + return; + } + + // Starts the server thread: + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + serverThreadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier +} + +void AlgorithmServer::stopServer() +{ + continueThread = false; +} diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index d0eb563..a5efa72 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -7,7 +7,7 @@ void DataSyncThread::threadRuntime() /* Handles the data sync between the other end of the socket */ - int iResult; + int iResult = 1; while (continueThread && iResult > 0) { @@ -24,7 +24,7 @@ void DataSyncThread::threadRuntime() iResult = recvBytes(&attrib, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%us\n", attrib.key, attrib.length); + printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length); void* value = malloc((unsigned short)attrib.length); iResult = recvBytes(value, (unsigned short)attrib.length); if (iResult <= 0) @@ -82,6 +82,7 @@ bool DataSyncThread::stopComms() Stops the communication thread */ continueThread = false; + return true; } int DataSyncThread::connectToAlgorithm(char* serverName) @@ -104,7 +105,7 @@ int DataSyncThread::connectToAlgorithm(char* serverName) hints.ai_protocol = IPPROTO_TCP; // Resolve the server address and port - iResult = getaddrinfo(serverName, ALGORITHM_PORT, &hints, &result); + iResult = getaddrinfo(serverName, ALGORITHM_SERVER_PORT, &hints, &result); if (iResult != 0) { printf("getaddrinfo failed with error: %d\n", iResult); WSACleanup(); diff --git a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp index e6905c0..01bf3cc 100644 --- a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp @@ -2,7 +2,7 @@ #include "VirtualOutputProcessor.hpp" -VOID VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) +void VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) { - printf("VirtualOutputProcessor %d started.\n", *((int*) threadArgs)); + printf("VirtualOutputProcessor started.\n"); } From b947e5a31e0f15e31a408488b125ad5bcd52ce8d Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Thu, 16 Jan 2020 10:59:56 -0500 Subject: [PATCH 3/5] Stabalizing server code and fixing attrib update decoding --- breadcrumbs/include/AlgorithmServer.hpp | 7 +-- breadcrumbs/include/Attribute.hpp | 2 +- breadcrumbs/include/DataSyncThread.hpp | 6 ++- breadcrumbs/src/Breadcrumbs.cpp | 2 + breadcrumbs/src/comms/AlgorithmServer.cpp | 66 ++++++++++++----------- breadcrumbs/src/comms/DataSyncThread.cpp | 18 +++++-- 6 files changed, 59 insertions(+), 42 deletions(-) diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index 85520d0..67c3731 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -17,7 +17,6 @@ #include "CMakeConfig.h" #include "DataSyncThread.hpp" -#define ALGORITHM_PORT "10101" #define MAX_ACCEPT_FAILURES 5 #pragma comment (lib, "Ws2_32.lib") @@ -39,12 +38,14 @@ class AlgorithmServer // Updates Algorithm key/value store with IO key/value updates void pollForUpdates(); void startServer(); - void stopServer(); + int stopServer(); private: + SOCKET *ServerSocket = NULL; + HANDLE hThread; DWORD dwThreadId; - bool continueThread = false; + bool continueThread = true; size_t numClients; std::vector clientThreads; diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index 717ae02..fc1ed4e 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -4,7 +4,7 @@ typedef struct BinaryAttributeStructure { char key[8]; - unsigned char length[2]; + char length; } bAttrib; #endif \ No newline at end of file diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index ba17598..05a7998 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -32,7 +32,7 @@ class DataSyncThread public: DataSyncThread(SOCKET s) { sock = s; }; - // Synchronous functions + // Synchronous functions (only called from thread) void threadRuntime(); static DWORD threadInit(LPVOID pThreadArgs) { @@ -42,9 +42,11 @@ class DataSyncThread } int recvBytes(void* buffer, size_t numBytes); - // Async control + // Async control (only called outside of thread) void startComms(); bool stopComms(); + bool isClientConnected() { return continueThread; }; + unsigned int getSocketNumber() { return (unsigned int) socket; }; int connectToAlgorithm(char* serverName); }; diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index cec95a9..7d20d0f 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -15,5 +15,7 @@ int main() algorithm->loop(); } + while (1); + return 0; } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index 61f43db..7023659 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -49,6 +49,7 @@ void AlgorithmServer::serverThreadRuntime() WSACleanup(); return; } + ServerSocket = &ListenSocket; // Setup the TCP listening socket iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); @@ -69,52 +70,51 @@ void AlgorithmServer::serverThreadRuntime() return; } - // Select stuff - FD_SET readSet; - int AcceptFailures = 0; // Accept a client socket while (continueThread) { - int fds; - - FD_ZERO(&readSet); - FD_SET(ListenSocket, &readSet); - - if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR) - { - printf("select() returned with error %d\n", WSAGetLastError()); - break; + printf("Listening for clients...\n"); + ClientSocket = accept(ListenSocket, NULL, NULL); + printf("Hello new client!\n"); + if (ClientSocket == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + AcceptFailures++; + if (AcceptFailures >= MAX_ACCEPT_FAILURES) + break; + } + else { + AcceptFailures = 0; } - if (fds != 0) + // Reaping any old data sync threads + auto it = clientThreads.begin(); + while (clientThreads.size() > 0 && it != clientThreads.end()) { - ClientSocket = accept(ListenSocket, NULL, NULL); - if (ClientSocket == INVALID_SOCKET) { - printf("accept failed with error: %d\n", WSAGetLastError()); - AcceptFailures++; - if (AcceptFailures >= MAX_ACCEPT_FAILURES) - break; + if (!(*it).isClientConnected()) + { + clientThreads.erase(it); } else { - AcceptFailures = 0; + ++it; } + } - // Creating a new Data sync thread - if (clientThreads.size() < numClients) { - DataSyncThread client = *new DataSyncThread(ClientSocket); - clientThreads.push_back(client); - client.startComms(); - } - else { - printf("Client attempted connection when (%d) clients are already connected", numClients); - } + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread client = *new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client.startComms(); + } + else { + printf("Client attempted connection when (%d) clients are already connected", numClients); } } // cleanup closesocket(ListenSocket); WSACleanup(); + ServerSocket = NULL; continueThread = false; return; } @@ -153,7 +153,11 @@ void AlgorithmServer::startServer() &dwThreadId); // returns the thread identifier } -void AlgorithmServer::stopServer() +int AlgorithmServer::stopServer() { - continueThread = false; + if (continueThread && ServerSocket != NULL) { + continueThread = false; + return shutdown(*ServerSocket, SD_BOTH); + } + return 0; } diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index a5efa72..b18e238 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -11,6 +11,9 @@ void DataSyncThread::threadRuntime() while (continueThread && iResult > 0) { + + printf("Waiting to receive bytes\n"); + // Master while loop that will receive incoming messages char commandByte; iResult = recvBytes(&commandByte, 1); @@ -20,18 +23,23 @@ void DataSyncThread::threadRuntime() switch (commandByte) { case 0: + printf("Attribute update...\n"); bAttrib attrib; iResult = recvBytes(&attrib, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length); - void* value = malloc((unsigned short)attrib.length); - iResult = recvBytes(value, (unsigned short)attrib.length); + + printf("Attribute update received, key:%.8s len:%x\n", attrib.key, attrib.length); + if (attrib.length <= 0) + break; + + void* value = malloc(attrib.length); + iResult = recvBytes(value, attrib.length); if (iResult <= 0) break; - // TODO: Storing the attrib update - break; + // TODO: Storing the attrib update + } } From a01e7a7e7a8cb794092db974a0be20faf79f0f37 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Thu, 16 Jan 2020 13:18:08 -0500 Subject: [PATCH 4/5] Adding new attribs to vector and allow algorithm to get new values --- breadcrumbs/include/Attribute.hpp | 35 +++++++++++++++++- breadcrumbs/include/DataSyncThread.hpp | 13 +++++-- breadcrumbs/src/comms/DataSyncThread.cpp | 46 +++++++++++++++++++----- 3 files changed, 83 insertions(+), 11 deletions(-) diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index fc1ed4e..399af93 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -2,9 +2,42 @@ #ifndef ATTRIBUTE_HPP #define ATTRIBUTE_HPP +#include +#include + +#define ATTRIB_KEY_SIZE 8 + +using namespace std; + typedef struct BinaryAttributeStructure { - char key[8]; + char key[ATTRIB_KEY_SIZE]; char length; } bAttrib; +class Attribute +{ +private: + string key; + size_t length; + void* value; +public: + Attribute(BinaryAttributeStructure bin, void* value) { + key = ""; + for (int i = 0; i < ATTRIB_KEY_SIZE; i++) + key += bin.key[i]; + length = bin.length; + this->value = value; + } + + ~Attribute() { + + } + + // Getters and setters + string getKey() { return key; }; + size_t getLength() { return length; }; + void* getValue() { return value; }; + void* setValue(void* newValue) { void* old = value; value = newValue; return old; }; +}; + #endif \ No newline at end of file diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index 05a7998..4c428da 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "Attribute.hpp" #include "CMakeConfig.h" @@ -26,11 +27,17 @@ class DataSyncThread { private: SOCKET sock; - HANDLE hThread; + HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; bool continueThread = false; + + HANDLE attribMutex; + std::vector incomingAttributes; public: - DataSyncThread(SOCKET s) { sock = s; }; + DataSyncThread(SOCKET s) + { + sock = s; + }; // Synchronous functions (only called from thread) void threadRuntime(); @@ -41,6 +48,7 @@ class DataSyncThread return NULL; } int recvBytes(void* buffer, size_t numBytes); + void addIncomingAttribute(Attribute attrib); // Async control (only called outside of thread) void startComms(); @@ -48,6 +56,7 @@ class DataSyncThread bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; int connectToAlgorithm(char* serverName); + std::vector *getIncomingAttributes(); }; #endif \ No newline at end of file diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index b18e238..a2144ac 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,6 +4,8 @@ void DataSyncThread::threadRuntime() { + attribMutex = CreateMutex(NULL, false, NULL); + /* Handles the data sync between the other end of the socket */ @@ -24,26 +26,31 @@ void DataSyncThread::threadRuntime() { case 0: printf("Attribute update...\n"); - bAttrib attrib; - iResult = recvBytes(&attrib, sizeof(bAttrib)); + bAttrib bAttr; + iResult = recvBytes(&bAttr, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%x\n", attrib.key, attrib.length); - if (attrib.length <= 0) + printf("Attribute update received, key:%.8s len:%x\n", bAttr.key, bAttr.length); + if (bAttr.length <= 0) break; - void* value = malloc(attrib.length); - iResult = recvBytes(value, attrib.length); + void* value = malloc(bAttr.length); + iResult = recvBytes(value, bAttr.length); if (iResult <= 0) break; - // TODO: Storing the attrib update - + // Storing the attrib update + addIncomingAttribute(*new Attribute(bAttr, value)); + } } continueThread = false; + if (attribMutex != NULL) { + CloseHandle(attribMutex); + attribMutex = NULL; + } } int DataSyncThread::recvBytes(void* buffer, size_t numBytes) @@ -69,6 +76,15 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes) return numBytes; } +void DataSyncThread::addIncomingAttribute(Attribute attrib) +{ + DWORD result = WaitForSingleObject(attribMutex, INFINITE); + if (result == WAIT_OBJECT_0) { + incomingAttributes.push_back(attrib); + ReleaseMutex(attribMutex); + } +} + void DataSyncThread::startComms() { /* @@ -150,3 +166,17 @@ int DataSyncThread::connectToAlgorithm(char* serverName) return 1; } } + +std::vector *DataSyncThread::getIncomingAttributes() +{ + vector* newVector = new vector; + + DWORD result = WaitForSingleObject(attribMutex, INFINITE); + if (result == WAIT_OBJECT_0) { + for (Attribute attrib : incomingAttributes) + (*newVector).push_back(attrib); + ReleaseMutex(attribMutex); + } + + return newVector; +} From 7ddbee693350dedf46bdb2d5d5858d07a5732e62 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Thu, 16 Jan 2020 17:14:31 -0500 Subject: [PATCH 5/5] Finishing basic attribute update framework --- breadcrumbs/include/Algorithm.hpp | 2 + breadcrumbs/include/AlgorithmServer.hpp | 8 ++- breadcrumbs/include/DataSyncThread.hpp | 5 +- breadcrumbs/src/Breadcrumbs.cpp | 3 +- breadcrumbs/src/algos/AlgoBreadcrumbs.cpp | 9 ++- breadcrumbs/src/algos/Algorithm.cpp | 5 ++ breadcrumbs/src/comms/AlgorithmServer.cpp | 68 +++++++++++++++++------ breadcrumbs/src/comms/DataSyncThread.cpp | 21 ++++++- 8 files changed, 97 insertions(+), 24 deletions(-) diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 8edd655..2d2b5e2 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -12,6 +12,8 @@ class Algorithm virtual void loop() = 0; virtual bool loopCondition() { return false; }; + + vector* pollForAttributes(); private: size_t numIoProcs = 0; AlgorithmServer* server; diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index 67c3731..ba48992 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -36,10 +36,13 @@ class AlgorithmServer } // Updates Algorithm key/value store with IO key/value updates - void pollForUpdates(); + vector* getAllIncomingAttributes(); void startServer(); int stopServer(); + DWORD lockClientThreadsMutex(); + void unlockClientThreadsMutex(); + private: SOCKET *ServerSocket = NULL; @@ -48,7 +51,8 @@ class AlgorithmServer bool continueThread = true; size_t numClients; - std::vector clientThreads; + HANDLE clientThreadsMutex = NULL; + vector clientThreads; }; #endif diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index 4c428da..c8ed1b9 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -31,12 +31,13 @@ class DataSyncThread DWORD dwThreadId; bool continueThread = false; - HANDLE attribMutex; std::vector incomingAttributes; + HANDLE attribMutex; public: DataSyncThread(SOCKET s) { sock = s; + attribMutex = CreateMutex(NULL, false, NULL); }; // Synchronous functions (only called from thread) @@ -56,7 +57,9 @@ class DataSyncThread bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; int connectToAlgorithm(char* serverName); + bool areIncomingAttributesAvailable(); std::vector *getIncomingAttributes(); + bool sendAttribute(Attribute attrib); }; #endif \ No newline at end of file diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 7d20d0f..060accf 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -13,9 +13,8 @@ int main() while (algorithm->loopCondition()) { algorithm->loop(); + _sleep(1000); } - while (1); - return 0; } diff --git a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp index 7278eb5..04ef348 100644 --- a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp +++ b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp @@ -4,10 +4,15 @@ void AlgoBreadcrumbs::loop() { - printf("Breadcrumbs algorithm loop!\n"); + vector* attribs = pollForAttributes(); + // printf("Attrib length: %d\n", attribs->size()); + if (attribs->size() > 0) + { + printf("Attrib length: %d\n", attribs->size()); + } } bool AlgoBreadcrumbs::loopCondition() { - return --iterations >= 0; + return true; } diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 7ae56b8..5f2be18 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -13,3 +13,8 @@ Algorithm::~Algorithm() { delete server; } + +vector* Algorithm::pollForAttributes() +{ + return server->getAllIncomingAttributes(); +} diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index 7023659..84df34e 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -18,6 +18,7 @@ AlgorithmServer::~AlgorithmServer() void AlgorithmServer::serverThreadRuntime() { int iResult; + clientThreadsMutex = CreateMutex(NULL, false, NULL); SOCKET ListenSocket = INVALID_SOCKET; SOCKET ClientSocket = INVALID_SOCKET; @@ -88,26 +89,33 @@ void AlgorithmServer::serverThreadRuntime() } // Reaping any old data sync threads - auto it = clientThreads.begin(); - while (clientThreads.size() > 0 && it != clientThreads.end()) + if (lockClientThreadsMutex() == STATUS_WAIT_0) { - if (!(*it).isClientConnected()) + auto it = clientThreads.begin(); + while (clientThreads.size() > 0 && it != clientThreads.end()) { - clientThreads.erase(it); + if (!(**it).isClientConnected()) + { + clientThreads.erase(it); + } + else { + ++it; + } + } + + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread* client = new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client->startComms(); } else { - ++it; + printf("Client attempted connection when (%d) clients are already connected", numClients); } - } - - // Creating a new Data sync thread - if (clientThreads.size() < numClients) { - DataSyncThread client = *new DataSyncThread(ClientSocket); - clientThreads.push_back(client); - client.startComms(); + unlockClientThreadsMutex(); } else { - printf("Client attempted connection when (%d) clients are already connected", numClients); + printf("Could not acquire mutex to add new client thread\n"); } } @@ -116,19 +124,30 @@ void AlgorithmServer::serverThreadRuntime() WSACleanup(); ServerSocket = NULL; continueThread = false; + CloseHandle(clientThreadsMutex); return; } -void AlgorithmServer::pollForUpdates() +vector* AlgorithmServer::getAllIncomingAttributes() { /* Polls DataSyncThreads for new updates Updates master storage accordingly */ - for (DataSyncThread dst : clientThreads) + vector* newAttribs = new vector; + if (lockClientThreadsMutex() == STATUS_WAIT_0) { - // TODO: Implement this! + for (DataSyncThread* dst : clientThreads) + { + vector* toAdd = dst->getIncomingAttributes(); + newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end()); + } + unlockClientThreadsMutex(); + } + else { + printf("Could not acquire mutex to get incoming attributes.\n"); } + return newAttribs; } void AlgorithmServer::startServer() @@ -161,3 +180,20 @@ int AlgorithmServer::stopServer() } return 0; } + +DWORD AlgorithmServer::lockClientThreadsMutex() +{ + if (clientThreadsMutex != NULL) + { + return WaitForSingleObject(clientThreadsMutex, INFINITE); + } + return -1; +} + +void AlgorithmServer::unlockClientThreadsMutex() +{ + if (clientThreadsMutex != NULL) + { + ReleaseMutex(clientThreadsMutex); + } +} diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index a2144ac..851fb94 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,7 +4,8 @@ void DataSyncThread::threadRuntime() { - attribMutex = CreateMutex(NULL, false, NULL); + + printf("Thread runtime: %d\n", incomingAttributes); /* Handles the data sync between the other end of the socket @@ -83,6 +84,9 @@ void DataSyncThread::addIncomingAttribute(Attribute attrib) incomingAttributes.push_back(attrib); ReleaseMutex(attribMutex); } + else { + printf("1 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } } void DataSyncThread::startComms() @@ -90,6 +94,7 @@ void DataSyncThread::startComms() /* Initializes the communication thread */ + printf("Start comms: %d\n", incomingAttributes); continueThread = true; hThread = CreateThread( NULL, // default security attributes @@ -177,6 +182,20 @@ std::vector *DataSyncThread::getIncomingAttributes() (*newVector).push_back(attrib); ReleaseMutex(attribMutex); } + else { + printf("2 Failed to acquire mutex, wait returned %x, error: %d\n", result, GetLastError()); + } return newVector; } + +bool DataSyncThread::areIncomingAttributesAvailable() +{ + return incomingAttributes.size() > 0; +} + +bool sendAttribute(Attribute attrib) +{ + // TODO: Implement this to send attribute in the format described in the recieve section. + return false; +}