Page 1 of 1

Python-OSC, how to share blinks/jaw clenches with other functions?

Posted: Sat Sep 17, 2022 4:37 am
by banjo
This is most probably more of a Python-related question than one directly related to MindMonitor or Python-OSC, but as I've not found an answer on the web, I thought I'd ask here.
Below is skeletal code for a very simple Pong game which I'm ultimately trying to control with blinks and jaw clenches instead of from the keyboard. Where I'm stuck is how to share the number of blinks counted in the function blink_handler with the calling function pong. The error I get is NameError: name 'blinks' is not defined, raised on row 137 of my file (the print(blinks) call in pong).
Might this be because threading is being used, and global variables are not shared? Any clue how to fix this, or can I avoid threading somehow?

Code: Select all

from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
from tkinter import *
import tkinter as tk
import time
import random
import threading

# Game-over flag written by startBall() and pong() and checked by restart().
# A bare module-level `global isFailed` statement binds nothing, so the name
# is given an explicit initial value here instead.
isFailed = False

# Address and UDP port handed to the OSC server in dispatch().
IP = "0.0.0.0"
PORT = 5000

def muse_handler(address, *args):
    """Print a message's address followed by its tuple of arguments.

    Has the ``(address, *args)`` handler signature but is not mapped to any
    address in ``get_dispatcher()``.
    """
    message = f"{address}: {args}"
    print(message)

def eeg_handler(address, *args):
    """Print the address and argument tuple of a raw EEG message.

    Like ``muse_handler``, it is defined but not mapped to any address in
    ``get_dispatcher()``.
    """
    line = f"{address}: {args}"
    print(line)

def blink_handler(address, *args):
    """Count one blink and print the running total.

    Increments the module-level ``blinks`` counter by one. The ``address``
    and ``args`` passed in by the caller are not used.

    Raises:
        NameError: if ``blinks`` has not been assigned at module level
            before the first call (this handler only increments it, it
            never creates it).
    """
    # The docstring must come first: a string placed after `global` is just
    # a discarded expression, not documentation.
    global blinks

    blinks += 1
    print("Blink detected ", blinks)

def jaw_handler(address, *args):
    """Report a jaw clench on stdout; ``address`` and ``args`` are ignored."""
    notice = "Jaw Clench detected"
    print(notice)


def get_dispatcher():
    """Build a Dispatcher that routes blink and jaw-clench addresses.

    Returns:
        A new ``Dispatcher`` with ``blink_handler`` and ``jaw_handler``
        mapped to their OSC addresses.
    """
    osc_dispatcher = Dispatcher()
    routes = (
        ("/muse/elements/blink", blink_handler),
        ("/muse/elements/jaw_clench", jaw_handler),
    )
    for osc_address, callback in routes:
        osc_dispatcher.map(osc_address, callback)
    return osc_dispatcher


def start_blocking_server(ip, port):
    """Serve OSC messages on ``(ip, port)`` and never return.

    The server is built with the module-level ``dispatcher`` (assigned by
    ``dispatch()`` before it calls this function), not with a parameter.
    """
    endpoint = (ip, port)
    osc_server = BlockingOSCUDPServer(endpoint, dispatcher)
    osc_server.serve_forever()  # blocks the calling thread for good

def moveBaseLR(base, dir, x, y = 0):
    """Shift the canvas item ``base`` by ``(x, y)`` if it has room to move.

    ``dir`` is ``'l'`` or ``'r'``. A left move is allowed while the item's
    left edge is greater than 0, a right move while its right edge is less
    than 400; any other case leaves the item where it is. Uses the
    module-level canvas ``c``.
    """
    left_edge, _top, right_edge, _bottom = c.coords(base)
    room_on_left = dir == 'l' and left_edge > 0
    room_on_right = dir == 'r' and right_edge < 400
    if not (room_on_left or room_on_right):
        return
    c.move(base, x, y)
    c.update()

def dispatch():
    """Thread target: publish the dispatcher, then serve OSC forever.

    The dispatcher is stored in the module-level ``dispatcher`` because
    ``start_blocking_server()`` reads it from there. Does not return.
    """
    global dispatcher
    dispatcher = get_dispatcher()

    start_blocking_server(ip=IP, port=PORT)

