我正在跟踪这个教程。在克隆了她的存储库并使"track“命令正常工作之后,我想尝试集成一个扫描功能。
我进入了她的manager.py脚本,并在set_servos函数中添加了我的扫描过程,如下所示(粗体)。这在servos_process中运行:
import logging
from multiprocessing import Value, Process, Manager, Queue
import pantilthat as pth
import signal
import sys
import time
import RPi.GPIO as GPIO
from rpi_deep_pantilt.detect.util.visualization import visualize_boxes_and_labels_on_image_array
from rpi_deep_pantilt.detect.camera import run_pantilt_detect
from rpi_deep_pantilt.control.pid import PIDController
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(8,GPIO.OUT)
logging.basicConfig()
LOGLEVEL = logging.getLogger().getEffectiveLevel()
RESOLUTION = (320, 320)
SERVO_MIN = -90
SERVO_MAX = 90
CENTER = (
RESOLUTION[0] // 2,
RESOLUTION[1] // 2
)
# function to handle keyboard interrupt
def signal_handler(sig, frame):
# print a status message
print("[INFO] You pressed `ctrl + c`! Exiting...")
# disable the servos
pth.servo_enable(1, False)
pth.servo_enable(2, False)
GPIO.output(8,GPIO.LOW)
# exit
sys.exit()
def in_range(val, start, end):
# determine the input value is in the supplied range
return (val >= start and val <= end)
def set_servos(pan, tilt, scan):
# signal trap to handle keyboard interrupt
signal.signal(signal.SIGINT, signal_handler)
**
#visualize_boxes_and_labels_on_image_array()
print(scan.value) # output: 't'
while scan.value == 't':**
print('Scanning')
pth.servo_one(90)
pth.servo_two(25)
time.sleep(10)
pth.servo_one(30)
pth.servo_two(25)
time.sleep(10)
pth.servo_one(-30)
pth.servo_two(25)
time.sleep(10)
pth.servo_one(-90)
pth.servo_two(25)
time.sleep(10)
pth.servo_one(-30)
pth.servo_two(25)
time.sleep(10)
pth.servo_one(30)
pth.servo_two(25)
pth.time.sleep(10)
pth.servo_one(90)
pth.servo_two(25)
time.sleep(10)
continue
while True:
pan_angle = -1 * pan.value
tilt_angle = tilt.value
# if the pan angle is within the range, pan
if in_range(pan_angle, SERVO_MIN, SERVO_MAX):
pth.pan(pan_angle)
else:
logging.info(f'pan_angle not in range {pan_angle}')
if in_range(tilt_angle, SERVO_MIN, SERVO_MAX):
pth.tilt(tilt_angle)
else:
logging.info(f'tilt_angle not in range {tilt_angle}')
def pid_process(output, p, i, d, box_coord, origin_coord, action):
# signal trap to handle keyboard interrupt
signal.signal(signal.SIGINT, signal_handler)
# create a PID and initialize it
p = PIDController(p.value, i.value, d.value)
p.reset()
# loop indefinitely
while True:
error = origin_coord - box_coord.value
output.value = p.update(error)
# logging.info(f'{action} error {error} angle: {output.value}')
def pantilt_process_manager(
model_cls,
labels=('Raspi',),
rotation=0
):
pth.servo_enable(1, True)
pth.servo_enable(2, True)
with Manager() as manager:
**scan = manager.Value('c', 't')**
# set initial bounding box (x, y)-coordinates to center of frame
center_x = manager.Value('i', 0)
center_y = manager.Value('i', 0)
center_x.value = RESOLUTION[0] // 2
center_y.value = RESOLUTION[1] // 2
# pan and tilt angles updated by independent PID processes
pan = manager.Value('i', 0)
tilt = manager.Value('i', 0)
# PID gains for panning
pan_p = manager.Value('f', 0.05)
# 0 time integral gain until inferencing is faster than ~50ms
pan_i = manager.Value('f', 0.1)
pan_d = manager.Value('f', 0)
# PID gains for tilting
tilt_p = manager.Value('f', 0.15)
# 0 time integral gain until inferencing is faster than ~50ms
tilt_i = manager.Value('f', 0.2)
tilt_d = manager.Value('f', 0)
**detect_processr = Process(target=run_pantilt_detect,
args=(center_x, center_y, labels, model_cls, rotation, scan))**
pan_process = Process(target=pid_process,
args=(pan, pan_p, pan_i, pan_d, center_x, CENTER[0], 'pan'))
tilt_process = Process(target=pid_process,
args=(tilt, tilt_p, tilt_i, tilt_d, center_y, CENTER[1], 'tilt'))
**servo_process = Process(target=set_servos, args=(pan, tilt, scan))**
detect_processr.start()
pan_process.start()
tilt_process.start()
servo_process.start()
detect_processr.join()
pan_process.join()
tilt_process.join()
servo_process.join()
if __name__ == '__main__':
pantilt_process_manager()在一个名为visualization.py的单独脚本中,visualize_boxes_and_labels_on_image_array函数中有一条语句,负责在检测到对象后,在摄像机馈送上覆盖边框(以粗体结尾)。这在detect_processr中运行:
# python
import collections
import logging
# lib
import numpy as np
import PIL.Image as Image
import PIL.ImageColor as ImageColor
import PIL.ImageDraw as ImageDraw
import PIL.ImageFont as ImageFont
import six
import RPi.GPIO as GPIO
import time
from time import sleep
import pantilthat as pth
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(8,GPIO.OUT)
STANDARD_COLORS = [
'AliceBlue', 'Chartreuse', 'Aqua', 'Aquamarine', 'Azure', 'Beige', 'Bisque',
'BlanchedAlmond', 'BlueViolet', 'BurlyWood', 'CadetBlue', 'AntiqueWhite',
'Chocolate', 'Coral', 'CornflowerBlue', 'Cornsilk', 'Crimson', 'Cyan',
'DarkCyan', 'DarkGoldenRod', 'DarkGrey', 'DarkKhaki', 'DarkOrange',
'DarkOrchid', 'DarkSalmon', 'DarkSeaGreen', 'DarkTurquoise', 'DarkViolet',
'DeepPink', 'DeepSkyBlue', 'DodgerBlue', 'FireBrick', 'FloralWhite',
'ForestGreen', 'Fuchsia', 'Gainsboro', 'GhostWhite', 'Gold', 'GoldenRod',
'Salmon', 'Tan', 'HoneyDew', 'HotPink', 'IndianRed', 'Ivory', 'Khaki',
'Lavender', 'LavenderBlush', 'LawnGreen', 'LemonChiffon', 'LightBlue',
'LightCoral', 'LightCyan', 'LightGoldenRodYellow', 'LightGray', 'LightGrey',
'LightGreen', 'LightPink', 'LightSalmon', 'LightSeaGreen', 'LightSkyBlue',
'LightSlateGray', 'LightSlateGrey', 'LightSteelBlue', 'LightYellow', 'Lime',
'LimeGreen', 'Linen', 'Magenta', 'MediumAquaMarine', 'MediumOrchid',
'MediumPurple', 'MediumSeaGreen', 'MediumSlateBlue', 'MediumSpringGreen',
'MediumTurquoise', 'MediumVioletRed', 'MintCream', 'MistyRose', 'Moccasin',
'NavajoWhite', 'OldLace', 'Olive', 'OliveDrab', 'Orange', 'OrangeRed',
'Orchid', 'PaleGoldenRod', 'PaleGreen', 'PaleTurquoise', 'PaleVioletRed',
'PapayaWhip', 'PeachPuff', 'Peru', 'Pink', 'Plum', 'PowderBlue', 'Purple',
'Red', 'RosyBrown', 'RoyalBlue', 'SaddleBrown', 'Green', 'SandyBrown',
'SeaGreen', 'SeaShell', 'Sienna', 'Silver', 'SkyBlue', 'SlateBlue',
'SlateGray', 'SlateGrey', 'Snow', 'SpringGreen', 'SteelBlue', 'GreenYellow',
'Teal', 'Thistle', 'Tomato', 'Turquoise', 'Violet', 'Wheat', 'White',
'WhiteSmoke', 'Yellow', 'YellowGreen'
]
def _get_multiplier_for_color_randomness():
num_colors = len(STANDARD_COLORS)
prime_candidates = [5, 7, 11, 13, 17]
# Remove all prime candidates that divide the number of colors.
prime_candidates = [p for p in prime_candidates if num_colors % p]
if not prime_candidates:
return 1
# Return the closest prime number to num_colors / 10.
abs_distance = [np.abs(num_colors / 10. - p) for p in prime_candidates]
num_candidates = len(abs_distance)
inds = [i for _, i in sorted(zip(abs_distance, range(num_candidates)))]
return prime_candidates[inds[0]]
def draw_mask_on_image_array(image, mask, color='red', alpha=0.4):
if image.dtype != np.uint8:
raise ValueError('`image` not of type np.uint8')
if mask.dtype != np.uint8:
raise ValueError('`mask` not of type np.uint8')
if np.any(np.logical_and(mask != 1, mask != 0)):
raise ValueError('`mask` elements should be in [0, 1]')
if image.shape[:2] != mask.shape:
raise ValueError('The image has spatial dimensions %s but the mask has '
'dimensions %s' % (image.shape[:2], mask.shape))
rgb = ImageColor.getrgb(color)
pil_image = Image.fromarray(image)
solid_color = np.expand_dims(
np.ones_like(mask), axis=2) * np.reshape(list(rgb), [1, 1, 3])
pil_solid_color = Image.fromarray(np.uint8(solid_color)).convert('RGBA')
pil_mask = Image.fromarray(np.uint8(255.0*alpha*mask)).convert('L')
pil_image = Image.composite(pil_solid_color, pil_image, pil_mask)
np.copyto(image, np.array(pil_image.convert('RGB')))
def draw_bounding_box_on_image(image,
ymin,
xmin,
ymax,
xmax,
color='red',
thickness=4,
display_str_list=(),
use_normalized_coordinates=True):
GPIO.output(8,GPIO.HIGH)
print('Object Detected')
draw = ImageDraw.Draw(image)
im_width, im_height = image.size
if use_normalized_coordinates:
(left, right, top, bottom) = (xmin * im_width, xmax * im_width,
ymin * im_height, ymax * im_height)
else:
(left, right, top, bottom) = (xmin, xmax, ymin, ymax)
draw.line([(left, top), (left, bottom), (right, bottom),
(right, top), (left, top)], width=thickness, fill=color)
try:
font = ImageFont.truetype('arial.ttf', 24)
except IOError:
font = ImageFont.load_default()
# If the total height of the display strings added to the top of the bounding
# box exceeds the top of the image, stack the strings below the bounding box
# instead of above.
display_str_heights = [font.getsize(ds)[1] for ds in display_str_list]
# Each display_str has a top and bottom margin of 0.05x.
total_display_str_height = (1 + 2 * 0.05) * sum(display_str_heights)
if top > total_display_str_height:
text_bottom = top
else:
text_bottom = bottom + total_display_str_height
# Reverse list and print from bottom to top.
for display_str in display_str_list[::-1]:
text_width, text_height = font.getsize(display_str)
margin = np.ceil(0.05 * text_height)
draw.rectangle(
[(left, text_bottom - text_height - 2 * margin), (left + text_width,
text_bottom)],
fill=color)
draw.text(
(left + margin, text_bottom - text_height - margin),
display_str,
fill='black',
font=font)
text_bottom -= text_height - 2 * margin
def draw_bounding_box_on_image_array(image,
ymin,
xmin,
ymax,
xmax,
color='red',
thickness=4,
display_str_list=(),
use_normalized_coordinates=True):
image_pil = Image.fromarray(np.uint8(image)).convert('RGB')
draw_bounding_box_on_image(image_pil, ymin, xmin, ymax, xmax, color,
thickness, display_str_list,
use_normalized_coordinates)
np.copyto(image, np.array(image_pil))
def draw_keypoints_on_image(image,
keypoints,
color='red',
radius=2,
use_normalized_coordinates=True):
draw = ImageDraw.Draw(image)
im_width, im_height = image.size
keypoints_x = [k[1] for k in keypoints]
keypoints_y = [k[0] for k in keypoints]
if use_normalized_coordinates:
keypoints_x = tuple([im_width * x for x in keypoints_x])
keypoints_y = tuple([im_height * y for y in keypoints_y])
for keypoint_x, keypoint_y in zip(keypoints_x, keypoints_y):
draw.ellipse([(keypoint_x - radius, keypoint_y - radius),
(keypoint_x + radius, keypoint_y + radius)],
outline=color, fill=color)
def draw_keypoints_on_image_array(image,
keypoints,
color='red',
radius=2,
use_normalized_coordinates=True):
image_pil = Image.fromarray(np.uint8(image)).convert('RGB')
draw_keypoints_on_image(image_pil, keypoints, color, radius,
use_normalized_coordinates)
np.copyto(image, np.array(image_pil))
def visualize_boxes_and_labels_on_image_array(
image,
boxes,
classes,
scores,
category_index,
** scan, **
instance_masks=None,
instance_boundaries=None,
keypoints=None,
track_ids=None,
use_normalized_coordinates=False,
max_boxes_to_draw=20,
min_score_thresh=.5,
agnostic_mode=False,
line_thickness=4,
groundtruth_box_visualization_color='black',
skip_scores=False,
skip_labels=False,
skip_track_ids=False):
GPIO.output(8,GPIO.LOW)
# Create a display string (and color) for every box location, group any boxes
# that correspond to the same location.
box_to_display_str_map = collections.defaultdict(list)
box_to_color_map = collections.defaultdict(str)
box_to_instance_masks_map = {}
box_to_instance_boundaries_map = {}
box_to_keypoints_map = collections.defaultdict(list)
box_to_track_ids_map = {}
if not max_boxes_to_draw:
max_boxes_to_draw = boxes.shape[0]
for i in range(min(max_boxes_to_draw, boxes.shape[0])):
if scores is None or scores[i] > min_score_thresh:
box = tuple(boxes[i].tolist())
if instance_masks is not None:
box_to_instance_masks_map[box] = instance_masks[i]
if instance_boundaries is not None:
box_to_instance_boundaries_map[box] = instance_boundaries[i]
if keypoints is not None:
box_to_keypoints_map[box].extend(keypoints[i])
if track_ids is not None:
box_to_track_ids_map[box] = track_ids[i]
if scores is None:
box_to_color_map[box] = groundtruth_box_visualization_color
else:
display_str = ''
if not skip_labels:
if not agnostic_mode:
if classes[i] in six.viewkeys(category_index):
class_name = category_index[classes[i]]['name']
else:
class_name = 'N/A'
display_str = str(class_name)
if not skip_scores:
if not display_str:
display_str = '{}%'.format(int(100*scores[i]))
else:
display_str = '{}: {}%'.format(
display_str, int(100*scores[i]))
if not skip_track_ids and track_ids is not None:
if not display_str:
display_str = 'ID {}'.format(track_ids[i])
else:
display_str = '{}: ID {}'.format(
display_str, track_ids[i])
box_to_display_str_map[box].append(display_str)
if agnostic_mode:
box_to_color_map[box] = 'DarkOrange'
elif track_ids is not None:
prime_multipler = _get_multiplier_for_color_randomness()
box_to_color_map[box] = STANDARD_COLORS[
(prime_multipler * track_ids[i]) % len(STANDARD_COLORS)]
else:
box_to_color_map[box] = STANDARD_COLORS[
classes[i] % len(STANDARD_COLORS)]
# Draw all boxes onto image.
for box, color in box_to_color_map.items():
ymin, xmin, ymax, xmax = box
if instance_masks is not None:
draw_mask_on_image_array(
image,
box_to_instance_masks_map[box],
color=color
)
if instance_boundaries is not None:
draw_mask_on_image_array(
image,
box_to_instance_boundaries_map[box],
color='red',
alpha=1.0
)
draw_bounding_box_on_image_array(
image,
ymin,
xmin,
ymax,
xmax,
color=color,
thickness=line_thickness,
display_str_list=box_to_display_str_map[box],
use_normalized_coordinates=use_normalized_coordinates)
**scan.value = 'f'
print(scan.value) # output: 'f'**
if keypoints is not None:
draw_keypoints_on_image_array(
image,
box_to_keypoints_map[box],
color=color,
radius=line_thickness / 2,
use_normalized_coordinates=use_normalized_coordinates)
return image希望一旦检测到一个对象,扫描功能就会中断。Visualization.py在detect_process中执行,但只在检测到对象时执行。
这些语句正确地从我的print语句中中继如下,但是循环仍然没有中断:
$ rpi-deep-pantilt track Raspi
t
Scanning
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
Object Detected
f
^C发布于 2020-09-03 15:41:00
这里的问题是内存(通常)不是在多处理python程序的各个部分之间共享的。在本例中,scan_on是manager.py脚本中的局部变量,然后在visualization.py脚本中重新实例化。
为了在进程之间共享数据,我们可以使用值。这些是对象化的值,通过简单地将其作为参数传递,从而允许进程之间共享状态。
为什么?当您将整数和布尔值传递到函数中时,会复制像整数和布尔值这样的简单值,而不是在它们的多个实例之间维护状态。然而,Value对象只是传递对值的引用,而不是直接副本。
虽然我不知道你在哪里打电话给visualize_boxes_and_labels_on_image_array(),但这里有一个开始:
scan_on = manager.Value('c', 't') # the 'c' references the unsigned c-char,
# other types found here: https://docs.python.org/3/library/array.html#module-array
# we'll edit your set_servo process to include this variable
servo_process = Process(target=set_servos, args=(pan, tilt, scan_on))无论何时调用visualize_boxes_and_labels_on_image_array(),您都希望传递与 scan_on相同的实例。你不应该重新声明它。若要访问或编辑scan_on的值,请使用.value字段。
print(scan_on.value) # output: 't'
scan_on.value = 'f'
print(scan_on.value) # output: 'f'https://stackoverflow.com/questions/63607576
复制相似问题