Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect to tunnel server to launch remote server #4234

Merged
merged 3 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions source/adios2/core/ADIOS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class ADIOS::GlobalServices
};

ADIOS::GlobalServices ADIOS::m_GlobalServices;
adios2::HostOptions *StaticHostOptions = nullptr;
static std::mutex StaticHostOptionsMutex;

std::mutex PerfStubsMutex;
static std::atomic_uint adios_refcount(0); // adios objects at the same time
Expand All @@ -112,6 +114,8 @@ const adios2::UserOptions &ADIOS::GetUserOptions() { return m_UserOptions; };

/** A constant reference to the host options from ~/.config/adios2/hosts.yaml */
const adios2::HostOptions &ADIOS::GetHostOptions() { return m_HostOptions; };
/** A constant reference to the host options from ~/.config/adios2/hosts.yaml */
const adios2::HostOptions &ADIOS::StaticGetHostOptions() { return *StaticHostOptions; };

ADIOS::ADIOS(const std::string configFile, helper::Comm comm, const std::string hostLanguage)
: m_HostLanguage(hostLanguage), m_Comm(std::move(comm)), m_ConfigFile(configFile),
Expand Down Expand Up @@ -229,6 +233,10 @@ void ADIOS::ProcessHostConfig()
{
helper::ParseHostOptionsFile(m_Comm, cfgFile, m_HostOptions, homePath);
}
{
std::lock_guard<std::mutex> lck(StaticHostOptionsMutex);
StaticHostOptions = &m_HostOptions;
}
}

IO &ADIOS::DeclareIO(const std::string name, const ArrayOrdering ArrayOrder)
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/core/ADIOS.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ class ADIOS
public:
/** Global service AWS SDK initialization */
static void Global_init_AWS_API();

static const adios2::HostOptions &StaticGetHostOptions();
};

} // end namespace core
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class BP5Engine
MACRO(FlattenSteps, Bool, bool, false) \
MACRO(IgnoreFlattenSteps, Bool, bool, false) \
MACRO(RemoteDataPath, String, std::string, "") \
MACRO(RemoteHost, String, std::string, "") \
MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX)

struct BP5Params
Expand Down
10 changes: 5 additions & 5 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ void BP5Reader::PerformGets()
#ifdef ADIOS2_HAVE_XROOTD
if (getenv("DoXRootD"))
{
m_Remote = std::unique_ptr<XrootdRemote>(new XrootdRemote());
m_Remote = std::unique_ptr<XrootdRemote>(new XrootdRemote(m_HostOptions));
m_Remote->Open("localhost", 1094, m_Name, m_OpenMode, RowMajorOrdering);
}
else
#endif
#ifdef ADIOS2_HAVE_SST
if (getenv("DoRemote"))
{
m_Remote = std::unique_ptr<EVPathRemote>(new EVPathRemote());
m_Remote->Open("localhost", EVPathRemoteCommon::ServerPort, RemoteName, m_OpenMode,
RowMajorOrdering);
m_Remote = std::unique_ptr<EVPathRemote>(new EVPathRemote(m_HostOptions));
int localPort =
m_Remote->LaunchRemoteServerViaConnectionManager(m_Parameters.RemoteHost);
m_Remote->Open("localhost", localPort, RemoteName, m_OpenMode, RowMajorOrdering);
}
#endif
#ifdef ADIOS2_HAVE_KVCACHE
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/campaign/CampaignReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ void CampaignReader::InitTransports()
SaveToFile(m_DB, localPath + PathSeparator + bpf.name, bpf);
}
io.SetParameter("RemoteDataPath", remotePath);
io.SetParameter("RemoteHost", m_CampaignData.hosts[ds.hostIdx].hostname);
}
}
else
Expand Down
134 changes: 129 additions & 5 deletions source/adios2/helper/adiosNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

#ifndef _WIN32

#include <netdb.h> //getFQDN
#include <netinet/in.h>
#include <sys/socket.h> //getFQDN
#include <sys/types.h> //getFQDN
#include <unistd.h> // gethostname

#include <arpa/inet.h>
#include <netdb.h> //getFQDN
#include <sys/types.h> //getFQDN
#include <unistd.h> // gethostname
#define SOCKET int

#if defined(ADIOS2_HAVE_DATAMAN) || defined(ADIOS2_HAVE_TABLE)

