reedsolomon算法实现需要矩阵和伽罗华域(Galois Field)的一些知识,这两者在前面都已做了介绍,并且这部分网上很多人都做了详细说明,这里再挑重点的部分能使用到的地方介绍一下。

1、伽罗华域运算:

伽罗瓦域,该域本质上是一组受限的数字(也称有限域),GF(2^n) 表示含有2^n个元素的有限域,这里n取8,即GF(256).有限域中的每一个元素都对应一个多项式。

(1)、加减法:

将两个多项式中相同阶数的项系数相加或相减。例如 (x^2 + x ) + (x + 1) = x^2 + 2x +1

对于GF(2^w)上的多项式计算,多项式系数只能取 0或1。(如果是GF(3^w),那么系数可以取 0、1、 2)

GF(2^w)的多项式加法中,合并阶数相同的同类项时,由于0+0=0,1+1=0,0+1=1+0=1,因此系数不是进行加法操作,而是进行异或操作。

GF(2^w)的多项式减法等于加法,例如x ^4 – x^4就等于x^4 + x^4。

(2)、乘除法:

乘除法是使用多项式进行,只不过多项式乘了之后需要mod P(x),P(x)是本源多项式,GF(2^w)上的本源多项式一般取:x^8+x^4 + x^3 +x^2 +1。Reedsolomon里面是使用查表法来进行乘除运算。

(3)、正向表和逆向表:

生成元是域上的一类特殊元素,生成元的幂可以遍历域上的所有元素。假设g是域GF(2^w)上生成元,那么集合 {g0,g1 , ……,g(2^w-1) } 包含了域GF(2^w)上所有非零元素。在域GF(2^w)中2总是生成元。

将生成元应用到多项式中, GF(2^w)中的所有多项式都是可以通过多项式生成元g通过幂求得。即域中的任意元素a,都可以表示为a = g^k。

GF(2^w)是一个有限域,就是元素个数是有限的,但指数k是可以无穷的。所以必然存在循环。这个循环的周期是2^w-1(g不能生成多项式 0)。所以当k大于等于2^w-1时,g^k=g^(k%(2^w-1))。

例如2^k = a,有正过程和逆过程。知道指数k求a是正过程,知道值a求指数k是逆过程:深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析-编程之家

对于乘法,假设a=g^n,b=g^m。那么a*b = g^n* g^m = g^(n+m)。查表的方法就是根据a和b,分别查表得到n和m,然后查表g^(n+m)即可。

因此需要构造正表和反表,在GF(2^w)域上分别记为gflog和gfilog。gflog是将二进制形式映射为多项式形式,gfilog是将多项式形式映射为二进制形式。

注意:多项式0 ,是无法用生成元生成的。g^0等于多项式1,而不是 0。

对于2^8 列出部分正向表和逆向表:

i 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
gflog[i] 0 1 25 2 50 26 198 3 223 51 238 27 104 199 75 4 100 224 14
gfilog[i] 1 2 4 8 16 32 64 128 29 58 116 232 205 135 19 38 76 152 45 90

举个例子,生成Vandermonde矩阵时需要计算a^n,计算方法是:

(1)、根据a查找gflog表,找出对应的多项式k

(2)、n个多项式加就是多项式k*n

(3)、最后查表gfilog将多项式转成二进制。

2、范德蒙(Vandermonde)矩阵

在线性代数中,Vandermonde矩阵是一个每行有几何级数项的矩阵,即m×n矩阵

深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析-编程之家

 也可以表示成:

深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析-编程之家

矩阵的行列式的值可以表示成:

深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析-编程之家

这称为范德蒙行列式或范德蒙多项式。

范德蒙矩阵是满秩矩阵,满秩矩阵是可逆的,从范德蒙矩阵中任意取n行组成n阶方阵都可逆。

3Reedsolomon

 Reedsolomon算法分成两部分:编码和解码。整体结构如下图:

                                          深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析-编程之家

Reedsolomon提供了三种形式的编码矩阵和解码矩阵:Cauchy、PAR1Matrix、vandermonde。默认是使用vandermonde矩阵来完成编解码工作。因为2^8的限制,数据块和校验块只和不能超过256,如果超过256 就无法提供编解码服务。默认是4+2,即4个数据块2个校验块。

