package avro
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/hamba/avro/v2/pkg/crc64"
jsoniter "github.com/json-iterator/go"
)
// jsoniterAPI is the JSON codec used when marshalling schema components
// (aliases, docs, defaults, nested schemas and custom properties). It is
// configured to escape HTML characters and to sort map keys, which keeps
// marshalled output deterministic.
var jsoniterAPI = jsoniter.Config{
	SortMapKeys: true,
	EscapeHTML:  true,
}.Froze()
// nullDefaultType is the type of the nullDefault sentinel, which is stored as a
// field's default when that default is an explicit null.
type nullDefaultType struct{}

// MarshalJSON renders the sentinel as the JSON literal null.
func (nullDefaultType) MarshalJSON() ([]byte, error) {
	out := []byte("null")
	return out, nil
}

// nullDefault is the sentinel value returned by isValidDefault for null
// schemas; Field.Default translates it back to a Go nil.
var nullDefault = nullDefaultType{}
// Attribute names that belong to each schema kind's own JSON representation.
// newProperties drops these keys so that only custom (user-defined) properties
// are kept and later re-emitted by marshalPropertiesToJSON.
var (
	// Records, enums and record fields.
	recordReserved = []string{"type", "name", "namespace", "doc", "aliases", "fields"}
	enumReserved   = []string{"type", "name", "namespace", "aliases", "doc", "symbols", "default"}
	fieldReserved  = []string{"name", "doc", "type", "order", "aliases", "default"}

	// Containers.
	arrayReserved = []string{"type", "items"}
	mapReserved   = []string{"type", "values"}

	// Fixed, optionally with a logical type; decimal adds precision and scale.
	fixedReserved                = []string{"type", "name", "namespace", "aliases", "size"}
	fixedWithLogicalTypeReserved = []string{"type", "name", "namespace", "aliases", "size", "logicalType"}
	fixedWithDecimalTypeReserved = []string{"type", "name", "namespace", "aliases", "size", "logicalType", "precision", "scale"}

	// Primitives, optionally with a logical type; decimal adds precision and scale.
	primitiveReserved                = []string{"type"}
	primitiveWithLogicalTypeReserved = []string{"type", "logicalType"}
	primitiveWithDecimalTypeReserved = []string{"type", "logicalType", "precision", "scale"}
)
// Type is an Avro schema type, as reported by Schema.Type.
type Type string

// Schema types.
const (
	// Complex types.
	Record Type = "record"
	Error  Type = "error" // type name written for error records
	Ref    Type = "<ref>" // pseudo-type reported by RefSchema (a reference to a named schema)
	Enum   Type = "enum"
	Array  Type = "array"
	Map    Type = "map"
	Union  Type = "union"
	Fixed  Type = "fixed"

	// Primitive types.
	String  Type = "string"
	Bytes   Type = "bytes"
	Int     Type = "int"
	Long    Type = "long"
	Float   Type = "float"
	Double  Type = "double"
	Boolean Type = "boolean"
	Null    Type = "null"
)
// Order is the sort order of a record field (its "order" attribute).
type Order string

// Field sort orders. NewField uses Asc when no order is given.
const (
	Asc    Order = "ascending"
	Desc   Order = "descending"
	Ignore Order = "ignore"
)
// LogicalType is an Avro logical type name (the "logicalType" attribute).
type LogicalType string

// Logical types.
const (
	Decimal  LogicalType = "decimal"
	UUID     LogicalType = "uuid"
	Date     LogicalType = "date"
	Duration LogicalType = "duration"

	// Time of day.
	TimeMillis LogicalType = "time-millis"
	TimeMicros LogicalType = "time-micros"

	// Instants and local timestamps.
	TimestampMillis      LogicalType = "timestamp-millis"
	TimestampMicros      LogicalType = "timestamp-micros"
	LocalTimestampMillis LogicalType = "local-timestamp-millis"
	LocalTimestampMicros LogicalType = "local-timestamp-micros"
)
// Action is an action attached to a record field (the Field.action member).
// NOTE(review): presumably decided during schema resolution — confirm.
type Action string

// Field actions.
const (
	FieldIgnore     Action = "ignore"      // NOTE(review): presumably skip the field
	FieldSetDefault Action = "set_default" // NOTE(review): presumably use the field default
)
// FingerprintType names an algorithm accepted by Schema.FingerprintUsing.
type FingerprintType string

// Supported fingerprint algorithms.
const (
	CRC64Avro   FingerprintType = "CRC64-AVRO"    // CRC-64-AVRO as produced by crc64.Sum
	CRC64AvroLE FingerprintType = "CRC64-AVRO-LE" // CRC-64-AVRO with little-endian byte order
	MD5         FingerprintType = "MD5"           // MD5 digest
	SHA256      FingerprintType = "SHA256"        // SHA-256 digest
)
// SchemaCache maps names to schemas. It is backed by a sync.Map, so it may be
// used from multiple goroutines.
type SchemaCache struct {
	cache sync.Map // name -> Schema
}

// Add stores schema under name, replacing any previous entry.
func (c *SchemaCache) Add(name string, schema Schema) {
	c.cache.Store(name, schema)
}

// Get returns the schema stored under name, or nil when there is none.
func (c *SchemaCache) Get(name string) Schema {
	v, found := c.cache.Load(name)
	if !found {
		return nil
	}
	return v.(Schema)
}

// AddAll copies every entry of cache into c. A nil cache is a no-op.
func (c *SchemaCache) AddAll(cache *SchemaCache) {
	if cache == nil {
		return
	}
	copyEntry := func(key, value any) bool {
		c.cache.Store(key, value)
		return true // keep iterating
	}
	cache.cache.Range(copyEntry)
}
// Schemas is an ordered list of schemas, such as the members of a union.
type Schemas []Schema

// Get returns the first schema whose schemaTypeName equals name, together with
// its index. It returns (nil, -1) when no schema matches.
func (s Schemas) Get(name string) (Schema, int) {
	for idx := range s {
		if schemaTypeName(s[idx]) == name {
			return s[idx], idx
		}
	}
	return nil, -1
}
// Schema is implemented by every Avro schema representation in this package.
type Schema interface {
	// Type returns the schema's type.
	Type() Type

	// String returns the compact JSON form that fingerprints are computed from.
	String() string

	// Fingerprint returns a SHA-256 fingerprint of the schema.
	Fingerprint() [32]byte

	// FingerprintUsing returns a fingerprint computed with the given algorithm.
	FingerprintUsing(FingerprintType) ([]byte, error)

	// CacheFingerprint returns an identifier for caching data derived from the
	// schema; it may also incorporate a writer schema's fingerprint.
	CacheFingerprint() [32]byte
}

