"""pyngtube: draw a microphone-reactive avatar in a pygame window."""

import array
import config
import os
import pyaudio
import random
import typing
import tkinter
from tkinter import filedialog

# Hide Pygame "Hello world" stuff. Why does this require an envvar :/
# Set before pygame is imported below.
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame
from pygame.locals import *

__author__ = "Trysdyn Black"
__license__ = "ACSL v1.4"
__version__ = "1.0.0"
__copyright__ = "Copyright 2022, Trysdyn Black"


class Audio:
    """Audio subsystem that monitors a microphone and determines avatar state.

    Methods
    -------
    get_state() -> bool
        Read one window of audio data and return true if its peak sample is
        over the configured threshold (or the smoothing window is still
        active). Indicates the mic is hot.
    close()
        Close the input stream and release the PyAudio instance.
    """

    def __init__(
        self,
        channels: int,
        rate: int,
        frames: int,
        threshold: int = 5000,
        smoothing: int = 0,
    ) -> None:
        """
        Parameters
        ----------
        channels : int
            Number of audio channels in the input device, usually 1 or 2 for
            mono/stereo
        rate : int
            Sample rate of the audio device, typically 44100 or 48000
        frames : int
            Number of sample frames read from the stream on each check. Also
            used as the stream's frames_per_buffer.
        threshold : int
            Peak sample value above which we consider the mic hot
        smoothing : int
            Number of checks we continue to consider the mic hot after it
            stops being hot
        """
        # Keep a reference to the PyAudio instance so close() can terminate it
        self._pa = pyaudio.PyAudio()
        self.frames = frames
        self.stream = self._pa.open(
            format=pyaudio.paInt16,
            channels=channels,
            rate=rate,
            input=True,
            frames_per_buffer=frames,
        )
        self.threshold = threshold
        self.smoothing = smoothing
        # Remaining checks for which the mic is still reported hot
        self.this_smooth = 0

    def get_state(self) -> bool:
        """Check audio buffer and indicate if the avatar's mouth should be
        open this frame.

        Returns
        -------
        bool
            Returns true if the avatar's mouth should be considered open this
            frame
        """
        # Grab a window of mic data into signed 16-bit integer samples
        data = self.stream.read(self.frames, exception_on_overflow=False)
        samples = array.array("h", data)

        # Return true if we're over threshold or it's been fewer than smooth
        # checks since the last time we were over threshold.
        # default=0 keeps an empty read from raising ValueError.
        if max(samples, default=0) > self.threshold:
            self.this_smooth = self.smoothing
            return True

        if self.this_smooth:
            self.this_smooth -= 1
            return True

        return False

    def close(self) -> None:
        """Close the input stream and terminate PyAudio.

        Safe to call more than once; later calls do nothing.
        """
        if self.stream is not None:
            self.stream.close()
            self.stream = None

        if self._pa is not None:
            self._pa.terminate()
            self._pa = None


