diff --git a/breadcrumbs/include/Attribute.hpp b/breadcrumbs/include/Attribute.hpp index 399af93..075c07d 100644 --- a/breadcrumbs/include/Attribute.hpp +++ b/breadcrumbs/include/Attribute.hpp @@ -29,8 +29,15 @@ class Attribute this->value = value; } - ~Attribute() { + Attribute(string key, size_t length, void* value) + { + this->key = key; + this->length = length; + this->value = value; + } + ~Attribute() { + } // Getters and setters diff --git a/breadcrumbs/include/DataSyncThread.hpp b/breadcrumbs/include/DataSyncThread.hpp index c8ed1b9..0f1f8a2 100644 --- a/breadcrumbs/include/DataSyncThread.hpp +++ b/breadcrumbs/include/DataSyncThread.hpp @@ -30,6 +30,7 @@ class DataSyncThread HANDLE hThread = INVALID_HANDLE_VALUE; DWORD dwThreadId; bool continueThread = false; + bool threadRunning = false; std::vector incomingAttributes; HANDLE attribMutex; @@ -40,6 +41,7 @@ class DataSyncThread attribMutex = CreateMutex(NULL, false, NULL); }; + // Synchronous functions (only called from thread) void threadRuntime(); static DWORD threadInit(LPVOID pThreadArgs) @@ -49,14 +51,18 @@ class DataSyncThread return NULL; } int recvBytes(void* buffer, size_t numBytes); + int sendBytes(char* buffer, size_t numBytes); void addIncomingAttribute(Attribute attrib); + // Instantiates a datasyncthread on the heap connected to given server. + int connectToAlgorithm(char* serverName); + // Async control (only called outside of thread) void startComms(); + bool isRunning(); bool stopComms(); bool isClientConnected() { return continueThread; }; unsigned int getSocketNumber() { return (unsigned int) socket; }; - int connectToAlgorithm(char* serverName); bool areIncomingAttributesAvailable(); std::vector *getIncomingAttributes(); bool sendAttribute(Attribute attrib); diff --git a/breadcrumbs/src/comms/DataSyncThread.cpp b/breadcrumbs/src/comms/DataSyncThread.cpp index 851fb94..5b73147 100644 --- a/breadcrumbs/src/comms/DataSyncThread.cpp +++ b/breadcrumbs/src/comms/DataSyncThread.cpp @@ -4,7 +4,7 @@ void DataSyncThread::threadRuntime() { - + threadRunning = true; printf("Thread runtime: %d\n", incomingAttributes); /* @@ -52,6 +52,10 @@ void DataSyncThread::threadRuntime() CloseHandle(attribMutex); attribMutex = NULL; } + + WSACleanup(); + printf("Done running thread!"); + threadRunning = false; } int DataSyncThread::recvBytes(void* buffer, size_t numBytes) @@ -77,6 +81,29 @@ int DataSyncThread::recvBytes(void* buffer, size_t numBytes) return numBytes; } +int DataSyncThread::sendBytes(char* buffer, size_t numBytes) +{ + size_t bytesSent = 0; + int iResult; + + while (bytesSent < numBytes) + { + iResult = send(sock, buffer + bytesSent, numBytes - bytesSent, 0); + if (iResult < 0) + { + printf("send failed with error: %d\n", WSAGetLastError()); + return iResult; + } + else if (iResult == 0) { + printf("send returned 0, connection closed.\n"); + return iResult; + } + else + bytesSent += iResult; + } + return numBytes; +} + void DataSyncThread::addIncomingAttribute(Attribute attrib) { DWORD result = WaitForSingleObject(attribMutex, INFINITE); @@ -95,14 +122,27 @@ void DataSyncThread::startComms() Initializes the communication thread */ printf("Start comms: %d\n", incomingAttributes); - continueThread = true; - hThread = CreateThread( - NULL, // default security attributes - 0, // use default stack size - threadInit, // thread function name - this, // argument to thread function - 0, // use default creation flags - &dwThreadId); // returns the thread identifier + if (!threadRunning) + { + threadRunning = false; + continueThread = true; + hThread = CreateThread( + NULL, // default security attributes + 0, // use default stack size + threadInit, // thread function name + this, // argument to thread function + 0, // use default creation flags + &dwThreadId); // returns the thread identifier + + while (1) + if (threadRunning) + break; + } +} + +bool DataSyncThread::isRunning() +{ + return threadRunning; } bool DataSyncThread::stopComms() @@ -111,6 +151,7 @@ bool DataSyncThread::stopComms() Stops the communication thread */ continueThread = false; + while (isRunning()); return true; } @@ -162,7 +203,6 @@ int DataSyncThread::connectToAlgorithm(char* serverName) } break; } - freeaddrinfo(result); if (sock == INVALID_SOCKET) { @@ -170,6 +210,10 @@ int DataSyncThread::connectToAlgorithm(char* serverName) WSACleanup(); return 1; } + else + { + return 0; + } } std::vector *DataSyncThread::getIncomingAttributes() @@ -194,8 +238,23 @@ bool DataSyncThread::areIncomingAttributesAvailable() return incomingAttributes.size() > 0; } -bool sendAttribute(Attribute attrib) +bool DataSyncThread::sendAttribute(Attribute attrib) { - // TODO: Implement this to send attribute in the format described in the recieve section. - return false; + // two extra bytes, one for command 0x00 and one for length: + int streamLength = attrib.getLength() + ATTRIB_KEY_SIZE + 2; + char *bytes = new char[streamLength]; + int iter = 0; + bytes[iter++] = 0x00; + + string key = attrib.getKey(); + for (int i = 0; i < ATTRIB_KEY_SIZE; i++) + bytes[iter++] = key[i]; + + bytes[iter++] = attrib.getLength(); + + for (int i = 0; i < attrib.getLength(); i++) + bytes[iter++] = ((char*) attrib.getValue())[i]; + + int result = sendBytes(bytes, streamLength); + return result == streamLength; } diff --git a/breadcrumbs/src/main/VirtualOutputProcessor.cpp b/breadcrumbs/src/main/VirtualOutputProcessor.cpp index e5da27c..3af1a19 100644 --- a/breadcrumbs/src/main/VirtualOutputProcessor.cpp +++ b/breadcrumbs/src/main/VirtualOutputProcessor.cpp @@ -1,5 +1,18 @@ +#include "DataSyncThread.hpp" + + int main() { + DataSyncThread client(NULL); + client.connectToAlgorithm("localhost"); + client.startComms(); + + char testValue = 'a'; + Attribute attrib = *new Attribute("testKey1", 1, &testValue); + client.sendAttribute(attrib); + + client.stopComms(); + return 0; }