UMEHOSHI ITA TOP PAGE    COMPUTER SHIEN LAB

データ伝達用に作ったUME連続可逆圧縮 (略名:UMCMPRS)のpython実装例

アルゴリズム(仕様)はここを参照)

ソース(umcmprs.py)

''' umcmprs.py
UME連続可逆圧縮 (UME COMPRESSの略名:UMCMPRS)
「UMEHOSHI ITA」基板(https://manabu.quu.cc/up/ume/)で、
USB転送能力を補うために作ったものですが、他にも利用できるでしょう。
(自由にご利用していただいて結構です。
 可能な限り下記内容の"umcmprs.py"のモジュール内に上記コメントを含めてご利用ください。)
'''

#from umedebug import debug_hex4
#dflags =[False]*20
# dflags[0]=debug_hex4(nn=0, data=0b0001, enable=True)
# dflags[1]=debug_hex4(1, f"{"++++++++"}", dflags[0])
#from IPython.core.debugger import Pdb; Pdb().set_trace()

''' 1〜16bitの指定のデータを詰めてbyteごとにcallbackを実行する。'''
class Bit2Byte:
    count_byte=0
    def __init__(self, callback ):
        self.bit32=0        # bit設定の作業用変数
        self.bitcount=0     # 上記への記憶bit数。 8bit毎にcallback処理
        self.callback=callback
    #
    def setdata(self, data, bitsize): # bit列をbyte列に変換 ビット列構築用で使う。
        '''bitsizeのビット数で、dataを記憶 1byte分に詰められたタイミングで callbackを呼ぶ'''
        bitmask = (1<<bitsize)-1
        if data > bitmask : raise Exception(f"{data} does not fit in {bitsize} bit")
        self.bit32 |= data << self.bitcount
        self.bitcount += bitsize
        while self.bitcount >= 8:
            self.callback(self.bit32 & 0x0ff)
            Bit2Byte.count_byte+=1
            self.bit32 >>= 8
            self.bitcount-=8
    #
    def padding(self): # ビット列構築用で使う
        ''' bit_sizeサイズが、丁度byteに達するように0で埋める'''
        if self.bitcount != 0:
            self.setdata(0, 8-self.bitcount) # bit列をbyte列に変換

''' byteのデータ群から指定のビット長を取り出す '''
class Byte2Bit:
    def __init__(self, bytedata:bytes ): # 圧縮データを引数するコンストラクタ
        self.byte_buf = bytearray(bytedata)
        self.byte_buf_len=len(bytedata)     # 上記のbit記憶数
        self.bit32=0        # bit設定の作業用変数(0〜8)
        self.bitcount=0     # 上記へのbit記憶数。 
    #
    def getdata(self,bitsize): # ビット列復元用で使う
        '''bitsizeのビット数の値を取り出して返す '''
        rv = (self.byte_buf_len << 3) + self.bitcount - bitsize
        if rv < 0 : return rv  # 負の値(足りないビット数)
        bitmask = (1<<bitsize)-1
        while self.bitcount < bitsize and self.byte_buf_len>0:
            byte0 = self.byte_buf.pop(0)
            self.byte_buf_len-=1
            self.bit32 |= byte0 << self.bitcount
            self.bitcount += 8
        val = self.bit32 & bitmask
        self.bit32 >>= bitsize
        self.bitcount -= bitsize
        return val
    #
    def append(self, bytedata:bytes ): # 圧縮データを追加
        for i in range( len( bytedata ) ):
            self.byte_buf.append( bytedata[i] )
            self.byte_buf_len+=1     # 上記のbit記憶数

def to_uint(val, bitsize): # bitsizeビット長の符号ありvalを、符号なしに変換する
   return val & (1<<bitsize)-1

def to_int(uVal, bitsize): # bitsizeビット長の符号無しuValを符号ありに変換
    sign_bit=1<<(bitsize-1)
    return uVal if uVal < sign_bit else uVal-(sign_bit<<1)

