package pqarrow

import (
	
	
	
	
	

	
	
	
	

	
)

// NOTE(review): the function name, the parameter name and most operands are
// missing from this declaration as it appears in the file; the description
// below covers only what the remaining tokens show.
//
// This function takes a scalar.Scalar and returns a parquet.Value plus an
// error. It type-switches on the concrete scalar type and wraps the payload
// with parquet.ValueOf, returning an "unsupported scalar type" error for any
// type that has no case.
func ( scalar.Scalar) (parquet.Value, error) {
	switch s := .(type) {
	case *scalar.String:
		// The result of Data() is converted to a Go string before wrapping.
		return parquet.ValueOf(string(.Data())), nil
	case *scalar.Int64:
		return parquet.ValueOf(.Value), nil
	case *scalar.Int32:
		return parquet.ValueOf(.Value), nil
	case *scalar.Uint64:
		return parquet.ValueOf(.Value), nil
	case *scalar.FixedSizeBinary:
		// ByteWidth is read from the scalar's *arrow.FixedSizeBinaryType and
		// the data is copied into a fixed [16]byte array.
		// NOTE(review): presumably this assumes widths of at most 16 bytes
		// (e.g. UUID-sized values) — confirm.
		 := .Type.(*arrow.FixedSizeBinaryType).ByteWidth
		 := [16]byte{}
		copy([:], .Data())
		return parquet.ValueOf(), nil
	case *scalar.Boolean:
		return parquet.ValueOf(.Value), nil
	case *scalar.Null:
		// An Arrow null scalar maps to parquet.NullValue().
		return parquet.NullValue(), nil
	case nil:
		// A nil interface yields the zero parquet.Value and no error.
		return parquet.Value{}, nil
	default:
		return parquet.Value{}, fmt.Errorf("unsupported scalar type %T", )
	}
}

// singlePassThroughWriter is used to keep a reference to the rows written to
// a parquet writer when only converting a single row. Calling WriteRows more
// than once is unsupported.
type singlePassThroughWriter struct {
	// rows holds the slice handed to the one permitted row-writing call; it
	// is nil until that call has happened.
	rows []parquet.Row
}

// Compile-time assertion that *singlePassThroughWriter satisfies
// dynparquet.ParquetWriter.
var _ dynparquet.ParquetWriter = (*singlePassThroughWriter)(nil)

// NOTE(review): the receiver, method and parameter names of the methods below
// are missing from the file as it stands; each comment describes the method by
// its visible signature and body only.

// Takes no arguments and always returns a nil *parquet.Schema.
func ( *singlePassThroughWriter) () *parquet.Schema { return nil }

// Accepts a []any and unconditionally panics with "use WriteRows instead".
func ( *singlePassThroughWriter) ( []any) (int, error) { panic("use WriteRows instead") }

// Accepts a []parquet.Row. It panics if the rows field is already non-nil;
// otherwise it assigns to the rows field and returns a length with a nil
// error.
// NOTE(review): presumably the stored slice and the returned length are those
// of the argument — confirm once the stripped operands are restored.
func ( *singlePassThroughWriter) ( []parquet.Row) (int, error) {
	if .rows != nil {
		panic("cannot call WriteRows more than once")
	}
	.rows = 
	return len(), nil
}

// No-op that always returns a nil error.
func ( *singlePassThroughWriter) () error { return nil }

// No-op that always returns a nil error.
func ( *singlePassThroughWriter) () error { return nil }

// Accepts an io.Writer and does nothing with it.
func ( *singlePassThroughWriter) ( io.Writer) {}

// RecordToRow converts an arrow record with dynamic columns into a row using a dynamic parquet schema.
//
// It takes a *parquet.Schema, an arrow.Record and an int, and returns the
// resulting parquet.Row or the error reported by recordToRows.
// NOTE(review): the function and parameter names are missing from the
// declaration in the file; the name used here comes from the existing comment.
func ( *parquet.Schema,  arrow.Record,  int) (parquet.Row, error) {
	// A singlePassThroughWriter captures the rows instead of encoding them.
	 := &singlePassThroughWriter{}
	// The end bound passed to recordToRows is an expression ending in "+1".
	// NOTE(review): presumably the range is [index, index+1), i.e. exactly
	// one row — confirm once the stripped operands are restored.
	if  := recordToRows(, , , +1, .Fields());  != nil {
		return nil, 
	}
	// Only the first captured row is returned.
	return .rows[0], nil
}

