UMEHOSHI ITA TOP PAGE COMPUTER SHIEN LAB
esp32側をpythonでTCPサーバーにし、それを付けたUMEHOSHI ITA基板側のモータ制御ROM関数を呼び出してモータを動かす構成の紹介です。
(このサーバーに接続するPC側のPython用クライアントは、このページで紹介しています。)
これでPCからWifiを介し、esp32で次の制御を行います。
その制御は、ファイルの転送、esp32側のファイルリスト取得、Pythonファイルの実行です。
よって、Pythonファイルがモータ制御であれば、Wifiを介した遠隔でモータ制御ができます。
ロボットのハード構成は、こちらのページで紹介しています。
MicroPythonのインストールは、こちらのページで紹介している内容で行います。
これを行うには、UMEHOSHI ITAをスルーモードで行います。
この状態は、PCとUSBで接続してPC側の「Tera Term」のターミナルで、MicroPythonの会話モードが動作できます。
(UMEHOSHI ITAをスルーモードにする操作:
SW1とSW2を押し、SW1を離してLED1が消灯に変わるまでSW2を押す。約3秒)
UMEHOSHI ITAをスルーモードとは、接続したUSBでPCから送った情報がそのままUARTを介してesp32に送られるモードで、逆にesp32が出力した情報はPCへ送信されます。
この状態で、PC側のampyコマンドを使うことにより、esp32へのファイル転送操作やesp32側でのpythonプログラムの実行ができす。
ampyの使い方は、こちらのページで紹介しています。
EEPROM化は、こちらのページで紹介している内容で行います
これを行うには、UMEHOSHI ITAをUART1コマンドモードで行います。
この状態は、UARTでUMEHOSHI ITAで、UART1やUSBで受信した'S'、'R'、'G'から始まる文字列命令「UME専用Hexコマンド」を処理できます。
(なおUMEHOSHI ITAをUART1コマンドモードにする操作は、
SW1とSW2を押し、SW1を離してLED1が消灯し、それから点灯に変わるまでSW2を押す。約4秒)
このROM化により、PCとUSBで接続してPC側の「Tera Term」のターミナルで
EEPROM内のエントリーポイントの文字列を送るだけで、対応する関数を起動できます。
例えば、「Tera Term」からEEPROMのブザーを鳴らすエントリポイントを実行する"R009D020D00003B"の文字列を送ると、
START:9D020D00の応答文字列で応答すると共にブザーが鳴ります。
R009D020D00003B
START:9D020D00
なお、esp32の起動メッセージなど余計な文字列も表示されますが、それは無視してください。
このUMEHOSHI ITAをUART1コマンドモードでは、
esp32がUMEHOSHI ITAをのUART1に接続されるため、
esp32のMicroPythonのprintで出力した'S'、'R'、'G'から始まる文字列は
UMEHOSHI ITAの命令「UME専用Hexコマンド」として処理さているのです。
さて 上記操作は UMEHOSHI ITAをUART1コマンドモードにするために約4秒が必要になっていますが、
その待ち時間無しで初めからUART1コマンドモードで起動させます。
同時にエコーモードオフと、前述のEEPROM内の
モーターのPWM制御などの初期化関数AdrStart(0x9D020010番地)を行う次の関数を、ROM化します。
それには、次のコードをumehoshiEditの開発ツールで実行させます。
#include <xc.h> #include "common.h" #define BASE_FUNC 0x9D020000 // EEPROM領域配置用(パワーONでないリセット操作の実行アドレス) #define AdrEsp32Init (BASE_FUNC+0x1000) // esp32 の初期化関数 #define AdrStart (BASE_FUNC+0x0010) // アプリの起動時の初期ルーチンのアドレス // esp32 の初期化関数 起動アドレス:0x9D020000+0x1000=0x9D021000 // この0x9D021000を0x9D03fff4番地に記憶しておく予定。 // これで、パワーオン時でSW2(白)スイッチが押した時に実行する。 __attribute__((address( AdrEsp32Init ))) void Esp32Init(void); void Esp32Init() { _set_int_var(_SPECIFIED_flagEcho, 0); // エコー制御変数を0にして、エコーをOFFにする。 _set_int_var(_SPECIFIED_uart_command_mode , 1); // UARTコマンドモードに指定する変数を1。 _set_int_var(_SPECIFIED_mode_change_request , 1); // 上記コマンドモードへの変更要求 void (*func)(void) = (void (*)(void) )AdrStart;//モーター制御の初期化関数アドレスの設定 func(); // 上記モーター制御の初期化 }上記をumehoshiEditの開発ツールで実行することで、 「エコーOFF、UARTコマンドモード指定、モーター制御の初期化関数」を実行するEsp32Init関数が、0x9D021000番地のROM領域に書き込まれます。
import time # setap.py time.sleep(5) import network ap = network.WLAN(network.AP_IF) # create access-point interface ap.config(essid='ESP-AP') # set the ESSID of the access point ap.config(max_clients=10) # set how many clients can connect to the network ap.ifconfig(('192.168.222.1', '255.255.255.0', '192.168.222.1', '8.8.8.8')) # set (ip address, subnet mast, default gateway, dns ip address) ap.active(True) # activate the interface
import os # ume.py import sys import socket import _thread import time #import esp #esp.osdebug(None) # ベンダ O/S デバッグメッセージをオフにする path_datas = "./" flag_listen_loop=True sock2=None # TCPクライアントと通信するソケット def tcp_send_message(sock,msg): # sockのクライアント側にメッセージ送信 bin=('M'+msg+"\r\n").encode("utf-8") if sock : sock.sendall(bin) def send(msg):# sock2のクライアント側にメッセージ送信 msg=f"{msg}" # 文字列へ変換 if sock2 is None: return a=msg.split("\n") for s in a: tcp_send_message(sock2,s) def ls_filelist(sock,path): # ファイルリストの文字列をクライアントに送信 tcp_send_message(sock,"ls_filelist:" + path) if path == "": path="." try: for f_name in os.listdir(path) : file_info = os.stat(f_name) file_size = file_info[6] d_inf = 'dir' if (file_info[0] & 0x4000 ) else ' ' # ディレクトリ判定 msg = f"{d_inf} {file_size:7} {f_name}" tcp_send_message(sock, msg) except Exception as e: tcp_send_message(sock,"ls_filelist Error:" + str(e)) def cat_filepath(sock, filepath): # filepathのテキスト内容をクライアントに送信 tcp_send_message(sock, "cat_filepath:" + filepath) try: with open(filepath) as fr: ss = fr.readlines() for s in ss: if s[-1] == '\n' : s=s[:-1] # 改行を除く tcp_send_message(sock, s) except Exception as e: tcp_send_message(sock,"cat_filepath Error:"+ str(e)) def del_filepath(sock, filepath): # filepathのファイルを削除 tcp_send_message(sock, "del_filepath:" + filepath) try: os.remove(filepath) except Exception as e: tcp_send_message(sock,"del_filepath Error:"+ str(e)) def import_name(sock, name):# nameのモジュールのPythonソースを実行 tcp_send_message(sock, "import_name:" + name) try: mod=__import__(name) # 動的なインポートで実行 del sys.modules[name] # 再度インポートできるように開放 except Exception as e: tcp_send_message(sock,"import_name Error:"+ str(e)) def send_file(sock, filepath):# filepathのファイルをクライアントに送信 try: idx_last_sep = filepath.rfind('\\') filename = filepath if idx_last_sep == -1 else filepath[idx_last_sep+1:] file_info = os.stat(filepath) filesize = file_info[6] s = "f{} {}".format(filename, filesize) bin=(s+"\r\n").encode("utf-8") sock.sendall(bin) #send filename filesize if filesize > 0: with open(filepath, "rb") as f: bin = f.read(filesize) sock.sendall(bin) # send binary file # tcp_send_message(sock,"server:send_file:"+ filepath) except Exception as e: tcp_send_message(sock,"server:send_file Error:"+ str(e)) def recieve_file(sock):# TCPでファイルを受信(1byte受信の繰り返し) buf=b"" while True: bin = sock.recv(1) if bin == b'': break buf += bin if buf[-2:] == b"\r\n": s = buf[0:-2].decode('utf-8') a=s.split(' ') filename, filesize = a[0], int(a[1]) f=open( path_datas + filename, "wb") # 受信ファイルの保存 n=0 tcp_send_message(sock,f"Starting to receive {filename}:size={filesize} Please wait.") while n < filesize: bin = sock.recv(1) if bin == b'': break f.write(bin) n += 1 if n % 10000 == 0: tcp_send_message(sock,f"\n{n}bytes were received.") # f.close() tcp_send_message(sock,f"\n{filename} size:{filesize}, {n}bytes receive end.") if filename.endswith(".umh"): send_cmdfile(sock,filename) # ファイルのコマンド内容をUMEHOSHI ITAへ送信 break # #print(filename + ":size=" + str(filesize)) # debug --------------- def recieve_file_Max(sock):# TCPでファイルを受信(バッファを最大して受信) buf=b"" while True: bin = sock.recv(1) if len(bin) != 1: break buf += bin if buf[-2:] == b"\r\n": s = buf[0:-2].decode('utf-8') a=s.split(' ') filename, filesize = a[0], int(a[1]) bin=b"" while len(bin) < filesize: bin += sock.recv(filesize-len(bin)) with open( path_datas + filename, "wb") as f: # 受信ファイルの保存 f.write(bin) tcp_send_message(sock,f"Server:'{filename}' {len(bin)}bytes received.") if filename.endswith(".umh"): send_cmdfile(sock,filename) # ファイルのコマンド内容をUMEHOSHI ITAへ送信 break # #print(filename + ":size=" + str(filesize)) # debug --------------- def recieve_message(sock): # 受信メッセージに対する処理 global flag_listen_loop buf=b"" while True: bin = sock.recv(1) if len(bin) != 1: flag_listen_loop = False return False # buf += bin if buf[-2:] == b"\r\n": s = buf[0:-2].decode('utf-8').strip() if s == "end_listen_loop": # 受信文字列がこれと一致するなら終了 flag_listen_loop = False return False # if s.startswith("ls -l"): # ファイルリスト要求に対する送信処理 ls_filelist(sock, s[5:].strip() ) return True # if s.startswith("cat "):# ファイル内容表示要求に対する送信処理 cat_filepath(sock, s[4:] ) return True # if s.startswith("del "):# ファイル削除要求に対する処理 del_filepath(sock, s[4:] ) return True # if s.startswith("import "):# Pythonファイルの実行要求に対する処理 import_name(sock, s[7:] ) return True # if s.startswith("get "): # ファイル要求に対するファイル送信 send_file(sock, s[4:] ) return True # if s.startswith("exc "): # .umhファイルの内容で、[UMEHOSHI ITA]のUART1に出力する。 send_cmdfile(sock, s[4:] ) return True # if len(s)>1 and s[0] in "SRG": print( s ) # 受信文字列を、UME専用Hexコマンドと判断して、UMEHOSHI ITAへ送信 else: tcp_send_message(sock,s) # 受信文字列を、TCPクライアントへECHO送信 # return True def receiveData(sock): # 受信処理ループ # tcp_send_message(sock,"start sever receiveData" ) while True: bin = sock.recv(1)#1byte recieve if len(bin) != 1: break if bin == b"f" : recieve_file(sock) elif bin == b"F" : recieve_file_Max(sock) elif bin == b"M" : recieve_message(sock) else: tcp_send_message(sock,"server receive error:" + str(bin) ) # # ------------- UART ------------------- flag_input_loop=False cmd='''S1080007F0000E0FFBD271C00BFAF1800BEAF21F0A003F0 S1080007F10001000C0AF00A0023CF44142340000428C95 S1080007F200009F840000000000000A0023CFC414234B6 S1080007F300000A0033CF84163340000638C000043AC98 S1080007F400000A0023C0C4242340000428C01000424C8 S1080007F50000F0005240100062409F8400000000000E9 S1080007F600021E8C0031C00BF8F1800BE8F2000BD2732 S0880007F70000800E0030000000019 R0080007F000049 ''' # 終了を知らせるループビープ音のRAM配置用のUME専用Hexコマンド log_fw=open( "log.txt", "a") # TCP通信ができない時のesp32への入力状態を記録するログ # def input_loop(): # UMEHOSHI ITA のUART1出力を読み取ってTCPクライアントに送信するループ global flag_input_loop try: flag_input_loop = True s="" while flag_input_loop: c = sys.stdin.read(1) s += c if c != '\n': continue if sock2 is None: log_fw.write(s) time.sleep(0.1) s="" continue s = s[:-1:] # 最後の改行を除く if s.strip() != '' : # Response message from ume received by esp32 tcp_send_message(sock2, s + ":Response") # time.sleep(0.001) if s == "end_input_loop": break s="" # tcp_send_message(sock2, "End! input_loop\r\n") except Exception as e: # print("Error :" , e) # debug --------------- tcp_send_message(sock2,"input_loop Error:"+ str(e)) log_fw.write("input_loop Error:"+ str(e) + "\n") log_fw.close() # ss のUME専用Hexコマンドを[UMEHOSHI ITA]のUART1に出力する。 def send_cmd( ss, cmdchar="SRG"):# example ss=[S048000800000FFAF000086] a = ss.split('\n') for s in a: s=s.strip() if s == "" or not s[0] in cmdchar : continue print( s ) # Command string sent from esp32 to ume tcp_send_message(sock2, s + ":Command" ) time.sleep(0.1) # .umhファイルの内容で、[UMEHOSHI ITA]のUART1に出力する。 def send_cmdfile(sock,filepath, cmdchar="SRG"): try: with open(filepath , "r", encoding="utf-8") as f: ss = f.readlines() if filepath.endswith(".umh") : ss = ss[1:] ss="".join(ss) send_cmd( ss, cmdchar) # ss のUME専用Hexコマンドを[UMEHOSHI ITA]のUART1に出力 except Exception as e: # print("Error :" , e) # debug --------------- tcp_send_message(sock,"send_cmdfile Error:"+ str(e)) # ------------- Main ------------------- portnumber=59154 ip = "192.168.222.1" #ip = "192.168.0.110" server_addr =(ip, portnumber) serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversock.bind(server_addr) time.sleep(3.1) # 3.1秒待機 ([UMEHOSHI ITA]の初期を待つため) thread_id = _thread.start_new_thread(input_loop, ()) # 入力用スレッド起動 serversock.listen(1) # TCPサーバ受け入れ待機 while flag_listen_loop: # sock2, address = serversock.accept() # クライアントの接続許可 serversock.close() tcp_send_message(sock2, "\nConnect!") # TCPクライアントへのメッセージ try: receiveData( sock2 ) # 受信応答処理のループ except: pass print(cmd) # ― ― ― ― の音で終了処理 sock2.close()上記コードの補足: ファイル受信関数が、「recieve_file_Max」と「recieve_file」の2つあります。
#include#include "common.h" // RAMで動かす場合の絶対アドレス範囲は、(0x80005000〜0x80008000)です。 __attribute__((address( 0x80007F00 ))) void test_main (void); void test_main(){ _clear_beep_code();// 以前の音の登録情報を消去 _UM_PTR_GOTO_BEEP=_UM_PTR_BEEP_AREA; // ループ音に設定 _debug_hex4(1,0x0F,1);//― ― ― ― }
import setap import server