class ChbBlock:
    debug = False
    SBC_NUMB=15 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)  
    bit_bsize=3
    bit_range=[7,9,11,13,15,16] # 「ビット長のテーブル」
    BB_size=5 # 上記「ビット長のテーブル」要素のバイナリ記憶時のビットサイズ
    min_s = [ -(2**(v-1)) for v in bit_range ]
    max_s = [ 2**(v-1)-1 for v in bit_range ]
    #
    @classmethod
    def initialize(cls):
        cls.prev_CHB = None # get_sequence_bit_indexで利用する前のCHB
        cls.prev_bit_range_idx=5
        cls.prev_data=0
        cls.flag_size_confirmed=False # prev_CHBにサイズ変更のCHBが生成設定時にFalseで、その送出が終わったらTrue
        cls.data_buffer = None # 先頭データでget_sequence_bit_indexを使う時に記憶
        cls.index_buff = 0
        cls.i32 = 0 # 次に続くDABやBABの数で、直後がCHBの場合は0になる
        cls.bitbyte = None # 圧縮用Bit2Byteの設定
    #
    def __init__(self, bit_size_idx, buffer_idx, dict_idx = -1):
        self.bit_size_idx = bit_size_idx # サイズ変更
        self.buffer_idx = buffer_idx
        self.dict_idx = dict_idx # 辞書参照の添え字
    #
    def out_CHB_bitimage(self, index_buff=0):
        n_word = 0
        if ChbBlock.i32 == 32:
            if ChbBlock.bitbyte :ChbBlock.bitbyte.setdata( 0 , 3 )# サイズ変更無しで32個のデータが続く
            if ChbBlock.debug: print(f" [CHB] {32}個")
        elif self.dict_idx != -1:
            n_word = 1 # 参照ワード数 
            for idxtbl in range(len(DuplicateCount.idxs)):
                if DuplicateCount.idxs[idxtbl].idx == self.dict_idx: break
            if DuplicateCount.idxs[idxtbl].count2 >= DuplicateCount.idxs[idxtbl].count1: n_word = 2
            if ChbBlock.bitbyte :
                ChbBlock.bitbyte.setdata( 0b111, 3) # 辞書テーブル
                ChbBlock.bitbyte.setdata( idxtbl , 5 ) # 辞書テーブルの添え字
                ChbBlock.bitbyte.setdata( ChbBlock.i32 , 5 )
            if ChbBlock.debug:
                print(f"[{self.buffer_idx:4}][CHB] :{ChbBlock.data_buffer[self.dict_idx]:5}", end="")
                if n_word == 2: 
                    print(f" ,{ChbBlock.data_buffer[self.dict_idx+1]}", end="")
                print(f" [{self.dict_idx:4}] dict_idx={idxtbl} の辞書参照 直後のデータ{ChbBlock.i32}個 ")
        else:
            if ChbBlock.bitbyte :
                ChbBlock.bitbyte.setdata( self.bit_size_idx+1, 3) 
                ChbBlock.bitbyte.setdata( ChbBlock.i32 , 5 )
            if ChbBlock.debug: print(f" [CHB] bit size={ChbBlock.bit_range[self.bit_size_idx]} {ChbBlock.i32}個")
        return n_word # 辞書の場合の参照ワード数、辞書でない場合は0 
    #
    @classmethod
    def is_fit(cls,value, bit_range_idx):
        ''' 符号付valueが、min_s[bit_range_idx]以上で、max_s[bit_range_idx]以下であるならTrue '''
        if bit_range_idx==5 and value < 0: return False
        if bit_range_idx==5 and value <= cls.max_s[5]-cls.min_s[5]: return True
        if value < 0: value = -value-1 
        if cls.min_s[bit_range_idx] > value: return False
        if cls.max_s[bit_range_idx] < value: return False
        return True
    #
    @classmethod
    def get_bit_index(cls, value:int):
        ''' valueを記憶するための最小ビットサイズを指し示す ChbBlock.bit_range 配列の添え字を返します。'''
        bit_range_idx=0
        while bit_range_idx < len(cls.bit_range)-1:
            if cls.is_fit(value, bit_range_idx) : return bit_range_idx
            bit_range_idx += 1
        return bit_range_idx
    #
    @classmethod
    def flush_i32(cls, n_word ):
        ''' cls.i32 個のデータの出力 '''
        idx = cls.prev_CHB.buffer_idx + n_word
        prev_base_val = cls.data_buffer[idx-1]
        for n in range(cls.i32):
            base_val = cls.data_buffer[idx+n]
            bitsize=ChbBlock.bit_range[cls.prev_CHB.bit_size_idx]
            if cls.prev_CHB.bit_size_idx == 5: 
                if cls.is_fit(base_val, cls.prev_CHB.bit_size_idx) == False: 
                    raise Exception(f"not match get_sequence_bit_index(value={base_val},bit_range_idx={cls.prev_CHB.bit_size_idx})") 
                if cls.bitbyte :cls.bitbyte.setdata( base_val , bitsize ) # ベースビット長で保存
                if cls.debug:print(f"[{idx+n :4}][BAB] :{ base_val:5}, i32={n}")
            else:
                diff = base_val - prev_base_val
                if cls.is_fit(diff, cls.prev_CHB.bit_size_idx) == False: 
                    raise Exception(f"not match get_sequence_bit_index(value={base_val},bit_range_idx={diff})") 
                diff2 = to_uint( diff , bitsize) # 差分
                if cls.bitbyte :cls.bitbyte.setdata( diff2 , bitsize ) # ベースビット長で差分保存
                if cls.debug:print(f"[{idx+n :4}][DAB] :{ base_val:5},{bitsize}bit,diff={diff}, i32={n}")
            prev_base_val = base_val
        #
        cls.i32=0
    #
    @classmethod
    def get_sequence_bit_index(cls, uint_data, data_buffer=None, bit2byte:Bit2Byte=None):
        '''  data_buffer!=Noneならget_sequence_bit_index利用シーケンスの先頭要素の処理で、
                                                    data_bufferやbit2byteの登録処理になる。
            先頭引数uint_dataだけ(data_buffer==None)の呼び出しは、残りの要素の処理になる。
        bitByte2=None 指定は、uint_dataをcls.data_bufferに記憶する処理で、set_dupli_count関数で使われる。
          のこ場合は、別途で辞書候補用の重複数カウント処理が併用される。
        bitByte2!=None 指定はuint_dataをbitByte2に埋め込む処理で、ume_compress_out関数が使われる。
          のこ場合は、別途で辞書候補用リストの参照をbitByte2に埋め込む処理が併用される。
        戻り値は暫定ビットサイズです。
        '''
        #if cls.debug:print(f"UINT INPUT:{uint_data}")
        if data_buffer != None : # 先頭データの場合
            cls.index_buff = 0
            cls.i32 = 0
            cls.flag_size_confirmed=False # サイズ変更のCHBが生成時にFalseで、その送出が終わったらTrue
            cls.prev_bit_range_idx=len(cls.bit_range)-1 # get_bit_lengthメソッド実行で、以前のビット長を記憶
            bitsize= ChbBlock.bit_range[ cls.prev_bit_range_idx ]
            cls.prev_data = uint_data
            cls.data_buffer = data_buffer
            cls.bitbyte=bit2byte
            if cls.bitbyte == None:
                cls.data_buffer[cls.index_buff]=uint_data # パス1の記憶
            else:
                cls.bitbyte.setdata( to_uint( cls.data_buffer[cls.index_buff] , bitsize) ,bitsize)
            if cls.debug:print(f"[{cls.index_buff :4}][BAB] :{ cls.data_buffer[cls.index_buff]:5}")
            return cls.prev_bit_range_idx
        #
        cls.index_buff+=1
        if cls.bitbyte == None:
            cls.data_buffer[cls.index_buff]=uint_data  # パス1の記憶
        diff = uint_data - cls.prev_data
        bit_size_idx = cls.get_bit_index(diff)
        cls.prev_data = uint_data
        cls.i32 += 1
        flush_flag = False
        if cls.i32 == 1 and cls.prev_CHB is None:
            cls.prev_CHB = ChbBlock(bit_size_idx, cls.index_buff) # 暫定CHB生成
            if cls.prev_bit_range_idx != bit_size_idx: cls.flag_size_confirmed=False
            cls.prev_bit_range_idx = bit_size_idx
        elif bit_size_idx > cls.prev_bit_range_idx:#ビットサイズを大きくする必要がある?
            if cls.i32 < ChbBlock.SBC_NUMB and cls.prev_CHB.dict_idx == -1:
                cls.prev_CHB.bit_size_idx = bit_size_idx # 前のCHBのビットサイズ大きく変更
                cls.prev_bit_range_idx = bit_size_idx
                cls.flag_size_confirmed = False
                if cls.i32 >= 31: flush_flag = True
            else: # cls.prev_CHBが辞書または、cls.i32 >= ChbBlock.SBC_NUMB の場合
                cls.i32 -= 1
                n_word=cls.prev_CHB.out_CHB_bitimage() # 前の辞書を出力
                cls.flush_i32(n_word)
                cls.i32 += 1
                cls.prev_CHB = ChbBlock(bit_size_idx, cls.index_buff) # 暫定CHB生成
                if cls.prev_bit_range_idx != bit_size_idx: cls.flag_size_confirmed=False
                cls.prev_bit_range_idx = bit_size_idx
        elif (not cls.flag_size_confirmed or cls.prev_CHB.dict_idx != -1)and cls.i32 == 31:
            flush_flag = True
        elif cls.flag_size_confirmed and cls.i32 == 32:
            flush_flag = True
        elif bit_size_idx < cls.prev_bit_range_idx and cls.i32 >= ChbBlock.SBC_NUMB:
            cls.flag_size_confirmed = False
            flush_flag = True # ビットサイズを小さくする出力指示。
        elif bit_size_idx < cls.prev_bit_range_idx and \
                                    cls.prev_bit_range_idx == 5 and  cls.i32 >= 4:
            cls.flag_size_confirmed = False
            flush_flag = True # ベースビット長時のビットサイズを小さくする出力指示(4個で判定)。
        #
        if flush_flag:
            n_word=cls.prev_CHB.out_CHB_bitimage()
            cls.flush_i32(n_word)
            if cls.prev_CHB.i32 != 32 and cls.prev_CHB.dict_idx == -1: cls.flag_size_confirmed=True
            ChbBlock.prev_CHB = None 
        #
        return cls.prev_bit_range_idx
    #
    @classmethod
    def get_sequence_bit_index_flush(cls):
        if cls.prev_CHB :
            n_word=cls.prev_CHB.out_CHB_bitimage()
            cls.flush_i32(n_word)
            ChbBlock.prev_CHB = None
        if cls.bitbyte :cls.bitbyte.setdata( 6 , 3 )# 終了 CHB 送出
        if cls.bitbyte :cls.bitbyte.setdata( 0 , 5 )
        if ChbBlock.debug: print(f" [CHB] bit size={ChbBlock.bit_range[5]} {0}個")
        if cls.bitbyte :cls.bitbyte.padding() # byteに足りないbit列を0で埋める
        return 5
    #
    @classmethod
    def get_header_bitbyte(cls, idxs, datas, callback)->Bit2Byte:
        '''  idxsは、DuplicateCountの辞書参照用リスト  '''
        if ChbBlock.debug: print("------------------------------ create header")
        bib=Bit2Byte( callback ) #ヘッド部圧縮用バイナリ用
        bib.setdata( 0 , 1) #1ビットで0を記憶([Ver])
        bib.setdata( ChbBlock.BB_size , 5) # ビット長記憶域のビットサイズを5ビットで記憶
        for i in range(5, -1, -1):
            bib.setdata(cls.bit_range[i]-1, ChbBlock.BB_size) # ChbBlock.BB_size ビットで記憶
            if ChbBlock.debug: print(cls.bit_range[i] , end=" ")
        if ChbBlock.debug: print( " :Used bit length")
        # 先頭の辞書データの記憶
        bib.setdata( len( idxs ), 5) # 辞書データの個数 (0〜31)
        if ChbBlock.debug: print(f"DIB(辞書ブロック)の個数:{ len( idxs ) }")
        for i in range( len( idxs ) ):
            n_bit = 1 if idxs[i].count1 <= idxs[i].count2 else 0
            idx_buf=idxs[i].idx
            if idx_buf+1 == len(datas) : n_bit = 0
            if n_bit == 0: bib.setdata( 0 , 1) # 0記憶( 辞書ワードサイズが1 )
            else:  
                bib.setdata( 1 , 1) # 10を記憶( 辞書ワードサイズが2 )
                bib.setdata( 0 , 1) # 10を記憶( 辞書ワードサイズが2 )
            if ChbBlock.debug: print(f"DIB[{i}]:使用数:{max(idxs[i].count1,idxs[i].count2)}個",end="")
            if ChbBlock.debug: print(f"  値:{datas[idx_buf]}",end="")
            bib.setdata( datas[idx_buf] , cls.bit_range[5]) # 辞書データの記憶
            if n_bit == 1 :
                if ChbBlock.debug: print(f", {datas[idx_buf+1]} ",end="")
                bib.setdata( datas[idx_buf+1] , cls.bit_range[5]) # 辞書データの記憶
            if ChbBlock.debug: print()
        if ChbBlock.debug: print(f"-----------DIB(辞書ブロック)の終了 header size: {Bit2Byte.count_byte}byte")
        return bib
    #
    @classmethod
    def load_haeder(cls, bin_haed:Byte2Bit):
        ''' ChbBlock.BB_sizeと、ChbBlock.bit_range[0]から[5]の読み込みと
        DuplicateCount.dict_table の生成と読み込み。
        '''
        if ChbBlock.debug: print("------------------------------ヘッダー部 復元")  
        bib=bin_haed #bib = BitByte(bin_haed)
        if bib.getdata(1) != 0: raise Exception("Version does not match") # 未決定仕様
        ChbBlock.BB_size = bib.getdata(5) #5bit取り出し
        if ChbBlock.debug: print(f"「ビット長のテーブル」要素のサイズ:{ChbBlock.BB_size} bit")
        for i in range( len(cls.bit_range) ):
            b=bib.getdata( ChbBlock.BB_size ) # BB bit取り出し
            cls.bit_range[5-i]=b+1
            if ChbBlock.debug: print(b+1, end=" ")
        dict_len=bib.getdata(5) # 0〜31 # 使用辞書の個数取得
        DuplicateCount.dict_table=[None]*dict_len
        if ChbBlock.debug: print(f":使用するビット長\nDIB(辞書ブロック)の個数:{dict_len}")
        for i in range( dict_len ):
            num_bit = bib.getdata(1)
            if num_bit == 1:
                num_bit2 = bib.getdata(1) # 0のはず
                if num_bit2 != 0: raise Exception("Only 1 to 2 dictionary sizes are supported.")
            word1 = bib.getdata(cls.bit_range[5])
            word2 = None
            if num_bit == 1:
                word2 = bib.getdata(cls.bit_range[5])
            DuplicateCount.dict_table[i] = DictElement2(num_bit, word1, word2)
            if word2 == None: word2 =""
            if ChbBlock.debug: print(f"DICT[{i}]: {word1}  {word2}")
    #