Expand All @@ -27,13 +31,24 @@
#include <arpa/inet.h> //AvailableIpAddresses() inet_ntoa
#include <net/if.h> //AvailableIpAddresses() struct if_nameindex
#include <netinet/in.h> //AvailableIpAddresses() struct sockaddr_in
#include <sys/ioctl.h> //AvailableIpAddresses() ioctl

#include <nlohmann_json.hpp>
#include <sys/ioctl.h> //AvailableIpAddresses() ioctl

#endif // ADIOS2_HAVE_DATAMAN || ADIOS2_HAVE_TABLE

#else // _WIN32
#define FD_SETSIZE 1024
#include <process.h>
#include <time.h>
#include <winsock2.h> // SOCKET struct

#include <WS2tcpip.h>
#define getpid() _getpid()
#define read(fd, buf, len) recv(fd, (buf), (len), 0)
#define write(fd, buf, len) send(fd, buf, (len), 0)
#define close(x) closesocket(x)
#define INST_ADDRSTRLEN 50

#include <tchar.h>
#include <windows.h> // GetComputerName
#endif // _WIN32
Expand Down Expand Up @@ -321,5 +336,114 @@ void HandshakeReader(Comm const &comm, size_t &appID, std::vector<std::string> &
#endif // ADIOS2_HAVE_DATAMAN || ADIOS2_HAVE_TABLE
#endif // _WIN32

struct NetworkSocketData
{
sockaddr_in m_Sockaddr;
SOCKET m_Socket;
};

NetworkSocket::NetworkSocket()
{
m_Data = new NetworkSocketData();
m_Data->m_Socket = -1;
};

NetworkSocket::~NetworkSocket() { delete m_Data; };

bool NetworkSocket::valid() const { return (m_Data->m_Socket > 0); }

static sockaddr_in ResolveHostName(std::string m_hostname, uint16_t m_server_port)
{
sockaddr_in sockaddr;

#define _WINSOCK_DEPRECATED_NO_WARNINGS 1
struct hostent *hostent = gethostbyname(m_hostname.c_str());
if (hostent == NULL)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "ResolveHostName",
"error: gethostbyname " + m_hostname);
}

uint32_t addr_tmp = inet_addr(inet_ntoa(*(struct in_addr *)*(hostent->h_addr_list)));
if (addr_tmp == INADDR_NONE)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "ResolveHostName",
"error: inet_addr " +
std::string(*(hostent->h_addr_list)));
}

sockaddr.sin_addr.s_addr = addr_tmp;
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(m_server_port);
return sockaddr;
}

void NetworkSocket::Connect(std::string hostname, uint16_t port, std::string protocol)
{
struct protoent *protoent = getprotobyname(protocol.c_str());
if (protoent == NULL)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "ConnectToServer",
"error: Cannot make getprotobyname \"" + protocol +
"\"");
}

m_Data->m_Sockaddr = ResolveHostName(hostname, port);

m_Data->m_Socket = socket(AF_INET, SOCK_STREAM, protoent->p_proto);
if (m_Data->m_Socket == -1)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "ConnectToServer",
"error: Cannot create socket");
}

int result = connect(m_Data->m_Socket, (sockaddr *)&(m_Data->m_Sockaddr), sizeof(sockaddr));

if (result == -1)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "ConnectToServer",
"error: Cannot connect to server at " + hostname +
":" + std::to_string(port));
}
}

void NetworkSocket::RequestResponse(const std::string &request, char *response,
size_t maxResponseSize)
{
#ifdef _WIN32
int result;
int len = static_cast<int>(request.length());
int maxlen = static_cast<int>(maxResponseSize) - 1;
#else
ssize_t result;
size_t len = request.length();
size_t maxlen = maxResponseSize - 1;
#endif
result = write(m_Data->m_Socket, request.c_str(), len);
if (result == -1)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "RequestResponse",
"error: Cannot send request");
}

result = read(m_Data->m_Socket, response, maxlen);
if (result == -1)
{
helper::Throw<std::ios_base::failure>("Helper", "helper:adiosNetwork", "RequestResponse",
"error: Cannot get response");
}
// safely null terminate
response[result] = '\0';
}

void NetworkSocket::Close()
{
if (m_Data->m_Socket != -1)
{
close(m_Data->m_Socket);
m_Data->m_Socket = -1;
}
}

} // end namespace helper
} // end namespace adios2
19 changes: 19 additions & 0 deletions source/adios2/helper/adiosNetwork.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#define ADIOS2_HELPER_ADIOSNETWORK_H_

