#!/usr/bin/env python
"""
Script to prepare iNaturalist image files based on
input parameters
"""
import argparse
import os
import random as rng
import shutil
from shutil import rmtree
from typing import Dict, List, Set, Union
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from scripts.constants import (CLASS_MAP, CLASS_RANGES, COLEOPTERA_STR,
HEMIPTERA_STR, HYMENOPTERA_STR, LEPIDOPTERA_STR,
ORDONATA_STR)
from scripts.helpers import ProgressBar, ThreadWithReturnValue, decision
## Holds available ID ranges
USED_ID_RANGES = {}
[docs]def get_n_random_elements(container: List[Union[str, int]], num_elements: int) -> Set[Union[str, int]]:
"""
Getter for `num_elements` random elements out of `container`.
:param container: the container to extract the elements from
:param num_elements: number of random elements to extract
:return: list of `num_elements` random elements out of `container`
"""
output = set()
while len(output) < num_elements:
output |= {rng.choice(container)}
return output
[docs]def init_random_dataset_ids(num_elements: int) -> List[str]:
"""
:param num_elements:
:return:
"""
coleoptera_choices = get_n_random_elements(USED_ID_RANGES.get(COLEOPTERA_STR), num_elements)
hymenoptera_choices = get_n_random_elements(USED_ID_RANGES.get(HYMENOPTERA_STR), num_elements)
lepidoptera_choices = get_n_random_elements(USED_ID_RANGES.get(LEPIDOPTERA_STR), num_elements)
ordonata_choices = get_n_random_elements(USED_ID_RANGES.get(ORDONATA_STR), num_elements)
hemiptera_choices = get_n_random_elements(USED_ID_RANGES.get(HEMIPTERA_STR), num_elements)
return [
str(i).rjust(5, '0')
for i in coleoptera_choices | hymenoptera_choices | lepidoptera_choices | ordonata_choices | hemiptera_choices
]
[docs]def calculate_stats(combined_list: List[Union[str, int]]) -> None:
"""
:param combined_list:
:return:
"""
stats = dict.fromkeys(CLASS_MAP.keys(), 0)
for elem in set(combined_list):
x = int(elem)
for key, value in USED_ID_RANGES.items():
stats[key] += x in value
print('Distribution of Species per Order:')
print(stats)
plt.title('Distribution of Species per Order in base set')
plt.bar(stats.keys(), stats.values())
plt.savefig('../base-set-distribution.eps', bbox_inches='tight', pad_inches=0)
plt.savefig('../base-set-distribution.png', bbox_inches='tight', pad_inches=0)
plt.show()
[docs]def decide_image(img_path: str) -> bool:
"""
:param img_path:
:return:
"""
img = Image.open(img_path)
plt.imshow(img)
plt.axis('off')
plt.show()
dec = decision('Do you want to use it?')
plt.close()
return dec
[docs]def get_directory_by_prefix(prefix: str, base_dir: str) -> str:
"""
:param prefix:
:param base_dir:
:return:
"""
files = list(filter(lambda x: prefix in x, os.listdir(base_dir)))
if len(files) < 1:
raise FileNotFoundError(f'Can\'t find {prefix}')
return base_dir + files[0]
[docs]def process_directory(dir_prefix: str, base_dir: str, out_dir: str, requires_decisions: bool = False,
num_samples: int = 30) -> List[str]:
"""
Copies `num_samples` files from input to target directory.
Allows to require approval for each sample by adding `requires_decision=True`
:param dir_prefix: prefix of source/input directory, e.g. 00420
:param base_dir: root directory of source
:param out_dir: target/output directory
:param requires_decisions: if `True` asks for approval before copying each sample
:param num_samples: required amount of samples
:return: List of new file locations
"""
directory = get_directory_by_prefix(dir_prefix, base_dir)
files = os.listdir(directory)
files = filter(lambda x: '.jpg' in x, files)
files = np.array([f'{directory}/{f}' for f in files])
assert len(files) >= num_samples, f'Cannot proceed, source directory "{directory}" does not contain enough samples.'
np.random.shuffle(files)
stored_files = []
# !!! replace=False for unique elements!
choices = np.random.choice(files, num_samples, replace=False)
for index, choice in enumerate(choices):
file_path = choice
file_name = file_path.split('/')[-1]
taxonomy = file_path.split('/')[-2].split('_')
out_name = f'{taxonomy[0]}_{taxonomy[4]}_{taxonomy[5]}_{file_name}'
if not requires_decisions or decide_image(file_path):
output_path = f'{out_dir}/{out_name}'
shutil.copy(file_path, output_path)
stored_files.append(output_path)
return stored_files
[docs]def create_local_copy(combined_list: List[Union[str, int]], base_dir: str, storage_dir: str, requires_decisions: bool, num_elements: int) -> List[str]:
"""
Creates local copy of `num_elements` samples per record in `combined_list` from samples in `base_dir` into `storage_dir`.
Uses multi-threading.
:param combined_list: List of order directories
:param base_dir: parent directory of input files
:param storage_dir: storage directory to create copy in
:param requires_decisions: if `True` asks for approval before adding a sample
:param num_elements: required amount of samples per order
:return: List of new file locations
"""
print(f'\rStart processing file directories.')
file_list = []
list_len = len(combined_list)
max_running_threads = 10
number_iterations = int(np.ceil(list_len / max_running_threads))
element_index = 0
pb = ProgressBar(number_iterations)
for i in range(number_iterations):
threads = []
for thread_id in range(max_running_threads):
if element_index >= list_len:
break
threads.append(
ThreadWithReturnValue(
target=process_directory,
args=(combined_list[element_index], base_dir),
kwargs=dict(
out_dir=storage_dir,
requires_decisions=requires_decisions,
num_samples=num_elements
)
)
)
element_index += 1
for thread in threads:
thread.start()
for thread in threads:
file_list.extend(thread.join())
pb.step(i)
pb.done()
print('Done processing.')
return file_list
[docs]def process_directories(
combined_list: List[Union[str, int]], base_dir: str, out_dir: str = '../data/iNat',
requires_decisions: bool = False, num_elements: int = 30, move_files_only: bool = False) -> None:
"""
Method to generate a local copy of the data set.
:param combined_list: List of sample directories
:param base_dir: root directory of source files
:param out_dir: target directory
:param requires_decisions: if `True` requires approval before adding a sample
:param num_elements: required number of samples per order/species
:param move_files_only: if `True` moves
:return:
"""
# clean target directory
if not move_files_only:
rmtree(out_dir, ignore_errors=True)
storage_dir = os.path.join(out_dir, 'storage')
directories = [
storage_dir
]
for directory in directories:
os.makedirs(directory, exist_ok=True)
create_local_copy(combined_list, base_dir, storage_dir, requires_decisions, num_elements)
print(f'\rDone moving files.')
[docs]def test_dataset(combined_list: List[Union[str, int]], base_dir: str, expected_num_elements: int) -> None:
"""
Verifies the dataset for consistency/integrity
:param combined_list:
:param base_dir:
:param expected_num_elements:
:return:
"""
generated_files = sorted(os.listdir(os.path.join(base_dir, 'storage')))
elements = {}
print('Testing for integrity of the dataset...')
for file in generated_files:
f = file.split('_')
order_id = f[1]
if elements.get(order_id):
elements[order_id].append(f[-1])
else:
elements[order_id] = [f[-1]]
count = 0
for key, values in elements.items():
count += len(values)
if count != expected_num_elements:
print(f'{expected_num_elements - count} Missing')
print(f'Total amount of genera: {len(set(combined_list))}')
[docs]def get_id_range_for_search_term(file_list: List[str], search_terms: List[str]) -> List[int]:
"""
Filters given file name list by a list of search terms
:param file_list: list of file names
:param search_terms: list of search terms to search for
:return: list of ids matching the search terms
"""
return [int(i.split('_')[0]) for i in filter(lambda x: all([term in x for term in search_terms]), file_list)]
if __name__ == '__main__':
# Initiate the parser
parser = argparse.ArgumentParser()
parser.add_argument('input_directory', help='name of input directory to use input files from')
parser.add_argument('output_directory', help='name of output directory to store files in')
parser.add_argument('-r', dest='reuse_ids', action='store_true', help='reuse predefined id ranges')
parser.add_argument('-rng', dest='rng_samples', action='store_true', help='Chose random samples')
parser.add_argument('-g', '--genera', dest='num_elements', default=10, help='Number of samples per genera')
parser.add_argument('-s', '--species', dest='num_samples', default=10, help='Number of samples per species')
parser.add_argument('--seed', default=42, help='A seed to use for underlying random number generators')
parser.add_argument('--scan-only', dest='scan', action='store_true',
help='Display input directory information then exit')
parser.add_argument('--test-only', dest='test', action='store_true',
help='Test output directory then exit')
parser.add_argument('--move-only', dest='move_only', action='store_true',
help='Only spread objects from nested "tmp" directory')
args = parser.parse_args()
number_elements = int(args.num_elements)
number_samples = int(args.num_samples)
root_dir = args.input_directory
output_dir = args.output_directory
reuse_ids = args.reuse_ids
use_rng_samples = args.rng_samples
seed = int(args.seed)
scan = args.scan
test = args.test
move_only = args.move_only
# set seeds
rng.seed(seed)
np.random.seed(seed)
if scan:
scan_input_directory(root_dir)
exit(0)
# use predefined id ranges, or scrape the root directory
USED_ID_RANGES = CLASS_RANGES.copy()
if not reuse_ids:
USED_ID_RANGES = get_id_ranges_from_input_directory(root_dir)
# extract random file directory names
random_list = init_random_dataset_ids(number_elements)
rng.shuffle(random_list)
calculate_stats(random_list)
# generate file structure
process_directories(
random_list,
root_dir,
out_dir=output_dir,
requires_decisions=not use_rng_samples,
num_elements=number_samples,
move_files_only=move_only
)
# test the output
ds_size = len(random_list) * number_samples
test_dataset(random_list, output_dir, ds_size)