yggdrasil-go/src/yggdrasil/peer.go

356 lines
11 KiB
Go
Raw Normal View History

2017-12-29 07:16:20 +03:00
package yggdrasil
// TODO cleanup, this file is kind of a mess
// Commented code should be removed
// Live code should be better commented
2017-12-29 07:16:20 +03:00
2018-06-13 01:50:08 +03:00
import (
"sync"
"sync/atomic"
"time"
)
2017-12-29 07:16:20 +03:00
// The peers struct represents peers with an active connection.
// Incomping packets are passed to the corresponding peer, which handles them somehow.
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
2017-12-29 07:16:20 +03:00
type peers struct {
2018-06-13 01:50:08 +03:00
core *Core
mutex sync.Mutex // Synchronize writes to atomic
ports atomic.Value //map[switchPort]*peer, use CoW semantics
authMutex sync.RWMutex
allowedEncryptionPublicKeys map[boxPubKey]struct{}
2017-12-29 07:16:20 +03:00
}
// Initializes the peers struct.
2017-12-29 07:16:20 +03:00
func (ps *peers) init(c *Core) {
2018-01-05 01:37:51 +03:00
ps.mutex.Lock()
defer ps.mutex.Unlock()
ps.putPorts(make(map[switchPort]*peer))
ps.core = c
ps.allowedEncryptionPublicKeys = make(map[boxPubKey]struct{})
}
// Returns true if an incoming peer connection to a key is allowed, either because the key is in the whitelist or because the whitelist is empty.
func (ps *peers) isAllowedEncryptionPublicKey(box *boxPubKey) bool {
2018-05-07 03:01:52 +03:00
ps.authMutex.RLock()
defer ps.authMutex.RUnlock()
_, isIn := ps.allowedEncryptionPublicKeys[*box]
return isIn || len(ps.allowedEncryptionPublicKeys) == 0
2017-12-29 07:16:20 +03:00
}
// Adds a key to the whitelist.
func (ps *peers) addAllowedEncryptionPublicKey(box *boxPubKey) {
2018-05-07 03:01:52 +03:00
ps.authMutex.Lock()
defer ps.authMutex.Unlock()
ps.allowedEncryptionPublicKeys[*box] = struct{}{}
2018-05-07 03:01:52 +03:00
}
// Removes a key from the whitelist.
func (ps *peers) removeAllowedEncryptionPublicKey(box *boxPubKey) {
2018-05-07 03:01:52 +03:00
ps.authMutex.Lock()
defer ps.authMutex.Unlock()
delete(ps.allowedEncryptionPublicKeys, *box)
2018-05-07 03:01:52 +03:00
}
// Gets the whitelist of allowed keys for incoming connections.
func (ps *peers) getAllowedEncryptionPublicKeys() []boxPubKey {
2018-05-07 03:01:52 +03:00
ps.authMutex.RLock()
defer ps.authMutex.RUnlock()
keys := make([]boxPubKey, 0, len(ps.allowedEncryptionPublicKeys))
for key := range ps.allowedEncryptionPublicKeys {
2018-05-07 03:01:52 +03:00
keys = append(keys, key)
}
return keys
}
// Atomically gets a map[switchPort]*peer of known peers.
2017-12-29 07:16:20 +03:00
func (ps *peers) getPorts() map[switchPort]*peer {
2018-01-05 01:37:51 +03:00
return ps.ports.Load().(map[switchPort]*peer)
2017-12-29 07:16:20 +03:00
}
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
2017-12-29 07:16:20 +03:00
func (ps *peers) putPorts(ports map[switchPort]*peer) {
2018-01-05 01:37:51 +03:00
ps.ports.Store(ports)
2017-12-29 07:16:20 +03:00
}
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
2017-12-29 07:16:20 +03:00
type peer struct {
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
2018-01-05 01:37:51 +03:00
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
core *Core
port switchPort
box boxPubKey
sig sigPubKey
shared boxSharedKey
linkShared boxSharedKey
endpoint string
metadata metadata
firstSeen time.Time // To track uptime for getPeers
linkOut (chan []byte) // used for protocol traffic (to bypass queues)
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
dinfo *dhtInfo // used to keep the DHT working
out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
2017-12-29 07:16:20 +03:00
}
2018-01-05 01:37:51 +03:00
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number.
func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey, endpoint string, metadata metadata) *peer {
now := time.Now()
2018-01-05 01:37:51 +03:00
p := peer{box: *box,
sig: *sig,
shared: *getSharedKey(&ps.core.boxPriv, box),
linkShared: *linkShared,
endpoint: endpoint,
metadata: metadata,
firstSeen: now,
doSend: make(chan struct{}, 1),
core: ps.core}
2018-01-05 01:37:51 +03:00
ps.mutex.Lock()
defer ps.mutex.Unlock()
oldPorts := ps.getPorts()
newPorts := make(map[switchPort]*peer)
for k, v := range oldPorts {
newPorts[k] = v
}
for idx := switchPort(0); true; idx++ {
if _, isIn := newPorts[idx]; !isIn {
p.port = switchPort(idx)
newPorts[p.port] = &p
break
}
}
ps.putPorts(newPorts)
return &p
2017-12-29 07:16:20 +03:00
}
// Unregisters the peer on the given port, if there is one.
// The switch is told to forget the port first; then the port map is replaced
// by a copy without that entry. If a peer was registered, its close callback
// (when set) is invoked and its doSend channel is closed, which stops linkLoop.
func (ps *peers) removePeer(port switchPort) {
	if port == 0 {
		// Port 0 is the self peer and can't be removed.
		return
	}
	ps.core.router.doAdmin(func() {
		ps.core.switchTable.forgetPeer(port)
	})

	ps.mutex.Lock()
	current := ps.getPorts()
	victim, found := current[port]
	remaining := make(map[switchPort]*peer, len(current))
	for k, v := range current {
		if k != port {
			remaining[k] = v
		}
	}
	ps.putPorts(remaining)
	ps.mutex.Unlock()

	if !found {
		return
	}
	// Tear the peer down outside the lock.
	if victim.close != nil {
		victim.close()
	}
	close(victim.doSend)
}
// Notifies every peer except the one on port 0 that it should send a fresh switch message.
// Mainly called by the switch after an update.
func (ps *peers) sendSwitchMsgs() {
	for _, p := range ps.getPorts() {
		if p.port != 0 {
			p.doSendSwitchMsgs()
		}
	}
}
// Pokes this peer's linkLoop so that it sends a switchMsg.
// The send never blocks: if the notification cannot be queued right away it is dropped.
// Mainly called by sendSwitchMsgs or during linkLoop startup.
func (p *peer) doSendSwitchMsgs() {
	// removePeer may close p.doSend concurrently; sending on a closed channel
	// panics, so that panic is deliberately swallowed here.
	defer func() {
		recover()
	}()
	select {
	case p.doSend <- struct{}{}:
		// Notification queued.
	default:
		// Send would block; skip it.
	}
}
// This must be launched in a separate goroutine by whatever sets up the peer struct.
// It handles link protocol traffic.
func (p *peer) linkLoop() {
2018-06-07 18:58:24 +03:00
tick := time.NewTicker(time.Second)
defer tick.Stop()
for {
select {
case _, ok := <-p.doSend:
if !ok {
return
}
p.sendSwitchMsg()
case _ = <-tick.C:
2018-10-22 01:40:43 +03:00
//break // FIXME disabled the below completely to test something
2018-10-21 08:05:04 +03:00
pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line
if pdinfo != nil {
dinfo := *pdinfo
p.core.dht.peers <- &dinfo
2018-06-07 18:58:24 +03:00
}
}
2018-01-05 01:37:51 +03:00
}
2017-12-29 07:16:20 +03:00
}
// Called to handle incoming packets.
// Passes the packet to a handler for that packet type.
func (p *peer) handlePacket(packet []byte) {
2018-06-13 01:50:08 +03:00
// FIXME this is off by stream padding and msg length overhead, should be done in tcp.go
atomic.AddUint64(&p.bytesRecvd, uint64(len(packet)))
2018-01-05 01:37:51 +03:00
pType, pTypeLen := wire_decode_uint64(packet)
if pTypeLen == 0 {
return
}
switch pType {
case wire_Traffic:
p.handleTraffic(packet, pTypeLen)
case wire_ProtocolTraffic:
p.handleTraffic(packet, pTypeLen)
case wire_LinkProtocolTraffic:
p.handleLinkTraffic(packet)
default:
util_putBytes(packet)
2018-01-05 01:37:51 +03:00
}
2017-12-29 07:16:20 +03:00
}
// Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
2017-12-29 07:16:20 +03:00
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
2018-06-07 18:58:24 +03:00
if p.port != 0 && p.dinfo == nil {
// Drop traffic until the peer manages to send us at least one good switchMsg
return
}
p.core.switchTable.packetIn <- packet
2017-12-29 07:16:20 +03:00
}
// This just calls p.out(packet) for now.
2017-12-29 07:16:20 +03:00
func (p *peer) sendPacket(packet []byte) {
2018-01-05 01:37:51 +03:00
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
p.out(packet)
2017-12-29 07:16:20 +03:00
}
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
// It sends it to p.linkOut, which bypasses the usual packet queues.
2017-12-29 07:16:20 +03:00
func (p *peer) sendLinkPacket(packet []byte) {
innerPayload, innerNonce := boxSeal(&p.linkShared, packet, nil)
innerLinkPacket := wire_linkProtoTrafficPacket{
Nonce: *innerNonce,
Payload: innerPayload,
}
outerPayload := innerLinkPacket.encode()
bs, nonce := boxSeal(&p.shared, outerPayload, nil)
2018-01-05 01:37:51 +03:00
linkPacket := wire_linkProtoTrafficPacket{
2018-06-02 23:21:05 +03:00
Nonce: *nonce,
Payload: bs,
2018-01-05 01:37:51 +03:00
}
packet = linkPacket.encode()
p.linkOut <- packet
2017-12-29 07:16:20 +03:00
}
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
// Identifies the link traffic type and calls the appropriate handler.
2017-12-29 07:16:20 +03:00
func (p *peer) handleLinkTraffic(bs []byte) {
2018-01-05 01:37:51 +03:00
packet := wire_linkProtoTrafficPacket{}
if !packet.decode(bs) {
return
}
outerPayload, isOK := boxOpen(&p.shared, packet.Payload, &packet.Nonce)
if !isOK {
return
}
innerPacket := wire_linkProtoTrafficPacket{}
if !innerPacket.decode(outerPayload) {
return
}
payload, isOK := boxOpen(&p.linkShared, innerPacket.Payload, &innerPacket.Nonce)
2018-01-05 01:37:51 +03:00
if !isOK {
return
}
pType, pTypeLen := wire_decode_uint64(payload)
if pTypeLen == 0 {
return
}
switch pType {
case wire_SwitchMsg:
p.handleSwitchMsg(payload)
default:
util_putBytes(bs)
2018-01-05 01:37:51 +03:00
}
}
// Fetches the current switchMsg from the switch, appends a signed hop that
// names this peer as the next node, and sends the result as a link packet.
// Does nothing if the switch has no message to offer.
func (p *peer) sendSwitchMsg() {
	msg := p.core.switchTable.getMsg()
	if msg == nil {
		return
	}
	// The signature must cover the message as it is before our hop is appended.
	toSign := getBytesForSig(&p.sig, msg)
	ourHop := switchMsgHop{
		Port: p.port,
		Next: p.sig,
		Sig:  *sign(&p.core.sigPriv, toSign),
	}
	msg.Hops = append(msg.Hops, ourHop)
	p.sendLinkPacket(msg.encode())
}
// Handles a switchMsg from the peer, checking signatures and passing good messages to the switch.
// Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins).
func (p *peer) handleSwitchMsg(packet []byte) {
var msg switchMsg
if !msg.decode(packet) {
return
}
if len(msg.Hops) < 1 {
2018-06-07 22:24:02 +03:00
p.core.peers.removePeer(p.port)
}
var loc switchLocator
prevKey := msg.Root
for idx, hop := range msg.Hops {
// Check signatures and collect coords for dht
sigMsg := msg
sigMsg.Hops = msg.Hops[:idx]
loc.coords = append(loc.coords, hop.Port)
bs := getBytesForSig(&hop.Next, &sigMsg)
if !verify(&prevKey, bs, &hop.Sig) {
2018-06-07 22:24:02 +03:00
p.core.peers.removePeer(p.port)
}
prevKey = hop.Next
}
p.core.switchTable.handleMsg(&msg, p.port)
if !p.core.switchTable.checkRoot(&msg) {
// Bad switch message
// Stop forwarding traffic from it
// Stop refreshing it in the DHT
p.dinfo = nil
return
}
// Pass a mesage to the dht informing it that this peer (still) exists
loc.coords = loc.coords[:len(loc.coords)-1]
2018-01-05 01:37:51 +03:00
dinfo := dhtInfo{
key: p.box,
coords: loc.getCoords(),
2018-01-05 01:37:51 +03:00
}
//p.core.dht.peers <- &dinfo
2018-06-07 18:58:24 +03:00
p.dinfo = &dinfo
2017-12-29 07:16:20 +03:00
}
// This generates the bytes that we sign or check the signature of for a switchMsg.
// It begins with the next node's key, followed by the root and the timetsamp, followed by coords being advertised to the next node.
func getBytesForSig(next *sigPubKey, msg *switchMsg) []byte {
var loc switchLocator
for _, hop := range msg.Hops {
loc.coords = append(loc.coords, hop.Port)
}
2018-01-05 01:37:51 +03:00
bs := append([]byte(nil), next[:]...)
bs = append(bs, msg.Root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(msg.TStamp))...)
bs = append(bs, wire_encode_coords(loc.getCoords())...)
2018-01-05 01:37:51 +03:00
return bs
2017-12-29 07:16:20 +03:00
}