// recordToRows converts a full arrow record to parquet rows which are written
// to the parquet writer.
// The caller should use recordStart=0 and recordEnd=record.NumRows() to convert
// the entire record. Alternatively, the caller may only convert a subset of
// rows by specifying a range of [recordStart, recordEnd).
//
// The function works in two phases: it first builds one arrowToParquet
// closure per slot in a slice, then iterates over the rows, applying every
// closure to assemble a row and passing that row to WriteRows. It returns the
// first error from writeList or WriteRows, or nil.
// NOTE(review): parameter and local names are missing from the file as it
// stands; comments below refer to the visible calls only.
func recordToRows( dynparquet.ParquetWriter,  arrow.Record, ,  int,  []parquet.Field) error {
	// Difference of two values; used below as the row-loop bound.
	 :=  - 
	 := .Schema()
	 := make(parquet.Row, len())
	 := make([]arrowToParquet, len())
	for  := range  {
		 := []
		 := .Name()
		// An int that starts at 0 and becomes 1 when Optional() reports true.
		// NOTE(review): presumably the definition level handed to the
		// write* helpers — confirm.
		 := 0
		if .Optional() {
			 = 1
		}
		// When FieldIndices returns no index, a writeNull closure is
		// installed for this slot and the rest of the iteration is skipped.
		 := .FieldIndices()
		if len() == 0 {
			[] = writeNull()
			continue
		}
		// Only the first returned index is used to fetch the column.
		 := .Column([0])
		// Pick a specialised closure for the concrete array types listed
		// here; every other type falls back to writeGeneral.
		switch a := .(type) {
		case *array.List:
			// writeList is the only helper that can fail.
			,  := writeList(, , , )
			if  != nil {
				return 
			}
			[] = 
		case *array.Dictionary:
			[] = writeDictionary(, , , )
		case *array.Int32:
			[] = writeInt32(, , , )
		case *array.Uint64:
			[] = writeUint64(, , , )
		case *array.Int64:
			[] = writeInt64(, , , )
		case *array.String:
			[] = writeString(, , , )
		case *array.Binary:
			[] = writeBinary(, , , )
		default:
			[] = writeGeneral(, , , )
		}
	}
	// A one-element []parquet.Row is allocated once and reused for every
	// WriteRows call.
	 := make([]parquet.Row, 1)
	for  := 0;  < ; ++ {
		// Truncate to length zero so the backing array is reused.
		 = [:0]
		for  := range  {
			 = [](, )
		}
		[0] = 
		,  := .WriteRows()
		if  != nil {
			return 
		}
	}
	return nil
}

// writeGeneral returns an arrowToParquet closure for an arbitrary arrow.Array.
// The closure appends exactly one value to the row: a zero parquet.Value with
// Level(0, 0, ...) when IsNull reports true, otherwise the result of
// GetOneForMarshal wrapped with parquet.ValueOf.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeGeneral(, ,  int,  arrow.Array) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		// The null check and the value lookup both use a sum of two ints.
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		return append(,
			parquet.ValueOf(.GetOneForMarshal(+)).Level(0, , ),
		)
	}
}

// writeUint64 returns an arrowToParquet closure for an *array.Uint64. The
// closure appends exactly one value to the row: a zero parquet.Value with
// Level(0, 0, ...) when IsNull reports true, otherwise the element converted
// to int64 and wrapped with parquet.Int64Value.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeUint64(, ,  int,  *array.Uint64) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		// The int64(...) conversion keeps the bit pattern, so inputs above
		// math.MaxInt64 become negative int64 values.
		return append(,
			parquet.Int64Value(int64(.Value(+))).Level(0, , ),
		)
	}
}

// writeInt64 returns an arrowToParquet closure for an *array.Int64. The
// closure appends exactly one value to the row: a zero parquet.Value with
// Level(0, 0, ...) when IsNull reports true, otherwise the element wrapped
// with parquet.Int64Value.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeInt64(, ,  int,  *array.Int64) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		// The null check and the value lookup both use a sum of two ints.
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		return append(,
			parquet.Int64Value(.Value(+)).Level(0, , ),
		)
	}
}

