上图来自论文《Deep Dual-resolution Networks for Real-time and Accurate Semantic Segmentation of Road Scenes》,描述了当前一些优秀的实时语义分割算法。从图中可以看出,DDRNet 在速度和精度上均表现出色,因此本文选择 DDRNet-23-slim 作为 Baseline。DDRNet 的具体实现可参考其 GitHub 仓库(本文即采用该代码)。
if __name__ == '__main__':
    # Split an image/mask dataset into train/val file lists (4:1) under dataset_root.
    # Expects the dataset directory as argv[1]; masks are *.png files whose matching
    # image shares the stem and carries one of the `supported_fmt` suffixes.
    if len(sys.argv) < 2:
        # FIX: guard against a missing argument — the original indexed sys.argv[1]
        # unconditionally and crashed with IndexError before printing anything useful.
        print(f'usage: {sys.argv[0]} <dataset_dir>')
        sys.exit(1)
    os.makedirs(dataset_root, exist_ok=True)
    if not os.path.exists(sys.argv[1]):
        print(f'{sys.argv[1]} 不存在!')
        sys.exit(-1)

    # 遍历数据集目录下所有xml文件及其对应的图片
    # (walk every mask file and find its companion image)
    dataset_path = pathlib.Path(sys.argv[1])
    found_data_list = []
    for mask_file in dataset_path.glob('**/*.png'):
        candidates = (mask_file.with_suffix(suffix) for suffix in supported_fmt)
        supported_images = [p for p in candidates if p.is_file()]
        if not supported_images:
            print(f'找不到对应的图片文件:`{mask_file.as_posix()}`')
            continue
        found_data_list.append({'image': supported_images[0], 'label': mask_file})

    # 随机化数据集,将数据集拆分成训练集和验证集
    random.shuffle(found_data_list)
    # FIX: integer arithmetic; n - n//5 == ceil(4n/5), which is exactly how many
    # items satisfied `i < n*4/5` in the original float comparison.
    n = len(found_data_list)
    train_data_count = n - n // 5
    train_data_list = found_data_list[:train_data_count]
    valid_data_list = found_data_list[train_data_count:]

    # Write one "image,label" line per sample for each split.
    for list_name, data_list in (('train.txt', train_data_list),
                                 ('val.txt', valid_data_list)):
        with open(os.path.join(dataset_root, list_name), 'w') as f:
            for item in data_list:
                f.write(item['image'].as_posix() + ',' + item['label'].as_posix() + '\n')
    print('Done')
# ------------------------------------------------------------------------------ # Copyright (c) Microsoft # Licensed under the MIT License. # Written by Ke Sun (sunk@mail.ustc.edu.cn) # ------------------------------------------------------------------------------
import os
import cv2 import numpy as np from PIL import Image
def read_files(self):
    """Build the list of per-sample file records from ``self.img_list``.

    Test lists contain only image paths; train/val lists contain
    (image, label) pairs and additionally carry a per-sample weight of 1.
    The record ``name`` is the basename of the defining file, without suffix.
    """
    def stem(path):
        # basename with its extension stripped
        return os.path.splitext(os.path.basename(path))[0]

    if 'test' in self.list_path:
        return [
            {'img': item[0], 'name': stem(item[0])}
            for item in self.img_list
        ]
    return [
        {'img': image_path, 'label': label_path, 'name': stem(label_path), 'weight': 1}
        for image_path, label_path in self.img_list
    ]
def convert_label(self, label, inverse=False):
    """Remap class ids in ``label`` through ``self.label_mapping``, in place.

    With ``inverse=False`` each raw id ``k`` becomes training id ``v``;
    with ``inverse=True`` the mapping is applied in reverse. A snapshot of
    the original array is used so overlapping ids do not cascade.
    Returns ``label`` (the same, mutated array); a ``None`` mapping is a no-op.
    """
    mapping = self.label_mapping
    if mapping is None:
        return label
    snapshot = label.copy()
    for raw_id, train_id in mapping.items():
        if inverse:
            label[snapshot == train_id] = raw_id
        else:
            label[snapshot == raw_id] = train_id
    return label
