Merge pull request #2323 from zjeffer/mediaplayer-improvements

Mediaplayer improvements
This commit is contained in:
Alexis Rouillard 2023-07-24 08:56:46 +02:00 committed by GitHub
commit 8ce1d15885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,89 +1,158 @@
#!/usr/bin/env python3
import gi
gi.require_version("Playerctl", "2.0")
from gi.repository import Playerctl, GLib
from gi.repository.Playerctl import Player
import argparse
import logging
import sys
import signal
import gi
import json
gi.require_version('Playerctl', '2.0')
from gi.repository import Playerctl, GLib
import os
from typing import List
logger = logging.getLogger(__name__)
def write_output(text, player):
logger.info('Writing output')
output = {'text': text,
'class': 'custom-' + player.props.player_name,
'alt': player.props.player_name}
sys.stdout.write(json.dumps(output) + '\n')
sys.stdout.flush()
def on_play(player, status, manager):
logger.info('Received new playback status')
on_metadata(player, player.props.metadata, manager)
def on_metadata(player, metadata, manager):
logger.info('Received new metadata')
track_info = ''
if player.props.player_name == 'spotify' and \
'mpris:trackid' in metadata.keys() and \
':ad:' in player.props.metadata['mpris:trackid']:
track_info = 'AD PLAYING'
elif player.get_artist() != '' and player.get_title() != '':
track_info = '{artist} - {title}'.format(artist=player.get_artist(),
title=player.get_title())
else:
track_info = player.get_title()
if player.props.status != 'Playing' and track_info:
track_info = '' + track_info
write_output(track_info, player)
def on_player_appeared(manager, player, selected_player=None):
if player is not None and (selected_player is None or player.name == selected_player):
init_player(manager, player)
else:
logger.debug("New player appeared, but it's not the selected player, skipping")
def on_player_vanished(manager, player):
logger.info('Player has vanished')
sys.stdout.write('\n')
sys.stdout.flush()
def init_player(manager, name):
logger.debug('Initialize player: {player}'.format(player=name.name))
player = Playerctl.Player.new_from_name(name)
player.connect('playback-status', on_play, manager)
player.connect('metadata', on_metadata, manager)
manager.manage_player(player)
on_metadata(player, player.props.metadata, manager)
def signal_handler(sig, frame):
logger.debug('Received signal to stop, exiting')
sys.stdout.write('\n')
logger.info("Received signal to stop, exiting")
sys.stdout.write("\n")
sys.stdout.flush()
# loop.quit()
sys.exit(0)
class PlayerManager:
def __init__(self, selected_player=None):
self.manager = Playerctl.PlayerManager()
self.loop = GLib.MainLoop()
self.manager.connect(
"name-appeared", lambda *args: self.on_player_appeared(*args))
self.manager.connect(
"player-vanished", lambda *args: self.on_player_vanished(*args))
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
self.selected_player = selected_player
self.init_players()
def init_players(self):
for player in self.manager.props.player_names:
if self.selected_player is not None and self.selected_player != player.name:
logger.debug(f"{player.name} is not the filtered player, skipping it")
continue
self.init_player(player)
def run(self):
logger.info("Starting main loop")
self.loop.run()
def init_player(self, player):
logger.info(f"Initialize new player: {player.name}")
player = Playerctl.Player.new_from_name(player)
player.connect("playback-status",
self.on_playback_status_changed, None)
player.connect("metadata", self.on_metadata_changed, None)
self.manager.manage_player(player)
self.on_metadata_changed(player, player.props.metadata)
def get_players(self) -> List[Player]:
return self.manager.props.players
def write_output(self, text, player):
logger.debug(f"Writing output: {text}")
output = {"text": text,
"class": "custom-" + player.props.player_name,
"alt": player.props.player_name}
sys.stdout.write(json.dumps(output) + "\n")
sys.stdout.flush()
def clear_output(self):
sys.stdout.write("\n")
sys.stdout.flush()
def on_playback_status_changed(self, player, status, _=None):
logger.debug(f"Playback status changed for player {player.props.player_name}: {status}")
self.on_metadata_changed(player, player.props.metadata)
def get_first_playing_player(self):
players = self.get_players()
logger.debug(f"Getting first playing player from {len(players)} players")
if len(players) > 0:
# if any are playing, show the first one that is playing
# reverse order, so that the most recently added ones are preferred
for player in players[::-1]:
if player.props.status == "Playing":
return player
# if none are playing, show the first one
return players[0]
else:
logger.debug("No players found")
return None
def show_most_important_player(self):
logger.debug("Showing most important player")
# show the currently playing player
# or else show the first paused player
# or else show nothing
current_player = self.get_first_playing_player()
if current_player is not None:
self.on_metadata_changed(current_player, current_player.props.metadata)
else:
self.clear_output()
def on_metadata_changed(self, player, metadata, _=None):
logger.debug(f"Metadata changed for player {player.props.player_name}")
player_name = player.props.player_name
artist = player.get_artist()
title = player.get_title()
track_info = ""
if player_name == "spotify" and "mpris:trackid" in metadata.keys() and ":ad:" in player.props.metadata["mpris:trackid"]:
track_info = "Advertisement"
elif artist is not None and title is not None:
track_info = f"{artist} - {title}"
else:
track_info = title
if track_info:
if player.props.status == "Playing":
track_info = "" + track_info
else:
track_info = "" + track_info
# only print output if no other player is playing
current_playing = self.get_first_playing_player()
if current_playing is None or current_playing.props.player_name == player.props.player_name:
self.write_output(track_info, player)
else:
logger.debug(f"Other player {current_playing.props.player_name} is playing, skipping")
def on_player_appeared(self, _, player):
logger.info(f"Player has appeared: {player.name}")
if player is not None and (self.selected_player is None or player.name == self.selected_player):
self.init_player(player)
else:
logger.debug(
"New player appeared, but it's not the selected player, skipping")
def on_player_vanished(self, _, player):
logger.info(f"Player {player.props.player_name} has vanished")
self.show_most_important_player()
def parse_arguments():
parser = argparse.ArgumentParser()
# Increase verbosity with every occurrence of -v
parser.add_argument('-v', '--verbose', action='count', default=0)
parser.add_argument("-v", "--verbose", action="count", default=0)
# Define for which player we're listening
parser.add_argument('--player')
# Define for which player we"re listening
parser.add_argument("--player")
parser.add_argument("--enable-logging", action="store_true")
return parser.parse_args()
@ -92,37 +161,22 @@ def main():
arguments = parse_arguments()
# Initialize logging
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
format='%(name)s %(levelname)s %(message)s')
if arguments.enable_logging:
logfile = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "media-player.log")
logging.basicConfig(filename=logfile, level=logging.DEBUG,
format="%(asctime)s %(name)s %(levelname)s:%(lineno)d %(message)s")
# Logging is set by default to WARN and higher.
# With every occurrence of -v it's lowered by one
logger.setLevel(max((3 - arguments.verbose) * 10, 0))
# Log the sent command line arguments
logger.debug('Arguments received {}'.format(vars(arguments)))
manager = Playerctl.PlayerManager()
loop = GLib.MainLoop()
manager.connect('name-appeared', lambda *args: on_player_appeared(*args, arguments.player))
manager.connect('player-vanished', on_player_vanished)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
for player in manager.props.player_names:
if arguments.player is not None and arguments.player != player.name:
logger.debug('{player} is not the filtered player, skipping it'
.format(player=player.name)
)
continue
init_player(manager, player)
loop.run()
logger.info("Creating player manager")
if arguments.player:
logger.info(f"Filtering for player: {arguments.player}")
player = PlayerManager(arguments.player)
player.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()