// writeInt32 returns an arrowToParquet closure for an *array.Int32. The
// closure appends exactly one value to the row: a zero parquet.Value with
// Level(0, 0, ...) when IsNull reports true, otherwise the element wrapped
// with parquet.Int32Value.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeInt32(, ,  int,  *array.Int32) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		// The null check and the value lookup both use a sum of two ints.
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		return append(,
			parquet.Int32Value(.Value(+)).Level(0, , ),
		)
	}
}

// writeBinary returns an arrowToParquet closure for an *array.Binary. The
// closure appends exactly one value to the row: a zero parquet.Value with
// Level(0, 0, ...) when IsNull reports true, otherwise the element's bytes
// wrapped with parquet.ByteArrayValue.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeBinary(, ,  int,  *array.Binary) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		// The null check and the value lookup both use a sum of two ints.
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		return append(,
			parquet.ByteArrayValue(.Value(+)).Level(0, , ),
		)
	}
}

// writeString returns an arrowToParquet closure for an *array.String. The
// closure appends exactly one value to the row: a zero parquet.Value with
// Level(0, 0, ...) when IsNull reports true, otherwise the element converted
// to []byte and wrapped with parquet.ByteArrayValue.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeString(, ,  int,  *array.String) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		// The string element is converted with []byte(...) on every call.
		return append(,
			parquet.ByteArrayValue([]byte(.Value(+))).Level(0, , ),
		)
	}
}

// writeDictionary returns an arrowToParquet closure for an *array.Dictionary.
// It first chooses how a single element is turned into a parquet.Value and
// then returns a closure that appends exactly one value to the row: a zero
// parquet.Value with Level(0, 0, ...) when IsNull reports true, otherwise the
// converted element.
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeDictionary(, ,  int,  *array.Dictionary) arrowToParquet {
	// Default conversion: GetOneForMarshal wrapped with parquet.ValueOf.
	 := func( int) parquet.Value {
		return parquet.ValueOf(.GetOneForMarshal())
	}
	// When Dictionary() is an *array.Binary, the conversion is replaced by a
	// lookup through GetValueIndex wrapped with parquet.ByteArrayValue.
	if ,  := .Dictionary().(*array.Binary);  {
		 = func( int) parquet.Value {
			return parquet.ByteArrayValue(
				.Value(.GetValueIndex()),
			)
		}
	}
	return func( parquet.Row,  int) parquet.Row {
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		return append(,
			(+).Level(0, , ),
		)
	}
}

// writeList returns an arrowToParquet closure for an *array.List, or an error
// when the list's element type is not handled.
//
// It type-switches on ListValues() to build a per-element converter and hands
// it to writeListOf. Handled element types are Int32, Int64, Boolean, Float64,
// String, Binary, and Dictionary whose Dictionary() is Binary or String. Any
// other element type yields "list not of expected type", and any other
// dictionary value type yields "list dictionary not of expected type".
// NOTE(review): the three int parameter names are missing from the file;
// presumably they are a row offset, a definition level and a column index —
// confirm.
func writeList(, ,  int,  *array.List) (arrowToParquet, error) {
	var  arrowToParquet
	switch e := .ListValues().(type) {
	case *array.Int32:
		// While this is not a base type, it has been left here to avoid
		// breaking things.
		 = writeListOf(, , , , func( int) parquet.Value {
			return parquet.Int32Value(.Value())
		})
	case *array.Int64:
		 = writeListOf(, , , , func( int) parquet.Value {
			return parquet.Int64Value(.Value())
		})
	case *array.Boolean:
		 = writeListOf(, , , , func( int) parquet.Value {
			return parquet.BooleanValue(.Value())
		})
	case *array.Float64:
		 = writeListOf(, , , , func( int) parquet.Value {
			return parquet.DoubleValue(.Value())
		})
	case *array.String:
		 = writeListOf(, , , , func( int) parquet.Value {
			return parquet.ByteArrayValue([]byte(.Value()))
		})
	case *array.Binary:
		 = writeListOf(, , , , func( int) parquet.Value {
			return parquet.ByteArrayValue([]byte(.Value()))
		})
	case *array.Dictionary:
		// Dictionary elements are resolved through GetValueIndex; only
		// Binary and String value arrays are handled.
		switch d := .Dictionary().(type) {
		case *array.Binary:
			 = writeListOf(, , , , func( int) parquet.Value {
				return parquet.ByteArrayValue(
					.Value(.GetValueIndex()),
				)
			})
		case *array.String:
			 = writeListOf(, , , , func( int) parquet.Value {
				return parquet.ByteArrayValue(
					[]byte(.Value(.GetValueIndex())),
				)
			})
		default:
			return nil, fmt.Errorf("list dictionary not of expected type: %T", )
		}
	default:
		return nil, fmt.Errorf("list not of expected type: %T", )
	}
	// The returned closure appends a single zero parquet.Value with
	// Level(0, 0, ...) when IsNull reports true, and otherwise delegates to
	// the closure built above.
	return func( parquet.Row,  int) parquet.Row {
		if .IsNull( + ) {
			return append(,
				parquet.Value{}.Level(0, 0, ),
			)
		}
		return (, )
	}, nil
}