def startBall(ball, sp):
    """Launch ``ball`` and animate it until it is missed.

    The ball is first kicked upwards with a random horizontal speed in
    ``[-sp, sp]``. Each frame the loop counter is written into the ``txtS``
    entry, the direction is adjusted if the ball touches the right, left or
    top edge or the base, and the ball is moved after a 25 ms sleep. When
    the ball reaches the base line without being over the base, the "failed"
    label is shown, ``isFailed`` is set to True and the function returns. It
    also returns once the frame counter reaches 499999.

    Uses the module-level ``c``, ``txtS``, ``base`` and ``lblID``.
    """
    global isFailed

    # Kick-off: random horizontal component, always upwards.
    x, y = random.randint(-sp, sp), -sp
    c.move(ball, x, y)

    for p in range(1, 500000):
        left, top, right, bottom = c.coords(ball)

        # Show the frame counter in the entry widget.
        txtS.delete(0, END)
        txtS.insert(0, str(p))

        if right >= 400 and x >= 0:
            # Right wall: head left, keep the vertical direction.
            x, y = -sp, (-sp if y < 0 else sp)
        elif left <= 0 and x < 0:
            # Left wall: head right, keep the vertical direction.
            x, y = sp, (-sp if y < 0 else sp)
        elif top <= 0 and y < 0:
            # Top wall: head down, keep the horizontal direction.
            x, y = (sp if x >= 0 else -sp), sp
        elif bottom >= 385:
            # Base level: the ball is 20 wide, so left + 10 is its centre.
            centre = left + 10
            base_left, _, base_right, _ = c.coords(base)
            if not (base_left <= centre <= base_right):
                # Missed the base: reveal the "failed" label and stop.
                c.itemconfigure(lblID, state='normal')
                isFailed = True
                break
            # Caught: bounce up with a fresh random horizontal speed.
            x, y = random.randint(-sp, sp), -sp

        time.sleep(.025)
        c.move(ball, x, y)
        c.update()
    
def restart():
    """Start a new round, but only if the previous one ended in failure.

    When ``isFailed`` is set: clears it, hides the "failed" label, puts the
    base and ball back at their starting coordinates and calls
    ``startBall()``, which blocks until that round is over. Otherwise does
    nothing.
    """
    global isFailed
    # isFailed only ever holds a bool, so a plain truth test is enough.
    if not isFailed:
        return

    isFailed = False
    c.itemconfigure(lblID, state='hidden')
    c.moveto(base, 150, 385)
    c.moveto(ball, 190, 365)
    startBall(ball, ballsp)


def pong():
    """Build the game window, start the OSC listener and play.

    Creates the Tk root (minimum size 400x400), a 400x400 canvas holding the
    base rectangle, the ball, a counter entry and a hidden "failed" label,
    and publishes them through module-level names for the other functions.
    ``dispatch()`` runs in a daemon thread. Left/Right move the base by 10,
    Return calls ``restart()``.

    Raises:
        NameError: at ``print(blinks)`` unless ``blinks`` has been assigned
            at module level; this function declares it global but never
            assigns it.
    """
    global c, ball, txtS, base, lblID, ballsp, blinks
    global isFailed

    root = Tk()
    root.minsize(400, 400)
    basesp = 10
    ballsp = 5
    isFailed = False

    # The OSC server blocks forever, so it gets its own daemon thread.
    listener = threading.Thread(target=dispatch, daemon=True)
    listener.start()

    c = Canvas(width=400, height=400, background='#a0aa00')
    c.pack()
    base = c.create_rectangle(150, 385, 250, 400, fill='blue', outline='blue')
    ball = c.create_oval(190, 365, 210, 385, fill='red', outline='red')
    txtS = tk.Entry(c, text='0')
    c.create_window(350, 0, anchor='nw', window=txtS)

    # "Failed" banner, kept hidden until startBall() reveals it.
    failed_label = tk.Label(c, text='Failed!!!Press Enter key to start again')
    lblID = c.create_window(100, 190, anchor='nw', window=failed_label)
    c.itemconfigure(lblID, state='hidden')

    key_bindings = {
        "<KeyPress-Left>": lambda event: moveBaseLR(base, 'l', -basesp),
        "<KeyPress-Right>": lambda event: moveBaseLR(base, 'r', basesp),
        "<Return>": lambda event: restart(),
    }
    for sequence, handler in key_bindings.items():
        root.bind(sequence, handler)

    print(blinks)
    # Blocks until the first round ends; only then does mainloop() take over.
    startBall(ball, ballsp)

    root.mainloop()


# Start the game only when this file is run as a script, not when imported.
if __name__ == "__main__":

    pong()

Re: Python-OSC, how to share blinks/jaw clenches with other functions?

Posted: Sat Sep 17, 2022 10:52 am
by James
You need to define blinks outside of the function in order to reference it as a global.
Add

Code: Select all

# Module-level initial value, so that `global blinks` inside the functions
# refers to a name that already exists.
blinks = 0
directly under the IP/PORT declarations (line 13 of your listing).
Note I've not checked any other part of this code, just what you asked about ;-)