''' An example that uses TensorRT's Python api to make inferences for BiSeNet. ''' import os import shutil import random import sys import threading import time import cv2 import numpy as np import pycuda.autoinit import pycuda.driver as cuda import tensorrt as trt
def get_img_path_batches(batch_size, img_dir):
    """Walk ``img_dir`` recursively and group every file path into batches.

    Each batch holds at most ``batch_size`` paths; a final partial batch is
    kept. Returns a list of lists of absolute-joined paths.
    """
    batches = []
    current = []
    for root, _dirs, names in os.walk(img_dir):
        for fname in names:
            # flush a full batch before starting the next item
            if len(current) == batch_size:
                batches.append(current)
                current = []
            current.append(os.path.join(root, fname))
    if current:
        batches.append(current)
    return batches
class Seg_TRT(object):
    '''
    description: A BiSeNet class that wraps TensorRT ops, preprocess and postprocess ops.
    '''

    def __init__(self, engine_file_path):
        # Create a Context on this device; it is pushed/popped again around each
        # inference call (see infer()).
        self.cfx = cuda.Device(0).make_context()
        stream = cuda.Stream()
        runtime = trt.Runtime(trt.Logger(trt.Logger.ERROR))
        assert runtime

        # Deserialize the engine from file
        with open(engine_file_path, 'rb') as f:
            engine = runtime.deserialize_cuda_engine(f.read())
        context = engine.create_execution_context()
        # NOTE(review): this excerpt of __init__ appears truncated — the locals
        # stream/engine/context are never stored on self, and the host/device
        # buffers (self.host_inputs, self.cuda_inputs, self.bindings, ...) that
        # infer() reads are never allocated here. Confirm against the full source.
def infer(self, image_raw):
    """Run one synchronous TensorRT inference on a BGR image.

    Args:
        image_raw (numpy.ndarray): input image, (h, w, c) BGR as read by cv2.

    Returns:
        numpy.ndarray: uint8 class-id mask resized back to the input image's
        (h, w) with nearest-neighbour interpolation (keeps ids intact).
    """
    # NOTE(review): Seg_TRT is not a Thread subclass; this call looks like a
    # leftover from thread-based demo code — confirm before removing.
    threading.Thread.__init__(self)
    # Make self the active context, pushing it on top of the context stack.
    self.cfx.push()
    # Restore the buffers/handles prepared in __init__.
    stream = self.stream
    context = self.context
    host_inputs = self.host_inputs
    cuda_inputs = self.cuda_inputs
    host_outputs = self.host_outputs
    cuda_outputs = self.cuda_outputs
    bindings = self.bindings
    # Remember the source resolution so the mask can be resized back.
    w_ori, h_ori = image_raw.shape[1], image_raw.shape[0]
    # Do image preprocess
    input_image = self.preprocess_image(image_raw)
    # Copy input image to host buffer
    np.copyto(host_inputs[0], input_image.ravel())
    # Transfer input data to the GPU.
    cuda.memcpy_htod_async(cuda_inputs[0], host_inputs[0], stream)
    # Run inference.
    context.execute_async(bindings=bindings, stream_handle=stream.handle)
    # Transfer predictions back from the GPU.
    cuda.memcpy_dtoh_async(host_outputs[0], cuda_outputs[0], stream)
    # Synchronize the stream
    stream.synchronize()
    # Remove any context from the top of the context stack, deactivating it.
    self.cfx.pop()
    # Here we use the first row of output in that batch_size = 1
    output = host_outputs[0]

    # Do postprocess: flat buffer -> (h, w) class-id mask, resized to source size.
    output = output.reshape(self.output_h, self.output_w).astype('uint8')
    if self.output_w != w_ori or h_ori != self.output_h:
        output = cv2.resize(output, (w_ori, h_ori), interpolation=cv2.INTER_NEAREST)
    # FIX: return the mask — callers (process_image) consume infer()'s result;
    # the excerpt was missing this return. Also dropped the unused local `engine`.
    return output