// writeListOf returns an arrowToParquet closure that appends values for the
// list entry selected through ValueOffsets, converting each element with the
// supplied func(int) parquet.Value.
//
// Each appended value gets Level(r, <int>+1, <int>), where r starts at 0 and
// is set to 1 when the inequality test inside the loop holds.
// NOTE(review): parameter and local names are missing from the file;
// presumably the loop runs from the start offset to the end offset, r is the
// repetition level (0 for the first element, 1 for later ones), and the "+1"
// is applied to the definition level — confirm. If so, an empty list appends
// no value at all for this column.
func writeListOf(, ,  int,  *array.List,  func( int) parquet.Value) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		// ValueOffsets returns two values for the summed index.
		,  := .ValueOffsets( + )
		for  := ;  < ; ++ {
			 := 0
			if  !=  {
				 = 1
			}
			// The loop variable is converted to int before being passed to
			// the element converter.
			 = append(, (int()).Level(, +1, ))
		}
		return 
	}
}

// writeNull returns an arrowToParquet closure that ignores its row argument
// and always appends a single zero parquet.Value with Level(0, 0, ...).
// recordToRows installs it where its FieldIndices lookup returns no index.
// NOTE(review): the int parameter's name is missing from the file; presumably
// it is the column index passed as the last Level argument — confirm.
func writeNull( int) arrowToParquet {
	return func( parquet.Row,  int) parquet.Row {
		return append(, parquet.Value{}.Level(0, 0, ))
	}
}

// arrowToParquet is the signature shared by the closures that the write*
// helpers return: given the row built so far (w) and an int (row), it returns
// a parquet.Row. recordToRows invokes one such closure per slot for every row
// it converts and keeps the returned row.
type arrowToParquet func(w parquet.Row, row int) parquet.Row

// NOTE(review): the function, parameter and result names are missing from this
// declaration in the file. Its signature matches the RecordDynamicCols calls
// elsewhere in this file, so it is presumably that function — confirm.
//
// Given an arrow.Record it returns a map[string][]string built from the
// schema's field names that contain a ".". Each such name is split once with
// strings.Cut on "." and an entry is appended to the map; every slice in the
// map is sorted with sort.Strings before returning.
// NOTE(review): presumably the map key is the part before the first "." and
// the appended entry is the part after it — confirm once the stripped
// operands are restored.
func ( arrow.Record) ( map[string][]string) {
	// First pass: collect the qualifying field names in a set, so repeated
	// names are kept only once.
	 := make(map[string]struct{})
	for  := 0;  < .Schema().NumFields(); ++ {
		 := .Schema().Field()
		if strings.Contains(.Name, ".") {
			[.Name] = struct{}{}
		}
	}
	// Second pass: split each collected name and group the pieces.
	 = make(map[string][]string)
	for  := range  {
		, ,  := strings.Cut(, ".")
		[] = append([], )
	}
	// Map iteration order is random, so each slice is sorted.
	for  := range  {
		sort.Strings([])
	}
	// Naked return of the named result.
	return
}