/// \cond EXCLUDE_FROM_DOXYGEN
#include <memory>
#include <string>
#include <vector>
/// \endcond
Expand Down Expand Up @@ -60,6 +61,24 @@ void HandshakeReader(Comm const &comm, size_t &appID, std::vector<std::string> &
#endif // ADIOS2_HAVE_DATAMAN || ADIOS2_HAVE_TABLE
#endif // _WIN32

struct NetworkSocketData;

class NetworkSocket
{
public:
NetworkSocket();
~NetworkSocket();

bool valid() const;

void Connect(std::string hostname, uint16_t port, std::string protocol = "tcp");
void RequestResponse(const std::string &request, char *response, size_t maxResponseSize);
void Close();

private:
NetworkSocketData *m_Data;
};

} // end namespace helper
} // end namespace adios2

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace adios2
{

EVPathRemote::EVPathRemote() {}
EVPathRemote::EVPathRemote(const adios2::HostOptions &hostOptions) : Remote(hostOptions) {}

#ifdef ADIOS2_HAVE_SST
EVPathRemote::~EVPathRemote()
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class EVPathRemote : public Remote
* @param type from derived class
* @param comm passed to m_Comm
*/
EVPathRemote();
EVPathRemote(const adios2::HostOptions &hostOptions);
~EVPathRemote();

explicit operator bool() const { return m_Active; }
Expand Down
92 changes: 91 additions & 1 deletion source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
#include "EVPathRemote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosNetwork.h"
#include "adios2/helper/adiosString.h"
#include "adios2/helper/adiosSystem.h"
#ifdef _MSC_VER
#define strdup(x) _strdup(x)
#define strtok_r(str, delim, saveptr) strtok_s(str, delim, saveptr)
#endif

#define ThrowUp(x) \
Expand Down Expand Up @@ -52,6 +54,94 @@ Remote::GetHandle Remote::Read(size_t Start, size_t Size, void *Dest)
return (Remote::GetHandle)0;
};
Remote::~Remote() {}
Remote::Remote() {}
Remote::Remote(const adios2::HostOptions &hostOptions)
: m_HostOptions(std::make_shared<adios2::HostOptions>(hostOptions))
{
}

int Remote::LaunchRemoteServerViaConnectionManager(const std::string remoteHost)
{
if (remoteHost.empty() || remoteHost == "localhost")
{
// std::cout << "Remote::LaunchRemoteServerViaConnectionManager: Assume server is already "
// "running at on localhost at port = "
// << 26200 << std::endl;
return 26200;
}

helper::NetworkSocket socket;
socket.Connect("localhost", 30000);

struct adios2::HostConfig *hostconf = nullptr;

auto it = m_HostOptions->find(remoteHost);
if (it != m_HostOptions->end())
{
for (auto &ho : it->second)
{
if (ho.protocol == HostAccessProtocol::SSH)
{
hostconf = &ho;
}
}
}
if (!hostconf)
{
helper::Throw<std::invalid_argument>("Toolkit", "Remote", "EstablishConnection",
"No ssh configuration found for host " + remoteHost +
". Add config in ~/.config/adios2/hosts.yaml");
}

std::string request = "/run_service?group=" + remoteHost + "&service=" + hostconf->name;

char response[2048];
socket.RequestResponse(request, response, 2048);

// responses:
// port:-1,msg:incomplete_service_definition
// port:-1,msg:missing_service_in_request
// port:26200,cookie:0xd93d91e3643c9869,msg:no_error

char *token;
char *rest = response;

int serverPort = -1;
std::string cookie;

// std::cout << "Response = \"" << response << "\"" << std::endl;
while ((token = strtok_r(rest, ",", &rest)))
{
char *key;
char *value = token;
key = strtok_r(value, ":", &value);
if (!strncmp(key, "port", 4))
{
serverPort = atoi(value);
}
else if (!strncmp(key, "cookie", 6))
{
cookie = std::string(value);
}
else if (!strncmp(key, "msg", 3))
{
if (strcmp(value, "no_error"))
{
helper::Throw<std::invalid_argument>("Toolkit", "Remote", "EstablishConnection",
"Error response from connection manager: " +
std::string(value));
}
}
else
{
helper::Throw<std::invalid_argument>(
"Toolkit", "Remote", "EstablishConnection",
"Invalid response from connection manager. Do not understand key " +
std::string(key));
}
}

socket.Close();
return serverPort;
}

} // end namespace adios2
Loading
Loading