我需要在OpenCV/Python中实时处理视频流和动画数据流。我使用FFMPEG读取文件或流,因为OpenCV不保留klvdata。我使用子处理模块将数据传递给OpenCV。
我的问题是,我不知道如何将视频和视频数据同时映射到同一个子处理管道?
我的代码:
#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy
command = ['ffmpeg',
'-i', 'DayFlight.mpg',
'-map', '0:0',
'-map', '0:d',
'-pix_fmt', 'bgr24',
'-c:v', 'rawvideo',
'-an','-sn',
'-f', 'image2pipe', '-',
'-c:d', 'copy',
'-f','data',
]
pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)
while True:
raw_image = pipe.stdout.read(1280*720*3)
image = numpy.fromstring(raw_image, dtype='uint8')
image = image.reshape((720,1280,3))
if image is not None:
cv2.imshow('Video', image)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
for packet in klvdata.StreamParser(pipe.stdout):
metadata = packet.MetadataList()
print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()产生以下错误:
Traceback (most recent call last):
File "test_cv.py", line 32, in <module>
metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'任何帮助都是非常感谢的。
发布于 2022-03-15 22:14:10
为了分割视频和数据,我们可以将视频流映射到stderr管道,将KLV数据流映射到stdout管道。
在我的以下答案中,同样的技术用于分离视频和音频。
当每个视频帧具有私有KLV数据(按顺序顺序同步)时,视频帧与相应数据之间的精确同步相对简单。
日Flight.mpg示例文件的数据包比帧少得多,使用建议的解决方案是不可能实现精确同步的(我认为使用管道方法是不可能的)。
我们仍然可以应用一些粗略的同步-假设数据和帧是在时间附近读取的。
建议的分割视频和数据的方法:
-----------
--->| Raw Video | ---> stderr (pipe)
----------- ------------- | -----------
| Input | | FFmpeg | |
| Video with| ---> | sub-process | ---
| Data | | | |
----------- ------------- | -----------
--->| KLV data | ---> stdout (pipe)
-----------视频和数据在两个单独的线程中读取:
根据维基百科,KLV格式没有很好地定义:
键的长度可以是1、2、4或16字节。 想必在一个单独的规范文档中,您会就给定应用程序的密钥长度达成一致。
在示例视频中,密钥长度为16字节,但不能保证.
从stdout管道读取KLV数据:
当从管道读取数据时(以类似于实时的方式),我们需要知道需要读取的字节数。
这迫使我们对KLV数据进行部分解析:
读取密钥、长度和数据后,我们有一个"KLV数据包“,可以发送到KLV数据解析器。
下面是一个使用Day Flight.mpg示例输入文件的代码示例:
#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO
# Video reader thread.
def video_reader(pipe):
cols, rows = 1280, 720 # Assume we know frame size is 1280x720
counter = 0
while True:
raw_image = pipe.read(cols*rows*3) # Read raw video frame
# Break the loop when length is too small
if len(raw_image) < cols*rows*3:
break
if (counter % 60) == 0:
# Show video frame evey 60 frames
image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
cv2.imshow('Video', image) # Show video image for testing
cv2.waitKey(1)
counter += 1
# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
"""Return integer given bytes."""
return int.from_bytes(bytes(value), byteorder='big', signed=signed)
# Data reader thread (read KLV data).
def data_reader(pipe):
key_length = 16 # Assume key length is 16 bytes.
f = open('data.bin', 'wb') # For testing - store the KLV data to data.bin (binary file)
while True:
# https://en.wikipedia.org/wiki/KLV
# The first few bytes are the Key, much like a key in a standard hash table data structure.
# Keys can be 1, 2, 4, or 16 bytes in length.
# Presumably in a separate specification document you would agree on a key length for a given application.
key = pipe.read(key_length) # Read the key
if len(key) < key_length:
break # Break the loop when length is too small
f.write(key) # Write data to binary file for testing
# https://github.com/paretech/klvdata/tree/master/klvdata
# Length field
len_byte = pipe.read(1)
if len(len_byte) < 1:
break # Break the loop when length is too small
f.write(len_byte) # Write data to binary file for testing
byte_length = bytes_to_int(len_byte)
# https://github.com/paretech/klvdata/tree/master/klvdata
if byte_length < 128:
# BER Short Form
length = byte_length
ber_len_bytes = b''
else:
# BER Long Form
ber_len = byte_length - 128
ber_len_bytes = pipe.read(ber_len)
if len(ber_len_bytes) < ber_len:
break # Break the loop when length is too small
f.write(ber_len_bytes) # Write ber_len_bytes to binary file for testing
length = bytes_to_int(ber_len_bytes)
# Read the value (length bytes)
value = pipe.read(length)
if len(value) < length:
break # Break the loop when length is too small
f.write(value) # Write data to binary file for testing
klv_data = key + len_byte + ber_len_bytes + value # Concatenate key length and data
klv_data_as_bytes_io = BytesIO(klv_data) # Wrap klv_data with BytesIO (before parsing)
# Parse the KLV data
for packet in klvdata.StreamParser(klv_data_as_bytes_io):
metadata = packet.MetadataList()
print(metadata)
print() # New line
# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet ' # Set loglevel to quiet for disabling the prints ot stderr
'-i "Day Flight.mpg" ' # Input video "Day Flight.mpg"
'-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
'-map 0:d -c copy -copy_unknown -f:d data pipe:1 ' # Copy the data without ddecoding.
'-report'), # Create a log file (because we can't the statuses that are usually printed to stderr).
stdout=sp.PIPE, stderr=sp.PIPE)
# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()
# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()
# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()上面的代码将数据保存到data.bin (用于测试)。
data.bin可用于一致性检查。
执行FFmpeg CLI提取数据流:
ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin验证raw.bin和data.bin文件是否相等。
https://stackoverflow.com/questions/71464433
复制相似问题