class DictElement2: # 2つ要素までを持つDuplicateCount.dict_tableの辞書要素
    def __init__(self, num_bit, word1, word2=None):
        self.num_bit=num_bit # 0, 1 の値で、1の時は、word1とword2の2つを利用
        self.word1=word1 # word1
        self.word2=word2

class DuplicateCount: # 重複データを数えて記憶
    MIN_DIFF_BIT=13 # 辞書を適用するビットサイズ(これ以上のビットサイズに適用)
    COUNT_FOR_EXTRACT = 10 # 辞書候補の重複カウント数(これ以上のビットサイズに適用)
    idxs = [] # extractメソッドで設定されるDuplicateCountの辞書要素のリスト(31個以内)を記憶するバッファ
    count_BUFFER1=0 # set_dupli_countメソッドで使われる作業用(データ格納用添え字)
    #
    def __init__(self, idx):
        self.count1=0 # 1ワード重複数 (uint8)
        self.count2=0 # 2ワード重複数 (uint8)
        self.idx = idx # データ群の中で、最初に見つかる位置
        pass
    #
    def __str__(self):
        return f"buffer[{self.idx:4}]:count1={self.count1:3},count2={self.count2:3}"
    #
    @classmethod
    def set_dupli_count(cls, data_buffer:list,  list_dupli_count:list, uint_data:int):
        ''' list_dupli_countで重複データを数える '''
        if cls.count_BUFFER1 == 0: # 先頭データの場合
            ChbBlock.get_sequence_bit_index(uint_data, data_buffer) #, list_dupli_count)
        else:
            ChbBlock.get_sequence_bit_index(uint_data)
        list_dupli_count[cls.count_BUFFER1].count1=1
        i=0
        flag_matched=False
        if cls.count_BUFFER1 > 0: prev_data = data_buffer[cls.count_BUFFER1-1] # 一つ前の値
        while i < cls.count_BUFFER1:
            if data_buffer[i]==uint_data: # 一致
                if flag_matched == False: list_dupli_count[i].count1+=1
                flag_matched=True
                if cls.count_BUFFER1 > 0:
                    if data_buffer[i-1]==prev_data:
                        list_dupli_count[i-1].count2+=2
                        break
            i+=1
        #  
        cls.count_BUFFER1+=1
    
    @classmethod
    def extract(cls, list_dupli_count):
        ''' 辞書候補のリストを抽出 ist_dupli_countの添え字リストを cls.idxs に設定
        ist_dupli_countリストを降順に32個並べて、それを 辞書候補のリストとする。'''
        idxs=[None]*32 # 抽出要素分のリスト
        icount=0
        big=len(list_dupli_count)*2 # 並び替え時に、作業中の最大値として使う
        while icount < 32:
            ''' big よりより小さい要素で、最大の要素と、その添え字を求める '''
            maxcount=0 # 範囲内の最大重複数を求めるためのループ内作業用
            imax=-1
            for i, obj in enumerate(list_dupli_count): 
                if max(obj.count1, obj.count2) >= big: continue
                flag1= imax==-1 and max(obj.count1, obj.count2) > 1
                gv=max( list_dupli_count[i].count1, list_dupli_count[i].count2)
                flag2=False
                if imax!=-1 and maxcount < gv: flag2=True
                if flag1 or flag2:
                    imax=i
                    maxcount=gv
                    continue
            big=maxcount # 上記ループで得られた最大重複数
            if big < cls.COUNT_FOR_EXTRACT: break # 重複数が登録候補に満たない
            if imax==-1: break # 候補が見つからない
            for i, obj in enumerate(list_dupli_count):
                # maxcountと同じと判断する要素添え字をldxsに記憶
                if maxcount == max( list_dupli_count[i].count1, list_dupli_count[i].count2):
                    if icount > 0 :
                        if idxs[icount-1].count2 >= idxs[icount-1].count1:
                            if idxs[icount-1].idx+1 == list_dupli_count[i].idx: continue
                    idxs[icount]=list_dupli_count[i]
                    icount+=1
                    if icount == 32: break # Full
            #
        i=0
        while i<31 and idxs[i]!=None: i+=1
        cls.idxs = idxs[:i] # idxs[X].idxが昇順のDuplicateCountリストになっている。

