- preliminary support for listening for messages, posting messages, and displaying status of network

- make title screen scores optimization optional
- add option to clear pop up messages queue when adding a message
This commit is contained in:
ohsqueezy 2024-01-09 19:22:54 -08:00
parent 9b9277bedb
commit 2fe057a351
3 changed files with 229 additions and 43 deletions

253
NS.py
View File

@ -10,7 +10,7 @@
# #
# This is the main file containing all the Pygame code. # This is the main file containing all the Pygame code.
import argparse, pathlib, operator, subprocess, sys, os import argparse, pathlib, operator, subprocess, sys, os, socket, select, time
# Auto-detect GPIO library # Auto-detect GPIO library
try: try:
@ -28,7 +28,7 @@ from time import sleep
from PIL import Image from PIL import Image
import pygame import pygame
from pygame import Surface, Color, mixer from pygame import Surface, mixer
from pygame.event import clear from pygame.event import clear
from pygame.mixer import Sound from pygame.mixer import Sound
from pygame.image import load, fromstring from pygame.image import load, fromstring
@ -145,6 +145,20 @@ class NS(Game, Animation):
else: else:
return self.level_index < other.level_index return self.level_index < other.level_index
class Peer:
"""
Scrapeboard game on the local area network. It is expected to be sending and receiving messages using socket
communication. It will be read and written to regularly in a separate thread.
"""
status = None
result = None
versus = False
level = None
def __init__(self, address, port):
self.address = address
self.port = port
def __init__(self): def __init__(self):
""" """
Parse the command line, set config types, initialize the serial reader, subscribe to events, and initialize child objects. Parse the command line, set config types, initialize the serial reader, subscribe to events, and initialize child objects.
@ -194,6 +208,18 @@ class NS(Game, Animation):
"cooldown-level-3", "first-combo-delay" "cooldown-level-3", "first-combo-delay"
] ]
}, },
"network":
{
"int": ["port", "diagnostics-size"],
"bool": "diagnostics",
"path": "diagnostics-font",
"list": "peers"
},
"pop-up":
{
"int": ["size", "length"],
"bool": "center"
},
"input": "input":
{ {
"bool": ["serial", "pi"] "bool": ["serial", "pi"]
@ -202,11 +228,12 @@ class NS(Game, Animation):
{ {
"float": "attract-gif-alpha", "float": "attract-gif-alpha",
"bool": ["effects", "alpha-effect-title", "qr-static"], "bool": ["effects", "alpha-effect-title", "qr-static"],
"path": "scores-font" "path": "scores-font",
"int": "scores-alpha"
}, },
"system": "system":
{ {
"bool": ["minimize-load-time", "enable-level-select"], "bool": ["minimize-load-time", "enable-level-select", "optimize-title-screen"],
"int": ["lives-boss-rush-mode", "lives-level-select-mode"] "int": ["lives-boss-rush-mode", "lives-level-select-mode"]
}, },
"pads": "pads":
@ -250,8 +277,7 @@ class NS(Game, Animation):
gpio.initialize_gpio() gpio.initialize_gpio()
# Launch a separate thread for reading the GPIO (and allowing its custom delays/sleeps). Use the daemon flag to force # Launch a separate thread for reading the GPIO (and allowing its custom delays/sleeps). Use the daemon flag to force
# exit when the main thread is killed (by a sigterm from systemctl stop) (?). # exit automatically when the main thread is killed.
self.gpio_kill = False
self.gpio_thread = Thread(target=self.read_gpio, daemon=True) self.gpio_thread = Thread(target=self.read_gpio, daemon=True)
self.gpio_thread.start() self.gpio_thread.start()
self.gpio_data = gpio.activity() self.gpio_data = gpio.activity()
@ -292,12 +318,11 @@ class NS(Game, Animation):
if not found: if not found:
raise SerialException("No usable serial port devices found. Use --no-serial for keyboard-only mode.") raise SerialException("No usable serial port devices found. Use --no-serial for keyboard-only mode.")
print(f"Using serial device at port {self.serial_reader.port}") print(f"Using serial device at port {self.serial_reader.port}")
self.serial_kill = False
self.serial_data = 0 self.serial_data = 0
self.reset_arduino() self.reset_arduino()
# Launch a separate thread for reading serial data # Launch a separate thread for reading serial data
self.serial_thread = Thread(target=self.read_serial) self.serial_thread = Thread(target=self.read_serial, daemon=True)
self.serial_thread.start() self.serial_thread.start()
Animation.__init__(self, self) Animation.__init__(self, self)
@ -340,11 +365,32 @@ class NS(Game, Animation):
# Draw the score sprites # Draw the score sprites
self.title.draw_scores() self.title.draw_scores()
# Initialize key input buffering
self.last_press = get_ticks() self.last_press = get_ticks()
# Initialize pop-up
self.register(self.close_pop_up) self.register(self.close_pop_up)
self.reset() self.pop_up_font = pygame.font.Font(self.get_resource(Dialogue.FONT_PATH), self.get_configuration("pop-up", "size"))
self.pop_up_font = pygame.font.Font(self.get_resource(Dialogue.FONT_PATH), 12)
self.pop_up_text = "" self.pop_up_text = ""
# Initialize networking
self.server = socket.create_server(("", self.get_configuration("network", "port")))
self.peers = {}
self.player_count = 1
if self.get_configuration("network", "peers"):
for peer in self.get_configuration("network", "peers"):
# Store peers in a dictionary where the key is the peer address
self.peers[peer] = NS.Peer(peer, self.get_configuration("network", "port"))
print(f"Added peer {peer}")
# Launch separate threads for listing and posting to peers
self.listen_thread = Thread(target=self.listen_to_peers, daemon=True)
self.listen_thread.start()
self.post_thread = Thread(target=self.post_to_peers, daemon=True)
self.post_thread.start()
self.reset()
# Clear events queue
clear() clear()
def pi_enabled(self): def pi_enabled(self):
@ -357,11 +403,11 @@ class NS(Game, Animation):
""" """
Test all connections of GPIO input pins. Test all connections of GPIO input pins.
""" """
while not self.gpio_kill: while True:
self.gpio_data = gpio.activity() self.gpio_data = gpio.activity()
def read_serial(self): def read_serial(self):
while not self.serial_kill: while True:
name = self.get_configuration("input", "arduino-port") name = self.get_configuration("input", "arduino-port")
try: try:
transmission = self.serial_reader.readline().strip() transmission = self.serial_reader.readline().strip()
@ -416,6 +462,72 @@ class NS(Game, Animation):
if self.gpio_data[light_id]: if self.gpio_data[light_id]:
self.idle_elapsed = 0 self.idle_elapsed = 0
def post_to_peers(self):
"""
Update peers with current status every 1/2 second.
"""
while True:
# Determine this game's status
if self.title.active:
status = "title"
elif self.level_select.active and self.level_select.level_index_selected is None:
status = "level select"
elif not self.boss.battle_finished:
status = f"playing {self.level_select.level_index_selected}"
elif self.boss.player_defeated:
status = "lost"
else:
status = f"complete {boss.time_elapse}"
# Connect and send status message to each peer. If sending fails, pass and wait until the next iteration.
for peer in self.peers.values():
try:
socket.create_connection((peer.address, peer.port)).send(str.encode(status))
except:
pass
# Send status every 1/2 second
time.sleep(0.5)
def listen_to_peers(self):
"""
Update peer statuses by processing incoming messages on the socket server.
"""
while True:
# Use the server to receive messages. Update peer statuses as the messages come in.
read_list, write_list, except_list = select.select([self.server], [], [], 0.5)
# When there is no read list, there are no messages to accept.
if (len(read_list) > 0):
incoming = self.server.accept()
peer = self.peers[incoming[1][0]]
# All messages are less than 64 characters
message = incoming[0].recv(64).decode()
if message.startswith("complete"):
try:
peer.result = float(message.split()[-1])
peer.status = "complete"
except:
# Improperly formatted message received
pass
elif message.startswith("playing"):
try:
peer.level = int(message.split()[-1])
peer.status = "playing"
except:
# Improperly formatted message received
pass
else:
peer.status = message
def count_players(self):
"""
@return count of peers playing versus against this
"""
count = 1
for peer in self.peers.values():
count += peer.versus
return count
def reset(self, leave_wipe_running=False): def reset(self, leave_wipe_running=False):
self.idle_elapsed = 0 self.idle_elapsed = 0
self.suppressing_input = False self.suppressing_input = False
@ -468,20 +580,29 @@ class NS(Game, Animation):
self.reset() self.reset()
elif event.key == K_a: elif event.key == K_a:
self.reset_arduino() self.reset_arduino()
elif event.type == KEYDOWN and event.key == K_n and pygame.key.get_mods() & (pygame.KMOD_CTRL | pygame.KMOD_SHIFT):
# Toggle visibility of network diagnostics menu
state = self.get_configuration("network", "diagnostics")
self.configuration.set("network", "diagnostics", not state)
self.pop_up(f"Network diagnostics visible: {not state}")
self.last_press = get_ticks() self.last_press = get_ticks()
else: else:
if self.get_delegate().compare(event, "reset-game"): if self.get_delegate().compare(event, "reset-game"):
self.reset() self.reset()
def pop_up(self, text): def pop_up(self, text, clear=False):
""" """
Trigger a pop up message that displays for a certain amount of time before being closed automatically. Adds a line of Trigger a pop up message that displays for a certain amount of time before being closed automatically. Adds a line of
text to a variable that contains all pop up messages in case there is a previously sent message that needs to continue text to a variable that contains all pop up messages in case there is a previously sent message that needs to continue
being displayed. being displayed.
@param text message to display @param text message to display
@param clear if True, delete any existing messages
""" """
self.pop_up_text += f"{text}\n" if not clear:
self.pop_up_text += f"{text}\n"
else:
self.pop_up_text = f"{text}\n"
self.halt(self.close_pop_up) self.halt(self.close_pop_up)
self.play(self.close_pop_up, play_once=True, delay=3000) self.play(self.close_pop_up, play_once=True, delay=3000)
@ -548,28 +669,58 @@ class NS(Game, Animation):
self.chemtrails.update() self.chemtrails.update()
self.boss.update_dialogue() self.boss.update_dialogue()
self.wipe.update() self.wipe.update()
# Draw the pop up text line by line if there is any
pop_up_y = 0 # Draw pop up text line by line
for line in self.pop_up_text.split("\n"): if self.pop_up_text:
if line: width = 0
surface = self.pop_up_font.render(line, False, (0, 0, 0), (255, 255, 255)) height = 0
self.get_display_surface().blit(surface, (0, pop_up_y)) for line in self.pop_up_text.split("\n"):
pop_up_y += surface.get_height() if line:
line_width, line_height = self.pop_up_font.size(line)
if line_width > width:
width = line_width
height += line_height
full_surface = pygame.Surface((width, height))
x = 0
y = 0
for line in self.pop_up_text.split("\n"):
if line:
surface = self.pop_up_font.render(
line, True, pygame.Color(self.get_configuration("pop-up", "foreground")),
pygame.Color(self.get_configuration("pop-up", "background")))
full_surface.blit(surface, (x, y))
y += surface.get_height()
if y > 0:
sprite = Sprite(self)
sprite.add_frame(full_surface)
if self.get_configuration("pop-up", "center"):
sprite.location.center = self.get_display_surface().get_rect().center
sprite.update()
# Draw network diagnostics
if self.get_configuration("network", "diagnostics"):
y = self.get_display_surface().get_rect().bottom
font = pygame.font.Font(self.get_configuration("network", "diagnostics-font"),
self.get_configuration("network", "diagnostics-size"))
for peer in self.peers.values():
surface = font.render(
f"{peer.address} {peer.status} [PvP {peer.versus}, lvl {peer.level}, result {peer.result}]",
True, (255, 255, 255), (0, 0, 0))
y -= surface.get_height()
self.get_display_surface().blit(surface, (0, y))
# Reset the game when idle
self.idle_elapsed += self.time_filter.get_last_frame_duration() self.idle_elapsed += self.time_filter.get_last_frame_duration()
if self.idle_elapsed >= self.IDLE_TIMEOUT: if self.idle_elapsed >= self.IDLE_TIMEOUT:
self.reset() self.reset()
def end(self, event): def end(self, event):
""" """
Extend the parent end method to try adding a permanent quit feature in case there is a Raspbian Lite systemd autostart service running Extend the parent end method to try adding a permanent quit feature in case there is a Raspbian Lite systemd autostart
service running
""" """
if event.type == QUIT or self.delegate.compare(event, "quit"): if event.type == QUIT or self.delegate.compare(event, "quit"):
if self.confirming_quit or not self.get_configuration("input", "confirm-quit"): if self.confirming_quit or not self.get_configuration("input", "confirm-quit"):
# Kill serial threads
self.serial_kill = True
self.gpio_kill = True
# If SHIFT is pressed, try permanently stopping the systemd service to get a console back in case this is running on # If SHIFT is pressed, try permanently stopping the systemd service to get a console back in case this is running on
# Raspbian Lite # Raspbian Lite
if pygame.key.get_mods() & pygame.KMOD_SHIFT: if pygame.key.get_mods() & pygame.KMOD_SHIFT:
@ -645,6 +796,7 @@ class LevelSelect(Animation):
self.previews[-1].add_frame(frame) self.previews[-1].add_frame(frame)
self.previews[-1].location.midbottom = self.platforms[level_index].view.location.centerx, \ self.previews[-1].location.midbottom = self.platforms[level_index].view.location.centerx, \
self.platforms[level_index].view.location.top - 12 self.platforms[level_index].view.location.top - 12
self.reset()
def activate(self): def activate(self):
self.active = True self.active = True
@ -728,7 +880,8 @@ class LevelSelect(Animation):
if self.grow_sound_channel is None: if self.grow_sound_channel is None:
self.grow_sound_channel = self.get_audio().play_sfx("grow", -1, x=platform.view.location.centerx) self.grow_sound_channel = self.get_audio().play_sfx("grow", -1, x=platform.view.location.centerx)
# Draw a growing ring around the currently pressed level # Draw a growing ring around the currently pressed level
angle = self.get_game().platform.press_elapsed / self.get_configuration("time", "level-select-press-length") * 2 * pi angle = self.get_game().platform.press_elapsed / \
self.get_configuration("time", "level-select-press-length") * 2 * pi
diameter = self.previews[level_index].location.height + 21 diameter = self.previews[level_index].location.height + 21
rect = pygame.Rect(0, 0, diameter, diameter) rect = pygame.Rect(0, 0, diameter, diameter)
rect.center = self.previews[level_index].location.center rect.center = self.previews[level_index].location.center
@ -1093,11 +1246,10 @@ class Title(Animation):
Handles displaying and drawing the title screen. It draws the high scores, creates and updates an attract mode video pop-up, tracks Handles displaying and drawing the title screen. It draws the high scores, creates and updates an attract mode video pop-up, tracks
the player's moves and checks if they are doing the start game pattern, and updates the background logo and giant Tony sprite. the player's moves and checks if they are doing the start game pattern, and updates the background logo and giant Tony sprite.
Notes Title.draw_scores is a slow method, so the scores should only be drawn when a score is added.
-----
* It should be directed to draw scores when a new score is added (and only then), and it will store the score surfaces and only blit If the game is configured to optimize on the title screen, the scores will only be blit when Title.activate is called. Otherwise,
them once when activated (unless the scores are updated). This way it only blits that section of the screen once. they will blit every update.
""" """
# Sequence of moves the player must do to start the game # Sequence of moves the player must do to start the game
@ -1110,6 +1262,7 @@ class Title(Animation):
@param parent GameChild object that will connect this GameChild object to the overall tree and root Game object @param parent GameChild object that will connect this GameChild object to the overall tree and root Game object
""" """
Animation.__init__(self, parent) Animation.__init__(self, parent)
self.active = False
# Set up attract mode pop-up # Set up attract mode pop-up
self.angle = pi / 8 self.angle = pi / 8
@ -1145,14 +1298,17 @@ class Title(Animation):
self.get_game().tony.set_frameset("static") self.get_game().tony.set_frameset("static")
self.get_audio().play_bgm("title") self.get_audio().play_bgm("title")
# Blit the scores # Optimization for only drawing part of the title screen
for sprite in self.score_sprites: if self.get_configuration("system", "optimize-title-screen"):
sprite.update()
# Optimize by setting a clip that excludes the area where the scores are drawn # Blit the scores
self.get_display_surface().set_clip( for sprite in self.score_sprites:
(self.score_sprites[0].location.right, 0, self.score_sprites[1].location.left - self.score_sprites[0].location.right, sprite.update()
self.get_display_surface().get_height()))
# Optimize by setting a clip that excludes the area where the scores are drawn
self.get_display_surface().set_clip(
(self.score_sprites[0].location.right, 0, self.score_sprites[1].location.left - self.score_sprites[0].location.right,
self.get_display_surface().get_height()))
def deactivate(self): def deactivate(self):
self.active = False self.active = False
@ -1238,9 +1394,9 @@ class Title(Animation):
def draw_scores(self): def draw_scores(self):
""" """
Create two columns, one for each side of the screen. Draw as many scores as can fit along each column, in order from best to worst, separating Create two columns, one for each side of the screen. Draw as many scores as can fit along each column, in order from
them evenly into categories: normal, advanced, and expert. Save the columns as sprites. Note that this doesn't support non-level select mode best to worst, separating them evenly into categories: normal, advanced, and expert. Save the columns as sprites. Note
anymore. that this doesn't support non-level select mode anymore.
""" """
ds = self.get_display_surface() ds = self.get_display_surface()
self.score_indicator = None self.score_indicator = None
@ -1290,6 +1446,10 @@ class Title(Animation):
y += self.draw_score_to_column(score, right_column, (x, y), rank) y += self.draw_score_to_column(score, right_column, (x, y), rank)
right_column_sprite.add_frame(right_column) right_column_sprite.add_frame(right_column)
right_column_sprite.location.topleft = x, 0 right_column_sprite.location.topleft = x, 0
if not self.get_configuration("system", "optimize-title-screen") and self.get_configuration("display", "scores-alpha") < 255:
alpha = self.get_configuration("display", "scores-alpha")
left_column.set_alpha(alpha)
right_column.set_alpha(alpha)
self.score_sprites = [left_column_sprite, right_column_sprite] self.score_sprites = [left_column_sprite, right_column_sprite]
def show_video(self): def show_video(self):
@ -1330,6 +1490,11 @@ class Title(Animation):
self.get_audio().play_sfx("land_0") self.get_audio().play_sfx("land_0")
self.get_game().tony.update() self.get_game().tony.update()
# Draw the scores unless optimized out
if not self.get_configuration("system", "optimize-title-screen"):
for sprite in self.score_sprites:
sprite.update()
# Bounce the GIF around the screen # Bounce the GIF around the screen
if self.video.location.right > dsr.right or self.video.location.left < dsr.left: if self.video.location.right > dsr.right or self.video.location.left < dsr.left:
self.angle = reflect_angle(self.angle, 0) self.angle = reflect_angle(self.angle, 0)
@ -2340,6 +2505,7 @@ class Boss(Animation):
animations that control attacks, effects, and dialog. animations that control attacks, effects, and dialog.
""" """
Animation.__init__(self, parent) Animation.__init__(self, parent)
self.battle_finished = False
# Set up sprites with boil, hit, and intro animations # Set up sprites with boil, hit, and intro animations
self.boss_sprites = [] self.boss_sprites = []
self.boss_sprite_arms = [] self.boss_sprite_arms = []
@ -2865,7 +3031,8 @@ class Boss(Animation):
def level_sprite(self, level_index=None): def level_sprite(self, level_index=None):
""" """
Return the boss sprite associated with this the given level index. If level index is not given, use the value in `self.level_index`. Return the boss sprite associated with this the given level index. If level index is not given, use the value in
`self.level_index`.
@param level_index index of the level of the requested sprite @param level_index index of the level of the requested sprite
""" """

18
config
View File

@ -30,6 +30,7 @@ attract-gif-alpha = 1.0
effects = True effects = True
alpha-effect-title = True alpha-effect-title = True
scores-font = BPmono.ttf scores-font = BPmono.ttf
scores-alpha = 230
qr-static = False qr-static = False
[system] [system]
@ -38,6 +39,23 @@ minimize-load-time = no
enable-level-select = yes enable-level-select = yes
lives-boss-rush-mode = 3 lives-boss-rush-mode = 3
lives-level-select-mode = 1 lives-level-select-mode = 1
optimize-title-screen = no
[network]
peers = 192.168.50.50, 192.168.50.77
port = 8080
delimeter = |
timeout = 10.0
diagnostics = no
diagnostics-font = resource/BPmono.ttf
diagnostics-size = 15
[pop-up]
size = 22
length = 3000
foreground = #ffffff
background = #000000
center = yes
[boss] [boss]
damage-per-hit-level-1 = 6.0 damage-per-hit-level-1 = 6.0

View File

@ -0,0 +1 @@
40143 0