UMEHOSHI ITA TOP PAGE COMPUTER SHIEN LAB
ブロック名 | 記号 | 説明 |
---|---|---|
「データブロック」 | DAB | 実際のデータが入るブロックで、同じビット長が続く状態で伝達します。
そして、直前の「データブロック」の変化量(+/-)を記憶させます。 格納データは符号あり情報としますが、ゼロや負のデータは1足した値を記憶させます。 これは全てのビットがゼロを「データゼロブロック」として利用するためです。 ビット長を変更する場合は、「データゼロブロック」で終端させ、 次の「切り替えブロック」で「データゼロブロック」の長さを変更します。 |
「基準ブロック」 | BAB | 「データブロック」が並ぶ前のどこかで、後に並ぶ最初の「データブロック」の基準値を 指定するブロック。これは後述する「切り替えブロック」の直後に存在する。(直後に存在しない場合もある。) このサイズは生データとする。(つまり、16bitのワードであれば16ビットになる) |
「切り替えブロック」 | CHB | 上記の「データブロック」のサイズを変更する時に使うブロック。 この直後に「基準ブロック」の存在の有無を指定するビットがある。 また、これに続く「データブロック」のビット長を指定するブロックである。 |
「辞書ブロック」 | DIB | 上記の「データブロックや「切り替えブロックが複数並ぶ領域を参照するブロック。 |
「辞書参照ブロック」 | RFB | 「辞書ブロックの再生タイミング位置に存在して、辞書を参照したデータ復元や再生に使う添え字を 記憶・管理する。 |
「データゼロブロック」 | ZEB | 上記の「データブロック」から「切り替えブロック」や「辞書参照ブロック」への変更前に 存在するブロックで、直前の「データブロック」のビット長と同じで、全てのビットがゼロ |
「辞書参照ブロック」は、「切り替えブロック」の仕様を次のように決めます。ともに「データゼロブロック」の後に出現するものです。
そこでこの情報の並びで先頭ビットが0を「辞書参照ブロック」、先頭ビットが1を「切り替えブロック」とします。
次にビット数を決めます。将来的に不確定要素があるので先頭の2ビットの並びが00であれば、8ビット長とします。
(これで先頭01並びのデータ長で将来、別のルールを設けこともできるでしょう。
00の8bit長では、残り6bitを辞書内の添え字として使えるので64個指定できることになります。これが少ないと感じる場合などは、
この並びで新しいルールを追加する場合の対応をしているということです。)
次に「切り替えブロック」のビット長ですが、少しでも短い方が良いので4bitに決定します。
先頭1ビット目は、「辞書参照ブロック」と区別で、必ず1です。
1000が「データブロック」を6ビット(-31〜31)に変更する「切り替えブロック」とします。
1001が「データブロック」を9ビット(-255〜255)に変更する「切り替えブロック」とします。
1010が「データブロック」を14ビット(-8191〜8191)に変更する「切り替えブロック」とします。
1011が「データブロック」を15ビット(-16383〜16383)に変更する「切り替えブロック」とします。
1100の場合、次のデータが「基準ブロック」の生データで、ビット長の変更無しとします。
2番目のビットが1の場合、次のデータが「基準ブロック」で、以下はビット長も変更する指定です。
1101「データブロック」を15ビット(-16383〜16383)に変更する「切り替えブロック」とします。
1110「データブロック」を16ビット(-32767〜32767)に変更する「切り替えブロック」とします。
1111「データブロック」を17ビット(-65535〜65535)に変更する「切り替えブロック」とします。
# 連続データの可逆圧縮プログラム umecmpre.py (解凍用:umedecmp.py) import wave class Block: bit_range=[6,9,14,15,16,17] # 戻り値の候補 ranges=[2**(v-1)-1 for v in bit_range] # 上記ビットの記憶可能な絶対値 prevBitSize=25 downcount = 0 # ビットサイズ連続する回数 def __init__(self, category='CHB', val=11): self.category=category self.nextBase = False if category=='DAB': self.value = val # データ設定値 elif category=='BAB': self.value=val # 基準ブロック elif category=='CHB': self.value=val # データビット長 elif category=='ZEB': self.value = val # ゼロブロック @classmethod def getBit(cls,datas, idx, baseValue, nextNumb): ''' datas[idx]〜datas[idx+nextNumb]で、必要なビット長を、 bit_rangeの中から比較して求めて返す。(-1リターンはエラー) 最初の値がbaseValueとして、 それからの変化量で記憶することを前提に記憶可能か判定する。 ''' bit_rang_idx = 0 # bit_range内走査用添え字 while True: # prevValue = baseValue iStart=idx iEnd=idx+nextNumb flagCanStore = True # 記憶可能フラグ if iEnd > len(datas) : iEnd = len(datas) for n in range(iStart,iEnd): diff = datas[n]-prevValue # 前のデータの差 #print(f"n:{n}, diff:{diff}, bit_rang_idx:{bit_rang_idx},{cls.ranges[bit_rang_idx]}") if abs(diff) > cls.ranges[bit_rang_idx]: #print(f"break:{abs(diff)}>{cls.ranges[bit_rang_idx]}") flagCanStore = False # 記憶ができないサイズ break # 記憶不可 prevValue = datas[n] #print(F" n={n}, iEnd={iEnd}") if flagCanStore == False: # 記憶するビットサイズが足りない bit_rang_idx += 1 if bit_rang_idx == len(cls.ranges): return -1 #エラー continue # ビットサイズを大きくして探す newBitSize = cls.bit_range[bit_rang_idx] if cls.downcount > 0 and newBitSize >= cls.prevBitSize: cls.downcount -= 1 return cls.prevBitSize # cls.downcount = nextNumb-1 if newBitSize > cls.prevBitSize: cls.downcount = 0 # 大きい方に変化した場合は0に変更 cls.prevBitSize=newBitSize return newBitSize # 必要なビット長 ''' 上記クラスの検証 blk = Block('CHB', 17) ys=[ -y*511 for y in range(0, 10)] for i in range(5): bit=Block.getBit(datas=ys, idx=i, baseValue=0, nextNumb=4) print(ys,"必要ビット:",bit) exit() # 終了 ''' #ここまでが簡易検証 import random print(Block.ranges) buffer=b'' fr=open("16bit16KHz_1KHz1024sin_0.bin","rb") # 圧縮対象 ys=[] dic = {} # 共通データがどれだけ存在するかの検証用 while True: bs = fr.read(2) # 可能な限り読み取る if len(bs) != 2 : break #ys.append(bs[0]+bs[1]*256-random.randrange(0, 50)) y=bs[0]+bs[1]*256 ys.append(y) sy=str(y) if sy in dic: dic[sy] = dic[sy] + 1 else: dic[sy] = 1 import matplotlib.pyplot as plt plt.plot(ys) plt.show() # データ確認用 for k,v in dic.items(): if v > 1: print(f"{k}のデータ数{v}個") adc_resolution=16 # サンプリング分解能 datas=ys # データ群のリスト baseValue=0 # 比較対象となる現在の値 nextNumb=4 # 必要ビットを探索すする先行ワード数 now_bit_len=adc_resolution+1 # 現在のブロックビット数 blk = Block('CHB', now_bit_len) # 「切り替えブロック」 blk.nextBase=True # 次のブロックが「基準ブロック」であることを指定 blocks=[blk] prevBlk=blk total_bit = 4 # 圧縮データサイス検証用 for idx in range(len(datas)): if prevBlk.nextBase: baseValue=datas[idx] blk = Block('BAB', baseValue) # 「基準ブロック」 blocks.append(blk) total_bit += adc_resolution # 検証用 prevBlk=blk continue chg_bit_len=Block.getBit(datas, idx, baseValue, nextNumb)#ビット長取得 #print(f"chg_bit_len:{chg_bit_len}") if(chg_bit_len == -1): raise "Error!" if now_bit_len!=chg_bit_len: total_bit += now_bit_len # 検証用 now_bit_len=chg_bit_len blk = Block('CHB', now_bit_len) # 「切り替えブロック」 print(f"({idx:5}) CHB bit={blk.value:2}") blocks.append(blk) total_bit += 4 # 検証用 blk = Block('DAB', datas[idx]-baseValue) # 「データブロック」 blk.now_bit_len = now_bit_len # pythonならではの情報埋め込み(デバック用) print(f"({idx:5}) DAB bit={now_bit_len:2} value={blk.value:5} data={baseValue+blk.value}") baseValue=datas[idx] blocks.append(blk) total_bit += now_bit_len # 検証用 prevBlk=blk # print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{int(total_bit/8+0.5)}")
上記では、実際に圧縮はさせていませんが各ブロックに必要なビット数から圧縮後のサイズを計算できるようにした検討用プログラムです。
このコードで、前述の振幅一定の1KHz正弦波バイナリファイルを使った実行結果は次のようになりました。
元サイス:2048byte, 圧縮サイス:2451
圧縮サイスの方が増えてしまうという失敗例です。
理由を検討して、対策を試みます。まず、上記仕様で圧縮した場合の各ブロックが並ぶ例を示します。
[CHB],[BAB],[DAB],[DAB],[ZEB],[CHB],[DAB],[DAB],[DAB],[DAB],[ZEB],[CHB],[DAB],[DAB],[DAB],[DAB]・・
[CHB],と[ZEB]の間にある[DAB]の並びは[CHB]で指定される同じサイスのデータブロックです。
ですから小さい[DAB]が多く続くほど、小さく圧縮できることになります。
実際に、振幅が徐々に大きくなる定の1KHz正弦波バイナリファイル(変化量が少ないデータを含む)を使った実行結果は次のようになりました。
元サイス:2048byte, 圧縮サイス:2009少しだけ、圧縮が成功しています。[ZEB],[CHB],の出現数を減らすために、nextNumbの設定値を4から7に変更した結果が次の通りです。
元サイス:2048byte, 圧縮サイス:1829
以上より失敗の理由は、、[ZEB],[CHB],が沢山あるためです。
前述の仕様では、[DAB]のサイズを変更するためには直前の[DAB]のサイズと同じビット数の[ZEB]と4ビット固定の[CHB]が必要です。
ですから、[ZEB],[CHB],を頻繁に行うと、簡単にサイズが大きくなります。
前述の失敗の原因は、[DAB]のサイズ変更が多いと、制御の[ZEB],[CHB],が多くなり、余計なサイズが増えます。
よって、不必要なサイズ変更を少なくすることが大事です。
[DAB]のビットサイズ変更用に、「切り替えブロック」の固定4bitの存在は致し方がないとしても、「切り替えブロック」の前に[ZEB]が必要か?と
いう点に着目して再検討しました。
元々はビットサイズが可変長なので、この切り替えを知らせるために、変更前の[DAB]のビットが全てゼロの[ZEB]を導入したのです。
(また、そのために[DAB]のデータパターンに全てゼロがビットの値を除いています。)
この[ZEB]の存在を無くす方法を考えて見ました。
その対応策は、[CHB]の中に、その後に続く『同じビット長の[DAB]の並ぶ個数を埋め込む手法です。
(こうすると[ZEB]をなくせて、に全てのビットがゼロの[DAB]もデータとして使うことができます。)
しかにこれを実現するためには、[CHB]にビット長を記憶するの固定4bit長以外に、並べる個数を記憶する領域が必要となります。
これに必要な最大値は、バッファサイス1024なので10bitです。
この方法ですと、[CHB]のサイズがは、固定4bit+10の14bitも必要で、前と同じで大きなサイズになってしまいます。
そこで、並べる個数を記憶する領域をどうするか検討して、[CHB]の詳細仕様を次のようにしてみます。
[DAB]のビットサイズを指定する部分は、固定長のままですが3bitとします。
前述ソースコードのBlockクラスのbit_rangeがサイズの種類のリストでは6個で足りているからです。
(b001がbit_range[0]、b010がbit_range[1]・・・と添え字に1を加算した値とします。)
そして並べる個数を記憶する領域を、5ビットにします。つまり、1〜32個の指定とします。
但し、[CHB]の3bit固定部の一部に特別な意味を持たせることにします。
●[CHB]が0b000である場合、現在の[DAB]のビット長を維持したまま、[DAB]が32個並ぶこと意味します。
個数が32なので、個数をを指定する5ビット領域を無しとします。
つまり、この特別な3bitの[CHB]だけで、同じビットサイズが33個並ぶ延長指定ができるということです。
●[CHB]が0b111である場合、後ろに並ぶ5ビットで先頭部にある辞書データの添え字(0〜31)を指定します。
(先頭部にある辞書データは、最大分解能のデータが並ぶ領域とします)
つまり、この[CHB]は、3+5=8bit固定長サイスです。
これは、例外的に大きく変化する情報を、ビット長指定なしで表現できる指定です。
よって比較的に大きなビット長で、重複して出現する情報を辞書データにいれて、8bitで指定できるように使うものです。
複数の箇所で同じデータを指定できますが、この指定データ最大32個しか登録できません。
またこの[CHB]の次のブロックは必ず[CHB]が続くものとします。
[CHB]が2続くと、合計16bitでデータを表現することになり全く圧縮にならない。
それで、[CHB]を続ける指定は無しに、これを使った場合は後述に変化しないビット長の[DAB]がNC_NUMB個並ぶ意味とします。
(RFB_NC_NUMBの大きさは、要検討ですが、16とします。余計な[CHB]の出現を抑えるための考慮です。)
これで、[CHB]8bitで辞書データを参照できますが、16bitデータであれば2個の存在で4bit縮小できます。
(3個の存在で、16*3-16-8*3=8で8bit縮小、 4個の存在で、16*4-16-8*4=16で16bit縮小となります。)
この能力は、以前の案(UMECMPRE Ver 0.1)の「辞書参照ブロック」に対応する新しい仕様となる。
●[CHB]が0b001〜0b110である場合、この0から6の添え字で、Blockのrangesリスト内のビット数を指定します。
そして、続く固定5ビット領域で続く[DAB]または[BAB]の個数の(0〜31)を指定します。
その個数の[DAB]または[BAB]の並びの直後に次の[CHB]が在ります。
その個数(ビット領域)が0で、データの終了を意味します。
無ければデータの終わりを意味します。
つまり、この[CHB]は、3+5=8bit固定長サイスになります。
[DAB]と[BAB]のどちらが並ぶかですが、[CHB]が0b110であれば[BAB]が並び、そうでまければ[DAB]が並びます。
先頭部の仕様と、その続きを次のように決めます。
「5bit:ranges[5]のビット数」←最大データの分解能ビット数を記憶するためのビット数」 「5bit:ranges[4]のビット数」 「5bit:ranges[3]のビット数」 「5bit:ranges[2]のビット数」 「5bit:ranges[1]のビット数」 「5bit:ranges[0]のビット数」 『6bit:「辞書参照ブロック」が続く個数(0から32)』 上記個数分だけ「最大データの分解能bit:辞書参照ブロック」・・・・・と並ぶ。 「[BAB] 最大データの分解能bit:先頭データ」 「[CHB]」 [CHB]で示されるビット長で、指定個数分だけ[DAB]または[BAB] が並ぶ。 『場合により、[辞書参照のCHB][CHB]』 [CHB]で示されるビット長で、指定個数分だけ[DAB]または[BAB]が並ぶ。 ・・・と続く。
下記ソースのアルゴリズムの仕様は、このリンクで示しています。
この試作ソースでは、「ベースビット長」を16ビットとしています。
(言語仕様によりますが、pythonであれば31ビットまでの符号なしデータのリストが圧縮可能です。)
使用モジュール: umedebug.py 、bitbyte.py
# 連続データの可逆圧縮プログラム umecmpre.py (解凍用:umedecmp.py) # 1回目にファイルの解析を行い、2回目に解析に基づく適切な圧縮方法を選んで変換する2パス方式 from umedebug import debug_hex4 dflags=[False]*20 from bitbyte import BitByte def to_uint(val, bitsize): # bitsizeビット長の符号ありvalを、符号なしに変換する return val & (1<<bitsize)-1 def to_int(uVal, bitsize): # bitsizeビット長の符号無しuValを符号ありに変換 sign_bit=1<<(bitsize-1) return uVal if uVal < sign_bit else uVal-(sign_bit<<1) class Block: debug = False bit_range=[7,9,11,13,15,16] # 「ビット長のテーブル」 min_s = [ -(2**(v-1)) for v in bit_range ] max_s = [ 2**(v-1)-1 for v in bit_range ] SBC_NUMB=7 # 8 9 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小) prev_bit_range_idx=len(bit_range)-1 # get_bit_lengthメソッド実行で、以前のビット長を記憶 # def __init__(self, category='CHB', val=16 , base_value=None): self.category=category self.nextBase = False if category=='DAB': self.value = val # 元データは「self.value+self.base_value」になる。 self.now_bit_len=0 # 特別にこの変数を持つ self.base_value= base_value if base_value else 0 elif category=='BAB': self.value=val # 元データを直接記憶 self.now_bit_len=0 # 特別にこの変数を持つ self.base_value=0 elif category=='CHB': self.value=val # データのビット長(インデックスでない。) # def out_CHB_bitimage(self,bib :BitByte): if self.value == 0: bib.setdata( 0b000, 3) # サイズ変更なしで32個並データが直後に並ぶ return if hasattr(self, "ref_dic_value"): bib.setdata( 0b111, 3) # 辞書テーブル bib.setdata( self.ref_dic_value, 5 ) # 辞書テーブルの添え字 else: i_size = Block.bit_range.index(self.value) bib.setdata( i_size+1, 3) # サイズ指定あり # bib.setdata( self.less_32, 5) # 直後に並ぶデータ個数 # @classmethod def get_bit_length(cls,datas, idx, base_value, next_numb): ''' datas[idx]〜datas[idx+next_numb]で、必要なビット長を、bit_rangeの中から比較して求めて返す。(-1の戻り値はエラー) datas[idx-1]のデータをbase_valueとして呼び出し、一つ前からの変化量から、効率よく記憶するビット長を求めます。''' bit_range_idx = 0 # bit_range内走査用添え字(ビット最小値を指し示す) while True: # prev_value = base_value # 下記forでサーチするためのローカル変数初期化 istart=idx iend=idx+next_numb flag_can_store = True # 記憶可能フラグ if iend > len(datas) : iend = len(datas) bit_range_idx_same_flag=False # 前に実行したbit_range_idxと同じ長さでdatas[idx]を記憶可能なら、Trueにする。 if cls.debug: print(f"{cls.bit_range[bit_range_idx]}ビット長が{istart}から{iend}の添え字間で続くか検証?") for n in range(istart,iend): diff = datas[n]-prev_value # 前のデータの差 if cls.debug: print(f"記憶対象:{diff},{cls.min_s[bit_range_idx]}〜{cls.max_s[bit_range_idx]}") if cls.is_fit(diff, bit_range_idx)==False: if cls.debug: print(f"対象{diff}:記憶不可{cls.min_s[bit_range_idx]}〜{cls.max_s[bit_range_idx]}") flag_can_store = False # 記憶ができないサイズ break # 記憶不可 prev_value = datas[n] if n == 0 and cls.prev_bit_range_idx == bit_range_idx: bit_range_idx_same_flag=True # if flag_can_store == False: # 記憶用ビットサイズが足りない bit_range_idx += 1 # 記憶用ビットサイズを増やす if bit_range_idx == len(cls.bit_range): # ビットサイズが最大か? if abs(diff) <= 2**cls.bit_range[-1] : return cls.bit_range[-1] if cls.debug: print(f"記憶対象:{diff},bit_range_idx={bit_range_idx}") return -1 # エラー continue # ビットサイズを大きくして探す # if bit_range_idx >= cls.prev_bit_range_idx: if bit_range_idx_same_flag : if cls.debug: print(f"get_bit_length 前手と同じ戻り値:{cls.bit_range[cls.prev_bit_range_idx]}") return cls.bit_range[cls.prev_bit_range_idx] # new_bit_size = cls.bit_range[bit_range_idx] cls.prev_bit_range_idx=bit_range_idx if cls.debug: print(f"get_bit_length 前と異なる戻り値:{new_bit_size}") return new_bit_size # 必要なビット長 # @classmethod def is_fit(cls,value, bit_range_idx): ''' 符号付valueが、min_s[bit_range_idx]以上で、max_s[bit_range_idx]以下であるならTrue ''' if value < 0: value = -value-1 if cls.min_s[bit_range_idx] > value: return False if cls.max_s[bit_range_idx] < value: return False return True # @classmethod def set_samebitsize_data(cls,bib ,blocks, idx, numb, now_bit_size, d_count=0, debug_range=None): ''' 連続データを blocks[idx].value〜blocks[idx+numb].valueを bibのbitbyteに設定する。戻り値は、idx+numb+1 。sub_compress関数で利用 ''' for n in range(numb): u_int=to_uint(blocks[idx].value, now_bit_size) bib.setdata( u_int, now_bit_size) cls.debug_DAB_BAB_print(blocks[idx], idx, now_bit_size, data=u_int, same_count=n+1,d_count=d_count,debug_range=debug_range) idx+=1 d_count+=1 return idx # @classmethod def get_same_size_count(cls,blocks, idx, bit_size,numb): ''' blocks[idx]からidxを増やす方向で、blocks[idx+numb]まで、 categoryが'DAB'または'BAB'で、同じbit_sizeサイズが続く数をカウントして返す。 (numb個が全て同じサイズであれば、numbを返す。sub_compress関数で利用) blocksの終端になった場合は、それまでの数を返す。 ''' count = 0 while count < numb and idx < len(blocks): if blocks[idx].category!='DAB' and blocks[idx].category!='BAB' : break if blocks[idx].now_bit_len != bit_size : break count += 1 idx += 1 return count # @classmethod def debug_DAB_BAB_print(cls,block, idx=-1, bit_size=0, data=None, same_count=None, d_count=0, debug_range=None): if not debug_range: return if idx < debug_range[0] or idx > debug_range[1]: return print(f"[{idx:5}] {block.category} bit={bit_size:2} value={block.value:5}",end=" ") print(f"({d_count:5})", end="") if block.category=='DAB': data = f"base_value={block.base_value :5} data={block.base_value+block.value :5}" elif block.category=='BAB': data = f"data={block.value :5}" # same_count = "" if same_count==None else f" same count={same_count:2}" print( data , same_count) # @classmethod def debug_CHB_print(cls,blk_CHB, idx=0, debug_range=None): if not debug_range: return if idx < debug_range[0] or idx > debug_range[1]: return if blk_CHB.value == 0: print(f"({idx:5}) CHB bit={0:03b} 32個連続データが続く。") elif hasattr(blk_CHB, "ref_dic_value"): blk_CHB.value = 0b111 value=DuplicateCount.dict_table[blk_CHB.ref_dic_value] print(f"({idx:5}) CHB ref Idx={blk_CHB.ref_dic_value:03b} 辞書値:{value}",end="") print(f" {blk_CHB.less_32}個連続データが続く。") else: i_size = cls.bit_range.index(blk_CHB.value) print(f"({idx:5}) CHB bit={i_size+1:03b}({blk_CHB.value:2}ビット)",end="") if hasattr(blk_CHB, "less_32"): print(f" {blk_CHB.less_32}個連続データが続く。") else: print(f"連続データ個数未定") # @classmethod def get_header_bitbyte(cls, debug_flag=False)->BitByte: if debug_flag: print("ヘッダー部 生成 --------------------------------") bib_head=BitByte() #ヘッド部圧縮用バイナリ用 bib_head.setdata( 0 , 1) #1ビットで0を記憶([Ver]) BB = 5 # ビット長記憶域のビットサイズ bib_head.setdata( BB , 5) # [BB=5]の5ビットで記憶 for i in range(5, -1, -1): bib_head.setdata(cls.bit_range[i]-1, BB) # BB ビットで記憶 if debug_flag: print(cls.bit_range[i] , end=" ") if debug_flag: print( " :使用するビット長") # 先頭の辞書データの記憶 bib_head.setdata( DuplicateCount.index_ref, 6) # 辞書データの個数 (0〜32) if debug_flag: print(f"DIB(辞書ブロック)の個数:{DuplicateCount.index_ref}") for i in range( DuplicateCount.index_ref ): bib_head.setdata( 0 , 1) #1ビットで0を記憶( 1ワードサイズ辞書 ) bib_head.setdata( DuplicateCount.dict_table[i], cls.bit_range[5]) # 辞書データの記憶 if debug_flag: print(f"DIB[{i}]:{DuplicateCount.dict_table[i]} 使用数:{DuplicateCount.dict_count[i]}") if debug_flag: print("DIB(辞書ブロック)の終了") return bib_head # @classmethod def load_haeder(cls, bin_haed:BitByte,debug_flag=False): if debug_flag: print("ヘッダー部 復元 ------------------------------") #bib_head = BitByte(bin_haed) bib_head=bin_haed if bib_head.getdata(1) != 0: raise Exception("Version does not match") # 未決定仕様 BB = bib_head.getdata(5) #5bit取り出し for i in range( len(cls.bit_range) ): b=bib_head.getdata( BB ) # BB bit取り出し cls.bit_range[5-i]=b+1 if debug_flag: print(b+1, end=" ") dict_len=bib_head.getdata(6) # 0〜32 # 使用辞書の個数取得 DuplicateCount.dict_table=[0]*dict_len if debug_flag: print(f":使用するビット長\n辞書数:{dict_len}") for i in range( dict_len ): nw = 1 while True: n = bib_head.getdata(1) if n == 0: break nw += 1 DuplicateCount.dict_table[i] = bib_head.getdata(cls.bit_range[5]*nw) if debug_flag: print(f"DICT[{i}]:{DuplicateCount.dict_table[i]}") # class DuplicateCount: # 重複データを数えて記憶 NC_NUMB=1 # '辞書参照がある直後に存在するビット長が同じサイズのデータ数' DC_BSIZE=9 # 9 # 辞書参照に変更する対象データの最低ビットサイズ dict_table = [0] * 32 # 辞書テーブルデータ dict_count = [0] * 32 # 辞書要素の使用数(デバックと検証用) index_ref = 0 # 上記辞書リストの記憶数 def __init__(self, alldict, value): ''' alldictの辞書に登録する。この時、登録済であれば、数を数える ''' if value not in alldict : alldict[value] = 1 else : alldict[value] = alldict[value]+1 # @classmethod def sort(cls, alldict, numb, count): ''' alldictから、重複数がcount以上で、bitSizeを超えるのbit数のキーのリストを numb個以内で抽出して返す。''' rtnList = [] for key in alldict.keys(): cnt = alldict[key] if cnt < count: continue rtnList.append(key) # rtnList=sorted(rtnList,key=lambda k : alldict[k] , reverse=True) return rtnList[:numb] # @classmethod def get_index(cls, val): ''' val を dict_tableのリストに無い場合は追加記憶して、記憶した添え字を返す。 既に存在する場合は、存在する添え字を返す。リスト追加できない場合-1 ''' for i in range(cls.index_ref): if cls.dict_table[i] == val : return i # 記憶情報があり、その位置を返す # find_idx = cls.index_ref if find_idx == len(cls.dict_table): return -1 # フルで記憶不可 cls.dict_table[find_idx] = val # 記憶 cls.index_ref += 1 return find_idx # 記憶位置を返す # @classmethod def is_posible(cls, blocks, idx): ''' blocks[idx]blocks[idx+DuplicateCount.NC_NUMB]の間で bitサイズ変更が無ければTrueを返す ''' bit_len = blocks[idx].now_bit_len top_category=blocks[idx].category if top_category != 'DAB': return False n=1 while n <= DuplicateCount.NC_NUMB and idx < len(blocks): if blocks[idx].category!=top_category: return False if blocks[idx].now_bit_len != bit_len: return False n+=1 idx+=1 return True # @classmethod def keys_print(cls, alldict, keys): for n, key in enumerate(keys): print( f"{n+1:2} 値:{key:6}, 重複数:{alldict[key]:3}個") def creat_block(datas,alldict,next_numb, debug_range=None): ''' 圧縮用にデータをBlockのリストを作り、それを返すパス1の処理。 また、重複データを数えた辞書をalldictに構築する''' if debug_range : print( list(zip(Block.min_s,Block.max_s)) ) Block.prev_bit_range_idx=len(Block.bit_range)-1 # get_bit_lengthメソッド実行で、以前のビット長を記憶 now_bit_len=Block.bit_range[-1] # 現在のブロックビット数 total_bit = now_bit_len # 圧縮データサイス検証用 base_value=datas[0] # 比較対象となる現在の値 (最初の先頭データ) blk = Block('BAB', base_value) # 「切り替えブロック」 blocks=[blk] Block.debug_DAB_BAB_print(blk, 0, blk.value,debug_range=debug_range) blk_CHB=Block('CHB', now_bit_len) # 暗黙の「切り替えブロック」 for idx in range(1,len(datas)): chg_bit_len=Block.get_bit_length(datas, idx, base_value, next_numb)#ビット長取得 if(chg_bit_len == -1): raise Exception("Error! in creat_block") if now_bit_len!=chg_bit_len or idx == 1: now_bit_len=chg_bit_len blk_CHB = Block('CHB', now_bit_len) # 「切り替えブロック」 Block.debug_CHB_print(blk_CHB, idx,debug_range=debug_range) blocks.append(blk_CHB) total_bit += 8 # 検証用 # if now_bit_len < Block.bit_range[-1]: blk = Block('DAB', datas[idx]-base_value, base_value) # 「データブロック」生成 else: blk = Block('BAB', datas[idx] ) # 「基準ブロック」生成 # base_value=datas[idx] DuplicateCount(alldict, datas[idx]) # alldictで重複データカウント blk.now_bit_len = now_bit_len # pythonならではの情報埋め込み(デバック用) Block.debug_DAB_BAB_print(blk, idx, bit_size=now_bit_len, data=datas[idx],debug_range=debug_range) blocks.append(blk) # total_bit += now_bit_len # 検証用 # if debug_range: print(f"元サイス:{len(datas)*2}byte, 純粋データの圧縮サイス:{int(total_bit/8+0.5)}+α") return blocks def sub_compress(bib, blocks, keys , d_count=0, debug_range=None): ''' blocksのリスト内要素に従って、BAB,DAB,CHBのブロックを確定しながらbibにシリアライズする。 bibはBitByteのオブジェクトで、ここに圧縮してそれを返す。(解凍メソッドはsub_decompress。) blocksはBlockのリスト、keysは、データをキーとする辞書候補の辞書型。 ''' now_bit_size=Block.bit_range[5] # 先頭データビットサイズ d_count+=1 # 格納データ数カウント(デバック表示用) Block.debug_DAB_BAB_print( blocks[0], idx=0, bit_size=now_bit_size , d_count=d_count,debug_range=debug_range) bib.setdata( blocks[0].value, now_bit_size) # 先頭データの記憶 base_value=blocks[0].value # データ群の圧縮 idx = 1 now_CHB = blocks[idx] idx += 1 first_CHB_flag=True while idx < len(blocks): # if dflags[0] : from IPython.core.debugger import Pdb; Pdb().set_trace() # dflags[0]=debug_hex4(0,f"{now_CHB.value}+++{now_bit_size}:+++{idx}",True) if now_CHB.category=='CHB': if now_CHB.value != 0 and not hasattr(now_CHB, "ref_dic_value"): chg_bit_size = now_CHB.value i32 = 0 dict_cnt_idx = 0 change_size_flag = False while i32 < 32: # 32個の同じサイズが続くか? (次の'CHB'を挿入する場所で止める) idx2=idx+i32 if idx2 >= len(blocks): break # 終端? if blocks[idx2].category=='CHB' : change_size_flag = True break #サイズが変化 # if blocks[idx2].now_bit_len != chg_bit_size: raise Exception("contradict CHB") if blocks[idx2].category!='DAB' and blocks[idx2].category!='BAB': raise Exception("contradict DAB") # if blocks[idx2].category=='BAB': base_value = blocks[idx2].value elif blocks[idx2].category=='DAB': if base_value!=blocks[idx2].base_value: print(f"[{idx2}]contradict DATA: {base_value} != {blocks[idx2].base_value}") raise Exception("contradict DATA") # base_value += blocks[idx2].value # rValue = blocks[idx2].base_value + blocks[idx2].value # 辞書の参照ビットサイズ変更情報がないので、直後のブロックでビットサイズが変わらない場合だけ使える。 if now_bit_size >= DuplicateCount.DC_BSIZE and idx2 > 2 and i32 > 0: #最初のCHB以降で行う if rValue in keys: #辞書登録データか? #dict_cnt_idx=Block.get_same_size_count(blocks, idx2+1, now_bit_size, DuplicateCount.NC_NUMB) # 上記は、辞書テーブル参照のCHBに続くデータがDuplicateCount.NC_NUMB個の固定長時に使う処理 dict_cnt_idx=1 # 辞書テーブル参照のCHBに続くデータ個数がCHBに含む場合の指定 if dict_cnt_idx != 0: #辞書参照可能か? break # if i32==31 and (first_CHB_flag or hasattr(now_CHB, "ref_dic_value") or chg_bit_size != now_bit_size): break i32+=1 # first_CHB_flag=False if i32 == 32 and chg_bit_size == now_bit_size: # 同じサイズが連続32個続く出力 now_CHB.value=0 # 32個連続のCHBに変更 Block.debug_CHB_print(now_CHB, idx,debug_range=debug_range) now_CHB.out_CHB_bitimage(bib) # サイズ変更なしで並ぶ idx=Block.set_samebitsize_data(bib, blocks, idx, 32, now_bit_size,d_count,debug_range=debug_range) d_count+=32 now_CHB = Block('CHB', val=now_bit_size) continue # if dict_cnt_idx != 0: #辞書参照データがあって切り替わる if i32 != 0 or hasattr(now_CHB, "ref_dic_value") : # 前のCHBも辞書であった。 now_CHB.less_32=i32 # 同じサイズが連続する個数が確定 now_CHB.out_CHB_bitimage(bib) # ビットイメージセット Block.debug_CHB_print(now_CHB, i32, debug_range=debug_range) idx=Block.set_samebitsize_data(bib, blocks, idx, i32, chg_bit_size,d_count,debug_range=debug_range) d_count+=i32 now_CHB = Block('CHB', 0b111) # 辞書参照ブロック生成 base_value = blocks[idx2].base_value + blocks[idx2].value now_CHB.ref_dic_value = DuplicateCount.get_index( base_value ) # 辞書テーブル内添え字取得 idx+=1 if idx >= len(blocks): # 残りのデータがない? now_CHB.less_32=0 # サイズ変更ありで0個並ぶ 終了指定 now_CHB.out_CHB_bitimage(bib) # ビットイメージセット Block.debug_CHB_print(now_CHB, idx, debug_range=debug_range) now_CHB = Block('CHB', val=now_bit_size) break continue # # 32個以下で同じサイズが連続 now_CHB.less_32=i32 # 同じサイズが連続する個数が確定 now_CHB.out_CHB_bitimage(bib) # ビットイメージセット Block.debug_CHB_print(now_CHB, idx-1,debug_range=debug_range) idx=Block.set_samebitsize_data(bib, blocks, idx, i32, chg_bit_size,d_count,debug_range=debug_range) d_count+=i32 if change_size_flag: if blocks[idx].category!='CHB': raise Exception("contradict CHB sequence") now_CHB = blocks[idx] idx+=1 else: now_CHB = Block('CHB', val=chg_bit_size) # now_bit_size = chg_bit_size if idx >= len(blocks): break # 終了 base_value=blocks[idx].base_value else: raise Exception("contradict exist CHB") # now_CHB.less_32=0 # サイズ変更ありで0個並ぶ 終了指定 now_CHB.out_CHB_bitimage(bib) # ビットイメージセット Block.debug_CHB_print(now_CHB, idx, debug_range=debug_range) return bib def sub_decompress(bib, debug_range=None): ''' bib のbitByte型を解凍して、データリストを返す ''' data2 = [] # 解凍記憶データ idx2=0 chg_bit_size=Block.bit_range[5] base_value=bib.getdata(chg_bit_size) # 現在の設定値 if debug_range and debug_range[0]<=idx2<=debug_range[1]:print(f"[{idx2:4}]:{chg_bit_size:2}bit{base_value:6}") idx2+=1 data2.append( base_value ) while True: n32=0 chb = bib.getdata(3) # CHB 読み取り if chb == 0: # 32個連続 n32=32 if debug_range and debug_range[0]<=idx2<=debug_range[1]: print(f"CHB {n32}個") elif chb == 0b111: # 辞書テーブル参照 dict_idx = bib.getdata(5) # 辞書添え字 base_value = DuplicateCount.dict_table[dict_idx] data2.append( base_value ) idx2+=1 if debug_range and debug_range[0]<=idx2<=debug_range[1]: print(f"[{idx2:4}]:{chg_bit_size:2}bit {base_value:5} 辞書参照") n32=bib.getdata(5) # 同じサイズが連続する個数 if n32 == 0: continue else: chg_bit_size=Block.bit_range[chb-1] n32=bib.getdata(5) if debug_range and debug_range[0]<=idx2<=debug_range[1]: print(f"CHB BIT:{chg_bit_size}bit {n32}個") # if n32 == 0 : break # 終了 for n in range(n32): if chg_bit_size==Block.bit_range[5]: base_value = bib.getdata(chg_bit_size) if debug_range and debug_range[0]<=idx2<=debug_range[1]:print(f"[{idx2:4}]:{chg_bit_size:2}bit {base_value:5}") else: diff= bib.getdata(chg_bit_size) diff = to_int(diff, chg_bit_size) base_value += diff if debug_range and debug_range[0]<=idx2<=debug_range[1]:print(f"[{idx2:4}]:{chg_bit_size:2}bit {base_value:5} diff={diff:5}") # idx2+=1 data2.append( base_value ) # return data2 def ume_compress( buff : list, debug_range = None): alldict = {} # datasの値の重複を調べる(参照用の辞書(32個まで)の作成用) print("パス1 圧縮のための解析-------------------------") blocks=creat_block(datas=buff,alldict=alldict,next_numb=Block.SBC_NUMB, debug_range=debug_range) print("辞書に登録するデータ選定-------------------------") keys=DuplicateCount.sort( alldict, numb=32, count=2) # 重複数2以上を種類32個を抽出 if debug_range : DuplicateCount.keys_print(alldict, keys) print("パス2 圧縮------------------------------------") bib_data=BitByte() #データ部圧縮バイナリ用生成 bib_data=sub_compress(bib_data, blocks, keys, debug_range=debug_range) bin_data=bib_data.getbinary() print(f"{len(buff)*2}byteを圧縮⇒バイナリデータサイズ:{len(bin_data)}byte {DuplicateCount.index_ref}") bin_haed=Block.get_header_bitbyte(debug_flag=debug_range!=None) # ヘッダー部バイナリ生成 bin_haed.add_bitbyte( bin_data, bib_data.bit_size ) bin_haed.padding() return bin_haed.getbinary() def ume_decompress( buff : list, debug_range = None): print("\n\n---------- 解凍 ---------------") bib=BitByte(buff) Block.load_haeder(bib, debug_flag=debug_range!=None) # ヘッダ部の復元 buff2=sub_decompress(bib, debug_range=debug_range) return buff2 if __name__ == '__main__': NUMB=1024 lst=[int(y) for y in range(NUMB)] datas=[] while True: y=0 for x in lst: y+=x if y > 65535 : break # uint16 以内 datas.append(y) if len(datas) >= NUMB: break if len(datas) >= NUMB: break # #datas=[0,25,100,65535,0,65534,1,65533,2,65532,3,65531,4,65530,5,65535,0,65535,0,55535,0,45535,0,35535,0,25535,0] datas=[40000, 34000, 29000, 30000, 36000, 35000, 30000, 31000 ] #datas=datas[::-1] import matplotlib.pyplot as plt plt.plot(datas) plt.show() # データ確認用 debug_range=(0,5000) # 始点〜終点 bin_data=ume_compress(datas,debug_range) # 圧縮 data2=ume_decompress(bin_data,debug_range) # 解凍 print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte") if datas == data2: print("元データと圧縮データが、完全に一致") else: for i in range(len(datas)): if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ") else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")パス2では、パス1で得られたブロック群で必要な新たなCHBを生成追加して、 それらブロックをバイナリとして詰めていく作業です。
from umecmpre0 import ume_compress, ume_decompress import random fr=open("16bit16KHz_1KHz1024sin.bin","rb") # 圧縮対象 #fr=open("16bit16KHz_1KHz1024sin_0.bin","rb") # 圧縮対象 datas=[] while True: bs = fr.read(2) # 2byte読み取る if len(bs) != 2 : break #ys.append(bs[0]+bs[1]*256-random.randrange(0, 50)) y=bs[0]+bs[1]*256 datas.append(y) fr.close() datas=datas[::-1] import matplotlib.pyplot as plt plt.plot(datas) plt.show() # データ確認用 debug_range=(0,5000) # 始点〜終点 bin_data=ume_compress(datas,debug_range) # 圧縮 data2=ume_decompress(bin_data,debug_range) # 解凍 print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte") if datas == data2: print("元データと圧縮データが、完全に一致") else: for i in range(len(datas)): if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ") else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")
# 16KHzで16Bitのサンプリングでリトルエンディアン符号無しの1000Hz正弦波データを1024ワード並ぶファイルを、16Bit_1KHz_1024_sin.binの名前で作る。 # このファイルのバイト数は、1024ワードなので、2048byteになる。全体の時間=(1/16KHz)*1024=(1/16e3)*1024=0.064秒 # 振幅は、0〜1024で変化する生成にする。 import math import numpy as np sample_rate = 16000 # サンプリングレート frequency=1000 singen=lambda sec : math.sin( math.pi * 2 * frequency * sec )*(2**15-1) + (2**15-1) singen=lambda sec : math.sin( math.pi * 2 * frequency * sec )*(2**15-1)*(sec/0.065) + (2**15-1) ts = [ 1/sample_rate * t for t in range( 1024 )] ys = [ singen( t ) for t in ts ] ys = [ singen( t )+int(np.random.uniform(-5,5)) for t in ts ] bs=np.array(ys, np.uint16) buffer=b''.join(bs) # バイナリに変換 with open("16bit16KHz_1KHz1024sin.bin","wb") as fw: fw.write( buffer ) # 確認用の表示 import matplotlib.pyplot as plt plt.plot( ts, ys ) plt.show()