def ume_compress_out( data_buffer:list,  list_dupli_count:list, bib:Bit2Byte):
    ''' list_dupli_countの辞書リストも使って、bib に圧縮する '''
    ChbBlock.initialize()
    i=0
    data_buffer_len = len(data_buffer)
    while i < data_buffer_len:
        uint_data = data_buffer[i]
        uint_data_next = None if i+1 >= data_buffer_len else data_buffer[i+1] 
        if i == 0: # 先頭データの場合
            ChbBlock.get_sequence_bit_index(uint_data, data_buffer, bit2byte=bib)
        else:
            diff = uint_data - ChbBlock.prev_data
            bit_size_idx = ChbBlock.get_bit_index(diff)
            if list_dupli_count and ChbBlock.flag_size_confirmed : # 辞書参照可能なら登録
                idx_ref = -1  # 辞書参照の添え字(0以上の値で有効)
                n_word = 1 # 参照ワード数 
                for ref in list_dupli_count:
                    if uint_data == data_buffer[ref.idx]: # 辞書参照?
                        if ref.count2 >= ref.count1: # 2ワード参照
                            if uint_data_next != None and uint_data_next == data_buffer[ref.idx+1]:
                                n_word = 2 # 2 word の辞書参照
                                if ChbBlock.bit_range[bit_size_idx]*2 < DuplicateCount.MIN_DIFF_BIT:continue
                                idx_ref = ref.idx
                                break
                        else: # 1ワード参照
                            if ChbBlock.bit_range[bit_size_idx] < DuplicateCount.MIN_DIFF_BIT: continue
                            idx_ref = ref.idx
                            break
                if idx_ref > 0: # 辞書参照の添え字
                    if ChbBlock.prev_CHB:
                        nn_word = ChbBlock.prev_CHB.out_CHB_bitimage() # 辞書データ登録前に、前のCHBを送出
                        ChbBlock.prev_CHB.flush_i32(nn_word) # 記憶したデータを送出
                    # 辞書データ登録
                    ChbBlock.prev_CHB = ChbBlock( ChbBlock.prev_bit_range_idx, i , idx_ref)
                    i+=1
                    ChbBlock.index_buff+=1
                    ChbBlock.prev_data = data_buffer[ref.idx]
                    if n_word == 2:
                        i+=1
                        ChbBlock.index_buff+=1
                        ChbBlock.prev_data = data_buffer[ref.idx+1]
                    continue
            ChbBlock.get_sequence_bit_index(uint_data)
        i+=1
    #
    ChbBlock.get_sequence_bit_index_flush() # フィニッシュ

