package relay

import (
	
	
	
	
	
	
	

	
	
	
	
	
	pbv2 
	
	

	logging 
	pool 
	ma 
	manet 
)

const (
	ServiceName = "libp2p.relay/v2"

	ReservationTagWeight = 10

	StreamTimeout    = time.Minute
	ConnectTimeout   = 30 * time.Second
	HandshakeTimeout = time.Minute

	relayHopTag      = "relay-v2-hop"
	relayHopTagValue = 2

	maxMessageSize = 4096
)

var log = logging.Logger("relay")

// Relay is the (limited) relay service object.
type Relay struct {
	ctx    context.Context
	cancel func()

	host        host.Host
	rc          Resources
	acl         ACLFilter
	constraints *constraints
	scope       network.ResourceScopeSpan
	notifiee    network.Notifiee

	mx     sync.Mutex
	rsvp   map[peer.ID]time.Time
	conns  map[peer.ID]int
	closed bool

	selfAddr ma.Multiaddr

	metricsTracer MetricsTracer
}

// New constructs a new limited relay that can provide relay services in the given host.
func ( host.Host,  ...Option) (*Relay, error) {
	,  := context.WithCancel(context.Background())

	 := &Relay{
		ctx:    ,
		cancel: ,
		host:   ,
		rc:     DefaultResources(),
		acl:    nil,
		rsvp:   make(map[peer.ID]time.Time),
		conns:  make(map[peer.ID]int),
	}

	for ,  := range  {
		 := ()
		if  != nil {
			return nil, fmt.Errorf("error applying relay option: %w", )
		}
	}

	// get a scope for memory reservations at service level
	 := .Network().ResourceManager().ViewService(ServiceName,
		func( network.ServiceScope) error {
			var  error
			.scope,  = .BeginSpan()
			return 
		})
	if  != nil {
		return nil, 
	}

	.constraints = newConstraints(&.rc)
	.selfAddr = ma.StringCast(fmt.Sprintf("/p2p/%s", .ID()))

	.SetStreamHandler(proto.ProtoIDv2Hop, .handleStream)
	.notifiee = &network.NotifyBundle{DisconnectedF: .disconnected}
	.Network().Notify(.notifiee)

	if .metricsTracer != nil {
		.metricsTracer.RelayStatus(true)
	}
	go .background()

	return , nil
}

func ( *Relay) () error {
	.mx.Lock()
	if !.closed {
		.closed = true
		.mx.Unlock()

		.host.RemoveStreamHandler(proto.ProtoIDv2Hop)
		.host.Network().StopNotify(.notifiee)
		defer .scope.Done()
		.cancel()
		.gc()
		if .metricsTracer != nil {
			.metricsTracer.RelayStatus(false)
		}
		return nil
	}
	.mx.Unlock()
	return nil
}

func ( *Relay) ( network.Stream) {
	log.Infof("new relay stream from: %s", .Conn().RemotePeer())

	if  := .Scope().SetService(ServiceName);  != nil {
		log.Debugf("error attaching stream to relay service: %s", )
		.Reset()
		return
	}

	if  := .Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways);  != nil {
		log.Debugf("error reserving memory for stream: %s", )
		.Reset()
		return
	}
	defer .Scope().ReleaseMemory(maxMessageSize)

	 := util.NewDelimitedReader(, maxMessageSize)
	defer .Close()

	.SetReadDeadline(time.Now().Add(StreamTimeout))

	var  pbv2.HopMessage

	 := .ReadMsg(&)
	if  != nil {
		.handleError(, pbv2.Status_MALFORMED_MESSAGE)
		return
	}
	// reset stream deadline as message has been read
	.SetReadDeadline(time.Time{})
	switch .GetType() {
	case pbv2.HopMessage_RESERVE:
		 := .handleReserve()
		if .metricsTracer != nil {
			.metricsTracer.ReservationRequestHandled()
		}
	case pbv2.HopMessage_CONNECT:
		 := .handleConnect(, &)
		if .metricsTracer != nil {
			.metricsTracer.ConnectionRequestHandled()
		}
	default:
		.handleError(, pbv2.Status_MALFORMED_MESSAGE)
	}
}

