Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.uconn.edu/grf14003/bfs into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
ncc14003 committed Feb 5, 2020
2 parents 2e8ae8e + 9ddca58 commit 6b71112
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 41 deletions.
18 changes: 14 additions & 4 deletions breadcrumbs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ message("Source directory: ${CMAKE_SOURCE_DIR}")
message("Build directory: ${CMAKE_BINARY_DIR}")
message("Executable directory: ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}")
message("Library directory: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}")
message("Header file directory: ${INCLUDES_DIRECTORY}")
message("Header file directory: ${INCLUSDES_DIRECTORY}")

# Configure a header file to pass some of the CMake settings to the source code
set (Bfs_ALGORITHM_SERVER_PORT \"27634\")
Expand All @@ -40,8 +40,18 @@ include_directories("${CMAKE_SOURCE_DIR}")

# puts all .cpp files inside src to the SOURCES variable
# TODO: replace this with a script for collecting cpp files
file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/*.cpp")
message("Source files: ${SOURCES}")
file(GLOB_RECURSE ALGOS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/algos/*.cpp")
file(GLOB_RECURSE COMMS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/comms/*.cpp")
file(GLOB_RECURSE CONFIG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/config/*.cpp")
file(GLOB_RECURSE LOG CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/logging/*.cpp")

file(GLOB EXECS CONFIGURE_DEPENDS "${CMAKE_SOURCE_DIR}/main/*.cpp")

# Adding executables
add_executable(Breadcrumbs ${SOURCES})
# This is fine for now, but we may want to switch to a more manual versio so we can
# configure which files are included in which exe's
foreach(X IN LISTS EXECS)
get_filename_component(N ${X} NAME_WE)
message(STATUS "Generating Executable: ${N}.exe Main File: ${X}"})
add_executable(${N} ${X} ${ALGOS} ${COMMS} ${CONFIG} ${LOG})
endforeach()
9 changes: 8 additions & 1 deletion breadcrumbs/include/Attribute.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@ class Attribute
this->value = value;
}

~Attribute() {
Attribute(string key, size_t length, void* value)
{
this->key = key;
this->length = length;
this->value = value;
}

~Attribute() {

}

// Getters and setters
Expand Down
9 changes: 8 additions & 1 deletion breadcrumbs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DataSyncThread
HANDLE hThread = INVALID_HANDLE_VALUE;
DWORD dwThreadId;
bool continueThread = false;
bool threadRunning = false;

std::vector<Attribute> incomingAttributes;
HANDLE attribMutex;
Expand All @@ -40,23 +41,29 @@ class DataSyncThread
attribMutex = CreateMutex(NULL, false, NULL);
};


// Synchronous functions (only called from thread)
void threadRuntime();
bool readyToReceive(int interval = 1);
static DWORD threadInit(LPVOID pThreadArgs)
{
DataSyncThread* pDataSyncThread = (DataSyncThread*)pThreadArgs;
pDataSyncThread->threadRuntime();
return NULL;
}
int recvBytes(void* buffer, size_t numBytes);
int sendBytes(char* buffer, size_t numBytes);
void addIncomingAttribute(Attribute attrib);

// Instantiates a datasyncthread on the heap connected to given server.
int connectToAlgorithm(char* serverName);

// Async control (only called outside of thread)
void startComms();
bool isRunning();
bool stopComms();
bool isClientConnected() { return continueThread; };
unsigned int getSocketNumber() { return (unsigned int) socket; };
int connectToAlgorithm(char* serverName);
bool areIncomingAttributesAvailable();
std::vector<Attribute> *getIncomingAttributes();
bool sendAttribute(Attribute attrib);
Expand Down
23 changes: 14 additions & 9 deletions breadcrumbs/scripts/startbfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
import subprocess


def start_program(command):
cmd_list = command.split(" ")
return subprocess.call(cmd_list)
def start_program(program):
if not isinstance(program, str):
raise ValueError("Argument passed to start_program_sync() should be a string")
return os.system("cmd /c " + program)


def start_program_no_hang(command):
print("Running %s from cwd %s" % (command, os.getcwd()))
proc = mp.Process(target=start_program, args=(command,))
proc.start()
return proc
def start_program_async(program):
# TODO: This needs to account for the number of cores...
# Add another function called start_programs_async that takes a list
# of programs and starts them on the proper cores.
p = mp.Process(target=start_program, args=(program,))
p.start()
return p


def main():
start_program_no_hang("start cmd.exe /k \"..\\bin\\Breadcrumbs.exe\"")
p = start_program_async("..\\bin\\Breadcrumbs.exe")
p.join()
print("DONE")


if __name__ == "__main__":
Expand Down
103 changes: 89 additions & 14 deletions breadcrumbs/src/comms/DataSyncThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

void DataSyncThread::threadRuntime()
{

threadRunning = true;
printf("Thread runtime: %d\n", incomingAttributes);

/*
Expand All @@ -16,8 +16,9 @@ void DataSyncThread::threadRuntime()
{
write_log("Waiting to receive bytes\n", 1, logger->filename, logger);
printf("Waiting to receive bytes\n");
if (!readyToReceive())
continue;

// Master while loop that will receive incoming messages
char commandByte;
iResult = recvBytes(&commandByte, 1);
if (iResult <= 0)
Expand Down Expand Up @@ -52,6 +53,25 @@ void DataSyncThread::threadRuntime()
CloseHandle(attribMutex);
attribMutex = NULL;
}

printf("Done running data sync thread!\n");
threadRunning = false;
}

bool DataSyncThread::readyToReceive(int interval)
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(sock, &fds);

timeval tv;
tv.tv_sec = interval;
tv.tv_usec = 0;

bool result = select(sock + 1, &fds, 0, 0, &tv) == 1;
if (FD_ISSET(sock, &fds))
return true;
return result;
}

int DataSyncThread::recvBytes(void* buffer, size_t numBytes)
Expand All @@ -77,6 +97,29 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes)
return numBytes;
}

int DataSyncThread::sendBytes(char* buffer, size_t numBytes)
{
size_t bytesSent = 0;
int iResult;

while (bytesSent < numBytes)
{
iResult = send(sock, buffer + bytesSent, numBytes - bytesSent, 0);
if (iResult < 0)
{
printf("send failed with error: %d\n", WSAGetLastError());
return iResult;
}
else if (iResult == 0) {
printf("send returned 0, connection closed.\n");
return iResult;
}
else
bytesSent += iResult;
}
return numBytes;
}

void DataSyncThread::addIncomingAttribute(Attribute attrib)
{
DWORD result = WaitForSingleObject(attribMutex, INFINITE);
Expand All @@ -98,14 +141,27 @@ void DataSyncThread::startComms()
//write_log(std::to_string(incomingAttributes), 1, logger->filename, logger);

printf("Start comms: %d\n", incomingAttributes);
continueThread = true;
hThread = CreateThread(
NULL, // default security attributes
0, // use default stack size
threadInit, // thread function name
this, // argument to thread function
0, // use default creation flags
&dwThreadId); // returns the thread identifier
if (!threadRunning)
{
threadRunning = false;
continueThread = true;
hThread = CreateThread(
NULL, // default security attributes
0, // use default stack size
threadInit, // thread function name
this, // argument to thread function
0, // use default creation flags
&dwThreadId); // returns the thread identifier

while (1)
if (threadRunning)
break;
}
}

bool DataSyncThread::isRunning()
{
return threadRunning;
}

bool DataSyncThread::stopComms()
Expand All @@ -114,6 +170,7 @@ bool DataSyncThread::stopComms()
Stops the communication thread
*/
continueThread = false;
while (isRunning());
return true;
}

