UMEHOSHI ITA TOP PAGE

[Raspberry Pi 3 Model A+]と[UMEHOSHI ITA]を乗せたモータ付き台車

このページで示した[Raspberry Pi]のサービスのstart-app-select.pyから呼び出されるstart-app-select.pyのコードです。
Raspberry Pi 3 Model A+]に接続した[UMEHOSHI ITA]基板を乗せるモータ付き台車で使うコードです

制御コマンドを受信して実行するTCPサーバープログラム(raspiAPumeTcp.py)

raspiAPumeTcp.pyは、TCP受信したしたフィルの拡張子が「.umh」であれば、USBに接続先のUMEHOSHI ITA用へプログラムとして送信して実行させます。 (raspiAPumeTcp.pyのコードは後述するこのリンク先で示しています)
そしてUMEHOSHI ITAより、応答がある場合はそれをTCPで返信します。
そこで、USB通信の基本的な処理はumeusb.py、TCP通信の基本的な処理はumetcp.py の別モジュールに分割することにした。

raspiAPumeTcp.pyの実現に必要なUSB通信の基本的な処理のモジュール(/usr/local/apps/umeusb.py)

ここで、Raspberry Pi 3 Model A+のUSBに対応するシリアル通信デバイスを指すファイル名を次の操作で調べた。
suzuki@raspberrypi:~ $ ls -l /dev
・・省略・・・
crw-rw----  1 root   dialout 166,   0 Dec 14 12:23 ttyACM0
crw-rw----  1 root   dialout 204,  64 Dec 14 12:23 ttyAMA0
・・省略・・・
調べた結果、Type A のUSBのシリアル通信デバイスを指すファイル名はttyACM0と分かった。
上記で利用している「/usr/local/apps/umeusb.py」のモジュール内容
#!/usr/bin/python3
# 「/usr/local/apps/umeusb.py」
import serial
import sys
import time

def defalut_usb_receive_func(bin):
    'usb受信のイベントで実行するデフォルト処理'
    print( bin.decode('utf-8') ,end="") # USBより受信したデータを表示

usb_receive_func = defalut_usb_receive_func # usb受信で実行する処理
# 上記を変更することで、USB受信に対する振る舞いを変更できる。

"usb:このグローバル変数が指し示す接続相手とUSB通信するためのオブジェクト"
"imort時にオープンされて、下記各関数はそれを利用する。"
usb = None

def init_sub(): 
    global usb
    try:
        usb = serial.Serial(port = '/dev/ttyACM0', baudrate = 115200)
        #usb = serial.Serial(port = 'COM8', baudrate = 115200,timeout = 10)
    except Exception as e:
        print(e)
        sys.exit(1)
    print( usb ) # USB のシリアルオブジェクト表示

def check_sum(s):
    ''' b = check_sum("G10800090000067")
    '''
    ck = 0
    n = len(s)
    if n == 0: return
    for i in range(n-2):
        ck += ord(s[i])
        # print(ck)
        # print(s[i] , end ="")
    # print(' ',s[-2:] , end ="")
    ck += int(s[-2:] , 16)
    ck &= 0x0f
    if ck != 0 :
        print("check sum error", ck)
        return False
    return True

def usb_send(s, quantity:int=2, endstr:bytes=b"\n")->bytes:
    '''  b = usb_send("G10800090000067", endstr=b"\n",quantity=1):
    sを送信して、受信したバイナリを返す。タイムアウト時は""を返す
    endstrがquantity個受信した時点でリターンする。
    quantity=2のデフォルト値は、送信した1行に対するエコー文字列の1行と
    応答文字列の1行を意味します。
    '''
    if check_sum(s) == False: return b""
    usb.write(s.encode('utf-8'))
    usb.write(b'\r\n')
    rtnval=b""
    while True:
        if quantity <= 0:
            return rtnval # 受信したバイナリーを返す。
        b=usb.readline()
        if b == "": return rtnval
        rtnval+=b
        n=b.count(endstr)
        quantity-=n

def send_cmd( ss, quantity:int=2, endstr:bytes=b"\n", cmdchar:str="SRG"):# sの例[S048000800000FFAF000086]
    '''# 複数の'\n'を含む文字列をusbに送信する。'''
    a = ss.split('\n')
    for s in a:
        s=s.strip()
        if s == "" or not s[0] in cmdchar : continue # S R G 以外のコマンドを無視
        b = usb_send(s, quantity, endstr) # デフォルトで2行のudb応答文字列の受信を含むコマンド出力
        usb_receive_func( b ) # USB受信で得られたbで行う処理

