minor fixes to peer stuff

This commit is contained in:
Arceliar 2019-08-24 14:24:42 -05:00
parent 0539dee900
commit b337228aa4
3 changed files with 38 additions and 27 deletions

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"net" "net"
"sort" "sort"
"sync/atomic"
"time" "time"
"github.com/gologme/log" "github.com/gologme/log"
@ -106,15 +105,18 @@ func (c *Core) GetPeers() []Peer {
sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] }) sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
for _, port := range ps { for _, port := range ps {
p := ports[port] p := ports[port]
info := Peer{ var info Peer
<-p.SyncExec(func() {
info = Peer{
Endpoint: p.intf.name, Endpoint: p.intf.name,
BytesSent: atomic.LoadUint64(&p.bytesSent), BytesSent: p.bytesSent,
BytesRecvd: atomic.LoadUint64(&p.bytesRecvd), BytesRecvd: p.bytesRecvd,
Protocol: p.intf.info.linkType, Protocol: p.intf.info.linkType,
Port: uint64(port), Port: uint64(port),
Uptime: time.Since(p.firstSeen), Uptime: time.Since(p.firstSeen),
} }
copy(info.PublicKey[:], p.box[:]) copy(info.PublicKey[:], p.box[:])
})
peers = append(peers, info) peers = append(peers, info)
} }
return peers return peers
@ -135,15 +137,18 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
continue continue
} }
coords := elem.locator.getCoords() coords := elem.locator.getCoords()
info := SwitchPeer{ var info SwitchPeer
<-peer.SyncExec(func() {
info = SwitchPeer{
Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...), Coords: append([]uint64{}, wire_coordsBytestoUint64s(coords)...),
BytesSent: atomic.LoadUint64(&peer.bytesSent), BytesSent: peer.bytesSent,
BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd), BytesRecvd: peer.bytesRecvd,
Port: uint64(elem.port), Port: uint64(elem.port),
Protocol: peer.intf.info.linkType, Protocol: peer.intf.info.linkType,
Endpoint: peer.intf.info.remote, Endpoint: peer.intf.info.remote,
} }
copy(info.PublicKey[:], peer.box[:]) copy(info.PublicKey[:], peer.box[:])
})
switchpeers = append(switchpeers, info) switchpeers = append(switchpeers, info)
} }
return switchpeers return switchpeers

View File

@ -576,7 +576,7 @@ func DEBUG_simLinkPeers(p, q *peer) {
default: default:
} }
if len(packets) > 0 { if len(packets) > 0 {
dest.handlePacket(packets[0]) <-dest.SyncExec(func() { dest._handlePacket(packets[0]) })
packets = packets[1:] packets = packets[1:]
continue continue
} }

View File

@ -96,9 +96,6 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic // Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct { type peer struct {
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
phony.Actor phony.Actor
core *Core core *Core
intf *linkInterface intf *linkInterface
@ -114,6 +111,9 @@ type peer struct {
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
done (chan struct{}) // closed to exit the linkLoop done (chan struct{}) // closed to exit the linkLoop
close func() // Called when a peer is removed, to close the underlying connection, or via admin api close func() // Called when a peer is removed, to close the underlying connection, or via admin api
// The below aren't actually useful internally, they're just gathered for getPeers statistics
bytesSent uint64
bytesRecvd uint64
} }
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
@ -217,7 +217,7 @@ func (p *peer) handlePacketFrom(from phony.IActor, packet []byte) {
// Passes the packet to a handler for that packet type. // Passes the packet to a handler for that packet type.
func (p *peer) _handlePacket(packet []byte) { func (p *peer) _handlePacket(packet []byte) {
// FIXME this is off by stream padding and msg length overhead, should be done in tcp.go // FIXME this is off by stream padding and msg length overhead, should be done in tcp.go
atomic.AddUint64(&p.bytesRecvd, uint64(len(packet))) p.bytesRecvd += uint64(len(packet))
pType, pTypeLen := wire_decode_uint64(packet) pType, pTypeLen := wire_decode_uint64(packet)
if pTypeLen == 0 { if pTypeLen == 0 {
return return
@ -259,10 +259,12 @@ func (p *peer) _sendPackets(packets [][]byte) {
for _, packet := range packets { for _, packet := range packets {
size += len(packet) size += len(packet)
} }
atomic.AddUint64(&p.bytesSent, uint64(size)) p.bytesSent += uint64(size)
p.out(packets) p.out(packets)
} }
var peerLinkOutHelper phony.Actor
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
// It sends it to p.linkOut, which bypasses the usual packet queues. // It sends it to p.linkOut, which bypasses the usual packet queues.
func (p *peer) _sendLinkPacket(packet []byte) { func (p *peer) _sendLinkPacket(packet []byte) {
@ -279,8 +281,12 @@ func (p *peer) _sendLinkPacket(packet []byte) {
} }
packet = linkPacket.encode() packet = linkPacket.encode()
// TODO replace this with a message send if/when the link becomes an actor // TODO replace this with a message send if/when the link becomes an actor
// FIXME not 100% sure the channel send version is deadlock-free... peerLinkOutHelper.EnqueueFrom(nil, func() {
p.linkOut <- packet select {
case p.linkOut <- packet:
case <-p.done:
}
})
} }
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic. // Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.