package dynparquet

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
	
	

	schemapb 
	schemav2pb 
)

const (
	// ColumnIndexSize is the size of the column indices in parquet files. It
	// is the value passed to parquet.ColumnIndexSizeLimit when the writer
	// options are assembled in this file.
	ColumnIndexSize = 16
)

// ColumnDefinition describes a column in a dynamic parquet schema.
type ColumnDefinition struct {
	// Name is the column name; the schema indexes its columns by this value.
	Name          string
	// StorageLayout is the parquet node stored for the column when parquet
	// schemas are built from the definitions.
	StorageLayout parquet.Node
	// Dynamic marks a column that is expanded into one concrete column per
	// dynamic column name, named "<Name>.<concrete name>".
	Dynamic       bool
	// PreHash requests an additional 64-bit integer column, named through
	// HashedColumnName, next to the column when the dynamic parquet schema is
	// built.
	PreHash       bool
}

// SortingColumn describes a column to sort by in a dynamic parquet schema. It
// extends parquet.SortingColumn with the name of the column in the dynamic
// schema.
type SortingColumn interface {
	parquet.SortingColumn
	// ColumnName returns the column name; the schema uses it to look up the
	// matching column definition.
	ColumnName() string
}

// Ascending constructs a SortingColumn value which dictates to sort by the column in ascending order.
// The result is an ascending value built from the given string.
func ( string) SortingColumn { return ascending{name: , path: []string{}} }

// Descending constructs a SortingColumn value which dictates to sort by the column in descending order.
// The result is a descending value built from the given string.
func ( string) SortingColumn { return descending{name: , path: []string{}} }

// NullsFirst wraps the SortingColumn passed as argument so that it instructs
// the row group to place null values first in the column. The wrapper is a
// nullsFirst value.
func ( SortingColumn) SortingColumn { return nullsFirst{} }

// ascending is the sorting column returned for ascending order. It stores the
// column name and the path of the column.
type ascending struct {
	name string
	path []string
}

// The methods below, in order: render the column as "ascending(<name>)",
// return the name field, return the path field, and two boolean accessors
// that both report false.
func ( ascending) () string     { return "ascending(" + .name + ")" }
func ( ascending) () string { return .name }
func ( ascending) () []string     { return .path }
func ( ascending) () bool   { return false }
func ( ascending) () bool   { return false }

// descending is the sorting column returned for descending order. It stores
// the column name and the path of the column.
type descending struct {
	name string
	path []string
}

// The methods below, in order: render the column as "descending(<name>)",
// return the name field, return the path field, a boolean accessor reporting
// true and a boolean accessor reporting false.
func ( descending) () string     { return "descending(" + .name + ")" }
func ( descending) () string { return .name }
func ( descending) () []string     { return .path }
func ( descending) () bool   { return true }
func ( descending) () bool   { return false }

// nullsFirst embeds a SortingColumn, so every method it does not define itself
// is promoted from the wrapped column.
type nullsFirst struct{ SortingColumn }

// The first method formats the wrapped column prefixed with "nulls_first+";
// the second overrides a boolean accessor of the wrapped column to report true.
func ( nullsFirst) () string   { return fmt.Sprintf("nulls_first+%s", .SortingColumn) }
func ( nullsFirst) () bool { return true }

// makeDynamicSortingColumn wraps the given SortingColumn in a
// dynamicSortingColumn for one concrete dynamic column. A dotted name is
// derived from the wrapped column's ColumnName followed by "." and a further
// string.
func makeDynamicSortingColumn( string,  SortingColumn) SortingColumn {
	 := .ColumnName() + "." + 
	return dynamicSortingColumn{
		SortingColumn:     ,
		dynamicColumnName: ,
		fullName:          ,
		path:              []string{},
	}
}

// dynamicSortingColumn is a SortingColumn which is a dynamic column. It embeds
// another SortingColumn and additionally stores the concrete dynamic column
// name, a full name and a path.
type dynamicSortingColumn struct {
	SortingColumn
	dynamicColumnName string
	fullName          string
	path              []string
}

// Formats the column as "dynamic(<dynamicColumnName>, <wrapped column>)".
func ( dynamicSortingColumn) () string {
	return fmt.Sprintf("dynamic(%s, %v)", .dynamicColumnName, .SortingColumn)
}

// Returns the fullName field, overriding the method promoted from the embedded
// SortingColumn that has the same signature.
func ( dynamicSortingColumn) () string {
	return .fullName
}

// Returns the path field.
func ( dynamicSortingColumn) () []string { return .path }

// Schema is a dynamic parquet schema. It extends a parquet schema with the
// ability that any column definition that is dynamic will have columns
// dynamically created as their column name is seen for the first time.
type Schema struct {
	// def is the definition message the schema was created from; methods
	// type-switch on it to distinguish the schemapb and schemav2pb variants.
	def            proto.Message
	// columns holds the column definitions, sorted by name in newSchema.
	columns        []ColumnDefinition
	// columnIndexes maps a column name to its index in columns.
	columnIndexes  map[string]int
	sortingColumns []SortingColumn
	// dynamicColumns is filled in newSchema while iterating the columns whose
	// Dynamic flag is set.
	dynamicColumns []int

	UniquePrimaryIndex bool

	// The maps below hold *sync.Pool values. Their keys are derived from the
	// serialized dynamic columns (writers additionally include the sorting
	// flag in the key).
	writers        *sync.Map
	buffers        *sync.Map
	sortingSchemas *sync.Map
	parquetSchemas *sync.Map
}

// FindDynamicColumnForConcreteColumn returns a column definition for the
// column passed. So "labels.label1" would return the column definition for the
// dynamic column "labels" if it exists.
//
// The name must contain exactly one '.'; names without a period or with more
// than one yield (ColumnDefinition{}, false). Otherwise a slice of the name is
// handed to FindDynamicColumn.
func ( *Schema) ( string) (ColumnDefinition, bool) {
	 := 0
	 := false
	// Scan the name once, tracking whether a period has been seen.
	for ,  := range  {
		if  != '.' {
			continue
		}
		if  {
			// Can't have more than one period.
			return ColumnDefinition{}, false
		}
		 = true
		 = 
	}
	if ! {
		return ColumnDefinition{}, false
	}

	return .FindDynamicColumn([:])
}

// FindDynamicColumn returns a dynamic column definition for the column passed.
// The boolean result is false when the name is not present in columnIndexes or
// when the column found is not marked Dynamic.
func ( *Schema) ( string) (ColumnDefinition, bool) {
	,  := .columnIndexes[]
	if ! {
		return ColumnDefinition{}, false
	}

	 := .columns[]
	// Note: This is different from the FindColumn function.
	if !.Dynamic {
		return ColumnDefinition{}, false
	}

	return , true
}

// FindColumn returns a column definition for the column passed. The boolean
// result is false when the name is not present in columnIndexes or when the
// column found is marked Dynamic.
func ( *Schema) ( string) (ColumnDefinition, bool) {
	,  := .columnIndexes[]
	if ! {
		return ColumnDefinition{}, false
	}

	 := .columns[]
	// Note: This is different from the FindDynamicColumn function.
	if .Dynamic {
		return ColumnDefinition{}, false
	}

	return , true
}

// findLeavesFromNode flattens a v2 schema node into column definitions. A leaf
// node yields a single non-dynamic definition whose storage layout is converted
// with storageLayoutToParquetNode; a group node yields the concatenation of the
// results for its child nodes. Any other node type panics.
//
// NOTE(review): a failed layout conversion panics with a fixed message and the
// underlying error is not included in it.
func findLeavesFromNode( *schemav2pb.Node) []ColumnDefinition {
	switch n := .Type.(type) {
	case *schemav2pb.Node_Leaf:
		,  := storageLayoutToParquetNode(&v2storageLayoutWrapper{.Leaf.StorageLayout})
		if  != nil {
			panic("sheisse")
		}
		return []ColumnDefinition{
			{
				Name:          .Leaf.Name,
				StorageLayout: ,
				Dynamic:       false, // TODO(can we get rid of dynamic cols): do we need dynamic columns to be separate?
			},
		}
	case *schemav2pb.Node_Group:
		// Pre-size for the direct children; nested groups may add more.
		 := make([]ColumnDefinition, 0, len(.Group.Nodes))
		for ,  := range .Group.Nodes {
			 = append(, ()...)
		}

		return 
	default:
		panic(fmt.Sprintf("unknown node type: %v", ))
	}
}

// Builds a parquet schema from a v2 schema definition: every node under the
// definition's root becomes an entry of a parquet.Group, keyed by
// nameFromNodeDef and converted with nodeFromDefinition. The schema is named
// after the root.
func ( *schemav2pb.Schema) *parquet.Schema {
	 := parquet.Group{}
	for ,  := range .Root.Nodes {
		[nameFromNodeDef()] = nodeFromDefinition()
	}

	return parquet.NewSchema(.Root.Name, )
}