Re: Python-OSC, how to share blinks/jaw clenches with other functions?

Posted: Sat Sep 17, 2022 2:05 pm
by banjo
Thx James!
Your suggestion helped me hammer out a working version, which I'm including below for anyone to use. It is just a proof of concept, so it has no scoring system or other features expected of a real game.

Code: Select all

# Very simple Pong game with infinite lives
# Scoring system to be implemented by the reader :-)
# Needs Muse EEG-device and MindMonitor app
#
# Usage: The base starts out moving left. Each blink steps through the
#        cycle left -> stop -> right -> stop -> left ..., so one blink
#        stops the base and the next blink sends it the opposite way.


from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer
from tkinter import *
import tkinter as tk
import time
import random
import threading
from queue import Queue

# Game-over flag written by startBall() and pong() and checked by restart().
# A bare module-level `global isFailed` statement binds nothing, so the name
# is given an explicit initial value here instead.
isFailed = False

# Number of blinks received so far: incremented by blink_handler() and read
# by startBall() to decide how to steer the base.
blinks = 0

# Address and UDP port handed to the OSC server in dispatch().
IP = "0.0.0.0"
PORT = 5000

def muse_handler(address, *args):
    """Print a message's address followed by its tuple of arguments.

    Has the ``(address, *args)`` handler signature but is not mapped to any
    address in ``get_dispatcher()``.
    """
    message = f"{address}: {args}"
    print(message)

def eeg_handler(address, *args):
    """Print the address and argument tuple of a raw EEG message.

    Like ``muse_handler``, it is defined but not mapped to any address in
    ``get_dispatcher()``.
    """
    line = f"{address}: {args}"
    print(line)

def blink_handler(address, *args):
    """Count one blink and announce it on stdout.

    Increments the module-level ``blinks`` counter by one; ``startBall()``
    reads that counter to steer the base. The ``address`` and ``args``
    passed in by the caller are not used.
    """
    # The docstring must come first: a string placed after `global` is just
    # a discarded expression, not documentation.
    global blinks

    blinks += 1
    print("Blink detected ")


def jaw_handler(address, *args):
    """Report a jaw clench on stdout; ``address`` and ``args`` are ignored."""
    notice = "Jaw Clench detected"
    print(notice)


def get_dispatcher():
    """Build a Dispatcher that routes blink and jaw-clench addresses.

    Returns:
        A new ``Dispatcher`` with ``blink_handler`` and ``jaw_handler``
        mapped to their OSC addresses.
    """
    osc_dispatcher = Dispatcher()
    routes = (
        ("/muse/elements/blink", blink_handler),
        ("/muse/elements/jaw_clench", jaw_handler),
    )
    for osc_address, callback in routes:
        osc_dispatcher.map(osc_address, callback)
    return osc_dispatcher


def start_blocking_server(ip, port):
    """Serve OSC messages on ``(ip, port)`` and never return.

    The server is built with the module-level ``dispatcher`` (assigned by
    ``dispatch()`` before it calls this function), not with a parameter.
    """
    endpoint = (ip, port)
    osc_server = BlockingOSCUDPServer(endpoint, dispatcher)
    osc_server.serve_forever()  # blocks the calling thread for good

def dispatch(q):
    """Thread target: publish the dispatcher, then serve OSC forever.

    The dispatcher is stored in the module-level ``dispatcher`` because
    ``start_blocking_server()`` reads it from there. Does not return.

    Args:
        q: accepted but never read by this function.
    """
    global dispatcher
    dispatcher = get_dispatcher()

    start_blocking_server(ip=IP, port=PORT)


def moveBaseLR(base, dir, x, y = 0):
    """Shift the canvas item ``base`` according to ``dir``.

    ``'l'`` moves by ``(x, y)`` while the item's left edge is greater than
    0, ``'r'`` while its right edge is less than 400. ``'stop'`` issues a
    zero-distance move. The canvas is refreshed after any of these; every
    other case does nothing. Uses the module-level canvas ``c``.
    """
    left_edge, _top, right_edge, _bottom = c.coords(base)
    room_to_move = (dir == 'l' and left_edge > 0) or (dir == 'r' and right_edge < 400)
    if room_to_move:
        c.move(base, x, y)
    elif dir == 'stop':
        c.move(base, 0, 0)
    else:
        return
    c.update()