def ume_compress( datas : list, callback ):
    ChbBlock.initialize()
    numb=len(datas)
    data_buffer=[0]*numb # 1次処理のデータバッファ(パス1でデータをここに記憶、パス2はこれを読んで圧縮)
    list_dupli_count=[DuplicateCount(n) for n in range(numb)] # 重複データここに記録
    DuplicateCount.count_BUFFER1=0
    for x in datas:
        DuplicateCount.set_dupli_count(data_buffer, list_dupli_count, x) # xを記憶する繰り返し
    ChbBlock.get_sequence_bit_index_flush() # フィニッシュ
    #if ChbBlock.debug: print("重複状況:", [str(x) for x in list_dupli_count] )
    DuplicateCount.extract(list_dupli_count) # 辞書候補の添え字群をDuplicateCount.idxsにセット
    if ChbBlock.debug: 
        for n , obj in enumerate(DuplicateCount.idxs):
            if obj == None: continue
            print(f"{n:3} 辞書候補:", obj)
    bib=ChbBlock.get_header_bitbyte( DuplicateCount.idxs, datas, callback )
    ume_compress_out( datas, DuplicateCount.idxs, bib )

def sub_decompress(bib):
    ''' bib のbitByte型を解凍して、データリストを返す bibは、ヘッダーを読み取った直後の状態 '''
    data2 = [] # 解凍記憶データ
    idx2=0 # 上記用添え字
    chg_bit_size=ChbBlock.bit_range[5]
    base_value=bib.getdata(chg_bit_size) # 現在の設定値
    if ChbBlock.debug: print(f"[{idx2:4}][BAB] :{base_value:5},{chg_bit_size:2}bit")
    idx2+=1
    data2.append( base_value )
    while True:
        n32=0
        chb = bib.getdata(3) # CHB 読み取り
        if chb < 0 : raise Exception("CHB not match") #
        if chb == 0: # 32個連続
            n32=32
            if ChbBlock.debug: print(f"CHB {n32}個")
        elif chb == 0b111: # 辞書テーブル参照
            dict_idx = bib.getdata(5) # 辞書添え字
            n32=bib.getdata(5) # 同じサイズが連続する個数
            dict_element2 = DuplicateCount.dict_table[dict_idx]
            data2.append( dict_element2.word1 )
            base_value = dict_element2.word1
            if ChbBlock.debug: print(f" [CHB][{idx2:4}]:{dict_element2.word1:5}", end="")
            idx2+=1
            if  dict_element2.num_bit == 1:
                data2.append( dict_element2.word2 )
                base_value = dict_element2.word2
                if ChbBlock.debug: print(f" ,{dict_element2.word2}", end="")
                idx2+=1
            if ChbBlock.debug: print(f" dict_idx={dict_idx} {n32}個")
            if n32 == 0: continue
        else:
            chg_bit_size=ChbBlock.bit_range[chb-1]
            n32=bib.getdata(5)
            if ChbBlock.debug:  print(f" [CHB] bit size={chg_bit_size} {n32}個")
        #
        if n32 == 0 : break # 終了
        for n in range(n32):
            if chg_bit_size==ChbBlock.bit_range[5]: # [BAB]の復元
                base_value = bib.getdata(chg_bit_size)
                if ChbBlock.debug: print(f"[{idx2:4}][BAB] :{base_value:5},{chg_bit_size:2}bit")
            else:  # [DAB]の復元
                diff= bib.getdata(chg_bit_size)
                diff2 = to_int(diff, chg_bit_size)
                base_value += diff2
                if ChbBlock.debug: print(f"[{idx2:4}][DAB] :{base_value:5},{chg_bit_size:2}bit  diff={diff2:2}")
            #
            idx2+=1
            data2.append( base_value )
        #
    return data2

