pythonメニュー

pythonのwebアプリ

Python web サーバ

次のように、python3の動作環境のみでWebサーバ起動できます。この動作は、静的なフHTMLァイルに限定されます。
python -m http.server --bind 127.0.0.1 80
(-m オプションで後述のライブラリモジュールをスクリプトとして実行させています。終了は「Ctrl+C」です。)
次のスクリプト「webtest1.py」を別途に作って動作させた場合とほぼ同じ動作です。

from http.server import HTTPServer, SimpleHTTPRequestHandler
host, port= '127.0.0.1', 80
httpd = HTTPServer((host, port), SimpleHTTPRequestHandler)
httpd.serve_forever()
GET や POST リクエストに対するレスポンスをカスタマイズするには http.server.SimpleHTTPRequestHandler を継承したクラスで do_GET や do_POST のメソッドをオーバーライドします。
from http.server import HTTPServer, SimpleHTTPRequestHandler
class MyHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        body = b'Hello Python HTML World'
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-length', len(body))
        self.end_headers()
        self.wfile.write(body)

host, port= '127.0.0.1', 80
httpd = HTTPServer((host, port), MyHandler)
httpd.serve_forever()
また、WSGI(Web Server Gateway Interface) をサポートする HTTP サーバーの場合は、 をつくる場合、wsgiref モジュール(wsgiref.simple_server)を使います。

また、CGI サーバーをコマンドラインから起動させる場合は、次のように SimpleHTTPRequestHandler の代わりに CGIHTTPRequestHandlerを使います。
「python webserver.py」と実行することで、webサーバとして動作します。
# -*- coding: utf-8 -*-
#  python webserver.py

import http.server
from http.server import HTTPServer, CGIHTTPRequestHandler

class Handler(CGIHTTPRequestHandler):
    cgi_directories = ["/cgi-bin"]
    
server_address = ("", 80)
handler_class = Handler #ハンドラを設定
my_server = HTTPServer(server_address, handler_class)
my_server.serve_forever()
これは、後述するPython web アプリを起動できます。

Python web アプリ例

前述の方法で起動したサーバ「webserver.py」と同じ位置に、 「cgi-bin」のフォルダを作り、その中に 次のファイル[app1.py]を置いて使う。ブラウザで「http://localhost/app1.py」で閲覧します。
# -*- coding: utf-8 -*-
# app1.py の名前でファイルで作成

import cgi
import os
import sys
import io
 
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
 
# HTML文字列
html = '''
<!DOCTYPE html>
<html>
  <head>
    <style>
       body {{ font-size: 24pt }} /* 1つ波カッコは2つ並べて表現 */
    </style>
    <meta charset="UTF-8">
  </head>
  <body>
    {}
    <hr>
    {}
    <form action="app1.py">
    お名前:<input type="text" name='name'>
    <input type="submit">
    </form>
  </body>
</html>
'''
 
# フォームから値を取得
name = '未入力'
 
form = cgi.FieldStorage()
 
if 'name' in form:
    name = form['name'].value

result = 'お名前:{}<br />'.format(name)
 