def startBall(ball, sp):
    """Launch ``ball`` and animate it until it is missed.

    The ball is first kicked upwards with a random horizontal speed in
    ``[-sp, sp]``. Each frame the loop counter is written into the ``txtS``
    entry, the direction is adjusted if the ball touches the right, left or
    top edge or the base, the base is steered from the blink counter, and
    the ball is moved after a 25 ms sleep. When the ball reaches the base
    line without being over the base, ``isFailed`` is set to True and the
    function returns. It also returns once the frame counter reaches 499999.

    Base steering uses ``blinks % 4``: 0 moves left, 2 moves right, and
    1 or 3 request ``'stop'``.

    Uses the module-level ``c``, ``txtS``, ``base``, ``basesp`` and
    ``blinks``.
    """
    global isFailed

    # Kick-off: random horizontal component, always upwards.
    x, y = random.randint(-sp, sp), -sp
    c.move(ball, x, y)

    for p in range(1, 500000):
        left, top, right, bottom = c.coords(ball)

        # Show the frame counter in the entry widget.
        txtS.delete(0, END)
        txtS.insert(0, str(p))

        if right >= 400 and x >= 0:
            # Right wall: head left, keep the vertical direction.
            x, y = -sp, (-sp if y < 0 else sp)
        elif left <= 0 and x < 0:
            # Left wall: head right, keep the vertical direction.
            x, y = sp, (-sp if y < 0 else sp)
        elif top <= 0 and y < 0:
            # Top wall: head down, keep the horizontal direction.
            x, y = (sp if x >= 0 else -sp), sp
        elif bottom >= 385:
            # Base level: the ball is 20 wide, so left + 10 is its centre.
            centre = left + 10
            base_left, _, base_right, _ = c.coords(base)
            if not (base_left <= centre <= base_right):
                # Missed the base: flag the failure and stop this round.
                isFailed = True
                break
            # Caught: bounce up with a fresh random horizontal speed.
            x, y = random.randint(-sp, sp), -sp

        time.sleep(.025)

        # Read the counter once per frame; the OSC thread may change it.
        phase = blinks % 4
        if phase == 0:
            moveBaseLR(base, 'l', -basesp)
        elif phase in (1, 3):
            moveBaseLR(base, 'stop', 0)
        elif phase == 2:
            moveBaseLR(base, 'r', basesp)

        c.move(ball, x, y)
        c.update()
    
def restart():
    """Start a new round, but only if the previous one ended in failure.

    When ``isFailed`` is set: clears it, puts the base and ball back at
    their starting coordinates and calls ``startBall()``, which blocks until
    that round is over. Otherwise does nothing.
    """
    global isFailed
    # isFailed only ever holds a bool, so a plain truth test is enough.
    if not isFailed:
        return

    isFailed = False
    c.moveto(base, 150, 385)
    c.moveto(ball, 190, 365)
    startBall(ball, ballsp)


def pong():
    """Build the game window, start the OSC listener and play forever.

    Creates the Tk root (minimum size 400x400) and a 400x400 canvas holding
    the base rectangle, the ball and a counter entry, and publishes them
    through module-level names for the other functions. ``dispatch()`` runs
    in a daemon thread. Left/Right also move the base by ``basesp``.

    This function does not return normally: rounds are played in an endless
    loop, and the window is refreshed by the ``c.update()`` calls inside
    ``startBall()`` and ``moveBaseLR()``, so ``root.mainloop()`` is never
    needed (the original call after the loop was unreachable).
    """
    global c, ball, txtS, base, ballsp, basesp, isFailed

    root = Tk()
    root.minsize(400, 400)
    basesp = 5
    ballsp = 5
    isFailed = False

    # The OSC server blocks forever, so it gets its own daemon thread.
    # dispatch() requires one positional argument that it never reads; the
    # current counter value is passed only to satisfy that signature.
    listener = threading.Thread(target=dispatch, args=(blinks,), daemon=True)
    listener.start()

    c = Canvas(width=400, height=400, background='#a0aa00')
    c.pack()
    base = c.create_rectangle(150, 385, 250, 400, fill='blue', outline='blue')
    ball = c.create_oval(190, 365, 210, 385, fill='red', outline='red')
    txtS = tk.Entry(c, text='0')
    c.create_window(350, 0, anchor='nw', window=txtS)

    root.bind("<KeyPress-Left>", lambda event: moveBaseLR(base, 'l', -basesp))
    root.bind("<KeyPress-Right>", lambda event: moveBaseLR(base, 'r', basesp))

    # Infinite lives: play a round, then let restart() reset and replay.
    # NOTE(review): restart() itself plays a full round after a failure, so
    # the startBall() call that follows begins without a position reset.
    while True:
        startBall(ball, ballsp)
        restart()


# Start the game only when this file is run as a script, not when imported.
if __name__ == "__main__":
    pong()