From 5fe4e6bbaefd34c2adfa2ee98767cf1082838c12 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Mar 2022 17:38:22 +0000 Subject: [PATCH 01/11] work in progress --- nucleus/dataset_item_uploader.py | 243 ++++++++----------------------- nucleus/retry_strategy.py | 2 +- 2 files changed, 64 insertions(+), 181 deletions(-) diff --git a/nucleus/dataset_item_uploader.py b/nucleus/dataset_item_uploader.py index e2e33fca..adf7e343 100644 --- a/nucleus/dataset_item_uploader.py +++ b/nucleus/dataset_item_uploader.py @@ -1,25 +1,26 @@ -import asyncio import json import os -import time -from typing import TYPE_CHECKING, Any, List - -import aiohttp -import nest_asyncio +from typing import ( + TYPE_CHECKING, + Any, + BinaryIO, + Callable, + List, + Sequence, + Tuple, +) -from .constants import ( - DATASET_ID_KEY, - DEFAULT_NETWORK_TIMEOUT_SEC, - IMAGE_KEY, - IMAGE_URL_KEY, - ITEMS_KEY, - UPDATE_KEY, +from nucleus.async_utils import ( + FileFormData, + FileFormField, + FormDataContextHandler, + make_many_form_data_requests_concurrently, ) + +from .constants import DATASET_ID_KEY, IMAGE_KEY, ITEMS_KEY, UPDATE_KEY from .dataset_item import DatasetItem from .errors import NotFoundError -from .logger import logger from .payload_constructor import construct_append_payload -from .retry_strategy import RetryStrategy from .upload_response import UploadResponse if TYPE_CHECKING: @@ -79,9 +80,8 @@ def upload( ) for batch in tqdm_local_batches: - payload = construct_append_payload(batch, update) responses = self._process_append_requests_local( - self.dataset_id, payload, update + self.dataset_id, items=batch, update=update ) async_responses.extend(responses) @@ -90,10 +90,9 @@ def upload( remote_batches, desc="Remote file batches" ) for batch in tqdm_remote_batches: - payload = construct_append_payload(batch, update) responses = self._process_append_requests( dataset_id=self.dataset_id, - payload=payload, + payload=construct_append_payload(batch, update), update=update, batch_size=batch_size, ) @@ -107,173 +106,24 @@ def upload( def _process_append_requests_local( self, dataset_id: str, - payload: dict, - update: bool, # TODO: understand how to pass this in. 
+ items: Sequence[DatasetItem], + update: bool, local_batch_size: int = 10, ): - def get_files(batch): - for item in batch: - item[UPDATE_KEY] = update - request_payload = [ - ( - ITEMS_KEY, - ( - None, - json.dumps(batch, allow_nan=False), - "application/json", - ), - ) - ] - for item in batch: - image = open( # pylint: disable=R1732 - item.get(IMAGE_URL_KEY), "rb" # pylint: disable=R1732 - ) # pylint: disable=R1732 - img_name = os.path.basename(image.name) - img_type = ( - f"image/{os.path.splitext(image.name)[1].strip('.')}" - ) - request_payload.append( - (IMAGE_KEY, (img_name, image, img_type)) - ) - return request_payload - - items = payload[ITEMS_KEY] - responses: List[Any] = [] - files_per_request = [] - payload_items = [] + requests = [] for i in range(0, len(items), local_batch_size): batch = items[i : i + local_batch_size] - files_per_request.append(get_files(batch)) - payload_items.append(batch) + request = FormDataContextHandler( + self.get_form_data_and_file_pointers_fn(batch, update) + ) + requests.append(request) - future = self.make_many_files_requests_asynchronously( - files_per_request, + return make_many_form_data_requests_concurrently( + self._client, + requests, f"dataset/{dataset_id}/append", ) - try: - loop = asyncio.get_event_loop() - except RuntimeError: # no event loop running: - loop = asyncio.new_event_loop() - responses = loop.run_until_complete(future) - else: - nest_asyncio.apply(loop) - return loop.run_until_complete(future) - - def close_files(request_items): - for item in request_items: - # file buffer in location [1][1] - if item[0] == IMAGE_KEY: - item[1][1].close() - - # don't forget to close all open files - for p in files_per_request: - close_files(p) - - return responses - - async def make_many_files_requests_asynchronously( - self, files_per_request, route - ): - """ - Makes an async post request with files to a Nucleus endpoint. - - :param files_per_request: A list of lists of tuples (name, (filename, file_pointer, content_type)) - name will become the name by which the multer can build an array. - :param route: route for the request - :return: awaitable list(response) - """ - async with aiohttp.ClientSession() as session: - tasks = [ - asyncio.ensure_future( - self._make_files_request( - files=files, route=route, session=session - ) - ) - for files in files_per_request - ] - return await asyncio.gather(*tasks) - - async def _make_files_request( - self, - files, - route: str, - session: aiohttp.ClientSession, - retry_attempt=0, - max_retries=3, - sleep_intervals=(1, 3, 9), - ): - """ - Makes an async post request with files to a Nucleus endpoint. - - :param files: A list of tuples (name, (filename, file_pointer, file_type)) - :param route: route for the request - :param session: Session to use for post. 
- :return: response - """ - endpoint = f"{self._client.endpoint}/{route}" - - logger.info("Posting to %s", endpoint) - - form = aiohttp.FormData() - - for file in files: - form.add_field( - name=file[0], - filename=file[1][0], - value=file[1][1], - content_type=file[1][2], - ) - - for sleep_time in RetryStrategy.sleep_times + [-1]: - - async with session.post( - endpoint, - data=form, - auth=aiohttp.BasicAuth(self._client.api_key, ""), - timeout=DEFAULT_NETWORK_TIMEOUT_SEC, - ) as response: - logger.info( - "API request has response code %s", response.status - ) - - try: - data = await response.json() - except aiohttp.client_exceptions.ContentTypeError: - # In case of 404, the server returns text - data = await response.text() - if ( - response.status in RetryStrategy.statuses - and sleep_time != -1 - ): - time.sleep(sleep_time) - continue - - if not response.ok: - if retry_attempt < max_retries: - time.sleep(sleep_intervals[retry_attempt]) - retry_attempt += 1 - return self._make_files_request( - files, - route, - session, - retry_attempt, - max_retries, - sleep_intervals, - ) - else: - self._client.handle_bad_response( - endpoint, - session.post, - aiohttp_response=( - response.status, - response.reason, - data, - ), - ) - - return data - def _process_append_requests( self, dataset_id: str, @@ -283,11 +133,9 @@ def _process_append_requests( ): items = payload[ITEMS_KEY] payloads = [ - # batch_size images per request {ITEMS_KEY: items[i : i + batch_size], UPDATE_KEY: update} for i in range(0, len(items), batch_size) ] - return [ self._client.make_request( payload, @@ -295,3 +143,38 @@ def _process_append_requests( ) for payload in payloads ] + + def get_form_data_and_file_pointers_fn( + self, items: Sequence[DatasetItem], update: bool + ) -> Callable[..., Tuple[FileFormData, Sequence[BinaryIO]]]: + """Constructs a function that will generate form data on each retry.""" + + def fn(): + payload = construct_append_payload(items, update) + form_data = [ + FileFormField( + name=ITEMS_KEY, + filename=None, + value=json.dumps(payload, allow_nan=False), + content_type="application/json", + ) + ] + + file_pointers = [] + for item in items: + image_fp = open( # pylint: disable=R1732 + item.image_location, "rb" # pylint: disable=R1732 + ) # pylint: disable=R1732 + img_type = f"image/{os.path.splitext(item.image_location)[1].strip('.')}" + form_data.append( + FileFormField( + name=IMAGE_KEY, + filename=item.image_location, + value=image_fp, + content_type=img_type, + ) + ) + file_pointers.append(image_fp) + return form_data, file_pointers + + return fn diff --git a/nucleus/retry_strategy.py b/nucleus/retry_strategy.py index eabc1309..6ca64a7e 100644 --- a/nucleus/retry_strategy.py +++ b/nucleus/retry_strategy.py @@ -1,4 +1,4 @@ # TODO: use retry library instead of custom code. Tenacity is one option. 
class RetryStrategy: statuses = {503, 524, 520, 504} - sleep_times = [1, 3, 9] + sleep_times = [1, 3, 9, 27] # These are in seconds From cd942e4fb790634eac7c701e3a50efb398a2942a Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Mar 2022 17:39:20 +0000 Subject: [PATCH 02/11] work in progress --- nucleus/async_utils.py | 192 +++++++++++++++++++++++++++++++ nucleus/segmentation_uploader.py | 114 ++++++++++++++++++ 2 files changed, 306 insertions(+) create mode 100644 nucleus/async_utils.py create mode 100644 nucleus/segmentation_uploader.py diff --git a/nucleus/async_utils.py b/nucleus/async_utils.py new file mode 100644 index 00000000..9c8c196f --- /dev/null +++ b/nucleus/async_utils.py @@ -0,0 +1,192 @@ +import asyncio +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, BinaryIO, Callable, Sequence, Tuple + +import aiohttp +import nest_asyncio + +from nucleus.constants import DEFAULT_NETWORK_TIMEOUT_SEC +from nucleus.errors import NucleusAPIError +from nucleus.retry_strategy import RetryStrategy + +from .logger import logger + +if TYPE_CHECKING: + from . import NucleusClient + + +@dataclass +class FileFormField: + name: str + filename: str + value: BinaryIO + content_type: str + + +FileFormData = Sequence[FileFormField] + + +class FormDataContextHandler: + """A context handler for file form data that handles closing all files in a request. + + Why do I need to wrap my requests in such a funny way? + + 1. Form data must be regenerated on each request to avoid errors + see https://github.com/Rapptz/discord.py/issues/6531 + 2. Files must be properly open/closed for each request. + 3. We need to be able to do 1/2 above multiple times so that we can implement retries + properly. + + Write a function that returns a tuple of form data and file pointers, then pass it to the + constructor of this class, and this class will handle the rest for you. + """ + + def __init__( + self, + form_data_and_file_pointers_fn: Callable[ + ..., Tuple[FileFormData, Sequence[BinaryIO]] + ], + ): + self._form_data_and_file_pointer_fn = form_data_and_file_pointers_fn + self._file_pointers = None + + def __enter__(self): + ( + file_form_data, + self._file_pointers, + ) = self._form_data_and_file_pointer_fn() + form = aiohttp.FormData() + for field in file_form_data: + form.add_field( + name=field.name, + filename=field.filename, + value=field.value, + content_type=field.content_type, + ) + return form + + def __exit__(self, exc_type, exc_val, exc_tb): + for file_pointer in self._file_pointers: + file_pointer.close() + + +def get_event_loop(): + try: + loop = asyncio.get_event_loop() + except RuntimeError: # no event loop running: + loop = asyncio.new_event_loop() + else: + nest_asyncio.apply(loop) + return loop + + +def make_many_form_data_requests_concurrently( + client: "NucleusClient", + requests: Sequence[FormDataContextHandler], + route: str, +): + """ + Makes an async post request with form data to a Nucleus endpoint. + + Args: + client: The client to use for the request. + requests: Each requst should be a FormDataContextHandler object which will + handle generating form data, and opening/closing files for each request. + route: route for the request. + """ + loop = get_event_loop() + return loop.run_until_complete( + form_data_request_helper(client, requests, route) + ) + + +async def form_data_request_helper( + client: "NucleusClient", + requests: Sequence[FormDataContextHandler], + route: str, +): + """ + Makes an async post request with files to a Nucleus endpoint. 
+ + Args: + client: The client to use for the request. + requests: Each requst should be a FormDataContextHandler object which will + handle generating form data, and opening/closing files for each request. + route: route for the request. + """ + async with aiohttp.ClientSession() as session: + tasks = [ + asyncio.ensure_future( + _post_form_data( + client=client, + request=request, + route=route, + session=session, + ) + ) + for request in requests + ] + return await asyncio.gather(*tasks) + + +async def _post_form_data( + client: "NucleusClient", + request: FormDataContextHandler, + route: str, + session: aiohttp.ClientSession, +): + """ + Makes an async post request with files to a Nucleus endpoint. + + Args: + client: The client to use for the request. + request: The request to make (See FormDataContextHandler for more details.) + route: route for the request. + session: The session to use for the request. + """ + endpoint = f"{client.endpoint}/{route}" + + logger.info("Posting to %s", endpoint) + + for sleep_time in RetryStrategy.sleep_times + [-1]: + with request as form: + async with session.post( + endpoint, + data=form, + auth=aiohttp.BasicAuth(client.api_key, ""), + timeout=DEFAULT_NETWORK_TIMEOUT_SEC, + ) as response: + logger.info( + "API request has response code %s", response.status + ) + + try: + data = await response.json() + except aiohttp.client_exceptions.ContentTypeError: + # In case of 404, the server returns text + data = await response.text() + if ( + response.status in RetryStrategy.statuses + and sleep_time != -1 + ): + time.sleep(sleep_time) + continue + + if response.status == 503: + raise TimeoutError( + "The request to upload your max is timing out, please lower the batch size." + ) + + if not response.ok: + raise NucleusAPIError( + endpoint, + session.post, + aiohttp_response=( + response.status, + response.reason, + data, + ), + ) + + return data diff --git a/nucleus/segmentation_uploader.py b/nucleus/segmentation_uploader.py new file mode 100644 index 00000000..6915143a --- /dev/null +++ b/nucleus/segmentation_uploader.py @@ -0,0 +1,114 @@ +# import asyncio +# import json +# import os +# from typing import TYPE_CHECKING, Any, List +# from nucleus.annotation import SegmentationAnnotation +# from nucleus.async_utils import get_event_loop +# from nucleus.constants import DATASET_ID_KEY, MASK_TYPE, SEGMENTATIONS_KEY +# from nucleus.errors import NotFoundError +# from nucleus.payload_constructor import construct_segmentation_payload +# from annotation import is_local_path +# from nucleus.upload_response import UploadResponse +# import nest_asyncio + +# if TYPE_CHECKING: +# from . 
import NucleusClient + + +# class SegmentationUploader: +# def __init__(self, dataset_id: str, client: "NucleusClient"): # noqa: F821 +# self.dataset_id = dataset_id +# self._client = client + +# def annotate( +# self, +# segmentations: List[SegmentationAnnotation], +# batch_size: int = 20, +# update: bool = False, +# ): +# remote_segmentations = [] +# local_segmentations = [] +# for segmentation in segmentations: +# if is_local_path(segmentation.mask_url): +# if not segmentation.local_file_exists(): +# raise NotFoundError( +# "Could not find f{segmentation.mask_url}" +# ) +# local_segmentations.append(segmentation) +# else: +# remote_segmentations.append(segmentation) + +# local_batches = [ +# local_segmentations[i : i + batch_size] +# for i in range(0, len(local_segmentations), batch_size) +# ] + +# remote_batches = [ +# remote_segmentations[i : i + batch_size] +# for i in range(0, len(remote_segmentations), batch_size) +# ] + +# agg_response = UploadResponse(json={DATASET_ID_KEY: self.dataset_id}) + +# async_responses: List[Any] = [] + +# if local_batches: +# tqdm_local_batches = self._client.tqdm_bar( +# local_batches, desc="Local file batches" +# ) +# for batch in tqdm_local_batches: +# responses = self._process_annotate_requests_local( +# self.dataset_id, batch +# ) +# async_responses.extend(responses) + +# def process_annotate_requests_local( +# dataset_id: str, +# segmentations: List[SegmentationAnnotation], +# local_batch_size: int = 10, +# ): +# requests = [] +# file_pointers = [] +# for i in range(0, len(segmentations), local_batch_size): +# batch = segmentations[i : i + local_batch_size] +# request, request_file_pointers = self.construct_files_request( +# batch +# ) +# requests.append(request) +# file_pointers.extend(request_file_pointers) + +# future = self.make_many_files_requests_asynchronously( +# requests, f"dataset/{dataset_id}/files" +# ) + +# loop = get_event_loop() + +# responses = loop.run_until_complete(future) +# [fp.close() for fp in file_pointers] +# return responses + +# def construct_files_request( +# segmentations: List[SegmentationAnnotation], +# ): +# request_json = construct_segmentation_payload( +# segmentations, update +# ) +# request_payload = [ +# ( +# SEGMENTATIONS_KEY, +# (None, json.dumps(request_json), "application/json"), +# ) +# ] +# file_pointers = [] +# for segmentation in segmentations: +# mask_fp = open(segmentation.mask_url, "rb") +# filename = os.path.basename(segmentation.mask_url) +# file_type = segmentation.mask_url.split(".")[-1] +# if file_type != "png": +# raise ValueError( +# f"Only png files are supported. 
Got {file_type} for {segmentation.mask_url}" +# ) +# request_payload.append( +# (MASK_TYPE, (filename, mask_fp, "image/png")) +# ) +# return request_payload, file_pointers From 4106f8ee4ec9d966f2cf5918bd780e9e73e79a62 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Mar 2022 17:52:40 +0000 Subject: [PATCH 03/11] Big refactor to make things cleaner + enable retries properly on infra flakes for local upload --- nucleus/dataset_item_uploader.py | 14 ++++++++++++-- nucleus/segmentation_uploader.py | 5 +++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/nucleus/dataset_item_uploader.py b/nucleus/dataset_item_uploader.py index adf7e343..9682316e 100644 --- a/nucleus/dataset_item_uploader.py +++ b/nucleus/dataset_item_uploader.py @@ -150,12 +150,22 @@ def get_form_data_and_file_pointers_fn( """Constructs a function that will generate form data on each retry.""" def fn(): - payload = construct_append_payload(items, update) + + # For some reason, our backend only accepts this reformatting of items when + # doing local upload. + # TODO: make it just accept the same exact format as a normal append request + # i.e. the output of construct_append_payload(items, update) + json_data = [] + for item in items: + item_payload = item.to_payload() + item_payload[UPDATE_KEY] = update + json_data.append(item_payload) + form_data = [ FileFormField( name=ITEMS_KEY, filename=None, - value=json.dumps(payload, allow_nan=False), + value=json.dumps(json_data, allow_nan=False), content_type="application/json", ) ] diff --git a/nucleus/segmentation_uploader.py b/nucleus/segmentation_uploader.py index 6915143a..623fdac4 100644 --- a/nucleus/segmentation_uploader.py +++ b/nucleus/segmentation_uploader.py @@ -112,3 +112,8 @@ # (MASK_TYPE, (filename, mask_fp, "image/png")) # ) # return request_payload, file_pointers + + +# {"items": [{"metadata": {"test": 0}, "reference_id": "test_img.jpg", "image_url": "tests/test_img.jpg", "upload_to_scale": true}] + +# [{"metadata": {"test": 0}, "reference_id": "test_img.jpg", "image_url": "tests/test_img.jpg", "upload_to_scale": true, "update": false}] From 59ce11ed376f13da4a123f0b4142b4b89f404b3f Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Mar 2022 23:21:23 +0000 Subject: [PATCH 04/11] work in progress refactor of annotation upload --- nucleus/__init__.py | 87 ---------------------- nucleus/annotation.py | 15 +++- nucleus/async_utils.py | 27 +++++-- nucleus/dataset.py | 88 +++++++++++++++-------- nucleus/dataset_item_uploader.py | 75 ++++++++++++------- nucleus/segmentation_uploader.py | 119 ------------------------------- tests/test_dataset.py | 5 +- 7 files changed, 145 insertions(+), 271 deletions(-) delete mode 100644 nucleus/segmentation_uploader.py diff --git a/nucleus/__init__.py b/nucleus/__init__.py index c66292f1..edbb98ab 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -460,93 +460,6 @@ def populate_dataset( dataset_items, batch_size=batch_size, update=update ) - def annotate_dataset( - self, - dataset_id: str, - annotations: Sequence[ - Union[ - BoxAnnotation, - PolygonAnnotation, - CuboidAnnotation, - CategoryAnnotation, - MultiCategoryAnnotation, - SegmentationAnnotation, - ] - ], - update: bool, - batch_size: int = 5000, - ) -> Dict[str, object]: - # TODO: deprecate in favor of Dataset.annotate invocation - - # Split payload into segmentations and Box/Polygon - segmentations = [ - ann - for ann in annotations - if isinstance(ann, SegmentationAnnotation) - ] - other_annotations = [ - ann - for ann in annotations - if not 
isinstance(ann, SegmentationAnnotation) - ] - - batches = [ - other_annotations[i : i + batch_size] - for i in range(0, len(other_annotations), batch_size) - ] - - semseg_batches = [ - segmentations[i : i + batch_size] - for i in range(0, len(segmentations), batch_size) - ] - - agg_response = { - DATASET_ID_KEY: dataset_id, - ANNOTATIONS_PROCESSED_KEY: 0, - ANNOTATIONS_IGNORED_KEY: 0, - ERRORS_KEY: [], - } - - total_batches = len(batches) + len(semseg_batches) - - tqdm_batches = self.tqdm_bar(batches) - - with self.tqdm_bar(total=total_batches) as pbar: - for batch in tqdm_batches: - payload = construct_annotation_payload(batch, update) - response = self.make_request( - payload, f"dataset/{dataset_id}/annotate" - ) - pbar.update(1) - if STATUS_CODE_KEY in response: - agg_response[ERRORS_KEY] = response - else: - agg_response[ANNOTATIONS_PROCESSED_KEY] += response[ - ANNOTATIONS_PROCESSED_KEY - ] - agg_response[ANNOTATIONS_IGNORED_KEY] += response[ - ANNOTATIONS_IGNORED_KEY - ] - agg_response[ERRORS_KEY] += response[ERRORS_KEY] - - for s_batch in semseg_batches: - payload = construct_segmentation_payload(s_batch, update) - response = self.make_request( - payload, f"dataset/{dataset_id}/annotate_segmentation" - ) - pbar.update(1) - if STATUS_CODE_KEY in response: - agg_response[ERRORS_KEY] = response - else: - agg_response[ANNOTATIONS_PROCESSED_KEY] += response[ - ANNOTATIONS_PROCESSED_KEY - ] - agg_response[ANNOTATIONS_IGNORED_KEY] += response[ - ANNOTATIONS_IGNORED_KEY - ] - - return agg_response - @deprecated(msg="Use Dataset.ingest_tasks instead") def ingest_tasks(self, dataset_id: str, payload: dict): dataset = self.get_dataset(dataset_id) diff --git a/nucleus/annotation.py b/nucleus/annotation.py index 347e156d..f3f9533a 100644 --- a/nucleus/annotation.py +++ b/nucleus/annotation.py @@ -1,4 +1,5 @@ import json +import os from dataclasses import dataclass, field from enum import Enum from typing import Dict, List, Optional, Sequence, Type, Union @@ -70,6 +71,9 @@ def to_json(self) -> str: """Serializes annotation object to schematized JSON string.""" return json.dumps(self.to_payload(), allow_nan=False) + def has_local_files(self) -> bool: + return False + @dataclass # pylint: disable=R0902 class BoxAnnotation(Annotation): # pylint: disable=R0902 @@ -578,6 +582,13 @@ def to_payload(self) -> dict: return payload + def has_files(self) -> bool: + if is_local_path(self.mask_url): + if not os.path.isfile(self.mask_url): + raise Exception(f"Mask file {self.mask_url} does not exist.") + return True + return False + class AnnotationTypes(Enum): BOX = BOX_TYPE @@ -737,12 +748,12 @@ def is_local_path(path: str) -> bool: def check_all_mask_paths_remote( - annotations: Sequence[Union[Annotation]], + annotations: Sequence[Annotation], ): for annotation in annotations: if hasattr(annotation, MASK_URL_KEY): if is_local_path(getattr(annotation, MASK_URL_KEY)): raise ValueError( "Found an annotation with a local path, which is not currently" - f"supported. Use a remote path instead. {annotation}" + f"supported for asynchronous upload. Use a remote path instead, or try synchronous upload. 
{annotation}" ) diff --git a/nucleus/async_utils.py b/nucleus/async_utils.py index 9c8c196f..1b46b806 100644 --- a/nucleus/async_utils.py +++ b/nucleus/async_utils.py @@ -5,6 +5,7 @@ import aiohttp import nest_asyncio +from tqdm import tqdm from nucleus.constants import DEFAULT_NETWORK_TIMEOUT_SEC from nucleus.errors import NucleusAPIError @@ -27,6 +28,16 @@ class FileFormField: FileFormData = Sequence[FileFormField] +async def gather_with_concurrency(n, *tasks): + semaphore = asyncio.Semaphore(n) + + async def sem_task(task): + async with semaphore: + return await task + + return await asyncio.gather(*(sem_task(task) for task in tasks)) + + class FormDataContextHandler: """A context handler for file form data that handles closing all files in a request. @@ -85,6 +96,8 @@ def make_many_form_data_requests_concurrently( client: "NucleusClient", requests: Sequence[FormDataContextHandler], route: str, + progressbar: tqdm, + concurrency: int = 30, ): """ Makes an async post request with form data to a Nucleus endpoint. @@ -97,7 +110,9 @@ def make_many_form_data_requests_concurrently( """ loop = get_event_loop() return loop.run_until_complete( - form_data_request_helper(client, requests, route) + form_data_request_helper( + client, requests, route, progressbar, concurrency + ) ) @@ -105,6 +120,8 @@ async def form_data_request_helper( client: "NucleusClient", requests: Sequence[FormDataContextHandler], route: str, + progressbar: tqdm, + concurrency: int = 30, ): """ Makes an async post request with files to a Nucleus endpoint. @@ -123,11 +140,12 @@ async def form_data_request_helper( request=request, route=route, session=session, + progressbar=progressbar, ) ) for request in requests ] - return await asyncio.gather(*tasks) + return await gather_with_concurrency(concurrency, *tasks) async def _post_form_data( @@ -135,6 +153,7 @@ async def _post_form_data( request: FormDataContextHandler, route: str, session: aiohttp.ClientSession, + progressbar: tqdm, ): """ Makes an async post request with files to a Nucleus endpoint. @@ -175,7 +194,7 @@ async def _post_form_data( if response.status == 503: raise TimeoutError( - "The request to upload your max is timing out, please lower the batch size." + "The request to upload your max is timing out, please lower local_files_per_upload_request in your api call." 
) if not response.ok: @@ -188,5 +207,5 @@ async def _post_form_data( data, ), ) - + progressbar.update(1) return data diff --git a/nucleus/dataset.py b/nucleus/dataset.py index a7959507..08ef8bc3 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -3,6 +3,7 @@ import requests +from nucleus.annotation_uploader import AnnotationUploader from nucleus.job import AsyncJob from nucleus.prediction import ( BoxPrediction, @@ -20,16 +21,7 @@ serialize_and_write_to_presigned_url, ) -from .annotation import ( - Annotation, - BoxAnnotation, - CategoryAnnotation, - CuboidAnnotation, - MultiCategoryAnnotation, - PolygonAnnotation, - SegmentationAnnotation, - check_all_mask_paths_remote, -) +from .annotation import Annotation, check_all_mask_paths_remote from .constants import ( ANNOTATIONS_KEY, AUTOTAG_SCORE_THRESHOLD, @@ -304,19 +296,13 @@ def create_model_run( def annotate( self, - annotations: Sequence[ - Union[ - BoxAnnotation, - PolygonAnnotation, - CuboidAnnotation, - CategoryAnnotation, - MultiCategoryAnnotation, - SegmentationAnnotation, - ] - ], + annotations: Sequence[Annotation], update: bool = DEFAULT_ANNOTATION_UPDATE_MODE, batch_size: int = 5000, asynchronous: bool = False, + remote_files_per_upload_request: int = 20, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, ) -> Union[Dict[str, Any], AsyncJob]: """Uploads ground truth annotations to the dataset. @@ -349,9 +335,19 @@ def annotate( objects to upload. update: Whether to ignore or overwrite metadata for conflicting annotations. batch_size: Number of annotations processed in each concurrent batch. - Default is 5000. + Default is 5000. If you get timeouts when uploading geometric annotations, + you can try lowering this batch size. asynchronous: Whether or not to process the upload asynchronously (and return an :class:`AsyncJob` object). Default is False. + remote_files_per_upload_request: Number of remote files to upload in each + request. Segmentations have either local or remote files, if you are + getting timeouts while uploading segmentations with remote urls, you + should lower this value from its default of 20. + local_files_per_upload_request: Number of local files to upload in each + request. Segmentations have either local or remote files, if you are + getting timeouts while uploading segmentations with local files, you + should lower this value from its default of 10. The maximum is 10. + Returns: If synchronous, payload describing the upload result:: @@ -363,9 +359,8 @@ def annotate( Otherwise, returns an :class:`AsyncJob` object. 
""" - check_all_mask_paths_remote(annotations) - if asynchronous: + check_all_mask_paths_remote(annotations) request_id = serialize_and_write_to_presigned_url( annotations, self.id, self._client ) @@ -374,8 +369,14 @@ def annotate( route=f"dataset/{self.id}/annotate?async=1", ) return AsyncJob.from_json(response, self._client) - return self._client.annotate_dataset( - self.id, annotations, update=update, batch_size=batch_size + uploader = AnnotationUploader(dataset_id=self.id, client=self._client) + return uploader.upload( + annotations=annotations, + update=update, + batch_size=batch_size, + remote_files_per_upload_request=remote_files_per_upload_request, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, ) def ingest_tasks(self, task_ids: List[str]) -> dict: @@ -412,6 +413,8 @@ def append( update: bool = False, batch_size: int = 20, asynchronous: bool = False, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, ) -> Union[Dict[Any, Any], AsyncJob, UploadResponse]: """Appends items or scenes to a dataset. @@ -482,12 +485,20 @@ def append( Sequence[:class:`LidarScene`] \ Sequence[:class:`VideoScene`] ]): List of items or scenes to upload. - batch_size: Size of the batch for larger uploads. Default is 20. + batch_size: Size of the batch for larger uploads. Default is 20. This is + for items that have a remote URL and do not require a local upload. + If you get timeouts for uploading remote urls, try decreasing this. update: Whether or not to overwrite metadata on reference ID collision. Default is False. asynchronous: Whether or not to process the upload asynchronously (and return an :class:`AsyncJob` object). This is required when uploading scenes. Default is False. + files_per_upload_request: How large to make each upload request when your + files are local. If you get timeouts, you may need to lower this from + its default of 10. The default is 10. + local_file_upload_concurrency: How many local file requests to send + concurrently. If you start to see gateway timeouts or cloudflare related + errors, you may need to lower this from its default of 30. Returns: For scenes @@ -557,6 +568,8 @@ def append( dataset_items, update=update, batch_size=batch_size, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, ) @deprecated("Prefer using Dataset.append instead.") @@ -1431,6 +1444,8 @@ def _upload_items( dataset_items: List[DatasetItem], batch_size: int = 20, update: bool = False, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, ) -> UploadResponse: """ Appends images to a dataset with given dataset_id. @@ -1438,8 +1453,16 @@ def _upload_items( Args: dataset_items: Items to Upload - batch_size: size of the batch for long payload + batch_size: how many items with remote urls to include in each request. + If you get timeouts for uploading remote urls, try decreasing this. update: Update records on conflict otherwise overwrite + local_files_per_upload_request: How large to make each upload request when your + files are local. If you get timeouts, you may need to lower this from + its default of 10. The maximum is 10. + local_file_upload_concurrency: How many local file requests to send + concurrently. If you start to see gateway timeouts or cloudflare related + errors, you may need to lower this from its default of 30. 
+ Returns: UploadResponse """ @@ -1450,9 +1473,14 @@ def _upload_items( "client.create_dataset(, is_scene=False) or add the dataset items to " "an existing dataset supporting dataset items." ) - - populator = DatasetItemUploader(self.id, self._client) - return populator.upload(dataset_items, batch_size, update) + uploader = DatasetItemUploader(self.id, self._client) + return uploader.upload( + dataset_items=dataset_items, + batch_size=batch_size, + update=update, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, + ) def update_scene_metadata(self, mapping: Dict[str, dict]): """ diff --git a/nucleus/dataset_item_uploader.py b/nucleus/dataset_item_uploader.py index 9682316e..a803ca92 100644 --- a/nucleus/dataset_item_uploader.py +++ b/nucleus/dataset_item_uploader.py @@ -35,14 +35,20 @@ def __init__(self, dataset_id: str, client: "NucleusClient"): # noqa: F821 def upload( self, dataset_items: List[DatasetItem], - batch_size: int = 20, + batch_size: int = 5000, update: bool = False, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, ) -> UploadResponse: """ Args: dataset_items: Items to Upload - batch_size: How many items to pool together for a single request + batch_size: How many items to pool together for a single request for items + without files to upload + files_per_upload_request: How many items to pool together for a single + request for items with files to upload + update: Update records instead of overwriting Returns: @@ -50,6 +56,8 @@ def upload( """ local_items = [] remote_items = [] + if local_files_per_upload_request > 10: + raise ValueError("local_files_per_upload_request should be <= 10") # Check local files exist before sending requests for item in dataset_items: @@ -60,30 +68,26 @@ def upload( else: remote_items.append(item) - local_batches = [ - local_items[i : i + batch_size] - for i in range(0, len(local_items), batch_size) - ] - - remote_batches = [ - remote_items[i : i + batch_size] - for i in range(0, len(remote_items), batch_size) - ] - agg_response = UploadResponse(json={DATASET_ID_KEY: self.dataset_id}) async_responses: List[Any] = [] - if local_batches: - tqdm_local_batches = self._client.tqdm_bar( - local_batches, desc="Local file batches" + if local_items: + async_responses.extend( + self._process_append_requests_local( + self.dataset_id, + items=local_items, + update=update, + batch_size=batch_size, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, + ) ) - for batch in tqdm_local_batches: - responses = self._process_append_requests_local( - self.dataset_id, items=batch, update=update - ) - async_responses.extend(responses) + remote_batches = [ + remote_items[i : i + batch_size] + for i in range(0, len(remote_items), batch_size) + ] if remote_batches: tqdm_remote_batches = self._client.tqdm_bar( @@ -108,20 +112,30 @@ def _process_append_requests_local( dataset_id: str, items: Sequence[DatasetItem], update: bool, - local_batch_size: int = 10, + batch_size: int, + local_files_per_upload_request: int, + local_file_upload_concurrency: int, ): + # Batch into requests requests = [] - for i in range(0, len(items), local_batch_size): - batch = items[i : i + local_batch_size] + batch_size = local_files_per_upload_request + for i in range(0, len(items), batch_size): + batch = items[i : i + batch_size] request = FormDataContextHandler( self.get_form_data_and_file_pointers_fn(batch, 
update) ) requests.append(request) + progressbar = self._client.tqdm_bar( + total=len(requests), desc="Local file batches" + ) + return make_many_form_data_requests_concurrently( self._client, requests, f"dataset/{dataset_id}/append", + progressbar=progressbar, + concurrency=local_file_upload_concurrency, ) def _process_append_requests( @@ -147,7 +161,12 @@ def _process_append_requests( def get_form_data_and_file_pointers_fn( self, items: Sequence[DatasetItem], update: bool ) -> Callable[..., Tuple[FileFormData, Sequence[BinaryIO]]]: - """Constructs a function that will generate form data on each retry.""" + """Defines a function to be called on each retry. + + File pointers are also returned so whoever calls this function can + appropriately close the files. This is intended for use with a + FormDataContextHandler in order to make form data requests. + """ def fn(): @@ -172,9 +191,11 @@ def fn(): file_pointers = [] for item in items: - image_fp = open( # pylint: disable=R1732 - item.image_location, "rb" # pylint: disable=R1732 - ) # pylint: disable=R1732 + # I don't know of a way to use with, since all files in the request + # need to be opened at the same time. + # pylint: disable=consider-using-with + image_fp = open(item.image_location, "rb") + # pylint: enable=consider-using-with img_type = f"image/{os.path.splitext(item.image_location)[1].strip('.')}" form_data.append( FileFormField( diff --git a/nucleus/segmentation_uploader.py b/nucleus/segmentation_uploader.py deleted file mode 100644 index 623fdac4..00000000 --- a/nucleus/segmentation_uploader.py +++ /dev/null @@ -1,119 +0,0 @@ -# import asyncio -# import json -# import os -# from typing import TYPE_CHECKING, Any, List -# from nucleus.annotation import SegmentationAnnotation -# from nucleus.async_utils import get_event_loop -# from nucleus.constants import DATASET_ID_KEY, MASK_TYPE, SEGMENTATIONS_KEY -# from nucleus.errors import NotFoundError -# from nucleus.payload_constructor import construct_segmentation_payload -# from annotation import is_local_path -# from nucleus.upload_response import UploadResponse -# import nest_asyncio - -# if TYPE_CHECKING: -# from . 
import NucleusClient - - -# class SegmentationUploader: -# def __init__(self, dataset_id: str, client: "NucleusClient"): # noqa: F821 -# self.dataset_id = dataset_id -# self._client = client - -# def annotate( -# self, -# segmentations: List[SegmentationAnnotation], -# batch_size: int = 20, -# update: bool = False, -# ): -# remote_segmentations = [] -# local_segmentations = [] -# for segmentation in segmentations: -# if is_local_path(segmentation.mask_url): -# if not segmentation.local_file_exists(): -# raise NotFoundError( -# "Could not find f{segmentation.mask_url}" -# ) -# local_segmentations.append(segmentation) -# else: -# remote_segmentations.append(segmentation) - -# local_batches = [ -# local_segmentations[i : i + batch_size] -# for i in range(0, len(local_segmentations), batch_size) -# ] - -# remote_batches = [ -# remote_segmentations[i : i + batch_size] -# for i in range(0, len(remote_segmentations), batch_size) -# ] - -# agg_response = UploadResponse(json={DATASET_ID_KEY: self.dataset_id}) - -# async_responses: List[Any] = [] - -# if local_batches: -# tqdm_local_batches = self._client.tqdm_bar( -# local_batches, desc="Local file batches" -# ) -# for batch in tqdm_local_batches: -# responses = self._process_annotate_requests_local( -# self.dataset_id, batch -# ) -# async_responses.extend(responses) - -# def process_annotate_requests_local( -# dataset_id: str, -# segmentations: List[SegmentationAnnotation], -# local_batch_size: int = 10, -# ): -# requests = [] -# file_pointers = [] -# for i in range(0, len(segmentations), local_batch_size): -# batch = segmentations[i : i + local_batch_size] -# request, request_file_pointers = self.construct_files_request( -# batch -# ) -# requests.append(request) -# file_pointers.extend(request_file_pointers) - -# future = self.make_many_files_requests_asynchronously( -# requests, f"dataset/{dataset_id}/files" -# ) - -# loop = get_event_loop() - -# responses = loop.run_until_complete(future) -# [fp.close() for fp in file_pointers] -# return responses - -# def construct_files_request( -# segmentations: List[SegmentationAnnotation], -# ): -# request_json = construct_segmentation_payload( -# segmentations, update -# ) -# request_payload = [ -# ( -# SEGMENTATIONS_KEY, -# (None, json.dumps(request_json), "application/json"), -# ) -# ] -# file_pointers = [] -# for segmentation in segmentations: -# mask_fp = open(segmentation.mask_url, "rb") -# filename = os.path.basename(segmentation.mask_url) -# file_type = segmentation.mask_url.split(".")[-1] -# if file_type != "png": -# raise ValueError( -# f"Only png files are supported. 
Got {file_type} for {segmentation.mask_url}" -# ) -# request_payload.append( -# (MASK_TYPE, (filename, mask_fp, "image/png")) -# ) -# return request_payload, file_pointers - - -# {"items": [{"metadata": {"test": 0}, "reference_id": "test_img.jpg", "image_url": "tests/test_img.jpg", "upload_to_scale": true}] - -# [{"metadata": {"test": 0}, "reference_id": "test_img.jpg", "image_url": "tests/test_img.jpg", "upload_to_scale": true, "update": false}] diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 8594f4d0..884ca12d 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -271,8 +271,9 @@ def test_dataset_append_local(CLIENT, dataset): DatasetItem( image_location=LOCAL_FILENAME, metadata={"test": 0}, - reference_id=LOCAL_FILENAME.split("/")[-1], + reference_id=LOCAL_FILENAME.split("/")[-1] + str(i), ) + for i in range(1000) ] response = dataset.append(ds_items_local) @@ -280,7 +281,7 @@ def test_dataset_append_local(CLIENT, dataset): assert isinstance(response, UploadResponse) resp_json = response.json() assert resp_json[DATASET_ID_KEY] == dataset.id - assert resp_json[NEW_ITEMS] == 1 + assert resp_json[NEW_ITEMS] == 1000 assert resp_json[UPDATED_ITEMS] == 0 assert resp_json[IGNORED_ITEMS] == 0 assert resp_json[ERROR_ITEMS] == 0 From 01464a1302c3d1ec152b044db95bbbf41a3a82d6 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Mar 2022 23:48:09 +0000 Subject: [PATCH 05/11] Fixed segmentation bugs --- nucleus/annotation_uploader.py | 201 +++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 nucleus/annotation_uploader.py diff --git a/nucleus/annotation_uploader.py b/nucleus/annotation_uploader.py new file mode 100644 index 00000000..c4a2bc63 --- /dev/null +++ b/nucleus/annotation_uploader.py @@ -0,0 +1,201 @@ +import json +from typing import TYPE_CHECKING, Iterable, List, Sequence + +from nucleus.annotation import Annotation, SegmentationAnnotation +from nucleus.async_utils import ( + FileFormField, + FormDataContextHandler, + make_many_form_data_requests_concurrently, +) +from nucleus.constants import ITEMS_KEY, SEGMENTATIONS_KEY +from nucleus.payload_constructor import ( + construct_annotation_payload, + construct_segmentation_payload, +) + +if TYPE_CHECKING: + from . import NucleusClient + + +def accumulate_dict_values(dicts: Iterable[dict]): + """ + Accumulate a list of dicts into a single dict using summation. + """ + result = {} + for d in dicts: + for key, value in d.items(): + if key not in result: + result[key] = value + else: + result[key] += value + return result + + +class AnnotationUploader: + def __init__(self, dataset_id: str, client: "NucleusClient"): # noqa: F821 + self.dataset_id = dataset_id + self._client = client + + def upload( + self, + annotations: Iterable[Annotation], + batch_size: int = 5000, + update: bool = False, + remote_files_per_upload_request: int = 20, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, + ): + if local_files_per_upload_request > 10: + raise ValueError("local_files_per_upload_request must be <= 10") + annotations_without_files: List[Annotation] = [] + segmentations_with_local_files: List[SegmentationAnnotation] = [] + segmentations_with_remote_files: List[SegmentationAnnotation] = [] + + for annotation in annotations: + if annotation.has_local_files(): + # Only segmentations have local files currently, and probably for a long + # time to to come. 
+ assert isinstance(annotation, SegmentationAnnotation) + segmentations_with_local_files.append(annotation) + elif isinstance(annotation, SegmentationAnnotation): + segmentations_with_remote_files.append(annotation) + else: + annotations_without_files.append(annotation) + + responses = [] + if segmentations_with_local_files: + responses.extend( + self.make_batched_file_form_data_requests( + segmentations=segmentations_with_local_files, + update=update, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, + ) + ) + if segmentations_with_remote_files: + # Segmentations require an upload and must be batched differently since a single + # segmentation will take a lot longer for the server to process than a single + # annotation of any other kind. + responses.extend( + self.make_batched_annotate_requests( + segmentations_with_remote_files, + update, + batch_size=remote_files_per_upload_request, + segmentation=True, + ) + ) + if annotations_without_files: + responses.extend( + self.make_batched_annotate_requests( + annotations_without_files, + update, + batch_size=batch_size, + segmentation=False, + ) + ) + + return accumulate_dict_values(responses) + + def make_batched_annotate_requests( + self, + annotations: Sequence[Annotation], + update: bool, + batch_size: int, + segmentation: bool, + ): + batches = [ + annotations[i : i + batch_size] + for i in range(0, len(annotations), batch_size) + ] + responses = [] + progress_bar_name = ( + "Segmentation batches" if segmentation else "Annotation batches" + ) + for batch in self._client.tqdm_bar(batches, desc=progress_bar_name): + if segmentation: + payload = construct_segmentation_payload(batch, update) + # TODO: remove validation checks in backend for /annotate + # since it should work. + route = f"dataset/{self.dataset_id}/annotate_segmentation" + else: + payload = construct_annotation_payload(batch, update) + route = f"dataset/{self.dataset_id}/annotate" + responses.append(self._client.make_request(payload, route)) + return responses + + def make_batched_file_form_data_requests( + self, + segmentations: Sequence[SegmentationAnnotation], + update, + local_files_per_upload_request: int, + local_file_upload_concurrency: int, + ): + requests = [] + for i in range(0, len(segmentations), local_files_per_upload_request): + batch = segmentations[i : i + local_files_per_upload_request] + request = FormDataContextHandler( + self.get_form_data_and_file_pointers_fn(batch, update) + ) + requests.append(request) + + progressbar = self._client.tqdm_bar( + total=len(requests), + desc="Local segmentation mask file batches", + ) + + return make_many_form_data_requests_concurrently( + client=self._client, + requests=requests, + route=f"dataset/{self.dataset_id}/annotate_segmentation_files", + progressbar=progressbar, + concurrency=local_file_upload_concurrency, + ) + + def get_form_data_and_file_pointers_fn( + self, + segmentations: Iterable[SegmentationAnnotation], + update: bool, + ): + """Defines a function to be called on each retry. + + File pointers are also returned so whoever calls this function can + appropriately close the files. This is intended for use with a + FormDataContextHandler in order to make form data requests. 
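+
+        Rough sketch of the intended call pattern (illustrative only; it
+        mirrors make_batched_file_form_data_requests above):
+
+            request = FormDataContextHandler(
+                self.get_form_data_and_file_pointers_fn(batch, update)
+            )
+            with request as form:
+                ...  # post `form`; files are opened on enter and closed on exit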
+ """ + + def fn(): + request_json = construct_segmentation_payload( + segmentations, update + ) + form_data = [ + FileFormField( + name=ITEMS_KEY, + filename=None, + value=json.dumps(request_json), + content_type="application/json", + ) + ] + file_pointers = [] + for segmentation in segmentations: + # I don't know of a way to use with, since all files in the request + # need to be opened at the same time. + # pylint: disable=consider-using-with + mask_fp = open(segmentation.mask_url, "rb") + # pylint: enable=consider-using-with + file_type = segmentation.mask_url.split(".")[-1] + if file_type != "png": + raise ValueError( + f"Only png files are supported. Got {file_type} for {segmentation.mask_url}" + ) + form_data.append( + FileFormField( + name=SEGMENTATIONS_KEY, + filename=segmentation.mask_url, + value=mask_fp, + content_type="image/png", + ) + ) + file_pointers.append(mask_fp) + return form_data, file_pointers + + return fn From ad186d34bc649f02cfe65a8859690b3fe8d5fd69 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 12 Mar 2022 00:08:37 +0000 Subject: [PATCH 06/11] Fix one more bug and remove use of annotate_segmentation endpoint --- nucleus/annotation_uploader.py | 17 +++++++---------- nucleus/payload_constructor.py | 23 +++++++++++++++++------ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/nucleus/annotation_uploader.py b/nucleus/annotation_uploader.py index c4a2bc63..4dd06578 100644 --- a/nucleus/annotation_uploader.py +++ b/nucleus/annotation_uploader.py @@ -24,7 +24,7 @@ def accumulate_dict_values(dicts: Iterable[dict]): result = {} for d in dicts: for key, value in d.items(): - if key not in result: + if key not in result or key == "dataset_id": result[key] = value else: result[key] += value @@ -112,15 +112,12 @@ def make_batched_annotate_requests( "Segmentation batches" if segmentation else "Annotation batches" ) for batch in self._client.tqdm_bar(batches, desc=progress_bar_name): - if segmentation: - payload = construct_segmentation_payload(batch, update) - # TODO: remove validation checks in backend for /annotate - # since it should work. 
- route = f"dataset/{self.dataset_id}/annotate_segmentation" - else: - payload = construct_annotation_payload(batch, update) - route = f"dataset/{self.dataset_id}/annotate" - responses.append(self._client.make_request(payload, route)) + payload = construct_annotation_payload(batch, update) + responses.append( + self._client.make_request( + payload, route=f"dataset/{self.dataset_id}/annotate" + ) + ) return responses def make_batched_file_form_data_requests( diff --git a/nucleus/payload_constructor.py b/nucleus/payload_constructor.py index 38609368..f21f6990 100644 --- a/nucleus/payload_constructor.py +++ b/nucleus/payload_constructor.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union from .annotation import ( BoxAnnotation, @@ -72,11 +72,22 @@ def construct_annotation_payload( ], update: bool, ) -> dict: - annotations = [] - for annotation_item in annotation_items: - annotations.append(annotation_item.to_payload()) - - return {ANNOTATIONS_KEY: annotations, ANNOTATION_UPDATE_KEY: update} + annotations = [ + annotation.to_payload() + for annotation in annotation_items + if not isinstance(annotation, SegmentationAnnotation) + ] + segmentations = [ + annotation.to_payload() + for annotation in annotation_items + if isinstance(annotation, SegmentationAnnotation) + ] + payload: Dict[str, Any] = {ANNOTATION_UPDATE_KEY: update} + if annotations: + payload[ANNOTATIONS_KEY] = annotations + if segmentations: + payload[SEGMENTATIONS_KEY] = segmentations + return payload def construct_segmentation_payload( From 6ac90ac056dd38cd259ddd7b69a5ce1dcf669553 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 12 Mar 2022 02:09:22 +0000 Subject: [PATCH 07/11] refactor tests and add segmentation local upload test --- tests/helpers.py | 3 ++ tests/test_annotation.py | 65 ---------------------------- tests/test_segmentation.py | 89 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 65 deletions(-) create mode 100644 tests/test_segmentation.py diff --git a/tests/helpers.py b/tests/helpers.py index d5f67d87..9a8c2881 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -301,6 +301,9 @@ def reference_id_from_url(url): TEST_MASK_URL = "https://raw.githubusercontent.com/scaleapi/nucleus-python-client/master/tests/testdata/000000000285.png" +this_dir = os.path.dirname(os.path.realpath(__file__)) +TEST_LOCAL_MASK_URL = os.path.join(this_dir, "testdata/000000000285.png") + TEST_SEGMENTATION_ANNOTATIONS = [ { "reference_id": reference_id_from_url(TEST_IMG_URLS[i]), diff --git a/tests/test_annotation.py b/tests/test_annotation.py index ced5ff11..6b218817 100644 --- a/tests/test_annotation.py +++ b/tests/test_annotation.py @@ -34,7 +34,6 @@ assert_multicategory_annotation_matches_dict, assert_partial_equality, assert_polygon_annotation_matches_dict, - assert_segmentation_annotation_matches_dict, reference_id_from_url, ) @@ -242,70 +241,6 @@ def test_default_multicategory_gt_upload(dataset): ) -def test_single_semseg_gt_upload(dataset): - annotation = SegmentationAnnotation.from_json( - TEST_SEGMENTATION_ANNOTATIONS[0] - ) - response = dataset.annotate(annotations=[annotation]) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 1 - assert response["annotations_ignored"] == 0 - - response_annotation = dataset.refloc(annotation.reference_id)[ - "annotations" - ]["segmentation"][0] - assert_segmentation_annotation_matches_dict( - response_annotation, TEST_SEGMENTATION_ANNOTATIONS[0] - ) - - -def 
test_batch_semseg_gt_upload(dataset): - annotations = [ - SegmentationAnnotation.from_json(ann) - for ann in TEST_SEGMENTATION_ANNOTATIONS - ] - response = dataset.annotate(annotations=annotations) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 - assert response["annotations_ignored"] == 0 - - -def test_batch_semseg_gt_upload_ignore(dataset): - # First upload annotations - annotations = [ - SegmentationAnnotation.from_json(ann) - for ann in TEST_SEGMENTATION_ANNOTATIONS - ] - response = dataset.annotate(annotations=annotations) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 - assert response["annotations_ignored"] == 0 - - # When we re-upload, expect them to be ignored - response = dataset.annotate(annotations=annotations) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 0 - assert response["annotations_ignored"] == 5 - - -def test_batch_semseg_gt_upload_update(dataset): - # First upload annotations - annotations = [ - SegmentationAnnotation.from_json(ann) - for ann in TEST_SEGMENTATION_ANNOTATIONS - ] - response = dataset.annotate(annotations=annotations) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 - assert response["annotations_ignored"] == 0 - - # When we re-upload, expect uploads to be processed - response = dataset.annotate(annotations=annotations, update=True) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 - assert response["annotations_ignored"] == 0 - - def test_mixed_annotation_upload(dataset): # First upload annotations semseg_annotations = [ diff --git a/tests/test_segmentation.py b/tests/test_segmentation.py new file mode 100644 index 00000000..f7062001 --- /dev/null +++ b/tests/test_segmentation.py @@ -0,0 +1,89 @@ +from nucleus.annotation import SegmentationAnnotation +from tests.helpers import ( + TEST_LOCAL_MASK_URL, + TEST_SEGMENTATION_ANNOTATIONS, + assert_segmentation_annotation_matches_dict, +) + + +def test_single_local_semseg_gt_upload(dataset): + annotation = SegmentationAnnotation.from_json( + TEST_SEGMENTATION_ANNOTATIONS[0] + ) + annotation.mask_url = TEST_LOCAL_MASK_URL + response = dataset.annotate(annotations=[annotation]) + + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 1 + assert response["annotations_ignored"] == 0 + + response_annotation = dataset.refloc(annotation.reference_id)[ + "annotations" + ]["segmentation"][0] + assert_segmentation_annotation_matches_dict( + response_annotation, TEST_SEGMENTATION_ANNOTATIONS[0] + ) + + +def test_single_semseg_gt_upload(dataset): + annotation = SegmentationAnnotation.from_json( + TEST_SEGMENTATION_ANNOTATIONS[0] + ) + response = dataset.annotate(annotations=[annotation]) + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 1 + assert response["annotations_ignored"] == 0 + + response_annotation = dataset.refloc(annotation.reference_id)[ + "annotations" + ]["segmentation"][0] + assert_segmentation_annotation_matches_dict( + response_annotation, TEST_SEGMENTATION_ANNOTATIONS[0] + ) + + +def test_batch_semseg_gt_upload(dataset): + annotations = [ + SegmentationAnnotation.from_json(ann) + for ann in TEST_SEGMENTATION_ANNOTATIONS + ] + response = dataset.annotate(annotations=annotations) + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 5 + assert 
response["annotations_ignored"] == 0 + + +def test_batch_semseg_gt_upload_ignore(dataset): + # First upload annotations + annotations = [ + SegmentationAnnotation.from_json(ann) + for ann in TEST_SEGMENTATION_ANNOTATIONS + ] + response = dataset.annotate(annotations=annotations) + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 5 + assert response["annotations_ignored"] == 0 + + # When we re-upload, expect them to be ignored + response = dataset.annotate(annotations=annotations) + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 0 + assert response["annotations_ignored"] == 5 + + +def test_batch_semseg_gt_upload_update(dataset): + # First upload annotations + annotations = [ + SegmentationAnnotation.from_json(ann) + for ann in TEST_SEGMENTATION_ANNOTATIONS + ] + response = dataset.annotate(annotations=annotations) + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 5 + assert response["annotations_ignored"] == 0 + + # When we re-upload, expect uploads to be processed + response = dataset.annotate(annotations=annotations, update=True) + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 5 + assert response["annotations_ignored"] == 0 From c61e60f82a545de75a8e5c20ec908fe592b0a0da Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 12 Mar 2022 04:03:15 +0000 Subject: [PATCH 08/11] Tests passing --- nucleus/annotation.py | 14 ++++++++- nucleus/annotation_uploader.py | 8 ++--- nucleus/constants.py | 1 + tests/test_segmentation.py | 55 ++++++++++++++++++++++++++-------- 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/nucleus/annotation.py b/nucleus/annotation.py index f3f9533a..f21fb597 100644 --- a/nucleus/annotation.py +++ b/nucleus/annotation.py @@ -582,13 +582,25 @@ def to_payload(self) -> dict: return payload - def has_files(self) -> bool: + def has_local_files(self) -> bool: if is_local_path(self.mask_url): if not os.path.isfile(self.mask_url): raise Exception(f"Mask file {self.mask_url} does not exist.") return True return False + def __eq__(self, other): + if not isinstance(other, SegmentationAnnotation): + return False + self.annotations = sorted(self.annotations, key=lambda x: x.index) + other.annotations = sorted(other.annotations, key=lambda x: x.index) + return ( + (self.annotation_id == other.annotation_id) + and (self.annotations == other.annotations) + and (self.mask_url == other.mask_url) + and (self.reference_id == other.reference_id) + ) + class AnnotationTypes(Enum): BOX = BOX_TYPE diff --git a/nucleus/annotation_uploader.py b/nucleus/annotation_uploader.py index 4dd06578..99fcac9d 100644 --- a/nucleus/annotation_uploader.py +++ b/nucleus/annotation_uploader.py @@ -7,7 +7,7 @@ FormDataContextHandler, make_many_form_data_requests_concurrently, ) -from nucleus.constants import ITEMS_KEY, SEGMENTATIONS_KEY +from nucleus.constants import MASK_TYPE, SERIALIZED_REQUEST_KEY from nucleus.payload_constructor import ( construct_annotation_payload, construct_segmentation_payload, @@ -143,7 +143,7 @@ def make_batched_file_form_data_requests( return make_many_form_data_requests_concurrently( client=self._client, requests=requests, - route=f"dataset/{self.dataset_id}/annotate_segmentation_files", + route=f"dataset/{self.dataset_id}/annotate", progressbar=progressbar, concurrency=local_file_upload_concurrency, ) @@ -166,7 +166,7 @@ def fn(): ) form_data = [ FileFormField( - name=ITEMS_KEY, + 
name=SERIALIZED_REQUEST_KEY, filename=None, value=json.dumps(request_json), content_type="application/json", @@ -186,7 +186,7 @@ def fn(): ) form_data.append( FileFormField( - name=SEGMENTATIONS_KEY, + name=MASK_TYPE, filename=segmentation.mask_url, value=mask_fp, content_type="image/png", diff --git a/nucleus/constants.py b/nucleus/constants.py index 69837640..426fc7a4 100644 --- a/nucleus/constants.py +++ b/nucleus/constants.py @@ -99,6 +99,7 @@ REFERENCE_ID_KEY = "reference_id" REQUEST_ID_KEY = "requestId" SCENES_KEY = "scenes" +SERIALIZED_REQUEST_KEY = "serialized_request" SEGMENTATIONS_KEY = "segmentations" SLICE_ID_KEY = "slice_id" STATUS_CODE_KEY = "status_code" diff --git a/tests/test_segmentation.py b/tests/test_segmentation.py index f7062001..590b366d 100644 --- a/tests/test_segmentation.py +++ b/tests/test_segmentation.py @@ -1,4 +1,5 @@ from nucleus.annotation import SegmentationAnnotation +from nucleus.dataset import Dataset from tests.helpers import ( TEST_LOCAL_MASK_URL, TEST_SEGMENTATION_ANNOTATIONS, @@ -6,23 +7,51 @@ ) -def test_single_local_semseg_gt_upload(dataset): - annotation = SegmentationAnnotation.from_json( +def test_single_local_semseg_gt_upload(dataset: Dataset): + request_annotation = SegmentationAnnotation.from_json( TEST_SEGMENTATION_ANNOTATIONS[0] ) - annotation.mask_url = TEST_LOCAL_MASK_URL - response = dataset.annotate(annotations=[annotation]) + request_annotation.mask_url = TEST_LOCAL_MASK_URL + response = dataset.annotate(annotations=[request_annotation]) assert response["dataset_id"] == dataset.id assert response["annotations_processed"] == 1 assert response["annotations_ignored"] == 0 - response_annotation = dataset.refloc(annotation.reference_id)[ + response_annotation = dataset.refloc(request_annotation.reference_id)[ "annotations" ]["segmentation"][0] - assert_segmentation_annotation_matches_dict( - response_annotation, TEST_SEGMENTATION_ANNOTATIONS[0] - ) + + assert response_annotation == request_annotation + + +def test_batch_local_semseg_gt_upload(dataset: Dataset): + + # This reference id is not in the dataset. 
+ bad_reference_id = TEST_SEGMENTATION_ANNOTATIONS[-1]["reference_id"] + + request_annotations = [ + SegmentationAnnotation.from_json(json_data) + for json_data in TEST_SEGMENTATION_ANNOTATIONS + ] + for request_annotation in request_annotations: + request_annotation.mask_url = TEST_LOCAL_MASK_URL + response = dataset.annotate(annotations=request_annotations) + + print(request_annotations) + print(response) + + assert response["dataset_id"] == dataset.id + assert response["annotations_processed"] == 4 + assert response["annotations_ignored"] == 0 + assert bad_reference_id in response["errors"][0] + + for request_annotation in request_annotations[:4]: + response_annotation = dataset.refloc(request_annotation.reference_id)[ + "annotations" + ]["segmentation"][0] + + assert response_annotation == request_annotation def test_single_semseg_gt_upload(dataset): @@ -49,7 +78,7 @@ def test_batch_semseg_gt_upload(dataset): ] response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 + assert response["annotations_processed"] == 4 assert response["annotations_ignored"] == 0 @@ -61,14 +90,14 @@ def test_batch_semseg_gt_upload_ignore(dataset): ] response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 + assert response["annotations_processed"] == 4 assert response["annotations_ignored"] == 0 # When we re-upload, expect them to be ignored response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id assert response["annotations_processed"] == 0 - assert response["annotations_ignored"] == 5 + assert response["annotations_ignored"] == 4 def test_batch_semseg_gt_upload_update(dataset): @@ -79,11 +108,11 @@ def test_batch_semseg_gt_upload_update(dataset): ] response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 + assert response["annotations_processed"] == 4 assert response["annotations_ignored"] == 0 # When we re-upload, expect uploads to be processed response = dataset.annotate(annotations=annotations, update=True) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 5 + assert response["annotations_processed"] == 4 assert response["annotations_ignored"] == 0 From 0bdd631d8449e06a13304c2a6ffc64f2e1d96647 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Mar 2022 22:08:46 +0000 Subject: [PATCH 09/11] Review feedback --- nucleus/annotation.py | 11 ++++++++-- nucleus/annotation_uploader.py | 8 ++++++- nucleus/async_utils.py | 6 +++++- nucleus/connection.py | 2 +- nucleus/retry_strategy.py | 10 ++++++++- tests/helpers.py | 2 ++ tests/test_dataset.py | 5 +++-- tests/test_segmentation.py | 38 +++++++++++++++++++++++++--------- 8 files changed, 64 insertions(+), 18 deletions(-) diff --git a/nucleus/annotation.py b/nucleus/annotation.py index f21fb597..46483ac1 100644 --- a/nucleus/annotation.py +++ b/nucleus/annotation.py @@ -71,7 +71,13 @@ def to_json(self) -> str: """Serializes annotation object to schematized JSON string.""" return json.dumps(self.to_payload(), allow_nan=False) - def has_local_files(self) -> bool: + def has_local_files_to_upload(self) -> bool: + """Returns True if annotation has local files that need to be uploaded. + + Nearly all subclasses have no local files, so we default this to just return + false. 
If the subclass has local files, it should override this method (but + that is not the only thing required to get local upload of files to work.) + """ return False @@ -582,7 +588,8 @@ def to_payload(self) -> dict: return payload - def has_local_files(self) -> bool: + def has_local_files_to_upload(self) -> bool: + """Check if the mask url is local and needs to be uploaded.""" if is_local_path(self.mask_url): if not os.path.isfile(self.mask_url): raise Exception(f"Mask file {self.mask_url} does not exist.") diff --git a/nucleus/annotation_uploader.py b/nucleus/annotation_uploader.py index 99fcac9d..ace601cd 100644 --- a/nucleus/annotation_uploader.py +++ b/nucleus/annotation_uploader.py @@ -32,6 +32,11 @@ def accumulate_dict_values(dicts: Iterable[dict]): class AnnotationUploader: + """This is a helper class not intended for direct use. Please use dataset.annotate. + + This class is purely a helper class for implementing dataset.annotate. + """ + def __init__(self, dataset_id: str, client: "NucleusClient"): # noqa: F821 self.dataset_id = dataset_id self._client = client @@ -45,6 +50,7 @@ def upload( local_files_per_upload_request: int = 10, local_file_upload_concurrency: int = 30, ): + """For more details on parameters and functionality, see dataset.annotate.""" if local_files_per_upload_request > 10: raise ValueError("local_files_per_upload_request must be <= 10") annotations_without_files: List[Annotation] = [] @@ -52,7 +58,7 @@ def upload( segmentations_with_remote_files: List[SegmentationAnnotation] = [] for annotation in annotations: - if annotation.has_local_files(): + if annotation.has_local_files_to_upload(): # Only segmentations have local files currently, and probably for a long # time to to come. assert isinstance(annotation, SegmentationAnnotation) diff --git a/nucleus/async_utils.py b/nucleus/async_utils.py index 1b46b806..5e9b2ee9 100644 --- a/nucleus/async_utils.py +++ b/nucleus/async_utils.py @@ -29,6 +29,7 @@ class FileFormField: async def gather_with_concurrency(n, *tasks): + """Helper method to limit the concurrency when gathering the results from multiple tasks.""" semaphore = asyncio.Semaphore(n) async def sem_task(task): @@ -107,6 +108,9 @@ def make_many_form_data_requests_concurrently( requests: Each requst should be a FormDataContextHandler object which will handle generating form data, and opening/closing files for each request. route: route for the request. + progressbar: A tqdm progress bar to use for showing progress to the user. + concurrency: How many concurrent requests to run at once. Should be exposed + to the user. """ loop = get_event_loop() return loop.run_until_complete( @@ -168,7 +172,7 @@ async def _post_form_data( logger.info("Posting to %s", endpoint) - for sleep_time in RetryStrategy.sleep_times + [-1]: + for sleep_time in RetryStrategy.sleep_times() + [-1]: with request as form: async with session.post( endpoint, diff --git a/nucleus/connection.py b/nucleus/connection.py index 4054748f..11d07ba4 100644 --- a/nucleus/connection.py +++ b/nucleus/connection.py @@ -55,7 +55,7 @@ def make_request( logger.info("Make request to %s", endpoint) - for retry_wait_time in RetryStrategy.sleep_times: + for retry_wait_time in RetryStrategy.sleep_times(): response = requests_command( endpoint, json=payload, diff --git a/nucleus/retry_strategy.py b/nucleus/retry_strategy.py index 6ca64a7e..fae0ff33 100644 --- a/nucleus/retry_strategy.py +++ b/nucleus/retry_strategy.py @@ -1,4 +1,12 @@ # TODO: use retry library instead of custom code. Tenacity is one option. 
+import random + + class RetryStrategy: statuses = {503, 524, 520, 504} - sleep_times = [1, 3, 9, 27] # These are in seconds + + @staticmethod + def sleep_times(): + sleep_times = [1, 3, 9, 27] # These are in seconds + + return [2 * random.random() * t for t in sleep_times] diff --git a/tests/helpers.py b/tests/helpers.py index 9a8c2881..6a4a34aa 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -304,6 +304,8 @@ def reference_id_from_url(url): this_dir = os.path.dirname(os.path.realpath(__file__)) TEST_LOCAL_MASK_URL = os.path.join(this_dir, "testdata/000000000285.png") + +NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET = len(TEST_DATASET_ITEMS) TEST_SEGMENTATION_ANNOTATIONS = [ { "reference_id": reference_id_from_url(TEST_IMG_URLS[i]), diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 884ca12d..adc3c7cc 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -262,6 +262,7 @@ def test_dataset_append_local(CLIENT, dataset): reference_id="bad", ) ] + num_local_items_to_test = 10 with pytest.raises(ValueError) as e: dataset.append(ds_items_local_error) assert "Out of range float values are not JSON compliant" in str( @@ -273,7 +274,7 @@ def test_dataset_append_local(CLIENT, dataset): metadata={"test": 0}, reference_id=LOCAL_FILENAME.split("/")[-1] + str(i), ) - for i in range(1000) + for i in range(num_local_items_to_test) ] response = dataset.append(ds_items_local) @@ -281,7 +282,7 @@ def test_dataset_append_local(CLIENT, dataset): assert isinstance(response, UploadResponse) resp_json = response.json() assert resp_json[DATASET_ID_KEY] == dataset.id - assert resp_json[NEW_ITEMS] == 1000 + assert resp_json[NEW_ITEMS] == num_local_items_to_test assert resp_json[UPDATED_ITEMS] == 0 assert resp_json[IGNORED_ITEMS] == 0 assert resp_json[ERROR_ITEMS] == 0 diff --git a/tests/test_segmentation.py b/tests/test_segmentation.py index 590b366d..84ca07ab 100644 --- a/tests/test_segmentation.py +++ b/tests/test_segmentation.py @@ -1,6 +1,7 @@ from nucleus.annotation import SegmentationAnnotation from nucleus.dataset import Dataset from tests.helpers import ( + NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET, TEST_LOCAL_MASK_URL, TEST_SEGMENTATION_ANNOTATIONS, assert_segmentation_annotation_matches_dict, @@ -38,15 +39,17 @@ def test_batch_local_semseg_gt_upload(dataset: Dataset): request_annotation.mask_url = TEST_LOCAL_MASK_URL response = dataset.annotate(annotations=request_annotations) - print(request_annotations) - print(response) - assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 4 + assert ( + response["annotations_processed"] + == NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ) assert response["annotations_ignored"] == 0 assert bad_reference_id in response["errors"][0] - for request_annotation in request_annotations[:4]: + for request_annotation in request_annotations[ + :NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ]: response_annotation = dataset.refloc(request_annotation.reference_id)[ "annotations" ]["segmentation"][0] @@ -78,7 +81,10 @@ def test_batch_semseg_gt_upload(dataset): ] response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 4 + assert ( + response["annotations_processed"] + == NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ) assert response["annotations_ignored"] == 0 @@ -90,14 +96,20 @@ def test_batch_semseg_gt_upload_ignore(dataset): ] response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id - assert 
response["annotations_processed"] == 4 + assert ( + response["annotations_processed"] + == NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ) assert response["annotations_ignored"] == 0 # When we re-upload, expect them to be ignored response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id assert response["annotations_processed"] == 0 - assert response["annotations_ignored"] == 4 + assert ( + response["annotations_ignored"] + == NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ) def test_batch_semseg_gt_upload_update(dataset): @@ -108,11 +120,17 @@ def test_batch_semseg_gt_upload_update(dataset): ] response = dataset.annotate(annotations=annotations) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 4 + assert ( + response["annotations_processed"] + == NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ) assert response["annotations_ignored"] == 0 # When we re-upload, expect uploads to be processed response = dataset.annotate(annotations=annotations, update=True) assert response["dataset_id"] == dataset.id - assert response["annotations_processed"] == 4 + assert ( + response["annotations_processed"] + == NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET + ) assert response["annotations_ignored"] == 0 From ab05d1fad9b2937b11d28ab9aa5df751d7ed8344 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Mar 2022 23:29:29 +0000 Subject: [PATCH 10/11] Initial pass at client changes for prediction segmentation upload --- nucleus/__init__.py | 87 ---------------------------------- nucleus/annotation_uploader.py | 44 ++++++++++++----- nucleus/dataset.py | 31 +++++++++++- nucleus/model_run.py | 48 +++++++++++++++---- 4 files changed, 102 insertions(+), 108 deletions(-) diff --git a/nucleus/__init__.py b/nucleus/__init__.py index edbb98ab..50016905 100644 --- a/nucleus/__init__.py +++ b/nucleus/__init__.py @@ -512,93 +512,6 @@ def create_model_run(self, dataset_id: str, payload: dict) -> ModelRun: response[MODEL_RUN_ID_KEY], dataset_id=dataset_id, client=self ) - @deprecated("Use Dataset.upload_predictions instead.") - def predict( - self, - annotations: List[ - Union[ - BoxPrediction, - PolygonPrediction, - CuboidPrediction, - SegmentationPrediction, - CategoryPrediction, - ] - ], - model_run_id: Optional[str] = None, - model_id: Optional[str] = None, - dataset_id: Optional[str] = None, - update: bool = False, - batch_size: int = 5000, - ): - if model_run_id is not None: - assert model_id is None and dataset_id is None - endpoint = f"modelRun/{model_run_id}/predict" - else: - assert ( - model_id is not None and dataset_id is not None - ), "Model ID and dataset ID are required if not using model run id." 
- endpoint = ( - f"dataset/{dataset_id}/model/{model_id}/uploadPredictions" - ) - segmentations = [ - ann - for ann in annotations - if isinstance(ann, SegmentationPrediction) - ] - - other_predictions = [ - ann - for ann in annotations - if not isinstance(ann, SegmentationPrediction) - ] - - s_batches = [ - segmentations[i : i + batch_size] - for i in range(0, len(segmentations), batch_size) - ] - - batches = [ - other_predictions[i : i + batch_size] - for i in range(0, len(other_predictions), batch_size) - ] - - errors = [] - predictions_processed = 0 - predictions_ignored = 0 - - tqdm_batches = self.tqdm_bar(batches) - - for batch in tqdm_batches: - batch_payload = construct_box_predictions_payload( - batch, - update, - ) - response = self.make_request(batch_payload, endpoint) - if STATUS_CODE_KEY in response: - errors.append(response) - else: - predictions_processed += response[PREDICTIONS_PROCESSED_KEY] - predictions_ignored += response[PREDICTIONS_IGNORED_KEY] - if ERRORS_KEY in response: - errors += response[ERRORS_KEY] - - for s_batch in s_batches: - payload = construct_segmentation_payload(s_batch, update) - response = self.make_request(payload, endpoint) - # pbar.update(1) - if STATUS_CODE_KEY in response: - errors.append(response) - else: - predictions_processed += response[PREDICTIONS_PROCESSED_KEY] - predictions_ignored += response[PREDICTIONS_IGNORED_KEY] - - return { - MODEL_RUN_ID_KEY: model_run_id, - PREDICTIONS_PROCESSED_KEY: predictions_processed, - PREDICTIONS_IGNORED_KEY: predictions_ignored, - ERRORS_KEY: errors, - } - @deprecated( "Model runs have been deprecated and will be removed. Use a Model instead." ) diff --git a/nucleus/annotation_uploader.py b/nucleus/annotation_uploader.py index ace601cd..c5390f34 100644 --- a/nucleus/annotation_uploader.py +++ b/nucleus/annotation_uploader.py @@ -1,5 +1,5 @@ import json -from typing import TYPE_CHECKING, Iterable, List, Sequence +from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence from nucleus.annotation import Annotation, SegmentationAnnotation from nucleus.async_utils import ( @@ -34,12 +34,14 @@ def accumulate_dict_values(dicts: Iterable[dict]): class AnnotationUploader: """This is a helper class not intended for direct use. Please use dataset.annotate. - This class is purely a helper class for implementing dataset.annotate. + This class is purely a helper class for implementing dataset.annotate/dataset.predict. """ - def __init__(self, dataset_id: str, client: "NucleusClient"): # noqa: F821 - self.dataset_id = dataset_id + def __init__( + self, dataset_id: Optional[str], client: "NucleusClient" + ): # noqa: F821 self._client = client + self._route = f"dataset/{dataset_id}/annotate" def upload( self, @@ -83,7 +85,7 @@ def upload( # segmentation will take a lot longer for the server to process than a single # annotation of any other kind. 
responses.extend( - self.make_batched_annotate_requests( + self.make_batched_requests( segmentations_with_remote_files, update, batch_size=remote_files_per_upload_request, @@ -92,7 +94,7 @@ def upload( ) if annotations_without_files: responses.extend( - self.make_batched_annotate_requests( + self.make_batched_requests( annotations_without_files, update, batch_size=batch_size, @@ -102,7 +104,7 @@ def upload( return accumulate_dict_values(responses) - def make_batched_annotate_requests( + def make_batched_requests( self, annotations: Sequence[Annotation], update: bool, @@ -120,9 +122,7 @@ def make_batched_annotate_requests( for batch in self._client.tqdm_bar(batches, desc=progress_bar_name): payload = construct_annotation_payload(batch, update) responses.append( - self._client.make_request( - payload, route=f"dataset/{self.dataset_id}/annotate" - ) + self._client.make_request(payload, route=self._route) ) return responses @@ -149,7 +149,7 @@ def make_batched_file_form_data_requests( return make_many_form_data_requests_concurrently( client=self._client, requests=requests, - route=f"dataset/{self.dataset_id}/annotate", + route=self._route, progressbar=progressbar, concurrency=local_file_upload_concurrency, ) @@ -202,3 +202,25 @@ def fn(): return form_data, file_pointers return fn + + +class PredictionUploader(AnnotationUploader): + def __init__( + self, + client: "NucleusClient", + dataset_id: Optional[str] = None, + model_id: Optional[str] = None, + model_run_id: Optional[str] = None, + ): + super().__init__(dataset_id, client) + self._client = client + if model_run_id is not None: + assert model_id is None and dataset_id is None + self._route = f"modelRun/{model_run_id}/predict" + else: + assert ( + model_id is not None and dataset_id is not None + ), "Model ID and dataset ID are required if not using model run id." + self._route = ( + f"dataset/{dataset_id}/model/{model_id}/uploadPredictions" + ) diff --git a/nucleus/dataset.py b/nucleus/dataset.py index 08ef8bc3..811dd270 100644 --- a/nucleus/dataset.py +++ b/nucleus/dataset.py @@ -3,7 +3,7 @@ import requests -from nucleus.annotation_uploader import AnnotationUploader +from nucleus.annotation_uploader import AnnotationUploader, PredictionUploader from nucleus.job import AsyncJob from nucleus.prediction import ( BoxPrediction, @@ -347,6 +347,7 @@ def annotate( request. Segmentations have either local or remote files, if you are getting timeouts while uploading segmentations with local files, you should lower this value from its default of 10. The maximum is 10. + local_file_upload_concurrency: Number of concurrent local file uploads. Returns: @@ -1283,6 +1284,10 @@ def upload_predictions( ], update: bool = False, asynchronous: bool = False, + batch_size: int = 5000, + remote_files_per_upload_request: int = 20, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, ): """Uploads predictions and associates them with an existing :class:`Model`. @@ -1325,6 +1330,21 @@ def upload_predictions( collision. Default is False. asynchronous: Whether or not to process the upload asynchronously (and return an :class:`AsyncJob` object). Default is False. + batch_size: Number of predictions processed in each concurrent batch. + Default is 5000. If you get timeouts when uploading geometric predictions, + you can try lowering this batch size. This is only relevant for + asynchronous=False + remote_files_per_upload_request: Number of remote files to upload in each + request. 
Segmentations have either local or remote files, if you are + getting timeouts while uploading segmentations with remote urls, you + should lower this value from its default of 20. This is only relevant for + asynchronous=False. + local_files_per_upload_request: Number of local files to upload in each + request. Segmentations have either local or remote files, if you are + getting timeouts while uploading segmentations with local files, you + should lower this value from its default of 10. The maximum is 10. + This is only relevant for asynchronous=False + local_file_upload_concurrency: Number of concurrent local file uploads. Returns: Payload describing the synchronous upload:: @@ -1348,12 +1368,19 @@ def upload_predictions( ) return AsyncJob.from_json(response, self._client) else: - return self._client.predict( + uploader = PredictionUploader( model_run_id=None, dataset_id=self.id, model_id=model.id, + client=self._client, + ) + return uploader.upload( annotations=predictions, + batch_size=batch_size, update=update, + remote_files_per_upload_request=remote_files_per_upload_request, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, ) def predictions_iloc(self, model, index): diff --git a/nucleus/model_run.py b/nucleus/model_run.py index 137d7cf8..993eda7e 100644 --- a/nucleus/model_run.py +++ b/nucleus/model_run.py @@ -18,6 +18,7 @@ import requests from nucleus.annotation import check_all_mask_paths_remote +from nucleus.annotation_uploader import PredictionUploader from nucleus.job import AsyncJob from nucleus.utils import ( format_prediction_response, @@ -114,12 +115,38 @@ def predict( SegmentationPrediction, ] ], - update: Optional[bool] = DEFAULT_ANNOTATION_UPDATE_MODE, + update: bool = DEFAULT_ANNOTATION_UPDATE_MODE, asynchronous: bool = False, + batch_size: int = 5000, + remote_files_per_upload_request: int = 20, + local_files_per_upload_request: int = 10, + local_file_upload_concurrency: int = 30, ) -> Union[dict, AsyncJob]: """ Uploads model outputs as predictions for a model_run. Returns info about the upload. - :param annotations: List[Union[BoxPrediction, PolygonPrediction, CuboidPrediction, SegmentationPrediction]], + + Args: + annotations: Predictions to upload for this model run, + update: If True, existing predictions for the same (reference_id, annotation_id) + will be overwritten. If False, existing predictions will be skipped. + asynchronous: Whether or not to process the upload asynchronously (and + return an :class:`AsyncJob` object). Default is False. + batch_size: Number of predictions processed in each concurrent batch. + Default is 5000. If you get timeouts when uploading geometric annotations, + you can try lowering this batch size. This is only relevant for + asynchronous=False. + remote_files_per_upload_request: Number of remote files to upload in each + request. Segmentations have either local or remote files, if you are + getting timeouts while uploading segmentations with remote urls, you + should lower this value from its default of 20. This is only relevant for + asynchronous=False + local_files_per_upload_request: Number of local files to upload in each + request. Segmentations have either local or remote files, if you are + getting timeouts while uploading segmentations with local files, you + should lower this value from its default of 10. The maximum is 10. + This is only relevant for asynchronous=False + local_file_upload_concurrency: Number of concurrent local file uploads. 
+ This is only relevant for asynchronous=False :return: { "model_run_id": str, @@ -138,12 +165,17 @@ def predict( route=f"modelRun/{self.model_run_id}/predict?async=1", ) return AsyncJob.from_json(response, self._client) - else: - return self._client.predict( - model_run_id=self.model_run_id, - annotations=annotations, - update=update, - ) + uploader = PredictionUploader( + model_run_id=self.model_run_id, client=self._client + ) + return uploader.upload( + annotations=annotations, + update=update, + batch_size=batch_size, + remote_files_per_upload_request=remote_files_per_upload_request, + local_files_per_upload_request=local_files_per_upload_request, + local_file_upload_concurrency=local_file_upload_concurrency, + ) def iloc(self, i: int): """ From 5b501d18f5e20e1a20deeff30449ed3827614dab Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 15 Mar 2022 17:28:28 +0000 Subject: [PATCH 11/11] relevant tests pass --- conftest.py | 7 +++++ nucleus/annotation_uploader.py | 9 ++++-- tests/test_prediction.py | 33 --------------------- tests/test_segmentation.py | 52 ++++++++++++++++++++++++++++++++++ 4 files changed, 66 insertions(+), 35 deletions(-) diff --git a/conftest.py b/conftest.py index b98074ea..d956c050 100644 --- a/conftest.py +++ b/conftest.py @@ -28,6 +28,13 @@ def dataset(CLIENT): CLIENT.delete_dataset(ds.id) +@pytest.fixture() +def model(CLIENT): + model = CLIENT.create_model(TEST_DATASET_NAME, "fake_reference_id") + yield model + CLIENT.delete_model(model.id) + + if __name__ == "__main__": client = nucleus.NucleusClient(API_KEY) # ds = client.create_dataset("Test Dataset With Autotags") diff --git a/nucleus/annotation_uploader.py b/nucleus/annotation_uploader.py index c5390f34..6926eb30 100644 --- a/nucleus/annotation_uploader.py +++ b/nucleus/annotation_uploader.py @@ -24,7 +24,11 @@ def accumulate_dict_values(dicts: Iterable[dict]): result = {} for d in dicts: for key, value in d.items(): - if key not in result or key == "dataset_id": + if ( + key not in result + or key == "dataset_id" + or key == "model_run_id" + ): result[key] = value else: result[key] += value @@ -32,7 +36,8 @@ def accumulate_dict_values(dicts: Iterable[dict]): class AnnotationUploader: - """This is a helper class not intended for direct use. Please use dataset.annotate. + """This is a helper class not intended for direct use. Please use dataset.annotate + or dataset.upload_predictions. This class is purely a helper class for implementing dataset.annotate/dataset.predict. 
""" diff --git a/tests/test_prediction.py b/tests/test_prediction.py index cc8ba6f4..3b06fb0e 100644 --- a/tests/test_prediction.py +++ b/tests/test_prediction.py @@ -194,39 +194,6 @@ def test_non_existent_taxonomy_category_gt_upload(model_run): ) -def test_segmentation_pred_upload(model_run): - prediction = SegmentationPrediction.from_json( - TEST_SEGMENTATION_PREDICTIONS[0] - ) - response = model_run.predict(annotations=[prediction]) - - assert response["model_run_id"] == model_run.model_run_id - assert response["predictions_processed"] == 1 - assert response["predictions_ignored"] == 0 - - response = model_run.refloc(prediction.reference_id)["segmentation"] - assert isinstance(response[0], SegmentationPrediction) - - assert_segmentation_annotation_matches_dict( - response[0], TEST_SEGMENTATION_PREDICTIONS[0] - ) - - -def test_segmentation_pred_upload_ignore(model_run): - prediction = SegmentationPrediction.from_json( - TEST_SEGMENTATION_PREDICTIONS[0] - ) - response1 = model_run.predict(annotations=[prediction]) - - assert response1["predictions_processed"] == 1 - - # Upload Duplicate annotation - response = model_run.predict(annotations=[prediction]) - assert response["model_run_id"] == model_run.model_run_id - assert response["predictions_processed"] == 0 - assert response["predictions_ignored"] == 1 - - def test_box_pred_upload_update(model_run): prediction = BoxPrediction(**TEST_BOX_PREDICTIONS[0]) response = model_run.predict(annotations=[prediction]) diff --git a/tests/test_segmentation.py b/tests/test_segmentation.py index 84ca07ab..7bf0fe23 100644 --- a/tests/test_segmentation.py +++ b/tests/test_segmentation.py @@ -1,13 +1,65 @@ from nucleus.annotation import SegmentationAnnotation from nucleus.dataset import Dataset +from nucleus.model import Model +from nucleus.prediction import SegmentationPrediction from tests.helpers import ( NUM_VALID_SEGMENTATIONS_IN_MAIN_DATASET, TEST_LOCAL_MASK_URL, TEST_SEGMENTATION_ANNOTATIONS, + TEST_SEGMENTATION_PREDICTIONS, assert_segmentation_annotation_matches_dict, ) +def test_segmentation_pred_upload_local(dataset: Dataset, model: Model): + prediction = SegmentationPrediction.from_json( + TEST_SEGMENTATION_PREDICTIONS[0] + ) + prediction.mask_url = TEST_LOCAL_MASK_URL + response = dataset.upload_predictions(model, [prediction]) + + assert response["predictions_processed"] == 1 + + response = dataset.predictions_refloc(model, prediction.reference_id)[ + "segmentation" + ][0] + assert isinstance(response, SegmentationPrediction) + assert response == prediction + + +def test_segmentation_pred_upload(dataset: Dataset, model: Model): + prediction = SegmentationPrediction.from_json( + TEST_SEGMENTATION_PREDICTIONS[0] + ) + response = dataset.upload_predictions(model, [prediction]) + + assert response["predictions_processed"] == 1 + assert response["predictions_ignored"] == 0 + + response = dataset.predictions_refloc(model, prediction.reference_id)[ + "segmentation" + ] + assert isinstance(response[0], SegmentationPrediction) + + assert_segmentation_annotation_matches_dict( + response[0], TEST_SEGMENTATION_PREDICTIONS[0] + ) + + +def test_segmentation_pred_upload_ignore(dataset: Dataset, model: Model): + prediction = SegmentationPrediction.from_json( + TEST_SEGMENTATION_PREDICTIONS[0] + ) + response1 = dataset.upload_predictions(model, [prediction]) + + assert response1["predictions_processed"] == 1 + + # Upload Duplicate annotation + response = dataset.upload_predictions(model, [prediction]) + assert response["predictions_processed"] == 0 + assert 
response["predictions_ignored"] == 1 + + def test_single_local_semseg_gt_upload(dataset: Dataset): request_annotation = SegmentationAnnotation.from_json( TEST_SEGMENTATION_ANNOTATIONS[0]