Source code for pycognaize.document.snapshot_downloader

import fnmatch
import logging
import os
from pathlib import Path, PurePath

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from botocore.paginate import Paginator

from ..login import Login


class SnapshotDownloader:
    """
    A class for downloading snapshots from an S3 bucket to a local
    destination.

    Attributes:
        PREFIX (str): The prefix used to indicate S3 paths, which is
            's3://'.

    Methods:
        download(snapshot_path, destination_path, exclude=None,
                 include=None, continue_token=None):
            Downloads snapshots from an S3 bucket to a local destination.

    Private Methods:
        _get_parts_from_path(path):
            Extracts the bucket name and path from an S3 path.
        _init_s3_objects():
            Initializes the S3 client and resource objects.
        _get_page_iterator(bucket_name, continue_token, paginator,
                           path_without_bucket):
            Returns an iterator for paginating through S3 bucket contents.
        _get_relogined_page_iterator(bucket_name, next_token,
                                     pagination_config,
                                     path_without_bucket):
            Returns a relogged-in iterator for paginating through S3
            bucket contents.
        _relogin():
            Performs reauthentication for AWS credentials.
        _copy_objects_from_page(bucket_name, exclude, include, page,
                                snapshot_path, destination_path):
            Copies objects from an S3 page to the destination.
        _copy_file_to_dest(s3_object, snapshot_path, destination_path):
            Copies a single S3 object to the destination.
        _write_file(path, file_data):
            Writes file data to a local path.
        _matches_patterns(file_path, patterns):
            Determines whether a file path matches any of the given
            patterns.
    """
    PREFIX = 's3://'

    def _get_parts_from_path(self, path: str):
        """
        Extracts the bucket name and path from an S3 path.

        Args:
            path (str): The S3 path to extract parts from.

        Returns:
            tuple: A tuple containing the bucket name and the path without
                the bucket name.
        """
        if not path:
            return '', ''
        if not path.startswith(self.PREFIX):
            raise ValueError('S3 prefix not in path. '
                             'It must start with s3://.')
        path_without_prefix = PurePath(
            path.split(self.PREFIX, 1)[-1]
        )
        path_parts = path_without_prefix.parts
        bucket_name = path_parts[0]
        path_without_bucket = path_without_prefix.relative_to(bucket_name)
        return bucket_name, path_without_bucket
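
    # Example (hypothetical values): for a path such as
    # 's3://my-bucket/snapshots/snap-1', _get_parts_from_path returns
    # ('my-bucket', PurePath('snapshots/snap-1')).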

    def _init_s3_objects(self):
        """
        Initializes the AWS S3 client and resource objects for interacting
        with S3.

        This method sets up the AWS S3 client and resource objects using
        the AWS credentials obtained from the `Login` class. It also
        configures the client with retry settings.

        Note:
            This method should be called before performing any S3-related
            operations.

        Raises:
            None
        """
        login = Login()
        session = boto3.session.Session(
            login.aws_access_key,
            login.aws_secret_access_key,
            login.aws_session_token
        )
        client_config = Config(retries={
            'max_attempts': 5,
            'mode': 'standard'
        })
        self._client = session.client('s3', config=client_config)
        self._s3 = session.resource('s3', config=client_config)

    def _get_page_iterator(
            self,
            bucket_name: str,
            continue_token: str,
            paginator: Paginator,
            path_without_bucket: str
    ):
        """
        Returns an iterator for paginating through the contents of an S3
        bucket.

        Args:
            bucket_name (str): The name of the S3 bucket to paginate.
            continue_token (str): An optional continuation token for
                resuming pagination.
            paginator (botocore.paginate.Paginator): The paginator object
                for listing objects.
            path_without_bucket (str): The path within the S3 bucket to
                start pagination from.

        Returns:
            tuple: A tuple containing the page iterator and the pagination
                configuration. The page iterator allows you to iterate
                through pages of S3 objects within the specified bucket
                and path. The pagination configuration may include a
                continuation token for resuming pagination.

        Raises:
            None
        """
        pagination_config = dict()
        if continue_token:
            pagination_config['StartingToken'] = continue_token
        page_iterator = iter(paginator.paginate(
            Bucket=bucket_name,
            Prefix=str(path_without_bucket),
            PaginationConfig=pagination_config
        ))
        return page_iterator, pagination_config

    def _get_relogined_page_iterator(
            self,
            bucket_name: str,
            next_token: str,
            pagination_config: dict,
            path_without_bucket: str
    ):
        """
        Returns a relogged-in iterator for paginating through the contents
        of an S3 bucket.

        This method is used to obtain an iterator for paginating through
        S3 objects within the specified bucket and path, while
        reauthenticating the AWS connection objects.

        Args:
            bucket_name (str): The name of the S3 bucket to paginate.
            next_token (str): An optional continuation token for resuming
                pagination.
            pagination_config (dict): The pagination configuration,
                including the continuation token.
            path_without_bucket (str): The path within the S3 bucket to
                start pagination from.

        Returns:
            iter: An iterator for paginating through S3 objects, handling
                reauthentication when needed. The updated pagination
                configuration may contain a new continuation token if
                reauthentication occurs.

        Raises:
            None
        """
        self._relogin()
        paginator = self._client.get_paginator('list_objects_v2')
        if next_token:
            pagination_config['StartingToken'] = next_token
        return iter(paginator.paginate(
            Bucket=bucket_name,
            Prefix=str(path_without_bucket),
            PaginationConfig=pagination_config
        ))

    def _relogin(self):
        """
        Performs reauthentication to refresh AWS credentials.

        This method reauthenticates to AWS to refresh the AWS credentials
        used by the S3 client and resource objects. It utilizes the
        `Login` class to obtain updated AWS credentials.

        Note:
            This method should be called when AWS credentials have
            expired, typically due to an 'ExpiredToken' error.

        Raises:
            None
        """
        print('Relogin called')
        login = Login()
        login.relogin()
        self._init_s3_objects()

    def _copy_objects_from_page(
            self,
            bucket_name: str,
            exclude: list,
            include: list,
            page: dict,
            snapshot_path: str,
            destination_path: Path
    ):
        """
        Copies objects from an S3 page to the local destination path.

        This method iterates through the objects in the specified S3 page
        and copies them to the local destination path, skipping any object
        that matches a pattern in the 'exclude' list unless it also
        matches a pattern in the 'include' list.

        Args:
            bucket_name (str): The name of the S3 bucket.
            exclude (list): A list of file path patterns to exclude from
                copying.
            include (list): A list of file path patterns to include even
                if excluded.
            page (dict): The page of S3 objects to copy.
            snapshot_path (str): The original S3 snapshot path.
            destination_path (Path): The local destination path.

        Returns:
            None

        Raises:
            None
        """
        if 'Contents' not in page:
            return
        for obj in page['Contents']:
            s3_object = self._s3.Object(bucket_name, obj['Key'])
            if (self._matches_patterns(s3_object.key, exclude)
                    and not self._matches_patterns(s3_object.key, include)):
                continue
            self._copy_file_to_dest(s3_object, snapshot_path,
                                    destination_path)
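
    # Example (hypothetical patterns): with exclude=['*.png'] and
    # include=['*page_1*'], an object with key 'snap/images/page_2.png'
    # is skipped, while 'snap/images/page_1.png' is still copied because
    # the include pattern overrides the exclusion.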

    def _copy_file_to_dest(self, s3_object, snapshot_path,
                           destination_path):
        """
        Copies a single S3 object to the local destination path.

        This method copies a single S3 object to the specified local
        destination path. It also handles reauthentication if an
        'ExpiredToken' error is encountered while accessing the S3 object:
        in that case it reauthenticates to AWS and retries the copy
        operation.

        Args:
            s3_object (boto3.resources.factory.s3.Object): The S3 object
                to copy.
            snapshot_path (str): The original S3 snapshot path.
            destination_path (Path): The local destination path.

        Returns:
            None

        Raises:
            None
        """
        bucket_name = s3_object.bucket_name
        object_full_path = (
            f's3://{str(bucket_name / PurePath(s3_object.key))}')
        print(f'Copying: {object_full_path} to {destination_path}.')
        object_relative_path = (
            PurePath(object_full_path).relative_to(snapshot_path))
        try:
            file_data = s3_object.get()['Body'].read()
        except ClientError as e:
            if e.response['Error']['Code'] == 'ExpiredToken':
                self._relogin()
                s3_object = self._s3.Object(bucket_name, s3_object.key)
                file_data = s3_object.get()['Body'].read()
            else:
                raise
        new_file_path = str(destination_path / object_relative_path)
        self._write_file(new_file_path, file_data)
        print(f'Copied: {object_full_path} to {destination_path}.')

    def _write_file(self, path, file_data):
        """
        Writes file data to a local path.

        This method writes the provided file data to the specified local
        path. It ensures that the directories leading to the path exist
        and handles file/folder naming conflicts gracefully.

        Args:
            path (str): The local path where the file data should be
                written.
            file_data (bytes): The binary file data to be written.

        Returns:
            None

        Raises:
            None
        """
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
        except FileExistsError:
            logging.info(f'File/folder conflict for '
                         f'{os.path.dirname(path)} path')
            return None
        try:
            with open(path, 'wb') as f:
                f.write(file_data)
        except IsADirectoryError:
            logging.info(f'File/folder conflict for '
                         f'{os.path.dirname(path)} path')

    def _matches_patterns(self, file_path: str, patterns: list):
        """
        Determines whether a file path matches at least one of the
        patterns.

        This method checks whether the provided file path matches any of
        the patterns in the 'patterns' list, returning True on the first
        match and False otherwise. It is used to filter files based on the
        exclusion and inclusion patterns.

        Args:
            file_path (str): The file path to check against the patterns.
            patterns (list): A list of file path patterns to compare
                against.

        Returns:
            bool: True if the file path matches one of the patterns,
                False otherwise.

        Raises:
            None
        """
        if not patterns:
            return False
        for pattern in patterns:
            if fnmatch.fnmatch(file_path, pattern):
                return True
        return False
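
    # Example (hypothetical values): with fnmatch-style patterns,
    # _matches_patterns('images/page_1.jpeg', ['*.jpeg', '*.png'])
    # returns True, while
    # _matches_patterns('data/document.json', ['*.jpeg', '*.png'])
    # returns False.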

    def download(
            self,
            snapshot_path: str,
            destination_path: str,
            exclude=None,
            include=None,
            continue_token: str = None
    ):
        """
        Downloads snapshots from an S3 bucket to a local destination.

        This method downloads snapshots from an S3 bucket to the specified
        local destination path. It supports optional exclusion and
        inclusion patterns for filtering files and can resume downloading
        from a given continuation token.

        Args:
            snapshot_path (str): The S3 path of the snapshot to download.
            destination_path (str): The local destination directory where
                snapshots will be saved.
            exclude (list, optional): A list of file path patterns to
                exclude from copying.
            include (list, optional): A list of file path patterns to
                include even if excluded.
            continue_token (str, optional): An optional continuation token
                for resuming downloads.

        Returns:
            None

        This method handles pagination and reauthentication as needed.
        Progress and status information is printed during the download
        process.

        Note:
            - The 'snapshot_path' should start with 's3://'.
            - The 'destination_path' will be created if it doesn't exist.

        Raises:
            ValueError: If 'snapshot_path' doesn't start with 's3://'.
        """
        if not snapshot_path.startswith('s3://'):
            raise ValueError('Snapshot path should start with "s3://".')
        destination_path = Path(destination_path)
        bucket_name, path_without_bucket = self._get_parts_from_path(
            snapshot_path
        )
        self._init_s3_objects()
        paginator = self._client.get_paginator('list_objects_v2')
        page_iterator, pagination_config = self._get_page_iterator(
            bucket_name, continue_token, paginator, path_without_bucket
        )
        next_token = None
        while True:
            try:
                page = next(page_iterator)
                next_token = page.get('NextContinuationToken')
                self._copy_objects_from_page(
                    bucket_name, exclude, include, page,
                    snapshot_path, destination_path
                )
            except ClientError as e:
                if e.response['Error']['Code'] == 'ExpiredToken':
                    page_iterator = self._get_relogined_page_iterator(
                        bucket_name, next_token, pagination_config,
                        path_without_bucket
                    )
                    continue
                raise
            except StopIteration:
                break
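

# Usage sketch (not part of the library): the bucket, snapshot, and
# destination names below are hypothetical, and the call assumes valid
# credentials are available to the `Login` class.
if __name__ == '__main__':
    downloader = SnapshotDownloader()
    downloader.download(
        snapshot_path='s3://my-bucket/snapshots/snap-1',
        destination_path='/tmp/snap-1',
        exclude=['*.png'],       # skip PNG images...
        include=['*page_1*'],    # ...except those matching this pattern
    )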