354 lines
13 KiB
Python
354 lines
13 KiB
Python
import array
|
|
import config
|
|
import os
|
|
import pyaudio
|
|
import random
|
|
import typing
|
|
import tkinter
|
|
from tkinter import filedialog
|
|
|
|
|
|
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
|
|
import pygame
|
|
from pygame.locals import *
|
|
|
|
|
|
__author__ = "Trysdyn Black"
|
|
__license__ = "ACSL v1.4"
|
|
__version__ = "1.0.0"
|
|
__copyright__ = "Copyright 2022, Trysdyn Black"
|
|
|
|
|
|
class Audio:
|
|
"""Audio subsystem that monitors a microphone and determines avatar state.
|
|
|
|
Methods
|
|
-------
|
|
get_state() -> bool
|
|
Evaluate buffered audio data and return true if the average amplitude is over the configured threshold.
|
|
Indicates the mic is hot. Also drains the audio buffer.
|
|
"""
|
|
|
|
def __init__(self, channels: int, rate: int, frames: int, threshold: int = 5000, smoothing: int = 0) -> None:
|
|
"""
|
|
Parameters
|
|
----------
|
|
channels : int
|
|
Number of audio channels in the input device, usually 1 or 2 for mono/stereo
|
|
rate : int
|
|
Baud rate of the audio device, typically 44100 or 48000 as configured in Windows
|
|
frames : int
|
|
Number of times a second to check the audio buffer. The buffer is fully drained each check
|
|
threshold : int
|
|
Audio amplitude at which we consider the mic hot
|
|
smoothing : int
|
|
Number of frames we continue to consider the mic hot after it stops being hot
|
|
"""
|
|
p = pyaudio.PyAudio()
|
|
self.frames = frames
|
|
self.stream = p.open(
|
|
format=pyaudio.paInt16,
|
|
channels=channels,
|
|
rate=rate,
|
|
input=True,
|
|
frames_per_buffer=frames,
|
|
)
|
|
|
|
self.threshold = threshold
|
|
self.smoothing = smoothing
|
|
self.this_smooth = 0
|
|
|
|
def get_state(self) -> bool:
|
|
"""Check audio buffer and indicate if the avatar's mouth should be open this frame.
|
|
|
|
Returns
|
|
-------
|
|
bool
|
|
Returns true if the avatar's mouth should be considered open this frame
|
|
"""
|
|
# Grab a window of mic data into integer representations
|
|
data = self.stream.read(self.frames, exception_on_overflow=False)
|
|
a_data = array.array("h", data)
|
|
|
|
# Return true if we're over threshold or it's been fewer than smooth
|
|
# checks since the last time we were over threshold.
|
|
if max(a_data) > self.threshold:
|
|
self.this_smooth = self.smoothing
|
|
return True
|
|
elif self.this_smooth:
|
|
self.this_smooth -= 1
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
class Tube:
|
|
"""Encapsulates avatar and windowing logic, handles drawing.
|
|
|
|
Methods
|
|
-------
|
|
should_blink() -> bool
|
|
Evaluate all available settings and data and return true if the avatar should be blinking this frame.
|
|
update()
|
|
Perform one frame of checks, logic, and prep and draw the avatar.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
image_closed: str,
|
|
image_open: typing.Optional[str] = None,
|
|
image_blink_closed: typing.Optional[str] = None,
|
|
image_blink_open: typing.Optional[str] = None,
|
|
blink_chance: int = 0,
|
|
blink_frames: int = 0,
|
|
shake_intensity: int = 0,
|
|
shake_delay: int = 0,
|
|
win_size: typing.Optional[tuple] = None,
|
|
bg_color: tuple = (0, 255, 0),
|
|
) -> None:
|
|
"""
|
|
Parameters
|
|
----------
|
|
image_closed : string
|
|
Filename for the image to load for a opened eyes/closed mouth avatar state.
|
|
Mandatory.
|
|
image_open : string or None
|
|
Filename for the image to load for a opened eyes/opened mouth avatar state.
|
|
Defaults to None.
|
|
image_blink_closed : string or None
|
|
Filename for the image to load for a closed eyes/closed mouth avatar state.
|
|
Defaults to None.
|
|
image_blink_open : string
|
|
Filename for the image to load for a closed eyes/opened mouth avatar state.
|
|
Defaults to None.
|
|
blink_chance : float
|
|
A chance, out of 1.0, to activate blinking each frame of rendering.
|
|
Defaults to 0 to disable blinking.
|
|
blink_frames : int
|
|
How many frames to hold the eyes-closed state after activating a blink.
|
|
Defaults to 0 to disable blinking.
|
|
shake_intensity : int
|
|
How far in pixels to limit movement on each axis when shaking.
|
|
Defaults to 0 to disable shaking.
|
|
shake_delay : int
|
|
How many frames to prevent shaking after completing a prior shake.
|
|
Defaults to 0.
|
|
win_size : tuple(int, int) or None
|
|
Window size as a tuple of ints (width, height).
|
|
Defaults to None, which uses the image size of image_closed instead.
|
|
bg_color : tuple(int, int, int)
|
|
Background color as a tuple of ints (red, green, blue).
|
|
Defaults to (0, 255, 0) for greenscreen green.
|
|
"""
|
|
self.open_frames = 0
|
|
self.blinked_frames = 0
|
|
|
|
# These display settings are temporary. We can't load images without it
|
|
self.display = pygame.display.set_mode((400, 300))
|
|
pygame.display.set_caption("pyngtube")
|
|
|
|
# Closed mouth image is mandatory
|
|
self.image_closed = pygame.image.load(image_closed).convert_alpha()
|
|
|
|
# Open mouth image is optional
|
|
if image_open:
|
|
self.image_open = pygame.image.load(image_open).convert_alpha()
|
|
else:
|
|
self.image_open = None
|
|
|
|
# Blinking images are optional
|
|
if image_blink_closed:
|
|
self.image_blink_closed = pygame.image.load(image_blink_closed).convert_alpha()
|
|
else:
|
|
self.image_blink_closed = None
|
|
|
|
if image_blink_open:
|
|
self.image_blink_open = pygame.image.load(image_blink_open).convert_alpha()
|
|
else:
|
|
self.image_blink_open = None
|
|
|
|
# Establish our blink settings
|
|
self.blink_chance = blink_chance
|
|
self.blink_frames = blink_frames
|
|
|
|
# If we only have a closed-mouth image (why?) use it for open-mouth too
|
|
using_image_open = self.image_open if self.image_open else self.image_closed
|
|
|
|
# State map for which image to use. (Mouth Opened?, Blinked?)
|
|
# If we don't have blinked images, we use the non-blinked images for those slots
|
|
self.state_map = {
|
|
(False, False): self.image_closed,
|
|
(True, False): using_image_open,
|
|
(False, True): self.image_blink_closed if self.image_blink_closed else self.image_closed,
|
|
(True, True): self.image_blink_open if self.image_blink_open else using_image_open,
|
|
}
|
|
|
|
# If we specified a window size, use it. If not use closed image size
|
|
# Then we pad by 10px to allow for shaking
|
|
if not win_size:
|
|
win_size = self.image_closed.get_size()
|
|
|
|
self.win_size = (
|
|
win_size[0] + (shake_intensity * 2),
|
|
win_size[1] + (shake_intensity * 2),
|
|
)
|
|
|
|
# Background color to blank with, likely a greenscreen color
|
|
self.bg_color = bg_color
|
|
|
|
# Shaking intensity. Numer of frames to wait before shaking
|
|
self.shake_intensity = shake_intensity
|
|
self.shake_delay = shake_delay
|
|
|
|
# Create our drawing buffer and initialize true display size
|
|
self.buf = pygame.surface.Surface(self.image_closed.get_size())
|
|
self.display = pygame.display.set_mode(self.win_size, pygame.RESIZABLE)
|
|
|
|
def should_blink(self) -> bool:
|
|
"""Check if we're currently blinking and check blink_chance to indicate if we should be blinking this frame.
|
|
|
|
Returns
|
|
-------
|
|
bool
|
|
Returns true if the avatar's eyes should be considered closed this frame.
|
|
"""
|
|
# If we're blinking and haven't reached the configured blink duration yet
|
|
# extend blinking by a frame to keep the eyes shut.
|
|
# Otherwise roll blink chance and set blink if the roll wins.
|
|
if self.blinked_frames and self.blinked_frames < self.blink_frames:
|
|
self.blinked_frames += 1
|
|
return True
|
|
else:
|
|
if random.random() < self.blink_chance:
|
|
self.blinked_frames += 1
|
|
return True
|
|
|
|
self.blinked_frames = 0
|
|
return False
|
|
|
|
def update(self, opened: bool, blinked: bool) -> None:
|
|
"""Perform updates and drawing for one frame. This should be called once a frame and fed avatar state info on
|
|
open mouth and blinking. This handles all the other placement, style, and drawing functions from there.
|
|
|
|
Parameters
|
|
----------
|
|
opened : bool
|
|
true if the avatar's mouth should be open this frame.
|
|
blinked : bool
|
|
true if the avatar's eyes should be closed this frame.
|
|
"""
|
|
# Blank with bg_color
|
|
self.display.fill(self.bg_color)
|
|
self.buf.fill(self.bg_color)
|
|
|
|
# Jitter tracks our offset from origin for shaking
|
|
# It's applied as a transformation to image location each frame
|
|
jitter = (0, 0)
|
|
i = self.shake_intensity
|
|
|
|
# Figure out which image we're drawing this frame using our state map
|
|
this_frame_image = self.state_map[(opened, blinked)]
|
|
|
|
# If the mic state is opened and an open mouth image exists, blit it
|
|
# to the drawing buffer. Otherwise use the closed mouth image.
|
|
if opened and self.image_open:
|
|
self.open_frames += 1
|
|
|
|
# Randomize our jitter if we're currently shaking
|
|
if self.shake_delay and not self.open_frames % self.shake_delay:
|
|
jitter = (random.randint(-i, i), random.randint(-i, i))
|
|
else:
|
|
self.open_frames = 0
|
|
jitter = (0, 0)
|
|
|
|
# Draw our chosen image to a drawing buffer
|
|
self.buf.blit(this_frame_image, (0, 0))
|
|
|
|
# Resize the drawing buffer to the window size if necessary
|
|
# Scale on the vertical axis to maximum window size.
|
|
# If we're configured to shake the sprite on open mic, randomly
|
|
# jitter the sprite around the frame a bit while mic is open.
|
|
display_size = self.display.get_size()
|
|
shrunk_size = (display_size[0] - (i * 2), display_size[1] - (i * 2))
|
|
if self.buf.get_size() != shrunk_size:
|
|
new_buf = pygame.transform.scale(self.buf, shrunk_size)
|
|
self.display.blit(new_buf, (i + jitter[0], i + jitter[1]))
|
|
else:
|
|
self.display.blit(self.buf, (i + jitter[0], i + jitter[1]))
|
|
|
|
pygame.display.flip()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Hide Pygame "Hello world" stuff. Why does this require an envvar :/
|
|
|
|
# I use some not-great extreme shorthand in the main loop here:
|
|
# c = config object
|
|
# a = audio subsystem object
|
|
# t = pngtube object, the window and avatar object
|
|
quit = False
|
|
rehash = True
|
|
root = tkinter.Tk()
|
|
root.withdraw()
|
|
pygame.init()
|
|
|
|
# Load config
|
|
c = config.Config()
|
|
|
|
# Initialize audio system
|
|
a = Audio(
|
|
channels=2 if c.mic_stereo else 1,
|
|
rate=c.mic_rate,
|
|
frames=int(c.mic_rate / c.audio_checks),
|
|
threshold=c.threshold,
|
|
smoothing=c.smoothing,
|
|
)
|
|
|
|
while not quit:
|
|
# Reload our avatar if we've been instructed to rehash settings
|
|
if rehash:
|
|
t = Tube(
|
|
image_closed=c.image_closed,
|
|
image_open=c.image_open,
|
|
image_blink_closed=c.image_blink_closed,
|
|
image_blink_open=c.image_blink_open,
|
|
blink_chance=c.blink_chance,
|
|
blink_frames=c.blink_frames,
|
|
bg_color=c.bg_color,
|
|
shake_delay=c.shake_delay,
|
|
shake_intensity=c.shake_intensity,
|
|
)
|
|
rehash = False
|
|
for event in pygame.event.get():
|
|
# Clean shutdown if user clicks the [X]
|
|
if event.type == QUIT:
|
|
pygame.quit()
|
|
a.stream.close()
|
|
quit = True
|
|
break
|
|
# If the window gets resized, handle redrawing the window in the new size
|
|
elif event.type == pygame.VIDEORESIZE:
|
|
# Scale only on vertical axis. Force horizontal axis to match
|
|
# the drawing buffer's aspect ratio.
|
|
win_size = (event.w, event.h)
|
|
buf_ratio = t.buf.get_width() / t.buf.get_height()
|
|
t.win_size = (win_size[1] * buf_ratio, win_size[1])
|
|
t.display = pygame.display.set_mode(t.win_size, pygame.RESIZABLE)
|
|
# Mouse wheel up and down changes mic threshold.
|
|
# TODO: Provide some kind of user feedback
|
|
# But I don't want it visible in the capture region
|
|
elif event.type == pygame.MOUSEWHEEL:
|
|
a.threshold += event.y * 100
|
|
print(a.threshold)
|
|
# RMB to open load file dialog for profile
|
|
elif event.type == pygame.MOUSEBUTTONDOWN:
|
|
if event.button == 3:
|
|
file_types = [("yaml files", "profile.yaml")]
|
|
file_path = filedialog.askopenfilename(title="Select a profile", filetypes=file_types)
|
|
if file_path:
|
|
c.profile = os.path.dirname(file_path)
|
|
rehash = True
|
|
|
|
# Check mic, update image
|
|
if not quit:
|
|
t.update(a.get_state(), t.should_blink())
|