// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parquet

import (
	
	

	format 
)

// Default settings applied when building encryption and decryption properties.
const (
	// DefaultEncryptionAlgorithm is the cipher selected when no algorithm is
	// requested explicitly: AES-GCM.
	DefaultEncryptionAlgorithm = AesGcm
	// DefaultEncryptedFooter means that once encryption is turned on the footer
	// is encrypted as well, unless configured otherwise.
	DefaultEncryptedFooter = true
	// DefaultCheckSignature is the default for checking the integrity of a
	// plaintext footer when decrypting.
	DefaultCheckSignature = true
	// DefaultAllowPlaintextFiles is the default for accepting plaintext files
	// once file decryption properties are set: by default we error on any
	// plaintext file unless otherwise specified.
	DefaultAllowPlaintextFiles = false

	// MaximalAadMetadataLength is not referenced in this file.
	// NOTE(review): presumably an upper bound on the AAD metadata length - confirm.
	MaximalAadMetadataLength int32 = 256
	// AadFileUniqueLength is the number of random bytes generated as the
	// file-unique part of the AAD for every new set of file encryption properties.
	AadFileUniqueLength int32 = 8
)

// Lookup tables from a column path to the properties configured for that column.
type (
	// ColumnPathToDecryptionPropsMap associates each column path with the
	// properties used to decrypt that column.
	ColumnPathToDecryptionPropsMap map[string]*ColumnDecryptionProperties

	// ColumnPathToEncryptionPropsMap associates each column path with the
	// properties used to encrypt that column.
	ColumnPathToEncryptionPropsMap map[string]*ColumnEncryptionProperties
)

// ColumnEncryptionProperties specifies how to encrypt a given column.
type ColumnEncryptionProperties struct {
	columnPath             string
	encrypted              bool
	encryptedWithFooterKey bool
	key                    string
	keyMetadata            string
	utilized               bool
}

// ColumnPath returns which column these properties are for.
func (ce *ColumnEncryptionProperties) ColumnPath() string {
	return ce.columnPath
}

// IsEncrypted returns true if this column is encrypted.
func (ce *ColumnEncryptionProperties) IsEncrypted() bool { return ce.encrypted }

// IsEncryptedWithFooterKey returns true if this column is encrypted with the footer key itself, or false if a
// separate key is used for encrypting this column.
func (ce *ColumnEncryptionProperties) IsEncryptedWithFooterKey() bool {
	return ce.encryptedWithFooterKey
}

// Key returns the key used for encrypting this column if it isn't encrypted by the footer key.
func (ce *ColumnEncryptionProperties) Key() string { return ce.key }

// KeyMetadata returns the key identifier which is used with a KeyRetriever to get the key for this column if it is
// not encrypted using the footer key.
func (ce *ColumnEncryptionProperties) KeyMetadata() string { return ce.keyMetadata }

// WipeOutEncryptionKey clears the encryption key (resets it to the empty string); used after completion of file
// writing.
func (ce *ColumnEncryptionProperties) WipeOutEncryptionKey() { ce.key = "" }

// IsUtilized returns whether or not these properties have already been used. If the key is empty
// then this is always false.
func (ce *ColumnEncryptionProperties) IsUtilized() bool {
	if ce.key == "" {
		return false
	}
	return ce.utilized
}

// SetUtilized is used for marking these properties as utilized once they are used in FileEncryptionProperties,
// as the encryption key will be wiped out on completion of writing.
func (ce *ColumnEncryptionProperties) SetUtilized() {
	ce.utilized = true
}

// Clone returns a new, not yet utilized instance of ColumnEncryptionProperties for the same column path with the
// same key and key metadata.
func (ce *ColumnEncryptionProperties) Clone() *ColumnEncryptionProperties {
	// Strings are immutable, so handing the key to the new instance is already a safe copy.
	return NewColumnEncryptionProperties(ce.columnPath, WithKey(ce.key), WithKeyMetadata(ce.keyMetadata))
}

// colEncryptConfig collects the values set by ColumnEncryptOption functions before the properties are built.
type colEncryptConfig struct {
	key         string
	keyMetadata string
	encrypted   bool
}

// ColumnEncryptOption is how to specify options to the NewColumnEncryptionProperties function.
type ColumnEncryptOption func(*colEncryptConfig)