def send_cmdfile(filepath, cmdchar="SRG"):
    with open(filepath , "r", encoding="utf-8") as f:
        ss = f.readlines()
    if filepath.endswith(".umh") : ss = ss[1:] # 先頭のボタン仕様行を除去
    ss="\n".join(ss)
    #print(ss)
    send_cmd( ss, quantity=0, endstr=b"\n", cmdchar=cmdchar)

# USB受信スレット例(b'end\r\n'受信まで繰り返す)
def read_loop():
    while True:
        try:
            b=usb.readline() # binary
            if(b == b'end\r\n'):
                print("recieve 'end'")
                break
            usb_receive_func( b ) # USB受信の処理
        except Exception as e:
            #print(e)
            break
    #
    print("end read_loop()")

if __name__ == "__main__": # キー入力した「UME専用Hexコマンド」を送信して[UMEHOSHI ITA]を制御するテスト用
    # スレッドに read_loop 関数を渡して実行を始める
    init_sub()
    import threading
    t_id = threading.Thread(target=read_loop)
    t_id.start()
    s='G10800090000067' # [80009000]番地から16byteのメモリ内容の取得
    usb_send( s, quantity=0 )
    send_cmdfile( 'app_pwm_esp32.umh' )
    #send_cmdfile( 'uBeep.umh' )
    #send_cmdfile( 'uStartInit.umh' )
    time.sleep(3) # この待機中で、UMEHOSHI ITA からのUSBデータ(16byteメモリー情報)を表示すればOK
    usb.close()
    print("Ended !")

上記のmainの実行で、'G10800090000067'+'\r\n'を送って、応答のメモリ内容を表示できればOKです。

def usb_send(s, endstr=b"\n",quantity=2)が、上記で作成した最も低レベルのメソッドで、 sの文字列の送信ですが、その応答データの受信処理も行います
応答データの中にendstrがquantityの数だけ含まれたと判断できた時に、USBからの応答データをリターンします。
quantity=0に指定すると、USBからの受信を行いません。
別途でUSBからの受信処理をスレッドなどの非同期で行う場合は、quantity=0に指定して、usb_sendを送信専用関数として使います。
上記では、usb_sendの戻り値で得られる受信データやread_loop内の受信処理で得られた受信データは、usb_receive_funcに記憶する関数で 処理します。
ここではusb_receive_funcにデフォルトで、標準出力への表示関数を記憶しています。
このusb_receive_func変数は、このモジュールを利用する用途に応じた処理関数に変更して使うことを想定しています。

上記のsend_cmdfile( 'app_pwm_esp32.umh' ) のコメントを外すと、『app_pwm_esp32.umh』のファイルを送信します。
このファイルは、 [UMEHOSHI ITA]基板のEEPROM領域に書き込むにモータ制御プログラムで、 このリンク先ページで 紹介したものです。
この作品では、このモータ制御プログラムを書き込んで使っています。


raspiAPumeTcp.pyの実現に必要なTCP通信の基本的な処理のモジュール(/usr/local/apps/umetcp.py)

以下で示すコードは、
TCPのサーバーで文字列メッセージファイルを受信、送信するメソッドを用意したモジュールです。
そのデータ送受信のプロトコルは次の通りです。
文字列メッセージのデータブロックは、先頭が'M'の文字で、改行までの文字列です。
またファイルのデータブロックは、先頭が'f'の文字で、
 その直後にファイル名が続き、   次に半角スペースが続き、
   次にファイルのbyteサイスの10進の文字列が続きます。
    そしてその直後に'\r\nがあり、
     次にファイルのbyte数分のファイル内容バイト列が続きます。

このプロトコルで、WindowやLinux、ESP32のPython環境で送受信するためにの最小となるモジュールを目標に作りました。
#!/usr/bin/python3
# 「/usr/local/apps/umetcp.py」
import os
import sys
import socket
import netifaces

receive_folder="./" # 受信データの記憶位置
receive_folder="/usr/local/apps/" # 受信データの記憶位置

def get_wlan0_ip():
    # インターフェース 'wlan0' のアドレス情報を取得
    # netifaces.AF_INET は IPv4 アドレスを意味します
    addresses = netifaces.ifaddresses('wlan0') # Windows非対応
    if netifaces.AF_INET in addresses:
        # IPv4 アドレスのリストの最初の要素を取得
        ip_info = addresses[netifaces.AF_INET][0]
        return ip_info['addr']
    else:
        return None

def defalut_tcp_recieve_file(filepath):
    'tcpでファイルを受信した直後の処理'
    filesize = os.path.getsize(filepath)
    # 受信したファイル名とサイズを表示
    print( "recieve file:{},size:{}".format(filepath, filesize)) 

tcp_receive_file_func = defalut_tcp_recieve_file # tcpファイル受信で実行する処理

def defalut_tcp_recieve_message(msg):
    print( msg ) 