# 結果を表示
print(html.format(
    'リクエストは {} です。'.format(os.environ['REQUEST_METHOD']),
    result, 
)

簡易セッションを自作実現した例

これは、defaultdictの情報をファイルに記憶することで、その情報をセッションスコープとして永続的に記憶するものです。
セッションはクッキー「名前:SESSID」で、この値をファイル名に使っています。
このSESSIDの値はデフォルトで現在時間の文字列が使われますが、 getSessionの引数で、強制的に文字列値を指定できます。
以下を「mysession.py」の名前で作ります。(これはセッションを実現するためのモジュールです。)
# -*- coding: utf-8 -*-
# <meta charset="UTF-8"> mysession.py

from collections import defaultdict
import dill
import os
import time
import re #正規表現のモジュール

set_cookie_SESSID=None  # Set-Coolie用文字列

# sessIDの辞書取得して戻す。
def getSession( sessID ):
   try:
       with open(sessID, 'rb') as f:
           sessDic = dill.load(f) # ロードして変数に復元
       return sessDic
   except: return defaultdict(str) # strの型のデフォルト値の空の辞書を返す

def saveSession( sessID,  sessDic ):
    with open(sessID, 'wb') as f:
         dill.dump(sessDic, f)

# セッションの辞書取得
# 'SESSID'のクッキーキーの値(セッションID)があれば、そのファイルより辞書を取得する。
# 無ければ、現在時間からセッションIDを生成し、「mysession.set_cookie_SESSID」に、
# セッション書き込み文字列を設定する。
def getSessDic( usrSESSID = None):
    sessID = None
    if  'HTTP_COOKIE' in os.environ.keys(): # クッキーがある?
        cookie=os.environ['HTTP_COOKIE']  # 取得例: 'AB=12,SESSID=15466.8484,IP=127.0.0.1'
        m=re.match('.*(SESSID=)([^,]*)', cookie)
        if m and m.lastindex >= 2 :
            sessID = m.group(2)
    if not sessID : # SESSIDのクッキーが無い?
        global set_cookie_SESSID
        sessID = str(time.time()) # 現在時間の文字列を取得(例:1545015666.8494184)
        if usrSESSID:
             sessID = usrSESSID
        set_cookie_SESSID = 'Set-Cookie: SESSID=' + sessID
    sessDic = getSession(sessID) # sessIDのファイルから辞書生成
    return sessDic, sessID

def main():
    sessDic=getSessDic()
    sessDic=getSession( "123456789.12345" )
    print(sessDic['AIの応答'])
    sessDic['AIの応答'] = 'こんばんは'
    saveSession(  "123456789.12345",  sessDic )

if __name__ == '__main__':
    main()
上記「mysession.py」を置いた場所あるwebアプリのコード例
# -*- coding: utf-8 -*-
# <meta charset="UTF-8">
import io
import os
import sys
import cgi

import cgitb
cgitb.enable() # エラー箇所を指摘させる設定。

# HTML文字列
html = '''
<!DOCTYPE html>
<html>
  <head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
  </head>
  <body>
<h1><script>
 document.write("Cookie:" +  document.cookie )
</script></h1>
<pre>
{}
</pre>
<form action="test.py" method="POST">
<input type="submit" name="AAA" value="押して">
</form>
  </body>
</html>
'''  

import mysession
sessDic,sessID = mysession.getSessDic() # セッションの辞書取得

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

form = cgi. FieldStorage()

if not sessDic['閲覧回数']: 
   sessDic['閲覧回数'] = "1"
else:
   sessDic['閲覧回数'] = str(int(sessDic['閲覧回数']) + 1) 

msg = "[<br>"
for param in os.environ.keys():
    msg += "<b>%-20s</b>: %s<br>" % (param, os.environ[param])
msg += '閲覧回数:' + sessDic['閲覧回数'] + "<br>"
msg += "<br>]<br>\n"
# 結果を表示


# HTTPのレスポンスヘッダー出力
print('Content-type: text/html')
if mysession.set_cookie_SESSID : # セッションが無ければ(セッション書き込み文字列があれば?)
    print( mysession.set_cookie_SESSID )
print('Set-Cookie: BB=bb')
print('Server: myServer')
print() # 空の改行

print( html.format( msg ) )# HTTPのレスポンスのボディ出力

mysession.saveSession( sessID,  sessDic ) # セッション用ファイルに保存


デスクトップ画面を配信するアプリ検討

これを実現するために必要なデスクトップ画面そ取得して、"scr.png"に保存するコード例を示す。

import pyautogui as pag # デスクトップスクリーンを制御するモジュール
import numpy as np
import cv2

img=pag.screenshot()# デスクトップ画像取得
img=np.asarray(img)
img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR)
cv2.imwrite("scr.png", img)
このようにして作った"scr.png"を2秒ごとに再表示する次のHTML「index.html」を用意します。
これはhead内のmetaタグで、refreshによるリダイレクト機能を使っています。
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width">
    <meta http-equiv="refresh" content="2; URL=">
    <style>
        body {
            font-size: large;
            background-color:aquamarine
        }
    </style>
    <title>Image distribution</title>
</head>
<body>
<img src="scr.png">    
</body>
</html>
これを利用すれば、Webサーバ側でデスクトップの画像を "scr.png"の名前で保存する繰り返しを行えば デスクトップを配信する構造ができます。
下記にそのWebサーバ「web_desktopserver.py」のコード例を紹介します。 (デスクトップの画像を1秒ごとに "scr.png"の名前で保存する別スレッドを動かしている。)
# -*- coding: utf-8 -*-
'''
単にWebサーバを起動させ、同時にデスクトップの画像を1秒ごとに "scr.png"の名前で保存する別スレッドを起動している。
別途で、"scr.png"の表示用index.htmlを用意することで、デスクトップ画面を配信するサーバとなる。
'''