// WithKey sets a column specific key. An empty key is ignored.
//
// If key is not set on an encrypted column, the column will be encrypted with the footer key.
// The key length must be either 16, 24, or 32 bytes (not validated here).
// The stored key is cleared by WipeOutEncryptionKey upon completion of file writing.
func WithKey(key string) ColumnEncryptOption {
	return func(c *colEncryptConfig) {
		if key != "" {
			c.key = key
		}
	}
}

// WithKeyMetadata sets the key retrieval metadata; use either WithKeyMetadata or WithKeyID but not both.
func WithKeyMetadata(keyMeta string) ColumnEncryptOption {
	return func(c *colEncryptConfig) {
		c.keyMetadata = keyMeta
	}
}

// WithKeyID is a convenience function to set the key metadata using a string id.
// The id must be valid UTF-8, otherwise WithKeyID panics. Use either WithKeyMetadata or WithKeyID, not both.
func WithKeyID(keyID string) ColumnEncryptOption {
	if !utf8.ValidString(keyID) {
		panic("parquet: key id should be UTF8 encoded")
	}
	return WithKeyMetadata(keyID)
}

// NewColumnEncryptionProperties constructs properties for the provided column path, modified by the options
// provided. The column is always marked as encrypted; it is encrypted with the footer key when no column
// specific key was supplied.
func NewColumnEncryptionProperties(name string, opts ...ColumnEncryptOption) *ColumnEncryptionProperties {
	var cfg colEncryptConfig
	cfg.encrypted = true
	for _, o := range opts {
		o(&cfg)
	}
	return &ColumnEncryptionProperties{
		utilized:               false,
		encrypted:              cfg.encrypted,
		encryptedWithFooterKey: cfg.encrypted && cfg.key == "",
		keyMetadata:            cfg.keyMetadata,
		key:                    cfg.key,
		columnPath:             name,
	}
}

// ColumnDecryptionProperties are the specifications for how to decrypt a given column.
type ColumnDecryptionProperties struct {
	columnPath string
	key        string
	utilized   bool
}

// NewColumnDecryptionProperties constructs a new ColumnDecryptionProperties for the given column path, modified by
// the provided options.
func NewColumnDecryptionProperties(column string, opts ...ColumnDecryptOption) *ColumnDecryptionProperties {
	var cfg columnDecryptConfig
	for _, o := range opts {
		o(&cfg)
	}

	return &ColumnDecryptionProperties{
		columnPath: column,
		utilized:   false,
		key:        cfg.key,
	}
}

// ColumnPath returns which column these properties describe how to decrypt.
func (cd *ColumnDecryptionProperties) ColumnPath() string { return cd.columnPath }

// Key returns the key specified to decrypt this column, or is empty if the Footer Key should be used.
func (cd *ColumnDecryptionProperties) Key() string { return cd.key }

// IsUtilized returns whether or not these properties have been used for decryption already.
func (cd *ColumnDecryptionProperties) IsUtilized() bool { return cd.utilized }

// SetUtilized is used by the reader to specify when we've decrypted the column and have used the key so we know
// to wipe out the keys.
func (cd *ColumnDecryptionProperties) SetUtilized() { cd.utilized = true }

// WipeOutDecryptionKey is called after decryption to ensure the key doesn't stick around and get re-used.
// It resets the key to the empty string.
func (cd *ColumnDecryptionProperties) WipeOutDecryptionKey() { cd.key = "" }

// Clone returns a new, not yet utilized instance of ColumnDecryptionProperties with the same key and column.
func (cd *ColumnDecryptionProperties) Clone() *ColumnDecryptionProperties {
	return NewColumnDecryptionProperties(cd.columnPath, WithDecryptKey(cd.key))
}

// columnDecryptConfig collects the values set by ColumnDecryptOption functions before the properties are built.
type columnDecryptConfig struct {
	key string
}

// ColumnDecryptOption is the type of the options passed for constructing Decryption Properties.
type ColumnDecryptOption func(*columnDecryptConfig)

// WithDecryptKey specifies the key to utilize for decryption. An empty key is ignored.
func WithDecryptKey(key string) ColumnDecryptOption {
	return func(cfg *columnDecryptConfig) {
		if key != "" {
			cfg.key = key
		}
	}
}

// AADPrefixVerifier is an interface for any object that can be used to verify the identity of the file being
// decrypted. It should panic if the provided AAD identity is bad.
//
// In a data set, AAD Prefixes should be collected, and then checked for missing files.
type AADPrefixVerifier interface {
	// Verify checks the identity of the file based on the supplied string and panics if it is bad.
	// NOTE(review): the argument is presumably the file's AAD prefix - confirm against the callers.
	Verify(string)
}

