package parquet

import (
	"io"
)

// MultiRowGroup wraps multiple row groups to appear as if it was a single
// RowGroup. RowGroups must have the same schema or it will error.
func ( ...RowGroup) RowGroup {
	return newMultiRowGroup(ReadModeSync, ...)
}

func newMultiRowGroup( ReadMode,  ...RowGroup) RowGroup {
	if len() == 0 {
		return &emptyRowGroup{}
	}
	if len() == 1 {
		return [0]
	}

	,  := compatibleSchemaOf()
	if  != nil {
		panic()
	}

	 := make([]RowGroup, len())
	copy(, )

	 := &multiRowGroup{
		pageReadMode: ,
	}
	.init(, )
	return 
}

func ( *multiRowGroup) ( *Schema,  []RowGroup) error {
	 := make([]multiColumnChunk, len(.Columns()))

	 := make([][]ColumnChunk, len())
	for ,  := range  {
		[] = .ColumnChunks()
	}

	for  := range  {
		[].rowGroup = 
		[].column = 
		[].chunks = make([]ColumnChunk, len())

		for ,  := range  {
			[].chunks[] = []
		}
	}

	.schema = 
	.rowGroups = 
	.columns = make([]ColumnChunk, len())

	for  := range  {
		.columns[] = &[]
	}

	return nil
}

func compatibleSchemaOf( []RowGroup) (*Schema, error) {
	 := [0].Schema()

	// Fast path: Many times all row groups have the exact same schema so a
	// pointer comparison is cheaper.
	 := true
	for ,  := range [1:] {
		if .Schema() !=  {
			 = false
			break
		}
	}
	if  {
		return , nil
	}

	// Slow path: The schema pointers are not the same, but they still have to
	// be compatible.
	for ,  := range [1:] {
		if !nodesAreEqual(, .Schema()) {
			return nil, ErrRowGroupSchemaMismatch
		}
	}

	return , nil
}

// multiRowGroup presents several row groups as a single one.
type multiRowGroup struct {
	// schema is the schema reported for the combined row group.
	schema       *Schema
	// rowGroups are the underlying row groups; their row counts are summed
	// to produce the combined row count.
	rowGroups    []RowGroup
	// columns holds the column chunks reported for the combined row group.
	columns      []ColumnChunk
	// pageReadMode is passed to newRowGroupRows when creating a row reader.
	pageReadMode ReadMode
}

func ( *multiRowGroup) () ( int64) {
	for ,  := range .rowGroups {
		 += .NumRows()
	}
	return 
}

func ( *multiRowGroup) () []ColumnChunk { return .columns }

func ( *multiRowGroup) () []SortingColumn { return nil }

func ( *multiRowGroup) () *Schema { return .schema }

func ( *multiRowGroup) () Rows { return newRowGroupRows(, .pageReadMode) }

// multiColumnChunk presents a list of column chunks as a single column chunk.
type multiColumnChunk struct {
	// rowGroup is the multiRowGroup this column belongs to; multiPages reads
	// its rowGroups to translate row indexes when seeking.
	rowGroup *multiRowGroup
	// column is the integer index reported for this column.
	column   int
	// chunks are the underlying column chunks, read one after another.
	chunks   []ColumnChunk
}

func ( *multiColumnChunk) () Type {
	if len(.chunks) != 0 {
		return .chunks[0].Type() // all chunks should be of the same type
	}
	return nil
}

func ( *multiColumnChunk) () int64 {
	 := int64(0)
	for  := range .chunks {
		 += .chunks[].NumValues()
	}
	return 
}

func ( *multiColumnChunk) () int {
	return .column
}

func ( *multiColumnChunk) () Pages {
	return &multiPages{column: }
}

func ( *multiColumnChunk) () (ColumnIndex, error) {
	// TODO: implement
	return nil, nil
}

func ( *multiColumnChunk) () (OffsetIndex, error) {
	// TODO: implement
	return nil, nil
}

func ( *multiColumnChunk) () BloomFilter {
	return multiBloomFilter{}
}

// multiBloomFilter exposes the bloom filters of the chunks of the embedded
// multiColumnChunk as a single filter; chunks whose BloomFilter method
// returns nil are skipped.
type multiBloomFilter struct{ *multiColumnChunk }

func ( multiBloomFilter) ( []byte,  int64) (int, error) {
	// TODO: add a test for this function
	 := 0

	for  < len(.chunks) {
		if  := .chunks[].BloomFilter();  != nil {
			 := .Size()
			if  <  {
				break
			}
			 -= 
		}
		++
	}

	if  == len(.chunks) {
		return 0, io.EOF
	}

	 := int(0)
	for len() > 0 {
		if  := .chunks[].BloomFilter();  != nil {
			,  := .ReadAt(, )
			 += 
			if  != nil {
				return , 
			}
			if  = [:]; len() == 0 {
				return , nil
			}
			 += int64()
		}
		++
	}

	if  == len(.chunks) {
		return , io.EOF
	}
	return , nil
}

func ( multiBloomFilter) () int64 {
	 := int64(0)
	for ,  := range .chunks {
		if  := .BloomFilter();  != nil {
			 += .Size()
		}
	}
	return 
}

func ( multiBloomFilter) ( Value) (bool, error) {
	for ,  := range .chunks {
		if  := .BloomFilter();  != nil {
			if ,  := .Check();  ||  != nil {
				return , 
			}
		}
	}
	return false, nil
}

// multiPages reads the pages of each chunk of a multiColumnChunk in sequence.
type multiPages struct {
	// pages is the reader of the chunk currently being read, or nil when no
	// chunk is open.
	pages  Pages
	// index is the position in column.chunks of the next chunk to open.
	index  int
	// column is the column being read; it is set to nil by Close.
	column *multiColumnChunk
}

func ( *multiPages) () (Page, error) {
	for {
		if .pages != nil {
			,  := .pages.ReadPage()
			if  == nil ||  != io.EOF {
				return , 
			}
			if  := .pages.Close();  != nil {
				return nil, 
			}
			.pages = nil
		}

		if .column == nil || .index == len(.column.chunks) {
			return nil, io.EOF
		}

		.pages = .column.chunks[.index].Pages()
		.index++
	}
}

func ( *multiPages) ( int64) error {
	if .column == nil {
		return io.ErrClosedPipe
	}

	if .pages != nil {
		if  := .pages.Close();  != nil {
			return 
		}
	}

	 := .column.rowGroup.rowGroups
	 := int64(0)
	.pages = nil
	.index = 0

	for .index < len() {
		 = [.index].NumRows()
		if  <  {
			break
		}
		 -= 
		.index++
	}

	if .index < len() {
		.pages = .column.chunks[.index].Pages()
		.index++
		return .pages.SeekToRow()
	}
	return nil
}

func ( *multiPages) () ( error) {
	if .pages != nil {
		 = .pages.Close()
	}
	.pages = nil
	.index = 0
	.column = nil
	return 
}