mirror of https://github.com/aptly-dev/aptly
91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
from lib import BaseTest
|
|
import uuid
|
|
import os
|
|
|
|
try:
|
|
import boto
|
|
|
|
if 'AWS_SECRET_ACCESS_KEY' in os.environ and 'AWS_ACCESS_KEY_ID' in os.environ and \
|
|
os.environ['AWS_SECRET_ACCESS_KEY'] != "" and os.environ['AWS_ACCESS_KEY_ID'] != "":
|
|
s3_conn = boto.connect_s3()
|
|
else:
|
|
print("S3 tests disabled: AWS creds not found in the environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)")
|
|
s3_conn = None
|
|
except ImportError as e:
|
|
print("S3 tests disabled: can't import boto", e)
|
|
s3_conn = None
|
|
|
|
|
|
class S3Test(BaseTest):
|
|
"""
|
|
BaseTest + support for S3
|
|
"""
|
|
|
|
s3Overrides = {}
|
|
|
|
def fixture_available(self):
|
|
return super(S3Test, self).fixture_available() and s3_conn is not None
|
|
|
|
def prepare(self):
|
|
self.bucket_name = "aptly-sys-test-" + str(uuid.uuid1())
|
|
self.bucket = s3_conn.create_bucket(self.bucket_name)
|
|
self.configOverride = {"S3PublishEndpoints": {
|
|
"test1": {
|
|
"region": "us-east-1",
|
|
"bucket": self.bucket_name,
|
|
"awsAccessKeyID": os.environ["AWS_ACCESS_KEY_ID"],
|
|
"awsSecretAccessKey": os.environ["AWS_SECRET_ACCESS_KEY"]
|
|
}
|
|
}}
|
|
|
|
self.configOverride["S3PublishEndpoints"]["test1"].update(**self.s3Overrides)
|
|
|
|
super(S3Test, self).prepare()
|
|
|
|
def shutdown(self):
|
|
if hasattr(self, "bucket_name"):
|
|
if hasattr(self, "bucket"):
|
|
keys = self.bucket.list()
|
|
if keys:
|
|
self.bucket.delete_keys(keys)
|
|
s3_conn.delete_bucket(self.bucket_name)
|
|
|
|
super(S3Test, self).shutdown()
|
|
|
|
def check_path(self, path):
|
|
if not hasattr(self, "bucket_contents"):
|
|
self.bucket_contents = [key.name for key in self.bucket.list()]
|
|
|
|
if path.startswith("public/"):
|
|
path = path[7:]
|
|
|
|
if path in self.bucket_contents:
|
|
return True
|
|
|
|
if not path.endswith("/"):
|
|
path = path + "/"
|
|
|
|
for item in self.bucket_contents:
|
|
if item.startswith(path):
|
|
return True
|
|
|
|
return False
|
|
|
|
def check_exists(self, path):
|
|
if not self.check_path(path):
|
|
raise Exception("path %s doesn't exist" % (path, ))
|
|
|
|
def check_not_exists(self, path):
|
|
if self.check_path(path):
|
|
raise Exception("path %s exists" % (path, ))
|
|
|
|
def read_file(self, path, mode=''):
|
|
# We don't support reading as binary here.
|
|
assert not mode
|
|
|
|
if path.startswith("public/"):
|
|
path = path[7:]
|
|
|
|
key = self.bucket.get_key(path)
|
|
return key.get_contents_as_string()
|