// DecryptionKeyRetriever is an interface for getting the desired key for decryption from metadata. It should take in
// some metadata identifier and return the actual Key to use for decryption.
type DecryptionKeyRetriever interface {
	// GetKey returns the decryption key that corresponds to the given key metadata.
	GetKey(keyMetadata []byte) string
}

// FileDecryptionProperties define the File Level configuration for decrypting a parquet file. Once constructed they are
// read only.
type FileDecryptionProperties struct {
	footerKey                     string
	aadPrefix                     string
	checkPlaintextFooterIntegrity bool
	plaintextAllowed              bool
	utilized                      bool
	columnDecryptProps            ColumnPathToDecryptionPropsMap
	Verifier                      AADPrefixVerifier
	KeyRetriever                  DecryptionKeyRetriever
}

// NewFileDecryptionProperties takes in the options for constructing a new FileDecryptionProperties object, otherwise
// it will use the default configuration which will check footer integrity of a plaintext footer for an encrypted file
// for unencrypted parquet files, the decryption properties should not be set.
func ( ...FileDecryptionOption) *FileDecryptionProperties {
	var  fileDecryptConfig
	.checkFooterIntegrity = DefaultCheckSignature
	.plaintextAllowed = DefaultAllowPlaintextFiles
	for ,  := range  {
		(&)
	}
	return &FileDecryptionProperties{
		Verifier:                      .verifier,
		footerKey:                     .footerKey,
		checkPlaintextFooterIntegrity: .checkFooterIntegrity,
		KeyRetriever:                  .retriever,
		aadPrefix:                     .aadPrefix,
		columnDecryptProps:            .colDecrypt,
		plaintextAllowed:              .plaintextAllowed,
		utilized:                      false,
	}
}

// ColumnKey returns the key to be used for decrypting the provided column.
func ( *FileDecryptionProperties) ( string) string {
	if ,  := .columnDecryptProps[];  {
		if  != nil {
			return .Key()
		}
	}
	return ""
}

// FooterKey returns the key utilized for decrypting the Footer if encrypted and any columns that are encrypted with
// the footer key.
func ( *FileDecryptionProperties) () string { return .footerKey }

// AadPrefix returns the prefix to be supplied for constructing the identification strings when decrypting
func ( *FileDecryptionProperties) () string { return .aadPrefix }

// PlaintextFooterIntegrity returns whether or not an integrity check will be performed on a plaintext footer for an
// encrypted file.
func ( *FileDecryptionProperties) () bool {
	return .checkPlaintextFooterIntegrity
}

// PlaintextFilesAllowed returns whether or not this instance of decryption properties are allowed on a plaintext file.
func ( *FileDecryptionProperties) () bool { return .plaintextAllowed }

// SetUtilized is called to mark this instance as utilized once it is used to read a file. A single instance
// can be used for reading one file only. Setting this ensures the keys will be wiped out upon completion of file reading.
func ( *FileDecryptionProperties) () { .utilized = true }

// IsUtilized returns whether or not this instance has been used to decrypt a file. If the footer key and prefix are
// empty and there are no column decryption properties, then this is always false.
func ( *FileDecryptionProperties) () bool {
	if .footerKey == "" && len(.columnDecryptProps) == 0 && .aadPrefix == "" {
		return false
	}
	return .utilized
}

// WipeOutDecryptionKeys will clear all the keys for this instance including the column level ones, this will be called
// after this instance has been utilized.
func ( *FileDecryptionProperties) () {
	.footerKey = ""
	for ,  := range .columnDecryptProps {
		.WipeOutDecryptionKey()
	}
}

// Clone returns a new instance of these properties, changing the prefix if set (keeping the same prefix if left empty)
func ( *FileDecryptionProperties) ( string) *FileDecryptionProperties {
	 := .footerKey
	 := make(ColumnPathToDecryptionPropsMap)
	for ,  := range .columnDecryptProps {
		[] = .Clone()
	}
	if  == "" {
		 = .aadPrefix
	}
	return &FileDecryptionProperties{
		footerKey:                     ,
		KeyRetriever:                  .KeyRetriever,
		checkPlaintextFooterIntegrity: .checkPlaintextFooterIntegrity,
		Verifier:                      .Verifier,
		columnDecryptProps:            ,
		aadPrefix:                     ,
		plaintextAllowed:              .plaintextAllowed,
		utilized:                      false,
	}
}

