fixed race condition

This commit is contained in:
orignal 2014-10-13 12:33:51 -04:00
parent a0f43d9772
commit b74f3a1ee1
2 changed files with 20 additions and 12 deletions

View File

@ -12,7 +12,7 @@ namespace i2p
namespace stream namespace stream
{ {
StreamingDestination::StreamingDestination (bool isPublic): StreamingDestination::StreamingDestination (bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{ {
m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA m_Keys = i2p::data::PrivateKeys::CreateRandomKeys (/*i2p::data::SIGNING_KEY_TYPE_ECDSA_SHA256_P256*/); // uncomment for ECDSA
@ -24,7 +24,7 @@ namespace stream
} }
StreamingDestination::StreamingDestination (const std::string& fullPath, bool isPublic): StreamingDestination::StreamingDestination (const std::string& fullPath, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{ {
std::ifstream s(fullPath.c_str (), std::ifstream::binary); std::ifstream s(fullPath.c_str (), std::ifstream::binary);
@ -59,7 +59,7 @@ namespace stream
} }
StreamingDestination::StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic): StreamingDestination::StreamingDestination (const i2p::data::PrivateKeys& keys, bool isPublic):
m_IsRunning (false), m_Thread (nullptr), m_Work (m_Service), m_IsRunning (false), m_Thread (nullptr), m_Service (nullptr), m_Work (nullptr),
m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic) m_Keys (keys), m_CurrentOutboundTunnel (nullptr), m_LeaseSet (nullptr), m_IsPublic (isPublic)
{ {
CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg); CryptoPP::DH dh (i2p::crypto::elgp, i2p::crypto::elgg);
@ -77,21 +77,28 @@ namespace stream
if (m_Pool) if (m_Pool)
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet; delete m_LeaseSet;
delete m_Work;
delete m_Service;
} }
void StreamingDestination::Run () void StreamingDestination::Run ()
{ {
m_Service.run (); if (m_Service)
m_Service->run ();
} }
void StreamingDestination::Start () void StreamingDestination::Start ()
{ {
m_Service = new boost::asio::io_service;
m_Work = new boost::asio::io_service::work (*m_Service);
m_Pool->SetActive (true);
m_IsRunning = true; m_IsRunning = true;
m_Thread = new std::thread (std::bind (&StreamingDestination::Run, this)); m_Thread = new std::thread (std::bind (&StreamingDestination::Run, this));
} }
void StreamingDestination::Stop () void StreamingDestination::Stop ()
{ {
ResetAcceptor ();
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams) for (auto it: m_Streams)
@ -101,14 +108,15 @@ namespace stream
if (m_Pool) if (m_Pool)
i2p::tunnel::tunnels.StopTunnelPool (m_Pool); i2p::tunnel::tunnels.StopTunnelPool (m_Pool);
m_IsRunning = false; m_IsRunning = false;
m_Service.stop (); m_Service->stop ();
if (m_Thread) if (m_Thread)
{ {
m_Thread->join (); m_Thread->join ();
delete m_Thread; delete m_Thread;
m_Thread = 0; m_Thread = 0;
} }
m_Service.reset (); delete m_Work; m_Work = nullptr;
delete m_Service; m_Service = nullptr;
} }
void StreamingDestination::SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs) void StreamingDestination::SendTunnelDataMsgs (const std::vector<i2p::tunnel::TunnelMessageBlock>& msgs)
@ -154,7 +162,7 @@ namespace stream
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
{ {
Stream * s = new Stream (m_Service, *this, remote); Stream * s = new Stream (*m_Service, *this, remote);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
@ -162,7 +170,7 @@ namespace stream
Stream * StreamingDestination::CreateNewIncomingStream () Stream * StreamingDestination::CreateNewIncomingStream ()
{ {
Stream * s = new Stream (m_Service, *this); Stream * s = new Stream (*m_Service, *this);
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
@ -266,12 +274,12 @@ namespace stream
void StreamingDestination::ProcessGarlicMessage (I2NPMessage * msg) void StreamingDestination::ProcessGarlicMessage (I2NPMessage * msg)
{ {
m_Service.post (boost::bind (&StreamingDestination::HandleGarlicMessage, this, msg)); m_Service->post (boost::bind (&StreamingDestination::HandleGarlicMessage, this, msg));
} }
void StreamingDestination::ProcessDeliveryStatusMessage (I2NPMessage * msg) void StreamingDestination::ProcessDeliveryStatusMessage (I2NPMessage * msg)
{ {
m_Service.post (boost::bind (&StreamingDestination::HandleDeliveryStatusMessage, this, msg)); m_Service->post (boost::bind (&StreamingDestination::HandleDeliveryStatusMessage, this, msg));
} }
void StreamingDestination::HandleI2NPMessage (const uint8_t * buf, size_t len) void StreamingDestination::HandleI2NPMessage (const uint8_t * buf, size_t len)

View File

@ -67,8 +67,8 @@ namespace stream
bool m_IsRunning; bool m_IsRunning;
std::thread * m_Thread; std::thread * m_Thread;
boost::asio::io_service m_Service; boost::asio::io_service * m_Service;
boost::asio::io_service::work m_Work; boost::asio::io_service::work * m_Work;
std::map<i2p::data::IdentHash, i2p::data::LeaseSet *> m_RemoteLeaseSets; std::map<i2p::data::IdentHash, i2p::data::LeaseSet *> m_RemoteLeaseSets;
std::mutex m_StreamsMutex; std::mutex m_StreamsMutex;