// nodeFromDefinition converts a v2 schema node into a parquet.Node. Leaves are
// converted from their storage layout; groups become a parquet.Group of their
// converted children, wrapped with parquet.Optional when the group is nullable
// and with parquet.Repeated when it is repeated. Any other node type panics.
//
// NOTE(review): a failed layout conversion panics with a fixed message and the
// underlying error is not included in it.
func nodeFromDefinition( *schemav2pb.Node) parquet.Node {
	switch n := .Type.(type) {
	case *schemav2pb.Node_Leaf:
		,  := storageLayoutToParquetNode(&v2storageLayoutWrapper{.Leaf.StorageLayout})
		if  != nil {
			panic("sheisse")
		}
		return 
	case *schemav2pb.Node_Group:
		 := parquet.Group{}
		for ,  := range .Group.Nodes {
			[nameFromNodeDef()] = ()
		}

		var  parquet.Node
		 = 
		if .Group.Nullable {
			 = parquet.Optional()
		}

		if .Group.Repeated {
			 = parquet.Repeated()
		}

		return 
	default:
		panic(fmt.Sprintf("unknown node type: %v", ))
	}
}

// nameFromNodeDef returns the name of a v2 schema node: the leaf name for leaf
// nodes and the group name for group nodes. Any other node type panics.
func nameFromNodeDef( *schemav2pb.Node) string {
	switch n := .Type.(type) {
	case *schemav2pb.Node_Leaf:
		return .Leaf.Name
	case *schemav2pb.Node_Group:
		return .Group.Name
	default:
		panic(fmt.Sprintf("unknown node type: %v", ))
	}
}

// Builds a *Schema from a schema definition message. Two message types are
// handled:
//
//   - *schemapb.Schema: one column definition per entry in Columns, with the
//     storage layout converted by storageLayoutToParquetNode.
//   - *schemav2pb.Schema: column definitions are the leaves found under the
//     root nodes by findLeavesFromNode.
//
// In both cases the sorting columns are translated to Ascending or Descending
// values, optionally wrapped with NullsFirst, and an unknown direction is
// reported as an error.
//
// NOTE(review): the type switch has no default case, so any other message type
// reaches newSchema with none of the three collected values assigned.
func ( proto.Message) (*Schema, error) {
	var (
		            []ColumnDefinition
		     []SortingColumn
		 bool
	)
	switch def := .(type) {
	case *schemapb.Schema:
		 = make([]ColumnDefinition, 0, len(.Columns))
		for ,  := range .Columns {
			,  := storageLayoutToParquetNode(&v1storageLayoutWrapper{.StorageLayout})
			if  != nil {
				return nil, 
			}
			 = append(, ColumnDefinition{
				Name:          .Name,
				StorageLayout: ,
				Dynamic:       .Dynamic,
				PreHash:       .Prehash,
			})
		}

		 = make([]SortingColumn, 0, len(.SortingColumns))
		for ,  := range .SortingColumns {
			var  SortingColumn
			switch .Direction {
			case schemapb.SortingColumn_DIRECTION_ASCENDING:
				 = Ascending(.Name)
			case schemapb.SortingColumn_DIRECTION_DESCENDING:
				 = Descending(.Name)
			default:
				return nil, fmt.Errorf("unknown sorting direction %q, only \"ascending\", \"descending\" are valid choices", .Direction)
			}
			if .NullsFirst {
				 = NullsFirst()
			}
			 = append(, )
		}
		 = .UniquePrimaryIndex
	case *schemav2pb.Schema:
		 = []ColumnDefinition{}
		for ,  := range .Root.Nodes {
			 = append(, findLeavesFromNode()...)
		}

		// v2 sorting columns are identified by Path rather than Name.
		 = make([]SortingColumn, 0, len(.SortingColumns))
		for ,  := range .SortingColumns {
			var  SortingColumn
			switch .Direction {
			case schemav2pb.SortingColumn_DIRECTION_ASCENDING:
				 = Ascending(.Path)
			case schemav2pb.SortingColumn_DIRECTION_DESCENDING:
				 = Descending(.Path)
			default:
				return nil, fmt.Errorf("unknown sorting direction %q, only \"ascending\", \"descending\" are valid choices", .Direction)
			}
			if .NullsFirst {
				 = NullsFirst()
			}
			 = append(, )
		}
		 = .UniquePrimaryIndex
	}

	return newSchema(, , , ), nil
}

// DefinitionFromParquetFile converts a parquet file into a schemapb.Schema.
//
// The file's row-group metadata is walked twice per row group: once to collect
// the sorting columns (de-duplicated, with direction and nulls-first taken from
// the metadata) and once to collect the columns. Column names are taken from
// the first element of PathInSchema and split on "."; the part before the
// first period is used as the column name.
//
// NOTE(review): the returned definition sets only Name, Columns and
// SortingColumns; UniquePrimaryIndex is left at its zero value.
func ( *parquet.File) (*schemapb.Schema, error) {
	 := .Schema()

	,  := NewSerializedBuffer()
	if  != nil {
		return nil, 
	}
	 := .DynamicColumns()
	 := map[string]struct{}{}
	 := []*schemapb.Column{}
	 := .Metadata()
	 := []*schemapb.SortingColumn{}
	 := map[string]struct{}{}
	for ,  := range .RowGroups {
		// Extract the sorting column information
		for ,  := range .SortingColumns {
			 := .Columns[.ColumnIdx].MetaData.PathInSchema[0]
			 := false
			 := strings.Split(, ".")
			 := [0]
			if len() > 1 && len([]) != 0 {
				 = true
			}

			if  {
				 = 
			}

			// we need a set to filter out duplicates
			if ,  := [];  {
				continue
			}
			[] = struct{}{}

			// Ascending is the default; the metadata only flags descending.
			 := schemapb.SortingColumn_DIRECTION_ASCENDING
			if .Descending {
				 = schemapb.SortingColumn_DIRECTION_DESCENDING
			}
			 = append(, &schemapb.SortingColumn{
				Name:       ,
				Direction:  ,
				NullsFirst: .NullsFirst,
			})
		}

		for ,  := range .Columns {
			 := .MetaData.PathInSchema[0] // we only support flat schemas

			// Check if the column is optional
			 := false
			for ,  := range .Fields() {
				if .Name() ==  {
					 = .Optional()
				}
			}

			 := false
			 := strings.Split(, ".")
			 := [0]
			if len() > 1 && len([]) != 0 {
				 = true
			}

			// Mark the dynamic column as being found
			if ,  := [];  {
				continue
			}
			[] = struct{}{}

			 = append(, &schemapb.Column{
				Name:          [0],
				StorageLayout: parquetColumnMetaDataToStorageLayout(.MetaData, ),
				Dynamic:       ,
			})
		}
	}

	return &schemapb.Schema{
		Name:           .Name(),
		Columns:        ,
		SortingColumns: ,
	}, nil
}

// SchemaFromParquetFile converts a parquet file into a dynparquet.Schema. It
// first derives a definition with DefinitionFromParquetFile and then passes
// that definition to SchemaFromDefinition; an error from the first step is
// returned unchanged.
func ( *parquet.File) (*Schema, error) {
	,  := DefinitionFromParquetFile()
	if  != nil {
		return nil, 
	}

	return SchemaFromDefinition()
}

// parquetColumnMetaDataToStorageLayout derives a schemapb.StorageLayout from
// parquet column metadata. The boolean argument becomes the Nullable flag.
// Encoding, compression and type are each mapped through a switch; values
// without a matching case leave the corresponding layout field untouched.
//
// NOTE(review): the encoding switch indexes the last element of the metadata's
// Encoding slice without checking that the slice is non-empty.
func parquetColumnMetaDataToStorageLayout( format.ColumnMetaData,  bool) *schemapb.StorageLayout {
	 := &schemapb.StorageLayout{
		Nullable: ,
	}

	switch .Encoding[len(.Encoding)-1] {
	case format.RLEDictionary:
		.Encoding = schemapb.StorageLayout_ENCODING_RLE_DICTIONARY
	case format.DeltaBinaryPacked:
		.Encoding = schemapb.StorageLayout_ENCODING_DELTA_BINARY_PACKED
	}

	switch .Codec {
	case format.Snappy:
		.Compression = schemapb.StorageLayout_COMPRESSION_SNAPPY
	case format.Gzip:
		.Compression = schemapb.StorageLayout_COMPRESSION_GZIP
	case format.Brotli:
		.Compression = schemapb.StorageLayout_COMPRESSION_BROTLI
	case format.Lz4Raw:
		.Compression = schemapb.StorageLayout_COMPRESSION_LZ4_RAW
	case format.Zstd:
		.Compression = schemapb.StorageLayout_COMPRESSION_ZSTD
	}

	switch .Type {
	case format.ByteArray:
		.Type = schemapb.StorageLayout_TYPE_STRING
	case format.Int64:
		.Type = schemapb.StorageLayout_TYPE_INT64
	case format.Double:
		.Type = schemapb.StorageLayout_TYPE_DOUBLE
	case format.Boolean:
		.Type = schemapb.StorageLayout_TYPE_BOOL
	}

	return 
}