// fileDecryptConfig collects the values set by FileDecryptionOption functions; NewFileDecryptionProperties copies
// them into the FileDecryptionProperties it returns.
type fileDecryptConfig struct {
	footerKey            string
	aadPrefix            string
	verifier             AADPrefixVerifier
	colDecrypt           ColumnPathToDecryptionPropsMap
	retriever            DecryptionKeyRetriever
	checkFooterIntegrity bool
	plaintextAllowed     bool
}

// FileDecryptionOption is how to supply options to constructing a new FileDecryptionProperties instance.
// Each option mutates the pending configuration it is handed.
type FileDecryptionOption func(*fileDecryptConfig)

// WithFooterKey sets an explicit footer key. If Applied on a file that contains footer key
// metadata the metadata will be ignored, the footer will be decrypted/verified with this key.
//
// If the explicit key is not set, footer key will be fetched from the key retriever.
// With explicit keys or AAD prefix, new encryption properties object must be created for each
// encrypted file.
//
// Explicit encryption keys (footer and column) are cloned.
// Upon completion of file reading, the cloned encryption keys in the properties will be wiped out
// Caller is responsible for wiping out the input key array
// footer key length must be either 16, 24, or 32 bytes
func ( string) FileDecryptionOption {
	return func( *fileDecryptConfig) {
		if  != "" {
			.footerKey = 
		}
	}
}

// WithPrefixVerifier supplies a verifier object to use for verifying the AAD Prefixes stored in the file.
func ( AADPrefixVerifier) FileDecryptionOption {
	return func( *fileDecryptConfig) {
		if  != nil {
			.verifier = 
		}
	}
}

// WithColumnKeys sets explicit column keys.
//
// It's also possible to set a key retriever on this property object.
//
// Upon file decryption, availability of explicit keys is checked before invocation
// of the retriever callback.
//
// If an explicit key is available for a footer or a column, its key metadata will be ignored.
func ( ColumnPathToDecryptionPropsMap) FileDecryptionOption {
	return func( *fileDecryptConfig) {
		if len() == 0 {
			return
		}
		if len(.colDecrypt) != 0 {
			panic("column properties already set")
		}
		for ,  := range  {
			if .IsUtilized() {
				panic("parquet: column properties utilized in another file")
			}
			.SetUtilized()
		}
		.colDecrypt = 
	}
}

// WithKeyRetriever sets a key retriever callback. It's also possible to set explicit footer or column keys.
func ( DecryptionKeyRetriever) FileDecryptionOption {
	return func( *fileDecryptConfig) {
		if  != nil {
			.retriever = 
		}
	}
}

// DisableFooterSignatureVerification skips integrity verification of plaintext footers.
//
// If not called, integrity of plaintext footers will be checked in runtime, and will panic
// if the footer signing key is not available
// or if the footer content and signature don't match
func () FileDecryptionOption {
	return func( *fileDecryptConfig) {
		.checkFooterIntegrity = false
	}
}

// WithPlaintextAllowed sets allowing plaintext files.
//
// By default, reading plaintext (unencrypted) files is not allowed when using
// a decryptor.
//
// In order to detect files that were not encrypted by mistake.
// However the default behavior can be overridden by using this method.
func () FileDecryptionOption {
	return func( *fileDecryptConfig) {
		.plaintextAllowed = true
	}
}

// WithDecryptAadPrefix explicitly supplies the file aad prefix.
//
// A must when a prefix is used for file encryption, but not stored in the file.
func ( string) FileDecryptionOption {
	return func( *fileDecryptConfig) {
		if  != "" {
			.aadPrefix = 
		}
	}
}

// Algorithm describes how something was encrypted, representing the EncryptionAlgorithm object from the
// parquet.thrift file.
type Algorithm struct {
	Algo Cipher
	Aad  struct {
		AadPrefix       []byte
		AadFileUnique   []byte
		SupplyAadPrefix bool
	}
}

