Skip to content

Commit

Permalink
Merge branch 'working' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
grf14003 committed Jan 20, 2020
2 parents 1f9bc0a + 7ddbee6 commit 32ef926
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 49 deletions.
2 changes: 2 additions & 0 deletions breadcrumbs/include/Algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class Algorithm

virtual void loop() = 0;
virtual bool loopCondition() { return false; };

vector<Attribute>* pollForAttributes();
private:
size_t numIoProcs = 0;
AlgorithmServer* server;
Expand Down
15 changes: 10 additions & 5 deletions breadcrumbs/include/AlgorithmServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "CMakeConfig.h"
#include "DataSyncThread.hpp"

#define ALGORITHM_PORT "10101"
#define MAX_ACCEPT_FAILURES 5

#pragma comment (lib, "Ws2_32.lib")
Expand All @@ -37,17 +36,23 @@ class AlgorithmServer
}

// Updates Algorithm key/value store with IO key/value updates
void pollForUpdates();
vector<Attribute>* getAllIncomingAttributes();
void startServer();
void stopServer();
int stopServer();

DWORD lockClientThreadsMutex();
void unlockClientThreadsMutex();

private:
SOCKET *ServerSocket = NULL;

HANDLE hThread;
DWORD dwThreadId;
bool continueThread = false;
bool continueThread = true;

size_t numClients;
std::vector<DataSyncThread> clientThreads;
HANDLE clientThreadsMutex = NULL;
vector<DataSyncThread*> clientThreads;
};

#endif
37 changes: 35 additions & 2 deletions breadcrumbs/include/Attribute.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,42 @@
#ifndef ATTRIBUTE_HPP
#define ATTRIBUTE_HPP

#include <stdio.h>
#include <string>

#define ATTRIB_KEY_SIZE 8

using namespace std;

typedef struct BinaryAttributeStructure {
char key[8];
unsigned char length[2];
char key[ATTRIB_KEY_SIZE];
char length;
} bAttrib;

class Attribute
{
private:
string key;
size_t length;
void* value;
public:
Attribute(BinaryAttributeStructure bin, void* value) {
key = "";
for (int i = 0; i < ATTRIB_KEY_SIZE; i++)
key += bin.key[i];
length = bin.length;
this->value = value;
}

~Attribute() {

}

// Getters and setters
string getKey() { return key; };
size_t getLength() { return length; };
void* getValue() { return value; };
void* setValue(void* newValue) { void* old = value; value = newValue; return old; };
};

#endif
22 changes: 18 additions & 4 deletions breadcrumbs/include/DataSyncThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ws2tcpip.h>
#include <stdlib.h>
#include <stdio.h>
#include <vector>

#include "Attribute.hpp"
#include "CMakeConfig.h"
Expand All @@ -26,13 +27,20 @@ class DataSyncThread
{
private:
SOCKET sock;
HANDLE hThread;
HANDLE hThread = INVALID_HANDLE_VALUE;
DWORD dwThreadId;
bool continueThread = false;

std::vector<Attribute> incomingAttributes;
HANDLE attribMutex;
public:
DataSyncThread(SOCKET s) { sock = s; };
DataSyncThread(SOCKET s)
{
sock = s;
attribMutex = CreateMutex(NULL, false, NULL);
};

// Synchronous functions
// Synchronous functions (only called from thread)
void threadRuntime();
static DWORD threadInit(LPVOID pThreadArgs)
{
Expand All @@ -41,11 +49,17 @@ class DataSyncThread
return NULL;
}
int recvBytes(void* buffer, size_t numBytes);
void addIncomingAttribute(Attribute attrib);

// Async control
// Async control (only called outside of thread)
void startComms();
bool stopComms();
bool isClientConnected() { return continueThread; };
unsigned int getSocketNumber() { return (unsigned int) socket; };
int connectToAlgorithm(char* serverName);
bool areIncomingAttributesAvailable();
std::vector<Attribute> *getIncomingAttributes();
bool sendAttribute(Attribute attrib);
};

#endif
1 change: 1 addition & 0 deletions breadcrumbs/src/Breadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ int main()
while (algorithm->loopCondition())
{
algorithm->loop();
_sleep(1000);
}

return 0;
Expand Down
9 changes: 7 additions & 2 deletions breadcrumbs/src/algos/AlgoBreadcrumbs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@

void AlgoBreadcrumbs::loop()
{
printf("Breadcrumbs algorithm loop!\n");
vector<Attribute>* attribs = pollForAttributes();
// printf("Attrib length: %d\n", attribs->size());
if (attribs->size() > 0)
{
printf("Attrib length: %d\n", attribs->size());
}
}

