// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
	
	
	
	
	
)

type Reader interface {
	io.ReadSeeker
	io.ReaderAt
}

// byteReader is a wrapper for bytes.NewReader
type byteReader struct {
	r   *bytes.Reader
	buf []byte
	pos int
}

// NewByteReader creates a new ByteReader instance from the given byte slice.
// It wraps the bytes.NewReader function to implement BufferedReader interface.
func ( []byte) *byteReader {
	 := bytes.NewReader()
	return &byteReader{
		,
		,
		0,
	}
}

func ( *byteReader) ( []byte) ( int,  error) {
	,  = .r.Read()
	.pos += 
	return
}

func ( *byteReader) ( int64,  int) ( int64,  error) {
	,  = .r.Seek(, )
	.pos = int()
	return
}

func ( *byteReader) ( []byte,  int64) ( int,  error) {
	return .r.ReadAt(, )
}

func ( *byteReader) ( int) ([]byte, error) {
	if  < 0 {
		return nil, fmt.Errorf("arrow/bytereader: %w", bufio.ErrNegativeCount)
	}
	 := len(.buf) - .pos
	 := min(, )
	var  error
	if  <  {
		 = io.EOF
	}
	return .buf[.pos : .pos+], 
}

func ( *byteReader) ( int) (int, error) {
	if  < 0 {
		return 0, fmt.Errorf("arrow/bytereader: %w", bufio.ErrNegativeCount)
	}

	var (
		    error
		 = .pos + 
	)

	if  >= len(.buf) {
		 = len(.buf)
		 =  - .pos
		 = io.EOF
	}

	,  := .Seek(int64(), io.SeekCurrent)
	if  != nil {
		return , 
	}
	return , 
}

// Outer returns the byteReader itself, since it has already implemented Reader interface.
func ( *byteReader) () Reader {
	return 
}

func ( *byteReader) (Reader) {}

func ( *byteReader) () int { return len(.buf) }

// bufferedReader is similar to bufio.Reader except
// it will expand the buffer if necessary when asked to Peek
// more bytes than are in the buffer
type bufferedReader struct {
	bufferSz int
	buf      []byte
	r, w     int
	rd       Reader
	err      error
}

// NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
// except Peek will expand the internal buffer if needed rather than return
// an error.
func ( Reader,  int) *bufferedReader {
	 := &bufferedReader{
		rd: ,
	}
	.resizeBuffer()
	return 
}

func ( *bufferedReader) () Reader { return .rd }

func ( *bufferedReader) ( Reader) {
	.resetBuffer()
	.rd = 
	.r, .w = 0, 0
}

func ( *bufferedReader) () {
	if .buf == nil {
		.buf = make([]byte, .bufferSz)
	} else if .bufferSz > cap(.buf) {
		 := .buf
		.buf = make([]byte, .bufferSz)
		copy(.buf, )
	} else {
		.buf = .buf[:.bufferSz]
	}
}

func ( *bufferedReader) ( int) {
	.bufferSz = 
	.resetBuffer()
}

func ( *bufferedReader) () error {
	// slide existing data to the beginning
	if .r > 0 {
		copy(.buf, .buf[.r:.w])
		.w -= .r
		.r = 0
	}

	if .w >= len(.buf) {
		return fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrBufferFull)
	}

	,  := io.ReadAtLeast(.rd, .buf[.w:], 1)
	if  < 0 {
		return fmt.Errorf("arrow/bufferedreader: filling buffer: %w", bufio.ErrNegativeCount)
	}

	.w += 
	.err = 
	return nil
}

func ( *bufferedReader) () error {
	 := .err
	.err = nil
	return 
}

func ( *bufferedReader) () int { return .bufferSz }

// Buffered returns the number of bytes currently buffered
func ( *bufferedReader) () int { return .w - .r }

// SetBufferSize resets the size of the internal buffer to the desired size.
// Will return an error if newSize is <= 0 or if newSize is less than the size
// of the buffered data.
func ( *bufferedReader) ( int) error {
	if  <= 0 {
		return errors.New("buffer size should be positive")
	}

	if .w >=  {
		return errors.New("cannot shrink read buffer if buffered data remains")
	}

	.resizeBuffer()
	return nil
}

// Peek will buffer and return n bytes from the underlying reader without advancing
// the reader itself. If n is larger than the current buffer size, the buffer will
// be expanded to accommodate the extra bytes rather than error.
func ( *bufferedReader) ( int) ([]byte, error) {
	if  < 0 {
		return nil, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
	}

	if  > len(.buf) {
		if  := .SetBufferSize();  != nil {
			return nil, 
		}
	}

	for .w-.r <  && .w-.r < len(.buf) && .err == nil {
		.fill() // b.w-b.r < len(b.buf) => buffer is not full
	}

	return .buf[.r : .r+], .readErr()
}

// Discard skips the next n bytes either by advancing the internal buffer
// or by reading that many bytes in and throwing them away.
func ( *bufferedReader) ( int) ( int,  error) {
	if  < 0 {
		return 0, fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
	}

	if  == 0 {
		return
	}

	 := 
	for {
		 := .Buffered()
		if  == 0 {
			.fill()
			 = .Buffered()
		}
		if  >  {
			 = 
		}
		.r += 
		 -= 
		if  == 0 {
			return , nil
		}
		if .err != nil {
			return  - , .readErr()
		}
	}
}

func ( *bufferedReader) ( []byte) ( int,  error) {
	 = len()
	if  == 0 {
		if .Buffered() > 0 {
			return 0, nil
		}
		return 0, .readErr()
	}

	if .r == .w {
		if .err != nil {
			return 0, .readErr()
		}
		if len() >= len(.buf) {
			// large read, empty buffer
			// read directly into p to avoid extra copy
			, .err = .rd.Read()
			if  < 0 {
				return , fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
			}
			return , .readErr()
		}

		// one read
		// don't use b.fill
		.r, .w = 0, 0
		, .err = .rd.Read(.buf)
		if  < 0 {
			return , fmt.Errorf("arrow/bufferedreader: %w", bufio.ErrNegativeCount)
		}
		if  == 0 {
			return 0, .readErr()
		}
		.w += 
	}

	// copy as much as we can
	 = copy(, .buf[.r:.w])
	.r += 
	return , nil
}