// LogicalSchema describes a logical type annotation.
type LogicalSchema interface {
	// Type returns the logical type.
	Type() LogicalType

	// String returns the annotation's JSON members without surrounding braces.
	String() string
}

// PropertySchema exposes custom (non-reserved) schema properties.
type PropertySchema interface {
	// Prop returns the named custom property, or nil when it is not set.
	Prop(string) any
}

// NamedSchema is a schema that has a name (record, enum or fixed).
type NamedSchema interface {
	Schema
	PropertySchema

	// Name returns the unqualified name.
	Name() string

	// Namespace returns the namespace, possibly empty.
	Namespace() string

	// FullName returns the namespace-qualified name.
	FullName() string

	// Aliases returns the alternative names.
	Aliases() []string
}

// LogicalTypeSchema is implemented by schemas that can carry a logical type.
type LogicalTypeSchema interface {
	// Logical returns the logical type, or nil when there is none.
	Logical() LogicalSchema
}
// name holds the naming information embedded in named schemas (records,
// enums and fixed).
type name struct {
	name      string   // unqualified name
	namespace string   // namespace; empty when there is none
	full      string   // namespace + "." + name, or just name without a namespace
	aliases   []string // aliases, qualified with the namespace where newName applied one
}
// newName builds a name from n and namespace ns.
//
// If n contains a dot, everything before the last dot replaces ns. Every
// dot-separated part of the resulting full name must pass validateName.
// Aliases without a dot are validated as a single name and qualified with the
// effective namespace when it is non-empty. Dotted aliases are taken as
// already qualified; each of their parts is validated.
func newName(n, ns string, aliases []string) (name, error) {
	if idx := strings.LastIndexByte(n, '.'); idx > -1 {
		ns = n[:idx]
		n = n[idx+1:]
	}

	full := n
	if ns != "" {
		full = ns + "." + n
	}

	for _, part := range strings.Split(full, ".") {
		if err := validateName(part); err != nil {
			// Report the offending part first and the whole name second,
			// matching the order of the format verbs.
			return name{}, fmt.Errorf("avro: invalid name part %q in name %q: %w", part, full, err)
		}
	}

	a := make([]string, 0, len(aliases))
	for _, alias := range aliases {
		if !strings.Contains(alias, ".") {
			if err := validateName(alias); err != nil {
				return name{}, fmt.Errorf("avro: invalid name %q: %w", alias, err)
			}
			if ns == "" {
				a = append(a, alias)
				continue
			}
			a = append(a, ns+"."+alias)
			continue
		}

		for _, part := range strings.Split(alias, ".") {
			if err := validateName(part); err != nil {
				// The failing name here is the alias, not the schema's full name.
				return name{}, fmt.Errorf("avro: invalid name part %q in name %q: %w", part, alias, err)
			}
		}
		a = append(a, alias)
	}

	return name{
		name:      n,
		namespace: ns,
		full:      full,
		aliases:   a,
	}, nil
}
// Name returns the unqualified name.
func (n name) Name() string { return n.name }

// Namespace returns the namespace, which may be empty.
func (n name) Namespace() string { return n.namespace }

// FullName returns the namespace-qualified name.
func (n name) FullName() string { return n.full }

// Aliases returns the aliases as stored by newName.
func (n name) Aliases() []string { return n.aliases }
// fingerprinter lazily computes and caches fingerprints of a schema's String
// form. Its caches are an atomic.Value and a sync.Map, so concurrent callers
// may race to compute a value but always observe a complete one.
type fingerprinter struct {
	fingerprint atomic.Value // [32]byte SHA-256 of String()
	cache       sync.Map     // FingerprintType -> []byte
}

// Fingerprint returns the SHA-256 of stringer.String(), caching it after the
// first computation.
func (f *fingerprinter) Fingerprint(stringer fmt.Stringer) [32]byte {
	if v := f.fingerprint.Load(); v != nil {
		return v.([32]byte)
	}
	fingerprint := sha256.Sum256([]byte(stringer.String()))
	f.fingerprint.Store(fingerprint)
	return fingerprint
}

// FingerprintUsing returns the fingerprint of stringer.String() computed with
// algorithm typ, caching the result per algorithm. An unknown algorithm yields
// an error.
//
// The returned slice is always a fresh copy. Handing out the cached slice
// would let a caller that modifies the result corrupt the fingerprint seen by
// every later caller.
func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Stringer) ([]byte, error) {
	if v, ok := f.cache.Load(typ); ok {
		return bytes.Clone(v.([]byte)), nil
	}

	data := []byte(stringer.String())
	var fingerprint []byte
	switch typ {
	case CRC64Avro:
		h := crc64.Sum(data)
		fingerprint = h[:]
	case CRC64AvroLE:
		h := crc64.SumWithByteOrder(data, crc64.LittleEndian)
		fingerprint = h[:]
	case MD5:
		h := md5.Sum(data)
		fingerprint = h[:]
	case SHA256:
		h := sha256.Sum256(data)
		fingerprint = h[:]
	default:
		return nil, fmt.Errorf("avro: unknown fingerprint algorithm %s", typ)
	}

	f.cache.Store(typ, fingerprint)
	return bytes.Clone(fingerprint), nil
}
// cacheFingerprinter lazily computes and caches a schema's cache fingerprint.
//
// Without a writer fingerprint, the cache fingerprint is the schema's own
// Fingerprint. With one, it is the SHA-256 of the schema fingerprint, followed
// by the writer fingerprint, followed by any extra bytes returned by fn.
// NOTE(review): presumably this keeps one reader schema resolved against
// different writers distinct — confirm.
type cacheFingerprinter struct {
	writerFingerprint *[32]byte
	cache             atomic.Value // [32]byte
}

// CacheFingerprint returns the cached identifier, computing and storing it on
// first use. fn may be nil.
func (i *cacheFingerprinter) CacheFingerprint(schema Schema, fn func() []byte) [32]byte {
	if cached := i.cache.Load(); cached != nil {
		return cached.([32]byte)
	}

	own := schema.Fingerprint()
	result := own
	if wfp := i.writerFingerprint; wfp != nil {
		buf := make([]byte, 0, len(own)+len(wfp))
		buf = append(buf, own[:]...)
		buf = append(buf, wfp[:]...)
		if fn != nil {
			buf = append(buf, fn()...)
		}
		result = sha256.Sum256(buf)
	}

	i.cache.Store(result)
	return result
}
// properties holds a schema's custom (non-reserved) JSON attributes.
type properties struct {
	props map[string]any
}

