Verified Commit 6b8b7c5f authored by Petr Špaček's avatar Petr Špaček
Browse files

Wrapper to expose S3 objects as named pipes

This script internally uses multiple processes to execute
`aws s3api` command line utility in parallel.
`aws s3api` handles download parallelization etc. so
we reuse it instead of reimplementing it ourselves.
parent 9ea08d6d
Pipeline #78512 passed with stage
in 4 minutes and 22 seconds
Wrapper to create and fill named pipes for all objects in S3 bucket matching given key prefix.
Object download into newly created pipes starts immediately, and user-provided command is
executed as soon as all named pipes are created.
User-provided command should start reading from all pipes it needs right away, otherwise
S3 might terminate the connection because of timeout.
- boto3 library
- aws s3api command line interface
import argparse
from pathlib import Path
import logging
import os
import sys
from typing import Iterator
import subprocess
import boto3
log = logging.getLogger('s3fifo')
def download_object_to_fifo(bucket: str, key: str, fspath: Path) -> subprocess.Popen:
    """Create a named pipe at `fspath` and start an asynchronous download into it.

    The `aws s3api get-object` CLI handles the actual transfer (and its own
    parallelization), so we delegate to it instead of reimplementing downloads.

    :param bucket: S3 bucket name
    :param key: S3 object key to download
    :param fspath: filesystem path for the new named pipe; must not exist yet
    :return: handle of the running `aws` subprocess (caller is responsible
             for terminating it, e.g. if the reader never opens the pipe)
    :raises OSError: if the FIFO cannot be created (e.g. path already exists)
    """
    # The pipe must exist before `aws` tries to open it for writing.
    os.mkfifo(fspath)
    # The process intentionally outlives this function, so no `with` block.
    # pylint: disable=consider-using-with
    return subprocess.Popen(['aws', 's3api', 'get-object',
                             '--bucket', bucket,
                             '--key', key,
                             str(fspath)],
                            # Keep the CLI's own output away from stdout so it
                            # cannot interfere with the user-provided command.
                            stdout=sys.stderr, stderr=sys.stderr, close_fds=True)
def list_object_keys(bucket: str, key_prefix: str) -> Iterator[str]:
    """Yield keys of all objects in `bucket` whose key starts with `key_prefix`.

    Uses the paginated ListObjectsV2 API so buckets with more than 1000
    matching objects are handled transparently.

    :raises RuntimeError: if the first page contains no matching objects,
        which usually indicates a mistyped prefix.
    """
    s3cli = boto3.client('s3')
    paginator = s3cli.get_paginator('list_objects_v2')
    response_iterator = paginator.paginate(Bucket=bucket, Prefix=key_prefix)
    for page in response_iterator:
        # Fail fast on an empty result set instead of silently yielding nothing.
        if page['KeyCount'] <= 0:
            raise RuntimeError('S3 returned empty set of objects matching criteria, '
                               'maybe a wrong prefix?')
        for one_object in page['Contents']:
            yield one_object['Key']
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(message)s')
    parser = argparse.ArgumentParser(
        description="Wrapper to create named pipes for objects in given S3 bucket and prefix. "
                    "Command is executed after the directory is populated.")
    parser.add_argument('--bucket', type=str, default='',
                        help='S3 bucket to list objects from')
    parser.add_argument('--key-prefix', type=str,
                        help='S3 key prefix to filter objects in bucket on')
    parser.add_argument('--outdir', type=Path, required=True,
                        help='Directory to create named pipes in; must not exist')
    parser.add_argument('command', type=str, nargs='+',
                        help='command to be executed')
    args = parser.parse_args()

    # must not exist, this is to ensure we do not mix old/orphan pipes with fresh ones;
    # mkdir() without exist_ok raises FileExistsError if the directory is already there
    args.outdir.mkdir(parents=True)

    # Start one background `aws` process per object; keep handles for cleanup.
    download_jobs = []
    for objkey in list_object_keys(args.bucket, args.key_prefix):
        filename = Path(objkey).name
        outpath = args.outdir / filename
        log.info('%s %s -> %s', args.bucket, objkey, outpath)
        download_jobs.append(download_object_to_fifo(args.bucket, objkey, outpath))

    # wrapper did its job, run user-specified command
    log.info('executing %s', args.command)
    ret = subprocess.run(args.command, check=False)
    log.info('command %s exited with %d', ret.args, ret.returncode)

    # terminate AWS s3api subprocesses; any pipe the command never read from
    # would otherwise keep its writer blocked forever
    for proc in download_jobs:
        logging.debug('killing %s', proc.args)
        proc.kill()

    # propagate the user command's exit status to our caller
    sys.exit(ret.returncode)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment