From b947e5a31e0f15e31a408488b125ad5bcd52ce8d Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Thu, 16 Jan 2020 10:59:56 -0500 Subject: [PATCH] Stabalizing server code and fixing attrib update decoding --- breadcrumbs/include/AlgorithmServer.hpp | 7 +-- breadcrumbs/include/Attribute.hpp | 2 +- breadcrumbs/include/DataSyncThread.hpp | 6 ++- breadcrumbs/src/Breadcrumbs.cpp | 2 + breadcrumbs/src/comms/AlgorithmServer.cpp | 66 ++++++++++++----------- breadcrumbs/src/comms/DataSyncThread.cpp | 18 +++++-- 6 files changed, 59 insertions(+), 42 deletions(-) diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index 85520d0..67c3731 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -17,7 +17,6 @@ #include "CMakeConfig.h" #include "DataSyncThread.hpp" -#define ALGORITHM_PORT "10101" #define MAX_ACCEPT_FAILURES 5 #pragma comment (lib, "Ws2_32.lib") @@ -39,12 +38,14 @@ class AlgorithmServer // Updates Algorithm key/value store with IO key/value updates void pollForUpdates(); void startServer(); - void stopServer(); + int stopServer(); private: + SOCKET *ServerSocket = NULL; + HANDLE hThread; DWORD dwThreadId; - bool continueThread = false; + bool continueThread = true; size_t numClients; std::vector clientThreads; diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index 717ae02..fc1ed4e 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -4,7 +4,7 @@ typedef struct BinaryAttributeStructure { char key[8]; - unsigned char length[2]; + char length; } bAttrib; #endif \ No newline at end of file diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index ba17598..05a7998 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -32,7 +32,7 @@ class DataSyncThread public: DataSyncThread(SOCKET s) { sock = s; }; - // Synchronous functions + // Synchronous functions (only called from thread) void threadRuntime(); static DWORD threadInit(LPVOID pThreadArgs) { @@ -42,9 +42,11 @@ class DataSyncThread } int recvBytes(void* buffer, size_t numBytes); - // Async control + // Async control (only called outside of thread) void startComms(); bool stopComms(); + bool isClientConnected() { return continueThread; }; + unsigned int getSocketNumber() { return (unsigned int) socket; }; int connectToAlgorithm(char* serverName); }; diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index cec95a9..7d20d0f 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -15,5 +15,7 @@ int main() algorithm->loop(); } + while (1); + return 0; } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index 61f43db..7023659 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -49,6 +49,7 @@ void AlgorithmServer::serverThreadRuntime() WSACleanup(); return; } + ServerSocket = &ListenSocket; // Setup the TCP listening socket iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); @@ -69,52 +70,51 @@ void AlgorithmServer::serverThreadRuntime() return; } - // Select stuff - FD_SET readSet; - int AcceptFailures = 0; // Accept a client socket while (continueThread) { - int fds; - - FD_ZERO(&readSet); - FD_SET(ListenSocket, &readSet); - - if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR) - { - printf("select() returned with error %d\n", WSAGetLastError()); - break; + printf("Listening for clients...\n"); + ClientSocket = accept(ListenSocket, NULL, NULL); + printf("Hello new client!\n"); + if (ClientSocket == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + AcceptFailures++; + if (AcceptFailures >= MAX_ACCEPT_FAILURES) + break; + } + else { + AcceptFailures = 0; } - if (fds != 0) + // Reaping any old data sync threads + auto it = clientThreads.begin(); + while (clientThreads.size() > 0 && it != clientThreads.end()) { - ClientSocket = accept(ListenSocket, NULL, NULL); - if (ClientSocket == INVALID_SOCKET) { - printf("accept failed with error: %d\n", WSAGetLastError()); - AcceptFailures++; - if (AcceptFailures >= MAX_ACCEPT_FAILURES) - break; + if (!(*it).isClientConnected()) + { + clientThreads.erase(it); } else { - AcceptFailures = 0; + ++it; } + } - // Creating a new Data sync thread - if (clientThreads.size() < numClients) { - DataSyncThread client = *new DataSyncThread(ClientSocket); - clientThreads.push_back(client); - client.startComms(); - } - else { - printf("Client attempted connection when (%d) clients are already connected", numClients); - } + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread client = *new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client.startComms(); + } + else { + printf("Client attempted connection when (%d) clients are already connected", numClients); } } // cleanup closesocket(ListenSocket); WSACleanup(); + ServerSocket = NULL; continueThread = false; return; } @@ -153,7 +153,11 @@ void AlgorithmServer::startServer() &dwThreadId); // returns the thread identifier } -void AlgorithmServer::stopServer() +int AlgorithmServer::stopServer() { - continueThread = false; + if (continueThread && ServerSocket != NULL) { + continueThread = false; + return shutdown(*ServerSocket, SD_BOTH); + } + return 0; } diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index a5efa72..b18e238 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -11,6 +11,9 @@ void DataSyncThread::threadRuntime() while (continueThread && iResult > 0) { + + printf("Waiting to receive bytes\n"); + // Master while loop that will receive incoming messages char commandByte; iResult = recvBytes(&commandByte, 1); @@ -20,18 +23,23 @@ void DataSyncThread::threadRuntime() switch (commandByte) { case 0: + printf("Attribute update...\n"); bAttrib attrib; iResult = recvBytes(&attrib, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length); - void* value = malloc((unsigned short)attrib.length); - iResult = recvBytes(value, (unsigned short)attrib.length); + + printf("Attribute update received, key:%.8s len:%x\n", attrib.key, attrib.length); + if (attrib.length <= 0) + break; + + void* value = malloc(attrib.length); + iResult = recvBytes(value, attrib.length); if (iResult <= 0) break; - // TODO: Storing the attrib update - break; + // TODO: Storing the attrib update + } }