diff --git a/breadcrumbs/CMakeConfig.h.in b/breadcrumbs/CMakeConfig.h.in index e47bb02..be22a44 100644 --- a/breadcrumbs/CMakeConfig.h.in +++ b/breadcrumbs/CMakeConfig.h.in @@ -1,4 +1,6 @@ // the configured options and settings for Tutorial -#define Tutorial_VERSION_MAJOR @Bfs_VERSION_MAJOR@ -#define Tutorial_VERSION_MINOR @Bfs_VERSION_MINOR@ \ No newline at end of file +#define VERSION_MAJOR @Bfs_VERSION_MAJOR@ +#define VERSION_MINOR @Bfs_VERSION_MINOR@ + +#define ALGORITHM_SERVER_PORT @Bfs_ALGORITHM_SERVER_PORT@ \ No newline at end of file diff --git a/breadcrumbs/CMakeLists.txt b/breadcrumbs/CMakeLists.txt index d214ccf..94f900a 100644 --- a/breadcrumbs/CMakeLists.txt +++ b/breadcrumbs/CMakeLists.txt @@ -26,6 +26,8 @@ message("Library directory: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}") message("Header file directory: ${INCLUDES_DIRECTORY}") # Configure a header file to pass some of the CMake settings to the source code +set (Bfs_ALGORITHM_SERVER_PORT \"27634\") + configure_file ( "${PROJECT_SOURCE_DIR}/CMakeConfig.h.in" "${CMAKE_INCLUDE_PATH}/CMakeConfig.h" diff --git a/breadcrumbs/include/AlgoBreadcrumbs.hpp b/breadcrumbs/include/AlgoBreadcrumbs.hpp index e9a0995..3502ff5 100644 --- a/breadcrumbs/include/AlgoBreadcrumbs.hpp +++ b/breadcrumbs/include/AlgoBreadcrumbs.hpp @@ -9,8 +9,8 @@ class AlgoBreadcrumbs : public Algorithm public: using Algorithm::Algorithm; - VOID loop(); - BOOLEAN loopCondition(); + void loop(); + bool loopCondition(); private: INT iterations = 1; }; diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 43b92ce..8edd655 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -2,27 +2,19 @@ #ifndef ALGORITHM_HPP #define ALGORITHM_HPP -#include "IOProcessor.hpp" +#include "AlgorithmServer.hpp" class Algorithm { public: - explicit Algorithm(IOProcessor* ins, SIZE_T num_inputs, IOProcessor* outs, SIZE_T num_outputs); + explicit Algorithm(size_t numProcs); + ~Algorithm(); - virtual VOID loop() = 0; - virtual BOOLEAN loopCondition() { return FALSE; }; - - VOID startIOProcessors(); - VOID waitForIOProcessors(); - VOID stopIOProcessors(); - - // Updates Algorithm key/value store with IO key/value stores - VOID pollInputs(); - // Updates IO key/value stores with Algorithm key/value store - VOID pollOutputs(); + virtual void loop() = 0; + virtual bool loopCondition() { return false; }; private: - SIZE_T numInputs, numOutputs; - IOProcessor* inputs, *outputs; + size_t numIoProcs = 0; + AlgorithmServer* server; }; #endif diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp new file mode 100644 index 0000000..85520d0 --- /dev/null +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -0,0 +1,53 @@ + +#ifndef ALGORITHM_SERVER_HPP +#define ALGORITHM_SERVER_HPP + + +#undef UNICODE + +#define WIN32_LEAN_AND_MEAN + +#include +#include +#include +#include +#include +#include + +#include "CMakeConfig.h" +#include "DataSyncThread.hpp" + +#define ALGORITHM_PORT "10101" +#define MAX_ACCEPT_FAILURES 5 + +#pragma comment (lib, "Ws2_32.lib") + +class AlgorithmServer +{ +public: + AlgorithmServer(size_t numClients); + ~AlgorithmServer(); + + void serverThreadRuntime(); + static DWORD serverThreadInit(LPVOID pThreadArgs) + { + AlgorithmServer* pAlgorithmServerThread = (AlgorithmServer*)pThreadArgs; + pAlgorithmServerThread->serverThreadRuntime(); + return NULL; + } + + // Updates Algorithm key/value store with IO key/value updates + void pollForUpdates(); + void startServer(); + void stopServer(); + +private: + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; + + size_t numClients; + std::vector clientThreads; +}; + +#endif diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp new file mode 100644 index 0000000..717ae02 --- /dev/null +++ b/breadcrumbs/include/Attribute.hpp @@ -0,0 +1,10 @@ + +#ifndef ATTRIBUTE_HPP +#define ATTRIBUTE_HPP + +typedef struct BinaryAttributeStructure { + char key[8]; + unsigned char length[2]; +} bAttrib; + +#endif \ No newline at end of file diff --git a/breadcrumbs/include/CMakeConfig.h b/breadcrumbs/include/CMakeConfig.h index 520c12d..b625ea5 100644 --- a/breadcrumbs/include/CMakeConfig.h +++ b/breadcrumbs/include/CMakeConfig.h @@ -1,4 +1,6 @@ // the configured options and settings for Tutorial -#define Tutorial_VERSION_MAJOR 1 -#define Tutorial_VERSION_MINOR 0 +#define VERSION_MAJOR 1 +#define VERSION_MINOR 0 + +#define ALGORITHM_SERVER_PORT "27634" diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp new file mode 100644 index 0000000..ba17598 --- /dev/null +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -0,0 +1,51 @@ + +#ifndef DATA_SYNC_THREAD_HPP +#define DATA_SYNC_THREAD_HPP + +#define WIN32_LEAN_AND_MEAN + +#include +#include +#include +#include +#include + +#include "Attribute.hpp" +#include "CMakeConfig.h" + +// Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib +#pragma comment (lib, "Ws2_32.lib") +#pragma comment (lib, "Mswsock.lib") +#pragma comment (lib, "AdvApi32.lib") + +/* +This class contains the thread for communicating back and forth along a socket to sync attributes + +*/ +class DataSyncThread +{ +private: + SOCKET sock; + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; +public: + DataSyncThread(SOCKET s) { sock = s; }; + + // Synchronous functions + void threadRuntime(); + static DWORD threadInit(LPVOID pThreadArgs) + { + DataSyncThread* pDataSyncThread = (DataSyncThread*)pThreadArgs; + pDataSyncThread->threadRuntime(); + return NULL; + } + int recvBytes(void* buffer, size_t numBytes); + + // Async control + void startComms(); + bool stopComms(); + int connectToAlgorithm(char* serverName); +}; + +#endif \ No newline at end of file diff --git a/breadcrumbs/include/IOProcessor.hpp b/breadcrumbs/include/IOProcessor.hpp index f7a2cce..00fead2 100644 --- a/breadcrumbs/include/IOProcessor.hpp +++ b/breadcrumbs/include/IOProcessor.hpp @@ -1,41 +1,25 @@ -/* - * - * Stores the interface for building a sensor thread - * implementation. - * - */ - -#ifndef SENSOR_INTERFACE_HPP -#define SENSOR_INTERFACE_HPP +#ifndef IO_PROCESSOR_HPP +#define IO_PROCESSOR_HPP #include #include +/* +This class is the class that abstracts the io processor side of the data exchange between +IO processors and algorithms + +*/ class IOProcessor { public: - IOProcessor(LPCVOID pThreadArgs) { threadArgs = pThreadArgs; }; - - // Thread routine for implementation to override - virtual VOID threadRuntime(IOProcessor *ioProc) = 0; - // Thread initialization, should not be called directly! - static DWORD threadInit(LPVOID pThreadArgs); + IOProcessor() {}; // Async control - UINT8 startThread(); - BOOL waitForThread(); - - // Sync control - BOOLEAN bufferDataAvailable(); - SIZE_T getBufferData(LPCSTR* bufferKeyArray, LPCSTR* bufferValueArray); - VOID setDataStoreValue(LPCSTR key, LPCVOID value, SIZE_T valueSize); -protected: - LPCVOID threadArgs; - DWORD dwThreadId; - HANDLE hThread; + unsigned int startComms(); + bool stopComms(); }; diff --git a/breadcrumbs/include/VirtualOutputProcessor.hpp b/breadcrumbs/include/VirtualOutputProcessor.hpp index b8a8ac7..067fef4 100644 --- a/breadcrumbs/include/VirtualOutputProcessor.hpp +++ b/breadcrumbs/include/VirtualOutputProcessor.hpp @@ -13,7 +13,7 @@ class VirtualOutputProcessor : public IOProcessor public: using IOProcessor::IOProcessor; - VOID threadRuntime(IOProcessor* ioProc); + void threadRuntime(IOProcessor* ioProc); }; #endif diff --git a/breadcrumbs/scripts/startbfs.py b/breadcrumbs/scripts/startbfs.py new file mode 100644 index 0000000..618e877 --- /dev/null +++ b/breadcrumbs/scripts/startbfs.py @@ -0,0 +1,24 @@ + +import os +import multiprocessing as mp +import subprocess + + +def start_program(command): + cmd_list = command.split(" ") + return subprocess.call(cmd_list) + + +def start_program_no_hang(command): + print("Running %s from cwd %s" % (command, os.getcwd())) + proc = mp.Process(target=start_program, args=(command,)) + proc.start() + return proc + + +def main(): + start_program_no_hang("start cmd.exe /k \"..\\bin\\Breadcrumbs.exe\"") + + +if __name__ == "__main__": + main() diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 71f5809..cec95a9 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -7,26 +7,13 @@ int main() { - - // Initialization - int threadID = 0; - IOProcessor *processor = new VirtualOutputProcessor((LPCVOID) &threadID); - Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, processor, 1); - - // Starting - algorithm->startIOProcessors(); + Algorithm* algorithm = new AlgoBreadcrumbs(1); // Loop while (algorithm->loopCondition()) { - algorithm->pollInputs(); algorithm->loop(); - algorithm->pollOutputs(); } - // Cleanup - processor->waitForThread(); - delete processor; - return 0; } diff --git a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp index 2aba742..7278eb5 100644 --- a/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp +++ b/breadcrumbs/src/algos/AlgoBreadcrumbs.cpp @@ -2,12 +2,12 @@ #include "AlgoBreadcrumbs.hpp" -VOID AlgoBreadcrumbs::loop() +void AlgoBreadcrumbs::loop() { printf("Breadcrumbs algorithm loop!\n"); } -BOOLEAN AlgoBreadcrumbs::loopCondition() +bool AlgoBreadcrumbs::loopCondition() { return --iterations >= 0; } diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 5487e55..7ae56b8 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -1,41 +1,15 @@ #include "Algorithm.hpp" -Algorithm::Algorithm(IOProcessor* ins, SIZE_T numIns, IOProcessor* outs, SIZE_T numOuts) -{ - numInputs = numIns; numOutputs = numOuts; - inputs = ins; outputs = outs; -} - -VOID Algorithm::startIOProcessors() -{ - int i; - for (i = 0; i < numInputs; i++) - (inputs + i)->startThread(); - for (i = 0; i < numOutputs; i++) - (outputs + i)->startThread(); -} - -VOID Algorithm::waitForIOProcessors() -{ - int i; - for (i = 0; i < numInputs; i++) - (inputs + i)->waitForThread(); - for (i = 0; i < numOutputs; i++) - (outputs + i)->waitForThread(); -} - -VOID Algorithm::stopIOProcessors() -{ - waitForIOProcessors(); -} - -VOID Algorithm::pollInputs() +Algorithm::Algorithm(size_t numProcs) { + numIoProcs = numProcs; + server = new AlgorithmServer(numProcs); + server->startServer(); } -VOID Algorithm::pollOutputs() +Algorithm::~Algorithm() { - + delete server; } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp new file mode 100644 index 0000000..61f43db --- /dev/null +++ b/breadcrumbs/src/comms/AlgorithmServer.cpp @@ -0,0 +1,159 @@ + +#include "AlgorithmServer.hpp" + +AlgorithmServer::AlgorithmServer(size_t numClients) +{ + this->hThread = NULL; + this->dwThreadId = 0; + this->numClients = numClients; + + clientThreads.reserve(numClients); +} + +AlgorithmServer::~AlgorithmServer() +{ + this->stopServer(); +} + +void AlgorithmServer::serverThreadRuntime() +{ + int iResult; + + SOCKET ListenSocket = INVALID_SOCKET; + SOCKET ClientSocket = INVALID_SOCKET; + + struct addrinfo* result = NULL; + struct addrinfo hints; + + int iSendResult; + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_PASSIVE; + + // Resolve the server address and port + iResult = getaddrinfo(NULL, ALGORITHM_SERVER_PORT, &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + return; + } + + // Create a SOCKET for connecting to server + ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (ListenSocket == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + freeaddrinfo(result); + WSACleanup(); + return; + } + + // Setup the TCP listening socket + iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); + if (iResult == SOCKET_ERROR) { + printf("bind failed with error: %d\n", WSAGetLastError()); + freeaddrinfo(result); + closesocket(ListenSocket); + WSACleanup(); + return; + } + freeaddrinfo(result); + + iResult = listen(ListenSocket, SOMAXCONN); + if (iResult == SOCKET_ERROR) { + printf("listen failed with error: %d\n", WSAGetLastError()); + closesocket(ListenSocket); + WSACleanup(); + return; + } + + // Select stuff + FD_SET readSet; + + int AcceptFailures = 0; + // Accept a client socket + while (continueThread) + { + int fds; + + FD_ZERO(&readSet); + FD_SET(ListenSocket, &readSet); + + if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR) + { + printf("select() returned with error %d\n", WSAGetLastError()); + break; + } + + if (fds != 0) + { + ClientSocket = accept(ListenSocket, NULL, NULL); + if (ClientSocket == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + AcceptFailures++; + if (AcceptFailures >= MAX_ACCEPT_FAILURES) + break; + } + else { + AcceptFailures = 0; + } + + // Creating a new Data sync thread + if (clientThreads.size() < numClients) { + DataSyncThread client = *new DataSyncThread(ClientSocket); + clientThreads.push_back(client); + client.startComms(); + } + else { + printf("Client attempted connection when (%d) clients are already connected", numClients); + } + } + } + + // cleanup + closesocket(ListenSocket); + WSACleanup(); + continueThread = false; + return; +} + +void AlgorithmServer::pollForUpdates() +{ + /* + Polls DataSyncThreads for new updates + Updates master storage accordingly + */ + for (DataSyncThread dst : clientThreads) + { + // TODO: Implement this! + } +} + +void AlgorithmServer::startServer() +{ + + // Initialize Winsock + WSADATA wsaData; + int result; + if (result = WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + printf("WSAStartup failed with error: %d\n", result); + return; + } + + // Starts the server thread: + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + serverThreadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier +} + +void AlgorithmServer::stopServer() +{ + continueThread = false; +} diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp new file mode 100644 index 0000000..a5efa72 --- /dev/null +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -0,0 +1,144 @@ + +#include "DataSyncThread.hpp" + + +void DataSyncThread::threadRuntime() +{ + /* + Handles the data sync between the other end of the socket + */ + int iResult = 1; + + while (continueThread && iResult > 0) + { + // Master while loop that will receive incoming messages + char commandByte; + iResult = recvBytes(&commandByte, 1); + if (iResult <= 0) + break; + + switch (commandByte) + { + case 0: + bAttrib attrib; + iResult = recvBytes(&attrib, sizeof(bAttrib)); + if (iResult <= 0) + break; + printf("Attribute update received, key:%.8s len:%us\n", attrib.key, (unsigned short) attrib.length); + void* value = malloc((unsigned short)attrib.length); + iResult = recvBytes(value, (unsigned short)attrib.length); + if (iResult <= 0) + break; + // TODO: Storing the attrib update + + break; + } + } + + continueThread = false; +} + +int DataSyncThread::recvBytes(void* buffer, size_t numBytes) +{ + size_t bytesRecved = 0; + int iResult; + + while (bytesRecved < numBytes) + { + iResult = recv(sock, ((char*) buffer) + bytesRecved, numBytes - bytesRecved, 0); + if (iResult < 0) + { + printf("recv failed with error: %d\n", WSAGetLastError()); + return iResult; + } + else if (iResult == 0) { + printf("recv returned 0, connection closed.\n"); + return iResult; + } + else + bytesRecved += iResult; + } + return numBytes; +} + +void DataSyncThread::startComms() +{ + /* + Initializes the communication thread + */ + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + threadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier +} + +bool DataSyncThread::stopComms() +{ + /* + Stops the communication thread + */ + continueThread = false; + return true; +} + +int DataSyncThread::connectToAlgorithm(char* serverName) +{ + sock = INVALID_SOCKET; + WSADATA wsaData; + struct addrinfo* result = NULL,* ptr = NULL, hints; + int iResult; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) { + printf("WSAStartup failed with error: %d\n", iResult); + return 1; + } + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + // Resolve the server address and port + iResult = getaddrinfo(serverName, ALGORITHM_SERVER_PORT, &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + return 1; + } + + // Attempt to connect to an address until one succeeds + for (ptr = result; ptr != NULL; ptr = ptr->ai_next) { + + // Create a SOCKET for connecting to server + sock = socket(ptr->ai_family, ptr->ai_socktype, + ptr->ai_protocol); + if (sock == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + WSACleanup(); + return 1; + } + + // Connect to server. + iResult = connect(sock, ptr->ai_addr, (int)ptr->ai_addrlen); + if (iResult == SOCKET_ERROR) { + closesocket(sock); + sock = INVALID_SOCKET; + continue; + } + break; + } + + freeaddrinfo(result); + + if (sock == INVALID_SOCKET) { + printf("Unable to connect to server!\n"); + WSACleanup(); + return 1; + } +} diff --git a/breadcrumbs/src/io/IOProcessor.cpp b/breadcrumbs/src/io/IOProcessor.cpp index 24c70cd..437de23 100644 --- a/breadcrumbs/src/io/IOProcessor.cpp +++ b/breadcrumbs/src/io/IOProcessor.cpp @@ -2,42 +2,3 @@ #include "IOProcessor.hpp" -DWORD IOProcessor::threadInit(LPVOID pThreadArgs) -{ - IOProcessor* pIOProcessor = (IOProcessor*)pThreadArgs; - pIOProcessor->threadRuntime(pIOProcessor); - return 0; -} - -UINT8 IOProcessor::startThread() -{ - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - threadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier - return TRUE; -} - -BOOL IOProcessor::waitForThread() -{ - WaitForSingleObject(hThread, INFINITE); - return TRUE; -} - -BOOLEAN bufferDataAvailable() -{ - return FALSE; -} - -SIZE_T getBufferData(LPCSTR* bufferKeyArray, LPCSTR* bufferValueArray) -{ - return 0; -} - -VOID setDataStoreValue(LPCSTR key, LPCVOID value, SIZE_T valueSize) -{ - -} diff --git a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp index e6905c0..01bf3cc 100644 --- a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp @@ -2,7 +2,7 @@ #include "VirtualOutputProcessor.hpp" -VOID VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) +void VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) { - printf("VirtualOutputProcessor %d started.\n", *((int*) threadArgs)); + printf("VirtualOutputProcessor started.\n"); }