func ( *Relay) ( network.Stream) pbv2.Status {
	defer .Close()
	 := .Conn().RemotePeer()
	 := .Conn().RemoteMultiaddr()

	if isRelayAddr() {
		log.Debugf("refusing relay reservation for %s; reservation attempt over relay connection")
		.handleError(, pbv2.Status_PERMISSION_DENIED)
		return pbv2.Status_PERMISSION_DENIED
	}

	if .acl != nil && !.acl.AllowReserve(, ) {
		log.Debugf("refusing relay reservation for %s; permission denied", )
		.handleError(, pbv2.Status_PERMISSION_DENIED)
		return pbv2.Status_PERMISSION_DENIED
	}

	.mx.Lock()
	// Check if relay is still active. Otherwise ConnManager.UnTagPeer will not be called if this block runs after
	// Close() call
	if .closed {
		.mx.Unlock()
		log.Debugf("refusing relay reservation for %s; relay closed", )
		.handleError(, pbv2.Status_PERMISSION_DENIED)
		return pbv2.Status_PERMISSION_DENIED
	}
	 := time.Now()
	 := .Add(.rc.ReservationTTL)

	,  := .rsvp[]
	if  := .constraints.Reserve(, , );  != nil {
		.mx.Unlock()
		log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", , )
		.handleError(, pbv2.Status_RESERVATION_REFUSED)
		return pbv2.Status_RESERVATION_REFUSED
	}

	.rsvp[] = 
	.host.ConnManager().TagPeer(, "relay-reservation", ReservationTagWeight)
	.mx.Unlock()
	if .metricsTracer != nil {
		.metricsTracer.ReservationAllowed()
	}

	log.Debugf("reserving relay slot for %s", )

	// Delivery of the reservation might fail for a number of reasons.
	// For example, the stream might be reset or the connection might be closed before the reservation is received.
	// In that case, the reservation will just be garbage collected later.
	 := makeReservationMsg(
		.host.Peerstore().PrivKey(.host.ID()),
		.host.ID(),
		.host.Addrs(),
		,
		)
	if  := .writeResponse(, pbv2.Status_OK, , .makeLimitMsg());  != nil {
		log.Debugf("error writing reservation response; retracting reservation for %s", )
		.Reset()
		return pbv2.Status_CONNECTION_FAILED
	}
	return pbv2.Status_OK
}

