This is an example for Cortex release 0.22 and may not deploy correctly on other releases of Cortex.
This example shows how to deploy a batch image classification API that accepts a list of image URLs as input, downloads the images, classifies them, and writes the results to S3.
Batch APIs are only supported on a Cortex cluster (in AWS). You can find cluster installation documentation here.
- Install the Cortex CLI
- Create a Cortex cluster
- Create an S3 bucket/directory to store the results of the batch job
- AWS CLI (optional)
Create a Python file named predictor.py. Define a Predictor class with a constructor that loads and initializes an image classifier from torchvision. Add a predict() function that accepts a list of image URLs (http:// or s3://), downloads them, performs inference, and writes the predictions to S3. Specify an on_job_complete() function that aggregates the results and writes them to a single file named aggregated_results.json in S3.
```python
# predictor.py

import os
import requests
import torch
import torchvision
from torchvision import transforms
from PIL import Image
from io import BytesIO
import boto3
import json
import re


class PythonPredictor:
    def __init__(self, config, job_spec):
        self.model = torchvision.models.alexnet(pretrained=True).eval()

        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.preprocess = transforms.Compose(
            [transforms.Resize(256), transforms.CenterCrop(224), transforms.ToTensor(), normalize]
        )

        self.labels = requests.get(
            "https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt"
        ).text.split("\n")[1:]

        if len(config.get("dest_s3_dir", "")) == 0:
            raise Exception("'dest_s3_dir' field was not provided in job submission")

        self.s3 = boto3.client("s3")

        self.bucket, self.key = re.match("s3://(.+?)/(.+)", config["dest_s3_dir"]).groups()
        self.key = os.path.join(self.key, job_spec["job_id"])

    def predict(self, payload, batch_id):
        tensor_list = []

        # download and preprocess each image
        for image_url in payload:
            if image_url.startswith("s3://"):
                bucket, image_key = re.match("s3://(.+?)/(.+)", image_url).groups()
                image_bytes = self.s3.get_object(Bucket=bucket, Key=image_key)["Body"].read()
            else:
                image_bytes = requests.get(image_url).content

            img_pil = Image.open(BytesIO(image_bytes))
            tensor_list.append(self.preprocess(img_pil))

        # classify the batch of images
        img_tensor = torch.stack(tensor_list)
        with torch.no_grad():
            prediction = self.model(img_tensor)
        _, indices = prediction.max(1)

        # extract predicted classes
        results = [
            {"url": payload[i], "class": self.labels[class_idx]}
            for i, class_idx in enumerate(indices)
        ]
        json_output = json.dumps(results)

        # save results
        self.s3.put_object(Bucket=self.bucket, Key=f"{self.key}/{batch_id}.json", Body=json_output)

    def on_job_complete(self):
        all_results = []

        # aggregate all classifications
        paginator = self.s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.key):
            for obj in page["Contents"]:
                body = self.s3.get_object(Bucket=self.bucket, Key=obj["Key"])["Body"]
                all_results += json.loads(body.read().decode("utf8"))

        # save single file containing aggregated classifications
        self.s3.put_object(
            Bucket=self.bucket,
            Key=os.path.join(self.key, "aggregated_results.json"),
            Body=json.dumps(all_results),
        )
```
Here are the complete Predictor docs.
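The constructor above splits the configured dest_s3_dir into a bucket name and a key prefix with a regular expression, then nests the job ID under that prefix. A minimal standalone sketch of that parsing logic (the example path and job ID below are placeholders, not values Cortex produces):

```python
import os
import re

def parse_s3_dir(s3_dir, job_id):
    """Split an s3://bucket/prefix path the way the predictor's constructor does."""
    bucket, key = re.match("s3://(.+?)/(.+)", s3_dir).groups()
    # results for this job are written under <prefix>/<job_id>/
    return bucket, os.path.join(key, job_id)

bucket, key = parse_s3_dir("s3://my-bucket/results", "69d6faf82e4660d3")
print(bucket)  # my-bucket
print(key)     # results/69d6faf82e4660d3
```

This is why each batch's output file lands at a path like s3://my-bucket/results/<job_id>/<batch_id>.json.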
Create a requirements.txt file to specify the dependencies needed by predictor.py. Cortex will automatically install them into your runtime once you deploy:
```text
# requirements.txt

boto3
torch
torchvision
pillow
```
Create a cortex.yaml file and add the configuration below. An api with kind: BatchAPI will expose your model as an endpoint that orchestrates offline batch inference across multiple workers upon receiving job requests. The configuration below defines how much compute each worker requires, and your predictor.py determines how each batch is processed.
```yaml
# cortex.yaml

- name: image-classifier
  kind: BatchAPI
  predictor:
    type: python
    path: predictor.py
  compute:
    cpu: 1
```
Here are the complete API configuration docs.
cortex deploy takes your model, your predictor.py implementation, and your configuration from cortex.yaml, and creates an endpoint that can receive job submissions and manage running jobs.
```shell
$ cortex deploy --env aws

created image-classifier (BatchAPI)
```
Get the endpoint for your Batch API with cortex get image-classifier:
```shell
$ cortex get image-classifier --env aws

no submitted jobs

endpoint: https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
```
Our predictor.py implementation writes results to an S3 directory. Before submitting a job, we need to create an S3 directory to store the output of the batch job. The S3 directory must be accessible by the credentials used to create your Cortex cluster.
Export the S3 directory to an environment variable:
```shell
$ export CORTEX_DEST_S3_DIR=<YOUR_S3_DIRECTORY>  # e.g. export CORTEX_DEST_S3_DIR=s3://my-bucket/dir
```
Now that you've deployed a Batch API, you are ready to submit jobs. You can provide image URLs directly in the request by specifying them in item_list. The curl command below shows how to submit image URLs in the request.
```shell
$ export BATCH_API_ENDPOINT=<BATCH_API_ENDPOINT>  # e.g. export BATCH_API_ENDPOINT=https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
$ export CORTEX_DEST_S3_DIR=<YOUR_S3_DIRECTORY>  # e.g. export CORTEX_DEST_S3_DIR=s3://my-bucket/dir
$ curl $BATCH_API_ENDPOINT \
    -X POST -H "Content-Type: application/json" \
    -d @- <<EOF
{
    "workers": 1,
    "item_list": {
        "items": [
            "https://i.imgur.com/PzXprwl.jpg",
            "https://i.imgur.com/E4cOSLw.jpg",
            "https://user-images.githubusercontent.com/4365343/96516272-d40aa980-1234-11eb-949d-8e7e739b8345.jpg",
            "https://i.imgur.com/jDimNTZ.jpg",
            "https://i.imgur.com/WqeovVj.jpg"
        ],
        "batch_size": 2
    },
    "config": {
        "dest_s3_dir": "$CORTEX_DEST_S3_DIR"
    }
}
EOF
```
Note: if you are prompted with > then type EOF.
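If you prefer Python over curl, the same item_list submission can be sketched with a small helper. The build_job_payload function and the URLs below are illustrative, not part of Cortex; the commented-out requests call shows how you would actually submit the body to your Batch API endpoint.

```python
import json

def build_item_list_job(image_urls, batch_size, dest_s3_dir, workers=1):
    """Build the JSON body for an item_list batch job submission."""
    return {
        "workers": workers,
        "item_list": {"items": image_urls, "batch_size": batch_size},
        "config": {"dest_s3_dir": dest_s3_dir},
    }

job = build_item_list_job(
    ["https://i.imgur.com/PzXprwl.jpg", "https://i.imgur.com/E4cOSLw.jpg"],
    batch_size=2,
    dest_s3_dir="s3://my-bucket/dir",  # placeholder: your own S3 directory
)
print(json.dumps(job, indent=2))

# to actually submit the job:
# import os, requests
# response = requests.post(os.environ["BATCH_API_ENDPOINT"], data=json.dumps(job))
```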
After submitting the job, you should get a response like this:
```json
{"job_id":"69d6faf82e4660d3","api_name":"image-classifier","config":{"dest_s3_dir":"YOUR_S3_BUCKET_HERE"}}
```
Take note of the job id in the response.
```shell
$ cortex get image-classifier --env aws

job id             status    progress   start time                 duration
69d6faf82e4660d3   running   0/3        20 Jul 2020 01:07:44 UTC   3m26s

endpoint: https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
```
You can make a GET request to <BATCH_API_ENDPOINT>?jobID=<JOB_ID> to get the status of your job.
```shell
$ curl https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier?jobID=69d6faf82e4660d3

{"job_status":{"job_id":"69d6faf82e4660d3","api_name":"image-classifier", ...}, "endpoint":"https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier"}
```
You can also use the Cortex CLI to get the status of your job with cortex get <BATCH_API_NAME> <JOB_ID>.
```shell
$ cortex get image-classifier 69d6faf82e4660d3 --env aws

job id: 69d6faf82e4660d3
status: running

start time: 27 Jul 2020 15:02:25 UTC
end time:   -
duration:   42s

batch stats
total   succeeded   failed   avg time per batch
3       0           0        -

worker stats
requested   initializing   running   failed   succeeded
1           1              0         0        0

job endpoint: https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier/69d6faf82e4660d3
```
You can stream logs in real time for debugging and monitoring purposes with cortex logs <BATCH_API_NAME> <JOB_ID>:
```shell
$ cortex logs image-classifier 69d6fdeb2d8e6647 --env aws

started enqueuing batches to queue
partitioning 5 items found in job submission into 3 batches of size 2
completed enqueuing a total of 3 batches
spinning up workers...
...
2020-08-07 14:44:05.557598:cortex:pid-25:INFO:processing batch c9136381-6dcc-45bd-bd97-cc9c66ccc6d6
2020-08-07 14:44:26.037276:cortex:pid-25:INFO:executing on_job_complete
2020-08-07 14:44:26.208972:cortex:pid-25:INFO:no batches left in queue, job has been completed
```
Wait for the job to complete by streaming the logs with cortex logs <BATCH_API_NAME> <JOB_ID>, or by watching for the job status to change with cortex get <BATCH_API_NAME> <JOB_ID> --watch.
The status of your job, which you can get from cortex get <BATCH_API_NAME> <JOB_ID>, should change from running to succeeded once the job has completed. If it changes to a different status, you may be able to find the stack trace using cortex logs <BATCH_API_NAME> <JOB_ID>. If your job has completed successfully, you can view the results of the image classification in the S3 directory you specified in the job submission.
Using the AWS CLI:
```shell
$ aws s3 ls $CORTEX_DEST_S3_DIR/<JOB_ID>/
161f9fda-fd08-44f3-b983-4529f950e40b.json
40100ffb-6824-4560-8ca4-7c0d14273e05.json
c9136381-6dcc-45bd-bd97-cc9c66ccc6d6.json
aggregated_results.json
```
You can download the aggregated results file with aws s3 cp $CORTEX_DEST_S3_DIR/<JOB_ID>/aggregated_results.json . and confirm that it contains 5 classifications, one for each image URL submitted in the job.
In addition to providing the image URLs directly in the job submission request, you can use image URLs stored in newline-delimited JSON files in S3. A newline-delimited JSON file has one complete JSON object per line.
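As a quick illustration of the format (the URLs below are made up), parsing such a file is just a matter of reading it line by line and decoding each line as JSON:

```python
import json

# contents of a hypothetical newline-delimited JSON file:
# each line is one complete JSON value (here, a URL string)
ndjson = '"https://example.com/img_0.jpg"\n"https://example.com/img_1.jpg"\n'

# split on newlines, skip blanks, and parse each line independently
items = [json.loads(line) for line in ndjson.splitlines() if line.strip()]
print(items)  # ['https://example.com/img_0.jpg', 'https://example.com/img_1.jpg']
```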
Two newline-delimited JSON files containing image URLs for this tutorial have already been created for you and can be found at s3://cortex-examples/image-classifier/. If you have the AWS CLI, you can list the directory and find the files (urls_0.json and urls_1.json).
```shell
$ aws s3 ls s3://cortex-examples/image-classifier/
                           PRE inception/
...
2020-07-27 14:19:30        506 urls_0.json
2020-07-27 14:19:30        473 urls_1.json
```
To use JSON files as input data for the job, we will specify delimited_files in the job request. The Batch API will break the JSON files into batches of the desired size and push them onto a queue that is consumed by the pool of workers.
Before we submit the job, let's perform a dry run to ensure that only the desired files will be read. You can perform a dry run by appending the dryRun=true query parameter to your job request.
Get the endpoint from cortex get image-classifier if you haven't done so already.
```shell
$ export BATCH_API_ENDPOINT=<BATCH_API_ENDPOINT>  # e.g. export BATCH_API_ENDPOINT=https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
$ export CORTEX_DEST_S3_DIR=<YOUR_S3_DIRECTORY>  # e.g. export CORTEX_DEST_S3_DIR=s3://my-bucket/dir
$ curl $BATCH_API_ENDPOINT?dryRun=true \
    -X POST -H "Content-Type: application/json" \
    -d @- <<EOF
{
    "workers": 1,
    "delimited_files": {
        "s3_paths": ["s3://cortex-examples/image-classifier/"],
        "includes": ["**.json"],
        "batch_size": 2
    },
    "config": {
        "dest_s3_dir": "$CORTEX_DEST_S3_DIR"
    }
}
EOF
```
Note: if you are prompted with > then type EOF.
You should expect a response like this:
```text
s3://cortex-examples/image-classifier/urls_0.json
s3://cortex-examples/image-classifier/urls_1.json
validations passed
```
This shows that the correct files will be used as input for the job.
When you submit a job specifying delimited_files, your Batch API will collect all of the input S3 files based on s3_paths and apply the filters specified in includes and excludes. Then your Batch API will read each file, split it on newline characters, and parse each line as a JSON object. Each item in a file is treated as a single sample; the samples are grouped into batches and placed onto a queue that is consumed by the pool of workers.
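The splitting and batching described above can be sketched as follows. The file contents here are invented, and this is only an illustration of the behavior, not Cortex's actual implementation:

```python
import json

# invented contents of two newline-delimited JSON files
files = {
    "urls_0.json": '"u0"\n"u1"\n"u2"\n',
    "urls_1.json": '"u3"\n"u4"\n',
}

# read each file, split on newlines, and parse each line as JSON
items = []
for contents in files.values():
    items += [json.loads(line) for line in contents.splitlines() if line.strip()]

# group the samples into batches of the requested size
batch_size = 2
batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
print(batches)  # [['u0', 'u1'], ['u2', 'u3'], ['u4']]
```

Note that samples from different files end up in the same pool before batching, which is why 16 URLs spread over two files produce 8 batches of size 2.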
In this example, urls_0.json and urls_1.json each contain 8 URLs. Let's classify the images from the URLs listed in those 2 files.
```shell
$ export BATCH_API_ENDPOINT=<BATCH_API_ENDPOINT>  # e.g. export BATCH_API_ENDPOINT=https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
$ export CORTEX_DEST_S3_DIR=<YOUR_S3_DIRECTORY>  # e.g. export CORTEX_DEST_S3_DIR=s3://my-bucket/dir
$ curl $BATCH_API_ENDPOINT \
    -X POST -H "Content-Type: application/json" \
    -d @- <<EOF
{
    "workers": 1,
    "delimited_files": {
        "s3_paths": ["s3://cortex-examples/image-classifier/"],
        "includes": ["**.json"],
        "batch_size": 2
    },
    "config": {
        "dest_s3_dir": "$CORTEX_DEST_S3_DIR"
    }
}
EOF
```
Note: if you are prompted with > then type EOF.
After submitting this job, you should get a response like this:
```json
{"job_id":"69d6faf82e4660d3","api_name":"image-classifier","config":{"dest_s3_dir":"YOUR_S3_BUCKET_HERE"}}
```
Wait for the job to complete by streaming the logs with cortex logs <BATCH_API_NAME> <JOB_ID>, or by watching for the job status to change with cortex get <BATCH_API_NAME> <JOB_ID> --watch.
```shell
$ cortex logs image-classifier 69d6faf82e4660d3 --env aws

started enqueuing batches to queue
enqueuing contents from file s3://cortex-examples/image-classifier/urls_0.json
enqueuing contents from file s3://cortex-examples/image-classifier/urls_1.json
completed enqueuing a total of 8 batches
spinning up workers...
2020-08-07 15:11:21.364179:cortex:pid-25:INFO:processing batch 1de0bc65-04ea-4b9e-9e96-5a0bb52fcc37
...
2020-08-07 15:11:45.461032:cortex:pid-25:INFO:no batches left in queue, job has been completed
```
The status of your job, which you can get from cortex get <BATCH_API_NAME> <JOB_ID>, should change from running to succeeded once the job has completed. If it changes to a different status, you may be able to find the stack trace using cortex logs <BATCH_API_NAME> <JOB_ID>. If your job has completed successfully, you can view the results of the image classification in the S3 directory you specified in the job submission.
Using the AWS CLI:
```shell
$ aws s3 ls $CORTEX_DEST_S3_DIR/<JOB_ID>/
161f9fda-fd08-44f3-b983-4529f950e40b.json
40100ffb-6824-4560-8ca4-7c0d14273e05.json
6d1c933c-0ddf-4316-9956-046cd731c5ab.json
...
aggregated_results.json
```
You can download the aggregated results file with aws s3 cp $CORTEX_DEST_S3_DIR/<JOB_ID>/aggregated_results.json . and confirm that there are 16 classifications.
Let's assume that rather than downloading images from URLs on the internet, you have an S3 directory containing the images. We can specify file_path_lister in the job request to get the list of S3 URLs for the images, partition the list into batches, and place them on a queue that will be consumed by the workers.
We'll classify the 16 images found at s3://cortex-examples/image-classifier/samples. You can use the AWS CLI to verify that there are 16 images: aws s3 ls s3://cortex-examples/image-classifier/samples/.
Let's do a dry run to make sure the correct list of images will be submitted to the job.
```shell
$ export BATCH_API_ENDPOINT=<BATCH_API_ENDPOINT>  # e.g. export BATCH_API_ENDPOINT=https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
$ export CORTEX_DEST_S3_DIR=<YOUR_S3_DIRECTORY>  # e.g. export CORTEX_DEST_S3_DIR=s3://my-bucket/dir
$ curl $BATCH_API_ENDPOINT?dryRun=true \
    -X POST -H "Content-Type: application/json" \
    -d @- <<EOF
{
    "workers": 1,
    "file_path_lister": {
        "s3_paths": ["s3://cortex-examples/image-classifier/samples"],
        "includes": ["**.jpg"],
        "batch_size": 2
    },
    "config": {
        "dest_s3_dir": "$CORTEX_DEST_S3_DIR"
    }
}
EOF
```
Note: if you are prompted with > then type EOF.
You should expect a response like this:
```text
s3://cortex-examples/image-classifier/samples/img_0.jpg
s3://cortex-examples/image-classifier/samples/img_1.jpg
...
s3://cortex-examples/image-classifier/samples/img_8.jpg
s3://cortex-examples/image-classifier/samples/img_9.jpg
validations passed
```
Let's submit the job now. Your Batch API will collect all of the input S3 files based on s3_paths and apply the filters specified in includes and excludes.
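The includes/excludes filtering can be illustrated with Python's fnmatch globbing. This is only a sketch of the idea under stated assumptions (Cortex's own matcher may differ, and fnmatch treats ** like *, which is close enough for these patterns); the filter_s3_keys helper and the readme.txt key are invented for the example:

```python
from fnmatch import fnmatch

def filter_s3_keys(keys, includes, excludes=()):
    """Keep keys matching any 'includes' glob and no 'excludes' glob (illustration only)."""
    kept = []
    for key in keys:
        if any(fnmatch(key, pat) for pat in includes) and not any(
            fnmatch(key, pat) for pat in excludes
        ):
            kept.append(key)
    return kept

keys = [
    "s3://cortex-examples/image-classifier/samples/img_0.jpg",
    "s3://cortex-examples/image-classifier/samples/img_1.jpg",
    "s3://cortex-examples/image-classifier/samples/readme.txt",  # hypothetical non-image file
]
print(filter_s3_keys(keys, includes=["**.jpg"]))  # only the two .jpg keys survive
```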
```shell
$ export BATCH_API_ENDPOINT=<BATCH_API_ENDPOINT>  # e.g. export BATCH_API_ENDPOINT=https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
$ export CORTEX_DEST_S3_DIR=<YOUR_S3_DIRECTORY>  # e.g. export CORTEX_DEST_S3_DIR=s3://my-bucket/dir
$ curl $BATCH_API_ENDPOINT \
    -X POST -H "Content-Type: application/json" \
    -d @- <<EOF
{
    "workers": 1,
    "file_path_lister": {
        "s3_paths": ["s3://cortex-examples/image-classifier/samples"],
        "includes": ["**.jpg"],
        "batch_size": 2
    },
    "config": {
        "dest_s3_dir": "$CORTEX_DEST_S3_DIR"
    }
}
EOF
```
Note: if you are prompted with > then type EOF.
You should get a response like this:
```json
{"job_id":"69d6f8a472f0e1e5","api_name":"image-classifier","config":{"dest_s3_dir":"YOUR_S3_BUCKET_HERE"}}
```
Wait for the job to complete by streaming the logs with cortex logs <BATCH_API_NAME> <JOB_ID>, or by watching for the job status to change with cortex get <BATCH_API_NAME> <JOB_ID> --watch.
```shell
$ cortex logs image-classifier 69d6f8a472f0e1e5 --env aws

started enqueuing batches to queue
completed enqueuing a total of 8 batches
spinning up workers...
2020-07-18 21:35:34.186348:cortex:pid-1:INFO:downloading the project code
...
2020-08-07 15:49:10.889839:cortex:pid-25:INFO:processing batch d0e695bc-a975-4115-a60f-0a55c743fc57
2020-08-07 15:49:31.188943:cortex:pid-25:INFO:executing on_job_complete
2020-08-07 15:49:31.362053:cortex:pid-25:INFO:no batches left in queue, job has been completed
```
The status of your job, which you can get from cortex get <BATCH_API_NAME> <JOB_ID>, should change from running to succeeded once the job has completed. If it changes to a different status, you may be able to find the stack trace using cortex logs <BATCH_API_NAME> <JOB_ID>. If your job has completed successfully, you can view the results of the image classification in the S3 directory you specified in the job submission.
Using the AWS CLI:
```shell
$ aws s3 ls $CORTEX_DEST_S3_DIR/<JOB_ID>/
6bee7412-4c16-4d9f-ab3e-e88669cf7a89.json
3c45b4b3-953e-4226-865b-75f3961dcf95.json
d0e695bc-a975-4115-a60f-0a55c743fc57.json
...
aggregated_results.json
```
You can download the aggregated results file with aws s3 cp $CORTEX_DEST_S3_DIR/<JOB_ID>/aggregated_results.json . and confirm that there are 16 classifications.
You can stop a running job by sending a DELETE request to <BATCH_API_ENDPOINT>?jobID=<JOB_ID>.
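In Python, the same stop request could be sketched like this. The endpoint and job ID are placeholders; the commented-out requests call is what you would actually run:

```python
# build the stop-job URL; values here are placeholders, not real deployments
endpoint = "https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier"
job_id = "69d96a01ea55da8c"
url = f"{endpoint}?jobID={job_id}"
print(url)

# to actually stop the job:
# import requests
# requests.delete(url)
```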
```shell
$ export BATCH_API_ENDPOINT=<BATCH_API_ENDPOINT>  # e.g. export BATCH_API_ENDPOINT=https://abcdefg.execute-api.us-west-2.amazonaws.com/image-classifier
$ curl -X DELETE $BATCH_API_ENDPOINT?jobID=69d96a01ea55da8c

stopped job 69d96a01ea55da8c
```
You can also use the Cortex CLI: cortex delete <BATCH_API_NAME> <JOB_ID>.
```shell
$ cortex delete image-classifier 69d96a01ea55da8c --env aws

stopped job 69d96a01ea55da8c
```
Run cortex delete to delete the API:
```shell
$ cortex delete image-classifier --env aws

deleting image-classifier
```
Running cortex delete will stop all in-progress jobs for the API and delete the job history for that API. It will not spin down your cluster.
Deploy another one of our batch examples.
See our exporting guide for how to export your model for use in an API.
Try the realtime API tutorial to learn how to deploy realtime APIs in Cortex.
See uninstall if you'd like to spin down your cluster.