class Tube:
    """Encapsulates avatar and windowing logic, handles drawing.

    Methods
    -------
    should_blink() -> bool
        Evaluate all available settings and data and return true if the
        avatar should be blinking this frame.
    update()
        Perform one frame of checks, logic, and prep and draw the avatar.
    """

    def __init__(
        self,
        image_closed: str,
        image_open: typing.Optional[str] = None,
        image_blink_closed: typing.Optional[str] = None,
        image_blink_open: typing.Optional[str] = None,
        blink_chance: float = 0,
        blink_frames: int = 0,
        shake_intensity: int = 0,
        shake_delay: int = 0,
        win_size: typing.Optional[tuple] = None,
        bg_color: tuple = (0, 255, 0),
    ) -> None:
        """
        Parameters
        ----------
        image_closed : string
            Filename for the image to load for an opened eyes/closed mouth
            avatar state. Mandatory.
        image_open : string or None
            Filename for the image to load for an opened eyes/opened mouth
            avatar state. Defaults to None.
        image_blink_closed : string or None
            Filename for the image to load for a closed eyes/closed mouth
            avatar state. Defaults to None.
        image_blink_open : string or None
            Filename for the image to load for a closed eyes/opened mouth
            avatar state. Defaults to None.
        blink_chance : float
            A chance, out of 1.0, to activate blinking each frame of
            rendering. Defaults to 0 to disable blinking.
        blink_frames : int
            How many frames to hold the eyes-closed state after activating a
            blink. A triggered blink always lasts at least one frame.
            Defaults to 0.
        shake_intensity : int
            How far in pixels to limit movement on each axis when shaking.
            Defaults to 0 to disable shaking.
        shake_delay : int
            While the mouth is open, a shake is applied on every
            shake_delay-th consecutive open frame. Defaults to 0, which
            disables shaking.
        win_size : tuple(int, int) or None
            Window size as a tuple of ints (width, height). Defaults to None,
            which uses the image size of image_closed instead.
        bg_color : tuple(int, int, int)
            Background color as a tuple of ints (red, green, blue). Defaults
            to (0, 255, 0) for greenscreen green.
        """
        self.open_frames = 0
        self.blinked_frames = 0

        # These display settings are temporary. We can't load images without it
        self.display = pygame.display.set_mode((400, 300))
        pygame.display.set_caption("pyngtube")

        # Closed mouth image is mandatory
        self.image_closed = pygame.image.load(image_closed).convert_alpha()

        # Open mouth image is optional
        if image_open:
            self.image_open = pygame.image.load(image_open).convert_alpha()
        else:
            self.image_open = None

        # Blinking images are optional
        if image_blink_closed:
            self.image_blink_closed = pygame.image.load(image_blink_closed).convert_alpha()
        else:
            self.image_blink_closed = None

        if image_blink_open:
            self.image_blink_open = pygame.image.load(image_blink_open).convert_alpha()
        else:
            self.image_blink_open = None

        # Establish our blink settings
        self.blink_chance = blink_chance
        self.blink_frames = blink_frames

        # If we only have a closed-mouth image (why?) use it for open-mouth too
        using_image_open = self.image_open if self.image_open else self.image_closed

        # State map for which image to use. (Mouth Opened?, Blinked?)
        # If we don't have blinked images, we use the non-blinked images for
        # those slots
        self.state_map = {
            (False, False): self.image_closed,
            (True, False): using_image_open,
            (False, True): self.image_blink_closed if self.image_blink_closed else self.image_closed,
            (True, True): self.image_blink_open if self.image_blink_open else using_image_open,
        }

        # If we specified a window size, use it. If not use closed image size
        # Then we pad each side by shake_intensity pixels to allow for shaking
        if not win_size:
            win_size = self.image_closed.get_size()

        self.win_size = (
            win_size[0] + (shake_intensity * 2),
            win_size[1] + (shake_intensity * 2),
        )

        # Background color to blank with, likely a greenscreen color
        self.bg_color = bg_color

        # Shaking intensity. Number of frames to wait before shaking
        self.shake_intensity = shake_intensity
        self.shake_delay = shake_delay

        # Create our drawing buffer and initialize true display size
        self.buf = pygame.surface.Surface(self.image_closed.get_size())
        self.display = pygame.display.set_mode(self.win_size, pygame.RESIZABLE)

    def should_blink(self) -> bool:
        """Check if we're currently blinking and check blink_chance to
        indicate if we should be blinking this frame.

        Returns
        -------
        bool
            Returns true if the avatar's eyes should be considered closed
            this frame.
        """
        # If we're blinking and haven't reached the configured blink duration
        # yet, extend blinking by a frame to keep the eyes shut.
        if 0 < self.blinked_frames < self.blink_frames:
            self.blinked_frames += 1
            return True

        # Either we're not blinking or the previous blink just ran its full
        # duration. Roll blink chance; a win starts a fresh blink, so the
        # counter restarts at 1 rather than continuing past blink_frames.
        if random.random() < self.blink_chance:
            self.blinked_frames = 1
            return True

        self.blinked_frames = 0
        return False

    def update(self, opened: bool, blinked: bool) -> None:
        """Perform updates and drawing for one frame.

        This should be called once a frame and fed avatar state info on open
        mouth and blinking. This handles all the other placement, style, and
        drawing functions from there.

        Parameters
        ----------
        opened : bool
            true if the avatar's mouth should be open this frame.
        blinked : bool
            true if the avatar's eyes should be closed this frame.
        """
        # Blank with bg_color
        self.display.fill(self.bg_color)
        self.buf.fill(self.bg_color)

        # Jitter tracks our offset from origin for shaking
        # It's applied as a transformation to image location each frame
        jitter = (0, 0)
        i = self.shake_intensity

        # Figure out which image we're drawing this frame using our state map
        this_frame_image = self.state_map[(opened, blinked)]

        # Count consecutive open-mouth frames (only when an open mouth image
        # exists); the count drives the shake timing and resets when closed.
        if opened and self.image_open:
            self.open_frames += 1

            # Randomize our jitter on every shake_delay-th open frame
            if self.shake_delay and not self.open_frames % self.shake_delay:
                jitter = (random.randint(-i, i), random.randint(-i, i))
        else:
            self.open_frames = 0

        # Draw our chosen image to a drawing buffer
        self.buf.blit(this_frame_image, (0, 0))

        # Resize the drawing buffer to the window size, minus the shake
        # padding, if necessary. Clamp to 1px so a window smaller than the
        # padding cannot ask for a zero or negative scale size.
        display_size = self.display.get_size()
        shrunk_size = (
            max(1, display_size[0] - (i * 2)),
            max(1, display_size[1] - (i * 2)),
        )
        position = (i + jitter[0], i + jitter[1])

        if self.buf.get_size() != shrunk_size:
            new_buf = pygame.transform.scale(self.buf, shrunk_size)
            self.display.blit(new_buf, position)
        else:
            self.display.blit(self.buf, position)

        pygame.display.flip()