// StorageLayout is the view of a storage layout that
// storageLayoutToParquetNode consumes. Type, encoding and compression are
// exposed as int32 values; both the v1 and v2 wrappers in this file obtain them
// by converting their embedded layout's enum getters to int32.
type StorageLayout interface {
	GetTypeInt32() int32
	GetRepeated() bool
	GetNullable() bool
	GetEncodingInt32() int32
	GetCompressionInt32() int32
}

// v1storageLayoutWrapper embeds a *schemapb.StorageLayout; it is what
// SchemaFromDefinition passes to storageLayoutToParquetNode for v1 columns.
type v1storageLayoutWrapper struct {
	*schemapb.StorageLayout
}

// Returns the Repeated field of the embedded layout.
func ( *v1storageLayoutWrapper) () bool {
	return .Repeated
}

// Returns the embedded layout's type converted to int32.
func ( *v1storageLayoutWrapper) () int32 {
	return int32(.GetType())
}

// Returns the embedded layout's encoding converted to int32.
func ( *v1storageLayoutWrapper) () int32 {
	return int32(.GetEncoding())
}

// Returns the embedded layout's compression converted to int32.
func ( *v1storageLayoutWrapper) () int32 {
	return int32(.GetCompression())
}

// v2storageLayoutWrapper embeds a *schemav2pb.StorageLayout; it is what the v2
// node helpers pass to storageLayoutToParquetNode for leaf nodes.
type v2storageLayoutWrapper struct {
	*schemav2pb.StorageLayout
}

// Returns the embedded layout's type converted to int32.
func ( *v2storageLayoutWrapper) () int32 {
	return int32(.GetType())
}

// Returns the embedded layout's encoding converted to int32.
func ( *v2storageLayoutWrapper) () int32 {
	return int32(.GetEncoding())
}

// Returns the embedded layout's compression converted to int32.
func ( *v2storageLayoutWrapper) () int32 {
	return int32(.GetCompression())
}

// NOTE(review): this conversion from a v2 storage layout to StorageLayout
// ignores its argument and always returns nil — confirm whether it is a stub.
func ( *schemav2pb.StorageLayout) StorageLayout {
	return nil
}

// storageLayoutToParquetNode converts a StorageLayout into a parquet.Node.
//
// The base node is chosen from the layout type (string, 64-bit int, double,
// boolean, 32-bit int or unsigned 64-bit int); an unknown type is an error.
// The node is then wrapped, in this order: Optional when nullable, Encoded when
// the encoding is not the plain/unspecified value, Compressed when the
// compression is not the none/unspecified value, and Repeated when repeated.
// Errors from the encoding and compression lookups are returned unchanged.
func storageLayoutToParquetNode( StorageLayout) (parquet.Node, error) {
	var  parquet.Node
	switch .GetTypeInt32() {
	case int32(schemapb.StorageLayout_TYPE_STRING):
		 = parquet.String()
	case int32(schemapb.StorageLayout_TYPE_INT64):
		 = parquet.Int(64)
	case int32(schemapb.StorageLayout_TYPE_DOUBLE):
		 = parquet.Leaf(parquet.DoubleType)
	case int32(schemapb.StorageLayout_TYPE_BOOL):
		 = parquet.Leaf(parquet.BooleanType)
	case int32(schemapb.StorageLayout_TYPE_INT32):
		 = parquet.Int(32)
	case int32(schemapb.StorageLayout_TYPE_UINT64):
		 = parquet.Uint(64)
	default:
		return nil, fmt.Errorf("unknown storage layout type: %v", .GetTypeInt32())
	}

	if .GetNullable() {
		 = parquet.Optional()
	}

	if .GetEncodingInt32() != int32(schemapb.StorageLayout_ENCODING_PLAIN_UNSPECIFIED) {
		,  := encodingFromDefinition(.GetEncodingInt32())
		if  != nil {
			return nil, 
		}
		 = parquet.Encoded(, )
	}

	if .GetCompressionInt32() != int32(schemapb.StorageLayout_COMPRESSION_NONE_UNSPECIFIED) {
		,  := compressionFromDefinition(.GetCompressionInt32())
		if  != nil {
			return nil, 
		}
		 = parquet.Compressed(, )
	}

	if .GetRepeated() {
		 = parquet.Repeated()
	}

	return , nil
}

// encodingFromDefinition maps a storage layout encoding value (as int32) to the
// matching parquet encoding: RLE dictionary, delta binary packed, delta byte
// array or delta length byte array. Any other value is an error.
func encodingFromDefinition( int32) (encoding.Encoding, error) {
	switch  {
	case int32(schemapb.StorageLayout_ENCODING_RLE_DICTIONARY):
		return &parquet.RLEDictionary, nil
	case int32(schemapb.StorageLayout_ENCODING_DELTA_BINARY_PACKED):
		return &parquet.DeltaBinaryPacked, nil
	case int32(schemapb.StorageLayout_ENCODING_DELTA_BYTE_ARRAY):
		return &parquet.DeltaByteArray, nil
	case int32(schemapb.StorageLayout_ENCODING_DELTA_LENGTH_BYTE_ARRAY):
		return &parquet.DeltaLengthByteArray, nil
	default:
		return nil, fmt.Errorf("unknown encoding: %v", )
	}
}

// compressionFromDefinition maps a storage layout compression value (as int32)
// to the matching parquet codec: Snappy, Gzip, Brotli, LZ4 raw or Zstd. Any
// other value is an error.
func compressionFromDefinition( int32) (compress.Codec, error) {
	switch  {
	case int32(schemapb.StorageLayout_COMPRESSION_SNAPPY):
		return &parquet.Snappy, nil
	case int32(schemapb.StorageLayout_COMPRESSION_GZIP):
		return &parquet.Gzip, nil
	case int32(schemapb.StorageLayout_COMPRESSION_BROTLI):
		return &parquet.Brotli, nil
	case int32(schemapb.StorageLayout_COMPRESSION_LZ4_RAW):
		return &parquet.Lz4Raw, nil
	case int32(schemapb.StorageLayout_COMPRESSION_ZSTD):
		return &parquet.Zstd, nil
	default:
		return nil, fmt.Errorf("unknown compression: %v", )
	}
}

// newSchema creates a new dynamic parquet schema from the given definition
// message, column definitions, sorting columns and unique-primary-index flag.
// The order of the sorting columns is important as it determines the order in
// which data is written to a file or laid out in memory.
//
// Column definitions are sorted by Name (sort.Slice sorts in place) before the
// name-to-index map is built, and every pool map of the schema starts out
// empty.
func newSchema(
	 proto.Message,
	 []ColumnDefinition,
	 []SortingColumn,
	 bool,
) *Schema {
	sort.Slice(, func(,  int) bool {
		return [].Name < [].Name
	})

	 := make(map[string]int, len())
	for ,  := range  {
		[.Name] = 
	}

	 := &Schema{
		def:                ,
		columns:            ,
		sortingColumns:     ,
		columnIndexes:      ,
		writers:            &sync.Map{},
		buffers:            &sync.Map{},
		sortingSchemas:     &sync.Map{},
		parquetSchemas:     &sync.Map{},
		UniquePrimaryIndex: ,
	}

	// Record the columns that are flagged as dynamic.
	for ,  := range  {
		if .Dynamic {
			.dynamicColumns = append(.dynamicColumns, )
		}
	}

	return 
}

// Returns the schema name from the underlying definition: the definition's own
// name for a schemapb.Schema and the root's name for a schemav2pb.Schema. Any
// other definition type panics.
func ( *Schema) () string {
	switch sc := .def.(type) {
	case *schemapb.Schema:
		return .GetName()
	case *schemav2pb.Schema:
		return .Root.GetName()
	default:
		panic("unknown schema version")
	}
}

// Returns the definition message stored in the schema's def field. The message
// itself is returned, not a copy.
func ( *Schema) () proto.Message {
	return .def
}

// Looks a column definition up by name through columnIndexes. Unlike the
// FindColumn and FindDynamicColumn lookups it does not inspect the Dynamic
// flag; the boolean result only reports whether the name is known.
func ( *Schema) ( string) (ColumnDefinition, bool) {
	,  := .columnIndexes[]
	if ! {
		return ColumnDefinition{}, false
	}
	return .columns[], true
}

// Returns the schema's column definitions. The internal slice is returned
// directly, without copying.
func ( *Schema) () []ColumnDefinition {
	return .columns
}

// Returns the schema's sorting columns as a freshly allocated copy, so the
// caller can modify the returned slice without touching the schema's own.
func ( *Schema) () []SortingColumn {
	 := make([]SortingColumn, len(.sortingColumns))
	copy(, .sortingColumns)
	return 
}