// newProperties returns the entries of props whose keys are not listed in res.
// The resulting map is never nil, even when props is nil.
func newProperties(props map[string]any, res []string) properties {
	kept := make(map[string]any)
	for key, val := range props {
		if !isReserved(res, key) {
			kept[key] = val
		}
	}
	return properties{props: kept}
}

// isReserved reports whether k appears in res.
func isReserved(res []string, k string) bool {
	for i := range res {
		if res[i] == k {
			return true
		}
	}
	return false
}

// Prop returns the custom property called name, or nil if it is not set.
func (p properties) Prop(name string) any {
	// Indexing a nil map yields the zero value, so no explicit nil check is needed.
	return p.props[name]
}

// Props returns all custom properties. The internal map is returned, not a copy.
func (p properties) Props() map[string]any {
	return p.props
}
// marshalPropertiesToJSON appends each custom property to buf as a
// `,"key":value` member, sorted by key so the output is deterministic. It
// expects an enclosing JSON object to already be open in buf, and it stops at
// the first marshalling error.
func (p properties) marshalPropertiesToJSON(buf *bytes.Buffer) error {
	keys := make([]string, 0, len(p.props))
	for key := range p.props {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	for _, key := range keys {
		valJSON, err := jsoniterAPI.Marshal(p.props[key])
		if err != nil {
			return err
		}
		keyJSON, err := jsoniterAPI.Marshal(key)
		if err != nil {
			return err
		}
		buf.WriteByte(',')
		buf.Write(keyJSON)
		buf.WriteByte(':')
		buf.Write(valJSON)
	}
	return nil
}
// schemaConfig collects the optional settings applied by SchemaOption values.
type schemaConfig struct {
	aliases []string
	doc     string
	def     any
	order   Order
	props   map[string]any
	wfp     *[32]byte // writer fingerprint; nil unless set by withWriterFingerprint*
}

// SchemaOption configures optional attributes of a schema or field.
type SchemaOption func(*schemaConfig)

// WithAliases sets the aliases of a named schema or field.
func WithAliases(aliases []string) SchemaOption {
	return func(cfg *schemaConfig) { cfg.aliases = aliases }
}

// WithDoc sets the documentation string.
func WithDoc(doc string) SchemaOption {
	return func(cfg *schemaConfig) { cfg.doc = doc }
}

// WithDefault sets the default value (used by fields and enums).
func WithDefault(def any) SchemaOption {
	return func(cfg *schemaConfig) { cfg.def = def }
}

// WithOrder sets a field's sort order.
func WithOrder(order Order) SchemaOption {
	return func(cfg *schemaConfig) { cfg.order = order }
}

// WithProps sets custom properties; reserved keys are filtered out by the
// schema constructors.
func WithProps(props map[string]any) SchemaOption {
	return func(cfg *schemaConfig) { cfg.props = props }
}

// withWriterFingerprint unconditionally sets the writer fingerprint.
func withWriterFingerprint(fp [32]byte) SchemaOption {
	return withWriterFingerprintIfResolved(fp, true)
}

// withWriterFingerprintIfResolved sets the writer fingerprint only when
// resolved is true; otherwise the option does nothing.
func withWriterFingerprintIfResolved(fp [32]byte, resolved bool) SchemaOption {
	return func(cfg *schemaConfig) {
		if !resolved {
			return
		}
		cfg.wfp = &fp
	}
}
// PrimitiveSchema is a primitive Avro type (such as int or string), optionally
// annotated with a logical type.
type PrimitiveSchema struct {
	properties
	fingerprinter
	cacheFingerprinter

	typ     Type
	logical LogicalSchema

	// encodedType is not set by NewPrimitiveSchema; it is assigned elsewhere.
	// NOTE(review): presumably the writer's type after schema resolution — confirm.
	encodedType Type
}

// NewPrimitiveSchema creates a primitive schema of type t with optional
// logical type l. Which property keys count as reserved depends on l
// (none, a decimal, or another logical type).
func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *PrimitiveSchema {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}

	var reserved []string
	switch {
	case l == nil:
		reserved = primitiveReserved
	case l.Type() == Decimal:
		reserved = primitiveWithDecimalTypeReserved
	default:
		reserved = primitiveWithLogicalTypeReserved
	}

	return &PrimitiveSchema{
		properties:         newProperties(cfg.props, reserved),
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		typ:                t,
		logical:            l,
	}
}

// Type returns the primitive type.
func (s *PrimitiveSchema) Type() Type {
	return s.typ
}

// Logical returns the logical type, or nil.
func (s *PrimitiveSchema) Logical() LogicalSchema {
	return s.logical
}

// String returns the compact JSON form: a bare type string, or an object when
// a logical type is present.
func (s *PrimitiveSchema) String() string {
	if s.logical != nil {
		return `{"type":"` + string(s.typ) + `",` + s.logical.String() + `}`
	}
	return `"` + string(s.typ) + `"`
}

// MarshalJSON returns the full JSON form, including the logical type (with
// decimal precision and scale) and custom properties.
func (s *PrimitiveSchema) MarshalJSON() ([]byte, error) {
	// With nothing but the type to say, the short string form suffices.
	if s.logical == nil && len(s.props) == 0 {
		return jsoniterAPI.Marshal(s.typ)
	}

	var buf bytes.Buffer
	buf.WriteString(`{"type":"`)
	buf.WriteString(string(s.typ))
	buf.WriteString(`"`)
	if lt := s.logical; lt != nil {
		buf.WriteString(`,"logicalType":"` + string(lt.Type()) + `"`)
		if dec, isDecimal := lt.(*DecimalLogicalSchema); isDecimal {
			buf.WriteString(`,"precision":` + strconv.Itoa(dec.prec))
			if dec.scale > 0 {
				buf.WriteString(`,"scale":` + strconv.Itoa(dec.scale))
			}
		}
	}
	if err := s.marshalPropertiesToJSON(&buf); err != nil {
		return nil, err
	}
	buf.WriteByte('}')
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *PrimitiveSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *PrimitiveSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier; no extra data is mixed in.
func (s *PrimitiveSchema) CacheFingerprint() [32]byte {
	return s.cacheFingerprinter.CacheFingerprint(s, nil)
}
// RecordSchema is an Avro record (or error record) schema.
type RecordSchema struct {
	name
	properties
	fingerprinter
	cacheFingerprinter

	isError bool
	fields  []*Field
	doc     string
}

// NewRecordSchema creates a record schema. The name and aliases are validated
// and qualified by newName.
func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOption) (*RecordSchema, error) {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}

	qualified, err := newName(name, namespace, cfg.aliases)
	if err != nil {
		return nil, err
	}

	return &RecordSchema{
		name:               qualified,
		properties:         newProperties(cfg.props, recordReserved),
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		fields:             fields,
		doc:                cfg.doc,
	}, nil
}

