// Package ocf implements encoding and decoding of Avro Object Container Files
// as defined by the Avro specification.
//
// See the Avro specification for an understanding of Avro:
// http://avro.apache.org/docs/current/
package ocf

import (
	"bytes"
	"compress/flate"
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/internal/bytesx"
	"github.com/klauspost/compress/zstd"
)

const (
	schemaKey = "avro.schema"
	codecKey  = "avro.codec"
)

var (
	magicBytes = [4]byte{'O', 'b', 'j', 1}

	// HeaderSchema is the Avro schema of a container file header.
	HeaderSchema = avro.MustParse(`{
	"type": "record",
	"name": "org.apache.avro.file.Header",
	"fields": [
		{"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
		{"name": "meta", "type": {"type": "map", "values": "bytes"}},
		{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
	]
}`)

	// DefaultSchemaMarshaler calls the schema's String() method to produce
	// a "canonical" schema.
	DefaultSchemaMarshaler = defaultMarshalSchema
	// FullSchemaMarshaler calls the schema's MarshalJSON() method to produce
	// a schema with all details preserved. The "canonical" schema returned by
	// the default marshaler does not preserve a type's extra properties.
	FullSchemaMarshaler = fullMarshalSchema
)

// Header represents an Avro container file header.
type Header struct {
	Magic [4]byte           `avro:"magic"`
	Meta  map[string][]byte `avro:"meta"`
	Sync  [16]byte          `avro:"sync"`
}

type decoderConfig struct {
	DecoderConfig avro.API
	SchemaCache   *avro.SchemaCache
	CodecOptions  codecOptions
}

// DecoderFunc represents a configuration function for Decoder.
type DecoderFunc func(cfg *decoderConfig)

// WithDecoderConfig sets the value decoder config on the OCF decoder.
func WithDecoderConfig(api avro.API) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.DecoderConfig = api
	}
}

// WithDecoderSchemaCache sets the schema cache for the decoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.SchemaCache = cache
	}
}

// WithZStandardDecoderOptions sets the options for the ZStandard decoder.
func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.CodecOptions.ZStandardOptions.DOptions = append(cfg.CodecOptions.ZStandardOptions.DOptions, opts...)
	}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
	reader      *avro.Reader
	resetReader *bytesx.ResetReader
	decoder     *avro.Decoder
	meta        map[string][]byte
	sync        [16]byte
	schema      avro.Schema

	codec Codec

	count int64
}

// NewDecoder returns a new decoder that reads from reader r.
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
	cfg := decoderConfig{
		DecoderConfig: avro.DefaultConfig,
		SchemaCache:   avro.DefaultSchemaCache,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
	}
	for _, opt := range opts {
		opt(&cfg)
	}

	reader := avro.NewReader(r, 1024)

	h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
	if err != nil {
		return nil, fmt.Errorf("decoder: %w", err)
	}

	decReader := bytesx.NewResetReader([]byte{})

	return &Decoder{
		reader:      reader,
		resetReader: decReader,
		decoder:     cfg.DecoderConfig.NewDecoder(h.Schema, decReader),
		meta:        h.Meta,
		sync:        h.Sync,
		codec:       h.Codec,
		schema:      h.Schema,
	}, nil
}

// Metadata returns the header metadata.
func (d *Decoder) Metadata() map[string][]byte {
	return d.meta
}

// Schema returns the schema that was parsed from the file's metadata
// and that is used to interpret the file's contents.
func (d *Decoder) Schema() avro.Schema {
	return d.schema
}

// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
	if d.count <= 0 {
		d.count = d.readBlock()
	}

	if d.reader.Error != nil {
		return false
	}

	return d.count > 0
}

// Decode reads the next Avro encoded value from its input and stores it in
// the value pointed to by v.
func (d *Decoder) Decode(v any) error {
	if d.count <= 0 {
		return errors.New("decoder: no data found, call HasNext first")
	}

	d.count--

	return d.decoder.Decode(v)
}
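
// A minimal read sketch, seen from a client package. The file name and the
// record struct below are illustrative assumptions, not part of this
// package's API:
//
//	f, err := os.Open("records.avro") // hypothetical input file
//	if err != nil {
//		// handle error
//	}
//	defer f.Close()
//
//	dec, err := ocf.NewDecoder(f)
//	if err != nil {
//		// handle error
//	}
//
//	var rec struct {
//		Name string `avro:"name"` // assumes a matching field in the writer schema
//	}
//	for dec.HasNext() {
//		if err := dec.Decode(&rec); err != nil {
//			// handle error
//		}
//	}
//	if err := dec.Error(); err != nil {
//		// handle reader error
//	}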

// Error returns the last reader error.
func (d *Decoder) Error() error {
	if errors.Is(d.reader.Error, io.EOF) {
		return nil
	}

	return d.reader.Error
}

func (d *Decoder) readBlock() int64 {
	_ = d.reader.Peek()
	if errors.Is(d.reader.Error, io.EOF) {
		// There is no next block.
		return 0
	}

	count := d.reader.ReadLong()
	size := d.reader.ReadLong()

	// Read the block's data.
	switch {
	case count > 0:
		data := make([]byte, size)
		d.reader.Read(data)

		data, err := d.codec.Decode(data)
		if err != nil {
			d.reader.Error = err
		}

		d.resetReader.Reset(data)

	case size > 0:
		// Skip the block data when the count is 0.
		data := make([]byte, size)
		d.reader.Read(data)
	}

	// Read the sync marker.
	var sync [16]byte
	d.reader.Read(sync[:])
	if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
		d.reader.Error = errors.New("decoder: invalid block")
	}

	return count
}

type encoderConfig struct {
	BlockLength     int
	BlockSize       int
	CodecName       CodecName
	CodecOptions    codecOptions
	Metadata        map[string][]byte
	Sync            [16]byte
	EncodingConfig  avro.API
	SchemaCache     *avro.SchemaCache
	SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for Encoder.
type EncoderFunc func(cfg *encoderConfig)

// WithBlockLength sets the block length on the encoder.
func WithBlockLength(blockLength int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.BlockLength = blockLength
	}
}

// WithBlockSize sets the maximum uncompressed size of a buffered block before
// it is written and flushed to the underlying io.Writer (after compression).
func WithBlockSize(blockSize int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.BlockSize = blockSize
	}
}

// WithCodec sets the compression codec on the encoder.
func WithCodec(codec CodecName) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecName = codec
	}
}

// WithCompressionLevel sets the compression codec to deflate and
// the compression level on the encoder.
func WithCompressionLevel(compLvl int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecName = Deflate
		cfg.CodecOptions.DeflateCompressionLevel = compLvl
	}
}

// WithZStandardEncoderOptions sets the options for the ZStandard encoder.
func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecOptions.ZStandardOptions.EOptions = append(cfg.CodecOptions.ZStandardOptions.EOptions, opts...)
	}
}

// WithMetadata sets the metadata on the encoder header.
func WithMetadata(meta map[string][]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Metadata = meta
	}
}

// WithMetadataKeyVal sets a single key-value pair for the metadata on
// the encoder header.
func WithMetadataKeyVal(key string, val []byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Metadata[key] = val
	}
}

// WithEncoderSchemaCache sets the schema cache for the encoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.SchemaCache = cache
	}
}

// WithSchemaMarshaler sets the schema marshaler for the encoder.
// If not specified, defaults to DefaultSchemaMarshaler.
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.SchemaMarshaler = m
	}
}

// WithSyncBlock sets the sync block.
func WithSyncBlock(sync [16]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Sync = sync
	}
}

// WithEncodingConfig sets the value encoder config on the OCF encoder.
func WithEncodingConfig(api avro.API) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.EncodingConfig = api
	}
}

// Encoder writes an Avro container file to an output stream.
type Encoder struct {
	writer  *avro.Writer
	buf     *bytes.Buffer
	encoder *avro.Encoder
	sync    [16]byte
	codec   Codec

	blockLength int
	count       int
	blockSize   int
}
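
// Encoder options are plain functional options and may be combined. A sketch
// of a typical combination, seen from a client package; schemaJSON, w and the
// chosen values are illustrative assumptions:
//
//	enc, err := ocf.NewEncoder(schemaJSON, w,
//		ocf.WithCodec(ocf.Deflate),                           // compress blocks with deflate
//		ocf.WithBlockLength(1000),                            // flush after 1000 values...
//		ocf.WithBlockSize(64*1024),                           // ...or ~64 KiB of uncompressed data
//		ocf.WithMetadataKeyVal("owner", []byte("analytics")), // extra header metadata
//	)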

// NewEncoder returns a new encoder that writes to w using schema s.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	cfg := computeEncoderConfig(opts)

	schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
	if err != nil {
		return nil, err
	}

	return newEncoder(schema, w, cfg)
}

// NewEncoderWithSchema returns a new encoder that writes to w using schema s.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	return newEncoder(schema, w, computeEncoderConfig(opts))
}

func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
	switch file := w.(type) {
	case nil:
		return nil, errors.New("writer cannot be nil")

	case *os.File:
		info, err := file.Stat()
		if err != nil {
			return nil, err
		}
		if info.Size() > 0 {
			// The file already contains data: read its header, skip to the
			// end and append using the existing schema, codec and sync marker.
			reader := avro.NewReader(file, 1024)
			h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
			if err != nil {
				return nil, err
			}
			if err = skipToEnd(reader, h.Sync); err != nil {
				return nil, err
			}

			writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
			buf := &bytes.Buffer{}
			e := &Encoder{
				writer:      writer,
				buf:         buf,
				encoder:     cfg.EncodingConfig.NewEncoder(h.Schema, buf),
				sync:        h.Sync,
				codec:       h.Codec,
				blockLength: cfg.BlockLength,
				blockSize:   cfg.BlockSize,
			}
			return e, nil
		}
	}

	schemaJSON, err := cfg.SchemaMarshaler(schema)
	if err != nil {
		return nil, err
	}

	cfg.Metadata[schemaKey] = schemaJSON
	cfg.Metadata[codecKey] = []byte(cfg.CodecName)
	header := Header{
		Magic: magicBytes,
		Meta:  cfg.Metadata,
	}
	header.Sync = cfg.Sync
	if header.Sync == [16]byte{} {
		_, _ = rand.Read(header.Sync[:])
	}

	codec, err := resolveCodec(cfg.CodecName, cfg.CodecOptions)
	if err != nil {
		return nil, err
	}

	writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
	writer.WriteVal(HeaderSchema, header)
	if err = writer.Flush(); err != nil {
		return nil, err
	}

	buf := &bytes.Buffer{}
	e := &Encoder{
		writer:      writer,
		buf:         buf,
		encoder:     cfg.EncodingConfig.NewEncoder(schema, buf),
		sync:        header.Sync,
		codec:       codec,
		blockLength: cfg.BlockLength,
		blockSize:   cfg.BlockSize,
	}
	return e, nil
}

func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
	cfg := encoderConfig{
		BlockLength: 100,
		CodecName:   Null,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
		Metadata:        map[string][]byte{},
		EncodingConfig:  avro.DefaultConfig,
		SchemaCache:     avro.DefaultSchemaCache,
		SchemaMarshaler: DefaultSchemaMarshaler,
	}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func (e *Encoder) shouldWriteDataBlock() bool {
	reachedMaxBlockLength := e.blockLength != 0 && e.count >= e.blockLength
	reachedMaxBlockSize := e.blockSize != 0 && e.buf.Len() >= e.blockSize
	return reachedMaxBlockLength || reachedMaxBlockSize
}

// Write writes the raw bytes of an already-encoded value to the internal
// buffer. This method skips the internal encoder, so the caller is responsible
// for encoding the bytes. No error is returned if the bytes do not conform to
// the schema given to NewEncoder, but the final OCF data will be corrupted.
func (e *Encoder) Write(p []byte) (n int, err error) {
	n, err = e.buf.Write(p)
	if err != nil {
		return n, err
	}

	e.count++
	if e.shouldWriteDataBlock() {
		if err = e.writerBlock(); err != nil {
			return n, err
		}
	}

	return n, e.writer.Error
}

// Encode writes the Avro encoding of v to the stream.
func (e *Encoder) Encode(v any) error {
	if err := e.encoder.Encode(v); err != nil {
		return err
	}

	e.count++
	if e.shouldWriteDataBlock() {
		if err := e.writerBlock(); err != nil {
			return err
		}
	}

	return e.writer.Error
}

// Flush writes any buffered data as a block and flushes the underlying writer.
func (e *Encoder) Flush() error {
	if e.count == 0 {
		return nil
	}

	if err := e.writerBlock(); err != nil {
		return err
	}

	return e.writer.Error
}
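
// A minimal write sketch, seen from a client package. The schema, output file
// and record type are illustrative assumptions:
//
//	schema := `{"type": "record", "name": "test", "fields": [{"name": "name", "type": "string"}]}`
//
//	f, err := os.Create("records.avro") // hypothetical output file
//	if err != nil {
//		// handle error
//	}
//	defer f.Close()
//
//	enc, err := ocf.NewEncoder(schema, f)
//	if err != nil {
//		// handle error
//	}
//
//	type record struct {
//		Name string `avro:"name"`
//	}
//	if err := enc.Encode(record{Name: "foo"}); err != nil {
//		// handle error
//	}
//
//	// Close writes the final buffered block before the file is closed.
//	if err := enc.Close(); err != nil {
//		// handle error
//	}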

// Close closes the encoder, flushing the writer.
func (e *Encoder) Close() error {
	return e.Flush()
}

func (e *Encoder) writerBlock() error {
	e.writer.WriteLong(int64(e.count))

	b := e.codec.Encode(e.buf.Bytes())

	e.writer.WriteLong(int64(len(b)))
	_, _ = e.writer.Write(b)
	_, _ = e.writer.Write(e.sync[:])

	e.count = 0
	e.buf.Reset()

	return e.writer.Flush()
}

type ocfHeader struct {
	Schema avro.Schema
	Codec  Codec
	Meta   map[string][]byte
	Sync   [16]byte
}

func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) {
	var h Header
	reader.ReadVal(HeaderSchema, &h)
	if reader.Error != nil {
		return nil, fmt.Errorf("unexpected error: %w", reader.Error)
	}

	if h.Magic != magicBytes {
		return nil, errors.New("invalid avro file")
	}

	schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
	if err != nil {
		return nil, err
	}

	codec, err := resolveCodec(CodecName(h.Meta[codecKey]), codecOpts)
	if err != nil {
		return nil, err
	}

	return &ocfHeader{
		Schema: schema,
		Codec:  codec,
		Meta:   h.Meta,
		Sync:   h.Sync,
	}, nil
}

func skipToEnd(reader *avro.Reader, sync [16]byte) error {
	for {
		_ = reader.ReadLong()
		if errors.Is(reader.Error, io.EOF) {
			return nil
		}
		size := reader.ReadLong()

		reader.SkipNBytes(int(size))
		if reader.Error != nil {
			return reader.Error
		}

		var blockSync [16]byte
		reader.Read(blockSync[:])
		if sync != blockSync && !errors.Is(reader.Error, io.EOF) {
			reader.Error = errors.New("invalid block")
		}
	}
}

func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
	return []byte(schema.String()), nil
}

func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
	return json.Marshal(schema)
}
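
// The default marshaler stores the canonical form of the schema in the file
// header, which does not preserve a type's extra properties. A sketch of
// opting into the full form instead; schemaJSON and w are illustrative
// assumptions:
//
//	enc, err := ocf.NewEncoder(schemaJSON, w,
//		ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler),
//	)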