def main() -> None:
    """Run the avatar window until the user closes it.

    Mouse wheel adjusts the mic threshold; right mouse button opens a file
    dialog to pick a profile, after which the avatar is rebuilt.
    """
    done = False
    rehash = True

    # Hidden Tk root so the file dialog has a parent without showing a window
    root = tkinter.Tk()
    root.withdraw()

    pygame.init()

    # Load config
    conf = config.Config()

    # Initialize audio system
    audio = Audio(
        channels=2 if conf.mic_stereo else 1,
        rate=conf.mic_rate,
        frames=int(conf.mic_rate / conf.audio_checks),
        threshold=conf.threshold,
        smoothing=conf.smoothing,
    )

    tube = None

    try:
        while not done:
            # Reload our avatar if we've been instructed to rehash settings
            if rehash:
                tube = Tube(
                    image_closed=conf.image_closed,
                    image_open=conf.image_open,
                    image_blink_closed=conf.image_blink_closed,
                    image_blink_open=conf.image_blink_open,
                    blink_chance=conf.blink_chance,
                    blink_frames=conf.blink_frames,
                    bg_color=conf.bg_color,
                    shake_delay=conf.shake_delay,
                    shake_intensity=conf.shake_intensity,
                )
                rehash = False

            for event in pygame.event.get():
                # Clean shutdown if user clicks the [X]
                if event.type == QUIT:
                    done = True
                    break

                # If the window gets resized, handle redrawing the window in
                # the new size
                elif event.type == pygame.VIDEORESIZE:
                    # Scale only on vertical axis. Force horizontal axis to
                    # match the drawing buffer's aspect ratio. Sizes are kept
                    # as ints of at least 1px.
                    new_height = max(1, event.h)
                    buf_ratio = tube.buf.get_width() / tube.buf.get_height()
                    tube.win_size = (max(1, int(new_height * buf_ratio)), new_height)
                    tube.display = pygame.display.set_mode(tube.win_size, pygame.RESIZABLE)

                # Mouse wheel up and down changes mic threshold.
                # TODO: Provide some kind of user feedback
                # But I don't want it visible in the capture region
                elif event.type == pygame.MOUSEWHEEL:
                    audio.threshold += event.y * 100
                    print(audio.threshold)

                # RMB to open load file dialog for profile
                elif event.type == pygame.MOUSEBUTTONDOWN:
                    if event.button == 3:
                        file_types = [("yaml files", "profile.yaml")]
                        file_path = filedialog.askopenfilename(
                            title="Select a profile", filetypes=file_types
                        )
                        if file_path:
                            conf.profile = os.path.dirname(file_path)
                            rehash = True

            # Check mic, update image
            if not done:
                tube.update(audio.get_state(), tube.should_blink())
    finally:
        # Release the window and the audio device on every exit path
        pygame.quit()
        audio.close()


if __name__ == "__main__":
    main()