diff --git a/breadcrumbs/CMakeLists.txt b/breadcrumbs/CMakeLists.txt index 94f900a..a0eb54b 100644 --- a/breadcrumbs/CMakeLists.txt +++ b/breadcrumbs/CMakeLists.txt @@ -23,7 +23,7 @@ message("Source directory: ${CMAKE_SOURCE_DIR}") message("Build directory: ${CMAKE_BINARY_DIR}") message("Executable directory: ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}") message("Library directory: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}") -message("Header file directory: ${INCLUDES_DIRECTORY}") +message("Header file directory: ${INCLUSDES_DIRECTORY}") # Configure a header file to pass some of the CMake settings to the source code set (Bfs_ALGORITHM_SERVER_PORT \"27634\") @@ -40,8 +40,18 @@ include_directories("${CMAKE_SOURCE_DIR}") # puts all .cpp files inside src to the SOURCES variable # TODO: replace this with a script for collecting cpp files -file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/*.cpp") -message("Source files: ${SOURCES}") +file(GLOB_RECURSE ALGOS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/algos/*.cpp") +file(GLOB_RECURSE COMMS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/comms/*.cpp") +file(GLOB_RECURSE CONFIG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/config/*.cpp") +file(GLOB_RECURSE LOG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/logging/*.cpp") + +file(GLOB EXECS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/main/*.cpp") # Adding executables -add_executable(Breadcrumbs ${SOURCES}) +# This is fine for now, but we may want to switch to a more manual versio so we can +# configure which files are included in which exe's +foreach(X IN LISTS EXECS) + get_filename_component(N ${X} NAME_WE) + message(STATUS "Generating Executable: ${N}.exe Main File: ${X}"}) + add_executable(${N} ${X} ${ALGOS} ${COMMS} ${CONFIG} ${LOG}) +endforeach() diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index 399af93..075c07d 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -29,8 +29,15 @@ class Attribute this->value = value; } - ~Attribute() { + Attribute(string key, size_t length, void* value) + { + this->key = key; + this->length = length; + this->value = value; + } + ~Attribute() { + } // Getters and setters diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index c8ed1b9..52ea144 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -30,6 +30,7 @@ class DataSyncThread HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; bool continueThread = false; + bool threadRunning = false; std::vector incomingAttributes; HANDLE attribMutex; @@ -40,8 +41,10 @@ class DataSyncThread attribMutex = CreateMutex(NULL, false, NULL); }; + // Synchronous functions (only called from thread) void threadRuntime(); + bool readyToReceive(int interval = 1); static DWORD threadInit(LPVOID pThreadArgs) { DataSyncThread* pDataSyncThread = (DataSyncThread*)pThreadArgs; @@ -49,14 +52,18 @@ class DataSyncThread return NULL; } int recvBytes(void* buffer, size_t numBytes); + int sendBytes(char* buffer, size_t numBytes); void addIncomingAttribute(Attribute attrib); + // Instantiates a datasyncthread on the heap connected to given server. + int connectToAlgorithm(char* serverName); + // Async control (only called outside of thread) void startComms(); + bool isRunning(); bool stopComms(); bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; - int connectToAlgorithm(char* serverName); bool areIncomingAttributesAvailable(); std::vector *getIncomingAttributes(); bool sendAttribute(Attribute attrib); diff --git a/breadcrumbs/scripts/startbfs.py b/breadcrumbs/scripts/startbfs.py index 618e877..1909112 100644 --- a/breadcrumbs/scripts/startbfs.py +++ b/breadcrumbs/scripts/startbfs.py @@ -4,20 +4,25 @@ import subprocess -def start_program(command): - cmd_list = command.split(" ") - return subprocess.call(cmd_list) +def start_program(program): + if not isinstance(program, str): + raise ValueError("Argument passed to start_program_sync() should be a string") + return os.system("cmd /c " + program) -def start_program_no_hang(command): - print("Running %s from cwd %s" % (command, os.getcwd())) - proc = mp.Process(target=start_program, args=(command,)) - proc.start() - return proc +def start_program_async(program): + # TODO: This needs to account for the number of cores... + # Add another function called start_programs_async that takes a list + # of programs and starts them on the proper cores. + p = mp.Process(target=start_program, args=(program,)) + p.start() + return p def main(): - start_program_no_hang("start cmd.exe /k \"..\\bin\\Breadcrumbs.exe\"") + p = start_program_async("..\\bin\\Breadcrumbs.exe") + p.join() + print("DONE") if __name__ == "__main__": diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index 7f64788..99b7c95 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,7 +4,7 @@ void DataSyncThread::threadRuntime() { - + threadRunning = true; printf("Thread runtime: %d\n", incomingAttributes); /* @@ -16,8 +16,9 @@ void DataSyncThread::threadRuntime() { write_log("Waiting to receive bytes\n", 1, logger->filename, logger); printf("Waiting to receive bytes\n"); + if (!readyToReceive()) + continue; - // Master while loop that will receive incoming messages char commandByte; iResult = recvBytes(&commandByte, 1); if (iResult <= 0) @@ -52,6 +53,25 @@ void DataSyncThread::threadRuntime() CloseHandle(attribMutex); attribMutex = NULL; } + + printf("Done running data sync thread!\n"); + threadRunning = false; +} + +bool DataSyncThread::readyToReceive(int interval) +{ + fd_set fds; + FD_ZERO(&fds); + FD_SET(sock, &fds); + + timeval tv; + tv.tv_sec = interval; + tv.tv_usec = 0; + + bool result = select(sock + 1, &fds, 0, 0, &tv) == 1; + if (FD_ISSET(sock, &fds)) + return true; + return result; } int DataSyncThread::recvBytes(void* buffer, size_t numBytes) @@ -77,6 +97,29 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes) return numBytes; } +int DataSyncThread::sendBytes(char* buffer, size_t numBytes) +{ + size_t bytesSent = 0; + int iResult; + + while (bytesSent < numBytes) + { + iResult = send(sock, buffer + bytesSent, numBytes - bytesSent, 0); + if (iResult < 0) + { + printf("send failed with error: %d\n", WSAGetLastError()); + return iResult; + } + else if (iResult == 0) { + printf("send returned 0, connection closed.\n"); + return iResult; + } + else + bytesSent += iResult; + } + return numBytes; +} + void DataSyncThread::addIncomingAttribute(Attribute attrib) { DWORD result = WaitForSingleObject(attribMutex, INFINITE); @@ -98,14 +141,27 @@ void DataSyncThread::startComms() //write_log(std::to_string(incomingAttributes), 1, logger->filename, logger); printf("Start comms: %d\n", incomingAttributes); - continueThread = true; - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - threadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier + if (!threadRunning) + { + threadRunning = false; + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + threadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier + + while (1) + if (threadRunning) + break; + } +} + +bool DataSyncThread::isRunning() +{ + return threadRunning; } bool DataSyncThread::stopComms() @@ -114,6 +170,7 @@ bool DataSyncThread::stopComms() Stops the communication thread */ continueThread = false; + while (isRunning()); return true; } @@ -165,7 +222,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName) } break; } - freeaddrinfo(result); if (sock == INVALID_SOCKET) { @@ -173,6 +229,10 @@ int DataSyncThread::connectToAlgorithm(char* serverName) WSACleanup(); return 1; } + else + { + return 0; + } } std::vector *DataSyncThread::getIncomingAttributes() @@ -197,8 +257,23 @@ bool DataSyncThread::areIncomingAttributesAvailable() return incomingAttributes.size() > 0; } -bool sendAttribute(Attribute attrib) +bool DataSyncThread::sendAttribute(Attribute attrib) { - // TODO: Implement this to send attribute in the format described in the recieve section. - return false; + // two extra bytes, one for command 0x00 and one for length: + int streamLength = attrib.getLength() + ATTRIB_KEY_SIZE + 2; + char *bytes = new char[streamLength]; + int iter = 0; + bytes[iter++] = 0x00; + + string key = attrib.getKey(); + for (int i = 0; i < ATTRIB_KEY_SIZE; i++) + bytes[iter++] = key[i]; + + bytes[iter++] = attrib.getLength(); + + for (int i = 0; i < attrib.getLength(); i++) + bytes[iter++] = ((char*) attrib.getValue())[i]; + + int result = sendBytes(bytes, streamLength); + return result == streamLength; } diff --git a/breadcrumbs/src/io/IOProcessor.cpp b/breadcrumbs/src/io/IOProcessor.cpp deleted file mode 100644 index 437de23..0000000 --- a/breadcrumbs/src/io/IOProcessor.cpp +++ /dev/null @@ -1,4 +0,0 @@ - -#include "IOProcessor.hpp" - - diff --git a/breadcrumbs/src/io/in_procs/.blank b/breadcrumbs/src/io/in_procs/.blank deleted file mode 100644 index e69de29..0000000 diff --git a/breadcrumbs/src/io/out_procs/.blank b/breadcrumbs/src/io/out_procs/.blank deleted file mode 100644 index e69de29..0000000 diff --git a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp b/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp deleted file mode 100644 index 01bf3cc..0000000 --- a/breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp +++ /dev/null @@ -1,8 +0,0 @@ - -#include "VirtualOutputProcessor.hpp" - - -void VirtualOutputProcessor::threadRuntime(IOProcessor* ioProc) -{ - printf("VirtualOutputProcessor started.\n"); -} diff --git a/breadcrumbs/src/Breadcrumbs.cpp b/breadcrumbs/src/main/Breadcrumbs.cpp similarity index 100% rename from breadcrumbs/src/Breadcrumbs.cpp rename to breadcrumbs/src/main/Breadcrumbs.cpp diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp new file mode 100644 index 0000000..9c87767 --- /dev/null +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -0,0 +1,26 @@ + +#include "DataSyncThread.hpp" +#include + + +int main() +{ + DataSyncThread client(NULL); + if (!client.connectToAlgorithm("localhost")) + { + client.startComms(); + + for (int i = 0; i < 10; i++) + { + char testValue = 'a' + i; + Attribute attrib("testKey1", 1, &testValue); + client.sendAttribute(attrib); + } + + client.stopComms(); + WSACleanup(); + + return 0; + } + return 1; +}