読者です 読者をやめる 読者になる 読者になる

俺の備忘録

個人的な備忘録です。

PS3のコントローラのイベントをHTTPでRaspberry Piに送信

python Flask Raspberry Pi

サマリー

過去記事(例えばこれ)でラジコンをPS3のコントローラでリモート操作する際に、 ノートPCとRaspberry Pi間の通信はTCP/IPをゴリ押しで使っていてスマートなやり方とは言えなかった。

そこで、今回はノートPCとRaspberry Pi間の通信をHTTP化し、整理してみた。 今後のラジコンづくりで利用予定。

クライアント側

PS3のコントローラを扱うためのライブラリとしてpygameを使用。 HTTP通信にはrequestsを使用。 発生したPS3のコントローラのイベントはJSON形式でPOST。

Ps3ControllerClient.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

import pygame
from pygame.locals import *
import time
import json
import requests
from requests.exceptions import ConnectionError
import traceback
import logging


class Ps3Controller:
    """
    PS3のコントローラクラス
    """
    # pygameのイベント種別の定数とボタンイベント種別のマップ
    EVENT_TYPE_MAP = {JOYBUTTONDOWN: "DOWN",
                      JOYBUTTONUP: "UP"}

    # pygameのボタン種別の定数とわかりやすくするためにこちらで命名したボタン種別のマップ
    BUTTON_ID_MAP = {0: "SELECT",
                     1: "ANALOG_LEFT",
                     2: "ANALOG_RIGHT",
                     3: "START",
                     4: "UP",
                     5: "RIGHT",
                     6: "DOWN",
                     7: "LEFT",
                     8: "L2",
                     9: "R2",
                     10: "L1",
                     11: "R1",
                     12: "TRIANGLE",
                     13: "CIRCLE",
                     14: "CROSS",
                     15: "SQUARE"}

    # 扱うイベントのフィルタ(Event種別✕ButtonID) 傾き等のイベントは扱わない
    EVENT_FILTER = []
    for type_key in EVENT_TYPE_MAP.keys():
        for button_key in BUTTON_ID_MAP.keys():
            EVENT_FILTER.append((type_key, button_key))

    def __init__(self, handler):
        """
        コンストラクタ
        :param handler:PS3のコントローライベント発生時に実行するcallback関数
        :return:Ps3Controllerオブジェクト
        """
        pygame.joystick.init()
        joys = pygame.joystick.Joystick(0)
        joys.init()
        self._handler = handler

    def start(self):
        """
        PS3のコントローラからの入力を受付開始
        :return:void
        """
        pygame.init()

        while True:
            # イベントをハンドル
            self.handle_ps3_controller_event(pygame.event.get())
            time.sleep(0.1)

    def handle_ps3_controller_event(self, all_events):
        """
        :param all_events:PS3のコントローラで発生した全イベント
        :return:void
        """

        # 不要なイベントをフィルタ(傾き等のイベントは無視する)
        filtered_event = filter(lambda e:e.dict.has_key("button") and (e.type, e.dict["button"]) in Ps3Controller.EVENT_FILTER, all_events)

        if len(filtered_event) > 0:
            event_list = []
            for e in filtered_event:
                event = {'button_id': Ps3Controller.BUTTON_ID_MAP.get(e.dict["button"]),
                         'event_type': Ps3Controller.EVENT_TYPE_MAP.get(e.type)}
                event_list.append(event)
            # callback
            self._handler(event_list)

# Raspberry PiのIPアドレスまたはホスト名
RASPBERRY_HOST = "IP or Host Name"

# Raspberry側の待ち受けポート
RASPBERRY_PORT = "9999"

def send_ps3_controller_events(event_list):
    """
    PS3のコントローラのイベントをHTTPでRaspberry Piに送信する
    :param event_list:発生したPS3のコントロオーライベント一覧
    :return:void
    """
    try:
        # 以下のようなJSONをHTTPで送信する
        # {'events':[{'button_id': 'UP', 'event_type': 'DOWN'}, {'button_id': 'R2', 'event_type': 'UP'}]}
        requests.post("http://{0}:{1}/ps3ctrlevents".format(RASPBERRY_HOST, RASPBERRY_PORT),
                      data=json.dumps({"events": json.dumps(event_list)}),
                      headers={'Content-Type': 'application/json'})
    except ConnectionError as e:
        logging.error(e)
        logging.error(traceback.format_exc())
    except Exception as e:
        logging.error(e)
        logging.error(traceback.format_exc())

if __name__ == '__main__':
    try:
        # PS3コントローラ初期化
        c = Ps3Controller(send_ps3_controller_events)
        c.start()
    except KeyboardInterrupt:
        logging.info("terminated.")

サーバ側

サーバ側は、Pythonの軽量WebフレームワークであるFlaskを利用してさくっと実装。

RaspberryPiServer.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

import traceback
import logging
import json
from flask import Flask, request, make_response

# Web Application
app = Flask(__name__)


@app.route('/ps3ctrlevents', methods=['POST'])
def ps3ctrlevent():
    """
    PS3のコントローラのイベント受付API
    :return:HTTP Response
    """
    if request.method == 'POST':
        try:
            ps3ctrlevents_handler(json.loads(request.data))
            response = make_response()
            response.headers["Content-Type"] = "application/json"
            response.data = json.dumps({"result": "accept"})
            return response
        except Exception as e:
            logging.error(e)
            logging.error(traceback.format_exc())


def ps3ctrlevents_handler(data):
    """
    PS3のコントローラのイベントを処理する
    :param data:PS3のコントローラのイベント情報(json)
    :return:void
    """
    events = data['events']

    # ここにラズパイにやらせたい処理を記述する################
    for event in json.loads(events):
        print event['button_id'], event['event_type']
    ##################################################

if __name__ == "__main__":
    try:
        app.run(host='0.0.0.0', port=9999, debug=True)
    except Exception as e:
        logging.error(e)
        logging.error(traceback.format_exc())

以上です。