// NewErrorRecordSchema creates a record schema flagged as an error record.
func NewErrorRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOption) (*RecordSchema, error) {
	rec, err := NewRecordSchema(name, namespace, fields, opts...)
	if err != nil {
		return nil, err
	}
	rec.isError = true
	return rec, nil
}

// Type returns Record, for error records too.
func (s *RecordSchema) Type() Type {
	return Record
}

// Doc returns the record's documentation string.
func (s *RecordSchema) Doc() string {
	return s.doc
}

// IsError reports whether this is an error record.
func (s *RecordSchema) IsError() bool {
	return s.isError
}

// Fields returns the record's fields in declaration order.
func (s *RecordSchema) Fields() []*Field {
	return s.fields
}

// String returns the compact JSON form: full name, type and fields only.
func (s *RecordSchema) String() string {
	kind := "record"
	if s.isError {
		kind = "error"
	}

	var sb strings.Builder
	sb.WriteString(`{"name":"`)
	sb.WriteString(s.FullName())
	sb.WriteString(`","type":"`)
	sb.WriteString(kind)
	sb.WriteString(`","fields":[`)
	for i, f := range s.fields {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(f.String())
	}
	sb.WriteString(`]}`)
	return sb.String()
}

// MarshalJSON returns the full JSON form, including aliases, doc and custom
// properties.
func (s *RecordSchema) MarshalJSON() ([]byte, error) {
	buf := new(bytes.Buffer)
	// writeMember marshals v and appends it as `,"key":<json>`.
	writeMember := func(key string, v any) error {
		data, err := jsoniterAPI.Marshal(v)
		if err != nil {
			return err
		}
		buf.WriteString(`,"` + key + `":`)
		buf.Write(data)
		return nil
	}

	buf.WriteString(`{"name":"` + s.full + `"`)
	if len(s.aliases) > 0 {
		if err := writeMember("aliases", s.aliases); err != nil {
			return nil, err
		}
	}
	if s.doc != "" {
		if err := writeMember("doc", s.doc); err != nil {
			return nil, err
		}
	}
	if s.isError {
		buf.WriteString(`,"type":"error"`)
	} else {
		buf.WriteString(`,"type":"record"`)
	}
	if err := writeMember("fields", s.fields); err != nil {
		return nil, err
	}
	if err := s.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteString("}")
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *RecordSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *RecordSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier. When a writer fingerprint is
// set, the JSON of the fields' default values is mixed into it.
func (s *RecordSchema) CacheFingerprint() [32]byte {
	fieldDefaults := func() []byte {
		var defs []any
		for _, f := range s.fields {
			if f.HasDefault() {
				defs = append(defs, f.Default())
			}
		}
		// The marshal error is discarded: these bytes only feed the cache hash.
		out, _ := jsoniterAPI.Marshal(defs)
		return out
	}
	return s.cacheFingerprinter.CacheFingerprint(s, fieldDefaults)
}
// Field is a field of a record schema.
type Field struct {
	properties

	name    string
	aliases []string
	doc     string
	typ     Schema
	hasDef  bool
	def     any
	order   Order
	action  Action

	encodedDef atomic.Value // cached result of encodeDefault
}

// noDef is the type of NoDefault.
type noDef struct{}

// NoDefault marks a field as having no default. NewField starts from it, so a
// field has a default only when WithDefault supplies a different value.
var NoDefault = noDef{}

// NewField creates a record field.
//
// The name and every alias must pass validateName. The order defaults to Asc
// and must be Asc, Desc or Ignore. A default, if given, is validated and
// normalised against typ by validateDefault.
func NewField(name string, typ Schema, opts ...SchemaOption) (*Field, error) {
	cfg := schemaConfig{def: NoDefault}
	for _, apply := range opts {
		apply(&cfg)
	}

	if err := validateName(name); err != nil {
		return nil, err
	}
	for _, alias := range cfg.aliases {
		if err := validateName(alias); err != nil {
			return nil, err
		}
	}

	order := cfg.order
	if order == "" {
		order = Asc
	}
	if order != Asc && order != Desc && order != Ignore {
		return nil, fmt.Errorf("avro: field %q order %q is invalid", name, order)
	}

	field := &Field{
		properties: newProperties(cfg.props, fieldReserved),
		name:       name,
		aliases:    cfg.aliases,
		doc:        cfg.doc,
		typ:        typ,
		order:      order,
	}

	if cfg.def == NoDefault {
		return field, nil
	}
	def, err := validateDefault(name, typ, cfg.def)
	if err != nil {
		return nil, err
	}
	field.def, field.hasDef = def, true
	return field, nil
}
// Name returns the field name.
func (f *Field) Name() string {
	return f.name
}

// Aliases returns the field's aliases.
func (f *Field) Aliases() []string {
	return f.aliases
}

// Type returns the field's schema.
func (f *Field) Type() Schema {
	return f.typ
}

// HasDefault reports whether the field has a default value.
func (f *Field) HasDefault() bool {
	return f.hasDef
}

// Default returns the field's default value. The internal nullDefault
// sentinel is reported as nil.
func (f *Field) Default() any {
	if f.def != nullDefault {
		return f.def
	}
	return nil
}

// encodeDefault returns the encoded default value, computing it with encode on
// first use and caching it afterwards. It fails when the field has no default
// or when encode is nil.
func (f *Field) encodeDefault(encode func(any) ([]byte, error)) ([]byte, error) {
	if cached := f.encodedDef.Load(); cached != nil {
		return cached.([]byte), nil
	}

	switch {
	case !f.hasDef:
		return nil, fmt.Errorf("avro: '%s' field must have a non-empty default value", f.name)
	case encode == nil:
		return nil, fmt.Errorf("avro: failed to encode '%s' default value", f.name)
	}

	encoded, err := encode(f.Default())
	if err != nil {
		return nil, err
	}
	f.encodedDef.Store(encoded)
	return encoded, nil
}

// Doc returns the field's documentation string.
func (f *Field) Doc() string {
	return f.doc
}

// Order returns the field's sort order.
func (f *Field) Order() Order {
	return f.order
}

// String returns the compact JSON form: name and type only.
func (f *Field) String() string {
	typeJSON := f.typ.String()
	return `{"name":"` + f.name + `","type":` + typeJSON + `}`
}

// MarshalJSON returns the full JSON form, including aliases, doc, default,
// a non-ascending order, and custom properties.
func (f *Field) MarshalJSON() ([]byte, error) {
	buf := new(bytes.Buffer)
	// writeMember marshals v and appends it as `,"key":<json>`.
	writeMember := func(key string, v any) error {
		data, err := jsoniterAPI.Marshal(v)
		if err != nil {
			return err
		}
		buf.WriteString(`,"` + key + `":`)
		buf.Write(data)
		return nil
	}

	buf.WriteString(`{"name":"` + f.name + `"`)
	if len(f.aliases) > 0 {
		if err := writeMember("aliases", f.aliases); err != nil {
			return nil, err
		}
	}
	if f.doc != "" {
		if err := writeMember("doc", f.doc); err != nil {
			return nil, err
		}
	}
	if err := writeMember("type", f.typ); err != nil {
		return nil, err
	}
	if f.hasDef {
		if err := writeMember("default", f.Default()); err != nil {
			return nil, err
		}
	}
	// Ascending is the implicit order, so it is never written out.
	if f.order != "" && f.order != Asc {
		buf.WriteString(`,"order":"` + string(f.order) + `"`)
	}
	if err := f.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteString("}")
	return buf.Bytes(), nil
}
// EnumSchema is an Avro enum schema.
type EnumSchema struct {
	name
	properties
	fingerprinter
	cacheFingerprinter

	symbols []string
	def     string
	doc     string

	// encodedSymbols, when non-empty, is the symbol list that Symbol indexes
	// into instead of symbols. It is not set by NewEnumSchema.
	// NOTE(review): presumably the writer's symbols after schema resolution — confirm.
	encodedSymbols []string
}

// NewEnumSchema creates an enum schema.
//
// symbols must be non-empty, each must pass validateName, and none may repeat.
// A non-empty string default (WithDefault) must be one of the symbols.
// Non-string defaults are ignored.
func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOption) (*EnumSchema, error) {
	var cfg schemaConfig
	for _, opt := range opts {
		opt(&cfg)
	}

	n, err := newName(name, namespace, cfg.aliases)
	if err != nil {
		return nil, err
	}

	if len(symbols) == 0 {
		return nil, errors.New("avro: enum must have a non-empty array of symbols")
	}
	seen := make(map[string]struct{}, len(symbols))
	for _, sym := range symbols {
		if err := validateName(sym); err != nil {
			// Keep the underlying reason instead of discarding it.
			return nil, fmt.Errorf("avro: invalid symbol %q: %w", sym, err)
		}
		if _, dup := seen[sym]; dup {
			return nil, fmt.Errorf("avro: duplicate symbol %q", sym)
		}
		seen[sym] = struct{}{}
	}

	var def string
	if d, ok := cfg.def.(string); ok && d != "" {
		if !hasSymbol(symbols, d) {
			return nil, fmt.Errorf("avro: symbol default %q must be a symbol", d)
		}
		def = d
	}

	return &EnumSchema{
		name:               n,
		properties:         newProperties(cfg.props, enumReserved),
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		symbols:            symbols,
		def:                def,
		doc:                cfg.doc,
	}, nil
}
// hasSymbol reports whether sym is one of symbols (case-sensitive).
func hasSymbol(symbols []string, sym string) bool {
	for i := range symbols {
		if symbols[i] == sym {
			return true
		}
	}
	return false
}
// Type returns Enum.
func (s *EnumSchema) Type() Type {
	return Enum
}