tcp_receive_message_func = defalut_tcp_recieve_message # tcpメッセージ受信で実行する処理

def send_file(sock, filepath):# filepathのファイルをクライアントに送信(大きなフィルにも対応)
    try:
        filename = os.path.basename(filepath)
        file_info = os.stat(filepath)
        filesize = file_info[6]
        s = "f{} {}".format(filename, filesize)
        bin=(s+"\r\n").encode("utf-8")
        print(s) # 名前とサイズの送信文字列
        sock.sendall(bin) # send filename filesize
        CHUNK = 100000 # この数ずつ送る(必要に応じて変更)
        remaining_sizes=filesize
        with open(filepath, "rb") as f:
            while True:
                print(f"Remaining sizes:{remaining_sizes}")
                size=CHUNK if remaining_sizes >= CHUNK else remaining_sizes
                data = f.read(size)
                sock.sendall(data)
                remaining_sizes -=len(data)
                if remaining_sizes == 0: break
            #
        #
        print(f"send_file:{filepath} end." )  
    except Exception as e:
        print("send_file Error:"+ str(e))

def send_message(sock,msg):
    bin=('M'+msg+"\r\n").encode("utf-8")#binaryへ変換
    sock.sendall(bin)#一括送信

def recieve_file(sock):  # 受信ファイル (ファイル名にスペースを含む場合にも対応)
    buf=b""
    while True:
        bin = sock.recv(1)#1byte受信
        if len(bin) != 1: break
        buf += bin
        if buf[-2:] == b"\r\n":# 1行の文字列受信終了?
            s = buf[0:-2].decode('utf-8')#バイナリから
            iSpace=s.rfind(' ')
            filename, filesize = s[0:iSpace], int(s[iSpace+1:])
            print(filename, filesize) # 受信ファイルとサイズ確定
            CHUNK = 100000
            remaining_sizes=filesize
            with open( receive_folder + filename, "wb") as f:
                bin=b""
                while remaining_sizes > 0:
                    print(f"Remaining sizes:{remaining_sizes}")
                    size=CHUNK if remaining_sizes >= CHUNK else remaining_sizes
                    bin = sock.recv(size) # ファイルの受信
                    f.write(bin)
                    remaining_sizes -= len(bin)
                #
            tcp_receive_file_func( receive_folder + filename) # ファイル受信直後の処理
            break

def recieve_message(sock):
    buf=b""
    while True:
        bin = sock.recv(1)#1byte受信
        if len(bin) != 1: break
        buf += bin
        #print("----", buf)
        if buf[-2:] == b"\r\n":# 1行の文字列受信終了?
            s = buf[0:-2].decode('utf-8')#バイナリから
            tcp_receive_message_func(s) # 受信文字列の対処
            break

def receiveData(sock):
    print( "start receiveData" )
    while True:
        bin = sock.recv(1)#1byte受信
        if len(bin) != 1: break
        if bin == b"f" : recieve_file(sock)
        elif bin == b"M" : recieve_message(sock)
        else:
            print(bin , end="")
    #
    print( "receiveData ended." )   

if __name__ == "__main__":
    server_addr =('192.168.0.110', 59154) # 接続先をコードで指定する
    server_addr =(get_wlan0_ip(), 59154) # RaspberryでSTAやATモードに対応時に使う
    print("Serverの情報:",server_addr)
    serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversock.bind(server_addr)  # IPとポート番号を指定します
    print("接続要求を待つ")
    serversock.listen()
    sock, address = serversock.accept()#サーバの接続受け入れ
    try: 
        receiveData( sock ) # 命令を受信して処理するループ
    except Exception as e:
        print(e)
    sock.close()
    serversock.close()

上記サーバーを実行すると、その"/usr/local/apps/"のディレクトリに、受信したファイル群を記憶できます。
このサーバーに対して、ファイル群を送信する テスト用コードをこのリンク先で紹介しています。

上記のreceiveDataが実行で、文字列メッセージやファイルを接続元のクライアント側から受け取ることができるようになります。
そして、文字列メッセージを受け取ると、tcp_receive_message_funcに記憶される関数が実行されます。 この変数の初期値は、受信文字列を標準出力へ表示する関数が記憶されています。
また、ファイルを受け取るとカレントディレクトリに記憶され、tcp_receive_file_funcに記憶される関数が実行されます。 この変数の初期値は、受信したファイルの名前とサイズを標準出力へ表示する関数が記憶されています。


起動サービスから呼び出されるTCPサーバーのモジュール(/usr/local/apps/raspiAPumeTcp.py)

