Raspberry Pi

【プログラミング】Raspberry PiにGPIOケーブルが接続されているかを判定する方法。

2025.01.18 土
143 view
【プログラミング】Raspberry PiにGPIOケーブルが接続されているかを判定する方法。

Raspberry Piを買って最初のうちは配線を楽しんでいるころだろう。

そんな時にふとこう思ったことは無いだろうか。

配線が挿さっているか、抜かれているかを判別したい。

私は初めてRaspberry Piにケーブルを差し込んだ時、爆弾解除ゲームが思い浮かんだ。

そのゲームを作るには、配線が挿さっているか、抜かれているかを判別する必要がある。

ということで今回は、GPIOケーブルが接続されているか判定する方法を紹介する。

GPIOとは

ラズパイの基板上にあるピンのことで、さまざまな電子部品やセンサーと接続し、データの送受信を行うために使われる。

サンプルコード

スクリプトファイルの用意

まずは、Pythonのファイルを用意する。

今回は、GPIOの接続を判定するPythonスクリプトを用意する。

app.py

#app.py

app.pyはGPIOの接続を判定をするPythonスクリプトだ。

ディレクトリ構造をツリーで確認

project_directory/    # プロジェクトのルートフォルダ
└── app.py

ファイルは上記のディレクトリ構造で配置する。

これで準備は完了だ。

GPIOの接続を判定するコード

import RPi.GPIO as GPIO
import time
import threading

# GPIOの初期化
def setup_gpio(pins):
    GPIO.setwarnings(False)
    GPIO.cleanup()
    GPIO.setmode(GPIO.BCM)
    for pin in pins:
        GPIO.setup(pin, GPIO.IN)

# ワイヤーが切断されたか確認
def monitor_wire(wire, stop_event):
    while not stop_event.is_set():
        wire_state = GPIO.input(wire["pin"])

        # ワイヤーが接続されたままの場合(LOW)
        if not wire["connected"] and wire_state == GPIO.LOW:
            wire["connected"] = True
            print(f"{wire['pin']}番のワイヤーが接続された。")

        # ワイヤーが切断された場合(HIGH)
        elif wire["connected"] and wire_state == GPIO.HIGH:
            start_time = time.time()
            while GPIO.input(wire["pin"]) == GPIO.HIGH:
                if time.time() - start_time > 1:
                    wire["connected"] = False
                    print(f"{wire['pin']}番のワイヤーを切断した。")
                    break
        time.sleep(0.1)  # 少し待機してから再チェック

def main():
    wires = [
        {"pin":  4, "connected": False},
        {"pin": 17, "connected": False},
        {"pin": 27, "connected": False},
        {"pin": 22, "connected": False},
        {"pin": 10, "connected": False},
        {"pin":  9, "connected": False},
        {"pin": 11, "connected": False},
        {"pin":  5, "connected": False},
        {"pin":  6, "connected": False},
        {"pin": 13, "connected": False},
        {"pin": 19, "connected": False},
        {"pin": 26, "connected": False},
        {"pin": 14, "connected": False},
        {"pin": 15, "connected": False},
        {"pin": 18, "connected": False},
        {"pin": 23, "connected": False},
        {"pin": 24, "connected": False},
        {"pin": 25, "connected": False},
        {"pin":  8, "connected": False},
        {"pin":  7, "connected": False},
        {"pin": 12, "connected": False},
        {"pin": 16, "connected": False},
        {"pin": 20, "connected": False},
        {"pin": 21, "connected": False},
    ]
    setup_gpio([wire["pin"] for wire in wires])

    stop_event = threading.Event()
    threads = []

    # 各ワイヤーの監視スレッドを開始
    for wire in wires:
        thread = threading.Thread(target=monitor_wire, args=(wire, stop_event))
        threads.append(thread)
        thread.start()
    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n操作が中断された。")
    finally:
        stop_event.set()  # 全スレッドを停止
        for thread in threads:
            thread.join()  # スレッドの終了を待つ

if __name__ == '__main__':
    main()

コードの説明

モジュールの読み込み

import RPi.GPIO as GPIO
import time
import threading

RPi.GPIOは、Raspberry Pi の GPIO ピンを制御するライブラリだ。

timeは、待機処理 (sleep()) や時間計測 (time.time()) に使用する。