// ToThrift returns an instance to be used for serializing when writing a file.
func ( Algorithm) () *format.EncryptionAlgorithm {
	if .Algo == AesGcm {
		return &format.EncryptionAlgorithm{
			AES_GCM_V1: &format.AesGcmV1{
				AadPrefix:       .Aad.AadPrefix,
				AadFileUnique:   .Aad.AadFileUnique,
				SupplyAadPrefix: &.Aad.SupplyAadPrefix,
			},
		}
	}
	return &format.EncryptionAlgorithm{
		AES_GCM_CTR_V1: &format.AesGcmCtrV1{
			AadPrefix:       .Aad.AadPrefix,
			AadFileUnique:   .Aad.AadFileUnique,
			SupplyAadPrefix: &.Aad.SupplyAadPrefix,
		},
	}
}

// AlgorithmFromThrift converts the thrift object to the Algorithm struct for easier usage.
func ( *format.EncryptionAlgorithm) ( Algorithm) {
	if .IsSetAES_GCM_V1() {
		.Algo = AesGcm
		.Aad.AadFileUnique = .AES_GCM_V1.AadFileUnique
		.Aad.AadPrefix = .AES_GCM_V1.AadPrefix
		.Aad.SupplyAadPrefix = *.AES_GCM_V1.SupplyAadPrefix
		return
	}
	.Algo = AesCtr
	.Aad.AadFileUnique = .AES_GCM_CTR_V1.AadFileUnique
	.Aad.AadPrefix = .AES_GCM_CTR_V1.AadPrefix
	.Aad.SupplyAadPrefix = *.AES_GCM_CTR_V1.SupplyAadPrefix
	return
}

// FileEncryptionProperties describe how to encrypt a parquet file when writing data.
type FileEncryptionProperties struct {
	alg                  Algorithm
	footerKey            string
	footerKeyMetadata    string
	encryptedFooter      bool
	fileAad              string
	utilized             bool
	storeAadPrefixInFile bool
	aadPrefix            string
	encryptedCols        ColumnPathToEncryptionPropsMap
}

// EncryptedFooter returns if the footer for this file should be encrypted or left in plaintext.
func ( *FileEncryptionProperties) () bool { return .encryptedFooter }

// Algorithm returns the description of how we will perform the encryption, the algorithm, prefixes, and so on.
func ( *FileEncryptionProperties) () Algorithm { return .alg }

// FooterKey returns the actual key used to encrypt the footer if it is encrypted, or to encrypt any columns which
// will be encrypted with it rather than their own keys.
func ( *FileEncryptionProperties) () string { return .footerKey }

// FooterKeyMetadata is used for retrieving a key from the key retriever in order to set the footer key
func ( *FileEncryptionProperties) () string { return .footerKeyMetadata }

// FileAad returns the aad identification to be used at the file level which gets concatenated with the row and column
// information for encrypting data.
func ( *FileEncryptionProperties) () string { return .fileAad }

// IsUtilized returns whether or not this instance has been used to encrypt a file
func ( *FileEncryptionProperties) () bool { return .utilized }

// SetUtilized is called after writing a file. A FileEncryptionProperties object can be used for writing one file only,
// the encryption keys will be wiped out upon completion of writing the file.
func ( *FileEncryptionProperties) () { .utilized = true }

// EncryptedColumns returns the mapping of column paths to column encryption properties
func ( *FileEncryptionProperties) () ColumnPathToEncryptionPropsMap {
	return .encryptedCols
}

// ColumnEncryptionProperties returns the properties for encrypting a given column.
//
// This may be nil for columns that aren't encrypted or may be default properties.
func ( *FileEncryptionProperties) ( string) *ColumnEncryptionProperties {
	if len(.encryptedCols) == 0 {
		return NewColumnEncryptionProperties()
	}
	if ,  := .encryptedCols[];  {
		return 
	}
	return nil
}

// Clone allows returning an identical property setup for another file with the option to update the aadPrefix,
// (if given the empty string, the current aad prefix will be used) since a single instance can only be used
// to encrypt one file before wiping out the keys.
func ( *FileEncryptionProperties) ( string) *FileEncryptionProperties {
	 := .footerKey
	 := make(ColumnPathToEncryptionPropsMap)
	for ,  := range .encryptedCols {
		[] = .Clone()
	}
	if  == "" {
		 = .aadPrefix
	}

	 := []EncryptOption{
		WithAlg(.alg.Algo), WithFooterKeyMetadata(.footerKeyMetadata),
		WithAadPrefix(), WithEncryptedColumns(),
	}
	if !.encryptedFooter {
		 = append(, WithPlaintextFooter())
	}
	if !.storeAadPrefixInFile {
		 = append(, DisableAadPrefixStorage())
	}
	return NewFileEncryptionProperties(, ...)
}