import http.server
from http.server import HTTPServer, CGIHTTPRequestHandler
from socketserver import ThreadingMixIn 

class Handler(CGIHTTPRequestHandler):
    cgi_directories = ["/cgi-bin"]
    

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): 
    pass 


import pyautogui as pag # デスクトップスクリーンを制御するモジュール
import threading
import numpy as np
import cv2
import time
def capture():#
    count=0
    while True:
        try:
           img=pag.screenshot() # デスクトップ画像取得
           img=np.asarray(img)
           img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR)
           cv2.imwrite("scr.png", img)
           count+=1
           print(count,"seconds")
        except Exception as e:
           print(e)
        #
        time.sleep(1)
    #
    print("終了")

t_id = threading.Thread(target=capture)
t_id.start()

server_address = ("", 3000)
handler_class = Handler #ハンドラを設定
my_webserver = ThreadedHTTPServer(server_address, handler_class)
my_webserver.serve_forever()


デスクトップ画面を配信するTCPソケットで実現するサーバの検討

# MySrcTcpServer.py
import socket
import threading

import numpy as np
import cv2
import pyautogui as pag # デスクトップスクリーンを制御するモジュール

server_ip = "127.0.0.1"
server_port = 80
listen_num = 5

tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(type(tcp_server), "のソケットオブジェクトの作成")

tcp_server.bind((server_ip, server_port))
print("ソケットにIPアドレス:{}ポート:{}を紐づける".
        format(server_ip,server_port))

tcp_server.listen(listen_num)
print("listenで接続可能状態へ移行")

def rec_send(client,head,body):
    client.send(head) # 送信
    lst=np.frombuffer( head , dtype="int16")#2バイト、リトルエンディアンで変換
    print("-------------", lst)
    client.send(body) # 送信
    client.close()

while True:
    img=pag.screenshot()
    img=np.asarray(img)
    img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR)
    cv2.imwrite("scr.png", img)
    print( img.shape )
    a=np.array(img.shape, dtype="int16")
    head=b''.join( a ) # bytes型(文字列に似た操作が可能)
    size=img.shape[0]*img.shape[1]*img.shape[2]
    body=img.reshape(size,dtype="int16")
    body=body.tobytes()

    client,address = tcp_server.accept()
    print("クライアント接続許可:",type(client), address)

    t_id = threading.Thread(target=rec_send, args=(client, head, body))
    t_id.start()



TCPソケットで実現するWebサーバの実験

# MyWebServer
#このコード内に埋め込んである所定のHTMLだけを配信するWebサーバ

import socket
import threading

server_ip = "127.0.0.1"
server_port = 80
listen_num = 30

tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(type(tcp_server), "のソケットオブジェクトの作成")

tcp_server.bind((server_ip, server_port))
print("ソケットにIPアドレス:{}ポート:{}を紐づける".
        format(server_ip,server_port))

tcp_server.listen(listen_num)
print("listenで接続可能状態へ移行")

def rec_send(args):
    print("-------------------", type( args )  )
    client = args
    head = ""
    while True:
        data = client.recv(1)
        head +=data.decode('utf-8')
        idx = head.find("\r\n")
        if head == "\r\n": break
        if idx != -1 :
           print( head , end="")
           head = ""
    #
    html='''<html>
<body>
<h1>TEST</h1>
</body>
<html>
'''

    send_head='''HTTP/1.1 200 OK
Content-Type: text/html
Accept-Ranges: bytes
Content-Length: {}

'''
    body = html.encode('utf-8') # byteに変換
    send_head=send_head.format(len(body))
    send_head = send_head.encode('utf-8') # byteに変換
    client.send(send_head) # 送信
    print("送信")
    print(send_head.decode('utf-8'))
    client.send(body) # 送信
    client.close()

while True:
    client,address = tcp_server.accept()
    print("クライアント接続許可:",type(client), address)
    t_id = threading.Thread(target=rec_send, args=(client,))
    t_id.start()