// Returns, for every sorting column in order, the column definition found
// under the sorting column's ColumnName.
//
// NOTE(review): the index is read from columnIndexes without checking that the
// name is present, so an unknown name resolves to index 0.
func ( *Schema) () []ColumnDefinition {
	 := make([]ColumnDefinition, len(.sortingColumns))
	for ,  := range .sortingColumns {
		[] = .columns[.columnIndexes[.ColumnName()]]
	}
	return 
}

// Returns the parquet schema for the definition without expanding dynamic
// columns. A v2 definition is converted with ParquetSchemaFromV2Definition; a
// v1 definition becomes a group holding every column's StorageLayout under the
// column name. Any other definition type panics.
func ( *Schema) () *parquet.Schema {
	switch schema := .def.(type) {
	case *schemav2pb.Schema:
		return ParquetSchemaFromV2Definition()
	case *schemapb.Schema:
		 := parquet.Group{}
		for ,  := range .columns {
			[.Name] = .StorageLayout
		}
		return parquet.NewSchema(.Name(), )
	default:
		panic(fmt.Sprintf("unknown schema version %T", ))
	}
}

// dynamicParquetSchema returns the parquet schema for the dynamic schema with the
// concrete dynamic column names given in the argument.
//
// For a v1 definition every dynamic column contributes one
// "<name>.<concrete name>" entry per concrete name returned by
// dynamicColumnsFor, and every other column contributes a single entry. Columns
// with PreHash set additionally get a 64-bit integer entry keyed by
// HashedColumnName. A v2 definition is converted with
// ParquetSchemaFromV2Definition; any other definition type is an error.
func ( Schema) ( map[string][]string) (*parquet.Schema, error) {
	switch def := .def.(type) {
	case *schemav2pb.Schema:
		return ParquetSchemaFromV2Definition(), nil
	case *schemapb.Schema:
		 := parquet.Group{}
		for ,  := range .columns {
			if .Dynamic {
				 := dynamicColumnsFor(.Name, )
				for ,  := range  {
					[.Name+"."+] = .StorageLayout
					if .PreHash {
						[HashedColumnName(.Name+"."+)] = parquet.Int(64) // TODO(thor): Do we need compression etc. here?
					}
				}
				continue
			}
			[.Name] = .StorageLayout
			if .PreHash {
				[HashedColumnName(.Name)] = parquet.Int(64) // TODO(thor): Do we need compression etc. here?
			}
		}

		return parquet.NewSchema(.Name(), ), nil
	default:
		return nil, fmt.Errorf("unsupported schema definition version")
	}
}

// parquetSortingSchema returns the parquet schema of just the sorting columns
// with the concrete dynamic column names given in the argument.
//
// Non-dynamic sorting columns contribute one entry under their column name;
// dynamic ones contribute one "<name>.<concrete name>" entry per concrete
// name. The error result is always nil in this implementation.
func ( Schema) (
	 map[string][]string,
) (
	*parquet.Schema,
	error,
) {
	 := parquet.Group{}
	for ,  := range .sortingColumns {
		 := .ColumnName()
		 := .columns[.columnIndexes[]]
		if !.Dynamic {
			[] = .StorageLayout
			continue
		}

		 := dynamicColumnsFor(.Name, )
		for ,  := range  {
			[+"."+] = .StorageLayout
		}
	}
	return parquet.NewSchema(.Name(), ), nil
}

// ParquetSortingColumns returns the parquet sorting columns for the dynamic
// sorting columns with the concrete dynamic column names given in the
// argument.
//
// Non-dynamic sorting columns are passed through unchanged. A dynamic sorting
// column is replaced by one column per concrete name, each created with
// makeDynamicSortingColumn.
func ( Schema) (
	 map[string][]string,
) []parquet.SortingColumn {
	 := make([]parquet.SortingColumn, 0, len(.sortingColumns))
	for ,  := range .sortingColumns {
		 := .ColumnName()
		if !.columns[.columnIndexes[]].Dynamic {
			 = append(, )
			continue
		}
		 := dynamicColumnsFor(, )
		for ,  := range  {
			 = append(, makeDynamicSortingColumn(, ))
		}
	}
	return 
}

// dynamicColumnsFor returns the concrete dynamic column names for the given dynamic column name.
//
// The lookup is a plain map index: a column that is not present in
// dynamicColumns (or a nil map) yields a nil slice. The returned slice is the
// one stored in the map, not a copy.
func dynamicColumnsFor(column string, dynamicColumns map[string][]string) []string {
	return dynamicColumns[column]
}

// Buffer represents a batch of rows with a concrete set of dynamic column
// names representing how its parquet schema was created off of a dynamic
// parquet schema.
type Buffer struct {
	buffer         *parquet.Buffer
	dynamicColumns map[string][]string
	fields         []parquet.Field
}

// Resets the underlying parquet buffer.
func ( *Buffer) () {
	.buffer.Reset()
}

// Returns the size reported by the underlying parquet buffer.
func ( *Buffer) () int64 {
	return .buffer.Size()
}

// Renders the buffer with prettyRowGroup.
func ( *Buffer) () string {
	return prettyRowGroup()
}

// DynamicRowGroup is a parquet.RowGroup that can describe the concrete dynamic
// columns. It is also a fmt.Stringer.
type DynamicRowGroup interface {
	parquet.RowGroup
	fmt.Stringer
	// DynamicColumns returns the concrete dynamic column names that were used
	// to create its concrete parquet schema with a dynamic parquet schema.
	DynamicColumns() map[string][]string
	// DynamicRows returns an iterator over the rows in the row group.
	DynamicRows() DynamicRowReader
}

// prettyWriter renders tab-separated cells into aligned columns. Everything
// written through the embedded tabwriter.Writer is collected in b once the
// writer is flushed.
type prettyWriter struct {
	*tabwriter.Writer

	// cellWidth is the maximum number of bytes truncateString lets through.
	cellWidth int
	// b receives the tabwriter's formatted output.
	b *bytes.Buffer
}

// newPrettyWriter returns a prettyWriter whose tabwriter uses a minimum cell
// width and tab width of 15, a padding of 2, spaces as the padding character
// and no formatting flags.
func newPrettyWriter() prettyWriter {
	const (
		cellWidth = 15
		tabWidth  = cellWidth
		padding   = 2
		padChar   = ' '
		noFlags   = 0
	)
	w := prettyWriter{
		cellWidth: cellWidth,
		b:         bytes.NewBuffer(nil),
	}
	w.Writer = tabwriter.NewWriter(w.b, tabWidth, tabWidth, padding, padChar, noFlags)
	return w
}

// String returns what has been flushed into the underlying buffer so far.
// Text still held by the tabwriter is not included until Flush is called.
func (w prettyWriter) String() string {
	return w.b.String()
}

// truncateString shortens s to at most cellWidth bytes. Longer strings are cut
// so that the result ends in "..." and is exactly cellWidth bytes long; shorter
// strings are returned unchanged. The cut is byte-based, so a multi-byte rune
// at the boundary may be split.
func (w prettyWriter) truncateString(s string) string {
	const ellipses = "..."
	if len(s) > w.cellWidth {
		return s[:w.cellWidth-len(ellipses)] + ellipses
	}
	return s
}

// Writes a tabular rendering of the row group's sorting columns: first a
// header line with the path of every sorting column, then one line per row
// containing only the values of the sorting columns. Every cell is passed
// through truncateString and terminated with a tab.
//
// Write errors are deliberately discarded. A sorting column that cannot be
// found in the row group's schema, or a read error other than io.EOF, panics.
func ( prettyWriter) ( DynamicRowGroup) {
	 := .Rows()
	defer .Close()

	// Print sorting schema.
	for ,  := range .SortingColumns() {
		_, _ = .Write([]byte(.truncateString(fmt.Sprintf("%v", .Path())) + "\t"))
	}
	_, _ = .Write([]byte("\n"))

	// The read buffer is sized to hold every row of the row group.
	 := make([]parquet.Row, .NumRows())
	for {
		,  := .ReadRows()
		// Rows returned together with an error are still printed before the
		// error is inspected.
		for ,  := range [:] {
			// Print only sorting columns.
			for ,  := range .SortingColumns() {
				,  := .Schema().Lookup(.Path()...)
				if ! {
					panic(fmt.Sprintf("sorting column not found: %v", .Path()))
				}

				_, _ = .Write([]byte(.truncateString([.ColumnIndex].String()) + "\t"))
			}
			_, _ = .Write([]byte("\n"))
		}
		if  != nil {
			if errors.Is(, io.EOF) {
				break
			}
			panic()
		}
	}
}

// prettyRowGroup renders the sorting columns of a row group as an aligned
// table: it writes the row group through a new pretty writer, flushes it
// (ignoring the flush error) and returns the collected text.
func prettyRowGroup( DynamicRowGroup) string {
	 := newPrettyWriter()
	.writePrettyRowGroup()
	_ = .Flush()
	return .String()
}