// WipeOutEncryptionKeys clears all of the encryption keys for this and the columns
func ( *FileEncryptionProperties) () {
	.footerKey = ""
	for ,  := range .encryptedCols {
		.WipeOutEncryptionKey()
	}
}

// configEncrypt collects the values set by EncryptOption functions; NewFileEncryptionProperties reads them when
// building the FileEncryptionProperties it returns.
type configEncrypt struct {
	cipher               Cipher
	encryptFooter        bool
	keyMetadata          string
	aadprefix            string
	storeAadPrefixInFile bool
	encryptedCols        ColumnPathToEncryptionPropsMap
}

// EncryptOption is used for specifying values when building FileEncryptionProperties.
// Each option mutates the pending configuration it is handed.
type EncryptOption func(*configEncrypt)

// WithPlaintextFooter sets the writer to write the footer in plain text, otherwise the footer will be encrypted
// too (which is the default behavior).
func () EncryptOption {
	return func( *configEncrypt) {
		.encryptFooter = false
	}
}

// WithAlg sets the encryption algorithm to utilize. (default is AesGcm)
func ( Cipher) EncryptOption {
	return func( *configEncrypt) {
		.cipher = 
	}
}

// WithFooterKeyID sets a key retrieval metadata to use (converted from string), this must be a utf8 string.
//
// use either WithFooterKeyID or WithFooterKeyMetadata, not both.
func ( string) EncryptOption {
	if !utf8.ValidString() {
		panic("parquet: footer key id should be UTF8 encoded")
	}
	return WithFooterKeyMetadata()
}

// WithFooterKeyMetadata sets a key retrieval metadata to use for getting the key.
//
// Use either WithFooterKeyID or WithFooterKeyMetadata, not both.
func ( string) EncryptOption {
	return func( *configEncrypt) {
		if  != "" {
			.keyMetadata = 
		}
	}
}

// WithAadPrefix sets the AAD prefix to use for encryption and by default will store it in the file
func ( string) EncryptOption {
	return func( *configEncrypt) {
		if  != "" {
			.aadprefix = 
			.storeAadPrefixInFile = true
		}
	}
}

// DisableAadPrefixStorage will set the properties to not store the AadPrefix in the file. If this isn't called
// and the AadPrefix is set, then it will be stored. This needs to in the options *after* WithAadPrefix to have an effect.
func () EncryptOption {
	return func( *configEncrypt) {
		.storeAadPrefixInFile = false
	}
}

// WithEncryptedColumns sets the map of columns and their properties (keys etc.) If not called, then all columns will
// be encrypted with the footer key. If called, then columns not in the map will be left unencrypted.
func ( ColumnPathToEncryptionPropsMap) EncryptOption {
	 := func(*configEncrypt) {}
	if len() == 0 {
		return 
	}
	return func( *configEncrypt) {
		if len(.encryptedCols) != 0 {
			panic("column properties already set")
		}
		for ,  := range  {
			if .IsUtilized() {
				panic("column properties utilized in another file")
			}
			.SetUtilized()
		}
		.encryptedCols = 
	}
}

// NewFileEncryptionProperties returns a new File Encryption description object using the options provided.
func ( string,  ...EncryptOption) *FileEncryptionProperties {
	var  configEncrypt
	.cipher = DefaultEncryptionAlgorithm
	.encryptFooter = DefaultEncryptedFooter
	for ,  := range  {
		(&)
	}

	 := &FileEncryptionProperties{
		footerKey:            ,
		footerKeyMetadata:    .keyMetadata,
		encryptedFooter:      .encryptFooter,
		aadPrefix:            .aadprefix,
		storeAadPrefixInFile: .storeAadPrefixInFile,
		encryptedCols:        .encryptedCols,
		utilized:             false,
	}

	 := [AadFileUniqueLength]uint8{}
	,  := rand.Read([:])
	if  != nil {
		panic()
	}

	 := false
	if .aadPrefix == "" {
		.fileAad = string([:])
	} else {
		.fileAad = .aadPrefix + string([:])
		if !.storeAadPrefixInFile {
			 = true
		}
	}
	.alg.Algo = .cipher
	.alg.Aad.AadFileUnique = [:]
	.alg.Aad.SupplyAadPrefix = 
	if .aadprefix != "" && .storeAadPrefixInFile {
		.alg.Aad.AadPrefix = []byte(.aadPrefix)
	}
	return 
}