// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	"errors"
	"fmt"
	"io"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/bitutil"
	"github.com/apache/arrow-go/v18/arrow/internal/json"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/internal/hashing"
)

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

type fromJSONCfg struct {
	multiDocument bool
	startOffset   int64
	useNumber     bool
}

type FromJSONOption func(*fromJSONCfg)

// WithMultipleDocs indicates that the input consists of multiple
// newline-separated JSON documents rather than a single top-level JSON array.
func WithMultipleDocs() FromJSONOption {
	return func(c *fromJSONCfg) {
		c.multiDocument = true
	}
}

// WithStartOffset attempts to start decoding from the reader at the offset
// passed in. If using this option the reader must fulfill the io.ReadSeeker
// interface, or else an error will be returned.
//
// It will call Seek(off, io.SeekStart) on the reader
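//
// Combined with the final input offset returned by FromJSON, this allows
// decoding several consecutive JSON arrays from a single stream. A minimal
// sketch (error handling elided; mem is any memory.Allocator):
//
//	rdr := bytes.NewReader([]byte(`[1, 2][3, 4]`))
//	first, off, _ := FromJSON(mem, arrow.PrimitiveTypes.Int32, rdr)
//	second, _, _ := FromJSON(mem, arrow.PrimitiveTypes.Int32, rdr, WithStartOffset(off))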
func WithStartOffset(off int64) FromJSONOption {
	return func(c *fromJSONCfg) {
		c.startOffset = off
	}
}

// WithUseNumber enables the 'UseNumber' option on the json decoder, using
// the json.Number type instead of assuming float64 for numbers. This is critical
// if you have numbers that are larger than what can fit into the 53 bits of
// an IEEE float64 mantissa and want to preserve their values.
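//
// A minimal sketch of where this matters (error handling elided; mem is any
// memory.Allocator):
//
//	// 9007199254740993 == 1<<53 + 1 and cannot be represented exactly in a float64
//	arr, _, _ := FromJSON(mem, arrow.PrimitiveTypes.Int64,
//		strings.NewReader(`[9007199254740993]`), WithUseNumber())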
func WithUseNumber() FromJSONOption {
	return func(c *fromJSONCfg) {
		c.useNumber = true
	}
}

// FromJSON creates an arrow.Array from a corresponding JSON stream and defined data type. If the types in the
// json do not match the type provided, it will return errors. This is *not* the integration test format
// and should not be used as such. It is intended to be used by consumers similarly to the
// csv reader/writer. It also returns the input offset in the reader where it finished decoding, since buffering
// by the decoder can leave the reader's cursor past where the parsing finished if attempting to parse multiple json
// arrays from one stream.
//
// All the Array types implement json.Marshaler and thus can be written to json
// using the json.Marshal function.
//
// The JSON provided must be formatted in one of two ways:
//
//	Default: the top level of the json must be a list which matches the type specified exactly
//	Example: `[1, 2, 3, 4, 5]` for any integer type or `[[...], null, [], .....]` for a List type
//				Struct arrays are represented as a list of objects: `[{"foo": 1, "bar": "moo"}, {"foo": 5, "bar": "baz"}]`
//
//	Using WithMultipleDocs:
//		If the JSON provided is multiple newline separated json documents, then use this option
//		and each json document will be treated as a single row of the array. This is most useful for record batches
//		and interacting with other processes that use json. For example:
//			`{"col1": 1, "col2": "row1", "col3": ...}\n{"col1": 2, "col2": "row2", "col3": ...}\n.....`
//
// Duration values get formatted upon marshalling as a string consisting of their numeric
// value followed by the unit suffix, such as "10s" for a value of 10 with a unit of seconds,
// with "ms" for millisecond, "us" for microsecond, and "ns" for nanosecond as the suffixes.
// Unmarshalling duration values is more permissive since it first tries to use Go's
// time.ParseDuration function, which means it allows values in the form 3h25m0.3s in addition
// to the same values which are output.
//
// Interval types are marshalled / unmarshalled as follows:
//
//	MonthInterval is marshalled as an object with the format:
//		{ "months": # }
//	DayTimeInterval is marshalled using Go's regular marshalling of structs:
//		{ "days": #, "milliseconds": # }
//	MonthDayNanoInterval values are marshalled the same as DayTime using Go's struct marshalling:
//		{ "months": #, "days": #, "nanoseconds": # }
//
// Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot
// exceed the precision allowed by the time unit; otherwise unmarshalling will error.
//
// Dates use YYYY-MM-DD format
//
// Timestamps use RFC3339Nano format except without a timezone; all of the
// following are valid:
//
//	YYYY-MM-DD
//	YYYY-MM-DD[T]HH
//	YYYY-MM-DD[T]HH:MM
//	YYYY-MM-DD[T]HH:MM:SS[.zzzzzzzzzz]
//
// The fractions of a second cannot exceed the precision allowed by the time unit of the data type.
//
// When processing structs as objects, the order of keys does not matter, but keys cannot be repeated.
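//
// A minimal usage sketch (error handling elided; mem is any memory.Allocator):
//
//	arr, _, err := FromJSON(mem, arrow.ListOf(arrow.PrimitiveTypes.Int32),
//		strings.NewReader(`[[1, 2], null, []]`))
//	if err != nil {
//		// handle err
//	}
//	defer arr.Release()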
func FromJSON(mem memory.Allocator, dt arrow.DataType, r io.Reader, opts ...FromJSONOption) (arr arrow.Array, offset int64, err error) {
	var cfg fromJSONCfg
	for _, o := range opts {
		o(&cfg)
	}

	if cfg.startOffset != 0 {
		seeker, ok := r.(io.ReadSeeker)
		if !ok {
			return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
		}

		if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
			return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
		}
	}

	bldr := NewBuilder(mem, dt)
	defer bldr.Release()

	dec := json.NewDecoder(r)
	defer func() {
		if errors.Is(err, io.EOF) {
			err = fmt.Errorf("failed parsing json: %w", io.ErrUnexpectedEOF)
		}
	}()

	if cfg.useNumber {
		dec.UseNumber()
	}

	if !cfg.multiDocument {
		t, err := dec.Token()
		if err != nil {
			return nil, dec.InputOffset(), err
		}

		if delim, ok := t.(json.Delim); !ok || delim != '[' {
			return nil, dec.InputOffset(), fmt.Errorf("json doc must be an array, found %v", t)
		}
	}

	if err = bldr.Unmarshal(dec); err != nil {
		return nil, dec.InputOffset(), err
	}

	if !cfg.multiDocument {
		// consume the last ']'
		if _, err = dec.Token(); err != nil {
			return nil, dec.InputOffset(), err
		}
	}

	return bldr.NewArray(), dec.InputOffset(), nil
}

// RecordToStructArray constructs a struct array from the columns of the record batch
// by referencing them, zero-copy.
func RecordToStructArray(rec arrow.RecordBatch) *Struct {
	cols := make([]arrow.ArrayData, rec.NumCols())
	for i, c := range rec.Columns() {
		cols[i] = c.Data()
	}

	data := NewData(arrow.StructOf(rec.Schema().Fields()...), int(rec.NumRows()), []*memory.Buffer{nil}, cols, 0, 0)
	defer data.Release()

	return NewStructData(data)
}

// RecordFromStructArray is a convenience function for converting a struct array into
// a record batch without copying the data. If the passed in schema is nil, the fields
// of the struct will be used to define the record batch. Otherwise the passed in
// schema will be used to create the record batch. If passed in, the schema must match
// the fields of the struct column.
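//
// Together with RecordToStructArray this gives a zero-copy round trip, e.g.
// (a sketch, where rec is an existing arrow.RecordBatch):
//
//	st := RecordToStructArray(rec)
//	defer st.Release()
//	rec2 := RecordFromStructArray(st, rec.Schema())
//	defer rec2.Release()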
func RecordFromStructArray(in *Struct, schema *arrow.Schema) arrow.RecordBatch {
	if schema == nil {
		schema = arrow.NewSchema(in.DataType().(*arrow.StructType).Fields(), nil)
	}

	return NewRecord(schema, in.fields, int64(in.Len()))
}

// RecordFromJSON creates a record batch from JSON data. See array.FromJSON for the details
// of formatting and logic.
//
// A record batch from JSON is equivalent to reading a struct array in from json and then
// converting it to a record batch.
//
// See https://github.com/apache/arrow-go/issues/448 for more details on
// why this isn't a simple wrapper around FromJSON.
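//
// A minimal sketch using the multi-document form (error handling elided,
// schema illustrative):
//
//	schema := arrow.NewSchema([]arrow.Field{
//		{Name: "col1", Type: arrow.PrimitiveTypes.Int64},
//		{Name: "col2", Type: arrow.BinaryTypes.String},
//	}, nil)
//	docs := "{\"col1\": 1, \"col2\": \"a\"}\n{\"col1\": 2, \"col2\": \"b\"}"
//	rec, _, err := RecordFromJSON(memory.DefaultAllocator, schema,
//		strings.NewReader(docs), WithMultipleDocs())
//	if err != nil {
//		// handle err
//	}
//	defer rec.Release()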
func RecordFromJSON(mem memory.Allocator, schema *arrow.Schema, r io.Reader, opts ...FromJSONOption) (arrow.RecordBatch, int64, error) {
	var cfg fromJSONCfg
	for _, o := range opts {
		o(&cfg)
	}

	if cfg.startOffset != 0 {
		seeker, ok := r.(io.ReadSeeker)
		if !ok {
			return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
		}
		if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
			return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
		}
	}

	if mem == nil {
		mem = memory.DefaultAllocator
	}

	bldr := NewRecordBuilder(mem, schema)
	defer bldr.Release()

	dec := json.NewDecoder(r)
	if cfg.useNumber {
		dec.UseNumber()
	}

	if !cfg.multiDocument {
		t, err := dec.Token()
		if err != nil {
			return nil, dec.InputOffset(), err
		}
		if delim, ok := t.(json.Delim); !ok || delim != '[' {
			return nil, dec.InputOffset(), fmt.Errorf("json doc must be an array, found %v", t)
		}

		for dec.More() {
			if err := dec.Decode(bldr); err != nil {
				return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
			}
		}

		// consume the last ']'
		if _, err = dec.Token(); err != nil {
			return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
		}

		return bldr.NewRecord(), dec.InputOffset(), nil
	}

	for {
		err := dec.Decode(bldr)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
		}
	}

	return bldr.NewRecord(), dec.InputOffset(), nil
}

// RecordToJSON writes out the given record batch as newline-delimited JSON:
// each row is encoded as a single JSON object on its own line of the output.
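//
// For example, a two-row record batch with an int64 column "a" and a string
// column "b" would be written as:
//
//	{"a":1,"b":"x"}
//	{"a":2,"b":"y"}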
func RecordToJSON(rec arrow.RecordBatch, w io.Writer) error {
	enc := json.NewEncoder(w)

	fields := rec.Schema().Fields()

	cols := make(map[string]interface{})
	for i := 0; int64(i) < rec.NumRows(); i++ {
		for j, c := range rec.Columns() {
			cols[fields[j].Name] = c.GetOneForMarshal(i)
		}
		if err := enc.Encode(cols); err != nil {
			return err
		}
	}
	return nil
}

// TableFromJSON creates an arrow.Table from a slice of JSON documents, decoding
// one record batch per element of recJSON via RecordFromJSON.
func TableFromJSON(mem memory.Allocator, sc *arrow.Schema, recJSON []string, opt ...FromJSONOption) (arrow.Table, error) {
	batches := make([]arrow.RecordBatch, len(recJSON))
	for i, batchJSON := range recJSON {
		batch, _, err := RecordFromJSON(mem, sc, strings.NewReader(batchJSON), opt...)
		if err != nil {
			return nil, err
		}
		defer batch.Release()
		batches[i] = batch
	}
	return NewTableFromRecords(sc, batches), nil
}

// GetDictArrayData constructs the dictionary array data of the given value
// type from a hashing memo table, copying only the entries at or beyond
// startOffset.
func GetDictArrayData(mem memory.Allocator, valueType arrow.DataType, memoTable hashing.MemoTable, startOffset int) (*Data, error) {
	dictLen := memoTable.Size() - startOffset
	buffers := []*memory.Buffer{nil, nil}

	buffers[1] = memory.NewResizableBuffer(mem)
	defer buffers[1].Release()

	switch tbl := memoTable.(type) {
	case hashing.NumericMemoTable:
		nbytes := tbl.TypeTraits().BytesRequired(dictLen)
		buffers[1].Resize(nbytes)
		tbl.WriteOutSubset(startOffset, buffers[1].Bytes())
	case *hashing.BinaryMemoTable:
		switch valueType.ID() {
		case arrow.BINARY, arrow.STRING:
			buffers = append(buffers, memory.NewResizableBuffer(mem))
			defer buffers[2].Release()

			buffers[1].Resize(arrow.Int32Traits.BytesRequired(dictLen + 1))
			offsets := arrow.Int32Traits.CastFromBytes(buffers[1].Bytes())
			tbl.CopyOffsetsSubset(startOffset, offsets)

			valuesz := offsets[len(offsets)-1] - offsets[0]
			buffers[2].Resize(int(valuesz))
			tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
		case arrow.LARGE_BINARY, arrow.LARGE_STRING:
			buffers = append(buffers, memory.NewResizableBuffer(mem))
			defer buffers[2].Release()

			buffers[1].Resize(arrow.Int64Traits.BytesRequired(dictLen + 1))
			offsets := arrow.Int64Traits.CastFromBytes(buffers[1].Bytes())
			tbl.CopyLargeOffsetsSubset(startOffset, offsets)

			valuesz := offsets[len(offsets)-1] - offsets[0]
			buffers[2].Resize(int(valuesz))
			tbl.CopyValuesSubset(startOffset, buffers[2].Bytes())
		default: // fixed size
			bw := int(bitutil.BytesForBits(int64(valueType.(arrow.FixedWidthDataType).BitWidth())))
			buffers[1].Resize(bw * dictLen)
			tbl.CopyFixedWidthValues(startOffset, bw, buffers[1].Bytes())
		}
	default:
		return nil, fmt.Errorf("arrow/array: dictionary unifier unimplemented type: %s", valueType)
	}

	var nullcount int
	if idx, ok := memoTable.GetNull(); ok && idx >= startOffset {
		buffers[0] = memory.NewResizableBuffer(mem)
		defer buffers[0].Release()
		nullcount = 1
		buffers[0].Resize(int(bitutil.BytesForBits(int64(dictLen))))
		memory.Set(buffers[0].Bytes(), 0xFF)
		bitutil.ClearBit(buffers[0].Bytes(), idx)
	}

	return NewData(valueType, dictLen, buffers, nil, nullcount, 0), nil
}

// DictArrayFromJSON creates a dictionary array, using FromJSON to decode the
// indices and the dictionary values from the provided JSON strings.
func DictArrayFromJSON(mem memory.Allocator, dt *arrow.DictionaryType, indicesJSON, dictJSON string) (arrow.Array, error) {
	indices, _, err := FromJSON(mem, dt.IndexType, strings.NewReader(indicesJSON))
	if err != nil {
		return nil, err
	}
	defer indices.Release()

	dict, _, err := FromJSON(mem, dt.ValueType, strings.NewReader(dictJSON))
	if err != nil {
		return nil, err
	}
	defer dict.Release()

	return NewDictionaryArray(dt, indices, dict), nil
}

// ChunkedFromJSON creates an arrow.Chunked, decoding one chunk per element of
// chunkStrs via FromJSON.
func ChunkedFromJSON(mem memory.Allocator, dt arrow.DataType, chunkStrs []string, opts ...FromJSONOption) (*arrow.Chunked, error) {
	chunks := make([]arrow.Array, len(chunkStrs))
	defer func() {
		for _, c := range chunks {
			if c != nil {
				c.Release()
			}
		}
	}()

	var err error
	for i, c := range chunkStrs {
		chunks[i], _, err = FromJSON(mem, dt, strings.NewReader(c), opts...)
		if err != nil {
			return nil, err
		}
	}

	return arrow.NewChunked(dt, chunks), nil
}

// getMaxBufferLen returns the length in bytes of the largest single buffer
// needed to represent an all-null array of the given type and length. A
// single zeroed buffer of this size can then be shared by every buffer of
// the array.
func getMaxBufferLen(dt arrow.DataType, length int) int {
	bufferLen := int(bitutil.BytesForBits(int64(length)))

	maxOf := func(bl int) int {
		if bl > bufferLen {
			return bl
		}
		return bufferLen
	}

	switch dt := dt.(type) {
	case *arrow.DictionaryType:
		bufferLen = maxOf(getMaxBufferLen(dt.ValueType, length))
		return maxOf(getMaxBufferLen(dt.IndexType, length))
	case *arrow.FixedSizeBinaryType:
		return maxOf(dt.ByteWidth * length)
	case arrow.FixedWidthDataType:
		return maxOf(int(bitutil.BytesForBits(int64(dt.BitWidth()))) * length)
	case *arrow.StructType:
		for _, f := range dt.Fields() {
			bufferLen = maxOf(getMaxBufferLen(f.Type, length))
		}
		return bufferLen
	case *arrow.SparseUnionType:
		// type codes
		bufferLen = maxOf(length)
		// creates children of the same length as the union
		for _, f := range dt.Fields() {
			bufferLen = maxOf(getMaxBufferLen(f.Type, length))
		}
		return bufferLen
	case *arrow.DenseUnionType:
		// type codes
		bufferLen = maxOf(length)
		// offsets
		bufferLen = maxOf(arrow.Int32SizeBytes * length)
		// create children of length 1
		for _, f := range dt.Fields() {
			bufferLen = maxOf(getMaxBufferLen(f.Type, 1))
		}
		return bufferLen
	case arrow.OffsetsDataType:
		return maxOf(dt.OffsetTypeTraits().BytesRequired(length + 1))
	case *arrow.FixedSizeListType:
		return maxOf(getMaxBufferLen(dt.Elem(), int(dt.Len())*length))
	case arrow.ExtensionType:
		return maxOf(getMaxBufferLen(dt.StorageType(), length))
	default:
		panic(fmt.Errorf("arrow/array: arrayofnull not implemented for type %s", dt))
	}
}

// nullArrayFactory constructs array data of type dt and length len in which
// every element is null, sharing the single zeroed buffer buf across all of
// the resulting array's buffers.
type nullArrayFactory struct {
	mem memory.Allocator
	dt  arrow.DataType
	len int
	buf *memory.Buffer
}

func (f *nullArrayFactory) create() *Data {
	if f.buf == nil {
		bufLen := getMaxBufferLen(f.dt, f.len)
		f.buf = memory.NewResizableBuffer(f.mem)
		f.buf.Resize(bufLen)
		defer f.buf.Release()
	}

	var (
		dt        = f.dt
		bufs      = []*memory.Buffer{memory.SliceBuffer(f.buf, 0, int(bitutil.BytesForBits(int64(f.len))))}
		childData []arrow.ArrayData
		dictData  arrow.ArrayData
	)
	// the union case below releases and nils out bufs[0], so evaluate
	// bufs[0] at exit instead of binding the receiver eagerly
	defer func() {
		if bufs[0] != nil {
			bufs[0].Release()
		}
	}()

	if ext, ok := dt.(arrow.ExtensionType); ok {
		dt = ext.StorageType()
	}

	if nested, ok := dt.(arrow.NestedType); ok {
		childData = make([]arrow.ArrayData, nested.NumFields())
	}

	switch dt := dt.(type) {
	case *arrow.NullType:
	case *arrow.DictionaryType:
		bufs = append(bufs, f.buf)
		arr := MakeArrayOfNull(f.mem, dt.ValueType, 0)
		defer arr.Release()
		dictData = arr.Data()
	case arrow.FixedWidthDataType:
		bufs = append(bufs, f.buf)
	case arrow.BinaryDataType:
		bufs = append(bufs, f.buf, f.buf)
	case arrow.OffsetsDataType:
		bufs = append(bufs, f.buf)
		childData[0] = f.createChild(dt, 0, 0)
		defer childData[0].Release()
	case *arrow.FixedSizeListType:
		childData[0] = f.createChild(dt, 0, f.len*int(dt.Len()))
		defer childData[0].Release()
	case *arrow.StructType:
		for i := range dt.Fields() {
			childData[i] = f.createChild(dt, i, f.len)
			defer childData[i].Release()
		}
	case *arrow.RunEndEncodedType:
		bldr := NewBuilder(f.mem, dt.RunEnds())
		defer bldr.Release()

		// a single run covering all f.len null elements
		switch b := bldr.(type) {
		case *Int16Builder:
			b.Append(int16(f.len))
		case *Int32Builder:
			b.Append(int32(f.len))
		case *Int64Builder:
			b.Append(int64(f.len))
		}

		childData[0] = bldr.newData()
		defer childData[0].Release()
		childData[1] = f.createChild(dt.Encoded(), 1, 1)
		defer childData[1].Release()
	case arrow.UnionType:
		// unions have no validity bitmap
		bufs[0].Release()
		bufs[0] = nil
		bufs = append(bufs, f.buf)
		// buffer is zeroed, but 0 may not be a valid type code
		if dt.TypeCodes()[0] != 0 {
			bufs[1] = memory.NewResizableBuffer(f.mem)
			bufs[1].Resize(f.len)
			defer bufs[1].Release()
			memory.Set(bufs[1].Bytes(), byte(dt.TypeCodes()[0]))
		}

		// for sparse unions we create children with the same length
		childLen := f.len
		if dt.Mode() == arrow.DenseMode {
			// for dense unions, offsets are all 0 and make children
			// with length 1
			bufs = append(bufs, f.buf)
			childLen = 1
		}
		for i := range dt.Fields() {
			childData[i] = f.createChild(dt, i, childLen)
			defer childData[i].Release()
		}
	}

	out := NewData(f.dt, f.len, bufs, childData, f.len, 0)
	if dictData != nil {
		out.SetDictionary(dictData)
	}
	return out
}

func (f *nullArrayFactory) createChild(dt arrow.DataType, i, length int) *Data {
	childFactory := &nullArrayFactory{
		mem: f.mem, dt: f.dt.(arrow.NestedType).Fields()[i].Type,
		len: length, buf: f.buf}
	return childFactory.create()
}

// MakeArrayOfNull creates an array of the given data type with length elements,
// all of which are null.
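//
// For example (a sketch; the counts follow from the definition):
//
//	arr := MakeArrayOfNull(memory.DefaultAllocator, arrow.PrimitiveTypes.Float64, 3)
//	defer arr.Release()
//	// arr.Len() == 3 && arr.NullN() == 3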
func MakeArrayOfNull(mem memory.Allocator, dt arrow.DataType, length int) arrow.Array {
	if dt.ID() == arrow.NULL {
		return NewNull(length)
	}

	data := (&nullArrayFactory{mem: mem, dt: dt, len: length}).create()
	defer data.Release()
	return MakeFromData(data)
}

// stripNulls trims any trailing NUL (0x00) bytes from s.
func stripNulls(s string) string {
	return strings.TrimRight(s, "\x00")
}