Expand Down Expand Up @@ -165,14 +222,17 @@ int DataSyncThread::connectToAlgorithm(char* serverName)
}
break;
}

freeaddrinfo(result);

if (sock == INVALID_SOCKET) {
printf("Unable to connect to server!\n");
WSACleanup();
return 1;
}
else
{
return 0;
}
}

std::vector<Attribute> *DataSyncThread::getIncomingAttributes()
Expand All @@ -197,8 +257,23 @@ bool DataSyncThread::areIncomingAttributesAvailable()
return incomingAttributes.size() > 0;
}

bool sendAttribute(Attribute attrib)
bool DataSyncThread::sendAttribute(Attribute attrib)
{
// TODO: Implement this to send attribute in the format described in the recieve section.
return false;
// two extra bytes, one for command 0x00 and one for length:
int streamLength = attrib.getLength() + ATTRIB_KEY_SIZE + 2;
char *bytes = new char[streamLength];
int iter = 0;
bytes[iter++] = 0x00;

string key = attrib.getKey();
for (int i = 0; i < ATTRIB_KEY_SIZE; i++)
bytes[iter++] = key[i];

bytes[iter++] = attrib.getLength();

for (int i = 0; i < attrib.getLength(); i++)
bytes[iter++] = ((char*) attrib.getValue())[i];

int result = sendBytes(bytes, streamLength);
return result == streamLength;
}
4 changes: 0 additions & 4 deletions breadcrumbs/src/io/IOProcessor.cpp

This file was deleted.

Empty file removed breadcrumbs/src/io/in_procs/.blank
Empty file.
Empty file.
8 changes: 0 additions & 8 deletions breadcrumbs/src/io/out_procs/VirtualOutputProcessor.cpp

This file was deleted.

File renamed without changes.
26 changes: 26 additions & 0 deletions breadcrumbs/src/main/VirtualOutputProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

#include "DataSyncThread.hpp"
#include <iostream>


int main()
{
DataSyncThread client(NULL);
if (!client.connectToAlgorithm("localhost"))
{
client.startComms();

for (int i = 0; i < 10; i++)
{
char testValue = 'a' + i;
Attribute attrib("testKey1", 1, &testValue);
client.sendAttribute(attrib);
}

client.stopComms();
WSACleanup();

return 0;
}
return 1;
}

0 comments on commit 6b71112

Please sign in to comment.