def get_raw_image(self, image_path_batch):
    '''
    description: Read the first image of the batch from disk.
                 Returns None when the batch is empty.
    '''
    first_path = next(iter(image_path_batch), None)
    if first_path is None:
        return None
    return cv2.imread(first_path)
def get_raw_image_zeros(self, image_path_batch=None):
    '''
    description: Ready data for warmup — a single all-zero uint8 image of the
                 network input size. Returns None when batch_size <= 0.
    '''
    if self.batch_size > 0:
        return np.zeros([self.input_h, self.input_w, 3], dtype=np.uint8)
    return None
if __name__ == '__main__':
    # load custom engine
    engine_file_path = 'build/model.trt'  # the generated engine file

    # allow overriding the engine path from the command line
    if len(sys.argv) > 1:
        engine_file_path = sys.argv[1]

    # start from a clean output directory
    if os.path.exists('output/'):
        shutil.rmtree('output/')
    os.makedirs('output/')
    # a hrnet instance
    bisenet = Seg_TRT(engine_file_path)
    try:
        print('batch size is', bisenet.batch_size)  # batch size is set to 1!

        for i in range(10):
            # create a new thread to do warm_up
            thread1 = warmUpThread(bisenet)
            thread1.start()
            thread1.join()

        # NOTE(review): `image_path_batches` is never defined in this excerpt —
        # presumably built via get_img_path_batches(bisenet.batch_size, <img_dir>)
        # in the full script; confirm against the original source.
        for batch in image_path_batches:
            # create a new thread to do inference
            thread1 = inferThread(bisenet, batch)
            thread1.start()
            thread1.join()
    finally:
        # destroy the instance (pops the CUDA context)
        bisenet.destroy()
from trt import * import json import cv2 import numpy as np import os from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import logging as log import shutil
def init():
    '''Initialize model

    Compiles the ONNX model into a TensorRT engine on first run (via the
    BiSeNet `segment compile` tool), wraps it in Seg_TRT and warms it up
    with 10 dummy inferences.

    Returns: model (Seg_TRT inference handle)
    '''
    onnx_file_path = '/project/train/models/model_sim.onnx'
    engine_file_path = '/usr/local/ev_sdk/model/model.trt'
    if not os.path.exists(engine_file_path):
        # Both paths are fixed constants, so the shell command is not built
        # from untrusted input.
        os.system('/usr/local/ev_sdk/3rd/BiSeNet/bin/segment compile '
                  + onnx_file_path + ' ' + engine_file_path + ' --fp16')
    net = Seg_TRT(engine_file_path)
    for i in range(10):
        # create a new thread to do warm_up
        # FIX: warm up the freshly created handle — the original referenced the
        # undefined name `bisenet` here instead of `net`.
        thread1 = warmUpThread(net)
        thread1.start()
        thread1.join()
    return net
def process_image(handle, input_image, args):
    '''Do inference to analysis input_image and get output

    Attributes:
        handle: algorithm handle returned by init()
        input_image (numpy.ndarray): image to be process, format: (h, w, c), BGR
        args: string in JSON format, format:
            {
                "mask_output_path": "/path/to/output/mask.png"
            }

    Returns: process result (JSON string containing the mask path)
    '''
    # FIX: restored the `def process_image(...)` header implied by this
    # docstring and by the call in the __main__ test block below.
    args_dict = json.loads(args)
    mask_output_path = args_dict['mask_output_path']
    # Run segmentation and persist the mask where the caller asked.
    output = handle.infer(input_image)
    cv2.imwrite(mask_output_path, output)
    return json.dumps({'model_data': {'mask': mask_output_path}, 'mask': mask_output_path}, indent=4)
if __name__ == '__main__':
    # Test API: run 10 inferences on one image and report the average latency.
    import time
    img = cv2.imread('../data/test.jpg')
    predictor = init()
    s = time.time()
    for i in range(10):
        # FIX: json.loads requires double-quoted JSON; the original passed a
        # single-quoted string, which raises json.JSONDecodeError on every call.
        res = process_image(predictor, img, '{"mask_output_path": "./out.png"}')
    print((time.time() - s) / 10)
    print(res)
    predictor.destroy()