threadingは、並行処理(複数のワイヤーを同時に監視する)に使用する。

GPIO の初期化

def setup_gpio(pins):
    GPIO.setwarnings(False)
    GPIO.cleanup()
    GPIO.setmode(GPIO.BCM)

    for pin in pins:
        GPIO.setup(pin, GPIO.IN)

GPIO.cleanup() を実行することで、前回の GPIO 設定が残っていてもリセットされる。

GPIO.setmode(GPIO.BCM) により、GPIO のピン番号は物理的なピン番号ではなく GPIO 番号で指定される。

GPIO.setup(pin, GPIO.IN) で各ピンを 入力モード(IN) に設定する。

ワイヤーの監視関数

def monitor_wire(wire, stop_event):
    while not stop_event.is_set():
        wire_state = GPIO.input(wire["pin"])  # 現在のピンの状態を取得

        # ワイヤーが接続されたままの場合(LOW)
        if not wire["connected"] and wire_state == GPIO.LOW:
            wire["connected"] = True
            print(f"{wire['pin']}番のワイヤーが接続された。")

        # ワイヤーが切断された場合(HIGH)
        elif wire["connected"] and wire_state == GPIO.HIGH:
            start_time = time.time()  # 切断が始まった時間を記録
            while GPIO.input(wire["pin"]) == GPIO.HIGH:
                if time.time() - start_time > 1:  # 1秒以上 HIGH 状態が続いたら切断と判断
                    wire["connected"] = False
                    print(f"{wire['pin']}番のワイヤーを切断した。")
                    break
        time.sleep(0.1)  # 0.1秒待機して再チェック

GPIO.input(wire["pin"]) で現在の GPIO ピンの状態を取得する。

LOW(0)なら ワイヤーが接続されている

HIGH(1)なら ワイヤーが切断されている

1秒以上 HIGH が続いたら「ワイヤーが切断された」と判定する

time.sleep(0.1) で 0.1 秒間隔で状態をチェックし続ける。

メイン処理

def main():
    wires = [
        {"pin":  4, "connected": False},
        {"pin": 17, "connected": False},
        {"pin": 27, "connected": False},
        {"pin": 22, "connected": False},
        {"pin": 10, "connected": False},
        {"pin":  9, "connected": False},
        {"pin": 11, "connected": False},
        {"pin":  5, "connected": False},
        {"pin":  6, "connected": False},
        {"pin": 13, "connected": False},
        {"pin": 19, "connected": False},
        {"pin": 26, "connected": False},
        {"pin": 14, "connected": False},
        {"pin": 15, "connected": False},
        {"pin": 18, "connected": False},
        {"pin": 23, "connected": False},
        {"pin": 24, "connected": False},
        {"pin": 25, "connected": False},
        {"pin":  8, "connected": False},
        {"pin":  7, "connected": False},
        {"pin": 12, "connected": False},
        {"pin": 16, "connected": False},
        {"pin": 20, "connected": False},
        {"pin": 21, "connected": False},
    ]
setup_gpio([wire["pin"] for wire in wires])

監視対象の GPIO ピンリスト を作成する。

各ピンに connected: False(未接続状態)を設定する。

setup_gpio() を呼び出して GPIO ピンを初期化する。

スレッドを使って複数のワイヤーを監視

stop_event = threading.Event()
threads = []

# 各ワイヤーの監視スレッドを開始
for wire in wires:
    thread = threading.Thread(target=monitor_wire, args=(wire, stop_event))
    threads.append(thread)
    thread.start()

stop_event は、全スレッドを停止するためのイベントを作成する。

各ワイヤーごとにスレッドを作成し、 monitor_wire() を並行実行する

threading.Thread() で新しいスレッドを作成する。

thread.start() でスレッド開始(並行処理)する。

ソースコードのダウンロード

これらのコードはGithubにアップロードしている。

コードのダウンロードはこちら (github.com)

上記のページに、すべてアップロードされているので、必要な方はぜひ使ってみてくれ。

まとめ

いかがだっただろうか。

無事に、GPIOに接続されたワイヤーを監視出来ただろうか。

このコードを使えば、ワイヤーを使った爆弾解除ゲームや黒ひげ危機一発などのゲームを再現することが出来るだろう。

ぜひ使ってみてくれ。

コメントはこちらから

必須コメント

必須ハンドルネーム