====== 第2章 エリア判定(立ち止まり判定)【初級編】 ====== ===== 第一項 スターターコードの中身を確認する ===== 第2章のスターターコードは2つのプログラムコードに分かれます\\ 一つがSCORER People Trackerからログを受信して1分ごとにjson形式で保存するプログラム。\\ もう一つが保存したログからあらかじめ指定したエリア内で立ち止まりがあるかどうかを調べるプログラムとなります。 保存用プログラム \\ #!/usr/bin/env python3 # This script dumps various streams import sys import os import argparse import pickle import json import zmq import numpy import cv2 import math from datetime import datetime, timedelta, timezone # Handle arguments PROG_DESCRIPTION='''\ description: this program subscribes ZMQ streams and dumps the data to stdout ''' PROG_EPILOG='''\ examples: %(prog)s %(prog)s --pretty-json %(prog)s -ic tcp://192.168.10.11:9877 tcp://192.168.10.12:9877 ''' DEFAULT_INGRESS_ADDR="tcp://localhost:5002" DEFAULT_INGRESS_TOPICS=[ 'VideoFrame', 'JpegFrame', 'LogFrame' ] ap = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=PROG_DESCRIPTION, epilog=PROG_EPILOG) ap.add_argument('-v','--verbose', action='count', default=0, help='increase verbosity') apg1 = ap.add_argument_group() apmx1 = apg1.add_mutually_exclusive_group() apmx1.add_argument('-ic', dest='ingress_connects', metavar='ADDR', nargs='+', action='append', help='specify a ZMQ endpoint to connect to a data source (default:"{}")' .format(DEFAULT_INGRESS_ADDR)) apg3 = ap.add_argument_group() apg3.add_argument('--subscribe', dest='topics', metavar='TOPIC', nargs='+', action='append', help='specify a subscription topic of data sources (default:"{}")' .format(DEFAULT_INGRESS_TOPICS)) apg3.add_argument('--pretty-json', action='store_true', help='dump the json with sorted keys and indent') args = ap.parse_args() # Flatten the lists if args.ingress_connects: args.ingress_connects = sum(args.ingress_connects, []) args.ingress_connects = [s for s in args.ingress_connects if s] if args.topics: args.topics = sum(args.topics, []) args.topics = [s for s in args.topics if s] # Set the default endpoint if not args.ingress_connects: args.ingress_connects = [ DEFAULT_INGRESS_ADDR ] # Set the default topic filter if not args.topics: args.topics = DEFAULT_INGRESS_TOPICS # Set the option for json.dumps if args.pretty_json: json_opts = {'sort_keys': True, 'indent': 2} else: json_opts = {'sort_keys': True} # -------------------------------- # Utility Function # -------------------------------- def dprint(level_, *args_, **kwargs_): if level_ <= 0: print('Log level must be positive: {}'.format(level_)) sys.exit(1) # args.verbose: # 0: Suppress all debug logging # 1: Show significant logs only # 2: Show important logs # 3: Show detailed logs # 4: Show trace logs if args.verbose >= level_: print(*args_, **kwargs_) # -------------------------------- # Stream Subscriber # -------------------------------- TOPIC_LOG_FRAME = b'LogFrame' TOPIC_VIDEO_FRAME = b'VideoFrame' TOPIC_JPEG_FRAME = b'JpegFrame' POLL_WAIT_TIME = 30 # milliseconds def subscribe_streams(ctx_, stream_addrs_): print('Starting the stream subscriber now...\n') dirpath = 'data' # Setup the ingress socket for streams socki = ctx_.socket(zmq.SUB) socki.setsockopt(zmq.RCVHWM, 100) if type(args.topics) == list: for topic in args.topics: print('Ingress: Setting a topic filter "{}"'.format(topic)) socki.setsockopt_string(zmq.SUBSCRIBE, topic) for addr in stream_addrs_: print('Connecting to "{}"'.format(addr)) socki.connect(addr) poller = zmq.Poller() poller.register(socki, zmq.POLLIN) print('\nReceiving the stream data ...\n') JST = timezone(timedelta(hours=+9), 'JST') jsonpack = [] datestr0 = datetime.now(JST).strftime('%Y%m%d_%H%M%S') datenum0 = datetime.now(JST) if not os.path.exists(dirpath+'/'+datestr0[:8]): os.makedirs(dirpath+'/'+datestr0[:8]) raw_json_file = open(dirpath+'/'+datestr0[:8]+'/raw_'+datestr0+'.json', 'w') while True: events = dict(poller.poll(POLL_WAIT_TIME)) if events.get(socki) != zmq.POLLIN: continue # Poller time out # Receive a message msg = socki.recv_multipart(flags=zmq.NOBLOCK) if not msg: continue # Decode the message try: topic = msg[0] # bytes stream_id = msg[1] # bytes frame_time = pickle.loads(msg[2]) dprint(4,'Received: topic {} stream_id {} ftime {:.3f}' .format(topic, stream_id, frame_time), flush=True) if topic.endswith(b'/' + stream_id): # Separate out the stream ID from the topic topic = topic[:-(len(stream_id)+1)] if topic == TOPIC_VIDEO_FRAME or topic == TOPIC_JPEG_FRAME: meta = pickle.loads(msg[3]) img = numpy.frombuffer(memoryview(msg[4]), dtype=meta['dtype']) img = img.reshape(meta['shape']) if topic == TOPIC_JPEG_FRAME: img = cv2.imdecode(img, cv2.IMREAD_COLOR) #cv2.imwrite("test.jpg",img) item = (img, pickle.loads(msg[5])) elif topic == TOPIC_LOG_FRAME: item = (numpy.array([]), pickle.loads(msg[3])) else: dprint(4,'Ignoring a message by topic: {} (len {})' .format(topic, len(msg))) continue except pickle.UnpicklingError as e: print('Corrupted pickle message: topic {}, stream {}, {}' .format(topic, stream_id, e)) continue except IndexError as e: print('Invalid length: topic {}, stream {}, length {}, {}' .format(topic, stream_id, len(msg))) continue except ValueError as e: print('Invalid value: topic {}, stream {}, {}' .format(topic, stream_id, e)) continue # Dump the message item[1]['stream_id'] = stream_id.decode() item[1]['frame_time'] = frame_time jptime = datetime.fromtimestamp(item[1]['frame_time'],JST) print(jptime) jsonpack.append(item[1]) datenum1 = datetime.now(JST) datestr1 = datetime.now(JST).strftime('%Y%m%d_%H%M%S') datenumdiff = datenum1-datenum0 if datenumdiff.total_seconds()>60: datenum0=datenum1 datestr0=datestr1 json.dump(jsonpack,raw_json_file) if not os.path.exists(dirpath+'/'+datestr0[:8]): os.makedirs(dirpath+'/'+datestr0[:8]) raw_json_file = open(dirpath+'/'+datestr0[:8]+'/raw_'+datestr0+'.json', 'w') jsonpack = [] # Clean up socki.setsockopt(zmq.LINGER, 0) socki.close() dprint(4,'Exiting the stream subscriber.') # Main ctx = zmq.Context() try: subscribe_streams(ctx, args.ingress_connects) except KeyboardInterrupt: print("\nKeyboardInterrupt\n", file=sys.stderr, flush=True) ctx.term() # EOF capture.pyで保存したログと、エリア座標をjsonファイルから読み込んで指定するプログラム\\ import sys import os import json import cv2 import math from datetime import datetime, timedelta, timezone import csv import glob import numpy as np if __name__ == '__main__': dirpath = 'data' duration = 5 area_def_file = '02area.json' JST = timezone(timedelta(hours=+9), 'JST') print(sys.argv[1]+"を集計しています・・") file_list = sorted(glob.glob('./'+dirpath+'/'+sys.argv[1]+'/raw_'+sys.argv[1]+'*.json')) print("対象ファイル一覧") for filename in file_list: print(filename) json_open = open(area_def_file, 'r') result_json_file = open('result_'+area_def_file[:len(area_def_file)-5]+'_'+sys.argv[1]+'.json', 'w') areadef = json.load(json_open) resultarr = {} for areaidx, area in enumerate(areadef['areas']): areaname='areaID'+str(areaidx) print(areaname) resultarr[areaname]={} print("集計・・") for areaidx, area in enumerate(areadef['areas']): areaname='areaID'+str(areaidx) print(areaname) polygon = [] for point in area['points']: polygon.append([point['x'],point['y']]) contour = np.array(polygon) print(contour) for filename in file_list: print(areaname+"_"+filename) if os.path.getsize(filename) == 0: continue raw_json_open = open(filename, 'r') raw_json = json.load(raw_json_open) for frame in raw_json: jptime = datetime.fromtimestamp(frame['frame_time'],JST) people = frame['objects'] for person in people: #print(areaname+'_'+person['tracking_id']+'_'+str(judgenum)) personpos = [(person["x0"]+person["x1"])/2,person["y1"]] test = cv2.pointPolygonTest(contour, personpos, False) if test>0: if person['tracking_id'] in resultarr[areaname]: resultarr[areaname][person['tracking_id']]['end_time'] = frame['frame_time'] resultarr[areaname][person['tracking_id']]['duration'] = frame['frame_time']-resultarr[areaname][person['tracking_id']]['start_time'] resultarr[areaname][person['tracking_id']]['end_time_jp'] = jptime.isoformat() if resultarr[areaname][person['tracking_id']]['duration'] > duration: resultarr[areaname][person['tracking_id']]['stop_judge'] = True else: resultarr[areaname][person['tracking_id']]={'area':areaname,'start_time_jp':jptime.isoformat(),'end_time_jp':jptime.isoformat(),'start_time':frame['frame_time'],'end_time':frame['frame_time'],'stream_id':frame['stream_id'],'duration':0,'stop_judge':False} json.dump(resultarr,result_json_file) for areaname, records in resultarr.items(): print(areaname) f = open(area_def_file[:len(area_def_file)-5]+"_"+sys.argv[1]+"_"+areaname+".csv", 'w', encoding='utf-8', newline='') write = csv.writer(f) write.writerow(["stream_id","areaname","tracking_id","start_time_jp","end_time_jp","total_duration","stop_judge"]) for tracking_id, record in records.items(): write.writerow([record['stream_id'],areaname,tracking_id,record['start_time_jp'],record['end_time_jp'],record['duration'],record['stop_judge']]) f.close() エリアを指定するJSON\\ { "areas": [ { "points": [ {"x": 110, "y": 351}, {"x": 571, "y": 369}, {"x": 601, "y": 567}, {"x": 147, "y": 709} ] } ] } ===== 第二項 座標ログをSCORERシステムから受け取る ===== 以下、前項でのプログラムの解説を中心に、最終形を作る過程のプログラムも併せて掲載します。 SCORERでのデータのやり取りは様々な機能を実現するため、メッセージングキューという比較的複雑な手法で行っています。 具体的には「ZeroMQ」というオープンソースライブラリ([[https://zeromq.org/]])を利用していますが、チュートリアルでは技術的な詳細は省き、「こうすればデータが受け取れる」とだけ理解しておけば十分でしょう。 そのため単にデータを受け取る割には行数が多いコードとなっていますが、ある部分だけ注目して開発すればいいので、このチュートリアルで変更する部分以外は「おまじない」が書いてあると思っておきましょう。 #!/usr/bin/env python3 # This script dumps various streams import sys import os import argparse import pickle import json import zmq import numpy import cv2 import math from datetime import datetime, timedelta, timezone # Handle arguments PROG_DESCRIPTION='''\ description: this program subscribes ZMQ streams and dumps the data to stdout ''' PROG_EPILOG='''\ examples: %(prog)s %(prog)s --pretty-json %(prog)s -ic tcp://192.168.10.11:9877 tcp://192.168.10.12:9877 ''' DEFAULT_INGRESS_ADDR="tcp://localhost:5002" DEFAULT_INGRESS_TOPICS=[ 'VideoFrame', 'JpegFrame', 'LogFrame' ] ap = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=PROG_DESCRIPTION, epilog=PROG_EPILOG) ap.add_argument('-v','--verbose', action='count', default=0, help='increase verbosity') apg1 = ap.add_argument_group() apmx1 = apg1.add_mutually_exclusive_group() apmx1.add_argument('-ic', dest='ingress_connects', metavar='ADDR', nargs='+', action='append', help='specify a ZMQ endpoint to connect to a data source (default:"{}")' .format(DEFAULT_INGRESS_ADDR)) apg3 = ap.add_argument_group() apg3.add_argument('--subscribe', dest='topics', metavar='TOPIC', nargs='+', action='append', help='specify a subscription topic of data sources (default:"{}")' .format(DEFAULT_INGRESS_TOPICS)) apg3.add_argument('--pretty-json', action='store_true', help='dump the json with sorted keys and indent') args = ap.parse_args() # Flatten the lists if args.ingress_connects: args.ingress_connects = sum(args.ingress_connects, []) args.ingress_connects = [s for s in args.ingress_connects if s] if args.topics: args.topics = sum(args.topics, []) args.topics = [s for s in args.topics if s] # Set the default endpoint if not args.ingress_connects: args.ingress_connects = [ DEFAULT_INGRESS_ADDR ] # Set the default topic filter if not args.topics: args.topics = DEFAULT_INGRESS_TOPICS # Set the option for json.dumps if args.pretty_json: json_opts = {'sort_keys': True, 'indent': 2} else: json_opts = {'sort_keys': True} # -------------------------------- # Utility Function # -------------------------------- def dprint(level_, *args_, **kwargs_): if level_ <= 0: print('Log level must be positive: {}'.format(level_)) sys.exit(1) # args.verbose: # 0: Suppress all debug logging # 1: Show significant logs only # 2: Show important logs # 3: Show detailed logs # 4: Show trace logs if args.verbose >= level_: print(*args_, **kwargs_) # -------------------------------- # Stream Subscriber # -------------------------------- TOPIC_LOG_FRAME = b'LogFrame' TOPIC_VIDEO_FRAME = b'VideoFrame' TOPIC_JPEG_FRAME = b'JpegFrame' POLL_WAIT_TIME = 30 # milliseconds def subscribe_streams(ctx_, stream_addrs_): print('Starting the stream subscriber now...\n') # Setup the ingress socket for streams socki = ctx_.socket(zmq.SUB) socki.setsockopt(zmq.RCVHWM, 100) if type(args.topics) == list: for topic in args.topics: print('Ingress: Setting a topic filter "{}"'.format(topic)) socki.setsockopt_string(zmq.SUBSCRIBE, topic) for addr in stream_addrs_: print('Connecting to "{}"'.format(addr)) socki.connect(addr) poller = zmq.Poller() poller.register(socki, zmq.POLLIN) print('\nReceiving the stream data ...\n') while True: events = dict(poller.poll(POLL_WAIT_TIME)) if events.get(socki) != zmq.POLLIN: continue # Poller time out # Receive a message msg = socki.recv_multipart(flags=zmq.NOBLOCK) if not msg: continue # Decode the message try: topic = msg[0] # bytes stream_id = msg[1] # bytes frame_time = pickle.loads(msg[2]) dprint(4,'Received: topic {} stream_id {} ftime {:.3f}' .format(topic, stream_id, frame_time), flush=True) if topic.endswith(b'/' + stream_id): # Separate out the stream ID from the topic topic = topic[:-(len(stream_id)+1)] if topic == TOPIC_VIDEO_FRAME or topic == TOPIC_JPEG_FRAME: meta = pickle.loads(msg[3]) img = numpy.frombuffer(memoryview(msg[4]), dtype=meta['dtype']) img = img.reshape(meta['shape']) if topic == TOPIC_JPEG_FRAME: img = cv2.imdecode(img, cv2.IMREAD_COLOR) item = (img, pickle.loads(msg[5])) elif topic == TOPIC_LOG_FRAME: item = (numpy.array([]), pickle.loads(msg[3])) else: dprint(4,'Ignoring a message by topic: {} (len {})' .format(topic, len(msg))) continue except pickle.UnpicklingError as e: print('Corrupted pickle message: topic {}, stream {}, {}' .format(topic, stream_id, e)) continue except IndexError as e: print('Invalid length: topic {}, stream {}, length {}, {}' .format(topic, stream_id, len(msg))) continue except ValueError as e: print('Invalid value: topic {}, stream {}, {}' .format(topic, stream_id, e)) continue #ログをそのまま表示 print(item[1]) # Clean up socki.setsockopt(zmq.LINGER, 0) socki.close() dprint(4,'Exiting the stream subscriber.') # Main ctx = zmq.Context() try: subscribe_streams(ctx, args.ingress_connects) except KeyboardInterrupt: print("\nKeyboardInterrupt\n", file=sys.stderr, flush=True) ctx.term() # EOF root@lesson:~# python 02capture_02show.py Starting the stream subscriber now... Ingress: Setting a topic filter "VideoFrame" Ingress: Setting a topic filter "JpegFrame" Ingress: Setting a topic filter "LogFrame" Connecting to "tcp://localhost:5002" Receiving the stream data ... {'objects': [{'kind': 'person', 'probability': 0.8608451478231887, 'x0': 774, 'y0': 173, 'x1': 877, 'y1': 390, 'last_seen': 1641021272.101173, 'tracking_id': '1641021236.501173_000001', 'face': {'kind': 'face', 'probability': 0.494140625, 'x0': 814, 'y0': 187, 'x1': 836, 'y1': 215, 'landmark': [[817, 196], [823, 197], [817, 202], [817, 208], [821, 208]], 'yaw': 96.8467055176538, 'pitch': 3.2267551947969326, 'roll': 3.081119732984186}, 'floor': {'x': 211, 'y': 167, 'headpose': {'phi': 156.4529207752412, 'pitch': 43.22675519479693, 'roll': 3.081119732984186}}}, {'kind': 'person', 'probability': 0.6284851603879158, 'x0': 545, 'y0': 125, 'x1': 609, 'y1': 218, 'last_seen': 1641021271.301173, 'tracking_id': '1641021270.501173_000001', 'floor': {'x': 159, 'y': 87}}], 'object_counts': {'total': 2, 'new': 0, 'found': 1, 'lost': 1, 'giveup': 0, 'by_kind': {'person': {'new': 0, 'found': 1, 'lost': 1, 'giveup': 0}}}, 'map_scale': {'pixel_to_cm': 3.9712}} {'objects': [{'kind': 'person', 'probability': 0.863207096019165, 'x0': 774, 'y0': 174, 'x1': 877, 'y1': 390, 'last_seen': 1641021272.301173, 'tracking_id': '1641021236.501173_000001', 'face': {'kind': 'face', 'probability': 0.482177734375, 'x0': 814, 'y0': 187, 'x1': 836, 'y1': 215, 'landmark': [[817, 196], [822, 197], [817, 203], [817, 208], [821, 209]], 'yaw': 89.36312648515958, 'pitch': 4.938198147817392, 'roll': 2.9951394016016732}, 'floor': {'x': 211, 'y': 167, 'headpose': {'phi': 148.969341742747, 'pitch': 44.93819814781739, 'roll': 2.9951394016016732}}}, {'kind': 'person', 'probability': 0.6284851603879158, 'x0': 545, 'y0': 125, 'x1': 609, 'y1': 218, 'last_seen': 1641021271.301173, 'tracking_id': '1641021270.501173_000001', 'floor': {'x': 159, 'y': 87}}], 'object_counts': {'total': 2, 'new': 0, 'found': 1, 'lost': 1, 'giveup': 0, 'by_kind': {'person': {'new': 0, 'found': 1, 'lost': 1, 'giveup': 0}}}, 'map_scale': {'pixel_to_cm': 3.9712}} プログラム189-190行目でSCORER People Trackerから受け取ったログ情報をそのままコンソール上で表示しています。\\ 全体的な解説としては\\ 112行目 subscribe_streams関数が処理のメインとなる部分です。これは197-207行目でプログラム実行時にすぐ呼ばれるものとなっています。\\ subscribe_streams内ではまず受信のための初期設定を前半で行い、133行目のwhile文で各フレームの情報を逐次受信している形です。\\ データの流れとしてはSCORER People Trackerは受信したか関係なくだいたい毎秒5-10回ログ情報を送信します。\\ subscribe_streamsのwhile部分、135行目のevents = dict(poller.poll(POLL_WAIT_TIME))を実行した際に次に来る最新のログ情報を受信します。\\ もし最新のログが更新されていたらそれまでのログ情報は消えてなくなりますが、ループ処理の速度が非常に速い場合は全フレームを取得することが可能になるでしょう。\\ なお、ログが流れてこなかった場合はこの部分でプログラムが止まった状態になります。\\ さて、ログとしては取得できることが確認できましたが、これらの情報をファイルとして保存しなければその後の解析ができません。\\ 次はログの情報を記録するプログラムに変更しましょう。\\ ===== 第三項 座標ログを保存する ===== 前項でログが取得できることが確認できました。\\ 受信したデータを一つのファイルに保存してもよいのですが、これらのログは人が多く映っている時は非常に大量にあり、1分間に300-600レコードものデータとなります。\\ 今回は1分に1ファイル、書きこんでいるファイルに60秒分書きこんだらファイルを次の時刻に切り替えて保存する方式とします。\\ また、1日に60ファイル(分ごと)×24時間の1440ファイルにもなるため、日付ごとにフォルダも分けてファイルを保存しましょう。\\ ちなみに取得したログを見ると、どのカメラからのデータなのか、いつのフレームの時刻なのかの情報がありません。\\ こちらはデータ取得時の時間を追記して保存しましょう(将来のSCORER People Trackerのバージョンアップで改善するかもしれません)。\\ さて変更した部分をハイライトしておきます。 #!/usr/bin/env python3 # This script dumps various streams import sys import os import argparse import pickle import json import zmq import numpy import cv2 import math from datetime import datetime, timedelta, timezone # Handle arguments PROG_DESCRIPTION='''\ description: this program subscribes ZMQ streams and dumps the data to stdout ''' PROG_EPILOG='''\ examples: %(prog)s %(prog)s --pretty-json %(prog)s -ic tcp://192.168.10.11:9877 tcp://192.168.10.12:9877 ''' DEFAULT_INGRESS_ADDR="tcp://localhost:5002" DEFAULT_INGRESS_TOPICS=[ 'VideoFrame', 'JpegFrame', 'LogFrame' ] ap = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description=PROG_DESCRIPTION, epilog=PROG_EPILOG) ap.add_argument('-v','--verbose', action='count', default=0, help='increase verbosity') apg1 = ap.add_argument_group() apmx1 = apg1.add_mutually_exclusive_group() apmx1.add_argument('-ic', dest='ingress_connects', metavar='ADDR', nargs='+', action='append', help='specify a ZMQ endpoint to connect to a data source (default:"{}")' .format(DEFAULT_INGRESS_ADDR)) apg3 = ap.add_argument_group() apg3.add_argument('--subscribe', dest='topics', metavar='TOPIC', nargs='+', action='append', help='specify a subscription topic of data sources (default:"{}")' .format(DEFAULT_INGRESS_TOPICS)) apg3.add_argument('--pretty-json', action='store_true', help='dump the json with sorted keys and indent') args = ap.parse_args() # Flatten the lists if args.ingress_connects: args.ingress_connects = sum(args.ingress_connects, []) args.ingress_connects = [s for s in args.ingress_connects if s] if args.topics: args.topics = sum(args.topics, []) args.topics = [s for s in args.topics if s] # Set the default endpoint if not args.ingress_connects: args.ingress_connects = [ DEFAULT_INGRESS_ADDR ] # Set the default topic filter if not args.topics: args.topics = DEFAULT_INGRESS_TOPICS # Set the option for json.dumps if args.pretty_json: json_opts = {'sort_keys': True, 'indent': 2} else: json_opts = {'sort_keys': True} # -------------------------------- # Utility Function # -------------------------------- def dprint(level_, *args_, **kwargs_): if level_ <= 0: print('Log level must be positive: {}'.format(level_)) sys.exit(1) # args.verbose: # 0: Suppress all debug logging # 1: Show significant logs only # 2: Show important logs # 3: Show detailed logs # 4: Show trace logs if args.verbose >= level_: print(*args_, **kwargs_) # -------------------------------- # Stream Subscriber # -------------------------------- TOPIC_LOG_FRAME = b'LogFrame' TOPIC_VIDEO_FRAME = b'VideoFrame' TOPIC_JPEG_FRAME = b'JpegFrame' POLL_WAIT_TIME = 30 # milliseconds def subscribe_streams(ctx_, stream_addrs_): print('Starting the stream subscriber now...\n') dirpath = 'data' # Setup the ingress socket for streams socki = ctx_.socket(zmq.SUB) socki.setsockopt(zmq.RCVHWM, 100) if type(args.topics) == list: for topic in args.topics: print('Ingress: Setting a topic filter "{}"'.format(topic)) socki.setsockopt_string(zmq.SUBSCRIBE, topic) for addr in stream_addrs_: print('Connecting to "{}"'.format(addr)) socki.connect(addr) poller = zmq.Poller() poller.register(socki, zmq.POLLIN) print('\nReceiving the stream data ...\n') JST = timezone(timedelta(hours=+9), 'JST') jsonpack = [] datestr0 = datetime.now(JST).strftime('%Y%m%d_%H%M%S') datenum0 = datetime.now(JST) if not os.path.exists(dirpath+'/'+datestr0[:8]): os.makedirs(dirpath+'/'+datestr0[:8]) raw_json_file = open(dirpath+'/'+datestr0[:8]+'/raw_'+datestr0+'.json', 'w') while True: events = dict(poller.poll(POLL_WAIT_TIME)) if events.get(socki) != zmq.POLLIN: continue # Poller time out # Receive a message msg = socki.recv_multipart(flags=zmq.NOBLOCK) if not msg: continue # Decode the message try: topic = msg[0] # bytes stream_id = msg[1] # bytes frame_time = pickle.loads(msg[2]) dprint(4,'Received: topic {} stream_id {} ftime {:.3f}' .format(topic, stream_id, frame_time), flush=True) if topic.endswith(b'/' + stream_id): # Separate out the stream ID from the topic topic = topic[:-(len(stream_id)+1)] if topic == TOPIC_VIDEO_FRAME or topic == TOPIC_JPEG_FRAME: meta = pickle.loads(msg[3]) img = numpy.frombuffer(memoryview(msg[4]), dtype=meta['dtype']) img = img.reshape(meta['shape']) if topic == TOPIC_JPEG_FRAME: img = cv2.imdecode(img, cv2.IMREAD_COLOR) #cv2.imwrite("test.jpg",img) item = (img, pickle.loads(msg[5])) elif topic == TOPIC_LOG_FRAME: item = (numpy.array([]), pickle.loads(msg[3])) else: dprint(4,'Ignoring a message by topic: {} (len {})' .format(topic, len(msg))) continue except pickle.UnpicklingError as e: print('Corrupted pickle message: topic {}, stream {}, {}' .format(topic, stream_id, e)) continue except IndexError as e: print('Invalid length: topic {}, stream {}, length {}, {}' .format(topic, stream_id, len(msg))) continue except ValueError as e: print('Invalid value: topic {}, stream {}, {}' .format(topic, stream_id, e)) continue # Dump the message item[1]['stream_id'] = stream_id.decode() item[1]['frame_time'] = frame_time jptime = datetime.fromtimestamp(item[1]['frame_time'],JST) print(jptime) jsonpack.append(item[1]) datenum1 = datetime.now(JST) datestr1 = datetime.now(JST).strftime('%Y%m%d_%H%M%S') datenumdiff = datenum1-datenum0 if datenumdiff.total_seconds()>60: datenum0=datenum1 datestr0=datestr1 json.dump(jsonpack,raw_json_file) if not os.path.exists(dirpath+'/'+datestr0[:8]): os.makedirs(dirpath+'/'+datestr0[:8]) raw_json_file = open(dirpath+'/'+datestr0[:8]+'/raw_'+datestr0+'.json', 'w') jsonpack = [] # Clean up socki.setsockopt(zmq.LINGER, 0) socki.close() dprint(4,'Exiting the stream subscriber.') # Main ctx = zmq.Context() try: subscribe_streams(ctx, args.ingress_connects) except KeyboardInterrupt: print("\nKeyboardInterrupt\n", file=sys.stderr, flush=True) ctx.term() # EOF 124-132行目で保存するファイルやフォルダを準備し、ファイルを書きこめる状態にします。\\ 189-198行目で受け取ったログに時刻やカメラ情報を付加して配列に入ったデータをJSON形式に変換してファイルに保存します。\\ 199-205行目で書きこんでいるファイル作成時刻から60秒たったら、保存するファイルを切り替えて再度書きこめる状態にします。なお、各分の00秒をファイル切り替えの基準にはしていません。例えば13時30分31秒からファイル書き込みがスタートすれば次のファイルに切り替わるタイミングは13時31分31秒以降になります。また、日をまたいだ最初のファイルはを保存する際は新しい日付フォルダも作って切り替えます。\\ これでログを保存することができるようになりました。\\ ===== 第四項 SCORER People Trackerから取得できる情報 ===== SCORER People Trackerの内容やログ形式については1章でも書いたものとなっており、再掲すると\\ {{:peopletrackerログ説明.png?linkonly|}}\\ となっています。\\ なお、前項で保存したログのサンプルデータ(最初3フレーム分)は [ { "objects": [ { "kind": "person", "probability": 0.5974622151923598, "x0": 0, "y0": 209, "x1": 182, "y1": 535, "last_seen": 1640876588.952184, "tracking_id": "1640876585.885518_000000", "face": { "kind": "face", "probability": 0.400390625, "x0": 100, "y0": 249, "x1": 119, "y1": 267, "landmark": [ [ 103, 257 ], [ 109, 255 ], [ 105, 261 ], [ 105, 264 ], [ 110, 263 ] ], "yaw": 87.87988909181664, "pitch": 12.394597945309016, "roll": -6.7445171975791345 }, "floor": { "x": 77, "y": 176, "headpose": { "phi": 128.9253565254918, "pitch": 32.394597945309016, "roll": -6.7445171975791345 } } } ], "object_counts": { "total": 1, "new": 0, "found": 1, "lost": 0, "giveup": 0, "by_kind": { "person": { "new": 0, "found": 1, "lost": 0, "giveup": 0 } } }, "map_scale": { "pixel_to_cm": 3.9564 }, "stream_id": "RTSPcam1", "frame_time": 1640876588.952184 }, { "objects": [ { "kind": "person", "probability": 0.5974622151923598, "x0": 0, "y0": 209, "x1": 182, "y1": 535, "last_seen": 1640876589.018851, "tracking_id": "1640876585.885518_000000", "face": { "kind": "face", "probability": 0.400390625, "x0": 100, "y0": 249, "x1": 119, "y1": 267, "landmark": [ [ 103, 257 ], [ 109, 255 ], [ 105, 261 ], [ 105, 264 ], [ 110, 263 ] ], "yaw": 87.87988909181664, "pitch": 12.394597945309016, "roll": -6.7445171975791345 }, "floor": { "x": 77, "y": 176, "headpose": { "phi": 128.9253565254918, "pitch": 32.394597945309016, "roll": -6.7445171975791345 } } } ], "object_counts": { "total": 1, "new": 0, "found": 1, "lost": 0, "giveup": 0, "by_kind": { "person": { "new": 0, "found": 1, "lost": 0, "giveup": 0 } } }, "map_scale": { "pixel_to_cm": 3.9564 }, "stream_id": "RTSPcam1", "frame_time": 1640876589.018851 } ] となっているはずです。カメラidやフレーム時刻なども記録されていますね。\\ さて、このデータはカメラ画像内の足元位置座標(カメラ座標系)や顔向きと見取り図内の足元位置座標(見取り図座標系)や顔向きの情報両方が取得できています。\\ カメラ座標系の場合\\ {{::peopletracker_def-camera.png|}}\\ 見取り図座標系の場合\\ {{:peopletracker_def-sketch.png|}}\\ というように定義されています。\\ どちらの座標系を使ってもエリアを設定して判定することは可能ですが、この章ではより直観的にエリアを設定するために、カメラ座標系のデータを採用します(3章では見取り図上でのコードも提示します)。\\ また、SCORER People Trackerの出力する人物検知結果にはそれぞれの人物に対して固有のIDが振られています\\ 例:"tracking_id": "1640876585.885518_000000"\\ フレームを全部調べて、同じIDの人物の座標を取得して来ればその人物の動きが時系列で分かるようになります。\\ これにより横断歩道を「通った」か「立ち止まった」かなど「動き」の情報が得られます。\\ ちなみに、通常画像処理は1枚1枚の画像ごとにそれぞれAIで人物の位置を判定しているだけで、前後の時間との関連情報はありません。\\ そのため固有IDを振るためには特殊な処理をしています。\\ SCORER People Trackerでは1秒間に5-10回もの高頻度で検知を行っているため、前後の検知結果同士を見比べれば同じ人物であることが容易に推定されます。\\ {{::peopletracker_saikinbo.png|}}\\ 本来は複数の人物がすれ違った時や、画面の外に出てまた入ってきた場合はどうなるかなどいろいろ考えるべきことはありますが、そのあたりはSCORER People Trackerがうまくやってくれています。\\ 今回は画面内にある人物の判定と前提があるため、あまり気にせず固有IDをそのまま活用していきましょう。\\ ===== 第五項 エリア座標判定処理の考え方 ===== さて、ここでどのようなデータを取得したいか具体的に定義していきましょう。\\ 今回は横断歩道などの手前で正しく立ち止まったかどうか?ということがテーマになっています。\\ ここで「正しく」というのをプログラム的に定義しましょう。\\ 人物に固有IDが振られていることは前の項で説明しました。\\ また、エリアの定義については。\\ カメラ座標に対してエリアを定義する場合、4つの場合で考えると\\ {{::peopletracker_area01.png|}}\\ 多角形の場合は\\ {{:peopletracker_area02.png|}}\\ のように座標を設定することとします。\\ これらの情報から、それぞれの固有IDごとに人物の動きを確認し、エリアに入ったことがある人とそうでない人、さらにエリアに入ったことがある人が例えば最低5秒滞在していたかという判定を行うことが可能になります。\\ 結果として全体で1時間に横断歩道を渡った人が200人、ちゃんと手前で止まった人が150人などという結果データをCSVファイルとして出力してみましょう。\\ ※実際は横断歩道を渡らず横切るだけの人をカウントしたり、立ち止まってはいないけれどもゆっくり歩いていた人も立ち止まったことになってしまうなど課題はあります。\\ これらはチュートリアルを終えたのち、より判定精度を上げるにはどうするかということで現場に合わせて皆さんがカスタマイズしてみてください。\\ さて、プログラム的な考え方としては * 保存した多数のログファイルを読み込んで配列に入れる(計算しやすくする) * エリアを設定したテキストファイルからエリア情報を読み取る * エリアごと、人物ごとに軌跡を確認し、エリアに入ったかどうか、入った場合の秒数をカウント という設計をします。\\ 実際は - エリアごとに - ファイルを順番に読み込んで - ファイルの中にあるフレームも順番に読み込んで - 固有IDが初めて出てきたら配列に固有IDと座標等を登録。2回目以降は配列に座標を追加。 - 同時にエリア内にいたかどうかを判定。いた場合は固有ID配列に時刻ごとに「通過したフラグ」を立てる。 - 5秒前の情報からエリアにいた場合は立ち止まったと判断して固有IDに「立ち止まったフラグ」を立てる。 - 全部の集計が完了したら配列をCSVにして出力する。 という流れになります。\\ さて、実際のコードは次の項で説明することとして、まずエリアを設定しなければなりません。\\ 今回は手間ですが、エリアの座標をペイントツールなどを利用して座標を決めることとします。\\ まずカメラの画像を取得しましょう。\\ [[http://解析端末のIP:21001/]] \\ にアクセスすると \\ {{::peopletracker_api01.png?600|}} \\ のようにSCORER EdgeのAPI画面が表示されます。\\ ここで、「snapshot」とブラウザで検索し、「Try it out」をクリックします。 \\ {{::peopletracker_api02.png?600|}} \\ カメラ名を入れる場所が出てきますので、「RTSPcam1」と入力し、Exacuteを押しましょう。 \\ {{::peopletracker_api03.png?600|}} \\ すぐに画像が表示されますので、右クリック画像を保存で画像を操作PCにダウンロードしてきましょう。 (名前はarea.jpegなど適当につけます)\\ {{::peopletracker_api04.png?600|}} \\ 保存したファイルをwindowsであれば標準の「ペイント」で開きます。 {{:peopletracker_api05.png|}}\\ 今回は1-4の点の座標を調べることとします。(以下は例です。実際の画面に合わせて変更してください)\\ {{:peopletracker_api06.png|}}\\ まず一つ目はx:110px y:351px\\ {{:peopletracker_api07.png|}}\\ 二つ目はx:571px y:369px\\ {{:peopletracker_api10.png|}}\\ 三つ目はx:601px y:567px\\ {{:peopletracker_api08.png|}}\\ 四つ目はx:147px y:709px\\ {{:peopletracker_api09.png|}}\\ と取得できます。 なお、これらはカメラの数や試行回数が多くなると途端に面倒になるものですので、次の章ではWEB画面で簡単に設定できるようにしていきます。\\ { "areas": [ { "points": [ {"x": 110, "y": 351}, {"x": 571, "y": 369}, {"x": 601, "y": 567}, {"x": 147, "y": 709} ] } ] } 取得した座標と立ち止まり判定継続時間(秒)を設定します。\\ また、今回のスターターコードでは複数のエリアを同時に設定しても問題ないように書いてあります。\\ { "areas": [ { "points": [ {"x": 0, "y": 0}, {"x": 500, "y": 0}, {"x": 500, "y": 200}, {"x": 0, "y": 200} ] }, { "points": [ {"x": 725, "y": 499}, {"x": 1225, "y": 499}, {"x": 1225, "y": 699}, {"x": 725, "y": 699} ] } ] } ===== 第六項 スターターコードの完成 ===== さて、前項でエリア情報をファイルで定義し、判定する手順も決めたため集計プログラムを作成するうえで必要な情報はすべてそろいました。\\ 再掲となりますが判定プログラムがこちらです。 import sys import os import json import cv2 import math from datetime import datetime, timedelta, timezone import csv import glob import numpy as np if __name__ == '__main__': dirpath = 'data' duration = 5 area_def_file = '02area.json' JST = timezone(timedelta(hours=+9), 'JST') print(sys.argv[1]+"を集計しています・・") file_list = sorted(glob.glob('./'+dirpath+'/'+sys.argv[1]+'/raw_'+sys.argv[1]+'*.json')) print("対象ファイル一覧") for filename in file_list: print(filename) json_open = open(area_def_file, 'r') result_json_file = open('result_'+area_def_file[:len(area_def_file)-5]+'_'+sys.argv[1]+'.json', 'w') areadef = json.load(json_open) resultarr = {} for areaidx, area in enumerate(areadef['areas']): areaname='areaID'+str(areaidx) print(areaname) resultarr[areaname]={} print("集計・・") for areaidx, area in enumerate(areadef['areas']): areaname='areaID'+str(areaidx) print(areaname) polygon = [] for point in area['points']: polygon.append([point['x'],point['y']]) contour = np.array(polygon) print(contour) for filename in file_list: print(areaname+"_"+filename) if os.path.getsize(filename) == 0: continue raw_json_open = open(filename, 'r') raw_json = json.load(raw_json_open) for frame in raw_json: jptime = datetime.fromtimestamp(frame['frame_time'],JST) people = frame['objects'] for person in people: #print(areaname+'_'+person['tracking_id']+'_'+str(judgenum)) personpos = [(person["x0"]+person["x1"])/2,person["y1"]] test = cv2.pointPolygonTest(contour, personpos, False) if test>0: if person['tracking_id'] in resultarr[areaname]: resultarr[areaname][person['tracking_id']]['end_time'] = frame['frame_time'] resultarr[areaname][person['tracking_id']]['duration'] = frame['frame_time']-resultarr[areaname][person['tracking_id']]['start_time'] resultarr[areaname][person['tracking_id']]['end_time_jp'] = jptime.isoformat() if resultarr[areaname][person['tracking_id']]['duration'] > duration: resultarr[areaname][person['tracking_id']]['stop_judge'] = True else: resultarr[areaname][person['tracking_id']]={'area':areaname,'start_time_jp':jptime.isoformat(),'end_time_jp':jptime.isoformat(),'start_time':frame['frame_time'],'end_time':frame['frame_time'],'stream_id':frame['stream_id'],'duration':0,'stop_judge':False} json.dump(resultarr,result_json_file) for areaname, records in resultarr.items(): print(areaname) f = open(area_def_file[:len(area_def_file)-5]+"_"+sys.argv[1]+"_"+areaname+".csv", 'w', encoding='utf-8', newline='') write = csv.writer(f) write.writerow(["stream_id","areaname","tracking_id","start_time_jp","end_time_jp","total_duration","stop_judge"]) for tracking_id, record in records.items(): write.writerow([record['stream_id'],areaname,tracking_id,record['start_time_jp'],record['end_time_jp'],record['duration'],record['stop_judge']]) f.close() 実行は\\ python 02stopinfront_final.py 20211101 のように日付を引数に実行します。\\ 以下プログラムの簡単な解説です。\\ 13-15行目で読み込むログファイルがあるフォルダやエリア情報の定義ファイル、判定秒数の情報を記入します。\\ 17行目ではコンピューター時刻(UNIXタイム)で記載された時刻情報を日本時間に変換しないとデータが読みづらいため、変換するための事前準備をしています。\\ 19行目で指定日付のフォルダに入っているファイル一覧を取得して、時刻順に並べ替えをしています。\\ 24行目でエリア情報のファイルを開きます。(JSON形式のため26行目で配列に変換しています)\\ 25行目で結果CSVファイルを新規で開きます。\\ 28-31行目では結果データの配列をエリアごとに事前に初期化します。\\ さて、今回多角形のエリアに対応した判定ができるようなコードとなっています。\\ ここで初めて出てきますが、映像解析でよく使われる計算ライブラリにOpenCVというものがあります。\\ このライブラリを利用して多角形内にある点が入っているか入っていないかが簡単に判明します。\\ 57-58行目\\ personpos = [(person["x0"]+person["x1"])/2,person["y1"]] test = cv2.pointPolygonTest(contour, personpos, False) でPersonposで判定する点(今回は人物を囲んだ四角=BoundingBoxの下部中央)をcontourというエリアを定義した点群で判定するということを行います。\\ {{::peopletracker_hittest01.png|}}\\ 青い点となっている場合のみを今回エリア内判定とすることにします。\\ (testにはエリア外の場合は-1,エリア境界上にいる場合は0,エリア内にいる場合は1という数字が返ってきます。)\\ contourは34-35行目でエリア定義の情報を転記しています。\\ ※このチュートリアルでは一部の機能を使うだけなのでおまじない扱いとしますが、詳細については\\ [[https://www.codexa.net/opencv_python_introduction/]]\\ などを読むと理解が進むかとおみます。\\ 次に時系列を考慮した判定を行います。\\ これは結果データを個体IDごとに集計しなおすことで、一つのIDに判定情報を蓄積していく形としています。\\ 59-67行目ではエリア内だった場合に、初めてその個体IDが現れた場合は初期化をし時刻を書きこみます。二回目以降出てきた場合は時刻を更新して、最初に検知された時刻との差分を増やしていき、5秒以上になった場合は立ち止まり判定フラグにTrueを入れます。\\ なお、エリアに一度も入っていない場合は今回集計対象からは外しています。\\ 71-79行目では一度すべての情報を配列で集計し終わった後、CSVファイルに書き込む処理をしています。CSVファイルの1行目には列のヘッダー情報を入れておきます。\\ 細かいところなどは省きましたが、上記の動作するコードに対して様々な変更を加えて自分なりに動作を確認してみましょう。\\ 次の章ではエリア座標を設定する手間を大幅に軽減するためにWEB画面で変更できるところなども学んでみましょう。