def ume_decompress( buff : list):
    if ChbBlock.debug: print("\n\n---------- 解凍 --------------")
    bib=Byte2Bit(buff)
    ChbBlock.load_haeder(bib) # ヘッダ部の復元
    if ChbBlock.debug: print("------------------------------ヘッダ部の復元の終了")
    buff2=sub_decompress(bib)
    return buff2

import os
import sys
import pathlib

if __name__ == '__main__':
    #ChbBlock.debug = True
    ChbBlock.SBC_NUMB=9 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)  
    #DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13)
    #DuplicateCount.COUNT_FOR_EXTRACT=8 # 辞書候補の重複カウント数

    datas=[40000, 34000, 29000, 30000, 36000, 35000, 30000, 31000 ]
    # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35000, 33000]
    # datas+=[5,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,1,2]
    # datas+=[3]
    # datas+=[4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,1,0,1,2,3,4,5,6,7,8,9,0,2,3,4,5,6,7,8,9,]
    # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35001]
    # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35000]
    # datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000,35001, 2]
    # datas=datas[::-1]

    import matplotlib.pyplot as plt
    plt.plot(datas)
    plt.show() # データ確認用
    print(datas, f"個数:{len(datas)}") # 圧縮対象リスト表示

    bin_data=bytearray() # 圧縮データを記憶するバイト列
    def callback( bt ): # 圧縮処理で呼び出されるコールバック関数
        # print( f"[{bt:08b}]", end=" ")
        bin_data.append(bt)

    ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加

    data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶

    print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte")
    if datas == data2: print("元データと圧縮データが、完全に一致")
    else:
        for i in range(len(datas)):
            if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ")
            else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")
    
    print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}")
    print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}")
    print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}")
    print(f"辞書={len(DuplicateCount.idxs)}")
    print("\n以上がtest1()の実行結果です。")

