diff --git a/Streaming.cpp b/Streaming.cpp index 0d65452a..f6e8bf41 100644 --- a/Streaming.cpp +++ b/Streaming.cpp @@ -603,6 +603,7 @@ namespace stream Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) { Stream * s = new Stream (m_Service, this, remote); + std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; } @@ -610,6 +611,7 @@ namespace stream Stream * StreamingDestination::CreateNewIncomingStream () { Stream * s = new Stream (m_Service, this); + std::unique_lock l(m_StreamsMutex); m_Streams[s->GetRecvStreamID ()] = s; return s; } @@ -618,6 +620,7 @@ namespace stream { if (stream) { + std::unique_lock l(m_StreamsMutex); m_Streams.erase (stream->GetRecvStreamID ()); delete stream; } @@ -716,6 +719,7 @@ namespace stream StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic) { auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic); + std::unique_lock l(m_DestinationsMutex); m_Destinations[localDestination->GetIdentHash ()] = localDestination; return localDestination; } @@ -723,6 +727,7 @@ namespace stream StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic) { auto localDestination = new StreamingDestination (m_Service, isPublic); + std::unique_lock l(m_DestinationsMutex); m_Destinations[localDestination->GetIdentHash ()] = localDestination; return localDestination; } @@ -734,6 +739,7 @@ namespace stream if (it != m_Destinations.end ()) { delete it->second; + std::unique_lock l(m_DestinationsMutex); m_Destinations.erase (it); } } @@ -747,6 +753,7 @@ namespace stream return nullptr; } auto localDestination = new StreamingDestination (m_Service, keys, isPublic); + std::unique_lock l(m_DestinationsMutex); m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination; return localDestination; } @@ -760,14 +767,7 @@ namespace stream void StreamingDestinations::DeleteStream (Stream * stream) { if (stream) - { - m_Service.post ( - [=](void) - { - stream->GetLocalDestination ()->DeleteStream (stream); - } - ); - } + stream->GetLocalDestination ()->DeleteStream (stream); } void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) diff --git a/Streaming.h b/Streaming.h index f8ad815f..ba22fb5f 100644 --- a/Streaming.h +++ b/Streaming.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -171,6 +172,7 @@ namespace stream private: boost::asio::io_service& m_Service; + std::mutex m_StreamsMutex; std::map m_Streams; i2p::data::PrivateKeys m_Keys; uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; @@ -217,6 +219,7 @@ namespace stream boost::asio::io_service m_Service; boost::asio::io_service::work m_Work; + std::mutex m_DestinationsMutex; std::map m_Destinations; StreamingDestination * m_SharedLocalDestination;