// Doc returns the enum's documentation string.
func (s *EnumSchema) Doc() string {
	return s.doc
}

// Symbols returns the enum's symbols in declaration order.
func (s *EnumSchema) Symbols() []string {
	return s.symbols
}

// Symbol returns the symbol at index i.
//
// When encodedSymbols is populated, i indexes into it instead. A symbol found
// there that is not one of this schema's symbols resolves to the default, or
// fails when there is none. An out-of-range i always fails.
func (s *EnumSchema) Symbol(i int) (string, bool) {
	symbols, resolving := s.symbols, false
	if len(s.encodedSymbols) > 0 {
		symbols, resolving = s.encodedSymbols, true
	}
	if i < 0 || i >= len(symbols) {
		return "", false
	}

	sym := symbols[i]
	if !resolving || hasSymbol(s.symbols, sym) {
		return sym, true
	}
	// The symbol is unknown to this schema: fall back to the default, if any.
	if s.HasDefault() {
		return s.Default(), true
	}
	return "", false
}

// Default returns the default symbol, or "" when none is set.
func (s *EnumSchema) Default() string {
	return s.def
}

// HasDefault reports whether a default symbol is set.
func (s *EnumSchema) HasDefault() bool {
	return s.Default() != ""
}

// String returns the compact JSON form: full name, type and symbols only.
func (s *EnumSchema) String() string {
	var sb strings.Builder
	sb.WriteString(`{"name":"`)
	sb.WriteString(s.FullName())
	sb.WriteString(`","type":"enum","symbols":[`)
	for i, sym := range s.symbols {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(`"` + sym + `"`)
	}
	sb.WriteString(`]}`)
	return sb.String()
}

// MarshalJSON returns the full JSON form, including aliases, doc, default and
// custom properties.
func (s *EnumSchema) MarshalJSON() ([]byte, error) {
	buf := new(bytes.Buffer)
	// writeMember marshals v and appends it as `,"key":<json>`.
	writeMember := func(key string, v any) error {
		data, err := jsoniterAPI.Marshal(v)
		if err != nil {
			return err
		}
		buf.WriteString(`,"` + key + `":`)
		buf.Write(data)
		return nil
	}

	buf.WriteString(`{"name":"` + s.full + `"`)
	if len(s.aliases) > 0 {
		if err := writeMember("aliases", s.aliases); err != nil {
			return nil, err
		}
	}
	if s.doc != "" {
		if err := writeMember("doc", s.doc); err != nil {
			return nil, err
		}
	}
	buf.WriteString(`,"type":"enum"`)
	if err := writeMember("symbols", s.symbols); err != nil {
		return nil, err
	}
	if s.def != "" {
		buf.WriteString(`,"default":"` + s.def + `"`)
	}
	if err := s.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteString("}")
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *EnumSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *EnumSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier. When a writer fingerprint is
// set, the default symbol is mixed into it.
func (s *EnumSchema) CacheFingerprint() [32]byte {
	return s.cacheFingerprinter.CacheFingerprint(s, func() []byte {
		// Without a default, s.def is "", which contributes no bytes.
		return []byte(s.Default())
	})
}
// ArraySchema is an Avro array whose elements all follow the items schema.
type ArraySchema struct {
	properties
	fingerprinter
	cacheFingerprinter

	items Schema
}

// NewArraySchema creates an array schema with element schema items.
func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}
	return &ArraySchema{
		properties:         newProperties(cfg.props, arrayReserved),
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		items:              items,
	}
}