(1)、首先生成编/解码矩阵:

func New(dataShards, parityShards int, opts ...Option) (Encoder, error) {r := reedSolomon{DataShards:   dataShards,//数据块个数ParityShards: parityShards, //校验块个数Shards:       dataShards + parityShards,o:            defaultOptions, }for _, opt := range opts {opt(&r.o)
}
//进行一些参数检查if dataShards <= 0 || parityShards <= 0 {return nil, ErrInvShardNum}if dataShards+parityShards > 256 {return nil, ErrMaxShardNum}var err errorswitch {
case r.o.useCauchy://生成柯西矩阵r.m, err = buildMatrixCauchy(dataShards, r.Shards)
case r.o.usePAR1Matrix://生成PAR1矩阵r.m, err = buildMatrixPAR1(dataShards, r.Shards)
default://默认的使用范德蒙矩阵r.m, err = buildMatrix(dataShards, r.Shards)}if err != nil {return nil, err}
if r.o.shardSize > 0 {//使用cpu计算矩阵的设置cacheSize := cpuid.CPU.Cache.L2if cacheSize <= 0 {// Set to 128K if undetectable.cacheSize = 128 << 10}p := runtime.NumCPU()shards := 1 + parityShardsg := (r.o.shardSize * shards) / (cacheSize - (cacheSize >> 4))g *= 2if g < p {g = p}// Have g be multiple of pg += p - 1g -= g % pr.o.maxGoroutines = g}//逆矩阵缓存在树中,数据的无效行的索引作为key值。此tree是为了加快解码,不用每次都重新生成解码矩阵r.tree = newInversionTree(dataShards, parityShards)//记录校验部分的矩阵信息r.parity = make([][]byte, parityShards)for i := range r.parity {r.parity[i] = r.m[dataShards+i]fmt.Println("parity i", i, ":", r.parity[i])}return &r, err
}

 以vandermonde矩阵为例,生成vandermonde矩阵的函数是buildMatrix:

func buildMatrix(dataShards, totalShards int) (matrix, error) {// 生成标准的范德蒙矩阵vm, err := vandermonde(totalShards, dataShards)if err != nil {return nil, err}fmt.Println("vm :", vm)//从范德蒙标准矩阵中获取数据块对应的n阶方阵top, err := vm.SubMatrix(0, 0, dataShards, dataShards)if err != nil {return nil, err}
fmt.Println("top :", top)
//取数据块对应的n阶方阵的逆矩阵topInv, err := top.Invert()if err != nil {return nil, err}
fmt.Println("topInv :", topInv)
//vm是个标准的范德蒙矩阵,并不是我们想要的编码矩阵,使用范德蒙原始矩阵和上面解解出来的逆矩阵相乘,可以得到上部分是单位矩阵下部分是校验矩阵的编码矩阵。return vm.Multiply(topInv)
}

范德蒙标准矩阵的生成使用了我们前面说的a^n计算方法,该方法是在伽罗华域内幂运算。

func vandermonde(rows, cols int) (matrix, error) {// rows是数据块和校验块的和,cols是数据块个数,此处生成一个空矩阵result, err := newMatrix(rows, cols)if err != nil {return nil, err}for r, row := range result {for c := range row {//将次矩阵填充成范德蒙标准形式,使用了幂运算result[r][c] = galExp(byte(r), c)}}return result, nil
}

galExp使用了上面提到的正向表和逆向表,根据查表在二进制和多项式之间切换。

到此,已经生成了一个范德蒙矩阵:

vm : [[1, 0, 0, 0],

[1, 1, 1, 1],

 [1, 2, 4, 8],

 [1, 3, 5, 15],

 [1, 4, 16, 64],

 [1, 5, 17, 85]]

但是还不是我们想要的单位矩阵的形式,对[1, 0, 0, 0], [1, 1, 1, 1], [1, 2, 4, 8], [1, 3, 5, 15]求解对应的逆矩阵,逆变换使用了增广矩阵的初等行变换方法,主要的思想是在该矩阵后面补一个单位矩阵,然后进行初等变换,将单位矩阵变道左侧,右侧的就是对应的逆矩阵:

                                                                  深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析-编程之家

gaussianElimination函数是执行初等变换的主要函数,其中用到了除法运算。

func (m matrix) gaussianElimination() error {rows := len(m)columns := len(m[0])for r := 0; r < rows; r++ {if m[r][r] == 0 {for rowBelow := r + 1; rowBelow < rows; rowBelow++ {if m[rowBelow][r] != 0 {m.SwapRows(r, rowBelow)break}}}if m[r][r] == 0 {return errSingular}if m[r][r] != 1 {scale := galDivide(1, m[r][r])for c := 0; c < columns; c++ {m[r][c] = galMultiply(m[r][c], scale)}}for rowBelow := r + 1; rowBelow < rows; rowBelow++ {if m[rowBelow][r] != 0 {scale := m[rowBelow][r]for c := 0; c < columns; c++ {m[rowBelow][c] ^= galMultiply(scale, m[r][c])}}}}for d := 0; d < rows; d++ {for rowAbove := 0; rowAbove < d; rowAbove++ {if m[rowAbove][d] != 0 {scale := m[rowAbove][d]for c := 0; c < columns; c++ {m[rowAbove][c] ^= galMultiply(scale, m[d][c])}}}}return nil
}

逆矩阵求解出来时这个样子:

[[1, 0, 0, 0],

[123, 1, 142, 244],

[0, 122, 244, 142],

[122, 122, 122, 122]]

用原始的范德蒙矩阵和上面求解出来的逆矩阵,相乘就得到了我们希望的编/解码矩阵。这里需要注意,矩阵相乘使用的是查表法,因为一共就256个数相乘,总共有256*256种可能,把这256*256种场景全部提前计算出来做成一个矩阵形式,使用时将需要相乘和两个数分别作为行列号,直接带入表中即可查询到这两个数相乘的结果。

 

func (m matrix) Multiply(right matrix) (matrix, error) {if len(m[0]) != len(right) {return nil, fmt.Errorf("columns on left (%d) is different than rows on right (%d)", len(m[0]), len(right))}result, _ := newMatrix(len(m), len(right[0]))for r, row := range result {for c := range row {var value bytefor i := range m[0] {value ^= galMultiply(m[r][i], right[i][c])}result[r][c] = value}}fmt.Println("Multiply :", result)return result, nil
}

 

最终形态:

[ [1, 0, 0, 0],

 [0, 1, 0, 0],

[0, 0, 1, 0],

[0, 0, 0, 1],

 [27, 28, 18, 20],

[28, 27, 20, 18]]

(2)、编码

编码过程是根据数据块和编码矩阵计算出校验块。因为数据块长度是单位是字节,因此是按照字节将校验块的数据计算出来,根据矩阵乘法我们知道需要每个字节和校验数据相乘,然后在做加法,加法在伽罗华域中是异或运算,知道这点就很容易看看懂下面函数了.

func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) {if r.o.useAVX512 && len(inputs) >= 4 && len(outputs) >= 2 {r.codeSomeShardsAvx512(matrixRows, inputs, outputs, outputCount, byteCount)return} else if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize {r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount)return}for c := 0; c < r.DataShards; c++ {in := inputs[c]for iRow := 0; iRow < outputCount; iRow++ {if c == 0 {//第一次计算校验数据,根据计算出来的值直接赋值到outputsgalMulSlice(matrixRows[iRow][c], in, outputs[iRow], &r.o)} else {
//不是第一次计算校验数据,根据计算出来的值异或原有的值galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], &r.o)}}}
}
//galMulSlice、galMulSliceXor是对每个字节进行校验数据计算,同样是使用了查表法来做乘法
func galMulSlice(c byte, in, out []byte, o *options) {var done intif o.useAVX2 {galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 5) << 5} else if o.useSSSE3 {galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 4) << 4}remain := len(in) - doneif remain > 0 {mt := mulTable[c][:256]for i := done; i < len(in); i++ {out[i] = mt[in[i]]}}
}func galMulSliceXor(c byte, in, out []byte, o *options) {var done intif o.useAVX2 {galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 5) << 5} else if o.useSSSE3 {galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)done = (len(in) >> 4) << 4}remain := len(in) - doneif remain > 0 {mt := mulTable[c][:256]for i := done; i < len(in); i++ {out[i] ^= mt[in[i]]}}
}

