日本たばこ産業 たばこ商品の画像検出 (Japan Tobacco Inc. Tobacco Product Image Detection) Tutorial

Introduction¶
This is a tutorial for "日本たばこ産業 たばこ商品の画像検出" ( https://signate.jp/competitions/159 ). It walks through an end-to-end modeling approach and example implementations, including visualization of the image data. We hope you find it useful.
Table of Contents¶
- Environment
- Data Preparation
- Data Exploration
  - Checking the Metadata
  - Visualizing the Image Data
- Modeling
  - Region Detection
    - Instantiating the Deep Learning Model
    - Creating the Input Data
    - Training the Deep Learning Model
    - Prediction with the Deep Learning Model
  - Identity Matching
    - Instantiating the Deep Learning Model
    - Creating the Input Data
    - Training the Deep Learning Model
    - Prediction with the Deep Learning Model
- Outputting the Predictions
- Summary
- References
Environment¶
- Python: 3.6.2+
- Libraries
  - chainer: 5.3.0
  - chainercv: 0.12.0
  - cupy: 5.3.0
  - pillow: 5.4.1
  - numpy: 1.16.1
  - matplotlib: 2.1.0
- GPU
  - NVIDIA GPU required
Data Preparation¶
Go to https://signate.jp/competitions/159/data and download train_images.zip, train_annotations.zip, master_images.zip, and master.tsv. Place them all in your working directory and extract the zip files. Extracting them creates the train_images, train_annotations, and master_images directories.
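If you prefer to extract the archives from Python rather than by hand, a minimal sketch (assuming the zip files sit in the current working directory) looks like this:
import zipfile
# extract each downloaded archive into the working directory,
# creating train_images, train_annotations and master_images
for name in ['train_images.zip', 'train_annotations.zip', 'master_images.zip']:
    with zipfile.ZipFile(name) as zf:
        zf.extractall('.')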
Data Exploration¶
Checking the Metadata¶
import pandas as pd
import os
import json
Let's take a look at the brand master data (master.tsv).
master = pd.read_csv('master.tsv', sep='\t')
unique_id = master['category'].unique()
unique_brand = master['brand'].unique()
print('total:', len(unique_brand))
There are 22 brands in total.
print(unique_id)
print('total:', len(unique_id))
At the individual product (category) level there are 223 items in total.
Using the annotation data, let's check the number of bounding boxes and the box areas for each product.
annotations = os.listdir('train_annotations')
category_data = {}
for annotation in annotations:
    with open(os.path.join('train_annotations', annotation)) as f:
        data = json.load(f)
    labels = data['labels']
    for label in labels:
        category = label['category']
        if category not in category_data:
            category_data[category] = {'num': 0, 'acc_area': 0}
        category_data[category]['num'] += 1
        category_data[category]['acc_area'] += (label['box2d']['x2']-label['box2d']['x1'])*(label['box2d']['y2']-label['box2d']['y1'])
category_df = pd.DataFrame(category_data).T
category_df['area_mean'] = category_df['acc_area']/category_df['num']
category_df[['num', 'area_mean']].describe()
On average each product has 107 bounding boxes, and the average box area in the images is about 21290 square pixels.
Let's draw histograms.
from matplotlib import pyplot as plt
%matplotlib inline
fig, axes = plt.subplots(nrows = 1, ncols=2, figsize=(30,10))
modes = ['num','area_mean']
for i, mode in enumerate(modes):
    axes[i].hist(list(category_df[mode]), 20)
    axes[i].set_title(mode, fontsize=30)
    axes[i].set_ylabel('count', fontsize=30)
Let's look at the number of bounding boxes per brand.
category_df = category_df.reset_index()[['index', 'num', 'area_mean']]
category_df.columns = ['category', 'num', 'area_mean']
master = pd.merge(master, category_df)
summary_master = master.groupby('brand')[['num']].apply(sum).sort_values('num', ascending=False)
print('total:', summary_master['num'].sum())
The top four brands are dominant, accounting for more than 50% of the total of 23748 boxes. A quick check of that share is shown below.
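As a rough check of that figure, the share of the top four brands can be computed from the summary_master table built above (a quick sketch, nothing more):
# summary_master is sorted by 'num' in descending order, so the first four rows are the top brands
top4_share = summary_master['num'].head(4).sum() / summary_master['num'].sum()
print('top-4 share: {:.1%}'.format(top4_share))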
Visualizing the Image Data¶
Let's draw the bounding boxes on the shelf images to see which products appear where.
import numpy as np
import chainer
from PIL import Image
from chainercv.chainer_experimental.datasets.sliceable import GetterDataset
from chainercv.utils import read_image
from chainercv.visualizations import vis_bbox
from pylab import rcParams
rcParams['figure.figsize'] = 12,12
plt.rcParams['font.family'] = 'IPAPGothic'
class BboxDataset(GetterDataset):
    def __init__(self, img_dir, annotation_dir, categories, img_ext='.jpg', annotation_ext='.json'):
        super(BboxDataset, self).__init__()
        self.names = [i.split('.')[0] for i in os.listdir(img_dir)]
        self.img_dir = img_dir
        self.annotation_dir = annotation_dir
        self.categories = categories
        self.img_ext = img_ext
        self.annotation_ext = annotation_ext
        self.add_getter('img', self.get_image)
        self.add_getter(('bbox', 'label'), self.get_annotation)

    def __len__(self):
        return len(self.names)

    def get_image(self, i):
        name = self.names[i]
        img_path = os.path.join(self.img_dir, name+self.img_ext)
        img = read_image(img_path, color=True)
        return img

    def get_annotation(self, i):
        name = self.names[i]
        annotation_path = os.path.join(self.annotation_dir, name+self.annotation_ext)
        with open(annotation_path) as f:
            annotation = json.load(f)
        bbox = []
        label = []
        for l in annotation['labels']:
            category = str(l['category'])
            if category in self.categories:
                bb = l['box2d']
                bbox.append([bb['y1'], bb['x1'], bb['y2'], bb['x2']])
                label.append(self.categories.index(category))
        bbox = np.array(bbox).astype(np.float32)
        label = np.array(label).astype(np.int32)
        return bbox, label
categories = [str(i) for i in master['category']]
data = BboxDataset('train_images', 'train_annotations', categories)
Let's visualize the shelf image 'train_020.jpg' together with its bounding boxes.
index_num = 20
print(data.names[index_num])
img, bbox, label = data[index_num]
vis_bbox(img, bbox, label, label_names=categories)
plt.show()
If you want to see brand names instead, pass the list of brand names directly as label_names.
brands = list(master['brand'])
vis_bbox(img, bbox, label, label_names=brands)
plt.show()
Let's crop out the packages of a specific brand and look at them.
brand = master.loc[set(master[master['brand']=='ピアニッシモ'].index).intersection(set(label))]
imgs = []
for l, b in zip(label, bbox):
    if l in brand.index:
        imgs.append((brand.at[l, 'category'], img[:, int(b[0]):int(b[2])+1, int(b[1]):int(b[3])+1].transpose(1, 2, 0).astype(np.uint8)))
Let's compare them with the corresponding master images.
fig, axes = plt.subplots(nrows = len(imgs), ncols=2, figsize=(5,4*(len(imgs))))
for i, image in enumerate(imgs):
    master_image = read_image(os.path.join('master_images', str(image[0])+'.jpg')).transpose((1, 2, 0)).astype(np.uint8)
    axes[i, 0].imshow(master_image)
    axes[i, 0].set_title('{}'.format(image[0]), fontsize=20)
    axes[i, 1].imshow(image[1])
    axes[i, 1].set_title('cropped image', fontsize=20)
Within a single brand, some products look very different from each other while others are almost indistinguishable.
Modeling¶
In this competition it is not allowed to predict the products and their locations from the shelf images alone; the master images must also be used as input. We therefore consider a two-stage prediction algorithm: first detect generic tobacco package regions in the shelf image (i.e. predict their locations), and then, for each detected region, determine the specific product by matching it against the master images. A deep learning model is used for each stage, one for region detection and one for identity matching; a rough sketch of the overall flow is shown below, and we then start with the region detection stage.
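The following sketch only illustrates the two-stage idea; detect_packages and nearest_master are placeholder names, not functions defined in this tutorial.
# pseudo-code sketch of the two-stage approach (placeholder names, illustration only)
def two_stage_predict(shelf_image, master_images, detect_packages, nearest_master):
    predictions = []
    # stage 1: detect generic package regions (y1, x1, y2, x2) in the shelf image
    for box in detect_packages(shelf_image):
        crop = shelf_image[:, int(box[0]):int(box[2]), int(box[1]):int(box[3])]
        # stage 2: assign the product whose master image is closest to the crop
        category = nearest_master(crop, master_images)
        predictions.append((box, category))
    return predictions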
Region Detection¶
Instantiating the Deep Learning Model¶
Since we only want to detect generic tobacco packages here, all products are treated as a single class.
We use SSD300 [1] as the deep learning model.
from chainercv.links.model.ssd import multibox_loss
from chainercv.links import SSD300
class MultiboxTrainChain(chainer.Chain):
    def __init__(self, model, alpha=1, k=3):
        super(MultiboxTrainChain, self).__init__()
        with self.init_scope():
            self.model = model
        self.alpha = alpha
        self.k = k

    def __call__(self, imgs, gt_mb_locs, gt_mb_labels):
        mb_locs, mb_confs = self.model(imgs)
        loc_loss, conf_loss = multibox_loss(
            mb_locs, mb_confs, gt_mb_locs, gt_mb_labels, self.k)
        loss = loc_loss * self.alpha + conf_loss
        chainer.reporter.report(
            {'loss': loss, 'loss/loc': loc_loss, 'loss/conf': conf_loss},
            self)
        return loss
eval_categories = ('tabacco',)
model_detector = SSD300(n_fg_class=len(eval_categories), pretrained_model = 'imagenet')
model_detector.nms_thresh = 0.5
model_detector.score_thresh = 0.5
detector_train_chain = MultiboxTrainChain(model_detector)
Creating the Input Data¶
We build a new training set by cropping patches of a suitable size from the shelf images with a sliding window of a suitable stride. With the defaults used below (stride=300, crop_size=600), neighbouring crops overlap by 300 px, so most packages appear completely inside at least one crop.
import copy
from chainer.datasets import TransformDataset
from chainercv import transforms
from chainercv.links.model.ssd import random_crop_with_bbox_constraints
from chainercv.links.model.ssd import random_distort
from chainercv.links.model.ssd import resize_with_random_interpolation
class Transform():
    def __init__(self, coder, size, mean):
        # to send to cpu, make a copy
        self.coder = copy.copy(coder)
        self.coder.to_cpu()
        self.size = size
        self.mean = mean

    def __call__(self, in_data):
        img, bbox, label = in_data
        # 1. Color augmentation
        img = random_distort(img)
        # 2. Random expansion
        if np.random.randint(2):
            img, param = transforms.random_expand(
                img, fill=self.mean, return_param=True)
            bbox = transforms.translate_bbox(
                bbox, y_offset=param['y_offset'], x_offset=param['x_offset'])
        # 3. Random cropping
        img, param = random_crop_with_bbox_constraints(
            img, bbox, return_param=True)
        bbox, param = transforms.crop_bbox(
            bbox, y_slice=param['y_slice'], x_slice=param['x_slice'],
            allow_outside_center=False, return_param=True)
        label = label[param['index']]
        # 4. Resizing with random interpolation
        _, H, W = img.shape
        img = resize_with_random_interpolation(img, (self.size, self.size))
        bbox = transforms.resize_bbox(bbox, (H, W), (self.size, self.size))
        # 5. Random horizontal flipping
        img, params = transforms.random_flip(
            img, x_random=True, return_param=True)
        bbox = transforms.flip_bbox(
            bbox, (self.size, self.size), x_flip=params['x_flip'])
        # Preparation for SSD network
        img -= self.mean
        mb_loc, mb_label = self.coder.encode(bbox, label)
        return img, mb_loc, mb_label
os.makedirs('train_images_0', exist_ok=True)
os.makedirs('train_annotations_0', exist_ok=True)
os.makedirs('val_images', exist_ok=True)
os.makedirs('val_annotations', exist_ok=True)
def crop_image(src_img_dir, img_fname, src_annotation_dir, annotation_fname, dst_img_dir, dst_annotation_dir, stride=300, crop_size=600, buffer=30):
    image = Image.open(os.path.join(src_img_dir, img_fname))
    image_array = np.array(image)
    with open(os.path.join(src_annotation_dir, annotation_fname)) as f:
        annotation = json.load(f)
    bboxes = pd.DataFrame([bb['box2d'] for bb in annotation['labels']])
    count = 0
    for i in range(0, image_array.shape[0], stride):
        for j in range(0, image_array.shape[1], stride):
            inside = bboxes[(bboxes['x1'] >= j-buffer) & (bboxes['y1'] >= i-buffer) & (bboxes['x2'] <= j+crop_size+buffer) & (bboxes['y2'] <= i+crop_size+buffer)]
            if len(inside) > 0:
                cropped_image = image_array[i:i+crop_size, j:j+crop_size, :]
                cropped_annotation = {}
                cropped_annotation['labels'] = [{'box2d': {'x1': int(r['x1'])-j,
                                                           'y1': int(r['y1'])-i,
                                                           'x2': int(r['x2'])-j,
                                                           'y2': int(r['y2'])-i},
                                                 'category': 'tabacco'} for r in inside.to_dict('records')]
                img_name = img_fname.split('.')[0]+'_'+str(count)+'.jpg'
                annotation_name = annotation_fname.split('.')[0]+'_'+str(count)+'.json'
                Image.fromarray(cropped_image).save(os.path.join(dst_img_dir, img_name))
                with open(os.path.join(dst_annotation_dir, annotation_name), 'w') as f:
                    json.dump(cropped_annotation, f)
                count += 1
count = 0
for train_images_file in os.listdir('train_images'):
    annotation_file_name = os.path.splitext(train_images_file)[0]+'.json'
    if count < 175:
        crop_image('train_images', train_images_file, 'train_annotations', annotation_file_name, 'train_images_0', 'train_annotations_0')
    else:
        crop_image('train_images', train_images_file, 'train_annotations', annotation_file_name, 'val_images', 'val_annotations')
    count += 1
train_data = BboxDataset('train_images_0', 'train_annotations_0', eval_categories)
val_data = BboxDataset('val_images', 'val_annotations', eval_categories)
transformed_train_data = TransformDataset(train_data, Transform(model_detector.coder, model_detector.insize, model_detector.mean))
Training the Deep Learning Model¶
We train the SSD300 model on the training data created above.
from chainer.optimizer_hooks import WeightDecay
from chainercv.links.model.ssd import GradientScaling
from chainer import training
from chainer.training import extensions, triggers
from chainercv.extensions import DetectionVOCEvaluator
The mini-batch size is batchsize and the number of epochs is num_epochs. Adjust them as needed.
gpu = 1
batchsize = 20
num_epochs = 20
train_iter = chainer.iterators.SerialIterator(transformed_train_data, batchsize)
val_iter = chainer.iterators.SerialIterator(val_data, batchsize, repeat=False, shuffle=False)
if gpu:
    gpu_id = 0
    model_detector.to_gpu()
else:
    gpu_id = -1
optimizer = chainer.optimizers.MomentumSGD(lr=0.0005)
optimizer.setup(detector_train_chain)
for param in detector_train_chain.params():
    if param.name == 'b':
        param.update_rule.add_hook(GradientScaling(2))
    else:
        param.update_rule.add_hook(WeightDecay(0.0001))
updater = training.updaters.StandardUpdater(train_iter, optimizer, device=gpu_id)
trainer = training.Trainer(updater, (num_epochs, 'epoch'), 'detection_results')
log_interval = 100, 'iteration'
trainer.extend(DetectionVOCEvaluator(val_iter, model_detector, use_07_metric=False,label_names=eval_categories),trigger=log_interval)
trainer.extend(extensions.LogReport(trigger=log_interval))
trainer.extend(extensions.observe_lr(), trigger=log_interval)
trainer.extend(extensions.PrintReport(['epoch', 'iteration', 'lr','main/loss', 'main/loss/loc', 'main/loss/conf','validation/main/map','elapsed_time']), trigger=log_interval)
trainer.extend(extensions.snapshot_object(model_detector, 'model_epoch_{.updater.epoch}'), trigger=(1, 'epoch'))
trainer.run()
Prediction with the Deep Learning Model¶
Let's predict the locations of tobacco packages in a shelf image with the model trained above.
from chainercv.utils.bbox.non_maximum_suppression import non_maximum_suppression
def detect_tabacco(model, img_dir, fname, stride=300, crop_size=600):
    img = read_image(os.path.join(img_dir, fname))
    B = []
    L = []
    S = []
    for i in range(0, img.shape[1], stride):
        for j in range(0, img.shape[2], stride):
            img_cropped = img[:, i:i+crop_size, j:j+crop_size]
            bboxes, labels, scores = model.predict([img_cropped])
            B.append(bboxes[0]+[i, j, i, j])
            L.append(labels[0])
            S.append(scores[0])
    B = np.concatenate(B)
    L = np.concatenate(L)
    S = np.concatenate(S)
    r = non_maximum_suppression(bbox=B, thresh=0.1, score=S)
    return B[r], L[r], S[r]
Load the trained model.
chainer.serializers.load_npz(os.path.join('detection_results', 'model_epoch_{}'.format(20)), model_detector)
Run prediction on the evaluation image 'test_015.jpg'.
test_img_dir = 'test_images_1'
fname = 'test_015.jpg'
bboxes, labels, scores = detect_tabacco(model_detector, test_img_dir, fname)
img = read_image(os.path.join(test_img_dir, fname))
vis_bbox(img, bboxes, labels, scores, label_names=eval_categories)
plt.show()
Package detection seems to work reasonably well.
Identity Matching¶
Instantiating the Deep Learning Model¶
We build a deep learning model that takes two images (a master image and a region detected in a shelf image) and outputs their similarity (or distance). Here we use a Siamese network [2]. We try two variants: one trained with a contrastive loss and one trained with a triplet loss.
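As a reminder of what the two losses optimize, here is an illustrative NumPy sketch (the margins are chosen for illustration; the actual training below relies on chainer.functions.contrastive and chainer.functions.triplet):
# illustrative sketch of the two losses on batches of embedding vectors
def contrastive_loss(y0, y1, same, margin=1.0):
    # same is 1 for pairs of the same product and 0 otherwise
    d = np.sqrt(np.sum((y0 - y1) ** 2, axis=1))
    return np.mean(same * d ** 2 + (1 - same) * np.maximum(margin - d, 0) ** 2) / 2

def triplet_loss(anchor, positive, negative, margin=0.2):
    # pull the anchor towards the positive and push it away from the negative
    d_pos = np.sum((anchor - positive) ** 2, axis=1)
    d_neg = np.sum((anchor - negative) ** 2, axis=1)
    return np.mean(np.maximum(d_pos - d_neg + margin, 0))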
import chainer.links as L
import chainer.functions as F
class FCN(chainer.Chain):
    def __init__(self):
        super(FCN, self).__init__()
        with self.init_scope():
            self.conv1 = L.Convolution2D(None, 32, 3, stride=1)
            self.bn1 = L.BatchNormalization(32)
            self.conv2 = L.Convolution2D(None, 32, 3, stride=1)
            self.bn2 = L.BatchNormalization(32)
            self.conv3 = L.Convolution2D(None, 64, 3, stride=1)
            self.bn3 = L.BatchNormalization(64)
            self.conv4 = L.Convolution2D(None, 64, 3, stride=1)
            self.bn4 = L.BatchNormalization(64)
            self.conv5 = L.Convolution2D(None, 128, 3, stride=1)
            self.bn5 = L.BatchNormalization(128)
            self.conv6 = L.Convolution2D(None, 128, 3, stride=1)
            self.bn6 = L.BatchNormalization(128)
            self.conv7 = L.Convolution2D(None, 256, 3, stride=1)
            self.bn7 = L.BatchNormalization(256)
            self.conv8 = L.Convolution2D(None, 256, 3, stride=1)
            self.bn8 = L.BatchNormalization(256)
            self.conv9 = L.Convolution2D(None, 512, 3, stride=1)
            self.bn9 = L.BatchNormalization(512)

    def __call__(self, x):
        h = F.relu(self.bn1(self.conv1(x)))
        h = F.relu(self.bn2(self.conv2(h)))
        h = F.max_pooling_2d(h, 2, stride=2)
        h = F.dropout(h, ratio=0.25)
        h = F.relu(self.bn3(self.conv3(h)))
        h = F.relu(self.bn4(self.conv4(h)))
        h = F.max_pooling_2d(h, 2, stride=2)
        h = F.dropout(h, ratio=0.25)
        h = F.relu(self.bn5(self.conv5(h)))
        h = F.relu(self.bn6(self.conv6(h)))
        h = F.max_pooling_2d(h, 2, stride=2)
        h = F.dropout(h, ratio=0.25)
        h = F.relu(self.bn7(self.conv7(h)))
        h = F.relu(self.bn8(self.conv8(h)))
        h = F.max_pooling_2d(h, 2, stride=2)
        h = F.dropout(h, ratio=0.25)
        h = F.relu(self.bn9(self.conv9(h)))
        h = F.average_pooling_2d(h, h.data.shape[2:], stride=1)
        y = F.reshape(h, h.data.shape[:2])
        return y
class SiameseTrainChain(chainer.Chain):
    def __init__(self, model, train):
        super(SiameseTrainChain, self).__init__()
        with self.init_scope():
            self.model = model
        self.train = train

    def __call__(self, imgs_0, labels_0, imgs_1, labels_1):
        with chainer.using_config('enable_backprop', self.train):
            with chainer.using_config('train', self.train):
                y_0 = self.model(imgs_0)
                y_1 = self.model(imgs_1)
                labels = (labels_0 == labels_1).astype(np.int32)
                loss = F.contrastive(y_0, y_1, labels)
        chainer.reporter.report({'loss': loss}, self)
        return loss

class TripletTrainChain(chainer.Chain):
    def __init__(self, model, train):
        super(TripletTrainChain, self).__init__()
        with self.init_scope():
            self.model = model
        self.train = train

    def __call__(self, imgs_0, imgs_1, imgs_2):
        with chainer.using_config('enable_backprop', self.train):
            with chainer.using_config('train', self.train):
                y_0 = self.model(imgs_0)
                y_1 = self.model(imgs_1)
                y_2 = self.model(imgs_2)
                loss = F.triplet(y_0, y_1, y_2)
        chainer.reporter.report({'loss': loss}, self)
        return loss
fcn_siamese = FCN()
fcn_triplet = FCN()
Build the training chain that uses the contrastive loss.
siamese_train_chain = SiameseTrainChain(fcn_siamese, train=True)
siamese_val_chain = SiameseTrainChain(fcn_siamese, train=False)
Build the training chain that uses the triplet loss.
triplet_train_chain = TripletTrainChain(fcn_triplet, train=True)
triplet_val_chain = TripletTrainChain(fcn_triplet, train=False)
Creating the Input Data¶
From the shelf images, we crop out the package regions and store the cropped images in per-category directories.
From the master images, we crop out the package region using the region detection model trained above and store the results in per-category directories in the same way.
import os
import numpy as np
import json
from PIL import Image
_train_annotations_path = 'train_annotations'
_train_images_path = 'train_images'
_master_images_path = 'master_images'
_train_images_1_path = 'train_images_1'
_val_images_1_path = 'val_images_1'
_train_images_2_path = 'train_images_2'
os.makedirs(_train_images_1_path, exist_ok=True)
os.makedirs(_val_images_1_path, exist_ok=True)
os.makedirs(_train_images_2_path, exist_ok=True)
counts = {}
for img_name in os.listdir(_train_images_path):
    print(img_name)
    ann_name = img_name.split('.')[0]+'.json'
    with open(os.path.join(_train_annotations_path, ann_name)) as f:
        ann = json.load(f)
    img = Image.open(os.path.join(_train_images_path, img_name))
    for l in ann['labels']:
        t = np.random.binomial(1, 0.9)
        box2d = l['box2d']
        category = l['category']
        if (32 <= box2d['x2'] - box2d['x1']) and (32 <= box2d['y2'] - box2d['y1']):
            cropped_img = img.crop((box2d['x1'], box2d['y1'], box2d['x2'], box2d['y2']))
            # keep a single per-category counter so that file names do not collide
            # between the train and validation splits
            if category not in counts:
                counts[category] = 0
            if t:
                if not os.path.exists(os.path.join(_train_images_1_path, str(category))):
                    os.mkdir(os.path.join(_train_images_1_path, str(category)))
                cropped_img.save(os.path.join(_train_images_1_path, str(category), img_name.split('.')[0]+'_'+str(counts[category])+'.jpg'))
            else:
                if not os.path.exists(os.path.join(_val_images_1_path, str(category))):
                    os.mkdir(os.path.join(_val_images_1_path, str(category)))
                cropped_img.save(os.path.join(_val_images_1_path, str(category), img_name.split('.')[0]+'_'+str(counts[category])+'.jpg'))
            counts[category] += 1
        else:
            print('invalid bb')
eval_categories = ('tabacco',)
model_detector = SSD300(n_fg_class=len(eval_categories), pretrained_model = 'imagenet')
model_detector.score_thresh = 0.4
chainer.serializers.load_npz(os.path.join('detection_results', 'model_epoch_{}'.format(20)), model_detector)
if gpu:
    model_detector.to_gpu()
for master_img_path in os.listdir(_master_images_path):
    print(master_img_path)
    category = master_img_path.split('.')[0]
    master_img = read_image(os.path.join(_master_images_path, master_img_path))
    bbox, _, _ = model_detector.predict([master_img])
    # create the per-category directory first so that both branches below can save into it
    if not os.path.exists(os.path.join(_train_images_2_path, category)):
        os.mkdir(os.path.join(_train_images_2_path, category))
    if len(bbox[0]):
        y1, x1, y2, x2 = bbox[0][0]
        y1, x1, y2, x2 = int(y1), int(x1), int(y2), int(x2)
        h = y2-y1
        w = x2-x1
        cropped_img = Image.fromarray(master_img[:, max(y1-100, 0):max(y1-100, 0)+int(h*0.7), max(x1-100, 0):max(x1-100, 0)+w+100].transpose((1, 2, 0)).astype(np.uint8))
        cropped_img.save(os.path.join(_train_images_2_path, category, 'train_0.jpg'))
    else:
        print('undetected')
        Image.fromarray(master_img.transpose((1, 2, 0)).astype(np.uint8)).save(os.path.join(_train_images_2_path, category, 'train_0.jpg'))
from chainercv.datasets import DirectoryParsingLabelDataset, SiameseDataset, directory_parsing_label_names
from chainer.datasets import TransformDataset
from chainercv import transforms
class Transform():
    def __init__(self, size, train):
        self.size = size
        self.train = train

    def __call__(self, in_data):
        img, label = in_data
        if self.train:
            # Color augmentation
            img = transforms.pca_lighting(img, 25.5)
            # Random sized crop
            img = transforms.random_sized_crop(img)
            # Random horizontal flipping
            img = transforms.random_flip(img, x_random=True)
        # Resize
        img = transforms.resize(img, (self.size, self.size))
        return img, label
class TripletDataset(GetterDataset):
    def __init__(self, anchor_dir, ref_dir):
        super(TripletDataset, self).__init__()
        self.anchor_dataset = DirectoryParsingLabelDataset(anchor_dir)
        self.ref_dataset = DirectoryParsingLabelDataset(ref_dir)
        self.anchor_label_names = np.array(directory_parsing_label_names(anchor_dir))
        self.ref_label_names = np.array(directory_parsing_label_names(ref_dir))
        self.ref_label_dist = {label_name: list(np.where(self.ref_dataset.labels == i)[0]) for i, label_name in enumerate(self.ref_label_names)}
        self.add_getter(('anchor', 'positive', 'negative'), self.get_data)

    def __len__(self):
        return len(self.anchor_dataset)

    def enable_transform(self, transform_function):
        self.anchor_dataset = TransformDataset(self.anchor_dataset, transform_function)
        self.ref_dataset = TransformDataset(self.ref_dataset, transform_function)

    def get_data(self, i):
        anchor, anchor_label = self.anchor_dataset[i]
        anchor_label_name = self.anchor_label_names[int(anchor_label)]
        # get positive sample from ref
        ref_label_positive_idx = np.random.choice(self.ref_label_dist[anchor_label_name])
        positive, positive_label = self.ref_dataset[ref_label_positive_idx]
        # get negative sample from ref
        ref_label_name_negative = np.random.choice(self.ref_label_names[np.where(self.ref_label_names != anchor_label_name)])
        ref_label_negative_idx = np.random.choice(self.ref_label_dist[ref_label_name_negative])
        negative, negative_label = self.ref_dataset[ref_label_negative_idx]
        return anchor, positive, negative
dataset_1 = DirectoryParsingLabelDataset(_train_images_1_path)
dataset_2 = DirectoryParsingLabelDataset(_train_images_2_path)
dataset_3 = DirectoryParsingLabelDataset(_val_images_1_path)
transformed_train_dataset_1 = TransformDataset(dataset_1, Transform(128, 1))
transformed_train_dataset_2 = TransformDataset(dataset_2, Transform(128, 1))
transformed_val_dataset_1 = TransformDataset(dataset_3, Transform(128, 0))
transformed_val_dataset_2 = TransformDataset(dataset_2, Transform(128, 0))
siamese_train_dataset = SiameseDataset(transformed_train_dataset_1, transformed_train_dataset_1, pos_ratio=0.06)
siamese_val_dataset = SiameseDataset(transformed_val_dataset_1, transformed_val_dataset_1, pos_ratio=0.06)
triplet_train_dataset = TripletDataset(_train_images_1_path, _train_images_1_path)
triplet_train_dataset.enable_transform(Transform(128, 1))
triplet_val_dataset = TripletDataset(_val_images_1_path, _val_images_1_path)
triplet_val_dataset.enable_transform(Transform(128, 0))
Training the Deep Learning Model¶
We train the Siamese network with the contrastive loss on the training data created above.
from chainer import training
from chainer.training import extensions, triggers
The mini-batch size is batchsize and the number of epochs is num_epochs. Adjust them as needed.
gpu = 1
batchsize = 50
num_epochs = 100
train_iter = chainer.iterators.SerialIterator(siamese_train_dataset, batchsize)
val_iter = chainer.iterators.SerialIterator(siamese_val_dataset, batchsize, repeat=False, shuffle=False)
if gpu:
    gpu_id = 0
    siamese_train_chain.to_gpu()
else:
    gpu_id = -1
optimizer = chainer.optimizers.MomentumSGD(lr=0.01)
optimizer.setup(siamese_train_chain)
updater = training.updaters.StandardUpdater(train_iter, optimizer, device=gpu_id)
trainer = training.Trainer(updater, (num_epochs, 'epoch'), 'discrimination_siamese_results')
log_interval = 100, 'iteration'
trainer.extend(extensions.Evaluator(val_iter,siamese_val_chain, device=gpu_id))
trainer.extend(extensions.LogReport(trigger=log_interval))
trainer.extend(extensions.observe_lr(), trigger=log_interval)
trainer.extend(extensions.PrintReport(['epoch', 'iteration', 'lr','main/loss','validation/main/loss', 'elapsed_time']), trigger=log_interval)
trainer.extend(extensions.snapshot_object(fcn_siamese, 'model_epoch_{.updater.epoch}'), trigger=(1, 'epoch'))
trainer.run()
Next, we train with the triplet loss. The mini-batch size is batchsize and the number of epochs is num_epochs. Adjust them as needed.
gpu = 1
batchsize = 50
num_epochs = 100
train_iter = chainer.iterators.SerialIterator(triplet_train_dataset, batchsize)
val_iter = chainer.iterators.SerialIterator(triplet_val_dataset, batchsize, repeat=False, shuffle=False)
if gpu:
    gpu_id = 0
    triplet_train_chain.to_gpu()
else:
    gpu_id = -1
optimizer = chainer.optimizers.MomentumSGD(lr=0.01)
optimizer.setup(triplet_train_chain)
updater = training.updaters.StandardUpdater(train_iter, optimizer, device=gpu_id)
trainer = training.Trainer(updater, (num_epochs, 'epoch'), 'discrimination_triplet_results')
log_interval = 100, 'iteration'
trainer.extend(extensions.Evaluator(val_iter, triplet_val_chain, device=gpu_id))
trainer.extend(extensions.LogReport(trigger=log_interval))
trainer.extend(extensions.observe_lr(), trigger=log_interval)
trainer.extend(extensions.PrintReport(['epoch', 'iteration', 'lr','main/loss','validation/main/loss', 'elapsed_time']), trigger=log_interval)
trainer.extend(extensions.snapshot_object(fcn_triplet, 'model_epoch_{.updater.epoch}'), trigger=(1, 'epoch'))
trainer.run()
Prediction with the Deep Learning Model¶
def predict_category(fcn, transformed_val_dataset_1, transformed_val_dataset_2, gpu):
    label_names = directory_parsing_label_names(_val_images_1_path)
    master_label_names = directory_parsing_label_names(_train_images_2_path)
    if gpu:
        import cupy as cp
        xp = cp
        fcn.to_gpu()
    else:
        xp = np
    results = {}
    master_features = []
    master_names = []
    for master_img, master_label in transformed_val_dataset_2:
        master_name = master_label_names[master_label]
        master_img_input = xp.array([master_img])
        with chainer.using_config('enable_backprop', False):
            with chainer.using_config('train', False):
                y_0 = fcn(master_img_input)
        master_features.append(cp.asnumpy(y_0.data))
        master_names.append(master_name)
    master_features = np.concatenate(master_features)
    count_right = 0
    count = 0
    for img, label in transformed_val_dataset_1:
        label_name = label_names[label]
        results[count] = {}
        results[count]['ans_label'] = label_name
        img_input = xp.array([img])
        with chainer.using_config('enable_backprop', False):
            with chainer.using_config('train', False):
                y = fcn(img_input)
        distances = np.sum((master_features-cp.asnumpy(y.data))**2, axis=1)
        if results[count]['ans_label'] == master_names[distances.argmin()]:
            count_right += 1
        print(count, results[count]['ans_label'], master_names[distances.argmin()], distances.min())
        count += 1
    print(count_right, '/', len(transformed_val_dataset_1))
Load the model trained above with the contrastive loss.
fcn = FCN()
chainer.serializers.load_npz(os.path.join('discrimination_siamese_results', 'model_epoch_{}'.format(100)), fcn)
For each package region, compute the "distance" to every master image and assign the product of the closest master image.
predict_category(fcn, transformed_val_dataset_1, transformed_val_dataset_2, gpu=1)
Identifying the exact product seems to be difficult.
Next, load the model trained with the triplet loss.
fcn = FCN()
chainer.serializers.load_npz(os.path.join('discrimination_triplet_results', 'model_epoch_{}'.format(100)), fcn)
Again, for each package region, compute the "distance" to every master image and assign the product of the closest master image.
predict_category(fcn, transformed_val_dataset_1, transformed_val_dataset_2, gpu=1)
Outputting the Predictions¶
Finally, let's run the whole pipeline: detect the package locations in the shelf images and assign a product to each detected region.
import cupy as cp
def predict(test_img_dir, master_img_dir, model_detector, model_detector_path, model_discriminator, model_discriminator_path, gpu=1):
    model_detector.score_thresh = 0.6
    chainer.serializers.load_npz(model_detector_path, model_detector)
    chainer.serializers.load_npz(model_discriminator_path, model_discriminator)
    if gpu:
        model_detector.to_gpu()
        model_discriminator.to_gpu()
        xp = cp
    else:
        xp = np
    # compute master image features
    master_label_names = directory_parsing_label_names(master_img_dir)
    dataset_master = DirectoryParsingLabelDataset(master_img_dir)
    transformed_dataset_master = TransformDataset(dataset_master, Transform(128, 0))
    master_features = []
    master_names = []
    for master_img, master_label in transformed_dataset_master:
        master_name = master_label_names[master_label]
        master_img_input = xp.array([master_img])
        with chainer.using_config('enable_backprop', False):
            with chainer.using_config('train', False):
                y_0 = model_discriminator(master_img_input)
        master_features.append(cp.asnumpy(y_0.data))
        master_names.append(master_name)
    master_features = np.concatenate(master_features)
    results = {}
    for fname in os.listdir(test_img_dir):
        print(fname)
        results[fname] = {}
        # detect packages
        bboxes, labels, scores = detect_tabacco(model_detector, test_img_dir, fname)
        # compute distances between master images and the detected packages and predict the category
        img = read_image(os.path.join(test_img_dir, fname))
        for bbox in bboxes:
            if (bbox[3]-bbox[1]) > 32 and (bbox[2]-bbox[0]) > 32:
                # transform the cropped image for testing
                img_cropped = transforms.resize(img[:, max(int(bbox[0]), 0):int(bbox[2])-1, max(int(bbox[1]), 0):int(bbox[3])-1], (128, 128))
                img_cropped = xp.array([img_cropped])
                with chainer.using_config('enable_backprop', False):
                    with chainer.using_config('train', False):
                        y = model_discriminator(img_cropped)
                distances = np.sum((master_features-cp.asnumpy(y.data))**2, axis=1)
                pred_category = master_names[distances.argmin()]
                # group the predicted boxes by category for this image
                if pred_category not in results[fname]:
                    results[fname][pred_category] = []
                results[fname][pred_category].append([int(bbox[1]), int(bbox[0]), int(bbox[3]), int(bbox[2])])
    return results
model_detector = SSD300(n_fg_class=len(eval_categories), pretrained_model = 'imagenet')
model_detector_path = os.path.join('detection_results', 'model_epoch_{}'.format(20))
model_discriminator = FCN()
model_discriminator_path = os.path.join('discrimination_siamese_results', 'model_epoch_{}'.format(100))
results = predict(test_img_dir = test_img_dir, master_img_dir = _train_images_2_path,
model_detector = model_detector, model_detector_path = model_detector_path,
model_discriminator = model_discriminator, model_discriminator_path = model_discriminator_path)
with open('results.json', 'w') as f:
    json.dump(results, f)
'results.json' is the prediction result file. A quick way to inspect its structure is shown below.
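Based on how predict() builds the dictionary above, each entry maps a test image file name to a dictionary from predicted category to a list of [x1, y1, x2, y2] boxes. A small sketch for inspecting it (please check the required submission format on the competition page yourself):
# peek at the structure of the prediction file produced above
with open('results.json') as f:
    results_loaded = json.load(f)
first_fname = sorted(results_loaded.keys())[0]
print(first_fname, list(results_loaded[first_fname].items())[:2])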
Summary¶
In this tutorial we built a pipeline that first detects the locations of tobacco packages in the shelf images and then predicts the specific product for each detected region by comparing it with the master images. Region detection was relatively easy, but identifying the exact product turned out to be difficult; the identity matching stage in particular needs further work. We look forward to your submissions.
References¶