// NOTE(review): the function and parameter names are missing from this
// declaration in the file; the description covers only the visible tokens.
//
// Given a *parquet.Schema, an arrow.Record, a map[string][]string and an int,
// it returns a *dynparquet.DynamicRow. It returns io.EOF when the compared
// value is greater than or equal to NumRows() converted to int, and otherwise
// propagates any error from RecordToRow.
func ( *parquet.Schema,  arrow.Record,  map[string][]string,  int) (*dynparquet.DynamicRow, error) {
	// Bounds check; io.EOF is used for this out-of-range case.
	if  >= int(.NumRows()) {
		return nil, io.EOF
	}

	,  := RecordToRow(, , )
	if  != nil {
		return nil, 
	}

	// The last argument to NewDynamicRow is the result of a Fields() call.
	return dynparquet.NewDynamicRow(, , , .Fields()), nil
}

// NOTE(review): the function and parameter names are missing from this
// declaration in the file; the description covers only the visible tokens.
//
// Given an arrow.Record and a *dynparquet.Schema, it returns a
// *dynparquet.SerializedBuffer. It obtains a writer with GetWriter, passes its
// ParquetWriter field to RecordToFile, opens the in-memory bytes with
// parquet.OpenFile and wraps the result with dynparquet.NewSerializedBuffer.
// Any error from those steps is returned; only the OpenFile error gets the
// "failed to read buf" prefix.
func ( arrow.Record,  *dynparquet.Schema) (*dynparquet.SerializedBuffer, error) {
	// In-memory buffer; its Bytes() and Len() are read back further down.
	 := &bytes.Buffer{}
	// GetWriter receives the result of RecordDynamicCols and a literal false.
	,  := .GetWriter(, RecordDynamicCols(), false)
	if  != nil {
		return nil, 
	}
	// Deferred PutWriter call.
	// NOTE(review): presumably this returns the writer to a pool — confirm.
	defer .PutWriter()
	if  := RecordToFile(, .ParquetWriter, );  != nil {
		return nil, 
	}
	,  := parquet.OpenFile(bytes.NewReader(.Bytes()), int64(.Len()))
	if  != nil {
		// NOTE(review): %v flattens the cause to text, so errors.Is/As cannot
		// see through this error; %w would keep the chain.
		return nil, fmt.Errorf("failed to read buf: %v", )
	}
	,  := dynparquet.NewSerializedBuffer()
	if  != nil {
		return nil, 
	}
	return , nil
}

// NOTE(review): the function and parameter names are missing from this
// declaration in the file. Its signature matches the RecordToFile call
// elsewhere in this file, so it is presumably that function — confirm.
//
// Given a *dynparquet.Schema, a dynparquet.ParquetWriter and one arrow.Record,
// it builds an []arrow.Record literal and returns the result of RecordsToFile.
func ( *dynparquet.Schema,  dynparquet.ParquetWriter,  arrow.Record) error {
	return RecordsToFile(, , []arrow.Record{})
}

// NOTE(review): the function and parameter names are missing from this
// declaration in the file. Its signature matches the RecordsToFile call
// elsewhere in this file, so it is presumably that function — confirm.
//
// Given a *dynparquet.Schema, a dynparquet.ParquetWriter and a slice of
// arrow.Record, it collects one RecordDynamicCols result per slice element,
// merges them with dynparquet.MergeDynamicColumnSets, calls
// GetDynamicParquetSchema, and then calls recordToRows once per element with
// the range 0 to NumRows(). It returns the first error encountered, or nil.
func ( *dynparquet.Schema,  dynparquet.ParquetWriter,  []arrow.Record) error {
	// Slice of column sets, pre-sized by a len() call.
	 := make([]map[string][]string, 0, len())
	for ,  := range  {
		 = append(, RecordDynamicCols())
	}
	 := dynparquet.MergeDynamicColumnSets()
	// Close is deferred, so it runs on every return path and any value it
	// returns is discarded.
	// NOTE(review): presumably this closes the ParquetWriter argument —
	// confirm.
	defer .Close()

	,  := .GetDynamicParquetSchema()
	if  != nil {
		return 
	}
	// Deferred PutPooledParquetSchema call, registered only after
	// GetDynamicParquetSchema has succeeded.
	defer .PutPooledParquetSchema()

	 := .Schema.Fields()

	for ,  := range  {
		// Each call covers the range 0 to NumRows().
		if  := recordToRows(, , 0, int(.NumRows()), );  != nil {
			return 
		}
	}

	return nil
}