From 40d3d86ba268593b82c178aaad53a97ada91d44e Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Tue, 28 Jan 2020 13:18:51 -0500 Subject: [PATCH 1/7] Reconfiguring build path and main folder to handle multiple compiled executables --- breadcrumbs/CMakeLists.txt | 12 ++++++++---- breadcrumbs/scripts/startbfs.py | 3 ++- breadcrumbs/src/io/IOProcessor.cpp | 4 ---- breadcrumbs/src/io/in_procs/.blank | 0 breadcrumbs/src/io/out_procs/.blank | 0 .../src/io/out_procs/VirtualOutputProcessor.cpp | 8 -------- breadcrumbs/src/{ => main}/Breadcrumbs.cpp | 0 breadcrumbs/src/main/VirtualOutputProcessor.cpp | 5 +++++ 8 files changed, 15 insertions(+), 17 deletions(-) delete mode 100644 breadcrumbs/src/io/IOProcessor.cpp delete mode 100644 breadcrumbs/src/io/in_procs/.blank delete mode 100644 breadcrumbs/src/io/out_procs/.blank delete mode 100644 breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp rename breadcrumbs/src/{ => main}/Breadcrumbs.cpp (100%) create mode 100644 breadcrumbs/src/main/VirtualOutputProcessor.cpp diff --git a/breadcrumbs/CMakeLists.txt b/breadcrumbs/CMakeLists.txt index 94f900a..d21bf8c 100644 --- a/breadcrumbs/CMakeLists.txt +++ b/breadcrumbs/CMakeLists.txt @@ -23,7 +23,7 @@ message("Source directory: ${CMAKE_SOURCE_DIR}") message("Build directory: ${CMAKE_BINARY_DIR}") message("Executable directory: ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}") message("Library directory: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}") -message("Header file directory: ${INCLUDES_DIRECTORY}") +message("Header file directory: ${INCLUSDES_DIRECTORY}") # Configure a header file to pass some of the CMake settings to the source code set (Bfs_ALGORITHM_SERVER_PORT \"27634\") @@ -40,8 +40,12 @@ include_directories("${CMAKE_SOURCE_DIR}") # puts all .cpp files inside src to the SOURCES variable # TODO: replace this with a script for collecting cpp files -file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/*.cpp") -message("Source files: ${SOURCES}") +file(GLOB_RECURSE ALGOS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/algos/*.cpp") +file(GLOB_RECURSE COMMS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/comms/*.cpp") +file(GLOB_RECURSE CONFIG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/config/*.cpp") +file(GLOB_RECURSE LOG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/logging/*.cpp") +message("Source files: ${ALGOS}") # Adding executables -add_executable(Breadcrumbs ${SOURCES}) +add_executable(Breadcrumbs "${CMAKE_SOURCE_DIR}/main/Breadcrumbs.cpp" ${ALGOS} ${COMMS} ${CONFIG} ${LOG}) +add_executable(VirtualOutputProcessor "${CMAKE_SOURCE_DIR}/main/VirtualOutputProcessor.cpp" ${ALGOS} ${COMMS} ${CONFIG} ${LOG}) diff --git a/breadcrumbs/scripts/startbfs.py b/breadcrumbs/scripts/startbfs.py index 618e877..e9eab98 100644 --- a/breadcrumbs/scripts/startbfs.py +++ b/breadcrumbs/scripts/startbfs.py @@ -17,7 +17,8 @@ def start_program_no_hang(command): def main(): - start_program_no_hang("start cmd.exe /k \"..\\bin\\Breadcrumbs.exe\"") + os.system("cmd /c ..\\bin\\Breadcrumbs.exe") + print("DONE") if __name__ == "__main__": diff --git a/breadcrumbs/src/io/IOProcessor.cpp b/breadcrumbs/src/io/IOProcessor.cpp deleted file mode 100644 index 437de23..0000000 --- a/breadcrumbs/src/io/IOProcessor.cpp +++ /dev/null @@ -1,4 +0,0 @@ - -#include "IOProcessor.hpp" - - diff --git a/breadcrumbs/src/io/in_procs/.blank b/breadcrumbs/src/io/in_procs/.blank deleted file mode 100644 index e69de29..0000000 diff --git a/breadcrumbs/src/io/out_procs/.blank b/breadcrumbs/src/io/out_procs/.blank deleted file mode 100644 index e69de29..0000000 diff --git a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp deleted file mode 100644 index 01bf3cc..0000000 --- a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp +++ /dev/null @@ -1,8 +0,0 @@ - -#include "VirtualOutputProcessor.hpp" - - -void VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) -{ - printf("VirtualOutputProcessor started.\n"); -} diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/main/Breadcrumbs.cpp similarity index 100% rename from breadcrumbs/src/Breadcrumbs.cpp rename to breadcrumbs/src/main/Breadcrumbs.cpp diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp new file mode 100644 index 0000000..e5da27c --- /dev/null +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -0,0 +1,5 @@ + +int main() +{ + return 0; +} From 7853c716b7494205b1662bb73271f2bba461216f Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Tue, 28 Jan 2020 13:40:34 -0500 Subject: [PATCH 2/7] Automating the addition of new exe files --- breadcrumbs/CMakeLists.txt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/breadcrumbs/CMakeLists.txt b/breadcrumbs/CMakeLists.txt index d21bf8c..a0eb54b 100644 --- a/breadcrumbs/CMakeLists.txt +++ b/breadcrumbs/CMakeLists.txt @@ -44,8 +44,14 @@ file(GLOB_RECURSE ALGOS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/algos/*.cpp") file(GLOB_RECURSE COMMS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/comms/*.cpp") file(GLOB_RECURSE CONFIG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/config/*.cpp") file(GLOB_RECURSE LOG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/logging/*.cpp") -message("Source files: ${ALGOS}") + +file(GLOB EXECS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/main/*.cpp") # Adding executables -add_executable(Breadcrumbs "${CMAKE_SOURCE_DIR}/main/Breadcrumbs.cpp" ${ALGOS} ${COMMS} ${CONFIG} ${LOG}) -add_executable(VirtualOutputProcessor "${CMAKE_SOURCE_DIR}/main/VirtualOutputProcessor.cpp" ${ALGOS} ${COMMS} ${CONFIG} ${LOG}) +# This is fine for now, but we may want to switch to a more manual versio so we can +# configure which files are included in which exe's +foreach(X IN LISTS EXECS) + get_filename_component(N ${X} NAME_WE) + message(STATUS "Generating Executable: ${N}.exe Main File: ${X}"}) + add_executable(${N} ${X} ${ALGOS} ${COMMS} ${CONFIG} ${LOG}) +endforeach() From 3595c6f8c1a4f7e372421f5511bb94366ebf0898 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Tue, 28 Jan 2020 13:53:53 -0500 Subject: [PATCH 3/7] Adding async start program function to startbfs.py --- breadcrumbs/scripts/startbfs.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/breadcrumbs/scripts/startbfs.py b/breadcrumbs/scripts/startbfs.py index e9eab98..1909112 100644 --- a/breadcrumbs/scripts/startbfs.py +++ b/breadcrumbs/scripts/startbfs.py @@ -4,20 +4,24 @@ import subprocess -def start_program(command): - cmd_list = command.split(" ") - return subprocess.call(cmd_list) +def start_program(program): + if not isinstance(program, str): + raise ValueError("Argument passed to start_program_sync() should be a string") + return os.system("cmd /c " + program) -def start_program_no_hang(command): - print("Running %s from cwd %s" % (command, os.getcwd())) - proc = mp.Process(target=start_program, args=(command,)) - proc.start() - return proc +def start_program_async(program): + # TODO: This needs to account for the number of cores... + # Add another function called start_programs_async that takes a list + # of programs and starts them on the proper cores. + p = mp.Process(target=start_program, args=(program,)) + p.start() + return p def main(): - os.system("cmd /c ..\\bin\\Breadcrumbs.exe") + p = start_program_async("..\\bin\\Breadcrumbs.exe") + p.join() print("DONE") From 7808d973898e3a4dc5d6d315fd94b949e4547113 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Wed, 29 Jan 2020 17:02:33 -0500 Subject: [PATCH 4/7] Adding test io processor and onside comms stop --- breadcrumbs/include/Attribute.hpp | 9 +- breadcrumbs/include/DataSyncThread.hpp | 8 +- breadcrumbs/src/comms/DataSyncThread.cpp | 85 ++++++++++++++++--- .../src/main/VirtualOutputProcessor.cpp | 13 +++ 4 files changed, 100 insertions(+), 15 deletions(-) diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index 399af93..075c07d 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -29,8 +29,15 @@ class Attribute this->value = value; } - ~Attribute() { + Attribute(string key, size_t length, void* value) + { + this->key = key; + this->length = length; + this->value = value; + } + ~Attribute() { + } // Getters and setters diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index c8ed1b9..0f1f8a2 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -30,6 +30,7 @@ class DataSyncThread HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; bool continueThread = false; + bool threadRunning = false; std::vector incomingAttributes; HANDLE attribMutex; @@ -40,6 +41,7 @@ class DataSyncThread attribMutex = CreateMutex(NULL, false, NULL); }; + // Synchronous functions (only called from thread) void threadRuntime(); static DWORD threadInit(LPVOID pThreadArgs) @@ -49,14 +51,18 @@ class DataSyncThread return NULL; } int recvBytes(void* buffer, size_t numBytes); + int sendBytes(char* buffer, size_t numBytes); void addIncomingAttribute(Attribute attrib); + // Instantiates a datasyncthread on the heap connected to given server. + int connectToAlgorithm(char* serverName); + // Async control (only called outside of thread) void startComms(); + bool isRunning(); bool stopComms(); bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; - int connectToAlgorithm(char* serverName); bool areIncomingAttributesAvailable(); std::vector *getIncomingAttributes(); bool sendAttribute(Attribute attrib); diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index 851fb94..5b73147 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,7 +4,7 @@ void DataSyncThread::threadRuntime() { - + threadRunning = true; printf("Thread runtime: %d\n", incomingAttributes); /* @@ -52,6 +52,10 @@ void DataSyncThread::threadRuntime() CloseHandle(attribMutex); attribMutex = NULL; } + + WSACleanup(); + printf("Done running thread!"); + threadRunning = false; } int DataSyncThread::recvBytes(void* buffer, size_t numBytes) @@ -77,6 +81,29 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes) return numBytes; } +int DataSyncThread::sendBytes(char* buffer, size_t numBytes) +{ + size_t bytesSent = 0; + int iResult; + + while (bytesSent < numBytes) + { + iResult = send(sock, buffer + bytesSent, numBytes - bytesSent, 0); + if (iResult < 0) + { + printf("send failed with error: %d\n", WSAGetLastError()); + return iResult; + } + else if (iResult == 0) { + printf("send returned 0, connection closed.\n"); + return iResult; + } + else + bytesSent += iResult; + } + return numBytes; +} + void DataSyncThread::addIncomingAttribute(Attribute attrib) { DWORD result = WaitForSingleObject(attribMutex, INFINITE); @@ -95,14 +122,27 @@ void DataSyncThread::startComms() Initializes the communication thread */ printf("Start comms: %d\n", incomingAttributes); - continueThread = true; - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - threadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier + if (!threadRunning) + { + threadRunning = false; + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + threadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier + + while (1) + if (threadRunning) + break; + } +} + +bool DataSyncThread::isRunning() +{ + return threadRunning; } bool DataSyncThread::stopComms() @@ -111,6 +151,7 @@ bool DataSyncThread::stopComms() Stops the communication thread */ continueThread = false; + while (isRunning()); return true; } @@ -162,7 +203,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName) } break; } - freeaddrinfo(result); if (sock == INVALID_SOCKET) { @@ -170,6 +210,10 @@ int DataSyncThread::connectToAlgorithm(char* serverName) WSACleanup(); return 1; } + else + { + return 0; + } } std::vector *DataSyncThread::getIncomingAttributes() @@ -194,8 +238,23 @@ bool DataSyncThread::areIncomingAttributesAvailable() return incomingAttributes.size() > 0; } -bool sendAttribute(Attribute attrib) +bool DataSyncThread::sendAttribute(Attribute attrib) { - // TODO: Implement this to send attribute in the format described in the recieve section. - return false; + // two extra bytes, one for command 0x00 and one for length: + int streamLength = attrib.getLength() + ATTRIB_KEY_SIZE + 2; + char *bytes = new char[streamLength]; + int iter = 0; + bytes[iter++] = 0x00; + + string key = attrib.getKey(); + for (int i = 0; i < ATTRIB_KEY_SIZE; i++) + bytes[iter++] = key[i]; + + bytes[iter++] = attrib.getLength(); + + for (int i = 0; i < attrib.getLength(); i++) + bytes[iter++] = ((char*) attrib.getValue())[i]; + + int result = sendBytes(bytes, streamLength); + return result == streamLength; } diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp index e5da27c..3af1a19 100644 --- a/breadcrumbs/src/main/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -1,5 +1,18 @@ +#include "DataSyncThread.hpp" + + int main() { + DataSyncThread client(NULL); + client.connectToAlgorithm("localhost"); + client.startComms(); + + char testValue = 'a'; + Attribute attrib = *new Attribute("testKey1", 1, &testValue); + client.sendAttribute(attrib); + + client.stopComms(); + return 0; } From a53e2bc794a9f65d48fcabe27ff93e09da2b9b95 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Wed, 29 Jan 2020 17:22:01 -0500 Subject: [PATCH 5/7] Adding select to data sync listen thread --- breadcrumbs/include/DataSyncThread.hpp | 1 + breadcrumbs/src/comms/DataSyncThread.cpp | 16 +++++++++++++++- breadcrumbs/src/main/VirtualOutputProcessor.cpp | 10 +++++++--- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index 0f1f8a2..52ea144 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -44,6 +44,7 @@ class DataSyncThread // Synchronous functions (only called from thread) void threadRuntime(); + bool readyToReceive(int interval = 1); static DWORD threadInit(LPVOID pThreadArgs) { DataSyncThread* pDataSyncThread = (DataSyncThread*)pThreadArgs; diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index 5b73147..1c89fbc 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -16,8 +16,9 @@ void DataSyncThread::threadRuntime() { printf("Waiting to receive bytes\n"); + if (!readyToReceive()) + continue; - // Master while loop that will receive incoming messages char commandByte; iResult = recvBytes(&commandByte, 1); if (iResult <= 0) @@ -58,6 +59,19 @@ void DataSyncThread::threadRuntime() threadRunning = false; } +bool DataSyncThread::readyToReceive(int interval) +{ + fd_set fds; + FD_ZERO(&fds); + FD_SET(sock, &fds); + + timeval tv; + tv.tv_sec = interval; + tv.tv_usec = 0; + + return (select(sock + 1, &fds, 0, 0, &tv) == 1); +} + int DataSyncThread::recvBytes(void* buffer, size_t numBytes) { size_t bytesRecved = 0; diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp index 3af1a19..7c5befc 100644 --- a/breadcrumbs/src/main/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -1,5 +1,6 @@ #include "DataSyncThread.hpp" +#include int main() @@ -8,9 +9,12 @@ int main() client.connectToAlgorithm("localhost"); client.startComms(); - char testValue = 'a'; - Attribute attrib = *new Attribute("testKey1", 1, &testValue); - client.sendAttribute(attrib); + for (int i = 0; i < 10; i++) + { + char testValue = 'a' + i; + Attribute attrib("testKey1", 1, &testValue); + client.sendAttribute(attrib); + } client.stopComms(); From dc5f87d7ad5cb51f360ff58a8ef78a16517f82bd Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Wed, 29 Jan 2020 17:35:44 -0500 Subject: [PATCH 6/7] Fixing breadcrumbs server after data sync thread changes --- breadcrumbs/src/comms/DataSyncThread.cpp | 8 +++++--- breadcrumbs/src/main/VirtualOutputProcessor.cpp | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index 1c89fbc..5974754 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -54,8 +54,7 @@ void DataSyncThread::threadRuntime() attribMutex = NULL; } - WSACleanup(); - printf("Done running thread!"); + printf("Done running data sync thread!\n"); threadRunning = false; } @@ -69,7 +68,10 @@ bool DataSyncThread::readyToReceive(int interval) tv.tv_sec = interval; tv.tv_usec = 0; - return (select(sock + 1, &fds, 0, 0, &tv) == 1); + bool result = select(sock + 1, &fds, 0, 0, &tv) == 1; + if (FD_ISSET(sock, &fds)) + return true; + return result; } int DataSyncThread::recvBytes(void* buffer, size_t numBytes) diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp index 7c5befc..50a181d 100644 --- a/breadcrumbs/src/main/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -17,6 +17,7 @@ int main() } client.stopComms(); + WSACleanup(); return 0; } From 9ddca58131b86fa8d69fb4bd607e0f486cb2a4c9 Mon Sep 17 00:00:00 2001 From: Greg Foss Date: Tue, 4 Feb 2020 12:53:01 -0500 Subject: [PATCH 7/7] Adding error checking for nonexistent server in virtual output processor --- .../src/main/VirtualOutputProcessor.cpp | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp index 50a181d..9c87767 100644 --- a/breadcrumbs/src/main/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -6,18 +6,21 @@ int main() { DataSyncThread client(NULL); - client.connectToAlgorithm("localhost"); - client.startComms(); - - for (int i = 0; i < 10; i++) + if (!client.connectToAlgorithm("localhost")) { - char testValue = 'a' + i; - Attribute attrib("testKey1", 1, &testValue); - client.sendAttribute(attrib); - } + client.startComms(); - client.stopComms(); - WSACleanup(); + for (int i = 0; i < 10; i++) + { + char testValue = 'a' + i; + Attribute attrib("testKey1", 1, &testValue); + client.sendAttribute(attrib); + } - return 0; + client.stopComms(); + WSACleanup(); + + return 0; + } + return 1; }