start-app-select.pyから呼び出されるraspiAPumeTcp.pyです。
このモジュールは上記のumeusbとumeusbを利用して出来ています。
下記ではumetcp.tcp_receive_file_funcに、「"/usr/local/apps/"にファイルを格納した受信ファイル」が".umh"であれば、 そのデータ(UME専用Hexコマンド)をUSBを介して、UMEHOSHI ITAに転送・実行させる my_tcp_receive_file_func関数を記憶しています。
同様に、umetcp.tcp_receive_message_funcに、受信したUME専用Hexコマンド文字列をUMEHOSHI ITAに転送・実行させる my_tcp_recieve_message関数を記憶しています。
また、umeusb.usb_receive_funcに、UMEHOSHI ITAからのUSB応答処理文字列をTCPクライアントへ送信するmy_usb_receive_func関数を記憶しています。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#   TCPサーバープログラム(/usr/local/apps/raspiAPumeTcp.py)
#  [Raspberry Pi 3 Model A+]と[UMEHOSHI ITA]を乗せたモータ付き台車のサービスから呼び出される

import board
import busio
from adafruit_ssd1306 import SSD1306_I2C
from PIL import Image, ImageDraw, ImageFont
import time

i2c = busio.I2C(board.SCL, board.SDA)# --- I2C初期化 ---

# --- SSD1306ディスプレイ初期化 (128x64の場合) ---
oled = SSD1306_I2C(128, 64, i2c)

# --- 画面をクリア ---
oled.fill(0)
oled.show()

# --- Pillowで描画領域を作成 ---
image = Image.new("1", (oled.width, oled.height))
draw = ImageDraw.Draw(image)
font = ImageFont.load_default()# --- フォント設定 ---

def draw_text(txt: str, row=0, font=font, fill=255):
   # --- テキスト描画 (0=黒、255=白)上記設定で、横21文字---
   draw.text((0, row*20), txt , font=font, fill=255)
   # --- 画面に表示 ---
   oled.image(image)
   oled.show()

def oled_clear():
   oled.fill(0)   # --- クリア
   oled.show() # ---表示

import os
import umetcp
from umetcp import send_message, receiveData
import socket
import umeusb
import traceback

sock = None # クライアントと通信するソケット

def my_tcp_receive_file_func(filepath):
    ''' 受信したumehoshiアプリ用「.umh」のファイルより、
     「UME専用Hexコマンド」の文字列をusbへ出力する'''
    name,ext = os.path.splitext(filepath)
    if not ext == ".umh" : return
    with open( filepath ) as f: ss=f.readlines()
    ss="".join(ss[1:])
    print(ss)
    umeusb.send_cmd(ss, quantity=0) # TCPで受信した 「.umh」データを[UMEHOSHI ITA]へ送る

umetcp.tcp_receive_file_func = my_tcp_receive_file_func # tcpファイル受信データの処理を置き換え

def my_tcp_recieve_message(msg):
   umeusb.send_cmd(msg)

umetcp.tcp_receive_message_func=my_tcp_recieve_message  # TCO受信文字列(UME専用Hexコマンド)処理を置き換え

def my_usb_receive_func(bin):
    'usb受信のイベントで実行するデフォルト処理'
    global sock
    #print("-----------",bin)
    ss = bin.decode('utf-8')
    a=ss.split('\n')
    for s in a:
        s = s.strip()
        if sock != None:
            print(s)
            send_message(sock, s) # [UMEHOSHI ITA]からの応答メッセージをTCPで返す
        else: print( s )

umeusb.usb_receive_func = my_usb_receive_func # USB受信データの処理を置き換える。

umeusb.init_sub()
import threading
t_id = threading.Thread(target=umeusb.read_loop)
t_id.start()

oled.contrast(128) # 0?255
draw_text(f"UMEHOSHI ITA",0)

#ip="192.168.4.1"
ip=umetcp.get_wlan0_ip() # IPアドレス取得
while ip == None:
   ip=umetcp.get_wlan0_ip()
   if ip : break
   time.sleep(0.1)

umeusb.send_cmdfile("/usr/local/apps/uStartInit.umh") # ロボット初期化
server_addr =(umetcp.get_wlan0_ip(), 59154)
hostname=socket.gethostname()
draw_text(f"{hostname}",1)
draw_text(f"{server_addr[0]},{server_addr[1]}",2)

print("Serverの情報:",hostname, server_addr)
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversock.bind(server_addr)  # IPとポート番号を指定します
print("接続要求を待つ")
serversock.listen()
while True:
   print("接続を待って、接続してきたら許可")
   sock, address = serversock.accept()#サーバの接続受け入れ
   print("接続相手:",address)
   try:
      receiveData( sock ) # 受信ループ
   except Exception as e:
      print(f"例外の種類: {type(e)}")
      print(f"スタックトレース: {traceback.format_exc()}")
      print(e, "sock.close()")
   sock.close()