Coverage for drivers/cowutil.py : 69%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2#
3# Copyright (C) 2024 Vates SAS
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union
19from abc import ABC, abstractmethod
20from enum import IntEnum
22import errno
23import time
25import util
27from vditype import VdiType
29# ------------------------------------------------------------------------------
31IMAGE_FORMAT_COW_FLAG: Final = 1 << 8
33class ImageFormat(IntEnum):
34 RAW = 1
35 VHD = 2 | IMAGE_FORMAT_COW_FLAG
36 QCOW2 = 3 | IMAGE_FORMAT_COW_FLAG
38IMAGE_FORMAT_TO_STR: Final = {
39 ImageFormat.RAW: "raw",
40 ImageFormat.VHD: "vhd",
41 ImageFormat.QCOW2: "qcow2"
42}
44STR_TO_IMAGE_FORMAT: Final = {v: k for k, v in IMAGE_FORMAT_TO_STR.items()}
46# ------------------------------------------------------------------------------
48def parseImageFormats(
49 str_formats: Optional[str],
50 default_formats: List[ImageFormat],
51 supported_formats: List[ImageFormat]
52) -> List[ImageFormat]:
53 default_formats = [f for f in default_formats if f in supported_formats]
55 if not str_formats: 55 ↛ 58line 55 didn't jump to line 58, because the condition on line 55 was never false
56 return default_formats
58 image_formats: List[ImageFormat] = []
59 for entry in str_formats.split(","):
60 image_format = STR_TO_IMAGE_FORMAT.get(entry.strip())
61 if image_format in supported_formats and image_format not in image_formats:
62 image_formats.append(image_format)
64 if image_formats:
65 return image_formats
67 return default_formats
69# ------------------------------------------------------------------------------
71class CowImageInfo(object):
72 uuid = ""
73 path = ""
74 sizeVirt = -1
75 sizePhys = -1
76 sizeAllocated = -1
77 hidden = False
78 parentUuid = ""
79 parentPath = ""
80 error: Any = 0
82 def __init__(self, uuid):
83 self.uuid = uuid
85# ------------------------------------------------------------------------------
87class CowUtil(ABC):
88 class CheckResult(IntEnum):
89 Success = 0
90 Fail = 1
91 Unavailable = 2
93 @abstractmethod
94 def getMinImageSize(self) -> int:
95 pass
97 @abstractmethod
98 def getMaxImageSize(self) -> int:
99 pass
101 @abstractmethod
102 def getBlockSize(self, path: str) -> int:
103 pass
105 @abstractmethod
106 def getFooterSize(self) -> int:
107 pass
109 @abstractmethod
110 def getDefaultPreallocationSizeVirt(self) -> int:
111 pass
113 @abstractmethod
114 def getMaxChainLength(self) -> int:
115 pass
117 @abstractmethod
118 def calcOverheadEmpty(self, virtual_size: int, block_size: Optional[int] = None) -> int:
119 pass
121 @abstractmethod
122 def calcOverheadBitmap(self, virtual_size: int) -> int:
123 pass
125 @abstractmethod
126 def getInfo(
127 self,
128 path: str,
129 extractUuidFunction: Callable[[str], str],
130 includeParent: bool = True,
131 resolveParent: bool = True,
132 useBackupFooter: bool = False
133 ) -> CowImageInfo:
134 pass
136 @abstractmethod
137 def getInfoFromLVM(
138 self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str
139 ) -> Optional[CowImageInfo]:
140 pass
142 @abstractmethod
143 def getAllInfoFromVG(
144 self,
145 pattern: str,
146 extractUuidFunction: Callable[[str], str],
147 vgName: Optional[str] = None,
148 parents: bool = False,
149 exitOnError: bool = False
150 ) -> Dict[str, CowImageInfo]:
151 pass
153 @abstractmethod
154 def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]:
155 pass
157 @abstractmethod
158 def getParentNoCheck(self, path: str) -> Optional[str]:
159 pass
161 @abstractmethod
162 def hasParent(self, path: str) -> bool:
163 pass
165 @abstractmethod
166 def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None:
167 pass
169 @abstractmethod
170 def getHidden(self, path: str) -> bool:
171 pass
173 @abstractmethod
174 def setHidden(self, path: str, hidden: bool = True) -> None:
175 pass
177 @abstractmethod
178 def getSizeVirt(self, path: str) -> int:
179 pass
181 @abstractmethod
182 def setSizeVirt(self, path: str, size: int, jFile: str) -> None:
183 pass
185 @abstractmethod
186 def setSizeVirtFast(self, path: str, size: int) -> None:
187 pass
189 @abstractmethod
190 def getMaxResizeSize(self, path: str) -> int:
191 pass
193 @abstractmethod
194 def getSizePhys(self, path: str) -> int:
195 pass
197 @abstractmethod
198 def setSizePhys(self, path: str, size: int, debug: bool = True) -> None:
199 pass
201 @abstractmethod
202 def getAllocatedSize(self, path: str) -> int:
203 pass
205 @abstractmethod
206 def getResizeJournalSize(self) -> int:
207 pass
209 @abstractmethod
210 def killData(self, path: str) -> None:
211 pass
213 @abstractmethod
214 def getDepth(self, path: str) -> int:
215 pass
217 @abstractmethod
218 def getBlockBitmap(self, path: str) -> bytes:
219 pass
221 @abstractmethod
222 def coalesce(self, path: str) -> int:
223 pass
225 @abstractmethod
226 def create(self, path: str, size: int, static: bool, msize: int = 0, block_size: Optional[int] = None) -> None:
227 pass
229 @abstractmethod
230 def snapshot(
231 self,
232 path: str,
233 parent: str,
234 parentRaw: bool,
235 msize: int = 0,
236 checkEmpty: bool = True,
237 is_mirror_image: bool = False
238 ) -> None:
239 pass
241 @abstractmethod
242 def canSnapshotRaw(self, size: int) -> bool:
243 pass
245 @abstractmethod
246 def check(
247 self,
248 path: str,
249 ignoreMissingFooter: bool = False,
250 fast: bool = False
251 ) -> 'CowUtil.CheckResult':
252 pass
254 @abstractmethod
255 def revert(self, path: str, jFile: str) -> None:
256 pass
258 @abstractmethod
259 def repair(self, path: str) -> None:
260 pass
262 @abstractmethod
263 def validateAndRoundImageSize(self, size: int) -> int:
264 pass
266 @abstractmethod
267 def getKeyHash(self, path: str) -> Optional[str]:
268 pass
270 @abstractmethod
271 def setKey(self, path: str, key_hash: str) -> None:
272 pass
274 # The availability of coalesceOnline and cancelCoalesceOnline are dependent on isCoalesceableOnRemote() returning True
275 # If not, both function should raise NotImplementedError
276 @abstractmethod
277 def isCoalesceableOnRemote(self) -> bool:
278 pass
280 @abstractmethod
281 def coalesceOnline(self, path: str) -> int:
282 pass
284 @abstractmethod
285 def cancelCoalesceOnline(self, path: str) -> None:
286 pass
288 def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]:
289 """
290 Get the chain of all parents of 'path'. Safe to call for raw VDI's as well.
291 """
292 chain = {}
293 vdis: Dict[str, CowImageInfo] = {}
294 retries = 0
295 while (not vdis):
296 if retries > 60: 296 ↛ 297line 296 didn't jump to line 297, because the condition on line 296 was never true
297 util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % retries)
298 util.SMlog('ERROR: the image metadata might be corrupted')
299 break
300 vdis = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True)
301 if (not vdis): 301 ↛ 302line 301 didn't jump to line 302, because the condition on line 301 was never true
302 retries = retries + 1
303 time.sleep(1)
304 for uuid, vdi in vdis.items(): 304 ↛ 305line 304 didn't jump to line 305, because the loop on line 304 never started
305 chain[uuid] = vdi.path
306 #util.SMlog("Parent chain for %s: %s" % (lvName, chain))
307 return chain
309 @staticmethod
310 def isCowImage(image_format: ImageFormat) -> bool:
311 return bool(image_format & IMAGE_FORMAT_COW_FLAG)
313 @staticmethod
314 def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]:
315 return util.ioretry(
316 lambda: util.pread2(cmd, text=text),
317 errlist=[errno.EIO, errno.EAGAIN]
318 )
320# ------------------------------------------------------------------------------
322def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat:
323 if vdi_type == VdiType.RAW:
324 return ImageFormat.RAW
325 if vdi_type == VdiType.VHD:
326 return ImageFormat.VHD
327 if vdi_type == VdiType.QCOW2: 327 ↛ 330line 327 didn't jump to line 330, because the condition on line 327 was never false
328 return ImageFormat.QCOW2
330 assert False, f"Unsupported vdi type: {vdi_type}"
332def getImageStringFromVdiType(vdi_type: str) -> str:
333 return IMAGE_FORMAT_TO_STR[getImageFormatFromVdiType(vdi_type)]
335def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str:
336 if image_format == ImageFormat.RAW: 336 ↛ 337line 336 didn't jump to line 337, because the condition on line 336 was never true
337 return VdiType.RAW
338 if image_format == ImageFormat.VHD: 338 ↛ 340line 338 didn't jump to line 340, because the condition on line 338 was never false
339 return VdiType.VHD
340 if image_format == ImageFormat.QCOW2:
341 return VdiType.QCOW2
343 assert False, f"Unsupported image format: {IMAGE_FORMAT_TO_STR[image_format]}"
345# ------------------------------------------------------------------------------
347def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil:
348 import vhdutil
349 import qcow2util
351 if image_format in (ImageFormat.RAW, ImageFormat.VHD):
352 return vhdutil.VhdUtil()
354 if image_format == ImageFormat.QCOW2: 354 ↛ 357line 354 didn't jump to line 357, because the condition on line 354 was never false
355 return qcow2util.QCowUtil()
357 assert False, f"Unsupported image format: {image_format}"
359def getCowUtil(vdi_type: str) -> CowUtil:
360 return getCowUtilFromImageFormat(getImageFormatFromVdiType(vdi_type))