From a06e2c2f225d76b67b0058a9880222f75d5495c3 Mon Sep 17 00:00:00 2001 From: Michele Calgaro Date: Wed, 9 Dec 2020 14:11:22 +0900 Subject: Renaming of files in preparation for code style tools. Signed-off-by: Michele Calgaro --- .../jingle/libjingle/talk/p2p/base/relayport.cpp | 640 +++++++++++++++++++++ 1 file changed, 640 insertions(+) create mode 100644 kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cpp (limited to 'kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cpp') diff --git a/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cpp b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cpp new file mode 100644 index 00000000..4ba12be3 --- /dev/null +++ b/kopete/protocols/jabber/jingle/libjingle/talk/p2p/base/relayport.cpp @@ -0,0 +1,640 @@ +/* + * libjingle + * Copyright 2004--2005, Google Inc. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#if defined(_MSC_VER) && _MSC_VER < 1300 +#pragma warning(disable:4786) +#endif +#include "talk/base/logging.h" +#include "talk/base/asynctcpsocket.h" +#include "talk/p2p/base/relayport.h" +#include "talk/p2p/base/helpers.h" +#include +#include +#ifdef OSX +#include +#endif + +#if defined(_MSC_VER) && _MSC_VER < 1300 +namespace std { + using ::strerror; +} +#endif + +#ifdef POSIX +extern "C" { +#include +} +#endif // POSIX + +namespace cricket { + +const int KEEPALIVE_DELAY = 10 * 60 * 1000; +const int RETRY_DELAY = 50; // 50ms, from ICE spec +const uint32 RETRY_TIMEOUT = 50 * 1000; // ICE says 50 secs + +const uint32 MSG_DISPOSE_SOCKET = 100; // needs to be more than ID used by Port +typedef TypedMessageData DisposeSocketData; + +class AsyncTCPSocket; + +// Manages a single connection to the relayserver. We aim to use each +// connection for only a specific destination address so that we can avoid +// wrapping every packet in a STUN send / data indication. +class RelayEntry : public sigslot::has_slots<> { +public: + RelayEntry(RelayPort* port, const SocketAddress& ext_addr, const SocketAddress& local_addr); + ~RelayEntry(); + + RelayPort* port() { return port_; } + + const SocketAddress& address() { return ext_addr_; } + void set_address(const SocketAddress& addr) { ext_addr_ = addr; } + + AsyncPacketSocket* socket() { return socket_; } + + bool connected() { return connected_; } + void set_connected(bool connected) { connected_ = connected; } + + bool locked() { return locked_; } + + // Returns the last error on the socket of this entry. + int GetError() { return socket_->GetError(); } + + // Sends the STUN requests to the server to initiate this connection. + void Connect(); + + // Called when this entry becomes connected. The address given is the one + // exposed to the outside world on the relay server. + void OnConnect(const SocketAddress& mapped_addr); + + // Sends a packet to the given destination address using the socket of this + // entry. This will wrap the packet in STUN if necessary. + int SendTo(const void* data, size_t size, const SocketAddress& addr); + + // Schedules a keep-alive allocate request. + void ScheduleKeepAlive(); + + void SetServerIndex(size_t sindex) { server_index_ = sindex; } + size_t ServerIndex() const { return server_index_; } + + // Try a different server address + void HandleConnectFailure(); + +private: + RelayPort* port_; + SocketAddress ext_addr_, local_addr_; + size_t server_index_; + AsyncPacketSocket* socket_; + bool connected_; + bool locked_; + StunRequestManager requests_; + + // Called when a TCP connection is established or fails + void OnSocketConnect(AsyncTCPSocket* socket); + void OnSocketClose(AsyncTCPSocket* socket, int error); + + // Called when a packet is received on this socket. + void OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr, + AsyncPacketSocket* socket); + + // Called on behalf of a StunRequest to write data to the socket. This is + // already STUN intended for the server, so no wrapping is necessary. + void OnSendPacket(const void* data, size_t size); + + // Sends the given data on the socket to the server with no wrapping. This + // returns the number of bytes written or -1 if an error occurred. + int SendPacket(const void* data, size_t size); +}; + +// Handles an allocate request for a particular RelayEntry. +class AllocateRequest : public StunRequest { +public: + AllocateRequest(RelayEntry* entry); + virtual ~AllocateRequest() {} + + virtual void Prepare(StunMessage* request); + + virtual int GetNextDelay(); + + virtual void OnResponse(StunMessage* response); + virtual void OnErrorResponse(StunMessage* response); + virtual void OnTimeout(); + +private: + RelayEntry* entry_; + uint32 start_time_; +}; + +const std::string RELAY_PORT_TYPE("relay"); + +RelayPort::RelayPort( + Thread* thread, SocketFactory* factory, Network* network, + const SocketAddress& local_addr, const std::string& username, + const std::string& password, const std::string& magic_cookie) + : Port(thread, RELAY_PORT_TYPE, factory, network), local_addr_(local_addr), + ready_(false), magic_cookie_(magic_cookie), error_(0) { + + entries_.push_back(new RelayEntry(this, SocketAddress(), local_addr_)); + + set_username_fragment(username); + set_password(password); + + if (magic_cookie_.size() == 0) + magic_cookie_.append(STUN_MAGIC_COOKIE_VALUE, 4); +} + +RelayPort::~RelayPort() { + for (unsigned i = 0; i < entries_.size(); ++i) + delete entries_[i]; + thread_->Clear(this); +} + +void RelayPort::AddServerAddress(const ProtocolAddress& addr) { + // Since HTTP proxies usually only allow 443, let's up the priority on PROTO_SSLTCP + if ((addr.proto == PROTO_SSLTCP) + && ((proxy().type == PROXY_HTTPS) || (proxy().type == PROXY_UNKNOWN))) { + server_addr_.push_front(addr); + } else { + server_addr_.push_back(addr); + } +} + +void RelayPort::AddExternalAddress(const ProtocolAddress& addr) { + std::string proto_name = ProtoToString(addr.proto); + for (std::vector::const_iterator it = candidates().begin(); it != candidates().end(); ++it) { + if ((it->address() == addr.address) && (it->protocol() == proto_name)) { + LOG(INFO) << "Redundant relay address: " << proto_name << " @ " << addr.address.ToString(); + return; + } + } + add_address(addr.address, proto_name, false); +} + +void RelayPort::SetReady() { + if (!ready_) { + ready_ = true; + SignalAddressReady(this); + } +} + +const ProtocolAddress * RelayPort::ServerAddress(size_t index) const { + if ((index >= 0) && (index < server_addr_.size())) + return &server_addr_[index]; + return 0; +} + +bool RelayPort::HasMagicCookie(const char* data, size_t size) { + if (size < 24 + magic_cookie_.size()) { + return false; + } else { + return 0 == std::memcmp(data + 24, + magic_cookie_.c_str(), + magic_cookie_.size()); + } +} + +void RelayPort::PrepareAddress() { + // We initiate a connect on the first entry. If this completes, it will fill + // in the server address as the address of this port. + assert(entries_.size() == 1); + entries_[0]->Connect(); + ready_ = false; +} + +Connection* RelayPort::CreateConnection(const Candidate& address, CandidateOrigin origin) { + // We only create connections to non-udp sockets if they are incoming on this port + if ((address.protocol() != "udp") && (origin != ORIGIN_THIS_PORT)) + return 0; + + // We don't support loopback on relays + if (address.type() == type()) + return 0; + + size_t index = 0; + for (size_t i = 0; i < candidates().size(); ++i) { + const Candidate& local = candidates()[i]; + if (local.protocol() == address.protocol()) { + index = i; + break; + } + } + + Connection * conn = new ProxyConnection(this, index, address); + AddConnection(conn); + return conn; +} + +int RelayPort::SendTo(const void* data, + size_t size, + const SocketAddress& addr, bool payload) { + + // Try to find an entry for this specific address. Note that the first entry + // created was not given an address initially, so it can be set to the first + // address that comes along. + + RelayEntry* entry = 0; + + for (unsigned i = 0; i < entries_.size(); ++i) { + if (entries_[i]->address().IsAny() && payload) { + entry = entries_[i]; + entry->set_address(addr); + break; + } else if (entries_[i]->address() == addr) { + entry = entries_[i]; + break; + } + } + + // If we did not find one, then we make a new one. This will not be useable + // until it becomes connected, however. + if (!entry && payload) { + entry = new RelayEntry(this, addr, local_addr_); + if (!entries_.empty()) { + // Use the same port to connect to relay server + entry->SetServerIndex(entries_[0]->ServerIndex()); + } + entry->Connect(); + entries_.push_back(entry); + } + + // If the entry is connected, then we can send on it (though wrapping may + // still be necessary). Otherwise, we can't yet use this connection, so we + // default to the first one. + if (!entry || !entry->connected()) { + assert(!entries_.empty()); + entry = entries_[0]; + if (!entry->connected()) { + error_ = EWOULDBLOCK; + return SOCKET_ERROR; + } + } + + // Send the actual contents to the server using the usual mechanism. + int sent = entry->SendTo(data, size, addr); + if (sent <= 0) { + assert(sent < 0); + error_ = entry->GetError(); + return SOCKET_ERROR; + } + + // The caller of the function is expecting the number of user data bytes, + // rather than the size of the packet. + return (int)size; +} + +void RelayPort::OnMessage(Message *pmsg) { + switch (pmsg->message_id) { + case MSG_DISPOSE_SOCKET: { + DisposeSocketData * data = static_cast(pmsg->pdata); + delete data->data(); + delete data; + break; } + default: + Port::OnMessage(pmsg); + } +} + +int RelayPort::SetOption(Socket::Option opt, int value) { + int result = 0; + for (unsigned i = 0; i < entries_.size(); ++i) { + if (entries_[i]->socket()->SetOption(opt, value) < 0) { + result = -1; + error_ = entries_[i]->socket()->GetError(); + } + } + options_.push_back(OptionValue(opt, value)); + return result; +} + +int RelayPort::GetError() { + return error_; +} + +void RelayPort::OnReadPacket( + const char* data, size_t size, const SocketAddress& remote_addr) { + if (Connection* conn = GetConnection(remote_addr)) { + conn->OnReadPacket(data, size); + } else { + Port::OnReadPacket(data, size, remote_addr); + } +} + +void RelayPort::DisposeSocket(AsyncPacketSocket * socket) { + thread_->Post(this, MSG_DISPOSE_SOCKET, new DisposeSocketData(socket)); +} + +RelayEntry::RelayEntry(RelayPort* port, const SocketAddress& ext_addr, + const SocketAddress& local_addr) + : port_(port), ext_addr_(ext_addr), local_addr_(local_addr), server_index_(0), + socket_(0), connected_(false), locked_(false), requests_(port->thread()) { + + requests_.SignalSendPacket.connect(this, &RelayEntry::OnSendPacket); +} + +RelayEntry::~RelayEntry() { + delete socket_; +} + +void RelayEntry::Connect() { + assert(socket_ == 0); + const ProtocolAddress * ra = port()->ServerAddress(server_index_); + if (!ra) { + LOG(INFO) << "Out of relay server connections"; + return; + } + + LOG(INFO) << "Connecting to relay via " << ProtoToString(ra->proto) << " @ " << ra->address.ToString(); + + socket_ = port_->CreatePacketSocket(ra->proto); + assert(socket_ != 0); + + socket_->SignalReadPacket.connect(this, &RelayEntry::OnReadPacket); + if (socket_->Bind(local_addr_) < 0) + LOG(INFO) << "bind: " << std::strerror(socket_->GetError()); + + for (unsigned i = 0; i < port_->options().size(); ++i) + socket_->SetOption(port_->options()[i].first, port_->options()[i].second); + + if ((ra->proto == PROTO_TCP) || (ra->proto == PROTO_SSLTCP)) { + AsyncTCPSocket * tcp = static_cast(socket_); + tcp->SignalClose.connect(this, &RelayEntry::OnSocketClose); + tcp->SignalConnect.connect(this, &RelayEntry::OnSocketConnect); + tcp->Connect(ra->address); + } else { + requests_.Send(new AllocateRequest(this)); + } +} + +void RelayEntry::OnConnect(const SocketAddress& mapped_addr) { + ProtocolType proto = PROTO_UDP; + LOG(INFO) << "Relay allocate succeeded: " << ProtoToString(proto) << " @ " << mapped_addr.ToString(); + connected_ = true; + + port_->AddExternalAddress(ProtocolAddress(mapped_addr, proto)); + port_->SetReady(); +} + +int RelayEntry::SendTo(const void* data, + size_t size, + const SocketAddress& addr) { + + // If this connection is locked to the address given, then we can send the + // packet with no wrapper. + if (locked_ && (ext_addr_ == addr)) + return SendPacket(data, size); + + // Otherwise, we must wrap the given data in a STUN SEND request so that we + // can communicate the destination address to the server. + // + // Note that we do not use a StunRequest here. This is because there is + // likely no reason to resend this packet. If it is late, we just drop it. + // The next send to this address will try again. + + StunMessage request; + request.SetType(STUN_SEND_REQUEST); + request.SetTransactionID(CreateRandomString(16)); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes(port_->magic_cookie().c_str(), + (uint16)port_->magic_cookie().size()); + request.AddAttribute(magic_cookie_attr); + + StunByteStringAttribute* username_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + username_attr->CopyBytes(port_->username_fragment().c_str(), + (uint16)port_->username_fragment().size()); + request.AddAttribute(username_attr); + + StunAddressAttribute* addr_attr = + StunAttribute::CreateAddress(STUN_ATTR_DESTINATION_ADDRESS); + addr_attr->SetFamily(1); + addr_attr->SetIP(addr.ip()); + addr_attr->SetPort(addr.port()); + request.AddAttribute(addr_attr); + + // Attempt to lock + if (ext_addr_ == addr) { + StunUInt32Attribute* options_attr = + StunAttribute::CreateUInt32(STUN_ATTR_OPTIONS); + options_attr->SetValue(0x1); + request.AddAttribute(options_attr); + } + + StunByteStringAttribute* data_attr = + StunAttribute::CreateByteString(STUN_ATTR_DATA); + data_attr->CopyBytes(data, (uint16)size); + request.AddAttribute(data_attr); + + // TODO: compute the HMAC. + + ByteBuffer buf; + request.Write(&buf); + + return SendPacket(buf.Data(), buf.Length()); +} + +void RelayEntry::ScheduleKeepAlive() { + requests_.SendDelayed(new AllocateRequest(this), KEEPALIVE_DELAY); +} + +void RelayEntry::HandleConnectFailure() { + //if (GetMillisecondCount() - start_time_ > RETRY_TIMEOUT) + // return; + //ScheduleKeepAlive(); + + connected_ = false; + port()->DisposeSocket(socket_); + socket_ = 0; + server_index_ += 1; + Connect(); +} + +void RelayEntry::OnSocketConnect(AsyncTCPSocket* socket) { + assert(socket == socket_); + LOG(INFO) << "relay tcp connected to " << socket->GetRemoteAddress().ToString(); + requests_.Send(new AllocateRequest(this)); +} + +void RelayEntry::OnSocketClose(AsyncTCPSocket* socket, int error) { + assert(socket == socket_); + PLOG(LERROR, error) << "relay tcp connect failed"; + HandleConnectFailure(); +} + +void RelayEntry::OnReadPacket(const char* data, + size_t size, + const SocketAddress& remote_addr, + AsyncPacketSocket* socket) { + assert(socket == socket_); + //assert(remote_addr == port_->server_addr()); TODO: are we worried about this? + + // If the magic cookie is not present, then this is an unwrapped packet sent + // by the server, The actual remote address is the one we recorded. + if (!port_->HasMagicCookie(data, size)) { + if (locked_) { + port_->OnReadPacket(data, size, ext_addr_); + } else { + LOG(WARNING) << "Dropping packet: entry not locked"; + } + return; + } + + ByteBuffer buf(data, size); + StunMessage msg; + if (!msg.Read(&buf)) { + LOG(INFO) << "Incoming packet was not STUN"; + return; + } + + // The incoming packet should be a STUN ALLOCATE response, SEND response, or + // DATA indication. + if (requests_.CheckResponse(&msg)) { + return; + } else if (msg.type() == STUN_SEND_RESPONSE) { + if (const StunUInt32Attribute* options_attr = msg.GetUInt32(STUN_ATTR_OPTIONS)) { + if (options_attr->value() & 0x1) { + locked_ = true; + } + } + return; + } else if (msg.type() != STUN_DATA_INDICATION) { + LOG(INFO) << "Received BAD stun type from server: " << msg.type() + ; + return; + } + + // This must be a data indication. + + const StunAddressAttribute* addr_attr = + msg.GetAddress(STUN_ATTR_SOURCE_ADDRESS2); + if (!addr_attr) { + LOG(INFO) << "Data indication has no source address"; + return; + } else if (addr_attr->family() != 1) { + LOG(INFO) << "Source address has bad family"; + return; + } + + SocketAddress remote_addr2(addr_attr->ip(), addr_attr->port()); + + const StunByteStringAttribute* data_attr = msg.GetByteString(STUN_ATTR_DATA); + if (!data_attr) { + LOG(INFO) << "Data indication has no data"; + return; + } + + // Process the actual data and remote address in the normal manner. + port_->OnReadPacket(data_attr->bytes(), data_attr->length(), remote_addr2); +} + +void RelayEntry::OnSendPacket(const void* data, size_t size) { + SendPacket(data, size); +} + +int RelayEntry::SendPacket(const void* data, size_t size) { + const ProtocolAddress * ra = port_->ServerAddress(server_index_); + if (!ra) { + socket_->SetError(ENOTCONN); + return SOCKET_ERROR; + } + int sent = socket_->SendTo(data, size, ra->address); + if (sent <= 0) { + LOG(LS_VERBOSE) << "sendto: " << std::strerror(socket_->GetError()); + assert(sent < 0); + } + return sent; +} + +AllocateRequest::AllocateRequest(RelayEntry* entry) : entry_(entry) { + start_time_ = GetMillisecondCount(); +} + +void AllocateRequest::Prepare(StunMessage* request) { + request->SetType(STUN_ALLOCATE_REQUEST); + + StunByteStringAttribute* magic_cookie_attr = + StunAttribute::CreateByteString(STUN_ATTR_MAGIC_COOKIE); + magic_cookie_attr->CopyBytes( + entry_->port()->magic_cookie().c_str(), + (uint16)entry_->port()->magic_cookie().size()); + request->AddAttribute(magic_cookie_attr); + + StunByteStringAttribute* username_attr = + StunAttribute::CreateByteString(STUN_ATTR_USERNAME); + username_attr->CopyBytes( + entry_->port()->username_fragment().c_str(), + (uint16)entry_->port()->username_fragment().size()); + request->AddAttribute(username_attr); +} + +int AllocateRequest::GetNextDelay() { + int delay = 100 * _max(1 << count_, 2); + count_ += 1; + if (count_ == 5) + timeout_ = true; + return delay; +} + +void AllocateRequest::OnResponse(StunMessage* response) { + const StunAddressAttribute* addr_attr = + response->GetAddress(STUN_ATTR_MAPPED_ADDRESS); + if (!addr_attr) { + LOG(INFO) << "Allocate response missing mapped address."; + } else if (addr_attr->family() != 1) { + LOG(INFO) << "Mapped address has bad family"; + } else { + SocketAddress addr(addr_attr->ip(), addr_attr->port()); + entry_->OnConnect(addr); + } + + // We will do a keep-alive regardless of whether this request suceeds. + // This should have almost no impact on network usage. + entry_->ScheduleKeepAlive(); +} + +void AllocateRequest::OnErrorResponse(StunMessage* response) { + const StunErrorCodeAttribute* attr = response->GetErrorCode(); + if (!attr) { + LOG(INFO) << "Bad allocate response error code"; + } else { + LOG(INFO) << "Allocate error response:" + << " code=" << static_cast(attr->error_code()) + << " reason='" << attr->reason() << "'"; + } + + if (GetMillisecondCount() - start_time_ <= RETRY_TIMEOUT) + entry_->ScheduleKeepAlive(); +} + +void AllocateRequest::OnTimeout() { + LOG(INFO) << "Allocate request timed out"; + entry_->HandleConnectFailure(); +} + +} // namespace cricket -- cgit v1.2.3