diff --git a/breadcrumbs/CMakeConfig.h.in b/breadcrumbs/CMakeConfig.h.in index e47bb02..be22a44 100644 --- a/breadcrumbs/CMakeConfig.h.in +++ b/breadcrumbs/CMakeConfig.h.in @@ -1,4 +1,6 @@ // the configured options and settings for Tutorial -#define Tutorial_VERSION_MAJOR @Bfs_VERSION_MAJOR@ -#define Tutorial_VERSION_MINOR @Bfs_VERSION_MINOR@ \ No newline at end of file +#define VERSION_MAJOR @Bfs_VERSION_MAJOR@ +#define VERSION_MINOR @Bfs_VERSION_MINOR@ + +#define ALGORITHM_SERVER_PORT @Bfs_ALGORITHM_SERVER_PORT@ \ No newline at end of file diff --git a/breadcrumbs/CMakeLists.txt b/breadcrumbs/CMakeLists.txt index d214ccf..94f900a 100644 --- a/breadcrumbs/CMakeLists.txt +++ b/breadcrumbs/CMakeLists.txt @@ -26,6 +26,8 @@ message("Library directory: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}") message("Header file directory: ${INCLUDES_DIRECTORY}") # Configure a header file to pass some of the CMake settings to the source code +set (Bfs_ALGORITHM_SERVER_PORT \"27634\") + configure_file ( "${PROJECT_SOURCE_DIR}/CMakeConfig.h.in" "${CMAKE_INCLUDE_PATH}/CMakeConfig.h" diff --git a/breadcrumbs/include/AlgoBreadcrumbs.hpp b/breadcrumbs/include/AlgoBreadcrumbs.hpp index e9a0995..3502ff5 100644 --- a/breadcrumbs/include/AlgoBreadcrumbs.hpp +++ b/breadcrumbs/include/AlgoBreadcrumbs.hpp @@ -9,8 +9,8 @@ class AlgoBreadcrumbs : public Algorithm public: using Algorithm::Algorithm; - VOID loop(); - BOOLEAN loopCondition(); + void loop(); + bool loopCondition(); private: INT iterations = 1; }; diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 9cea37f..8edd655 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -2,47 +2,19 @@ #ifndef ALGORITHM_HPP #define ALGORITHM_HPP -#include -#include -#include -#include -#include -#include - -#include "DataSyncThread.hpp" - -// Need to link with Ws2_32.lib -#pragma comment (lib, "Ws2_32.lib") - -#define DEFAULT_PORT 10101 +#include "AlgorithmServer.hpp" class Algorithm { public: explicit Algorithm(size_t numProcs); + ~Algorithm(); virtual void loop() = 0; virtual bool loopCondition() { return false; }; - - void serverThreadRuntime(); - static DWORD serverThreadInit(LPVOID pThreadArgs) - { - Algorithm* pAlgorithmThread = (Algorithm*)pThreadArgs; - pAlgorithmThread->serverThreadRuntime(); - return NULL; - } - - // Updates Algorithm key/value store with IO key/value updates - void pollForUpdates(); private: - HANDLE hThread; - DWORD dwThreadId; - bool continueThread = false; - - size_t numIO; - std::vector ioComms; - - void startServer(); + size_t numIoProcs = 0; + AlgorithmServer* server; }; #endif diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp index b8fe319..85520d0 100644 --- a/breadcrumbs/include/AlgorithmServer.hpp +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -2,6 +2,52 @@ #ifndef ALGORITHM_SERVER_HPP #define ALGORITHM_SERVER_HPP + +#undef UNICODE + +#define WIN32_LEAN_AND_MEAN + +#include +#include +#include +#include +#include +#include + +#include "CMakeConfig.h" +#include "DataSyncThread.hpp" + #define ALGORITHM_PORT "10101" +#define MAX_ACCEPT_FAILURES 5 + +#pragma comment (lib, "Ws2_32.lib") + +class AlgorithmServer +{ +public: + AlgorithmServer(size_t numClients); + ~AlgorithmServer(); + + void serverThreadRuntime(); + static DWORD serverThreadInit(LPVOID pThreadArgs) + { + AlgorithmServer* pAlgorithmServerThread = (AlgorithmServer*)pThreadArgs; + pAlgorithmServerThread->serverThreadRuntime(); + return NULL; + } + + // Updates Algorithm key/value store with IO key/value updates + void pollForUpdates(); + void startServer(); + void stopServer(); + +private: + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; + + size_t numClients; + std::vector clientThreads; +}; #endif diff --git a/breadcrumbs/include/CMakeConfig.h b/breadcrumbs/include/CMakeConfig.h index 520c12d..b625ea5 100644 --- a/breadcrumbs/include/CMakeConfig.h +++ b/breadcrumbs/include/CMakeConfig.h @@ -1,4 +1,6 @@ // the configured options and settings for Tutorial -#define Tutorial_VERSION_MAJOR 1 -#define Tutorial_VERSION_MINOR 0 +#define VERSION_MAJOR 1 +#define VERSION_MINOR 0 + +#define ALGORITHM_SERVER_PORT "27634" diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index da859f9..ba17598 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -2,6 +2,8 @@ #ifndef DATA_SYNC_THREAD_HPP #define DATA_SYNC_THREAD_HPP +#define WIN32_LEAN_AND_MEAN + #include #include #include @@ -9,7 +11,7 @@ #include #include "Attribute.hpp" -#include "AlgorithmServer.hpp" +#include "CMakeConfig.h" // Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib #pragma comment (lib, "Ws2_32.lib") diff --git a/breadcrumbs/include/VirtualOutputProcessor.hpp b/breadcrumbs/include/VirtualOutputProcessor.hpp index b8a8ac7..067fef4 100644 --- a/breadcrumbs/include/VirtualOutputProcessor.hpp +++ b/breadcrumbs/include/VirtualOutputProcessor.hpp @@ -13,7 +13,7 @@ class VirtualOutputProcessor : public IOProcessor public: using IOProcessor::IOProcessor; - VOID threadRuntime(IOProcessor* ioProc); + void threadRuntime(IOProcessor* ioProc); }; #endif diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 97b94e9..cec95a9 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -7,19 +7,13 @@ int main() { - Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, 1); + Algorithm* algorithm = new AlgoBreadcrumbs(1); // Loop while (algorithm->loopCondition()) { - algorithm->pollInputs(); algorithm->loop(); - algorithm->pollOutputs(); } - // Cleanup - processor->waitForThread(); - delete processor; - return 0; } diff --git a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp index 2aba742..7278eb5 100644 --- a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp +++ b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp @@ -2,12 +2,12 @@ #include "AlgoBreadcrumbs.hpp" -VOID AlgoBreadcrumbs::loop() +void AlgoBreadcrumbs::loop() { printf("Breadcrumbs algorithm loop!\n"); } -BOOLEAN AlgoBreadcrumbs::loopCondition() +bool AlgoBreadcrumbs::loopCondition() { return --iterations >= 0; } diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 1448ae0..7ae56b8 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -2,41 +2,14 @@ #include "Algorithm.hpp" Algorithm::Algorithm(size_t numProcs) -{ - numIO = numProcs; - ioComms.reserve(numIO); - - startServer(); -} - -void Algorithm::pollForUpdates() { - /* - Polls DataSyncThreads for new updates - Updates master storage accordingly - */ - for (DataSyncThread dst : ioComms) - { - // TODO: Implement this! - } -} + numIoProcs = numProcs; + server = new AlgorithmServer(numProcs); -void Algorithm::startServer() -{ - /* - Starts the server socket thread - */ - continueThread = true; - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - serverThreadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier + server->startServer(); } -void Algorithm::serverThreadRuntime() +Algorithm::~Algorithm() { - + delete server; } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp index e69de29..61f43db 100644 --- a/breadcrumbs/src/comms/AlgorithmServer.cpp +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -0,0 +1,159 @@ + +#include "AlgorithmServer.hpp" + +AlgorithmServer::AlgorithmServer(size_t numClients) +{ + this->hThread = NULL; + this->dwThreadId = 0; + this->numClients = numClients; + + clientThreads.reserve(numClients); +} + +AlgorithmServer::~AlgorithmServer() +{ + this->stopServer(); +} + +void AlgorithmServer::serverThreadRuntime() +{ + int iResult; + + SOCKET ListenSocket = INVALID_SOCKET; + SOCKET ClientSocket = INVALID_SOCKET; + + struct addrinfo* result = NULL; + struct addrinfo hints; + + int iSendResult; + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_PASSIVE; + + // Resolve the server address and port + iResult = getaddrinfo(NULL, ALGORITHM_SERVER_PORT, &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + return; + } + + // Create a SOCKET for connecting to server + ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (ListenSocket == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + freeaddrinfo(result); + WSACleanup(); + return; + } + + // Setup the TCP listening socket + iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); + if (iResult == SOCKET_ERROR) { + printf("bind failed with error: %d\n", WSAGetLastError()); + freeaddrinfo(result); + closesocket(ListenSocket); + WSACleanup(); + return; + } + freeaddrinfo(result); + + iResult = listen(ListenSocket, SOMAXCONN); + if (iResult == SOCKET_ERROR) { + printf("listen failed with error: %d\n", WSAGetLastError()); + closesocket(ListenSocket); + WSACleanup(); + return; + } + + // Select stuff + FD_SET readSet; + + int AcceptFailures = 0; + // Accept a client socket + while (continueThread) + { + int fds; + + FD_ZERO(&readSet); + FD_SET(ListenSocket, &readSet); + + if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR) + { + printf("select() returned with error %d\n", WSAGetLastError()); + break; + } + + if (fds != 0) + { + ClientSocket = accept(ListenSocket, NULL, NULL); + if (ClientSocket == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + AcceptFailures++; + if (AcceptFailures >= MAX_ACCEPT_FAILURES) + break; + } + else { + AcceptFailures = 0; + } + + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread client = *new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client.startComms(); + } + else { + printf("Client attempted connection when (%d) clients are already connected", numClients); + } + } + } + + // cleanup + closesocket(ListenSocket); + WSACleanup(); + continueThread = false; + return; +} + +void AlgorithmServer::pollForUpdates() +{ + /* + Polls DataSyncThreads for new updates + Updates master storage accordingly + */ + for (DataSyncThread dst : clientThreads) + { + // TODO: Implement this! + } +} + +void AlgorithmServer::startServer() +{ + + // Initialize Winsock + WSADATA wsaData; + int result; + if (result = WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + printf("WSAStartup failed with error: %d\n", result); + return; + } + + // Starts the server thread: + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + serverThreadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier +} + +void AlgorithmServer::stopServer() +{ + continueThread = false; +} diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index d0eb563..a5efa72 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -7,7 +7,7 @@ void DataSyncThread::threadRuntime() /* Handles the data sync between the other end of the socket */ - int iResult; + int iResult = 1; while (continueThread && iResult > 0) { @@ -24,7 +24,7 @@ void DataSyncThread::threadRuntime() iResult = recvBytes(&attrib, sizeof(bAttrib)); if (iResult <= 0) break; - printf("Attribute update received, key:%.8s len:%us\n", attrib.key, attrib.length); + printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length); void* value = malloc((unsigned short)attrib.length); iResult = recvBytes(value, (unsigned short)attrib.length); if (iResult <= 0) @@ -82,6 +82,7 @@ bool DataSyncThread::stopComms() Stops the communication thread */ continueThread = false; + return true; } int DataSyncThread::connectToAlgorithm(char* serverName) @@ -104,7 +105,7 @@ int DataSyncThread::connectToAlgorithm(char* serverName) hints.ai_protocol = IPPROTO_TCP; // Resolve the server address and port - iResult = getaddrinfo(serverName, ALGORITHM_PORT, &hints, &result); + iResult = getaddrinfo(serverName, ALGORITHM_SERVER_PORT, &hints, &result); if (iResult != 0) { printf("getaddrinfo failed with error: %d\n", iResult); WSACleanup(); diff --git a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp index e6905c0..01bf3cc 100644 --- a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp @@ -2,7 +2,7 @@ #include "VirtualOutputProcessor.hpp" -VOID VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) +void VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) { - printf("VirtualOutputProcessor %d started.\n", *((int*) threadArgs)); + printf("VirtualOutputProcessor started.\n"); }