跳到正文

S3

基类: BaseKVStore

S3 键值存储。将键值对存储在 S3 存储桶中。可以选择指定一个文件夹路径来存储 KV 数据。KV 数据进一步按集合划分,集合是路径下的子文件夹。每个键值对存储为一个 JSON 文件。

参数

名称

类型 描述 默认值 s3_bucket
Any boto3 S3 Bucket 实例

必需的

path
可选的[str] S3 存储桶中存储 KV 数据的文件夹路径

源代码位于 llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-s3/llama_index/storage/kvstore/s3/base.py

'./'
from_s3_location classmethod #
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class S3DBKVStore(BaseKVStore):
    """
    S3 Key-Value store.

    Stores key-value pairs in a S3 bucket. Can optionally specify a path to a
    folder where KV data is stored. The KV data is further divided into
    collections, which are subfolders in the path. Each key-value pair is
    stored as a JSON file whose object key is built from
    ``<path>/<collection>/<key>.json``.

    Args:
        bucket (Any): boto3 S3 Bucket instance
        path (Optional[str]): path to folder in S3 bucket where KV data is
            stored. ``None`` or an empty string falls back to ``"./"``.

    """

    def __init__(
        self,
        bucket: Any,
        path: Optional[str] = "./",
    ) -> None:
        """Init a S3DBKVStore."""
        self._bucket = bucket
        self._path = path or "./"

    @classmethod
    def from_s3_location(
        cls,
        bucket_name: str,
        path: Optional[str] = None,
    ) -> "S3DBKVStore":
        """
        Create a S3DBKVStore for a bucket identified by name.

        Args:
            bucket_name (str): S3 bucket name
            path (Optional[str]): path to folder in S3 bucket where KV data is stored

        Returns:
            S3DBKVStore: store backed by ``boto3.resource("s3").Bucket(bucket_name)``.

        """
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(bucket_name)
        return cls(
            bucket,
            path=path,
        )

    def _get_object_key(self, collection: str, key: str) -> str:
        """Build the object key ``<path>/<collection>/<key>.json`` (normalized by PurePath)."""
        return str(PurePath(f"{self._path}/{collection}/{key}.json"))

    def _find_object(self, obj_key: str) -> Optional[Any]:
        """
        Return the object whose key is exactly ``obj_key``, or None.

        The bucket listing can only filter by prefix, so every candidate is
        compared against the full key. Without this check a missing
        ``a.json`` would resolve to an unrelated object such as ``a.json.bak``.
        """
        for obj in self._bucket.objects.filter(Prefix=obj_key):
            if obj.key == obj_key:
                return obj
        return None

    def put(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """
        Put a key-value pair into the store.

        The value is serialized with ``json.dumps`` and written to the object
        key derived from ``collection`` and ``key``.

        Args:
            key (str): key
            val (dict): value
            collection (str): collection name

        """
        obj_key = self._get_object_key(collection, key)
        self._bucket.put_object(
            Key=obj_key,
            Body=json.dumps(val),
        )

    async def aput(
        self,
        key: str,
        val: dict,
        collection: str = DEFAULT_COLLECTION,
    ) -> None:
        """
        Put a key-value pair into the store (async variant, not implemented).

        Args:
            key (str): key
            val (dict): value
            collection (str): collection name

        Raises:
            NotImplementedError: always.

        """
        raise NotImplementedError

    def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
        """
        Get a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            Optional[dict]: the decoded JSON value, or None when no object
                with exactly this key exists.

        """
        obj = self._find_object(self._get_object_key(collection, key))
        if obj is None:
            return None
        body = obj.get()["Body"].read()
        return json.loads(body)

    async def aget(
        self, key: str, collection: str = DEFAULT_COLLECTION
    ) -> Optional[dict]:
        """
        Get a value from the store (async variant, not implemented).

        Args:
            key (str): key
            collection (str): collection name

        Raises:
            NotImplementedError: always.

        """
        raise NotImplementedError

    def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """
        Get all values from the store.

        Args:
            collection (str): collection name

        Returns:
            Dict[str, dict]: mapping of key (object file name without its
                extension) to the decoded JSON value.

        """
        collection_prefix = str(PurePath(f"{self._path}/{collection}"))
        # PurePath drops the trailing separator; put it back so that listing
        # collection "data" does not also return objects of "data2".
        # os.sep is the separator PurePath uses when converted to str.
        if not collection_prefix.endswith(os.sep):
            collection_prefix += os.sep
        collection_kv_dict = {}
        for obj in self._bucket.objects.filter(Prefix=collection_prefix):
            json_filename = os.path.split(obj.key)[-1]
            if not json_filename:
                # Key ends with a separator: a folder placeholder object, not
                # a stored value, so there is no JSON body to decode.
                continue
            body = obj.get()["Body"].read()
            key = os.path.splitext(json_filename)[0]
            collection_kv_dict[key] = json.loads(body)
        return collection_kv_dict

    async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
        """
        Get all values from the store (async variant, not implemented).

        Args:
            collection (str): collection name

        Raises:
            NotImplementedError: always.

        """
        raise NotImplementedError

    def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """
        Delete a value from the store.

        Args:
            key (str): key
            collection (str): collection name

        Returns:
            bool: True when an object with exactly this key was found and
                deleted, False when no such object exists.

        """
        obj = self._find_object(self._get_object_key(collection, key))
        if obj is None:
            return False
        obj.delete()
        return True

    async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
        """
        Delete a value from the store (async variant, not implemented).

        Args:
            key (str): key
            collection (str): collection name

        Raises:
            NotImplementedError: always.

        """
        raise NotImplementedError

从 S3 URI 加载 S3DBKVStore。

from_s3_location(bucket_name: str, path: Optional[str] = None) -> S3DBKVStore

bucket_name

名称

类型 描述 默认值 s3_bucket
str S3 存储桶名称

put #

path
可选的[str] S3 存储桶中存储 KV 数据的文件夹路径

源代码位于 llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-s3/llama_index/storage/kvstore/s3/base.py

None
from_s3_location classmethod #
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@classmethod
def from_s3_location(
    cls,
    bucket_name: str,
    path: Optional[str] = None,
) -> "S3DBKVStore":
    """
    Build a S3DBKVStore on top of the S3 bucket with the given name.

    Args:
        bucket_name (str): S3 bucket name
        path (Optional[str]): path to folder in S3 bucket where KV data is stored

    Returns:
        S3DBKVStore: instance created as ``cls(bucket, path=path)``.

    """
    # Resolve the bucket through the boto3 resource API, then hand it to the
    # regular constructor.
    bucket = boto3.resource("s3").Bucket(bucket_name)
    return cls(bucket, path=path)

将键值对放入存储。

put(key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None

key

名称

类型 描述 默认值 s3_bucket
val S3 存储桶名称

val

path
dict

collection

path
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def put(
    self,
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None:
    """
    Store ``val`` under ``key`` inside ``collection``.

    The value is serialized with ``json.dumps`` and uploaded to the object
    key returned by ``self._get_object_key(collection, key)``.

    Args:
        key (str): key
        val (dict): value
        collection (str): collection name

    """
    target_key = self._get_object_key(collection, key)
    payload = json.dumps(val)
    self._bucket.put_object(Key=target_key, Body=payload)

get #

aput(key: str, val: dict, collection: str = DEFAULT_COLLECTION) -> None

key

名称

类型 描述 默认值 s3_bucket
val S3 存储桶名称

val

path
dict

collection

path
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def aput(
    self,
    key: str,
    val: dict,
    collection: str = DEFAULT_COLLECTION,
) -> None:
    """
    Async counterpart of ``put``; this store does not implement it.

    Args:
        key (str): key
        val (dict): value
        collection (str): collection name

    Raises:
        NotImplementedError: always.

    """
    raise NotImplementedError()

从存储获取值。

get(key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]

aget async #

名称

类型 描述 默认值 s3_bucket
val S3 存储桶名称

val

path
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:
    """
    Get a value from the store.

    Args:
        key (str): key
        collection (str): collection name

    Returns:
        Optional[dict]: the decoded JSON value, or None when no object with
            exactly this key exists.

    """
    obj_key = self._get_object_key(collection, key)
    # The listing can only filter by prefix, so compare the full key: a
    # missing "a.json" must not resolve to e.g. "a.json.bak".
    for obj in self._bucket.objects.filter(Prefix=obj_key):
        if obj.key == obj_key:
            body = obj.get()["Body"].read()
            return json.loads(body)
    return None

get_all #

aget(key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]

aget async #

名称

类型 描述 默认值 s3_bucket
val S3 存储桶名称

val

path
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
112
113
114
115
116
117
118
119
120
121
122
123
async def aget(
    self, key: str, collection: str = DEFAULT_COLLECTION
) -> Optional[dict]:
    """
    Async counterpart of ``get``; this store does not implement it.

    Args:
        key (str): key
        collection (str): collection name

    Raises:
        NotImplementedError: always.

    """
    raise NotImplementedError()

从存储获取所有值。

get_all(collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]

aget_all async #

名称

类型 描述 默认值 s3_bucket
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
    """
    Get all values from the store.

    Args:
        collection (str): collection name

    Returns:
        Dict[str, dict]: mapping of key (object file name without its
            extension) to the decoded JSON value.

    """
    collection_prefix = str(PurePath(f"{self._path}/{collection}"))
    # PurePath drops the trailing separator; put it back so that listing
    # collection "data" does not also return objects of "data2".
    # os.sep is the separator PurePath uses when converted to str.
    if not collection_prefix.endswith(os.sep):
        collection_prefix += os.sep
    collection_kv_dict = {}
    for obj in self._bucket.objects.filter(Prefix=collection_prefix):
        json_filename = os.path.split(obj.key)[-1]
        if not json_filename:
            # Key ends with a separator: a folder placeholder object, not a
            # stored value, so there is no JSON body to decode.
            continue
        body = obj.get()["Body"].read()
        key = os.path.splitext(json_filename)[0]
        collection_kv_dict[key] = json.loads(body)
    return collection_kv_dict

delete #

aget_all(collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]

aget_all async #

名称

类型 描述 默认值 s3_bucket
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
143
144
145
146
147
148
149
150
151
async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]:
    """
    Async counterpart of ``get_all``; this store does not implement it.

    Args:
        collection (str): collection name

    Raises:
        NotImplementedError: always.

    """
    raise NotImplementedError()

从存储删除值。

delete(key: str, collection: str = DEFAULT_COLLECTION) -> bool

adelete async #

名称

类型 描述 默认值 s3_bucket
val S3 存储桶名称

val

path
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
    """
    Delete a value from the store.

    Args:
        key (str): key
        collection (str): collection name

    Returns:
        bool: True when an object with exactly this key was found and
            deleted, False when no such object exists.

    """
    obj_key = self._get_object_key(collection, key)
    # The listing can only filter by prefix, so compare the full key: deleting
    # a missing "a.json" must not remove e.g. "a.json.bak" instead.
    for obj in self._bucket.objects.filter(Prefix=obj_key):
        if obj.key == obj_key:
            obj.delete()
            return True
    return False

返回顶部

adelete(key: str, collection: str = DEFAULT_COLLECTION) -> bool

adelete async #

名称

类型 描述 默认值 s3_bucket
val S3 存储桶名称

val

path
集合名称 S3 存储桶名称

DEFAULT_COLLECTION

aput async #
from_s3_location classmethod #
170
171
172
173
174
175
176
177
178
179
async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool:
    """
    Async counterpart of ``delete``; this store does not implement it.

    Args:
        key (str): key
        collection (str): collection name

    Raises:
        NotImplementedError: always.

    """
    raise NotImplementedError()