// DynamicRowReader is an iterator over the rows in a DynamicRowGroup. In
// addition to seeking (parquet.RowSeeker) it reads rows into a *DynamicRows
// and must be closed when no longer needed.
type DynamicRowReader interface {
	parquet.RowSeeker
	ReadRows(*DynamicRows) (int, error)
	Close() error
}

// dynamicRowGroupReader reads rows from a DynamicRowGroup and remembers the
// row group's schema, dynamic columns and fields.
type dynamicRowGroupReader struct {
	schema         *parquet.Schema
	dynamicColumns map[string][]string
	rows           parquet.Rows
	fields         []parquet.Field
}

// newDynamicRowGroupReader creates a reader for the given row group. Schema,
// dynamic columns and the row iterator are taken from the row group; the
// fields are supplied by the caller.
func newDynamicRowGroupReader( DynamicRowGroup,  []parquet.Field) *dynamicRowGroupReader {
	return &dynamicRowGroupReader{
		schema:         .Schema(),
		dynamicColumns: .DynamicColumns(),
		rows:           .Rows(),
		fields:         ,
	}
}

// Positions the underlying row iterator at the given row index by delegating
// to its SeekToRow method.
func ( *dynamicRowGroupReader) ( int64) error {
	return .rows.SeekToRow()
}

// ReadRows implements the DynamicRowReader interface.
//
// DynamicColumns, Schema and fields of the destination are filled in from the
// reader when they are still nil. Rows are then read into the destination's
// Rows slice, which is re-sliced after the read. io.EOF is returned as-is
// together with the number of rows read; any other error is wrapped with
// "read row".
func ( *dynamicRowGroupReader) ( *DynamicRows) (int, error) {
	if .DynamicColumns == nil {
		.DynamicColumns = .dynamicColumns
	}
	if .Schema == nil {
		.Schema = .schema
	}
	if .fields == nil {
		.fields = .fields
	}

	,  := .rows.ReadRows(.Rows)
	// NOTE(review): io.EOF is compared directly here, whereas
	// writePrettyRowGroup uses errors.Is for the same check.
	if  == io.EOF {
		.Rows = .Rows[:]
		return , io.EOF
	}
	if  != nil {
		return , fmt.Errorf("read row: %w", )
	}
	.Rows = .Rows[:]

	return , nil
}

// Closes the underlying row iterator and returns its error.
func ( *dynamicRowGroupReader) () error {
	return .rows.Close()
}

// ColumnChunks returns the parquet.ColumnChunk values of the underlying
// buffer, as reported by its own ColumnChunks method.
// Implements the parquet.RowGroup interface.
func ( *Buffer) () []parquet.ColumnChunk {
	return .buffer.ColumnChunks()
}

// NumRows returns the number of rows in the buffer. Implements the
// parquet.RowGroup interface.
func ( *Buffer) () int64 {
	return .buffer.NumRows()
}

// Sorts the underlying parquet buffer in place using sort.Sort.
func ( *Buffer) () {
	sort.Sort(.buffer)
}

// Creates a copy of the buffer: a new parquet buffer with the same schema and
// sorting columns is filled by reading the rows of the current buffer in
// batches of up to 64 and writing them to the new one. The dynamicColumns map
// and the fields slice are shared with the original, not copied.
func ( *Buffer) () (*Buffer, error) {
	 := parquet.NewBuffer(
		.buffer.Schema(),
		parquet.SortingRowGroupConfig(
			parquet.SortingColumns(.buffer.SortingColumns()...),
		),
	)

	 := .buffer.Rows()
	defer .Close()
	for {
		 := make([]parquet.Row, 64)
		,  := .ReadRows()
		// EOF with no rows means everything has been copied.
		if  == io.EOF &&  == 0 {
			break
		}
		if  != nil &&  != io.EOF {
			return nil, 
		}
		 = [:]
		_,  = .WriteRows()
		if  != nil {
			return nil, 
		}
		// EOF may arrive together with the final rows; stop after writing
		// them.
		if  == io.EOF {
			break
		}
	}

	return &Buffer{
		buffer:         ,
		dynamicColumns: .dynamicColumns,
		fields:         .fields,
	}, nil
}

// Schema returns the concrete parquet.Schema of the buffer, as reported by the
// underlying parquet buffer. Implements the parquet.RowGroup interface.
func ( *Buffer) () *parquet.Schema {
	return .buffer.Schema()
}

// SortingColumns returns the concrete slice of parquet.SortingColumns of the
// buffer, as reported by the underlying parquet buffer. Implements the
// parquet.RowGroup interface.
func ( *Buffer) () []parquet.SortingColumn {
	return .buffer.SortingColumns()
}

// DynamicColumns returns the concrete dynamic column names of the buffer. It
// implements the DynamicRowGroup interface. The stored map is returned
// directly, not a copy.
func ( *Buffer) () map[string][]string {
	return .dynamicColumns
}

// Writes the given rows to the underlying parquet buffer through its WriteRows
// method and returns that call's results.
func ( *Buffer) ( []parquet.Row) (int, error) {
	return .buffer.WriteRows()
}

// WriteRowGroup writes the given row group to the underlying parquet buffer
// and returns that call's results.
func ( *Buffer) ( parquet.RowGroup) (int64, error) {
	return .buffer.WriteRowGroup()
}

// Rows returns an iterator for the rows in the buffer, obtained from the
// underlying parquet buffer. It implements the parquet.RowGroup interface.
func ( *Buffer) () parquet.Rows {
	return .buffer.Rows()
}

// DynamicRows returns an iterator for the rows in the buffer. It implements the
// DynamicRowGroup interface. The iterator is a dynamicRowGroupReader that is
// given the buffer's fields.
func ( *Buffer) () DynamicRowReader {
	return newDynamicRowGroupReader(, .fields)
}

var (
	// matchFirstCap matches any character followed by an upper-case letter
	// and one or more lower-case letters.
	matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
	// matchAllCap matches a lower-case letter or digit followed by an
	// upper-case letter.
	matchAllCap   = regexp.MustCompile("([a-z0-9])([A-Z])")
)

// Converts a string to snake case: an underscore is inserted between the two
// groups of every matchFirstCap match, then of every matchAllCap match, and
// the result is lower-cased.
func ( string) string {
	 := matchFirstCap.ReplaceAllString(, "${1}_${2}")
	 = matchAllCap.ReplaceAllString(, "${1}_${2}")
	return strings.ToLower()
}

// NewBuffer returns a new buffer with a concrete parquet schema generated
// using the given concrete dynamic column names.
//
// The parquet schema is obtained through GetDynamicParquetSchema and handed
// back with PutPooledParquetSchema when the function returns. The buffer is
// configured with the sorting columns from ParquetSortingColumns.
func ( *Schema) ( map[string][]string) (*Buffer, error) {
	,  := .GetDynamicParquetSchema()
	if  != nil {
		return nil, fmt.Errorf("create parquet schema for buffer: %w", )
	}
	defer .PutPooledParquetSchema()

	 := .ParquetSortingColumns()
	return &Buffer{
		dynamicColumns: ,
		buffer: parquet.NewBuffer(
			.Schema,
			parquet.SortingRowGroupConfig(
				parquet.SortingColumns(...),
			),
		),
		fields: .Schema.Fields(),
	}, nil
}

// Creates a buffer for a v2 schema definition, optionally extended with the
// given nodes. It fails with "unsupported schema" when the schema's definition
// is not a *schemav2pb.Schema.
//
// When nodes are given they are first merged into the first node with
// proto.Merge, and a schema whose root contains nodes is then merged into a
// schema definition.
//
// NOTE(review): proto.Merge writes into its destination, so the first node
// passed in is modified; if the second merge targets the schema's own
// definition, that definition grows on every call — confirm.
func ( *Schema) ( ...*schemav2pb.Node) (*Buffer, error) {
	,  := .def.(*schemav2pb.Schema)
	if ! {
		return nil, fmt.Errorf("unsupported schema")
	}

	// merge all the dynamic columns; then merge into the top-level schema
	if len() > 0 {
		 := [0]
		for  := 1;  < len(); ++ {
			proto.Merge(, [])
		}
		proto.Merge(, &schemav2pb.Schema{
			Root: &schemav2pb.Group{
				Nodes: []*schemav2pb.Node{},
			},
		})
	}

	 := ParquetSchemaFromV2Definition()
	 := .ParquetSortingColumns(map[string][]string{})
	return &Buffer{
		dynamicColumns: map[string][]string{}, // unused for v2
		buffer: parquet.NewBuffer(
			,
			parquet.SortingRowGroupConfig(
				parquet.SortingColumns(...),
			),
		),
		fields: .Fields(),
	}, nil
}