bool AlgoBreadcrumbs::loopCondition()
{
return --iterations >= 0;
return true;
}
5 changes: 5 additions & 0 deletions breadcrumbs/src/algos/Algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ Algorithm::~Algorithm()
{
delete server;
}

vector<Attribute>* Algorithm::pollForAttributes()
{
return server->getAllIncomingAttributes();
}
98 changes: 69 additions & 29 deletions breadcrumbs/src/comms/AlgorithmServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ AlgorithmServer::~AlgorithmServer()
void AlgorithmServer::serverThreadRuntime()
{
int iResult;
clientThreadsMutex = CreateMutex(NULL, false, NULL);

SOCKET ListenSocket = INVALID_SOCKET;
SOCKET ClientSocket = INVALID_SOCKET;
Expand Down Expand Up @@ -49,6 +50,7 @@ void AlgorithmServer::serverThreadRuntime()
WSACleanup();
return;
}
ServerSocket = &ListenSocket;

// Setup the TCP listening socket
iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
Expand All @@ -69,66 +71,83 @@ void AlgorithmServer::serverThreadRuntime()
return;
}

// Select stuff
FD_SET readSet;

int AcceptFailures = 0;
// Accept a client socket
while (continueThread)
{
int fds;

FD_ZERO(&readSet);
FD_SET(ListenSocket, &readSet);

if ((fds = select(0, &readSet, NULL, NULL, NULL)) == SOCKET_ERROR)
{
printf("select() returned with error %d\n", WSAGetLastError());
break;
printf("Listening for clients...\n");
ClientSocket = accept(ListenSocket, NULL, NULL);
printf("Hello new client!\n");
if (ClientSocket == INVALID_SOCKET) {
printf("accept failed with error: %d\n", WSAGetLastError());
AcceptFailures++;
if (AcceptFailures >= MAX_ACCEPT_FAILURES)
break;
}
else {
AcceptFailures = 0;
}

if (fds != 0)
// Reaping any old data sync threads
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET) {
printf("accept failed with error: %d\n", WSAGetLastError());
AcceptFailures++;
if (AcceptFailures >= MAX_ACCEPT_FAILURES)
break;
}
else {
AcceptFailures = 0;
auto it = clientThreads.begin();
while (clientThreads.size() > 0 && it != clientThreads.end())
{
if (!(**it).isClientConnected())
{
clientThreads.erase(it);
}
else {
++it;
}
}

// Creating a new Data sync thread
if (clientThreads.size() < numClients) {
DataSyncThread client = *new DataSyncThread(ClientSocket);
DataSyncThread* client = new DataSyncThread(ClientSocket);
clientThreads.push_back(client);
client.startComms();
client->startComms();
}
else {
printf("Client attempted connection when (%d) clients are already connected", numClients);
}
unlockClientThreadsMutex();
}
else {
printf("Could not acquire mutex to add new client thread\n");
}
}

// cleanup
closesocket(ListenSocket);
WSACleanup();
ServerSocket = NULL;
continueThread = false;
CloseHandle(clientThreadsMutex);
return;
}

void AlgorithmServer::pollForUpdates()
vector<Attribute>* AlgorithmServer::getAllIncomingAttributes()
{
/*
Polls DataSyncThreads for new updates
Updates master storage accordingly
*/
for (DataSyncThread dst : clientThreads)
vector<Attribute>* newAttribs = new vector<Attribute>;
if (lockClientThreadsMutex() == STATUS_WAIT_0)
{
// TODO: Implement this!
for (DataSyncThread* dst : clientThreads)
{
vector<Attribute>* toAdd = dst->getIncomingAttributes();
newAttribs->insert(newAttribs->end(), toAdd->begin(), toAdd->end());
}
unlockClientThreadsMutex();
}
else {
printf("Could not acquire mutex to get incoming attributes.\n");
}
return newAttribs;
}

void AlgorithmServer::startServer()
Expand All @@ -153,7 +172,28 @@ void AlgorithmServer::startServer()
&dwThreadId); // returns the thread identifier
}

void AlgorithmServer::stopServer()
int AlgorithmServer::stopServer()
{
continueThread = false;
if (continueThread && ServerSocket != NULL) {
continueThread = false;
return shutdown(*ServerSocket, SD_BOTH);
}
return 0;
}

DWORD AlgorithmServer::lockClientThreadsMutex()
{
if (clientThreadsMutex != NULL)
{
return WaitForSingleObject(clientThreadsMutex, INFINITE);
}
return -1;
}

void AlgorithmServer::unlockClientThreadsMutex()
{
if (clientThreadsMutex != NULL)
{
ReleaseMutex(clientThreadsMutex);
}
}
Loading

0 comments on commit 32ef926

Please sign in to comment.