UMEHOSHI ITA TOP PAGE COMPUTER SHIEN LAB
アルゴリズム(仕様)はここを参照)
''' umcmprs.py UME連続可逆圧縮 (UME COMPRESSの略名:UMCMPRS) 「UMEHOSHI ITA」基板(https://manabu.quu.cc/up/ume/)で、 USB転送能力を補うために作ったものですが、他にも利用できるでしょう。 (自由にご利用していただいて結構です。 可能な限り下記内容の"umcmprs.py"のモジュール内に上記コメントを含めてご利用ください。) ''' #from umedebug import debug_hex4 #dflags =[False]*20 # dflags[0]=debug_hex4(nn=0, data=0b0001, enable=True) # dflags[1]=debug_hex4(1, f"{"++++++++"}", dflags[0]) #from IPython.core.debugger import Pdb; Pdb().set_trace() ''' 1〜16bitの指定のデータを詰めてbyteごとにcallbackを実行する。''' class Bit2Byte: count_byte=0 def __init__(self, callback ): self.bit32=0 # bit設定の作業用変数 self.bitcount=0 # 上記への記憶bit数。 8bit毎にcallback処理 self.callback=callback # def setdata(self, data, bitsize): # bit列をbyte列に変換 ビット列構築用で使う。 '''bitsizeのビット数で、dataを記憶 1byte分に詰められたタイミングで callbackを呼ぶ''' bitmask = (1<<bitsize)-1 if data > bitmask : raise Exception(f"{data} does not fit in {bitsize} bit") self.bit32 |= data << self.bitcount self.bitcount += bitsize while self.bitcount >= 8: self.callback(self.bit32 & 0x0ff) Bit2Byte.count_byte+=1 self.bit32 >>= 8 self.bitcount-=8 # def padding(self): # ビット列構築用で使う ''' bit_sizeサイズが、丁度byteに達するように0で埋める''' if self.bitcount != 0: self.setdata(0, 8-self.bitcount) # bit列をbyte列に変換 ''' byteのデータ群から指定のビット長を取り出す ''' class Byte2Bit: def __init__(self, bytedata:bytes ): # 圧縮データを引数するコンストラクタ self.byte_buf = bytearray(bytedata) self.byte_buf_len=len(bytedata) # 上記のbit記憶数 self.bit32=0 # bit設定の作業用変数(0〜8) self.bitcount=0 # 上記へのbit記憶数。 # def getdata(self,bitsize): # ビット列復元用で使う '''bitsizeのビット数の値を取り出して返す ''' rv = (self.byte_buf_len << 3) + self.bitcount - bitsize if rv < 0 : return rv # 負の値(足りないビット数) bitmask = (1<<bitsize)-1 while self.bitcount < bitsize and self.byte_buf_len>0: byte0 = self.byte_buf.pop(0) self.byte_buf_len-=1 self.bit32 |= byte0 << self.bitcount self.bitcount += 8 val = self.bit32 & bitmask self.bit32 >>= bitsize self.bitcount -= bitsize return val # def append(self, bytedata:bytes ): # 圧縮データを追加 for i in range( len( bytedata ) ): self.byte_buf.append( bytedata[i] ) self.byte_buf_len+=1 # 上記のbit記憶数 def to_uint(val, bitsize): # bitsizeビット長の符号ありvalを、符号なしに変換する return val & (1<<bitsize)-1 def to_int(uVal, bitsize): # bitsizeビット長の符号無しuValを符号ありに変換 sign_bit=1<<(bitsize-1) return uVal if uVal < sign_bit else uVal-(sign_bit<<1) class ChbBlock: debug = False SBC_NUMB=15 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小) bit_bsize=3 bit_range=[7,9,11,13,15,16] # 「ビット長のテーブル」 BB_size=5 # 上記「ビット長のテーブル」要素のバイナリ記憶時のビットサイズ min_s = [ -(2**(v-1)) for v in bit_range ] max_s = [ 2**(v-1)-1 for v in bit_range ] # @classmethod def initialize(cls): cls.prev_CHB = None # get_sequence_bit_indexで利用する前のCHB cls.prev_bit_range_idx=5 cls.prev_data=0 cls.flag_size_confirmed=False # prev_CHBにサイズ変更のCHBが生成設定時にFalseで、その送出が終わったらTrue cls.data_buffer = None # 先頭データでget_sequence_bit_indexを使う時に記憶 cls.index_buff = 0 cls.i32 = 0 # 次に続くDABやBABの数で、直後がCHBの場合は0になる cls.bitbyte = None # 圧縮用Bit2Byteの設定 # def __init__(self, bit_size_idx, buffer_idx, dict_idx = -1): self.bit_size_idx = bit_size_idx # サイズ変更 self.buffer_idx = buffer_idx self.dict_idx = dict_idx # 辞書参照の添え字 # def out_CHB_bitimage(self, index_buff=0): n_word = 0 if ChbBlock.i32 == 32: if ChbBlock.bitbyte :ChbBlock.bitbyte.setdata( 0 , 3 )# サイズ変更無しで32個のデータが続く if ChbBlock.debug: print(f" [CHB] {32}個") elif self.dict_idx != -1: n_word = 1 # 参照ワード数 for idxtbl in range(len(DuplicateCount.idxs)): if DuplicateCount.idxs[idxtbl].idx == self.dict_idx: break if DuplicateCount.idxs[idxtbl].count2 >= DuplicateCount.idxs[idxtbl].count1: n_word = 2 if ChbBlock.bitbyte : ChbBlock.bitbyte.setdata( 0b111, 3) # 辞書テーブル ChbBlock.bitbyte.setdata( idxtbl , 5 ) # 辞書テーブルの添え字 ChbBlock.bitbyte.setdata( ChbBlock.i32 , 5 ) if ChbBlock.debug: print(f"[{self.buffer_idx:4}][CHB] :{ChbBlock.data_buffer[self.dict_idx]:5}", end="") if n_word == 2: print(f" ,{ChbBlock.data_buffer[self.dict_idx+1]}", end="") print(f" [{self.dict_idx:4}] dict_idx={idxtbl} の辞書参照 直後のデータ{ChbBlock.i32}個 ") else: if ChbBlock.bitbyte : ChbBlock.bitbyte.setdata( self.bit_size_idx+1, 3) ChbBlock.bitbyte.setdata( ChbBlock.i32 , 5 ) if ChbBlock.debug: print(f" [CHB] bit size={ChbBlock.bit_range[self.bit_size_idx]} {ChbBlock.i32}個") return n_word # 辞書の場合の参照ワード数、辞書でない場合は0 # @classmethod def is_fit(cls,value, bit_range_idx): ''' 符号付valueが、min_s[bit_range_idx]以上で、max_s[bit_range_idx]以下であるならTrue ''' if bit_range_idx==5 and value < 0: return False if bit_range_idx==5 and value <= cls.max_s[5]-cls.min_s[5]: return True if value < 0: value = -value-1 if cls.min_s[bit_range_idx] > value: return False if cls.max_s[bit_range_idx] < value: return False return True # @classmethod def get_bit_index(cls, value:int): ''' valueを記憶するための最小ビットサイズを指し示す ChbBlock.bit_range 配列の添え字を返します。''' bit_range_idx=0 while bit_range_idx < len(cls.bit_range)-1: if cls.is_fit(value, bit_range_idx) : return bit_range_idx bit_range_idx += 1 return bit_range_idx # @classmethod def flush_i32(cls, n_word ): ''' cls.i32 個のデータの出力 ''' idx = cls.prev_CHB.buffer_idx + n_word prev_base_val = cls.data_buffer[idx-1] for n in range(cls.i32): base_val = cls.data_buffer[idx+n] bitsize=ChbBlock.bit_range[cls.prev_CHB.bit_size_idx] if cls.prev_CHB.bit_size_idx == 5: if cls.is_fit(base_val, cls.prev_CHB.bit_size_idx) == False: raise Exception(f"not match get_sequence_bit_index(value={base_val},bit_range_idx={cls.prev_CHB.bit_size_idx})") if cls.bitbyte :cls.bitbyte.setdata( base_val , bitsize ) # ベースビット長で保存 if cls.debug:print(f"[{idx+n :4}][BAB] :{ base_val:5}, i32={n}") else: diff = base_val - prev_base_val if cls.is_fit(diff, cls.prev_CHB.bit_size_idx) == False: raise Exception(f"not match get_sequence_bit_index(value={base_val},bit_range_idx={diff})") diff2 = to_uint( diff , bitsize) # 差分 if cls.bitbyte :cls.bitbyte.setdata( diff2 , bitsize ) # ベースビット長で差分保存 if cls.debug:print(f"[{idx+n :4}][DAB] :{ base_val:5},{bitsize}bit,diff={diff}, i32={n}") prev_base_val = base_val # cls.i32=0 # @classmethod def get_sequence_bit_index(cls, uint_data, data_buffer=None, bit2byte:Bit2Byte=None): ''' data_buffer!=Noneならget_sequence_bit_index利用シーケンスの先頭要素の処理で、 data_bufferやbit2byteの登録処理になる。 先頭引数uint_dataだけ(data_buffer==None)の呼び出しは、残りの要素の処理になる。 bitByte2=None 指定は、uint_dataをcls.data_bufferに記憶する処理で、set_dupli_count関数で使われる。 のこ場合は、別途で辞書候補用の重複数カウント処理が併用される。 bitByte2!=None 指定はuint_dataをbitByte2に埋め込む処理で、ume_compress_out関数が使われる。 のこ場合は、別途で辞書候補用リストの参照をbitByte2に埋め込む処理が併用される。 戻り値は暫定ビットサイズです。 ''' #if cls.debug:print(f"UINT INPUT:{uint_data}") if data_buffer != None : # 先頭データの場合 cls.index_buff = 0 cls.i32 = 0 cls.flag_size_confirmed=False # サイズ変更のCHBが生成時にFalseで、その送出が終わったらTrue cls.prev_bit_range_idx=len(cls.bit_range)-1 # get_bit_lengthメソッド実行で、以前のビット長を記憶 bitsize= ChbBlock.bit_range[ cls.prev_bit_range_idx ] cls.prev_data = uint_data cls.data_buffer = data_buffer cls.bitbyte=bit2byte if cls.bitbyte == None: cls.data_buffer[cls.index_buff]=uint_data # パス1の記憶 else: cls.bitbyte.setdata( to_uint( cls.data_buffer[cls.index_buff] , bitsize) ,bitsize) if cls.debug:print(f"[{cls.index_buff :4}][BAB] :{ cls.data_buffer[cls.index_buff]:5}") return cls.prev_bit_range_idx # cls.index_buff+=1 if cls.bitbyte == None: cls.data_buffer[cls.index_buff]=uint_data # パス1の記憶 diff = uint_data - cls.prev_data bit_size_idx = cls.get_bit_index(diff) cls.prev_data = uint_data cls.i32 += 1 flush_flag = False if cls.i32 == 1 and cls.prev_CHB is None: cls.prev_CHB = ChbBlock(bit_size_idx, cls.index_buff) # 暫定CHB生成 if cls.prev_bit_range_idx != bit_size_idx: cls.flag_size_confirmed=False cls.prev_bit_range_idx = bit_size_idx elif bit_size_idx > cls.prev_bit_range_idx:#ビットサイズを大きくする必要がある? if cls.i32 < ChbBlock.SBC_NUMB and cls.prev_CHB.dict_idx == -1: cls.prev_CHB.bit_size_idx = bit_size_idx # 前のCHBのビットサイズ大きく変更 cls.prev_bit_range_idx = bit_size_idx cls.flag_size_confirmed = False if cls.i32 >= 31: flush_flag = True else: # cls.prev_CHBが辞書または、cls.i32 >= ChbBlock.SBC_NUMB の場合 cls.i32 -= 1 n_word=cls.prev_CHB.out_CHB_bitimage() # 前の辞書を出力 cls.flush_i32(n_word) cls.i32 += 1 cls.prev_CHB = ChbBlock(bit_size_idx, cls.index_buff) # 暫定CHB生成 if cls.prev_bit_range_idx != bit_size_idx: cls.flag_size_confirmed=False cls.prev_bit_range_idx = bit_size_idx elif (not cls.flag_size_confirmed or cls.prev_CHB.dict_idx != -1)and cls.i32 == 31: flush_flag = True elif cls.flag_size_confirmed and cls.i32 == 32: flush_flag = True elif bit_size_idx < cls.prev_bit_range_idx and cls.i32 >= ChbBlock.SBC_NUMB: cls.flag_size_confirmed = False flush_flag = True # ビットサイズを小さくする出力指示。 elif bit_size_idx < cls.prev_bit_range_idx and \ cls.prev_bit_range_idx == 5 and cls.i32 >= 4: cls.flag_size_confirmed = False flush_flag = True # ベースビット長時のビットサイズを小さくする出力指示(4個で判定)。 # if flush_flag: n_word=cls.prev_CHB.out_CHB_bitimage() cls.flush_i32(n_word) if cls.prev_CHB.i32 != 32 and cls.prev_CHB.dict_idx == -1: cls.flag_size_confirmed=True ChbBlock.prev_CHB = None # return cls.prev_bit_range_idx # @classmethod def get_sequence_bit_index_flush(cls): if cls.prev_CHB : n_word=cls.prev_CHB.out_CHB_bitimage() cls.flush_i32(n_word) ChbBlock.prev_CHB = None if cls.bitbyte :cls.bitbyte.setdata( 6 , 3 )# 終了 CHB 送出 if cls.bitbyte :cls.bitbyte.setdata( 0 , 5 ) if ChbBlock.debug: print(f" [CHB] bit size={ChbBlock.bit_range[5]} {0}個") if cls.bitbyte :cls.bitbyte.padding() # byteに足りないbit列を0で埋める return 5 # @classmethod def get_header_bitbyte(cls, idxs, datas, callback)->Bit2Byte: ''' idxsは、DuplicateCountの辞書参照用リスト ''' if ChbBlock.debug: print("------------------------------ create header") bib=Bit2Byte( callback ) #ヘッド部圧縮用バイナリ用 bib.setdata( 0 , 1) #1ビットで0を記憶([Ver]) bib.setdata( ChbBlock.BB_size , 5) # ビット長記憶域のビットサイズを5ビットで記憶 for i in range(5, -1, -1): bib.setdata(cls.bit_range[i]-1, ChbBlock.BB_size) # ChbBlock.BB_size ビットで記憶 if ChbBlock.debug: print(cls.bit_range[i] , end=" ") if ChbBlock.debug: print( " :Used bit length") # 先頭の辞書データの記憶 bib.setdata( len( idxs ), 5) # 辞書データの個数 (0〜31) if ChbBlock.debug: print(f"DIB(辞書ブロック)の個数:{ len( idxs ) }") for i in range( len( idxs ) ): n_bit = 1 if idxs[i].count1 <= idxs[i].count2 else 0 idx_buf=idxs[i].idx if idx_buf+1 == len(datas) : n_bit = 0 if n_bit == 0: bib.setdata( 0 , 1) # 0記憶( 辞書ワードサイズが1 ) else: bib.setdata( 1 , 1) # 10を記憶( 辞書ワードサイズが2 ) bib.setdata( 0 , 1) # 10を記憶( 辞書ワードサイズが2 ) if ChbBlock.debug: print(f"DIB[{i}]:使用数:{max(idxs[i].count1,idxs[i].count2)}個",end="") if ChbBlock.debug: print(f" 値:{datas[idx_buf]}",end="") bib.setdata( datas[idx_buf] , cls.bit_range[5]) # 辞書データの記憶 if n_bit == 1 : if ChbBlock.debug: print(f", {datas[idx_buf+1]} ",end="") bib.setdata( datas[idx_buf+1] , cls.bit_range[5]) # 辞書データの記憶 if ChbBlock.debug: print() if ChbBlock.debug: print(f"-----------DIB(辞書ブロック)の終了 header size: {Bit2Byte.count_byte}byte") return bib # @classmethod def load_haeder(cls, bin_haed:Byte2Bit): ''' ChbBlock.BB_sizeと、ChbBlock.bit_range[0]から[5]の読み込みと DuplicateCount.dict_table の生成と読み込み。 ''' if ChbBlock.debug: print("------------------------------ヘッダー部 復元") bib=bin_haed #bib = BitByte(bin_haed) if bib.getdata(1) != 0: raise Exception("Version does not match") # 未決定仕様 ChbBlock.BB_size = bib.getdata(5) #5bit取り出し if ChbBlock.debug: print(f"「ビット長のテーブル」要素のサイズ:{ChbBlock.BB_size} bit") for i in range( len(cls.bit_range) ): b=bib.getdata( ChbBlock.BB_size ) # BB bit取り出し cls.bit_range[5-i]=b+1 if ChbBlock.debug: print(b+1, end=" ") dict_len=bib.getdata(5) # 0〜31 # 使用辞書の個数取得 DuplicateCount.dict_table=[None]*dict_len if ChbBlock.debug: print(f":使用するビット長\nDIB(辞書ブロック)の個数:{dict_len}") for i in range( dict_len ): num_bit = bib.getdata(1) if num_bit == 1: num_bit2 = bib.getdata(1) # 0のはず if num_bit2 != 0: raise Exception("Only 1 to 2 dictionary sizes are supported.") word1 = bib.getdata(cls.bit_range[5]) word2 = None if num_bit == 1: word2 = bib.getdata(cls.bit_range[5]) DuplicateCount.dict_table[i] = DictElement2(num_bit, word1, word2) if word2 == None: word2 ="" if ChbBlock.debug: print(f"DICT[{i}]: {word1} {word2}") # class DictElement2: # 2つ要素までを持つDuplicateCount.dict_tableの辞書要素 def __init__(self, num_bit, word1, word2=None): self.num_bit=num_bit # 0, 1 の値で、1の時は、word1とword2の2つを利用 self.word1=word1 # word1 self.word2=word2 class DuplicateCount: # 重複データを数えて記憶 MIN_DIFF_BIT=13 # 辞書を適用するビットサイズ(これ以上のビットサイズに適用) COUNT_FOR_EXTRACT = 10 # 辞書候補の重複カウント数(これ以上のビットサイズに適用) idxs = [] # extractメソッドで設定されるDuplicateCountの辞書要素のリスト(31個以内)を記憶するバッファ count_BUFFER1=0 # set_dupli_countメソッドで使われる作業用(データ格納用添え字) # def __init__(self, idx): self.count1=0 # 1ワード重複数 (uint8) self.count2=0 # 2ワード重複数 (uint8) self.idx = idx # データ群の中で、最初に見つかる位置 pass # def __str__(self): return f"buffer[{self.idx:4}]:count1={self.count1:3},count2={self.count2:3}" # @classmethod def set_dupli_count(cls, data_buffer:list, list_dupli_count:list, uint_data:int): ''' list_dupli_countで重複データを数える ''' if cls.count_BUFFER1 == 0: # 先頭データの場合 ChbBlock.get_sequence_bit_index(uint_data, data_buffer) #, list_dupli_count) else: ChbBlock.get_sequence_bit_index(uint_data) list_dupli_count[cls.count_BUFFER1].count1=1 i=0 flag_matched=False if cls.count_BUFFER1 > 0: prev_data = data_buffer[cls.count_BUFFER1-1] # 一つ前の値 while i < cls.count_BUFFER1: if data_buffer[i]==uint_data: # 一致 if flag_matched == False: list_dupli_count[i].count1+=1 flag_matched=True if cls.count_BUFFER1 > 0: if data_buffer[i-1]==prev_data: list_dupli_count[i-1].count2+=2 break i+=1 # cls.count_BUFFER1+=1 @classmethod def extract(cls, list_dupli_count): ''' 辞書候補のリストを抽出 ist_dupli_countの添え字リストを cls.idxs に設定 ist_dupli_countリストを降順に32個並べて、それを 辞書候補のリストとする。''' idxs=[None]*32 # 抽出要素分のリスト icount=0 big=len(list_dupli_count)*2 # 並び替え時に、作業中の最大値として使う while icount < 32: ''' big よりより小さい要素で、最大の要素と、その添え字を求める ''' maxcount=0 # 範囲内の最大重複数を求めるためのループ内作業用 imax=-1 for i, obj in enumerate(list_dupli_count): if max(obj.count1, obj.count2) >= big: continue flag1= imax==-1 and max(obj.count1, obj.count2) > 1 gv=max( list_dupli_count[i].count1, list_dupli_count[i].count2) flag2=False if imax!=-1 and maxcount < gv: flag2=True if flag1 or flag2: imax=i maxcount=gv continue big=maxcount # 上記ループで得られた最大重複数 if big < cls.COUNT_FOR_EXTRACT: break # 重複数が登録候補に満たない if imax==-1: break # 候補が見つからない for i, obj in enumerate(list_dupli_count): # maxcountと同じと判断する要素添え字をldxsに記憶 if maxcount == max( list_dupli_count[i].count1, list_dupli_count[i].count2): if icount > 0 : if idxs[icount-1].count2 >= idxs[icount-1].count1: if idxs[icount-1].idx+1 == list_dupli_count[i].idx: continue idxs[icount]=list_dupli_count[i] icount+=1 if icount == 32: break # Full # i=0 while i<31 and idxs[i]!=None: i+=1 cls.idxs = idxs[:i] # idxs[X].idxが昇順のDuplicateCountリストになっている。 def ume_compress_out( data_buffer:list, list_dupli_count:list, bib:Bit2Byte): ''' list_dupli_countの辞書リストも使って、bib に圧縮する ''' ChbBlock.initialize() i=0 data_buffer_len = len(data_buffer) while i < data_buffer_len: uint_data = data_buffer[i] uint_data_next = None if i+1 >= data_buffer_len else data_buffer[i+1] if i == 0: # 先頭データの場合 ChbBlock.get_sequence_bit_index(uint_data, data_buffer, bit2byte=bib) else: diff = uint_data - ChbBlock.prev_data bit_size_idx = ChbBlock.get_bit_index(diff) if list_dupli_count and ChbBlock.flag_size_confirmed : # 辞書参照可能なら登録 idx_ref = -1 # 辞書参照の添え字(0以上の値で有効) n_word = 1 # 参照ワード数 for ref in list_dupli_count: if uint_data == data_buffer[ref.idx]: # 辞書参照? if ref.count2 >= ref.count1: # 2ワード参照 if uint_data_next != None and uint_data_next == data_buffer[ref.idx+1]: n_word = 2 # 2 word の辞書参照 if ChbBlock.bit_range[bit_size_idx]*2 < DuplicateCount.MIN_DIFF_BIT:continue idx_ref = ref.idx break else: # 1ワード参照 if ChbBlock.bit_range[bit_size_idx] < DuplicateCount.MIN_DIFF_BIT: continue idx_ref = ref.idx break if idx_ref > 0: # 辞書参照の添え字 if ChbBlock.prev_CHB: nn_word = ChbBlock.prev_CHB.out_CHB_bitimage() # 辞書データ登録前に、前のCHBを送出 ChbBlock.prev_CHB.flush_i32(nn_word) # 記憶したデータを送出 # 辞書データ登録 ChbBlock.prev_CHB = ChbBlock( ChbBlock.prev_bit_range_idx, i , idx_ref) i+=1 ChbBlock.index_buff+=1 ChbBlock.prev_data = data_buffer[ref.idx] if n_word == 2: i+=1 ChbBlock.index_buff+=1 ChbBlock.prev_data = data_buffer[ref.idx+1] continue ChbBlock.get_sequence_bit_index(uint_data) i+=1 # ChbBlock.get_sequence_bit_index_flush() # フィニッシュ def ume_compress( datas : list, callback ): ChbBlock.initialize() numb=len(datas) data_buffer=[0]*numb # 1次処理のデータバッファ(パス1でデータをここに記憶、パス2はこれを読んで圧縮) list_dupli_count=[DuplicateCount(n) for n in range(numb)] # 重複データここに記録 DuplicateCount.count_BUFFER1=0 for x in datas: DuplicateCount.set_dupli_count(data_buffer, list_dupli_count, x) # xを記憶する繰り返し ChbBlock.get_sequence_bit_index_flush() # フィニッシュ #if ChbBlock.debug: print("重複状況:", [str(x) for x in list_dupli_count] ) DuplicateCount.extract(list_dupli_count) # 辞書候補の添え字群をDuplicateCount.idxsにセット if ChbBlock.debug: for n , obj in enumerate(DuplicateCount.idxs): if obj == None: continue print(f"{n:3} 辞書候補:", obj) bib=ChbBlock.get_header_bitbyte( DuplicateCount.idxs, datas, callback ) ume_compress_out( datas, DuplicateCount.idxs, bib ) def sub_decompress(bib): ''' bib のbitByte型を解凍して、データリストを返す bibは、ヘッダーを読み取った直後の状態 ''' data2 = [] # 解凍記憶データ idx2=0 # 上記用添え字 chg_bit_size=ChbBlock.bit_range[5] base_value=bib.getdata(chg_bit_size) # 現在の設定値 if ChbBlock.debug: print(f"[{idx2:4}][BAB] :{base_value:5},{chg_bit_size:2}bit") idx2+=1 data2.append( base_value ) while True: n32=0 chb = bib.getdata(3) # CHB 読み取り if chb < 0 : raise Exception("CHB not match") # if chb == 0: # 32個連続 n32=32 if ChbBlock.debug: print(f"CHB {n32}個") elif chb == 0b111: # 辞書テーブル参照 dict_idx = bib.getdata(5) # 辞書添え字 n32=bib.getdata(5) # 同じサイズが連続する個数 dict_element2 = DuplicateCount.dict_table[dict_idx] data2.append( dict_element2.word1 ) base_value = dict_element2.word1 if ChbBlock.debug: print(f" [CHB][{idx2:4}]:{dict_element2.word1:5}", end="") idx2+=1 if dict_element2.num_bit == 1: data2.append( dict_element2.word2 ) base_value = dict_element2.word2 if ChbBlock.debug: print(f" ,{dict_element2.word2}", end="") idx2+=1 if ChbBlock.debug: print(f" dict_idx={dict_idx} {n32}個") if n32 == 0: continue else: chg_bit_size=ChbBlock.bit_range[chb-1] n32=bib.getdata(5) if ChbBlock.debug: print(f" [CHB] bit size={chg_bit_size} {n32}個") # if n32 == 0 : break # 終了 for n in range(n32): if chg_bit_size==ChbBlock.bit_range[5]: # [BAB]の復元 base_value = bib.getdata(chg_bit_size) if ChbBlock.debug: print(f"[{idx2:4}][BAB] :{base_value:5},{chg_bit_size:2}bit") else: # [DAB]の復元 diff= bib.getdata(chg_bit_size) diff2 = to_int(diff, chg_bit_size) base_value += diff2 if ChbBlock.debug: print(f"[{idx2:4}][DAB] :{base_value:5},{chg_bit_size:2}bit diff={diff2:2}") # idx2+=1 data2.append( base_value ) # return data2 def ume_decompress( buff : list): if ChbBlock.debug: print("\n\n---------- 解凍 --------------") bib=Byte2Bit(buff) ChbBlock.load_haeder(bib) # ヘッダ部の復元 if ChbBlock.debug: print("------------------------------ヘッダ部の復元の終了") buff2=sub_decompress(bib) return buff2 import os import sys import pathlib if __name__ == '__main__': #ChbBlock.debug = True ChbBlock.SBC_NUMB=9 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小) #DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13) #DuplicateCount.COUNT_FOR_EXTRACT=8 # 辞書候補の重複カウント数 datas=[40000, 34000, 29000, 30000, 36000, 35000, 30000, 31000 ] # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35000, 33000] # datas+=[5,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,1,2] # datas+=[3] # datas+=[4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,1,0,1,2,3,4,5,6,7,8,9,0,2,3,4,5,6,7,8,9,] # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35001] # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35000] # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000,35001, 2] # datas=datas[::-1] import matplotlib.pyplot as plt plt.plot(datas) plt.show() # データ確認用 print(datas, f"個数:{len(datas)}") # 圧縮対象リスト表示 bin_data=bytearray() # 圧縮データを記憶するバイト列 def callback( bt ): # 圧縮処理で呼び出されるコールバック関数 # print( f"[{bt:08b}]", end=" ") bin_data.append(bt) ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加 data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶 print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte") if datas == data2: print("元データと圧縮データが、完全に一致") else: for i in range(len(datas)): if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ") else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ") print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}") print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}") print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}") print(f"辞書={len(DuplicateCount.idxs)}") print("\n以上がtest1()の実行結果です。")
''' umcmprs.py の検証用 ''' from umcmprs import Bit2Byte # bitに圧縮してbyteにするクラス from umcmprs import Byte2Bit # byteから解凍してbitにするクラス out_func=print # 圧縮でバイトに詰められた時に実行する関数にprintを登録 barray=bytearray() # 圧縮したバイト列記憶用 def callback( data ): # 圧縮でbyteに詰められた時に実行するコールバック関数 out_func( f"圧縮した{data:08b}を記憶" , end=" " ) barray.append( data ) b_byte=Bit2Byte( callback ) # 圧縮オブジェクト BIT_SIZE=6 # 記憶で使うビットサイズ b_byte.setdata(8, BIT_SIZE) # 8を圧縮 b_byte.setdata(9, BIT_SIZE) # 9を圧縮 b_byte.setdata(10, BIT_SIZE) # 10を圧縮 b_byte.setdata(11, BIT_SIZE) # 11を圧縮 b_byte.setdata(7, BIT_SIZE) # 7を圧縮 b_byte.padding() # byte列になるように埋める print(f"\n圧縮したバイト列:{barray}") b_bit=Byte2Bit( barray ) for n in range(5): data=b_bit.getdata( BIT_SIZE ) print( f" 復号データ:{data}" , end=" ")実行例
圧縮した01001000を記憶 圧縮した10100010を記憶 圧縮した00101100を記憶 圧縮した00000111を記憶 圧縮したバイト列:bytearray(b'H\xa2,\x07') 復号データ:8 復号データ:9 復号データ:10 復号データ:11 復号データ:7
from umcmprs import ChbBlock,DuplicateCount ChbBlock.debug = True #ChbBlock.SBC_NUMB=15 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小) #DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13) #DuplicateCount.COUNT_FOR_EXTRACT=11 # 辞書候補の重複カウント数 from umcmprs import ume_compress,ume_decompress # テストデータ生成 NUMB=1024 NUMB=2048 lst=[int(y) for y in range(NUMB)] datas=[] with open("test.bin", "wb") as fw: while True: y=0 for x in lst: y+=x if y > 65535 : break # uint16 以内 datas.append(y) fw.write(y.to_bytes(2, 'little')) # 2バイトでリトルエンディアン if len(datas) >= NUMB: break if len(datas) >= NUMB: break # -------圧縮------- fw=open("test.uprs", "wb") bin_data=bytearray() # 圧縮データを記憶するバイト列 def callback( bt ): # 圧縮処理で呼び出されるコールバック関数 # print( f"[{bt:08b}]", end=" ") bin_data.append(bt) byte_data=bt.to_bytes(1, 'little') #print(bt, type(bt), byte_data, type(byte_data)) fw.write(byte_data) ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加 fw.close() # -------"test.uprs"のファイルを復号(解凍)------- with open("test.uprs", "rb") as fr: bin_data=fr.read() data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶 #exit() print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte") if datas == data2: print("元データと圧縮データが、完全に一致") else: for i in range(len(datas)): if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ") else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ") print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}") print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}") print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}") print(f"辞書={len(DuplicateCount.idxs)}") import matplotlib.pyplot as plt plt.plot(datas) # 圧縮前のデータ plt.plot(data2) # 圧縮データを復元したデータ(上記との違いを比較) plt.show() # データ確認用実行例
[ 0][BAB] : 0 [CHB] bit size=7 31個 [ 1][DAB] : 1,7bit,diff=1, i32=0 [ 2][DAB] : 3,7bit,diff=2, i32=1 ・・・・・・ [2047][DAB] :28203, 9bit diff=237 [CHB] bit size=16 0個 元サイス:4096byte, 圧縮サイス:2561byte 元データと圧縮データが、完全に一致 ChbBlock.SBC_NUMB=15 DuplicateCount.MIN_DIFF_BIT=13 DuplicateCount.COUNT_FOR_EXTRACT=10 辞書=31
from umcmprs import ChbBlock,DuplicateCount #ChbBlock.debug = True #ChbBlock.SBC_NUMB=15 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小) #DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13) #DuplicateCount.COUNT_FOR_EXTRACT=11 # 辞書候補の重複カウント数 from umcmprs import ume_compress,ume_decompress import math import numpy as np import random BUF_SIZE=1024*3 # frequency Hzで16Bitのサンプリングでリトルエンディアン符号無しの1000Hz正弦波データを1024ワード並ぶファイルを、 # 16Bit_1KHz_1024_sin.binの名前で作る。そして 振幅は、0〜1024で変化する生成にする。 # このファイルのバイト数は、BUF_SIZE ワードなので、(BUF_SIZE*2)byteになる。 # 1024ワードで16K時の全体の時間は 『(1/16KHz)*1024=(1/16e3)*1024=0.064秒』と計算できる。 def create_sin(inc , noise, frequency=1000 , sample_rate = 16000): if not inc: singen=lambda sec : math.sin( math.pi * 2 * frequency * sec )*(2**15-1) + (2**15-1) else: singen=lambda sec : math.sin( math.pi * 2 * frequency * sec )*(2**15-1)*(sec/0.65) + (2**15-1) # ★ 徐々に大きくする ts = [ 1/sample_rate * t for t in range( BUF_SIZE )] ys = [ int(singen( t )) for t in ts ] if noise != 0: ys = [ int(singen( t ))+int(np.random.uniform(-noise,noise)) for t in ts ] # ★ ノイズを加える ys = [ t if t <= 65535 else 65535 for t in ys] ys = [ t if t >= 0 else 0 for t in ys] bs=np.array(ys, np.uint16) buffer=b''.join(bs) # バイナリに変換 filename=f"16bit{frequency}Hz_{BUF_SIZE}sin_X.bin" print(f"生成ファイル:{filename}") with open(filename,"wb") as fw: fw.write( buffer ) return ys # np.frombuffer(bs, dtype="int16") NN=10000 # この数だけ検証データを生成してチェックする繰り返しを行う。 NN=1 # 1回だけの生成とチェック seed=13 # 乱数シード for nn in range(NN): seed=int(random.random() * 1000) random.seed(seed) np.random.seed(seed) reverse=True if random.random() > 0.5 else False inc = "inc" if random.random() > 0.5 else "" frequency = int(random.random() * 1000) noise= int(random.random() * 100) # #特定のパラメタ検証用(上記乱数のパラメタ設定を、指定の値で上書する場合、下記のコメントを外す) seed,reverse,inc,frequency,noise = 886,False,"inc",300,10 # # 検証用データを生成してdatasに記憶する datas=create_sin(inc,noise,frequency,sample_rate=16000) if reverse : datas=datas[::-1] #print(datas,type(datas)) print(f"seed={seed}, reverse={reverse} {inc} frequency={frequency},noise={noise}, nn={nn}----") # # -------圧縮------- fw=open("test.uprs", "wb") bin_data=bytearray() # 圧縮データを記憶するバイト列 def callback( bt ): # 圧縮処理で呼び出されるコールバック関数 # print( f"[{bt:08b}]", end=" ") bin_data.append(bt) byte_data=bt.to_bytes(1, 'little') #print(bt, type(bt), byte_data, type(byte_data)) fw.write(byte_data) ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加 fw.close() # -------復号(解凍)------- data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶 #exit() print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte") if datas == data2: print("元データと圧縮データが、完全に一致") else: for i in range(len(datas)): if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ") else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ") break print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}") print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}") print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}") print(f"辞書={len(DuplicateCount.idxs)}") if NN <= 10 : import matplotlib.pyplot as plt plt.plot(datas) plt.plot(data2) plt.show() # データ確認用実行例
生成ファイル:16bit300Hz_3072sin_X.bin seed=886, reverse=False inc frequency=300,noise=10, nn=0---- 元サイス:6144byte, 圧縮サイス:4182byte 元データと圧縮データが、完全に一致 ChbBlock.SBC_NUMB=15 DuplicateCount.MIN_DIFF_BIT=13 DuplicateCount.COUNT_FOR_EXTRACT=10 辞書=0