编码程比较简单,主要就是查表计算乘法计算出校验数据块。

(3)、解码(数据重建)

解码分成重建原始数据块和重建校验块。如果是重建数据块,需要在编/解码将损坏的数据块对应的行去掉然后求逆矩阵,根据逆矩阵和完整的数据块计算出元数据数据块。如果是重建校验块,直接根据上面说的编码,重新计算出校验块。如果数据块和校验块都需要重建,先重建数据块,然后在重建校验块。

func (r reedSolomon) reconstruct(shards [][]byte, dataOnly bool) error {if len(shards) != r.Shards {return ErrTooFewShards}// 校验,检查是否有块需要重建err := checkShards(shards, true)if err != nil {return err}//检查部分略过shardSize := shardSize(shards)numberPresent := 0for i := 0; i < r.Shards; i++ {if len(shards[i]) != 0 {numberPresent++}}if numberPresent == r.Shards {return nil}// More complete sanity checkif numberPresent < r.DataShards {return ErrTooFewShards
}
//这里将损坏的块标记,subShards := make([][]byte, r.DataShards)validIndices := make([]int, r.DataShards)invalidIndices := make([]int, 0)subMatrixRow := 0for matrixRow := 0; matrixRow < r.Shards && subMatrixRow < r.DataShards; matrixRow++ {if len(shards[matrixRow]) != 0 {subShards[subMatrixRow] = shards[matrixRow]validIndices[subMatrixRow] = matrixRowsubMatrixRow++} else {//损坏的块invalidIndices = append(invalidIndices, matrixRow)}
}
//如果tree中该损坏的块已有对应的解码矩阵直接取来用即可dataDecodeMatrix := r.tree.GetInvertedMatrix(invalidIndices)if dataDecodeMatrix == nil {//若解码矩阵不存在,新建,获取完好行对应的矩阵subMatrix, _ := newMatrix(r.DataShards, r.DataShards)for subMatrixRow, validIndex := range validIndices {for c := 0; c < r.DataShards; c++ {subMatrix[subMatrixRow][c] = r.m[validIndex][c]}}//数据完好行矩阵求逆dataDecodeMatrix, err = subMatrix.Invert()if err != nil {return err}fmt.Println("dataDecodeMatrix:", dataDecodeMatrix)//更新进tree中err = r.tree.InsertInvertedMatrix(invalidIndices, dataDecodeMatrix, r.Shards)if err != nil {return err}}outputs := make([][]byte, r.ParityShards)matrixRows := make([][]byte, r.ParityShards)outputCount := 0//取逆矩阵的前N阶方阵,此方阵就是数据重建的解码矩阵for iShard := 0; iShard < r.DataShards; iShard++ {if len(shards[iShard]) == 0 {if cap(shards[iShard]) >= shardSize {shards[iShard] = shards[iShard][0:shardSize]} else {shards[iShard] = make([]byte, shardSize)}outputs[outputCount] = shards[iShard]matrixRows[outputCount] = dataDecodeMatrix[iShard]outputCount++}
}
//数据重建,这个函数在编码中已讲r.codeSomeShards(matrixRows, subShards, outputs[:outputCount], outputCount, shardSize)if dataOnly {// Exit out early if we are only interested in the data shardsfmt.Println("only interested in the data shards")return nil
}
//如果还要重建校验数据,就把校验数据计算出来,和编码部分一样。outputCount = 0for iShard := r.DataShards; iShard < r.Shards; iShard++ {if len(shards[iShard]) == 0 {if cap(shards[iShard]) >= shardSize {shards[iShard] = shards[iShard][0:shardSize]} else {shards[iShard] = make([]byte, shardSize)}outputs[outputCount] = shards[iShard]matrixRows[outputCount] = r.parity[iShard-r.DataShards]outputCount++}}r.codeSomeShards(matrixRows, shards[:r.DataShards], outputs[:outputCount], outputCount, shardSize)return nil
}