umcmprs.pyの検証用ソース(umcmprs_test00.py)

''' umcmprs.py の検証用
'''
from umcmprs import Bit2Byte # bitに圧縮してbyteにするクラス
from umcmprs import Byte2Bit # byteから解凍してbitにするクラス

out_func=print # 圧縮でバイトに詰められた時に実行する関数にprintを登録

barray=bytearray() # 圧縮したバイト列記憶用 

def callback( data ): # 圧縮でbyteに詰められた時に実行するコールバック関数
    out_func( f"圧縮した{data:08b}を記憶" , end="  " )
    barray.append( data )

b_byte=Bit2Byte( callback ) # 圧縮オブジェクト
BIT_SIZE=6 # 記憶で使うビットサイズ
b_byte.setdata(8, BIT_SIZE)  #  8を圧縮
b_byte.setdata(9, BIT_SIZE)  #  9を圧縮
b_byte.setdata(10, BIT_SIZE) # 10を圧縮
b_byte.setdata(11, BIT_SIZE) # 11を圧縮
b_byte.setdata(7, BIT_SIZE)  #  7を圧縮
b_byte.padding()      # byte列になるように埋める
print(f"\n圧縮したバイト列:{barray}")

b_bit=Byte2Bit( barray )
for n in range(5):
    data=b_bit.getdata( BIT_SIZE )
    print( f" 復号データ:{data}" , end="  ")
実行例
圧縮した01001000を記憶  圧縮した10100010を記憶  圧縮した00101100を記憶  圧縮した00000111を記憶
圧縮したバイト列:bytearray(b'H\xa2,\x07')
 復号データ:8   復号データ:9   復号データ:10   復号データ:11   復号データ:7

umcmprs.pyの検証用ソース(umcmprs_test01.py)

from umcmprs import ChbBlock,DuplicateCount
ChbBlock.debug = True
#ChbBlock.SBC_NUMB=15 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)  
#DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13)
#DuplicateCount.COUNT_FOR_EXTRACT=11 # 辞書候補の重複カウント数

from umcmprs import ume_compress,ume_decompress

# テストデータ生成
NUMB=1024
NUMB=2048
lst=[int(y) for y in range(NUMB)]
datas=[]
with open("test.bin", "wb") as fw:
    while True:
        y=0
        for x in lst:
            y+=x
            if y > 65535 : break # uint16 以内
            datas.append(y)
            fw.write(y.to_bytes(2, 'little'))  # 2バイトでリトルエンディアン 
            if len(datas) >= NUMB: break
        if len(datas) >= NUMB: break

# -------圧縮-------
fw=open("test.uprs", "wb")
bin_data=bytearray() # 圧縮データを記憶するバイト列
def callback( bt ): # 圧縮処理で呼び出されるコールバック関数
    # print( f"[{bt:08b}]", end=" ")
    bin_data.append(bt)
    byte_data=bt.to_bytes(1, 'little')
    #print(bt, type(bt), byte_data, type(byte_data))
    fw.write(byte_data)

ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加
fw.close()

# -------"test.uprs"のファイルを復号(解凍)-------
with open("test.uprs", "rb") as fr:
    bin_data=fr.read()

data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶
#exit()
print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte")
if datas == data2: print("元データと圧縮データが、完全に一致")
else:
    for i in range(len(datas)):
        if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ")
        else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")

print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}")
print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}")
print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}")
print(f"辞書={len(DuplicateCount.idxs)}")
import matplotlib.pyplot as plt
plt.plot(datas) # 圧縮前のデータ
plt.plot(data2) # 圧縮データを復元したデータ(上記との違いを比較)
plt.show() # データ確認用
実行例
[   0][BAB] :    0
 [CHB] bit size=7 31個
[   1][DAB] :    1,7bit,diff=1, i32=0
[   2][DAB] :    3,7bit,diff=2, i32=1
・・・・・・
[2047][DAB] :28203, 9bit  diff=237
 [CHB] bit size=16 0個
元サイス:4096byte, 圧縮サイス:2561byte
元データと圧縮データが、完全に一致
ChbBlock.SBC_NUMB=15
DuplicateCount.MIN_DIFF_BIT=13
DuplicateCount.COUNT_FOR_EXTRACT=10
辞書=31

上記の圧縮対象データと、復元データ
上記の圧縮対象データファイル「"test.bin"」のサイズは、4,096 バイトで、圧縮後のファイルは2,561 バイトです。
(「"test.bin"」のファイルをzipファイルにすると、909 バイトでした)