// Type returns Array.
func (s *ArraySchema) Type() Type {
	return Array
}

// Items returns the element schema.
func (s *ArraySchema) Items() Schema {
	return s.items
}

// String returns the compact JSON form.
func (s *ArraySchema) String() string {
	itemsJSON := s.items.String()
	return `{"type":"array","items":` + itemsJSON + `}`
}

// MarshalJSON returns the full JSON form, including custom properties.
func (s *ArraySchema) MarshalJSON() ([]byte, error) {
	itemsJSON, err := jsoniterAPI.Marshal(s.items)
	if err != nil {
		return nil, err
	}

	buf := bytes.NewBufferString(`{"type":"array"`)
	buf.WriteString(`,"items":`)
	buf.Write(itemsJSON)
	if err := s.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteByte('}')
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *ArraySchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *ArraySchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier; no extra data is mixed in.
func (s *ArraySchema) CacheFingerprint() [32]byte {
	return s.cacheFingerprinter.CacheFingerprint(s, nil)
}
// MapSchema is an Avro map with string keys whose values follow the values
// schema.
type MapSchema struct {
	properties
	fingerprinter
	cacheFingerprinter

	values Schema
}

// NewMapSchema creates a map schema with value schema values.
func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}
	return &MapSchema{
		properties:         newProperties(cfg.props, mapReserved),
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		values:             values,
	}
}

// Type returns Map.
func (s *MapSchema) Type() Type {
	return Map
}

// Values returns the value schema.
func (s *MapSchema) Values() Schema {
	return s.values
}

// String returns the compact JSON form.
func (s *MapSchema) String() string {
	valuesJSON := s.values.String()
	return `{"type":"map","values":` + valuesJSON + `}`
}

// MarshalJSON returns the full JSON form, including custom properties.
func (s *MapSchema) MarshalJSON() ([]byte, error) {
	valuesJSON, err := jsoniterAPI.Marshal(s.values)
	if err != nil {
		return nil, err
	}

	buf := bytes.NewBufferString(`{"type":"map"`)
	buf.WriteString(`,"values":`)
	buf.Write(valuesJSON)
	if err := s.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteByte('}')
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *MapSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *MapSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier; no extra data is mixed in.
func (s *MapSchema) CacheFingerprint() [32]byte {
	return s.cacheFingerprinter.CacheFingerprint(s, nil)
}
// UnionSchema is an Avro union of several member schemas.
type UnionSchema struct {
	fingerprinter
	cacheFingerprinter

	types Schemas
}

// NewUnionSchema creates a union of types. A member may not itself be a
// union, and no two members may share a schemaTypeName.
func NewUnionSchema(types []Schema, opts ...SchemaOption) (*UnionSchema, error) {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}

	seen := make(map[string]struct{}, len(types))
	for _, member := range types {
		if member.Type() == Union {
			return nil, errors.New("avro: union type cannot be a union")
		}
		key := schemaTypeName(member)
		if _, dup := seen[key]; dup {
			return nil, errors.New("avro: union type must be unique")
		}
		seen[key] = struct{}{}
	}

	return &UnionSchema{
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		types:              types,
	}, nil
}

// Type returns Union.
func (s *UnionSchema) Type() Type {
	return Union
}

// Types returns the member schemas in declaration order.
func (s *UnionSchema) Types() Schemas {
	return s.types
}

// Contains reports whether a member's schemaTypeName equals typ.
func (s *UnionSchema) Contains(typ Type) bool {
	_, idx := s.types.Get(string(typ))
	return idx >= 0
}

// Nullable reports whether the union has exactly two members, one of which is
// null.
func (s *UnionSchema) Nullable() bool {
	if len(s.types) != 2 {
		return false
	}
	return s.types[0].Type() == Null || s.types[1].Type() == Null
}

// Indices returns the positions of the null member and the other member of a
// nullable union. For non-nullable unions it returns (0, 0).
func (s *UnionSchema) Indices() (null, typ int) {
	switch {
	case !s.Nullable():
		return 0, 0
	case s.types[0].Type() == Null:
		return 0, 1
	default:
		return 1, 0
	}
}

// String returns the compact JSON form: an array of member forms.
func (s *UnionSchema) String() string {
	var sb strings.Builder
	sb.WriteByte('[')
	for i, member := range s.types {
		if i > 0 {
			sb.WriteByte(',')
		}
		sb.WriteString(member.String())
	}
	sb.WriteByte(']')
	return sb.String()
}

// MarshalJSON returns the members marshalled as a JSON array.
func (s *UnionSchema) MarshalJSON() ([]byte, error) {
	return jsoniterAPI.Marshal(s.types)
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *UnionSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *UnionSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier; no extra data is mixed in.
func (s *UnionSchema) CacheFingerprint() [32]byte {
	return s.cacheFingerprinter.CacheFingerprint(s, nil)
}
// FixedSchema is an Avro fixed-size byte sequence, optionally annotated with a
// logical type.
type FixedSchema struct {
	name
	properties
	fingerprinter
	cacheFingerprinter

	size    int
	logical LogicalSchema
}

// NewFixedSchema creates a fixed schema of size bytes. The name is validated
// first, then size must be non-negative. Which property keys count as reserved
// depends on logical (none, a decimal, or another logical type).
func NewFixedSchema(
	name, namespace string,
	size int,
	logical LogicalSchema,
	opts ...SchemaOption,
) (*FixedSchema, error) {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}

	qualified, err := newName(name, namespace, cfg.aliases)
	if err != nil {
		return nil, err
	}
	if size < 0 {
		return nil, errors.New("avro: fixed size cannot be negative")
	}

	var reserved []string
	switch {
	case logical == nil:
		reserved = fixedReserved
	case logical.Type() == Decimal:
		reserved = fixedWithDecimalTypeReserved
	default:
		reserved = fixedWithLogicalTypeReserved
	}

	return &FixedSchema{
		name:               qualified,
		properties:         newProperties(cfg.props, reserved),
		cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
		size:               size,
		logical:            logical,
	}, nil
}

// Type returns Fixed.
func (s *FixedSchema) Type() Type {
	return Fixed
}

// Size returns the number of bytes.
func (s *FixedSchema) Size() int {
	return s.size
}

// Logical returns the logical type, or nil.
func (s *FixedSchema) Logical() LogicalSchema {
	return s.logical
}

