From 94cc09c9877289452d501a681955786ce83d259e Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Sun, 1 Dec 2019 17:39:56 -0500 Subject: [PATCH] First draft adding client code, skeleton of server code. --- breadcrumbs/include/Algorithm.hpp | 46 +++++-- breadcrumbs/include/AlgorithmServer.hpp | 7 ++ breadcrumbs/include/Attribute.hpp | 10 ++ breadcrumbs/include/DataSyncThread.hpp | 49 ++++++++ breadcrumbs/include/IOProcessor.hpp | 36 ++---- breadcrumbs/scripts/startbfs.py | 24 ++++ breadcrumbs/src/Breadcrumbs.cpp | 9 +- breadcrumbs/src/algos/Algorithm.cpp | 55 +++++---- breadcrumbs/src/comms/AlgorithmServer.cpp | 0 breadcrumbs/src/comms/DataSyncThread.cpp | 143 ++++++++++++++++++++++ breadcrumbs/src/io/IOProcessor.cpp | 39 ------ 11 files changed, 305 insertions(+), 113 deletions(-) create mode 100644 breadcrumbs/include/AlgorithmServer.hpp create mode 100644 breadcrumbs/include/Attribute.hpp create mode 100644 breadcrumbs/include/DataSyncThread.hpp create mode 100644 breadcrumbs/scripts/startbfs.py create mode 100644 breadcrumbs/src/comms/AlgorithmServer.cpp create mode 100644 breadcrumbs/src/comms/DataSyncThread.cpp diff --git a/breadcrumbs/include/Algorithm.hpp b/breadcrumbs/include/Algorithm.hpp index 43b92ce..9cea37f 100644 --- a/breadcrumbs/include/Algorithm.hpp +++ b/breadcrumbs/include/Algorithm.hpp @@ -2,27 +2,47 @@ #ifndef ALGORITHM_HPP #define ALGORITHM_HPP -#include "IOProcessor.hpp" +#include +#include +#include +#include +#include +#include + +#include "DataSyncThread.hpp" + +// Need to link with Ws2_32.lib +#pragma comment (lib, "Ws2_32.lib") + +#define DEFAULT_PORT 10101 class Algorithm { public: - explicit Algorithm(IOProcessor* ins, SIZE_T num_inputs, IOProcessor* outs, SIZE_T num_outputs); + explicit Algorithm(size_t numProcs); - virtual VOID loop() = 0; - virtual BOOLEAN loopCondition() { return FALSE; }; + virtual void loop() = 0; + virtual bool loopCondition() { return false; }; - VOID startIOProcessors(); - VOID waitForIOProcessors(); - VOID stopIOProcessors(); + void serverThreadRuntime(); + static DWORD serverThreadInit(LPVOID pThreadArgs) + { + Algorithm* pAlgorithmThread = (Algorithm*)pThreadArgs; + pAlgorithmThread->serverThreadRuntime(); + return NULL; + } - // Updates Algorithm key/value store with IO key/value stores - VOID pollInputs(); - // Updates IO key/value stores with Algorithm key/value store - VOID pollOutputs(); + // Updates Algorithm key/value store with IO key/value updates + void pollForUpdates(); private: - SIZE_T numInputs, numOutputs; - IOProcessor* inputs, *outputs; + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; + + size_t numIO; + std::vector ioComms; + + void startServer(); }; #endif diff --git a/breadcrumbs/include/AlgorithmServer.hpp b/breadcrumbs/include/AlgorithmServer.hpp new file mode 100644 index 0000000..b8fe319 --- /dev/null +++ b/breadcrumbs/include/AlgorithmServer.hpp @@ -0,0 +1,7 @@ + +#ifndef ALGORITHM_SERVER_HPP +#define ALGORITHM_SERVER_HPP + +#define ALGORITHM_PORT "10101" + +#endif diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp new file mode 100644 index 0000000..717ae02 --- /dev/null +++ b/breadcrumbs/include/Attribute.hpp @@ -0,0 +1,10 @@ + +#ifndef ATTRIBUTE_HPP +#define ATTRIBUTE_HPP + +typedef struct BinaryAttributeStructure { + char key[8]; + unsigned char length[2]; +} bAttrib; + +#endif \ No newline at end of file diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp new file mode 100644 index 0000000..da859f9 --- /dev/null +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -0,0 +1,49 @@ + +#ifndef DATA_SYNC_THREAD_HPP +#define DATA_SYNC_THREAD_HPP + +#include +#include +#include +#include +#include + +#include "Attribute.hpp" +#include "AlgorithmServer.hpp" + +// Need to link with Ws2_32.lib, Mswsock.lib, and Advapi32.lib +#pragma comment (lib, "Ws2_32.lib") +#pragma comment (lib, "Mswsock.lib") +#pragma comment (lib, "AdvApi32.lib") + +/* +This class contains the thread for communicating back and forth along a socket to sync attributes + +*/ +class DataSyncThread +{ +private: + SOCKET sock; + HANDLE hThread; + DWORD dwThreadId; + bool continueThread = false; +public: + DataSyncThread(SOCKET s) { sock = s; }; + + // Synchronous functions + void threadRuntime(); + static DWORD threadInit(LPVOID pThreadArgs) + { + DataSyncThread* pDataSyncThread = (DataSyncThread*)pThreadArgs; + pDataSyncThread->threadRuntime(); + return NULL; + } + int recvBytes(void* buffer, size_t numBytes); + + // Async control + void startComms(); + bool stopComms(); + int connectToAlgorithm(char* serverName); +}; + +#endif \ No newline at end of file diff --git a/breadcrumbs/include/IOProcessor.hpp b/breadcrumbs/include/IOProcessor.hpp index f7a2cce..00fead2 100644 --- a/breadcrumbs/include/IOProcessor.hpp +++ b/breadcrumbs/include/IOProcessor.hpp @@ -1,41 +1,25 @@ -/* - * - * Stores the interface for building a sensor thread - * implementation. - * - */ - -#ifndef SENSOR_INTERFACE_HPP -#define SENSOR_INTERFACE_HPP +#ifndef IO_PROCESSOR_HPP +#define IO_PROCESSOR_HPP #include #include +/* +This class is the class that abstracts the io processor side of the data exchange between +IO processors and algorithms + +*/ class IOProcessor { public: - IOProcessor(LPCVOID pThreadArgs) { threadArgs = pThreadArgs; }; - - // Thread routine for implementation to override - virtual VOID threadRuntime(IOProcessor *ioProc) = 0; - // Thread initialization, should not be called directly! - static DWORD threadInit(LPVOID pThreadArgs); + IOProcessor() {}; // Async control - UINT8 startThread(); - BOOL waitForThread(); - - // Sync control - BOOLEAN bufferDataAvailable(); - SIZE_T getBufferData(LPCSTR* bufferKeyArray, LPCSTR* bufferValueArray); - VOID setDataStoreValue(LPCSTR key, LPCVOID value, SIZE_T valueSize); -protected: - LPCVOID threadArgs; - DWORD dwThreadId; - HANDLE hThread; + unsigned int startComms(); + bool stopComms(); }; diff --git a/breadcrumbs/scripts/startbfs.py b/breadcrumbs/scripts/startbfs.py new file mode 100644 index 0000000..618e877 --- /dev/null +++ b/breadcrumbs/scripts/startbfs.py @@ -0,0 +1,24 @@ + +import os +import multiprocessing as mp +import subprocess + + +def start_program(command): + cmd_list = command.split(" ") + return subprocess.call(cmd_list) + + +def start_program_no_hang(command): + print("Running %s from cwd %s" % (command, os.getcwd())) + proc = mp.Process(target=start_program, args=(command,)) + proc.start() + return proc + + +def main(): + start_program_no_hang("start cmd.exe /k \"..\\bin\\Breadcrumbs.exe\"") + + +if __name__ == "__main__": + main() diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/Breadcrumbs.cpp index 71f5809..97b94e9 100644 --- a/breadcrumbs/src/Breadcrumbs.cpp +++ b/breadcrumbs/src/Breadcrumbs.cpp @@ -7,14 +7,7 @@ int main() { - - // Initialization - int threadID = 0; - IOProcessor *processor = new VirtualOutputProcessor((LPCVOID) &threadID); - Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, processor, 1); - - // Starting - algorithm->startIOProcessors(); + Algorithm* algorithm = new AlgoBreadcrumbs(NULL, 0, 1); // Loop while (algorithm->loopCondition()) diff --git a/breadcrumbs/src/algos/Algorithm.cpp b/breadcrumbs/src/algos/Algorithm.cpp index 5487e55..1448ae0 100644 --- a/breadcrumbs/src/algos/Algorithm.cpp +++ b/breadcrumbs/src/algos/Algorithm.cpp @@ -1,41 +1,42 @@ #include "Algorithm.hpp" -Algorithm::Algorithm(IOProcessor* ins, SIZE_T numIns, IOProcessor* outs, SIZE_T numOuts) +Algorithm::Algorithm(size_t numProcs) { - numInputs = numIns; numOutputs = numOuts; - inputs = ins; outputs = outs; + numIO = numProcs; + ioComms.reserve(numIO); + + startServer(); } -VOID Algorithm::startIOProcessors() +void Algorithm::pollForUpdates() { - int i; - for (i = 0; i < numInputs; i++) - (inputs + i)->startThread(); - for (i = 0; i < numOutputs; i++) - (outputs + i)->startThread(); + /* + Polls DataSyncThreads for new updates + Updates master storage accordingly + */ + for (DataSyncThread dst : ioComms) + { + // TODO: Implement this! + } } -VOID Algorithm::waitForIOProcessors() +void Algorithm::startServer() { - int i; - for (i = 0; i < numInputs; i++) - (inputs + i)->waitForThread(); - for (i = 0; i < numOutputs; i++) - (outputs + i)->waitForThread(); + /* + Starts the server socket thread + */ + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + serverThreadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier } -VOID Algorithm::stopIOProcessors() +void Algorithm::serverThreadRuntime() { - waitForIOProcessors(); -} - -VOID Algorithm::pollInputs() -{ - -} - -VOID Algorithm::pollOutputs() -{ - + } diff --git a/breadcrumbs/src/comms/AlgorithmServer.cpp b/breadcrumbs/src/comms/AlgorithmServer.cpp new file mode 100644 index 0000000..e69de29 diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp new file mode 100644 index 0000000..d0eb563 --- /dev/null +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -0,0 +1,143 @@ + +#include "DataSyncThread.hpp" + + +void DataSyncThread::threadRuntime() +{ + /* + Handles the data sync between the other end of the socket + */ + int iResult; + + while (continueThread && iResult > 0) + { + // Master while loop that will receive incoming messages + char commandByte; + iResult = recvBytes(&commandByte, 1); + if (iResult <= 0) + break; + + switch (commandByte) + { + case 0: + bAttrib attrib; + iResult = recvBytes(&attrib, sizeof(bAttrib)); + if (iResult <= 0) + break; + printf("Attribute update received, key:%.8s len:%us\n", attrib.key, attrib.length); + void* value = malloc((unsigned short)attrib.length); + iResult = recvBytes(value, (unsigned short)attrib.length); + if (iResult <= 0) + break; + // TODO: Storing the attrib update + + break; + } + } + + continueThread = false; +} + +int DataSyncThread::recvBytes(void* buffer, size_t numBytes) +{ + size_t bytesRecved = 0; + int iResult; + + while (bytesRecved < numBytes) + { + iResult = recv(sock, ((char*) buffer) + bytesRecved, numBytes - bytesRecved, 0); + if (iResult < 0) + { + printf("recv failed with error: %d\n", WSAGetLastError()); + return iResult; + } + else if (iResult == 0) { + printf("recv returned 0, connection closed.\n"); + return iResult; + } + else + bytesRecved += iResult; + } + return numBytes; +} + +void DataSyncThread::startComms() +{ + /* + Initializes the communication thread + */ + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + threadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier +} + +bool DataSyncThread::stopComms() +{ + /* + Stops the communication thread + */ + continueThread = false; +} + +int DataSyncThread::connectToAlgorithm(char* serverName) +{ + sock = INVALID_SOCKET; + WSADATA wsaData; + struct addrinfo* result = NULL,* ptr = NULL, hints; + int iResult; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) { + printf("WSAStartup failed with error: %d\n", iResult); + return 1; + } + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + // Resolve the server address and port + iResult = getaddrinfo(serverName, ALGORITHM_PORT, &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + return 1; + } + + // Attempt to connect to an address until one succeeds + for (ptr = result; ptr != NULL; ptr = ptr->ai_next) { + + // Create a SOCKET for connecting to server + sock = socket(ptr->ai_family, ptr->ai_socktype, + ptr->ai_protocol); + if (sock == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + WSACleanup(); + return 1; + } + + // Connect to server. + iResult = connect(sock, ptr->ai_addr, (int)ptr->ai_addrlen); + if (iResult == SOCKET_ERROR) { + closesocket(sock); + sock = INVALID_SOCKET; + continue; + } + break; + } + + freeaddrinfo(result); + + if (sock == INVALID_SOCKET) { + printf("Unable to connect to server!\n"); + WSACleanup(); + return 1; + } +} diff --git a/breadcrumbs/src/io/IOProcessor.cpp b/breadcrumbs/src/io/IOProcessor.cpp index 24c70cd..437de23 100644 --- a/breadcrumbs/src/io/IOProcessor.cpp +++ b/breadcrumbs/src/io/IOProcessor.cpp @@ -2,42 +2,3 @@ #include "IOProcessor.hpp" -DWORD IOProcessor::threadInit(LPVOID pThreadArgs) -{ - IOProcessor* pIOProcessor = (IOProcessor*)pThreadArgs; - pIOProcessor->threadRuntime(pIOProcessor); - return 0; -} - -UINT8 IOProcessor::startThread() -{ - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - threadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier - return TRUE; -} - -BOOL IOProcessor::waitForThread() -{ - WaitForSingleObject(hThread, INFINITE); - return TRUE; -} - -BOOLEAN bufferDataAvailable() -{ - return FALSE; -} - -SIZE_T getBufferData(LPCSTR* bufferKeyArray, LPCSTR* bufferValueArray) -{ - return 0; -} - -VOID setDataStoreValue(LPCSTR key, LPCVOID value, SIZE_T valueSize) -{ - -}