umcmprs.pyの検証用ソース(umcmprs_test02.py)

N回のsin関数を利用した圧縮検証用データ生成と検証プログラムです。
(noiseパラメタで正確なsinの幅に乱数の値を付加しています。指定パラメタで動作していますが、簡単に乱数生成が可能)
from umcmprs import ChbBlock,DuplicateCount
#ChbBlock.debug = True
#ChbBlock.SBC_NUMB=15 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)  
#DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13)
#DuplicateCount.COUNT_FOR_EXTRACT=11 # 辞書候補の重複カウント数

from umcmprs import ume_compress,ume_decompress
import math
import numpy as np
import random

BUF_SIZE=1024*3
# frequency Hzで16Bitのサンプリングでリトルエンディアン符号無しの1000Hz正弦波データを1024ワード並ぶファイルを、
# 16Bit_1KHz_1024_sin.binの名前で作る。そして 振幅は、0〜1024で変化する生成にする。
# このファイルのバイト数は、BUF_SIZE ワードなので、(BUF_SIZE*2)byteになる。
# 1024ワードで16K時の全体の時間は 『(1/16KHz)*1024=(1/16e3)*1024=0.064秒』と計算できる。
def create_sin(inc , noise, frequency=1000 , sample_rate = 16000):    
    if not inc: 
        singen=lambda sec : math.sin(  math.pi * 2 * frequency * sec )*(2**15-1) + (2**15-1)
    else:
        singen=lambda sec : math.sin(  math.pi * 2 * frequency * sec )*(2**15-1)*(sec/0.65) + (2**15-1) # ★ 徐々に大きくする
    ts = [ 1/sample_rate * t for t in range( BUF_SIZE )]
    ys = [ int(singen( t )) for t in ts ] 
    if noise != 0:
        ys = [ int(singen( t ))+int(np.random.uniform(-noise,noise)) for t in ts ] # ★ ノイズを加える
    ys = [ t if t <= 65535 else 65535  for t in ys]
    ys = [ t if t >= 0 else 0  for t in ys]
    bs=np.array(ys, np.uint16)
    buffer=b''.join(bs) # バイナリに変換
    filename=f"16bit{frequency}Hz_{BUF_SIZE}sin_X.bin"
    print(f"生成ファイル:{filename}")
    with open(filename,"wb") as fw:
        fw.write( buffer )
    return ys # np.frombuffer(bs, dtype="int16")

NN=10000 # この数だけ検証データを生成してチェックする繰り返しを行う。
NN=1 # 1回だけの生成とチェック
seed=13 # 乱数シード
for nn in range(NN):
    seed=int(random.random() * 1000)
    random.seed(seed)
    np.random.seed(seed)
    reverse=True if random.random() > 0.5 else False
    inc = "inc" if random.random() > 0.5 else ""
    frequency = int(random.random() * 1000)
    noise= int(random.random() * 100)
    #
    #特定のパラメタ検証用(上記乱数のパラメタ設定を、指定の値で上書する場合、下記のコメントを外す)
    seed,reverse,inc,frequency,noise = 886,False,"inc",300,10
    #
    # 検証用データを生成してdatasに記憶する
    datas=create_sin(inc,noise,frequency,sample_rate=16000)
    if reverse : 
        datas=datas[::-1]
    #print(datas,type(datas))
    print(f"seed={seed}, reverse={reverse} {inc} frequency={frequency},noise={noise}, nn={nn}----")
    #
    # -------圧縮-------
    fw=open("test.uprs", "wb")
    bin_data=bytearray() # 圧縮データを記憶するバイト列
    def callback( bt ): # 圧縮処理で呼び出されるコールバック関数
        # print( f"[{bt:08b}]", end=" ")
        bin_data.append(bt)
        byte_data=bt.to_bytes(1, 'little')
        #print(bt, type(bt), byte_data, type(byte_data))
        fw.write(byte_data)

    ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加
    fw.close()

    # -------復号(解凍)-------
    data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶
    #exit()
    print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte")
    if datas == data2: print("元データと圧縮データが、完全に一致")
    else:
        for i in range(len(datas)):
            if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ")
            else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")
        break

    print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}")
    print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}")
    print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}")
    print(f"辞書={len(DuplicateCount.idxs)}")

    if NN <= 10 :
        import matplotlib.pyplot as plt
        plt.plot(datas)
        plt.plot(data2)
        plt.show() # データ確認用
実行例
生成ファイル:16bit300Hz_3072sin_X.bin
seed=886, reverse=False inc frequency=300,noise=10, nn=0----
元サイス:6144byte, 圧縮サイス:4182byte
元データと圧縮データが、完全に一致
ChbBlock.SBC_NUMB=15
DuplicateCount.MIN_DIFF_BIT=13
DuplicateCount.COUNT_FOR_EXTRACT=10
辞書=0

上記の圧縮対象データのイメージです。
上記の圧縮対象データファイル「"16bit300Hz_3072sin_X.bin"」のサイズは、6,144 バイトで、 圧縮後のファイルは4,180 バイトです。
(「"16bit300Hz_3072sin_X.bin"」のファイルをzipファイルにすると、5,959 バイトでした)