// Writes the rows of the given buffer to the io.Writer as parquet. A pooled,
// non-sorting writer is obtained through GetWriter for the buffer's dynamic
// columns and returned with PutWriter when the function exits. The rows are
// copied with parquet.CopyRows and the writer is closed explicitly so that a
// close error can be reported.
func ( *Schema) ( io.Writer,  *Buffer) error {
	,  := .GetWriter(, .DynamicColumns(), false)
	if  != nil {
		return fmt.Errorf("create writer: %w", )
	}
	defer .PutWriter()

	 := .Rows()
	defer .Close()
	_,  = parquet.CopyRows(, )
	if  != nil {
		return fmt.Errorf("copy rows: %w", )
	}

	if  := .Close();  != nil {
		return fmt.Errorf("close writer: %w", )
	}

	return nil
}

// bloomFilterBitsPerValue is the number of bits used by the bloom filter. 10
// was the default value used by parquet before it was made configurable. It is
// passed to parquet.SplitBlockFilter for every bloom-filtered column.
const bloomFilterBitsPerValue = 10

// NewWriter returns a new parquet writer with a concrete parquet schema
// generated using the given concrete dynamic column names.
//
// Split-block bloom filters are configured for the sorting columns, except
// those whose column definition cannot be found or whose storage type is
// boolean. The dynamic columns are stored serialized in the file's key/value
// metadata under DynamicColumnsKey. Caller-supplied writer options are appended
// after the built-in ones. Depending on the boolean argument either a sorting
// writer (created with the value 32*1024) or a generic writer is returned.
func ( *Schema) ( io.Writer,  map[string][]string,  bool,  ...parquet.WriterOption) (ParquetWriter, error) {
	,  := .GetDynamicParquetSchema()
	if  != nil {
		return nil, 
	}
	defer .PutPooledParquetSchema()

	 := .ParquetSortingColumns()
	 := make([]parquet.BloomFilterColumn, 0, len())
	for ,  := range  {
		// Don't add bloom filters to boolean columns
		// The definition is looked up by the part of the first path element
		// that precedes the first ".".
		 := strings.Split(.Path()[0], ".")[0]
		,  := .ColumnByName()
		if ! {
			continue
		}
		if .StorageLayout.Type().Kind() == parquet.Boolean {
			continue
		}

		 = append(
			, parquet.SplitBlockFilter(bloomFilterBitsPerValue, .Path()...),
		)
	}

	 := []parquet.WriterOption{
		.Schema,
		parquet.ColumnIndexSizeLimit(ColumnIndexSize),
		parquet.BloomFilters(...),
		parquet.KeyValueMetadata(
			DynamicColumnsKey,
			serializeDynamicColumns(),
		),
		parquet.SortingWriterConfig(
			parquet.SortingColumns(...),
		),
	}
	 = append(, ...)
	if  {
		return parquet.NewSortingWriter[any](, 32*1024, ...), nil
	}
	return parquet.NewGenericWriter[any](, ...), nil
}

// ParquetWriter is the writer interface returned by NewWriter. Both the
// sorting writer and the generic writer created there are returned as this
// type.
type ParquetWriter interface {
	Schema() *parquet.Schema
	Write(rows []any) (int, error)
	WriteRows(rows []parquet.Row) (int, error)
	Flush() error
	Close() error
	Reset(writer io.Writer)
}

// PooledWriter is a ParquetWriter together with the pool it is returned to.
type PooledWriter struct {
	pool *sync.Pool
	ParquetWriter
}

// Returns a pooled writer for the given dynamic columns and sorting flag. Pools
// are kept in the schema's writers map under the key
// "<serialized dynamic columns>,sorting=<flag>". When the pool is empty a new
// writer is created with NewWriter; otherwise the pooled writer is reset before
// it is returned.
func ( *Schema) ( io.Writer,  map[string][]string,  bool) (*PooledWriter, error) {
	 := serializeDynamicColumns()
	,  := .writers.LoadOrStore(fmt.Sprintf("%s,sorting=%t", , ), &sync.Pool{})
	 := .(*sync.Pool).Get()
	// The pools are created without a New function, so Get returns nil when
	// nothing is pooled.
	if  == nil {
		,  := .NewWriter(, , )
		if  != nil {
			return nil, 
		}
		return &PooledWriter{
			pool:          .(*sync.Pool),
			ParquetWriter: ,
		}, nil
	}
	.(*PooledWriter).Reset()
	return .(*PooledWriter), nil
}

// PooledParquetSchema is a parquet schema together with the pool it is
// returned to.
type PooledParquetSchema struct {
	pool   *sync.Pool
	Schema *parquet.Schema
}

// GetParquetSortingSchema returns a parquet schema of the sorting columns and
// the given dynamic columns.
// The difference with GetDynamicParquetSchema is that non-sorting columns are elided.
//
// Schemas are pooled per serialized dynamic-column set; when the pool is empty
// a new schema is built with parquetSortingSchema.
func ( *Schema) ( map[string][]string) (*PooledParquetSchema, error) {
	 := serializeDynamicColumns()
	,  := .sortingSchemas.LoadOrStore(, &sync.Pool{})
	 := .(*sync.Pool).Get()
	if  == nil {
		,  := .parquetSortingSchema()
		if  != nil {
			return nil, 
		}
		return &PooledParquetSchema{
			pool:   .(*sync.Pool),
			Schema: ,
		}, nil
	}
	return .(*PooledParquetSchema), nil
}

// GetDynamicParquetSchema returns a parquet schema of all the columns and
// the given dynamic columns.
// The difference with GetParquetSortingSchema is that all columns are included
// in the parquet schema.
//
// Schemas are pooled per serialized dynamic-column set; when the pool is empty
// a new schema is built with dynamicParquetSchema.
func ( *Schema) ( map[string][]string) (*PooledParquetSchema, error) {
	 := serializeDynamicColumns()
	,  := .parquetSchemas.LoadOrStore(, &sync.Pool{})
	 := .(*sync.Pool).Get()
	if  == nil {
		,  := .dynamicParquetSchema()
		if  != nil {
			return nil, 
		}
		return &PooledParquetSchema{
			pool:   .(*sync.Pool),
			Schema: ,
		}, nil
	}
	return .(*PooledParquetSchema), nil
}

// Hands a pooled parquet schema back to the pool recorded on it.
func ( *Schema) ( *PooledParquetSchema) {
	.pool.Put()
}

// Hands a pooled writer back to the pool recorded on it. The writer is first
// reset onto a fresh, empty bytes.Buffer.
func ( *Schema) ( *PooledWriter) {
	.Reset(bytes.NewBuffer(nil))
	.pool.Put()
}

// Discards all writer pools by replacing the writers map with an empty one.
func ( *Schema) () {
	.writers = &sync.Map{}
}

// PooledBuffer is a Buffer together with the pool it is returned to.
type PooledBuffer struct {
	pool *sync.Pool
	*Buffer
}

// Returns a pooled buffer for the given dynamic columns. Pools are kept in the
// schema's buffers map, keyed by the serialized dynamic columns. When the pool
// is empty a new buffer is created with NewBuffer.
func ( *Schema) ( map[string][]string) (*PooledBuffer, error) {
	 := serializeDynamicColumns()
	,  := .buffers.LoadOrStore(, &sync.Pool{})
	 := .(*sync.Pool).Get()
	if  == nil {
		,  := .NewBuffer()
		if  != nil {
			return nil, 
		}
		return &PooledBuffer{
			pool:   .(*sync.Pool),
			Buffer: ,
		}, nil
	}
	return .(*PooledBuffer), nil
}

// Resets a pooled buffer and hands it back to the pool recorded on it.
func ( *Schema) ( *PooledBuffer) {
	.Reset()
	.pool.Put()
}

// Discards all buffer pools by replacing the buffers map with an empty one.
func ( *Schema) () {
	.buffers = &sync.Map{}
}

// MergedRowGroup allows wrapping any parquet.RowGroup to implement the
// DynamicRowGroup interface by specifying the concrete dynamic column names
// the RowGroup's schema contains.
type MergedRowGroup struct {
	parquet.RowGroup
	DynCols map[string][]string
	fields  []parquet.Field
}

// Renders the row group with prettyRowGroup.
func ( *MergedRowGroup) () string {
	return prettyRowGroup()
}

// DynamicColumns returns the concrete dynamic column names that were used
// to create its concrete parquet schema with a dynamic parquet schema.
// Implements the DynamicRowGroup interface. The DynCols map is returned
// directly, not a copy.
func ( *MergedRowGroup) () map[string][]string {
	return .DynCols
}

// DynamicRows returns an iterator over the rows in the row group. Implements
// the DynamicRowGroup interface. The iterator is a dynamicRowGroupReader that
// is given the row group's fields.
func ( *MergedRowGroup) () DynamicRowReader {
	return newDynamicRowGroupReader(, .fields)
}

