Persons are first detected with a CenterNet ResNet101 model from the TF2 Object Detection API; pose estimation is then applied to each detected person crop using the OpenPose model through OpenCV's DNN module.
#pose_tfod.py
import os
import time
import tensorflow as tf
import tarfile
import urllib.request
from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
import cv2
import numpy as np
DATA_DIR = os.path.join(os.getcwd(), 'googleNet')
MODELS_DIR = os.path.join(DATA_DIR, 'modelZoo')
print(MODELS_DIR)
for d in [DATA_DIR, MODELS_DIR]:
    if not os.path.exists(d):
        os.mkdir(d)
# Download and extract model
MODEL_DATE = '20200711'
MODEL_NAME = 'centernet_resnet101_v1_fpn_512x512_coco17_tpu-8'
MODEL_TAR_FILENAME = MODEL_NAME + '.tar.gz'
MODELS_DOWNLOAD_BASE = 'http://download.tensorflow.org/models/object_detection/tf2/'
MODEL_DOWNLOAD_LINK = MODELS_DOWNLOAD_BASE + MODEL_DATE + '/' + MODEL_TAR_FILENAME
PATH_TO_MODEL_TAR = os.path.join(MODELS_DIR, MODEL_TAR_FILENAME)
PATH_TO_CKPT = os.path.join(MODELS_DIR, os.path.join(MODEL_NAME, 'checkpoint/'))
PATH_TO_CFG = os.path.join(MODELS_DIR, os.path.join(MODEL_NAME, 'pipeline.config'))
if not os.path.exists(PATH_TO_CKPT):
    print('Downloading model. This may take a while... ', end='')
    urllib.request.urlretrieve(MODEL_DOWNLOAD_LINK, PATH_TO_MODEL_TAR)
    tar_file = tarfile.open(PATH_TO_MODEL_TAR)
    tar_file.extractall(MODELS_DIR)
    tar_file.close()
    os.remove(PATH_TO_MODEL_TAR)
    print('Done')
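# (The TF2 Detection Model Zoo serves each model as a tarball at
# download.tensorflow.org/models/object_detection/tf2/<date>/<model_name>.tar.gz;
# it unpacks to <model_name>/ with a checkpoint/ directory and a
# pipeline.config file, which is what PATH_TO_CKPT and PATH_TO_CFG point at.)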
# Download labels file
LABEL_FILENAME = 'mscoco_label_map.pbtxt'
LABELS_DOWNLOAD_BASE = \
'https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/data/'
PATH_TO_LABELS = os.path.join(MODELS_DIR, os.path.join(MODEL_NAME, LABEL_FILENAME))
if not os.path.exists(PATH_TO_LABELS):
    print('Downloading label file... ', end='')
    print(PATH_TO_LABELS)
    urllib.request.urlretrieve(LABELS_DOWNLOAD_BASE + LABEL_FILENAME, PATH_TO_LABELS)
    print('Done')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # Suppress TensorFlow C++ logging (takes full effect only if set before TF is imported)
tf.get_logger().setLevel('ERROR')  # Suppress TensorFlow Python logging
# Enable GPU dynamic memory allocation
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
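# (Memory growth makes TensorFlow allocate GPU memory on demand rather than
# reserving the whole GPU at startup.)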
# Load pipeline config and build a detection model
configs = config_util.get_configs_from_pipeline_file(PATH_TO_CFG)
model_config = configs['model']
detection_model = model_builder.build(model_config=model_config, is_training=False)
# Restore checkpoint
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(PATH_TO_CKPT, 'ckpt-0')).expect_partial()
@tf.function
def detect_fn(image):
    """Detect objects in image."""
    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)
    return detections, prediction_dict, tf.reshape(shapes, [-1])
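# detect_fn wraps the detection model's preprocess/predict/postprocess steps
# in a tf.function so the graph is traced once and reused per frame. The
# detections dict it returns contains, per image, 'detection_boxes'
# (normalized [ymin, xmin, ymax, xmax]), 'detection_classes' (zero-based,
# hence the label_id_offset of 1 used below) and 'detection_scores'.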
category_index = label_map_util.create_category_index_from_labelmap(
    PATH_TO_LABELS, use_display_name=True)
#print(category_index)
BODY_PARTS = {"Nose": 0, "Neck": 1, "RShoulder": 2, "RElbow": 3, "RWrist": 4,
"LShoulder": 5, "LElbow": 6, "LWrist": 7, "RHip": 8, "RKnee": 9,
"RAnkle": 10, "LHip": 11, "LKnee": 12, "LAnkle": 13, "REye": 14,
"LEye": 15, "REar": 16, "LEar": 17, "Background": 18}
POSE_PAIRS = [["Neck", "RShoulder"], ["Neck", "LShoulder"], ["RShoulder", "RElbow"],
["RElbow", "RWrist"], ["LShoulder", "LElbow"], ["LElbow", "LWrist"],
["Neck", "RHip"], ["RHip", "RKnee"], ["RKnee", "RAnkle"], ["Neck", "LHip"],
["LHip", "LKnee"], ["LKnee", "LAnkle"], ["Neck", "Nose"], ["Nose", "REye"],
["REye", "REar"], ["Nose", "LEye"], ["LEye", "LEar"]]
inWidth = 368
inHeight = 368
# graph_opt.pb is the optimized OpenPose COCO graph (the same model used by OpenCV's openpose.py sample)
net = cv2.dnn.readNetFromTensorflow("googleNet/graph_opt.pb")
# Open the input video with OpenCV (use cv2.VideoCapture(0) for a webcam instead)
cap = cv2.VideoCapture("assets/mma.mp4")
# Define the codec and create VideoWriter object
fps = 20  # output frame rate (fixed; cap.get(cv2.CAP_PROP_FPS) would match the source)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
size = (frame_width, frame_height)
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
path = 'C:/Users/zchen/PycharmProjects/opencv/googleNet/record'
out = cv2.VideoWriter()
success = out.open(os.path.join(path, "mma.mov"), fourcc, fps, size, True)
frame_count = 0
t1 = time.time()
while cap.isOpened():
    ret, image_np = cap.read()
    if not ret:  # end of video
        break
    input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
    detections, predictions_dict, shapes = detect_fn(input_tensor)
    label_id_offset = 1
    image_np_with_detections = image_np.copy()
"""
viz_utils.visualize_boxes_and_labels_on_image_array(
image_np_with_detections,
detections['detection_boxes'][0].numpy(),
(detections['detection_classes'][0].numpy() + label_id_offset).astype(int),
detections['detection_scores'][0].numpy(),
category_index,
use_normalized_coordinates=True,
max_boxes_to_draw=200,
min_score_thresh=.50,
line_thickness=1,
agnostic_mode=False)
"""
    categories = (detections['detection_classes'][0].numpy() + label_id_offset).astype(int)
    boxes = detections['detection_boxes'][0].numpy()
    scores = detections['detection_scores'][0].numpy()
    #print(categories, boxes, scores)
    for i, category in enumerate(categories):
        # Class 1 is 'person' in the COCO label map
        if category == 1 and scores[i] > 0.5:
            # Boxes are normalized [ymin, xmin, ymax, xmax]
            start_x = int(frame_width * boxes[i][1])
            start_y = int(frame_height * boxes[i][0])
            end_x = int(frame_width * boxes[i][3])
            end_y = int(frame_height * boxes[i][2])
            #print(start_x, start_y, end_x, end_y)
"""
cv2.rectangle(image_np_with_detections,
(start_x, start_y),
(end_x, end_y),
(0, 255, 255), 1)
"""
person = image_np_with_detections[start_y:end_y, start_x:end_x]
personHeight, personWidth, channels = person.shape
#print(person.shape)
            if personWidth > 0 and personHeight > 0:
                net.setInput(
                    cv2.dnn.blobFromImage(person, 1.0, (inWidth, inHeight),
                                          (127.5, 127.5, 127.5), swapRB=True, crop=False))
                pose_out = net.forward()
                # The OpenPose COCO graph outputs [1, 57, H, W]: 19 keypoint
                # heatmaps followed by part affinity fields; keep the heatmaps only
                pose_out = pose_out[:, :19, :, :]
                assert (len(BODY_PARTS) == pose_out.shape[1])
                points = []
                for j in range(len(BODY_PARTS)):
                    # Slice the heatmap of the corresponding body part.
                    heatMap = pose_out[0, j, :, :]
                    # Ideally we would find all local maxima; to keep the sample
                    # simple we take the global maximum, so only one pose per
                    # crop can be detected this way.
                    _, conf, _, point = cv2.minMaxLoc(heatMap)
                    # Scale heatmap coordinates back to crop pixels
                    x = (personWidth * point[0]) / pose_out.shape[3]
                    y = (personHeight * point[1]) / pose_out.shape[2]
                    # Keep the point only if its confidence exceeds the threshold.
                    points.append((int(x), int(y)) if conf > 0.2 else None)
                for pair in POSE_PAIRS:
                    partFrom = pair[0]
                    partTo = pair[1]
                    assert (partFrom in BODY_PARTS)
                    assert (partTo in BODY_PARTS)
                    idFrom = BODY_PARTS[partFrom]
                    idTo = BODY_PARTS[partTo]
                    if points[idFrom] and points[idTo]:
                        cv2.line(person, points[idFrom], points[idTo], (0, 255, 0), 3)
                        cv2.ellipse(person, points[idFrom], (3, 3), 0, 0, 360, (0, 0, 255), cv2.FILLED)
                        cv2.ellipse(person, points[idTo], (3, 3), 0, 0, 360, (0, 0, 255), cv2.FILLED)
                #cv2.imshow("person", person)
                if cv2.waitKey(1) == ord('p'):
                    cv2.waitKey(-1)  # wait until any key is pressed
    cv2.imshow('centernet_resnet101_pose_detection', image_np_with_detections)
    out.write(image_np_with_detections)
    frame_count += 1
    if cv2.waitKey(1) == ord('q'):
        break
    if cv2.waitKey(1) == ord('p'):
        cv2.waitKey(-1)  # wait until any key is pressed
t2 = time.time()
# Calculate the average FPS over the whole run
elapsed = t2 - t1
print("Frames processed: {}".format(frame_count))
print("Elapsed time: {:.2f}".format(elapsed))
print("FPS: {:.2f}".format(frame_count / elapsed))
# Release the input and output streams and close windows
cap.release()
out.release()
cv2.destroyAllWindows()
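One easy improvement, not in the original script, is to clamp each detected box to the frame before cropping, so a degenerate or border-touching box can never produce an empty slice. A minimal sketch (the helper name crop_person is hypothetical), assuming the same normalized [ymin, xmin, ymax, xmax] boxes the detector returns:

def crop_person(frame, box):
    """Return a clamped person crop, or None for a degenerate box."""
    h, w = frame.shape[:2]
    y1, x1 = max(0, int(box[0] * h)), max(0, int(box[1] * w))
    y2, x2 = min(h, int(box[2] * h)), min(w, int(box[3] * w))
    if x2 <= x1 or y2 <= y1:
        return None  # empty or inverted box
    return frame[y1:y2, x1:x2]  # a NumPy view: drawing on it marks the frame

Replacing the inline slicing in the detection loop with this helper (and testing for None) would also make the personWidth/personHeight check unnecessary.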