// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package udp

import (
	
	
	
	
	
	

	
	
)

// BatchWriter represents conn can write messages in batch
type BatchWriter interface {
	WriteBatch(ms []ipv4.Message, flags int) (int, error)
}

// BatchReader represents conn can read messages in batch
type BatchReader interface {
	ReadBatch(msg []ipv4.Message, flags int) (int, error)
}

// BatchPacketConn represents conn can read/write messages in batch
type BatchPacketConn interface {
	BatchWriter
	BatchReader
	io.Closer
}

// BatchConn uses ipv4/v6.NewPacketConn to wrap a net.PacketConn to write/read messages in batch,
// only available in linux. In other platform, it will use single Write/Read as same as net.Conn.
type BatchConn struct {
	net.PacketConn

	batchConn BatchPacketConn

	batchWriteMutex    sync.Mutex
	batchWriteMessages []ipv4.Message
	batchWritePos      int
	batchWriteLast     time.Time

	batchWriteSize     int
	batchWriteInterval time.Duration

	closed int32
}

// NewBatchConn creates a *BatchConn from net.PacketConn with batch configs.
func ( net.PacketConn,  int,  time.Duration) *BatchConn {
	 := &BatchConn{
		PacketConn:         ,
		batchWriteLast:     time.Now(),
		batchWriteInterval: ,
		batchWriteSize:     ,
		batchWriteMessages: make([]ipv4.Message, ),
	}
	for  := range .batchWriteMessages {
		.batchWriteMessages[].Buffers = [][]byte{make([]byte, sendMTU)}
	}

	// batch write only supports linux
	if runtime.GOOS == "linux" {
		if  := ipv4.NewPacketConn();  != nil {
			.batchConn = 
		} else if  := ipv6.NewPacketConn();  != nil {
			.batchConn = 
		}
	}

	if .batchConn != nil {
		go func() {
			 := time.NewTicker( / 2)
			defer .Stop()
			for atomic.LoadInt32(&.closed) != 1 {
				<-.C
				.batchWriteMutex.Lock()
				if .batchWritePos > 0 && time.Since(.batchWriteLast) >= .batchWriteInterval {
					_ = .flush()
				}
				.batchWriteMutex.Unlock()
			}
		}()
	}

	return 
}

// Close batchConn and the underlying PacketConn
func ( *BatchConn) () error {
	atomic.StoreInt32(&.closed, 1)
	.batchWriteMutex.Lock()
	if .batchWritePos > 0 {
		_ = .flush()
	}
	.batchWriteMutex.Unlock()
	if .batchConn != nil {
		return .batchConn.Close()
	}
	return .PacketConn.Close()
}

// WriteTo write message to an UDPAddr, addr should be nil if it is a connected socket.
func ( *BatchConn) ( []byte,  net.Addr) (int, error) {
	if .batchConn == nil {
		return .PacketConn.WriteTo(, )
	}
	return .enqueueMessage(, )
}

func ( *BatchConn) ( []byte,  net.Addr) (int, error) {
	var  error
	.batchWriteMutex.Lock()
	defer .batchWriteMutex.Unlock()

	 := &.batchWriteMessages[.batchWritePos]
	// reset buffers
	.Buffers = .Buffers[:1]
	.Buffers[0] = .Buffers[0][:cap(.Buffers[0])]

	.batchWritePos++
	if  != nil {
		.Addr = 
	}
	if  := copy(.Buffers[0], );  < len() {
		 := make([]byte, len()-)
		copy(, [:])
		.Buffers = append(.Buffers, )
	} else {
		.Buffers[0] = .Buffers[0][:]
	}
	if .batchWritePos == .batchWriteSize {
		 = .flush()
	}
	return len(), 
}

// ReadBatch reads messages in batch, return length of message readed and error.
func ( *BatchConn) ( []ipv4.Message,  int) (int, error) {
	if .batchConn == nil {
		, ,  := .PacketConn.ReadFrom([0].Buffers[0])
		if  == nil {
			[0].N = 
			[0].Addr = 
			return 1, nil
		}
		return 0, 
	}
	return .batchConn.ReadBatch(, )
}

func ( *BatchConn) () error {
	var  error
	var  int
	for  < .batchWritePos {
		,  := .batchConn.WriteBatch(.batchWriteMessages[:.batchWritePos], 0)
		if  != nil {
			 = 
			break
		}
		 += 
	}
	.batchWritePos = 0
	.batchWriteLast = time.Now()
	return 
}