// mergeOption collects the settings that MergeOption values can change.
type mergeOption struct {
	// dynamicColumns, when non-nil, is used as the merged dynamic-column set
	// instead of one computed from the row groups.
	dynamicColumns map[string][]string
	// alreadySorted indicates that the row groups are already sorted and
	// non-overlapping. This results in a parquet.MultiRowGroup, which is just
	// a wrapper without the full-scale merging infrastructure.
	alreadySorted bool
}

// MergeOption mutates a mergeOption; options are applied in the order they are
// passed to the merge.
type MergeOption func(m *mergeOption)

// Returns a MergeOption that sets the dynamic columns to use for the merge.
func ( map[string][]string) MergeOption {
	return func( *mergeOption) {
		.dynamicColumns = 
	}
}

// Returns a MergeOption that marks the row groups as already sorted.
func () MergeOption {
	return func( *mergeOption) {
		.alreadySorted = true
	}
}

// MergeDynamicRowGroups merges the given dynamic row groups into a single
// dynamic row group. It merges the parquet schema in a non-conflicting way by
// merging all the concrete dynamic column names and generating a superset
// parquet schema that all given dynamic row groups are compatible with.
//
// A single row group is returned as-is, before any option is applied. When no
// dynamic columns are supplied through an option they are computed with
// mergeDynamicRowGroupDynamicColumns. Every row group is wrapped with
// NewDynamicRowGroupMergeAdapter before parquet.MergeRowGroups is called; the
// sorting configuration is only passed when the row groups are not flagged as
// already sorted.
func ( *Schema) ( []DynamicRowGroup,  ...MergeOption) (DynamicRowGroup, error) {
	if len() == 1 {
		return [0], nil
	}

	// Apply options
	 := &mergeOption{}
	for ,  := range  {
		()
	}

	 := .dynamicColumns
	if  == nil {
		 = mergeDynamicRowGroupDynamicColumns()
	}
	,  := .GetDynamicParquetSchema()
	if  != nil {
		return nil, fmt.Errorf("create merged parquet schema merging %d row groups: %w", len(), )
	}
	defer .PutPooledParquetSchema()

	 := .ParquetSortingColumns()

	 := make([]parquet.RowGroup, 0, len())
	for ,  := range  {
		 = append(, NewDynamicRowGroupMergeAdapter(
			.Schema,
			,
			,
			,
		))
	}

	var  []parquet.RowGroupOption
	if !.alreadySorted {
		 = append(, parquet.SortingRowGroupConfig(
			parquet.SortingColumns(...),
		))
	}
	,  := parquet.MergeRowGroups(
		,
		...,
	)
	if  != nil {
		return nil, fmt.Errorf("create merge row groups: %w", )
	}

	return &MergedRowGroup{
		RowGroup: ,
		DynCols:  ,
		fields:   .Schema.Fields(),
	}, nil
}

// mergeDynamicRowGroupDynamicColumns merges the concrete dynamic column names
// of multiple DynamicRowGroups into a single, merged, superset of dynamic
// column names.
//
// It gathers the DynamicColumns() of every row group into a slice and
// delegates the actual merging to MergeDynamicColumnSets.
func mergeDynamicRowGroupDynamicColumns( []DynamicRowGroup) map[string][]string {
	 := []map[string][]string{}
	for ,  := range  {
		 = append(, .DynamicColumns())
	}

	return MergeDynamicColumnSets()
}

// Merges the given dynamic column sets into a single map using a pooled
// dynColSet as scratch space. The scratch object is released back to the pool
// when the function returns; the map produced by Merge is what is returned.
func ( []map[string][]string) map[string][]string {
	 := newDynamicColumnSetMerger()
	defer .Release()
	return .Merge()
}

// dynColSet is reusable scratch state for merging dynamic column sets.
// Instances are obtained from and returned to mergeSetPool.
type dynColSet struct {
	// keys accumulates the distinct map keys found across all input sets.
	keys []string
	// seen is a deduplication set; Merge uses it first for keys and then,
	// cleared per key, for that key's values.
	seen map[string]struct{}
	// values accumulates the distinct values of the key currently being
	// merged and is truncated after every key.
	values []string
}

// newDynamicColumnSetMerger takes a *dynColSet out of mergeSetPool. The caller
// is expected to hand it back with Release once merging is done.
func newDynamicColumnSetMerger() *dynColSet {
	pooled := mergeSetPool.Get()
	merger := pooled.(*dynColSet)
	return merger
}

// mergeSetPool recycles dynColSet scratch objects between merges so that the
// key/value buffers and the deduplication map are not reallocated every time.
var mergeSetPool = &sync.Pool{
	New: func() any {
		set := new(dynColSet)
		// Both capacities are arbitrary starting points; keys is expected
		// to stay smaller than values.
		set.keys = make([]string, 0, 16)
		set.values = make([]string, 0, 64)
		set.seen = make(map[string]struct{})
		return set
	},
}

// Release resets the scratch state (truncating keys and values while keeping
// their capacity, and clearing seen) and puts the dynColSet back into
// mergeSetPool. The dynColSet must not be used after this call.
func ( *dynColSet) () {
	.keys = .keys[:0]
	clear(.seen)
	.values = .values[:0]
	mergeSetPool.Put()
}

// Merge combines the given dynamic column sets into one map. For every key
// present in any input set, the result holds the sorted, deduplicated union
// of the values recorded for that key across all sets. The returned map and
// its slices are freshly allocated (values are cloned out of the scratch
// buffer), so they remain valid after the dynColSet is released.
func ( *dynColSet) ( []map[string][]string) ( map[string][]string) {
	// TODO:(gernest) use k-way merge
	 = make(map[string][]string)
	// Pass 1: record every distinct key once, deduplicating through seen.
	for  := range  {
		for  := range [] {
			if ,  := .seen[]; ! {
				.keys = append(.keys, )
				.seen[] = struct{}{}
			}
		}
	}
	// Pass 2: for each key, gather its distinct values across all sets.
	for  := range .keys {
		// seen is reused as the per-key value set, so reset it first.
		clear(.seen)
		for  := range  {
			,  := [][.keys[]]
			if ! {
				// This set has no entry for the key.
				continue
			}
			for  := range  {
				if ,  := .seen[[]]; ! {
					.values = append(.values, [])
					.seen[[]] = struct{}{}
				}
			}
		}
		sort.Strings(.values)
		// Store a copy so the shared values buffer can be reused for the
		// next key.
		[.keys[]] = slices.Clone(.values)
		.values = .values[:0]
	}
	return
}

// MergeDeduplicatedDynCols is a light wrapper over sorting the deduplicated
// dynamic column names provided in dyn. It is extracted as a public method
// since this merging determines the order in which dynamic columns are stored
// and components from other packages sometimes need to figure out the physical
// sort order between dynamic columns.
//
// The slice is sorted in place with sort.Strings and the same slice is
// returned; no copy is made and no deduplication is performed here.
func ( []string) []string {
	sort.Strings()
	return 
}

// NewDynamicRowGroupMergeAdapter returns a *DynamicRowGroupMergeAdapter, which
// maps the columns of the original row group to the columns in the super-set
// schema provided. This allows row groups that have non-conflicting dynamic
// schemas to be merged into a single row group with a superset parquet schema.
//
// The provided schema must not conflict with the original row group's schema;
// it must be strictly a superset. This property is not checked: it is assumed
// to be true for performance reasons.
//
// The column index mapping is computed once, here, from the root field names
// of the two schemas via mapMergedColumnNameIndexes.
func (
	 *parquet.Schema,
	 []parquet.SortingColumn,
	 map[string][]string,
	 parquet.RowGroup,
) *DynamicRowGroupMergeAdapter {
	return &DynamicRowGroupMergeAdapter{
		schema:               ,
		sortingColumns:       ,
		mergedDynamicColumns: ,
		originalRowGroup:     ,
		indexMapping: mapMergedColumnNameIndexes(
			schemaRootFieldNames(),
			schemaRootFieldNames(.Schema()),
		),
	}
}

// schemaRootFieldNames returns the names of the fields reported by the
// schema's Fields method, in the order Fields returns them.
func schemaRootFieldNames( *parquet.Schema) []string {
	 := .Fields()
	 := make([]string, 0, len())
	for ,  := range  {
		 = append(, .Name())
	}
	return 
}

// mapMergedColumnNameIndexes maps the column indexes of the original row group
// to the indexes of the merged schema.
//
// As consumed by the merge adapter, the result is indexed by merged-schema
// field position and holds either the index of the matching column in the
// original row group or -1 when the original has no such column. The lookup
// cursor only ever advances, by one position per match.
// NOTE(review): this presumably relies on both name lists sharing the same
// relative order, with the original's names being a subsequence of the
// merged names — confirm against the callers.
func mapMergedColumnNameIndexes(,  []string) []int {
	 := len()
	 := make([]int, len())
	 := 0
	for ,  := range  {
		if  <  && [] ==  {
			[] = 
			++
			continue
		}
		// No counterpart for this name.
		[] = -1
	}
	return 
}

