我想用 Python 的 PyQt5 从网络上流式播放 mp3 文件。我查了很多资料,但只找到了流式播放 wav 文件的代码:
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtMultimedia import *
import urllib.request
import threading
import time
class Streamer:
    """Play raw PCM audio from a URL while the download is still in progress.

    The first chunk is fetched synchronously in the constructor and handed to
    a ``QAudioOutput``; the remainder is appended to the same ``QBuffer`` by a
    background thread running :meth:`stream`.

    The output format is fixed to 48 kHz, stereo, signed 16-bit little-endian
    ``audio/pcm``, so compressed data (for example mp3) is not decoded.
    """

    # Number of bytes requested from the server per read.
    CHUNK_SIZE = 1024 * 1024
    # Leading bytes of the download that are not fed to the audio device.
    # NOTE(review): presumably skips the WAV header/metadata -- confirm.
    HEADER_SKIP = 250
    # Pause between two consecutive reads, in seconds.
    THROTTLE_SECONDS = 2

    def __init__(self, url):
        """Open *url*, buffer the first chunk and start playback.

        Args:
            url: Address of the audio file to download.
        """
        self.url = url
        # URLopener is deprecated and no longer exists in Python 3.14;
        # urlopen() is the supported way to open the URL.
        self.web = urllib.request.urlopen(self.url)
        # Content-Length is optional; None means "read until end of stream".
        declared_len = self.web.headers["Content-Length"]
        self.content_len = int(declared_len) if declared_len is not None else None
        self.data = self.web.read(self.CHUNK_SIZE)

        self.buffer = QBuffer()
        self.buffer.writeData(self.data[self.HEADER_SKIP:])
        self.buffer.open(QBuffer.ReadWrite)
        threading.Thread(target=self.stream).start()

        self.format = QAudioFormat()
        self.format.setSampleRate(48000)
        self.format.setChannelCount(2)
        self.format.setSampleSize(16)
        self.format.setCodec("audio/pcm")
        self.format.setByteOrder(QAudioFormat.LittleEndian)
        self.format.setSampleType(QAudioFormat.SignedInt)

        self.audio = QAudioOutput(self.format)
        self.audio.start(self.buffer)

    def stream(self):
        """Download the rest of the file and append it to the playback buffer.

        Stops when the declared ``Content-Length`` has been received or when
        a read returns no data, whichever comes first, and then closes the
        connection.
        """
        # Only the byte count is needed, so the payload is not accumulated a
        # second time next to the playback buffer.
        received = len(self.data)
        try:
            while self.content_len is None or received < self.content_len:
                self.sound_data = self.web.read(self.CHUNK_SIZE)
                if not self.sound_data:
                    # End of stream before the declared length (or no length
                    # declared): nothing more will arrive, so stop instead of
                    # polling forever.
                    break
                # NOTE(review): the underlying QByteArray is modified from
                # this worker thread without synchronisation -- confirm this
                # is acceptable for the audio output reading it.
                self.buffer.buffer().append(self.sound_data)
                received += len(self.sound_data)
                time.sleep(self.THROTTLE_SECONDS)
        finally:
            self.web.close()
        # The buffer is intentionally left intact here: the download normally
        # finishes before playback does, and clearing it at this point would
        # throw away audio that has not been played yet.
if __name__ == "__main__":
    # The application object is created first, then the streamer; the
    # streamer is bound to a name so it stays referenced while audio plays.
    app = QApplication([])
    streamer = Streamer(
        url="https://raw.githubusercontent.com/PremKumarMishra/Stream-Songs/main/Audio.wav"
    )
app.exec_()
我检查过,但无法在 QAudioFormat 中添加 MPEG-3(mp3)编解码器,所以当前这段代码不适用于 mp3。
发布于 2021-11-25 14:00:22
QMediaPlayer的基本行为应该足以管理简单音频流的缓冲,因为后端会认为100%的缓冲区大小足以保证播放。
如果您希望对缓冲区状态进行更多的控制,则需要实现一个自定义的QIODevice子类来充当QMediaPlayer和下载过程之间的中间层。
在下面的示例中,我使用QNetworkAccessManager下载流,然后将QNetworkReply的readyRead信号连接到一个读取原始字节的函数,并根据当前可用的读取数据和为缓冲区设置的最小大小发出缓冲区状态。
当接收到的数据第一次达到最小大小时,它将开始发出readyRead信号;如果播放器尚未启动(尚未设置媒体),它将使用Buffer实例设置媒体,然后即可开始播放。
from PyQt5 import QtCore, QtWidgets, QtNetwork, QtMultimedia
# Address of the audio stream to play (placeholder value).
url = 'https://url.to/stream'

