diff --git a/AddressBook.cpp b/AddressBook.cpp index db424308..685acbc2 100644 --- a/AddressBook.cpp +++ b/AddressBook.cpp @@ -842,7 +842,7 @@ namespace client else memset (response + 8, 0, 32); // not found memset (response + 40, 0, 4); // set expiration time to zero - m_LocalDestination->GetDatagramDestination ()->SendDatagramTo (response, 44, from.GetIdentHash (), toPort, fromPort); + m_LocalDestination->GetDatagramDestination ()->SendDatagramTo (response, 44, from.GetIdentHash(), toPort, fromPort); } void AddressResolver::AddAddress (const std::string& name, const i2p::data::IdentHash& ident) diff --git a/Datagram.cpp b/Datagram.cpp index d0b0737a..a06a8e5d 100644 --- a/Datagram.cpp +++ b/Datagram.cpp @@ -22,9 +22,9 @@ namespace datagram { m_Sessions.clear(); } - - void DatagramDestination::SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort, uint16_t toPort) - { + + void DatagramDestination::SendDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort) + { auto owner = m_Owner; std::vector v(MAX_DATAGRAM_SIZE); uint8_t * buf = v.data(); @@ -45,8 +45,7 @@ namespace datagram owner->Sign (buf1, len, signature); auto msg = CreateDataMessage (buf, len + headerLen, fromPort, toPort); - auto session = ObtainSession(ident); - session->SendMsg(msg); + ObtainSession(identity)->SendMsg(msg); } @@ -69,6 +68,8 @@ namespace datagram if (verified) { + auto h = identity.GetIdentHash(); + ObtainSession(h)->Ack(); auto r = FindReceiver(toPort); if(r) r(identity, fromPort, toPort, buf + headerLen, len -headerLen); @@ -138,15 +139,15 @@ namespace datagram } } - std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & ident) + std::shared_ptr DatagramDestination::ObtainSession(const i2p::data::IdentHash & identity) { std::shared_ptr session = nullptr; std::lock_guard lock(m_SessionsMutex); - auto itr = m_Sessions.find(ident); + auto itr = m_Sessions.find(identity); if (itr == m_Sessions.end()) { // not found, create new session - session = std::make_shared(m_Owner, ident); - m_Sessions[ident] = session; + session = std::make_shared(m_Owner, identity); + m_Sessions[identity] = session; } else { session = itr->second; } @@ -164,13 +165,13 @@ namespace datagram } DatagramSession::DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent) : + const i2p::data::IdentHash & remoteIdent) : m_LocalDestination(localDestination), - m_RemoteIdentity(remoteIdent), - m_LastUse(i2p::util::GetMillisecondsSinceEpoch ()), - m_LastPathChange(0), - m_LastSuccess(0) + m_RemoteIdent(remoteIdent), + m_SendQueueTimer(localDestination->GetService()) { + m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); + ScheduleFlushSendQueue(); } void DatagramSession::SendMsg(std::shared_ptr msg) @@ -184,262 +185,150 @@ namespace datagram DatagramSession::Info DatagramSession::GetSessionInfo() const { if(!m_RoutingSession) - return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, nullptr, m_LastUse); auto routingPath = m_RoutingSession->GetSharedRoutingPath(); if (!routingPath) - return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, nullptr, m_LastUse); auto lease = routingPath->remoteLease; auto tunnel = routingPath->outboundTunnel; if(lease) { if(tunnel) - return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse); else - return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse); } else if(tunnel) - return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse); else - return DatagramSession::Info(nullptr, nullptr, m_LastUse, m_LastSuccess); + return DatagramSession::Info(nullptr, nullptr, m_LastUse); } + + void DatagramSession::Ack() + { + m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); + auto path = GetSharedRoutingPath(); + if(path) + path->updateTime = i2p::util::GetSecondsSinceEpoch (); + } + + std::shared_ptr DatagramSession::GetSharedRoutingPath () + { + if(!m_RoutingSession) { + if(!m_RemoteLeaseSet) { + m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent); + } + if(!m_RemoteLeaseSet) { + // no remote lease set + return nullptr; + } + m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); + } + auto path = m_RoutingSession->GetSharedRoutingPath(); + if(path) { + if (m_CurrentOutboundTunnel && !m_CurrentOutboundTunnel->IsEstablished()) { + // bad outbound tunnel, switch outbound tunnel + m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel); + path->outboundTunnel = m_CurrentOutboundTunnel; + } + if(m_CurrentRemoteLease && ! m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) { + // bad lease, switch to next one + if(m_RemoteLeaseSet) { + auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&](const i2p::data::Lease& l) -> bool { + return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway || l.endDate <= m_CurrentRemoteLease->endDate; + }); + auto sz = ls.size(); + if (sz) { + auto idx = rand() % sz; + m_CurrentRemoteLease = ls[idx]; + } else { + // no more leases, bail + LogPrint(eLogWarning, "DatagramSession: no more valid remote leases to ", m_RemoteIdent.ToBase32()); + } + } else { + // no remote lease set? + LogPrint(eLogWarning, "DatagramSession: no cached remote lease set for ", m_RemoteIdent.ToBase32()); + } + path->remoteLease = m_CurrentRemoteLease; + } + } else { + // no current path, make one + path = std::make_shared(); + // switch outbound tunnel if bad + if(m_CurrentOutboundTunnel == nullptr || ! m_CurrentOutboundTunnel->IsEstablished()) { + m_CurrentOutboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(m_CurrentOutboundTunnel); + } + // switch lease if bad + if(m_CurrentRemoteLease == nullptr || m_CurrentRemoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) { + if(!m_RemoteLeaseSet) { + m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent); + } + if(m_RemoteLeaseSet) { + // pick random next good lease + auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding([&] (const i2p::data::Lease & l) -> bool { + if(m_CurrentRemoteLease) + return l.tunnelGateway == m_CurrentRemoteLease->tunnelGateway; + return false; + }); + auto sz = ls.size(); + if(sz) { + auto idx = rand() % sz; + m_CurrentRemoteLease = ls[idx]; + } + } else { + // no remote lease set currently, bail + LogPrint(eLogWarning, "DatagramSession: no remote lease set found for ", m_RemoteIdent.ToBase32()); + return nullptr; + } + } + path->outboundTunnel = m_CurrentOutboundTunnel; + path->remoteLease = m_CurrentRemoteLease; + m_RoutingSession->SetSharedRoutingPath(path); + } + return path; + } + + void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr ls) + { + // only update lease set if found and newer than previous lease set + uint64_t oldExpire = 0; + if(m_RemoteLeaseSet) oldExpire = m_RemoteLeaseSet->GetExpirationTime(); + if(ls && ls->GetExpirationTime() > oldExpire) m_RemoteLeaseSet = ls; + } + void DatagramSession::HandleSend(std::shared_ptr msg) { - if(!m_RoutingSession) - { - // try to get one - if(m_RemoteLeaseSet) m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); - else - { - UpdateLeaseSet(msg); - return; - } - } - // do we have a routing session? - if(m_RoutingSession) - { - // should we switch paths? - if(ShouldUpdateRoutingPath ()) - { - LogPrint(eLogDebug, "DatagramSession: try getting new routing path"); - // try switching paths - auto path = GetNextRoutingPath(); - if(path) - UpdateRoutingPath (path); - else - ResetRoutingPath(); - } - auto routingPath = m_RoutingSession->GetSharedRoutingPath (); - // make sure we have a routing path - if (routingPath) - { - auto outboundTunnel = routingPath->outboundTunnel; - if (outboundTunnel) - { - if(outboundTunnel->IsEstablished()) - { - m_LastSuccess = i2p::util::GetMillisecondsSinceEpoch (); - // we have a routing path and routing session and the outbound tunnel we are using is good - // wrap message with routing session and send down routing path's outbound tunnel wrapped for the IBGW - auto m = m_RoutingSession->WrapSingleMessage(msg); - routingPath->outboundTunnel->SendTunnelDataMsg({i2p::tunnel::TunnelMessageBlock{ - i2p::tunnel::eDeliveryTypeTunnel, - routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, - m - }}); - return; - } - } - } - } - auto now = i2p::util::GetMillisecondsSinceEpoch (); - // if this path looks dead reset the routing path since we didn't seem to be able to get a path in time - if (m_LastPathChange && now - m_LastPathChange >= DATAGRAM_SESSION_PATH_TIMEOUT ) ResetRoutingPath(); - UpdateLeaseSet(msg); - + m_SendQueue.push_back(msg); + // flush queue right away if full + if(m_SendQueue.size() >= DATAGRAM_SEND_QUEUE_MAX_SIZE) FlushSendQueue(); } - void DatagramSession::UpdateRoutingPath(const std::shared_ptr & path) + void DatagramSession::FlushSendQueue () { - if(m_RoutingSession == nullptr && m_RemoteLeaseSet) - m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); - if(!m_RoutingSession) return; - // set routing path and update time we last updated the routing path - m_RoutingSession->SetSharedRoutingPath (path); - m_LastPathChange = i2p::util::GetMillisecondsSinceEpoch (); - } - bool DatagramSession::ShouldUpdateRoutingPath() const - { - bool dead = m_RoutingSession == nullptr || m_RoutingSession->GetSharedRoutingPath () == nullptr; - auto now = i2p::util::GetMillisecondsSinceEpoch (); - // we need to rotate paths becuase the routing path is too old - // if (now - m_LastPathChange >= DATAGRAM_SESSION_PATH_SWITCH_INTERVAL) return true; - // too fast switching paths - if (now - m_LastPathChange < DATAGRAM_SESSION_PATH_MIN_LIFETIME ) return false; - // our path looks dead so we need to rotate paths - if (now - m_LastSuccess >= DATAGRAM_SESSION_PATH_TIMEOUT) return !dead; - // if we have a routing session and routing path we don't need to switch paths - return dead; - } - - - bool DatagramSession::ShouldSwitchLease() const - { - std::shared_ptr routingPath = nullptr; - std::shared_ptr currentLease = nullptr; - if(m_RoutingSession) - routingPath = m_RoutingSession->GetSharedRoutingPath (); + std::vector send; + auto routingPath = GetSharedRoutingPath(); + // if we don't have a routing path we will drop all queued messages if(routingPath) - currentLease = routingPath->remoteLease; - if(currentLease) // if we have a lease return true if it's about to expire otherwise return false - return currentLease->ExpiresWithin( DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE ); - // we have no current lease, we should switch - return currentLease == nullptr; - } - - std::shared_ptr DatagramSession::GetNextRoutingPath() - { - std::shared_ptr outboundTunnel = nullptr; - std::shared_ptr routingPath = nullptr; - // get existing routing path if we have one - if(m_RoutingSession) - routingPath = m_RoutingSession->GetSharedRoutingPath(); - // do we have an existing outbound tunnel and routing path? - if(routingPath && routingPath->outboundTunnel) { - // is the outbound tunnel we are using good? - if (routingPath->outboundTunnel->IsEstablished()) + for (const auto & msg : m_SendQueue) { - // ya so let's stick with it - outboundTunnel = routingPath->outboundTunnel; + auto m = m_RoutingSession->WrapSingleMessage(msg); + send.push_back(i2p::tunnel::TunnelMessageBlock{i2p::tunnel::eDeliveryTypeTunnel,routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, m}); } - else - outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(routingPath->outboundTunnel); // no so we'll switch outbound tunnels + routingPath->outboundTunnel->SendTunnelDataMsg(send); } - // do we have an outbound tunnel that works already ? - if(!outboundTunnel) - outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(); // no, let's get a new outbound tunnel as we probably just started - - if(outboundTunnel) - { - std::shared_ptr lease = nullptr; - // should we switch leases ? - if (ShouldSwitchLease ()) - { - // yes, get next available lease - lease = GetNextLease(); - } - else if (routingPath) - { - if(routingPath->remoteLease) - { - if(routingPath->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW, DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE)) - lease = GetNextLease(); - else - lease = routingPath->remoteLease; - } - } - else - lease = GetNextLease(); - if(lease) - { - // we have a valid lease to use and an outbound tunnel - // create new routing path - uint32_t now = i2p::util::GetSecondsSinceEpoch(); - routingPath = std::make_shared(i2p::garlic::GarlicRoutingPath{ - outboundTunnel, - lease, - 0, - now, - 0 - }); - } - else // we don't have a new routing path to give - routingPath = nullptr; - } - return routingPath; + m_SendQueue.clear(); + ScheduleFlushSendQueue(); } - void DatagramSession::ResetRoutingPath() + void DatagramSession::ScheduleFlushSendQueue() { - if(m_RoutingSession) - { - auto routingPath = m_RoutingSession->GetSharedRoutingPath(); - if(routingPath && routingPath->remoteLease) // we have a remote lease already specified and a routing path - { - // get outbound tunnel on this path - auto outboundTunnel = routingPath->outboundTunnel; - // is this outbound tunnel there and established - if (outboundTunnel && outboundTunnel->IsEstablished()) - m_InvalidIBGW.push_back(routingPath->remoteLease->tunnelGateway); // yes, let's mark remote lease as dead because the outbound tunnel seems fine - } - // reset the routing path - UpdateRoutingPath(nullptr); - } - } - - std::shared_ptr DatagramSession::GetNextLease() - { - auto now = i2p::util::GetMillisecondsSinceEpoch (); - std::shared_ptr next = nullptr; - if(m_RemoteLeaseSet) - { - std::vector exclude; - for(const auto & ident : m_InvalidIBGW) - exclude.push_back(ident); - // find get all leases that are not in our ban list and are not going to expire within our lease set handover window + fudge - auto leases = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( [&exclude, now] (const i2p::data::Lease & l) -> bool { - if(exclude.size()) - { - auto end = std::end(exclude); - return std::find_if(exclude.begin(), end, [l, now] ( const i2p::data::IdentHash & ident) -> bool { - return ident == l.tunnelGateway; - }) != end; - } - else - return false; - }); - if(leases.size()) - { - // pick random valid next lease - uint32_t idx = rand() % leases.size(); - next = leases[idx]; - } - else - LogPrint(eLogWarning, "DatagramDestination: no leases to use"); - } - return next; - } - - void DatagramSession::UpdateLeaseSet(std::shared_ptr msg) - { - LogPrint(eLogInfo, "DatagramSession: updating lease set"); - m_LocalDestination->RequestDestination(m_RemoteIdentity, std::bind(&DatagramSession::HandleGotLeaseSet, this, std::placeholders::_1, msg)); - } - - void DatagramSession::HandleGotLeaseSet(std::shared_ptr remoteIdent, std::shared_ptr msg) - { - if(remoteIdent) - { - // update routing session - if(m_RoutingSession) - m_RoutingSession = nullptr; - m_RoutingSession = m_LocalDestination->GetRoutingSession(remoteIdent, true); - // clear invalid IBGW as we have a new lease set - m_InvalidIBGW.clear(); - m_RemoteLeaseSet = remoteIdent; - // update routing path - auto path = GetNextRoutingPath(); - if (path) - UpdateRoutingPath(path); - else - ResetRoutingPath(); - // send the message that was queued if it was provided - if(msg) - HandleSend(msg); - } + boost::posix_time::milliseconds dlt(100); + m_SendQueueTimer.expires_from_now(dlt); + m_SendQueueTimer.async_wait([&](const boost::system::error_code & ec) { if(ec) return; FlushSendQueue(); }); } } } diff --git a/Datagram.h b/Datagram.h index dc63cccb..8891f0cc 100644 --- a/Datagram.h +++ b/Datagram.h @@ -31,29 +31,33 @@ namespace datagram const uint64_t DATAGRAM_SESSION_LEASE_HANDOVER_FUDGE = 1000; // milliseconds minimum time between path switches const uint64_t DATAGRAM_SESSION_PATH_MIN_LIFETIME = 5 * 1000; + // max 64 messages buffered in send queue for each datagram session + const size_t DATAGRAM_SEND_QUEUE_MAX_SIZE = 64; class DatagramSession { public: DatagramSession(i2p::client::ClientDestination * localDestination, - const i2p::data::IdentHash & remoteIdent); + const i2p::data::IdentHash & remoteIdent); + + + /** @brief ack the garlic routing path */ + void Ack(); /** send an i2np message to remote endpoint for this session */ void SendMsg(std::shared_ptr msg); /** get the last time in milliseconds for when we used this datagram session */ uint64_t LastActivity() const { return m_LastUse; } - /** get the last time in milliseconds when we successfully sent data */ - uint64_t LastSuccess() const { return m_LastSuccess; } + struct Info { std::shared_ptr IBGW; std::shared_ptr OBEP; const uint64_t activity; - const uint64_t success; - Info() : IBGW(nullptr), OBEP(nullptr), activity(0), success(0) {} - Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a, const uint64_t s) : - activity(a), - success(s) { + + Info() : IBGW(nullptr), OBEP(nullptr), activity(0) {} + Info(const uint8_t * ibgw, const uint8_t * obep, const uint64_t a) : + activity(a) { if(ibgw) IBGW = std::make_shared(ibgw); else IBGW = nullptr; if(obep) OBEP = std::make_shared(obep); @@ -63,44 +67,28 @@ namespace datagram Info GetSessionInfo() const; - private: - /** update our routing path we are using, mark that we have changed paths */ - void UpdateRoutingPath(const std::shared_ptr & path); + void FlushSendQueue(); + void ScheduleFlushSendQueue(); - /** return true if we should switch routing paths because of path lifetime or timeout otherwise false */ - bool ShouldUpdateRoutingPath() const; + void HandleSend(std::shared_ptr msg); - /** return true if we should switch the lease for out routing path otherwise return false */ - bool ShouldSwitchLease() const; - - /** get next usable routing path, try reusing outbound tunnels */ - std::shared_ptr GetNextRoutingPath(); - /** - * mark current routing path as invalid and clear it - * if the outbound tunnel we were using was okay don't use the IBGW in the routing path's lease next time - */ - void ResetRoutingPath(); + std::shared_ptr GetSharedRoutingPath(); + + void HandleLeaseSetUpdated(std::shared_ptr ls); - /** get next usable lease, does not fetch or update if expired or have no lease set */ - std::shared_ptr GetNextLease(); - - void HandleSend(std::shared_ptr msg); - void HandleGotLeaseSet(std::shared_ptr remoteIdent, - std::shared_ptr msg); - void UpdateLeaseSet(std::shared_ptr msg=nullptr); - private: i2p::client::ClientDestination * m_LocalDestination; - i2p::data::IdentHash m_RemoteIdentity; - std::shared_ptr m_RoutingSession; - // Ident hash of IBGW that are invalid - std::vector m_InvalidIBGW; - std::shared_ptr m_RemoteLeaseSet; - uint64_t m_LastUse; - uint64_t m_LastPathChange; - uint64_t m_LastSuccess; + i2p::data::IdentHash m_RemoteIdent; + std::shared_ptr m_RemoteLeaseSet; + std::shared_ptr m_RoutingSession; + std::shared_ptr m_CurrentRemoteLease; + std::shared_ptr m_CurrentOutboundTunnel; + boost::asio::deadline_timer m_SendQueueTimer; + std::vector > m_SendQueue; + uint64_t m_LastUse; + }; const size_t MAX_DATAGRAM_SIZE = 32768; @@ -112,9 +100,9 @@ namespace datagram DatagramDestination (std::shared_ptr owner); - ~DatagramDestination (); + ~DatagramDestination (); - void SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash& ident, uint16_t fromPort = 0, uint16_t toPort = 0); + void SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash & ident, uint16_t fromPort = 0, uint16_t toPort = 0); void HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len); void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; }; @@ -130,7 +118,7 @@ namespace datagram private: - std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); + std::shared_ptr ObtainSession(const i2p::data::IdentHash & ident); std::shared_ptr CreateDataMessage (const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort);