mirror of
https://github.com/yggdrasil-network/yggdrasil-go
synced 2024-11-09 23:20:26 +03:00
Switch connAddr (udp map key) from string to a struct that allocates less. Reduce some other allocations. Use larger channel buffers to reduce backpressure from runtime jitter.
This commit is contained in:
parent
ef1e0c902f
commit
75965b6da4
@ -274,9 +274,14 @@ func (c *Core) DEBUG_getGlobalUDPAddr() net.Addr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) {
|
func (c *Core) DEBUG_maybeSendUDPKeys(saddr string) {
|
||||||
addr := connAddr(saddr)
|
udpAddr, err := net.ResolveUDPAddr("udp", saddr)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
var addr connAddr
|
||||||
|
addr.fromUDPAddr(udpAddr)
|
||||||
c.udp.mutex.RLock()
|
c.udp.mutex.RLock()
|
||||||
_, isIn := c.udp.conns[connAddr(addr)]
|
_, isIn := c.udp.conns[addr]
|
||||||
c.udp.mutex.RUnlock()
|
c.udp.mutex.RUnlock()
|
||||||
if !isIn {
|
if !isIn {
|
||||||
c.udp.sendKeys(addr)
|
c.udp.sendKeys(addr)
|
||||||
|
@ -171,10 +171,10 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
|
|||||||
if to == nil {
|
if to == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
newTTLSlice := wire_encode_uint64(newTTL)
|
newTTLLen := wire_uint64_len(newTTL)
|
||||||
// This mutates the packet in-place if the length of the TTL changes!
|
// This mutates the packet in-place if the length of the TTL changes!
|
||||||
shift := ttlLen - len(newTTLSlice)
|
shift := ttlLen - newTTLLen
|
||||||
copy(packet[ttlBegin+shift:], newTTLSlice)
|
wire_put_uint64(newTTL, packet[ttlBegin+shift:])
|
||||||
copy(packet[shift:], packet[:pTypeLen])
|
copy(packet[shift:], packet[:pTypeLen])
|
||||||
packet = packet[shift:]
|
packet = packet[shift:]
|
||||||
to.sendPacket(packet)
|
to.sendPacket(packet)
|
||||||
|
@ -40,7 +40,7 @@ type router struct {
|
|||||||
func (r *router) init(core *Core) {
|
func (r *router) init(core *Core) {
|
||||||
r.core = core
|
r.core = core
|
||||||
r.addr = *address_addrForNodeID(&r.core.dht.nodeID)
|
r.addr = *address_addrForNodeID(&r.core.dht.nodeID)
|
||||||
in := make(chan []byte, 1) // TODO something better than this...
|
in := make(chan []byte, 1024) // TODO something better than this...
|
||||||
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub) //, out, in)
|
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub) //, out, in)
|
||||||
// TODO set in/out functions on the new peer...
|
// TODO set in/out functions on the new peer...
|
||||||
p.out = func(packet []byte) { in <- packet } // FIXME in theory it blocks...
|
p.out = func(packet []byte) { in <- packet } // FIXME in theory it blocks...
|
||||||
@ -50,8 +50,8 @@ func (r *router) init(core *Core) {
|
|||||||
// TODO attach these to the tun
|
// TODO attach these to the tun
|
||||||
// Maybe that's the core's job...
|
// Maybe that's the core's job...
|
||||||
// It creates tun, creates the router, creates channels, sets them?
|
// It creates tun, creates the router, creates channels, sets them?
|
||||||
recv := make(chan []byte, 1)
|
recv := make(chan []byte, 1024)
|
||||||
send := make(chan []byte, 1)
|
send := make(chan []byte, 1024)
|
||||||
r.recv = recv
|
r.recv = recv
|
||||||
r.send = send
|
r.send = send
|
||||||
r.core.tun.recv = recv
|
r.core.tun.recv = recv
|
||||||
|
@ -164,8 +164,8 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo {
|
|||||||
sinfo.myHandle = *newHandle()
|
sinfo.myHandle = *newHandle()
|
||||||
sinfo.theirAddr = *address_addrForNodeID(getNodeID(&sinfo.theirPermPub))
|
sinfo.theirAddr = *address_addrForNodeID(getNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.theirSubnet = *address_subnetForNodeID(getNodeID(&sinfo.theirPermPub))
|
sinfo.theirSubnet = *address_subnetForNodeID(getNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.send = make(chan []byte, 1)
|
sinfo.send = make(chan []byte, 1024)
|
||||||
sinfo.recv = make(chan *wire_trafficPacket, 1)
|
sinfo.recv = make(chan *wire_trafficPacket, 1024)
|
||||||
go sinfo.doWorker()
|
go sinfo.doWorker()
|
||||||
sinfo.time = time.Now()
|
sinfo.time = time.Now()
|
||||||
// Do some cleanup
|
// Do some cleanup
|
||||||
|
@ -21,8 +21,31 @@ type udpInterface struct {
|
|||||||
conns map[connAddr]*connInfo
|
conns map[connAddr]*connInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
type connAddr string // TODO something more efficient, but still a valid map key
|
//type connAddr string // TODO something more efficient, but still a valid map key
|
||||||
|
|
||||||
|
type connAddr struct {
|
||||||
|
ip [16]byte
|
||||||
|
port int
|
||||||
|
zone string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *connAddr) fromUDPAddr(u *net.UDPAddr) {
|
||||||
|
copy(c.ip[:], u.IP.To16())
|
||||||
|
c.port = u.Port
|
||||||
|
c.zone = u.Zone
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *connAddr) toUDPAddr() *net.UDPAddr {
|
||||||
|
var u net.UDPAddr
|
||||||
|
u.IP = make([]byte, 16)
|
||||||
|
copy(u.IP, c.ip[:])
|
||||||
|
u.Port = c.port
|
||||||
|
u.Zone = c.zone
|
||||||
|
return &u
|
||||||
|
}
|
||||||
|
|
||||||
type connInfo struct {
|
type connInfo struct {
|
||||||
|
name string
|
||||||
addr connAddr
|
addr connAddr
|
||||||
peer *peer
|
peer *peer
|
||||||
linkIn chan []byte
|
linkIn chan []byte
|
||||||
@ -54,10 +77,7 @@ func (iface *udpInterface) init(core *Core, addr string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (iface *udpInterface) sendKeys(addr connAddr) {
|
func (iface *udpInterface) sendKeys(addr connAddr) {
|
||||||
udpAddr, err := net.ResolveUDPAddr("udp", string(addr))
|
udpAddr := addr.toUDPAddr()
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
msg := []byte{}
|
msg := []byte{}
|
||||||
msg = udp_encode(msg, 0, 0, 0, nil)
|
msg = udp_encode(msg, 0, 0, 0, nil)
|
||||||
msg = append(msg, iface.core.boxPub[:]...)
|
msg = append(msg, iface.core.boxPub[:]...)
|
||||||
@ -91,7 +111,7 @@ func (iface *udpInterface) startConn(info *connInfo) {
|
|||||||
close(info.linkIn)
|
close(info.linkIn)
|
||||||
close(info.keysIn)
|
close(info.keysIn)
|
||||||
close(info.out)
|
close(info.out)
|
||||||
iface.core.log.Println("Removing peer:", info.addr)
|
iface.core.log.Println("Removing peer:", info.name)
|
||||||
}()
|
}()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -135,11 +155,13 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
|
|||||||
conn, isIn := iface.conns[addr]
|
conn, isIn := iface.conns[addr]
|
||||||
iface.mutex.RUnlock() // TODO? keep the lock longer?...
|
iface.mutex.RUnlock() // TODO? keep the lock longer?...
|
||||||
if !isIn {
|
if !isIn {
|
||||||
udpAddr, err := net.ResolveUDPAddr("udp", string(addr))
|
udpAddr := addr.toUDPAddr()
|
||||||
if err != nil {
|
themNodeID := getNodeID(&ks.box)
|
||||||
panic(err)
|
themAddr := address_addrForNodeID(themNodeID)
|
||||||
}
|
themAddrString := net.IP(themAddr[:]).String()
|
||||||
|
themString := fmt.Sprintf("%s@%s", themAddrString, udpAddr.String())
|
||||||
conn = &connInfo{
|
conn = &connInfo{
|
||||||
|
name: themString,
|
||||||
addr: connAddr(addr),
|
addr: connAddr(addr),
|
||||||
peer: iface.core.peers.newPeer(&ks.box, &ks.sig),
|
peer: iface.core.peers.newPeer(&ks.box, &ks.sig),
|
||||||
linkIn: make(chan []byte, 1),
|
linkIn: make(chan []byte, 1),
|
||||||
@ -208,10 +230,10 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
//var chunks [][]byte
|
|
||||||
var out []byte
|
var out []byte
|
||||||
|
var chunks [][]byte
|
||||||
for msg := range conn.out {
|
for msg := range conn.out {
|
||||||
var chunks [][]byte
|
chunks = chunks[:0]
|
||||||
bs := msg
|
bs := msg
|
||||||
for len(bs) > udp_chunkSize {
|
for len(bs) > udp_chunkSize {
|
||||||
chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:]
|
chunks, bs = append(chunks, bs[:udp_chunkSize]), bs[udp_chunkSize:]
|
||||||
@ -238,11 +260,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) {
|
|||||||
iface.mutex.Lock()
|
iface.mutex.Lock()
|
||||||
iface.conns[addr] = conn
|
iface.conns[addr] = conn
|
||||||
iface.mutex.Unlock()
|
iface.mutex.Unlock()
|
||||||
themNodeID := getNodeID(&ks.box)
|
iface.core.log.Println("Adding peer:", conn.name)
|
||||||
themAddr := address_addrForNodeID(themNodeID)
|
|
||||||
themAddrString := net.IP(themAddr[:]).String()
|
|
||||||
themString := fmt.Sprintf("%s@%s", themAddrString, addr)
|
|
||||||
iface.core.log.Println("Adding peer:", themString)
|
|
||||||
go iface.startConn(conn)
|
go iface.startConn(conn)
|
||||||
go conn.peer.linkLoop(conn.linkIn)
|
go conn.peer.linkLoop(conn.linkIn)
|
||||||
iface.sendKeys(conn.addr)
|
iface.sendKeys(conn.addr)
|
||||||
@ -279,7 +297,8 @@ func (iface *udpInterface) reader() {
|
|||||||
}
|
}
|
||||||
//msg := append(util_getBytes(), bs[:n]...)
|
//msg := append(util_getBytes(), bs[:n]...)
|
||||||
msg := bs[:n]
|
msg := bs[:n]
|
||||||
addr := connAddr(udpAddr.String())
|
var addr connAddr
|
||||||
|
addr.fromUDPAddr(udpAddr)
|
||||||
if udp_isKeys(msg) {
|
if udp_isKeys(msg) {
|
||||||
var them address
|
var them address
|
||||||
copy(them[:], udpAddr.IP.To16())
|
copy(them[:], udpAddr.IP.To16())
|
||||||
|
@ -44,6 +44,14 @@ func wire_put_uint64(elem uint64, out []byte) []byte {
|
|||||||
return append(out, bs...)
|
return append(out, bs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func wire_uint64_len(elem uint64) int {
|
||||||
|
l := 1
|
||||||
|
for e := elem >> 7; e > 0; e >>= 7 {
|
||||||
|
l++
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
// Decode uint64 from a []byte slice
|
// Decode uint64 from a []byte slice
|
||||||
// Returns the decoded uint64 and the number of bytes used
|
// Returns the decoded uint64 and the number of bytes used
|
||||||
func wire_decode_uint64(bs []byte) (uint64, int) {
|
func wire_decode_uint64(bs []byte) (uint64, int) {
|
||||||
|
Loading…
Reference in New Issue
Block a user