# Reverse lookup table: QMediaPlayer.Error value -> attribute name on
# QMediaPlayer, used to print a readable name for an error code.
Errors = {
    value: name
    for name, value in QtMultimedia.QMediaPlayer.__dict__.items()
    if isinstance(value, QtMultimedia.QMediaPlayer.Error)
}
class Buffer(QtCore.QIODevice):
    """Sequential, read-only device that hands out bytes from a network reply.

    Incoming bytes are accumulated in ``self.data`` and removed again as they
    are read through ``readData``. ``buffering`` is emitted on every delivery;
    ``readyRead`` is emitted only from the first delivery at which the held
    data reaches ``minBufferSize``, and on every delivery after that.
    """

    # Emitted as (number of bytes currently held, minimum buffer size).
    buffering = QtCore.pyqtSignal(object, object)
    # Set to True on the instance once the minimum size has been reached.
    fullBufferEmitted = False

    def __init__(self, reply, minBufferSize=250000):
        """Attach the buffer to *reply*.

        Args:
            reply: Object whose ``readyRead`` signal announces new data and
                whose ``readAll()`` returns it.
            minBufferSize: Bytes to collect before ``readyRead`` is first
                emitted; values below 200000 are raised to 200000.
        """
        super().__init__()
        self.reply = reply
        self.minBufferSize = max(minBufferSize, 200000)
        self.data = b''
        # Guards self.data. The original author notes that the reply may
        # deliver data from another thread (not verifiable from this file).
        self.mutex = QtCore.QMutex()
        # Flagged as essential by the original author: mark the device as
        # open for unbuffered reading.
        self.setOpenMode(
            QtCore.QIODevice.ReadOnly | QtCore.QIODevice.Unbuffered)
        self.reply.readyRead.connect(self.dataReceived)

    def dataReceived(self):
        """Store newly arrived bytes and announce the fill level."""
        self.mutex.lock()
        self.data += self.reply.readAll().data()
        held = len(self.data)
        self.mutex.unlock()

        self.buffering.emit(held, self.minBufferSize)

        # Stay silent until the minimum size has been reached once.
        if not self.fullBufferEmitted and held < self.minBufferSize:
            return
        self.fullBufferEmitted = True
        self.readyRead.emit()

    def isSequential(self):
        """Report the device as sequential."""
        return True

    def readData(self, size):
        """Remove and return up to *size* bytes from the front of the data."""
        self.mutex.lock()
        chunk, self.data = self.data[:size], self.data[size:]
        self.mutex.unlock()
        return chunk

    def bytesAvailable(self):
        """Return the held byte count plus the base class's own count."""
        return super().bytesAvailable() + len(self.data)
class Player(QtWidgets.QWidget):
    """Small widget with a play/pause button, volume slider and status label.

    The stream at the module-level ``url`` is downloaded through a
    ``QNetworkAccessManager`` into a ``Buffer``; once enough data is held the
    buffer is given to a ``QMediaPlayer`` and the play button is enabled.
    """

    def __init__(self):
        """Build the UI, start the download and wire up the signals."""
        super().__init__()

        # --- widgets, added top to bottom ---
        column = QtWidgets.QVBoxLayout(self)

        self.playButton = QtWidgets.QPushButton('Play', enabled=False)
        column.addWidget(self.playButton)

        self.volumeSlider = QtWidgets.QSlider(QtCore.Qt.Horizontal)
        column.addWidget(self.volumeSlider)

        self.statusLabel = QtWidgets.QLabel('Waiting')
        frameStyle = QtWidgets.QFrame.StyledPanel | QtWidgets.QFrame.Sunken
        self.statusLabel.setFrameShape(frameStyle)
        column.addWidget(self.statusLabel)

        # --- media player; the slider starts at the player's volume ---
        self.player = QtMultimedia.QMediaPlayer(volume=16)
        self.volumeSlider.setValue(self.player.volume())

        # --- download: the reply is read through the intermediate Buffer ---
        self.networkManager = QtNetwork.QNetworkAccessManager()
        self.url = QtCore.QUrl(url)
        self.media = QtMultimedia.QMediaContent(self.url)
        request = QtNetwork.QNetworkRequest(self.url)
        networkReply = self.networkManager.get(request)
        self.buffer = Buffer(networkReply)

        # --- signal wiring ---
        self.playButton.clicked.connect(self.play)
        self.volumeSlider.valueChanged.connect(self.player.setVolume)
        self.player.error.connect(self.error)
        self.buffer.buffering.connect(self.buffering)

    def error(self, error):
        """Show and print the name and numeric value of a player error."""
        name = Errors.get(error, 'Unknown error')
        errorStr = 'Error: {} ({})'.format(name, int(error))
        self.statusLabel.setText(errorStr)
        print(errorStr)

    def buffering(self, loaded, minBufferSize):
        """Display the fill level and arm the player once enough is loaded.

        Args:
            loaded: Number of bytes currently held by the buffer.
            minBufferSize: Number of bytes required before playback is set up.
        """
        percent = int(loaded / minBufferSize * 100)
        self.statusLabel.setText('Buffer: {}%'.format(percent))

        # Nothing more to do if media was already set or data is still short.
        if not self.player.media().isNull() or loaded < minBufferSize:
            return

        self.player.setMedia(self.media, self.buffer)
        self.playButton.setEnabled(True)
        self.playButton.setFocus()
        self.statusLabel.setText('Ready to play')

    def play(self):
        """Toggle between playing and paused and relabel the button."""
        if self.player.state() != self.player.PlayingState:
            self.player.play()
            self.playButton.setText('Pause')
            return
        self.player.pause()
        self.playButton.setText('Play')
# Script entry: create the Qt application object, then build the player
# window and show it.
app = QtWidgets.QApplication([])
w = Player()
w.show()
app.exec_()
请注意:
https://stackoverflow.com/questions/70100233
复制相似问题