func ( *Relay) ( network.Stream,  *pbv2.HopMessage) pbv2.Status {
	 := .Conn().RemotePeer()
	 := .Conn().RemoteMultiaddr()

	,  := .scope.BeginSpan()
	if  != nil {
		log.Debugf("failed to begin relay transaction: %s", )
		.handleError(, pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
		return pbv2.Status_RESOURCE_LIMIT_EXCEEDED
	}

	 := func( pbv2.Status) {
		.Done()
		.handleError(, )
	}

	// reserve buffers for the relay
	if  := .ReserveMemory(2*.rc.BufferSize, network.ReservationPriorityHigh);  != nil {
		log.Debugf("error reserving memory for relay: %s", )
		(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
		return pbv2.Status_RESOURCE_LIMIT_EXCEEDED
	}

	if isRelayAddr() {
		log.Debugf("refusing connection from %s; connection attempt over relay connection")
		(pbv2.Status_PERMISSION_DENIED)
		return pbv2.Status_PERMISSION_DENIED
	}

	,  := util.PeerToPeerInfoV2(.GetPeer())
	if  != nil {
		(pbv2.Status_MALFORMED_MESSAGE)
		return pbv2.Status_MALFORMED_MESSAGE
	}

	if .acl != nil && !.acl.AllowConnect(, .Conn().RemoteMultiaddr(), .ID) {
		log.Debugf("refusing connection from %s to %s; permission denied", , .ID)
		(pbv2.Status_PERMISSION_DENIED)
		return pbv2.Status_PERMISSION_DENIED
	}

	.mx.Lock()
	,  := .rsvp[.ID]
	if ! {
		.mx.Unlock()
		log.Debugf("refusing connection from %s to %s; no reservation", , .ID)
		(pbv2.Status_NO_RESERVATION)
		return pbv2.Status_NO_RESERVATION
	}

	 := .conns[]
	if  >= .rc.MaxCircuits {
		.mx.Unlock()
		log.Debugf("refusing connection from %s to %s; too many connections from %s", , .ID, )
		(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
		return pbv2.Status_RESOURCE_LIMIT_EXCEEDED
	}

	 := .conns[.ID]
	if  >= .rc.MaxCircuits {
		.mx.Unlock()
		log.Debugf("refusing connection from %s to %s; too many connections to %s", , .ID, .ID)
		(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
		return pbv2.Status_RESOURCE_LIMIT_EXCEEDED
	}

	.addConn()
	.addConn(.ID)
	.mx.Unlock()

	if .metricsTracer != nil {
		.metricsTracer.ConnectionOpened()
	}
	 := time.Now()

	 := func() {
		defer .Done()
		.mx.Lock()
		.rmConn()
		.rmConn(.ID)
		.mx.Unlock()
		if .metricsTracer != nil {
			.metricsTracer.ConnectionClosed(time.Since())
		}
	}

	,  := context.WithTimeout(.ctx, ConnectTimeout)
	defer ()

	 = network.WithNoDial(, "relay connect")

	,  := .host.NewStream(, .ID, proto.ProtoIDv2Stop)
	if  != nil {
		log.Debugf("error opening relay stream to %s: %s", .ID, )
		()
		.handleError(, pbv2.Status_CONNECTION_FAILED)
		return pbv2.Status_CONNECTION_FAILED
	}

	 = func( pbv2.Status) {
		.Reset()
		()
		.handleError(, )
	}

	if  := .Scope().SetService(ServiceName);  != nil {
		log.Debugf("error attaching stream to relay service: %s", )
		(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
		return pbv2.Status_RESOURCE_LIMIT_EXCEEDED
	}

	// handshake
	if  := .Scope().ReserveMemory(maxMessageSize, network.ReservationPriorityAlways);  != nil {
		log.Debugf("error reserving memory for stream: %s", )
		(pbv2.Status_RESOURCE_LIMIT_EXCEEDED)
		return pbv2.Status_RESOURCE_LIMIT_EXCEEDED
	}
	defer .Scope().ReleaseMemory(maxMessageSize)

	 := util.NewDelimitedReader(, maxMessageSize)
	 := util.NewDelimitedWriter()
	defer .Close()

	var  pbv2.StopMessage
	.Type = pbv2.StopMessage_CONNECT.Enum()
	.Peer = util.PeerInfoToPeerV2(peer.AddrInfo{ID: })
	.Limit = .makeLimitMsg(.ID)

	.SetDeadline(time.Now().Add(HandshakeTimeout))

	 = .WriteMsg(&)
	if  != nil {
		log.Debugf("error writing stop handshake")
		(pbv2.Status_CONNECTION_FAILED)
		return pbv2.Status_CONNECTION_FAILED
	}

	.Reset()

	 = .ReadMsg(&)
	if  != nil {
		log.Debugf("error reading stop response: %s", .Error())
		(pbv2.Status_CONNECTION_FAILED)
		return pbv2.Status_CONNECTION_FAILED
	}

	if  := .GetType();  != pbv2.StopMessage_STATUS {
		log.Debugf("unexpected stop response; not a status message (%d)", )
		(pbv2.Status_CONNECTION_FAILED)
		return pbv2.Status_CONNECTION_FAILED
	}

	if  := .GetStatus();  != pbv2.Status_OK {
		log.Debugf("relay stop failure: %d", )
		(pbv2.Status_CONNECTION_FAILED)
		return pbv2.Status_CONNECTION_FAILED
	}

	var  pbv2.HopMessage
	.Type = pbv2.HopMessage_STATUS.Enum()
	.Status = pbv2.Status_OK.Enum()
	.Limit = .makeLimitMsg(.ID)

	 = util.NewDelimitedWriter()
	 = .WriteMsg(&)
	if  != nil {
		log.Debugf("error writing relay response: %s", )
		.Reset()
		.Reset()
		()
		return pbv2.Status_CONNECTION_FAILED
	}

	// reset deadline
	.SetDeadline(time.Time{})

	log.Infof("relaying connection from %s to %s", , .ID)

	var  atomic.Int32
	.Store(2)

	 := func() {
		if .Add(-1) == 0 {
			.Close()
			.Close()
			()
		}
	}

	if .rc.Limit != nil {
		 := time.Now().Add(.rc.Limit.Duration)
		.SetDeadline()
		.SetDeadline()
		go .relayLimited(, , , .ID, .rc.Limit.Data, )
		go .relayLimited(, , .ID, , .rc.Limit.Data, )
	} else {
		go .relayUnlimited(, , , .ID, )
		go .relayUnlimited(, , .ID, , )
	}

	return pbv2.Status_OK
}

func ( *Relay) ( peer.ID) {
	 := .conns[]
	++
	.conns[] = 
	if  == 1 {
		.host.ConnManager().TagPeer(, relayHopTag, relayHopTagValue)
	}
}

func ( *Relay) ( peer.ID) {
	 := .conns[]
	--
	if  > 0 {
		.conns[] = 
	} else {
		delete(.conns, )
		.host.ConnManager().UntagPeer(, relayHopTag)
	}
}

func ( *Relay) (,  network.Stream, ,  peer.ID,  int64,  func()) {
	defer ()

	 := pool.Get(.rc.BufferSize)
	defer pool.Put()

	 := io.LimitReader(, )

	,  := .copyWithBuffer(, , )
	if  != nil {
		log.Debugf("relay copy error: %s", )
		// Reset both.
		.Reset()
		.Reset()
	} else {
		// propagate the close
		.CloseWrite()
		if  ==  {
			// we've reached the limit, discard further input
			.CloseRead()
		}
	}

	log.Debugf("relayed %d bytes from %s to %s", , , )
}

func ( *Relay) (,  network.Stream, ,  peer.ID,  func()) {
	defer ()

	 := pool.Get(.rc.BufferSize)
	defer pool.Put()

	,  := .copyWithBuffer(, , )
	if  != nil {
		log.Debugf("relay copy error: %s", )
		// Reset both.
		.Reset()
		.Reset()
	} else {
		// propagate the close
		.CloseWrite()
	}

	log.Debugf("relayed %d bytes from %s to %s", , , )
}

// errInvalidWrite means that a write returned an impossible count.
// copied from io.errInvalidWrite
var errInvalidWrite = errors.New("invalid write result")

// copyWithBuffer copies from src to dst using the provided buf until either EOF is reached
// on src or an error occurs. It reports the number of bytes transferred to metricsTracer.
// The implementation is a modified form of io.CopyBuffer to support metrics tracking.
func ( *Relay) ( io.Writer,  io.Reader,  []byte) ( int64,  error) {
	for {
		,  := .Read()
		if  > 0 {
			,  := .Write([0:])
			if  < 0 ||  <  {
				 = 0
				if  == nil {
					 = errInvalidWrite
				}
			}
			 += int64()
			if  != nil {
				 = 
				break
			}
			if  !=  {
				 = io.ErrShortWrite
				break
			}
			if .metricsTracer != nil {
				.metricsTracer.BytesTransferred()
			}
		}
		if  != nil {
			if  != io.EOF {
				 = 
			}
			break
		}
	}
	return , 
}

func ( *Relay) ( network.Stream,  pbv2.Status) {
	log.Debugf("relay error: %s (%d)", pbv2.Status_name[int32()], )
	 := .writeResponse(, , nil, nil)
	if  != nil {
		.Reset()
		log.Debugf("error writing relay response: %s", .Error())
	} else {
		.Close()
	}
}

func ( *Relay) ( network.Stream,  pbv2.Status,  *pbv2.Reservation,  *pbv2.Limit) error {
	.SetWriteDeadline(time.Now().Add(StreamTimeout))
	defer .SetWriteDeadline(time.Time{})
	 := util.NewDelimitedWriter()

	var  pbv2.HopMessage
	.Type = pbv2.HopMessage_STATUS.Enum()
	.Status = .Enum()
	.Reservation = 
	.Limit = 

	return .WriteMsg(&)
}

func makeReservationMsg(
	 crypto.PrivKey,
	 peer.ID,
	 []ma.Multiaddr,
	 peer.ID,
	 time.Time,
) *pbv2.Reservation {
	 := uint64(.Unix())

	 := &pbv2.Reservation{Expire: &}

	,  := ma.NewComponent("p2p", .String())
	if  != nil {
		log.Errorf("error creating p2p component: %s", )
		return 
	}

	 := make([][]byte, 0, len())
	for ,  := range  {
		if !manet.IsPublicAddr() {
			continue
		}

		,  := peer.IDFromP2PAddr()
		switch {
		case  == "":
			// No ID, we'll add one to the address
			 = .Encapsulate()
		case  == :
		// This address already has our ID in it.
		// Do nothing
		case  != :
			// This address has a different ID in it. Skip it.
			log.Warnf("skipping address %s: contains an unexpected ID", )
			continue
		}
		 = append(, .Bytes())
	}

	.Addrs = 

	 := &proto.ReservationVoucher{
		Relay:      ,
		Peer:       ,
		Expiration: ,
	}

	,  := record.Seal(, )
	if  != nil {
		log.Errorf("error sealing voucher for %s: %s", , )
		return 
	}

	,  := .Marshal()
	if  != nil {
		log.Errorf("error marshalling voucher for %s: %s", , )
		return 
	}

	.Voucher = 

	return 
}

func ( *Relay) ( peer.ID) *pbv2.Limit {
	if .rc.Limit == nil {
		return nil
	}

	 := uint32(.rc.Limit.Duration / time.Second)
	 := uint64(.rc.Limit.Data)

	return &pbv2.Limit{
		Duration: &,
		Data:     &,
	}
}

func ( *Relay) () {
	 := time.NewTicker(time.Minute)
	defer .Stop()

	for {
		select {
		case <-.C:
			.gc()
		case <-.ctx.Done():
			return
		}
	}
}

func ( *Relay) () {
	.mx.Lock()
	defer .mx.Unlock()

	 := time.Now()
	 := 0
	for ,  := range .rsvp {
		if .closed || .Before() {
			delete(.rsvp, )
			.host.ConnManager().UntagPeer(, "relay-reservation")
			++
		}
	}
	if .metricsTracer != nil {
		.metricsTracer.ReservationClosed()
	}

	for ,  := range .conns {
		if  == 0 {
			delete(.conns, )
		}
	}
}

func ( *Relay) ( network.Network,  network.Conn) {
	 := .RemotePeer()
	if .Connectedness() == network.Connected {
		return
	}

	.mx.Lock()
	,  := .rsvp[]
	if  {
		delete(.rsvp, )
	}
	.constraints.cleanupPeer()
	.mx.Unlock()

	if  && .metricsTracer != nil {
		.metricsTracer.ReservationClosed(1)
	}
}

func isRelayAddr( ma.Multiaddr) bool {
	,  := .ValueForProtocol(ma.P_CIRCUIT)
	return  == nil
}