// DynamicRowGroupMergeAdapter maps a RowBatch with a Schema with a subset of dynamic
// columns to a Schema with a superset of dynamic columns. It implements the
// parquet.RowGroup interface.
type DynamicRowGroupMergeAdapter struct {
	// schema is the merged, superset schema the adapter presents.
	schema *parquet.Schema
	// sortingColumns is the list reported as the row group's sorting columns.
	sortingColumns []parquet.SortingColumn
	// mergedDynamicColumns is the merged dynamic column set supplied at
	// construction time.
	mergedDynamicColumns map[string][]string
	// originalRowGroup is the wrapped row group that provides the data.
	originalRowGroup parquet.RowGroup
	// indexMapping holds, for each field index of schema, the index of the
	// corresponding column chunk in originalRowGroup, or -1 if it is absent.
	indexMapping []int
}

// Returns the number of rows in the group, as reported by the original row
// group; adapting the schema does not change the row count.
func ( *DynamicRowGroupMergeAdapter) () int64 {
	return .originalRowGroup.NumRows()
}

// Returns the first field in the given slice whose Name() equals the given
// string, found by a linear scan. Returns nil when no field matches, so
// callers must be prepared for a nil parquet.Field.
func ( []parquet.Field,  string) parquet.Field {
	for ,  := range  {
		if .Name() ==  {
			return 
		}
	}
	return nil
}

// Returns one column chunk per field of the merged schema. For each field the
// same column is looked up in the original row group through indexMapping: if
// it is found, the original chunk is wrapped so that it reports its index in
// the merged schema; if not, a column chunk filled with nulls (sized to the
// number of rows) takes its place.
func ( *DynamicRowGroupMergeAdapter) () []parquet.ColumnChunk {
	// This only works because we currently only support flat schemas.
	 := .schema.Fields()
	 := .originalRowGroup.ColumnChunks()
	 := make([]parquet.ColumnChunk, len())
	for ,  := range  {
		 := .indexMapping[]
		if  == -1 {
			// The original row group has no such column: substitute nulls.
			 := FieldByName(, .Name())
			[] = NewNilColumnChunk(.Type(), , int(.NumRows()))
		} else {
			// Present in the original: expose it under the merged index.
			[] = &remappedColumnChunk{
				ColumnChunk:   [],
				remappedIndex: ,
			}
		}
	}
	return 
}

// Returns the schema of rows in the group. The schema is the configured
// merged, superset schema, not the schema of the original row group.
func ( *DynamicRowGroupMergeAdapter) () *parquet.Schema {
	return .schema
}

// Returns the list of sorting columns describing how rows are sorted in the
// group. These are the sorting columns the adapter was constructed with.
//
// The method will return an empty slice if the rows are not sorted.
func ( *DynamicRowGroupMergeAdapter) () []parquet.SortingColumn {
	return .sortingColumns
}

// Returns a reader exposing the rows of the row group. The reader is created
// with parquet.NewRowGroupRowReader.
func ( *DynamicRowGroupMergeAdapter) () parquet.Rows {
	return parquet.NewRowGroupRowReader()
}

// remappedColumnChunk is a ColumnChunk that wraps a ColumnChunk and makes it
// appear to the caller as if its index in the schema was always the index it
// was remapped to. Implements the parquet.ColumnChunk interface.
type remappedColumnChunk struct {
	// ColumnChunk is the wrapped chunk; methods not overridden are promoted
	// from it unchanged.
	parquet.ColumnChunk
	// remappedIndex is the column index reported instead of the wrapped
	// chunk's own index.
	remappedIndex int
}

// Column returns the column chunk's index in the schema. It returns the
// configured remapped index rather than the wrapped chunk's index. Implements
// the parquet.ColumnChunk interface.
func ( *remappedColumnChunk) () int {
	return .remappedIndex
}

// Pages returns the column chunk's pages ensuring that all pages read will be
// remapped to the configured remapped index. Implements the
// parquet.ColumnChunk interface.
//
// Each call obtains a page iterator from the wrapped chunk and wraps it in a
// new remappedPages.
func ( *remappedColumnChunk) () parquet.Pages {
	return &remappedPages{
		Pages:         .ColumnChunk.Pages(),
		remappedIndex: .remappedIndex,
	}
}

// remappedPages is an iterator of the column chunk's pages. It ensures that
// all pages returned will appear to belong to the configured remapped column
// index. Implements the parquet.Pages interface.
type remappedPages struct {
	// Pages is the wrapped page iterator; methods not overridden are promoted
	// from it unchanged.
	parquet.Pages
	// remappedIndex is the column index given to every page that is read.
	remappedIndex int
}

// ReadPage reads the next page from the page iterator. It ensures that any
// page read from the underlying iterator will appear to belong to the
// configured remapped column index. Implements the parquet.Pages interface.
//
// io.EOF from the underlying iterator is returned unwrapped so callers can
// compare against it directly; any other error is wrapped with context.
func ( *remappedPages) () (parquet.Page, error) {
	,  := .Pages.ReadPage()
	if  == io.EOF {
		return nil, 
	}
	if  != nil {
		return nil, fmt.Errorf("read page: %w", )
	}

	return &remappedPage{
		Page:          ,
		remappedIndex: .remappedIndex,
	}, nil
}

// remappedPage is a Page that wraps a Page and makes it appear as if its
// column index is the remapped index. Implements the parquet.Page interface.
type remappedPage struct {
	// Page is the wrapped page; methods not overridden are promoted from it
	// unchanged.
	parquet.Page
	// remappedIndex is the column index reported for the page and its values.
	remappedIndex int
}

// releasable is implemented by values that expose a Release method.
type releasable interface {
	Release()
}

// Compile-time check that *remappedPage implements releasable.
var _ releasable = (*remappedPage)(nil)

// Column returns the page's column index in the schema. It returns the
// configured remapped index rather than the wrapped page's index. Implements
// the parquet.Page interface.
func ( *remappedPage) () int {
	return .remappedIndex
}

// Values returns a parquet.ValueReader that ensures that all values read will
// be remapped to have the configured remapped index. Implements the
// parquet.Page interface.
//
// Each call wraps the wrapped page's value reader in a new
// remappedValueReader.
func ( *remappedPage) () parquet.ValueReader {
	return &remappedValueReader{
		ValueReader:   .Page.Values(),
		remappedIndex: .remappedIndex,
	}
}

// Release passes the wrapped page to parquet.Release. Together with the
// compile-time assertion in this file, it makes *remappedPage satisfy the
// releasable interface.
func ( *remappedPage) () {
	parquet.Release(.Page)
}

// remappedValueReader wraps a parquet.ValueReader and ensures that all values
// read through it are remapped to have the configured remapped index.
// Implements the parquet.ValueReader interface.
type remappedValueReader struct {
	// ValueReader is the wrapped reader that supplies the values.
	parquet.ValueReader
	// remappedIndex is the column index stamped onto every value read.
	remappedIndex int
}

// ReadValues reads the next batch of values from the value reader. It ensures
// that any value read will be remapped to have the configured remapped index.
// Implements the parquet.ValueReader interface.
//
// The values that were read are remapped before the error is inspected, so
// values returned alongside io.EOF or another error are remapped too. io.EOF
// is returned unwrapped; other errors are wrapped with context.
func ( *remappedValueReader) ( []parquet.Value) (int, error) {
	,  := .ValueReader.ReadValues()
	// Rewrite only the column index; repetition and definition levels are
	// carried over from each value unchanged.
	for  := 0;  < len([:]); ++ {
		[] = [].Level([].RepetitionLevel(), [].DefinitionLevel(), .remappedIndex)
	}

	if  == io.EOF {
		return , 
	}
	if  != nil {
		return , fmt.Errorf("read values: %w", )
	}

	return , nil
}

// Converts the SortingColumns listed in the given v2 schema definition into a
// []parquet.SortingColumn, preserving their order. Each entry becomes an
// Ascending or Descending column built from its Path, and is additionally
// wrapped with NullsFirst when its NullsFirst flag is set. An error is
// returned for any direction other than ascending or descending.
func ( *schemav2pb.Schema) ([]parquet.SortingColumn, error) {
	 := make([]parquet.SortingColumn, 0, len(.SortingColumns))
	for ,  := range .SortingColumns {
		var  SortingColumn
		switch .Direction {
		case schemav2pb.SortingColumn_DIRECTION_ASCENDING:
			 = Ascending(.Path)
		case schemav2pb.SortingColumn_DIRECTION_DESCENDING:
			 = Descending(.Path)
		default:
			// Covers unspecified and unrecognized directions alike.
			return nil, fmt.Errorf("unknown sorting direction %q, only \"ascending\", \"descending\" are valid choices", .Direction)
		}
		if .NullsFirst {
			 = NullsFirst()
		}
		 = append(, )
	}

	return , nil
}