Sunday, June 6, 2021

opencv 59 resnet person detection + pose estimation


Persons are detected with a ResNet-based detector (CenterNet ResNet101 FPN); pose estimation is then run separately on each detected person.
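
In outline: the detector returns person boxes, each box is cropped out of the frame, and the pose network runs on each crop. A minimal sketch of that flow (detect_persons and estimate_pose are stand-in placeholders, not real APIs; the full script below uses the actual models):

#pipeline_sketch.py
import numpy as np

def detect_persons(frame):
    # placeholder: a real detector returns normalized [ymin, xmin, ymax, xmax]
    # boxes with confidence scores; here the whole frame counts as one person
    return [((0.0, 0.0, 1.0, 1.0), 0.9)]

def estimate_pose(crop):
    # placeholder: a real pose network would return a list of keypoints
    return []

frame = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in for a video frame
h, w = frame.shape[:2]
for (ymin, xmin, ymax, xmax), score in detect_persons(frame):
    if score > 0.5:
        crop = frame[int(ymin * h):int(ymax * h), int(xmin * w):int(xmax * w)]
        keypoints = estimate_pose(crop)  # skeleton would be drawn on the crop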


#pose_tfod.py
import os
import time
import tensorflow as tf
import tarfile
import urllib.request
from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.builders import model_builder
import cv2
import numpy as np

DATA_DIR = os.path.join(os.getcwd(), 'googleNet')
MODELS_DIR = os.path.join(DATA_DIR, 'modelZoo')
print(MODELS_DIR)
for directory in [DATA_DIR, MODELS_DIR]:
    if not os.path.exists(directory):
        os.mkdir(directory)

# Download and extract model
MODEL_DATE = '20200711'
MODEL_NAME = 'centernet_resnet101_v1_fpn_512x512_coco17_tpu-8'
MODEL_TAR_FILENAME = MODEL_NAME + '.tar.gz'
MODELS_DOWNLOAD_BASE = 'http://download.tensorflow.org/models/object_detection/tf2/'
MODEL_DOWNLOAD_LINK = MODELS_DOWNLOAD_BASE + MODEL_DATE + '/' + MODEL_TAR_FILENAME
PATH_TO_MODEL_TAR = os.path.join(MODELS_DIR, MODEL_TAR_FILENAME)
PATH_TO_CKPT = os.path.join(MODELS_DIR, os.path.join(MODEL_NAME, 'checkpoint/'))
PATH_TO_CFG = os.path.join(MODELS_DIR, os.path.join(MODEL_NAME, 'pipeline.config'))
if not os.path.exists(PATH_TO_CKPT):
    print('Downloading model. This may take a while... ', end='')
    urllib.request.urlretrieve(MODEL_DOWNLOAD_LINK, PATH_TO_MODEL_TAR)
    tar_file = tarfile.open(PATH_TO_MODEL_TAR)
    tar_file.extractall(MODELS_DIR)
    tar_file.close()
    os.remove(PATH_TO_MODEL_TAR)
    print('Done')

# Download labels file
LABEL_FILENAME = 'mscoco_label_map.pbtxt'
LABELS_DOWNLOAD_BASE = \
    'https://raw.githubusercontent.com/tensorflow/models/master/research/object_detection/data/'
PATH_TO_LABELS = os.path.join(MODELS_DIR, os.path.join(MODEL_NAME, LABEL_FILENAME))
if not os.path.exists(PATH_TO_LABELS):
    print('Downloading label file... ', end='')
    print(PATH_TO_LABELS)
    urllib.request.urlretrieve(LABELS_DOWNLOAD_BASE + LABEL_FILENAME, PATH_TO_LABELS)
    print('Done')

# Suppress TensorFlow logging. Note: TF_CPP_MIN_LOG_LEVEL only takes full
# effect when it is set before tensorflow is imported.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.get_logger().setLevel('ERROR')

# Enable GPU dynamic memory allocation
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Load pipeline config and build a detection model
configs = config_util.get_configs_from_pipeline_file(PATH_TO_CFG)
model_config = configs['model']
detection_model = model_builder.build(model_config=model_config, is_training=False)

# Restore checkpoint
ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt.restore(os.path.join(PATH_TO_CKPT, 'ckpt-0')).expect_partial()


@tf.function
def detect_fn(image):
    """Detect objects in image."""

    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)

    return detections, prediction_dict, tf.reshape(shapes, [-1])


category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS,
                                                                    use_display_name=True)
#print(category_index)

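# Keypoint layout of the OpenPose COCO model in graph_opt.pb:
# 18 body parts plus one background channel.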
BODY_PARTS = {"Nose": 0, "Neck": 1, "RShoulder": 2, "RElbow": 3, "RWrist": 4,
              "LShoulder": 5, "LElbow": 6, "LWrist": 7, "RHip": 8, "RKnee": 9,
              "RAnkle": 10, "LHip": 11, "LKnee": 12, "LAnkle": 13, "REye": 14,
              "LEye": 15, "REar": 16, "LEar": 17, "Background": 18}

POSE_PAIRS = [["Neck", "RShoulder"], ["Neck", "LShoulder"], ["RShoulder", "RElbow"],
              ["RElbow", "RWrist"], ["LShoulder", "LElbow"], ["LElbow", "LWrist"],
              ["Neck", "RHip"], ["RHip", "RKnee"], ["RKnee", "RAnkle"], ["Neck", "LHip"],
              ["LHip", "LKnee"], ["LKnee", "LAnkle"], ["Neck", "Nose"], ["Nose", "REye"],
              ["REye", "REar"], ["Nose", "LEye"], ["LEye", "LEar"]]

inWidth = 368
inHeight = 368

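# Load the frozen TensorFlow OpenPose graph (graph_opt.pb, the model used by
# OpenCV's openpose.py sample); the file must already be in the googleNet folder.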
net = cv2.dnn.readNetFromTensorflow("googleNet/graph_opt.pb")

# Open the input video with OpenCV (pass 0 instead to use a webcam)
cap = cv2.VideoCapture("assets/mma.mp4")

# Define the codec and create VideoWriter object
fps = 20
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
size = (int(frame_width), int(frame_height))
fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
path = 'C:/Users/zchen/PycharmProjects/opencv/googleNet/record'
out = cv2.VideoWriter()
success = out.open(os.path.join(path, "mma.mov"), fourcc, fps, size, True)

frame_count = 0
t1 = time.time()

while cap.isOpened():
    ret, image_np = cap.read()
    if not ret:  # stop when the video ends or a frame cannot be read
        break
    input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
    detections, predictions_dict, shapes = detect_fn(input_tensor)

    label_id_offset = 1
    image_np_with_detections = image_np.copy()

    """
    viz_utils.visualize_boxes_and_labels_on_image_array(
        image_np_with_detections,
        detections['detection_boxes'][0].numpy(),
        (detections['detection_classes'][0].numpy() + label_id_offset).astype(int),
        detections['detection_scores'][0].numpy(),
        category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=200,
        min_score_thresh=.50,
        line_thickness=1,
        agnostic_mode=False)
    """

    categories = (detections['detection_classes'][0].numpy() + label_id_offset).astype(int)
    boxes = detections['detection_boxes'][0].numpy()
    scores = detections['detection_scores'][0].numpy()
    #print(categories, boxes, scores)

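    # COCO class id 1 (after the +1 label offset) is 'person';
    # detection_boxes come as normalized [ymin, xmin, ymax, xmax]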
    for i, category in enumerate(categories):
        if category == 1 and scores[i] > 0.5:
            start_x = int(frame_width * boxes[i][1])
            start_y = int(frame_height * boxes[i][0])
            end_x = int(frame_width * boxes[i][3])
            end_y = int(frame_height * boxes[i][2])
            #print(start_x, start_y, end_x, end_y)

            """
            cv2.rectangle(image_np_with_detections,
                          (start_x, start_y),
                          (end_x, end_y),
                          (0, 255, 255), 1)
            """

            # NumPy slicing returns a view, so drawing the skeleton on `person`
            # below also draws it on image_np_with_detections
            person = image_np_with_detections[start_y:end_y, start_x:end_x]
            personHeight, personWidth, channels = person.shape
            #print(person.shape)

            if personWidth > 0 and personHeight > 0:

                net.setInput(
                    cv2.dnn.blobFromImage(person, 1.0, (inWidth, inHeight), (127.5, 127.5, 127.5), swapRB=True, crop=False))
                pose_out = net.forward()
                pose_out = pose_out[:, :19, :, :]  # network output is [1, 57, H, W]; only the first 19 channels are keypoint heatmaps

                assert (len(BODY_PARTS) == pose_out.shape[1])

                points = []
                for j in range(len(BODY_PARTS)):
                    # Slice the heatmap of the corresponding body part.
                    heatMap = pose_out[0, j, :, :]

                    # The original sample finds all local maxima; to keep things
                    # simple we take the global maximum instead, so only one
                    # pose per crop can be detected this way.
                    _, conf, _, point = cv2.minMaxLoc(heatMap)
                    x = (personWidth * point[0]) / pose_out.shape[3]
                    y = (personHeight * point[1]) / pose_out.shape[2]
                    # Keep the point only if its confidence exceeds the threshold.
                    points.append((int(x), int(y)) if conf > 0.2 else None)

                for pair in POSE_PAIRS:
                    partFrom = pair[0]
                    partTo = pair[1]
                    assert (partFrom in BODY_PARTS)
                    assert (partTo in BODY_PARTS)

                    idFrom = BODY_PARTS[partFrom]
                    idTo = BODY_PARTS[partTo]

                    if points[idFrom] and points[idTo]:
                        cv2.line(person, points[idFrom], points[idTo], (0, 255, 0), 3)
                        cv2.ellipse(person, points[idFrom], (3, 3), 0, 0, 360, (0, 0, 255), cv2.FILLED)
                        cv2.ellipse(person, points[idTo], (3, 3), 0, 0, 360, (0, 0, 255), cv2.FILLED)

                #cv2.imshow("person", person)

                if cv2.waitKey(1) == ord('p'):
                    cv2.waitKey(-1)  # wait until any key is pressed

    cv2.imshow('centernet_resnet101_pose_detection', image_np_with_detections)
    out.write(image_np_with_detections)
    frame_count += 1

    # poll the keyboard once per frame; calling waitKey twice can drop keypresses
    key = cv2.waitKey(1)
    if key == ord('q'):
        break
    if key == ord('p'):
        cv2.waitKey(-1)  # pause until any key is pressed

t2 = time.time()
# calculate average FPS over the whole run
fps = frame_count / (t2 - t1)
print("Frames processed: {}".format(frame_count))
print("Elapsed time: {:.2f} s".format(t2 - t1))
print("FPS: {:.2f}".format(fps))

# Release the capture and the writer, and close windows
cap.release()
out.release()
cv2.destroyAllWindows()