// String returns the compact JSON form: full name, type, size and logical type.
func (s *FixedSchema) String() string {
	out := `{"name":"` + s.FullName() + `","type":"fixed","size":` + strconv.Itoa(s.size)
	if s.logical != nil {
		out += "," + s.logical.String()
	}
	return out + `}`
}

// MarshalJSON returns the full JSON form, including aliases, the logical type
// (with decimal precision and scale) and custom properties.
func (s *FixedSchema) MarshalJSON() ([]byte, error) {
	buf := bytes.NewBufferString(`{"name":"` + s.full + `"`)
	if len(s.aliases) > 0 {
		aliases, err := jsoniterAPI.Marshal(s.aliases)
		if err != nil {
			return nil, err
		}
		buf.WriteString(`,"aliases":`)
		buf.Write(aliases)
	}
	buf.WriteString(`,"type":"fixed"`)
	buf.WriteString(`,"size":` + strconv.Itoa(s.size))
	if lt := s.logical; lt != nil {
		buf.WriteString(`,"logicalType":"` + string(lt.Type()) + `"`)
		if dec, isDecimal := lt.(*DecimalLogicalSchema); isDecimal {
			buf.WriteString(`,"precision":` + strconv.Itoa(dec.prec))
			if dec.scale > 0 {
				buf.WriteString(`,"scale":` + strconv.Itoa(dec.scale))
			}
		}
	}
	if err := s.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteByte('}')
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *FixedSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *FixedSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the cache identifier; no extra data is mixed in.
func (s *FixedSchema) CacheFingerprint() [32]byte {
	return s.cacheFingerprinter.CacheFingerprint(s, nil)
}
// NullSchema is the Avro null schema.
type NullSchema struct {
	properties
	fingerprinter
}

// NewNullSchema creates a null schema. Of the options, only custom properties
// (WithProps) have an effect; the others are applied but not read.
func NewNullSchema(opts ...SchemaOption) *NullSchema {
	var cfg schemaConfig
	for _, apply := range opts {
		apply(&cfg)
	}
	return &NullSchema{properties: newProperties(cfg.props, primitiveReserved)}
}

// Type returns Null.
func (s *NullSchema) Type() Type {
	return Null
}

// String returns the compact JSON form.
func (s *NullSchema) String() string {
	return `"null"`
}

// MarshalJSON returns "null", or an object form when custom properties exist.
func (s *NullSchema) MarshalJSON() ([]byte, error) {
	if len(s.props) == 0 {
		return []byte(`"null"`), nil
	}

	buf := bytes.NewBufferString(`{"type":"null"`)
	if err := s.marshalPropertiesToJSON(buf); err != nil {
		return nil, err
	}
	buf.WriteByte('}')
	return buf.Bytes(), nil
}

// Fingerprint returns the SHA-256 fingerprint of String().
func (s *NullSchema) Fingerprint() [32]byte {
	return s.fingerprinter.Fingerprint(s)
}

// FingerprintUsing returns the fingerprint of String() using algorithm typ.
func (s *NullSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.fingerprinter.FingerprintUsing(typ, s)
}

// CacheFingerprint returns the plain fingerprint: a null schema keeps no
// writer fingerprint.
func (s *NullSchema) CacheFingerprint() [32]byte {
	return s.Fingerprint()
}
// RefSchema refers, by full name, to a named schema. All fingerprinting is
// delegated to the referenced schema.
type RefSchema struct {
	actual NamedSchema
}

// NewRefSchema creates a reference to schema.
func NewRefSchema(schema NamedSchema) *RefSchema {
	return &RefSchema{actual: schema}
}

// Type returns Ref.
func (s *RefSchema) Type() Type {
	return Ref
}

// Schema returns the referenced schema.
func (s *RefSchema) Schema() NamedSchema {
	return s.actual
}

// String returns the referenced schema's full name as a JSON string.
func (s *RefSchema) String() string {
	return `"` + s.actual.FullName() + `"`
}

// MarshalJSON returns the same bytes as String.
func (s *RefSchema) MarshalJSON() ([]byte, error) {
	return []byte(s.String()), nil
}

// Fingerprint returns the referenced schema's fingerprint.
func (s *RefSchema) Fingerprint() [32]byte {
	return s.actual.Fingerprint()
}

// FingerprintUsing returns the referenced schema's fingerprint for typ.
func (s *RefSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
	return s.actual.FingerprintUsing(typ)
}

// CacheFingerprint returns the referenced schema's cache fingerprint.
func (s *RefSchema) CacheFingerprint() [32]byte {
	return s.actual.CacheFingerprint()
}
// PrimitiveLogicalSchema is a logical type described only by its name.
type PrimitiveLogicalSchema struct {
	typ LogicalType
}

// NewPrimitiveLogicalSchema creates a logical type annotation of type typ.
func NewPrimitiveLogicalSchema(typ LogicalType) *PrimitiveLogicalSchema {
	return &PrimitiveLogicalSchema{typ: typ}
}

// Type returns the logical type.
func (s *PrimitiveLogicalSchema) Type() LogicalType {
	return s.typ
}

// String returns the `"logicalType":"..."` JSON member, without braces.
func (s *PrimitiveLogicalSchema) String() string {
	return `"logicalType":"` + string(s.Type()) + `"`
}
// DecimalLogicalSchema is the decimal logical type with its precision and
// scale.
type DecimalLogicalSchema struct {
	prec  int
	scale int
}

// NewDecimalLogicalSchema creates a decimal logical type.
func NewDecimalLogicalSchema(prec, scale int) *DecimalLogicalSchema {
	return &DecimalLogicalSchema{prec: prec, scale: scale}
}

// Type returns Decimal.
func (s *DecimalLogicalSchema) Type() LogicalType {
	return Decimal
}

// Precision returns the decimal precision.
func (s *DecimalLogicalSchema) Precision() int {
	return s.prec
}

// Scale returns the decimal scale.
func (s *DecimalLogicalSchema) Scale() int {
	return s.scale
}

// String returns the logicalType and precision JSON members, plus scale when
// it is positive, without surrounding braces.
func (s *DecimalLogicalSchema) String() string {
	out := `"logicalType":"` + string(Decimal) + `","precision":` + strconv.Itoa(s.prec)
	if s.scale > 0 {
		out += `,"scale":` + strconv.Itoa(s.scale)
	}
	return out
}
// invalidNameFirstChar reports whether r may NOT start an Avro name
// (allowed: A-Z, a-z, _).
func invalidNameFirstChar(r rune) bool {
	isUpper := 'A' <= r && r <= 'Z'
	isLower := 'a' <= r && r <= 'z'
	return !(isUpper || isLower || r == '_')
}

// invalidNameOtherChar reports whether r may NOT appear after the first
// character of an Avro name (allowed: A-Z, a-z, _, 0-9).
func invalidNameOtherChar(r rune) bool {
	isDigit := '0' <= r && r <= '9'
	return !isDigit && invalidNameFirstChar(r)
}
// validateName checks that name is a valid Avro name: its first character is
// one of [A-Za-z_] and every later character is one of [A-Za-z0-9_].
// An empty name is always rejected. The character checks are skipped when
// SkipNameValidation is set.
func validateName(name string) error {
	if name == "" {
		return errors.New("name must be non-empty")
	}
	if SkipNameValidation {
		return nil
	}
	// name[:1] is the first byte. A multi-byte first rune decodes as
	// utf8.RuneError there and is therefore rejected, which is intended.
	if strings.IndexFunc(name[:1], invalidNameFirstChar) > -1 {
		return fmt.Errorf("invalid name %s", name)
	}
	if strings.IndexFunc(name[1:], invalidNameOtherChar) > -1 {
		return fmt.Errorf("invalid name %s", name)
	}
	return nil
}
// validateDefault checks def against schema for the field called name and
// returns the normalised default produced by isValidDefault.
//
// On failure, the error reports the caller's original default. Previously the
// normalised value was reported, which is often nil or a sentinel and
// therefore misleading.
func validateDefault(name string, schema Schema, def any) (any, error) {
	normalised, ok := isValidDefault(schema, def)
	if !ok {
		return nil, fmt.Errorf("avro: invalid default for field %s. %+v not a %s", name, def, schema.Type())
	}
	return normalised, nil
}
// isValidDefault reports whether def is an acceptable default for schema and
// returns it normalised to the Go type used when encoding it:
//   - null: the nullDefault sentinel
//   - int: int
//   - long: int64
//   - float: float32
//   - bytes: []byte
//   - fixed: the result of byteSliceToArray
//
// Numeric defaults may arrive as float64 (as produced by JSON decoding) or as
// Go integer/float values. Arrays, maps and records are validated
// recursively. The caller's slice or map is never modified; a normalised copy
// is returned instead. A union's default must match its first member; an empty
// union accepts no default.
func isValidDefault(schema Schema, def any) (any, bool) {
	switch schema.Type() {
	case Ref:
		ref := schema.(*RefSchema)
		return isValidDefault(ref.Schema(), def)

	case Null:
		return nullDefault, def == nil

	case Enum:
		v, ok := def.(string)
		if !ok || len(v) == 0 {
			return def, false
		}
		for _, sym := range schema.(*EnumSchema).symbols {
			if v == sym {
				return def, true
			}
		}
		return def, false

	case String:
		if _, ok := def.(string); ok {
			return def, true
		}

	case Bytes, Fixed:
		if d, ok := def.(string); ok {
			if b, ok := isValidDefaultBytes(d); ok {
				if schema.Type() == Fixed {
					return byteSliceToArray(b, schema.(*FixedSchema).Size()), true
				}
				return b, true
			}
		}

	case Boolean:
		if _, ok := def.(bool); ok {
			return def, true
		}

	case Int:
		switch v := def.(type) {
		case int8:
			return int(v), true
		case int16:
			return int(v), true
		case int32:
			return int(v), true
		case int:
			return def, true
		case float64:
			return int(v), true
		}

	case Long:
		switch v := def.(type) {
		case int64:
			return def, true
		// Smaller Go integer kinds are widened losslessly.
		case int8:
			return int64(v), true
		case int16:
			return int64(v), true
		case int32:
			return int64(v), true
		case int:
			return int64(v), true
		case float64:
			return int64(v), true
		}

	case Float:
		switch v := def.(type) {
		case float32:
			return def, true
		case float64:
			return float32(v), true
		}

	case Double:
		if _, ok := def.(float64); ok {
			return def, true
		}

	case Array:
		arr, ok := def.([]any)
		if !ok {
			return nil, false
		}
		if arr == nil {
			return arr, true
		}
		items := schema.(*ArraySchema).Items()
		out := make([]any, len(arr))
		for i, v := range arr {
			nv, ok := isValidDefault(items, v)
			if !ok {
				return nil, false
			}
			out[i] = nv
		}
		return out, true

	case Map:
		m, ok := def.(map[string]any)
		if !ok {
			return nil, false
		}
		if m == nil {
			return m, true
		}
		values := schema.(*MapSchema).Values()
		out := make(map[string]any, len(m))
		for k, v := range m {
			nv, ok := isValidDefault(values, v)
			if !ok {
				return nil, false
			}
			out[k] = nv
		}
		return out, true

	case Union:
		types := schema.(*UnionSchema).Types()
		if len(types) == 0 {
			// Indexing types[0] would panic; an empty union has no valid default.
			return nil, false
		}
		return isValidDefault(types[0], def)

	case Record:
		m, ok := def.(map[string]any)
		if !ok {
			return nil, false
		}
		// Start from a copy so that neither the caller's map nor a field's
		// stored default is mutated. Keys not in the schema are kept as-is.
		// Copying also avoids a panic on a typed-nil map.
		out := make(map[string]any, len(m))
		for k, v := range m {
			out[k] = v
		}
		for _, field := range schema.(*RecordSchema).Fields() {
			fieldDef := field.Default()
			if newDef, ok := m[field.Name()]; ok {
				fieldDef = newDef
			}
			v, ok := isValidDefault(field.Type(), fieldDef)
			if !ok {
				return nil, false
			}
			out[field.Name()] = v
		}
		return out, true
	}
	return nil, false
}
// schemaTypeName returns the name that identifies schema among union members.
// References are followed first. Named schemas yield their full name; other
// schemas yield their type, suffixed with "." and the logical type when
// getLogicalType reports one.
func schemaTypeName(schema Schema) string {
	target := schema
	if target.Type() == Ref {
		target = target.(*RefSchema).Schema()
	}

	if named, ok := target.(NamedSchema); ok {
		return named.FullName()
	}

	typeName := string(target.Type())
	lt := getLogicalType(target)
	if lt == "" {
		return typeName
	}
	return typeName + "." + string(lt)
}
// isValidDefaultBytes converts the JSON-string default of a bytes or fixed
// schema into raw bytes, mapping each code point 0-255 to the byte of the same
// value. A code point above 255 (including the replacement rune produced for
// invalid UTF-8) makes the default invalid.
func isValidDefaultBytes(def string) ([]byte, bool) {
	out := make([]byte, 0, len(def))
	for _, r := range def {
		if r < 0 || r > 255 {
			return nil, false
		}
		out = append(out, byte(r))
	}
	return out, true
}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news about Golds.