Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Copyright (C) 2024 Vates SAS 

4# 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16 

17from sm_typing import Any, Callable, Dict, Final, List, Optional, Sequence, Union 

18 

19from abc import ABC, abstractmethod 

20from enum import IntEnum 

21 

22import errno 

23import time 

24 

25import util 

26 

27from vditype import VdiType 

28 

29# ------------------------------------------------------------------------------ 

30 

31IMAGE_FORMAT_COW_FLAG: Final = 1 << 8 

32 

33class ImageFormat(IntEnum): 

34 RAW = 1 

35 VHD = 2 | IMAGE_FORMAT_COW_FLAG 

36 QCOW2 = 3 | IMAGE_FORMAT_COW_FLAG 

37 

38IMAGE_FORMAT_TO_STR: Final = { 

39 ImageFormat.RAW: "raw", 

40 ImageFormat.VHD: "vhd", 

41 ImageFormat.QCOW2: "qcow2" 

42} 

43 

44STR_TO_IMAGE_FORMAT: Final = {v: k for k, v in IMAGE_FORMAT_TO_STR.items()} 

45 

46# ------------------------------------------------------------------------------ 

47 

48def parseImageFormats( 

49 str_formats: Optional[str], 

50 default_formats: List[ImageFormat], 

51 supported_formats: List[ImageFormat] 

52) -> List[ImageFormat]: 

53 default_formats = [f for f in default_formats if f in supported_formats] 

54 

55 if not str_formats: 55 ↛ 58line 55 didn't jump to line 58, because the condition on line 55 was never false

56 return default_formats 

57 

58 image_formats: List[ImageFormat] = [] 

59 for entry in str_formats.split(","): 

60 image_format = STR_TO_IMAGE_FORMAT.get(entry.strip()) 

61 if image_format in supported_formats and image_format not in image_formats: 

62 image_formats.append(image_format) 

63 

64 if image_formats: 

65 return image_formats 

66 

67 return default_formats 

68 

69# ------------------------------------------------------------------------------ 

70 

71class CowImageInfo(object): 

72 uuid = "" 

73 path = "" 

74 sizeVirt = -1 

75 sizePhys = -1 

76 sizeAllocated = -1 

77 hidden = False 

78 parentUuid = "" 

79 parentPath = "" 

80 error: Any = 0 

81 

82 def __init__(self, uuid): 

83 self.uuid = uuid 

84 

85# ------------------------------------------------------------------------------ 

86 

87class CowUtil(ABC): 

88 class CheckResult(IntEnum): 

89 Success = 0 

90 Fail = 1 

91 Unavailable = 2 

92 

93 @abstractmethod 

94 def getMinImageSize(self) -> int: 

95 pass 

96 

97 @abstractmethod 

98 def getMaxImageSize(self) -> int: 

99 pass 

100 

101 @abstractmethod 

102 def getBlockSize(self, path: str) -> int: 

103 pass 

104 

105 @abstractmethod 

106 def getFooterSize(self) -> int: 

107 pass 

108 

109 @abstractmethod 

110 def getDefaultPreallocationSizeVirt(self) -> int: 

111 pass 

112 

113 @abstractmethod 

114 def getMaxChainLength(self) -> int: 

115 pass 

116 

117 @abstractmethod 

118 def calcOverheadEmpty(self, virtual_size: int, block_size: Optional[int] = None) -> int: 

119 pass 

120 

121 @abstractmethod 

122 def calcOverheadBitmap(self, virtual_size: int) -> int: 

123 pass 

124 

125 @abstractmethod 

126 def getInfo( 

127 self, 

128 path: str, 

129 extractUuidFunction: Callable[[str], str], 

130 includeParent: bool = True, 

131 resolveParent: bool = True, 

132 useBackupFooter: bool = False 

133 ) -> CowImageInfo: 

134 pass 

135 

136 @abstractmethod 

137 def getInfoFromLVM( 

138 self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str 

139 ) -> Optional[CowImageInfo]: 

140 pass 

141 

142 @abstractmethod 

143 def getAllInfoFromVG( 

144 self, 

145 pattern: str, 

146 extractUuidFunction: Callable[[str], str], 

147 vgName: Optional[str] = None, 

148 parents: bool = False, 

149 exitOnError: bool = False 

150 ) -> Dict[str, CowImageInfo]: 

151 pass 

152 

153 @abstractmethod 

154 def getParent(self, path: str, extractUuidFunction: Callable[[str], str]) -> Optional[str]: 

155 pass 

156 

157 @abstractmethod 

158 def getParentNoCheck(self, path: str) -> Optional[str]: 

159 pass 

160 

161 @abstractmethod 

162 def hasParent(self, path: str) -> bool: 

163 pass 

164 

165 @abstractmethod 

166 def setParent(self, path: str, parentPath: str, parentRaw: bool) -> None: 

167 pass 

168 

169 @abstractmethod 

170 def getHidden(self, path: str) -> bool: 

171 pass 

172 

173 @abstractmethod 

174 def setHidden(self, path: str, hidden: bool = True) -> None: 

175 pass 

176 

177 @abstractmethod 

178 def getSizeVirt(self, path: str) -> int: 

179 pass 

180 

181 @abstractmethod 

182 def setSizeVirt(self, path: str, size: int, jFile: str) -> None: 

183 pass 

184 

185 @abstractmethod 

186 def setSizeVirtFast(self, path: str, size: int) -> None: 

187 pass 

188 

189 @abstractmethod 

190 def getMaxResizeSize(self, path: str) -> int: 

191 pass 

192 

193 @abstractmethod 

194 def getSizePhys(self, path: str) -> int: 

195 pass 

196 

197 @abstractmethod 

198 def setSizePhys(self, path: str, size: int, debug: bool = True) -> None: 

199 pass 

200 

201 @abstractmethod 

202 def getAllocatedSize(self, path: str) -> int: 

203 pass 

204 

205 @abstractmethod 

206 def getResizeJournalSize(self) -> int: 

207 pass 

208 

209 @abstractmethod 

210 def killData(self, path: str) -> None: 

211 pass 

212 

213 @abstractmethod 

214 def getDepth(self, path: str) -> int: 

215 pass 

216 

217 @abstractmethod 

218 def getBlockBitmap(self, path: str) -> bytes: 

219 pass 

220 

221 @abstractmethod 

222 def coalesce(self, path: str) -> int: 

223 pass 

224 

225 @abstractmethod 

226 def create(self, path: str, size: int, static: bool, msize: int = 0, block_size: Optional[int] = None) -> None: 

227 pass 

228 

229 @abstractmethod 

230 def snapshot( 

231 self, 

232 path: str, 

233 parent: str, 

234 parentRaw: bool, 

235 msize: int = 0, 

236 checkEmpty: bool = True, 

237 is_mirror_image: bool = False 

238 ) -> None: 

239 pass 

240 

241 @abstractmethod 

242 def canSnapshotRaw(self, size: int) -> bool: 

243 pass 

244 

245 @abstractmethod 

246 def check( 

247 self, 

248 path: str, 

249 ignoreMissingFooter: bool = False, 

250 fast: bool = False 

251 ) -> 'CowUtil.CheckResult': 

252 pass 

253 

254 @abstractmethod 

255 def revert(self, path: str, jFile: str) -> None: 

256 pass 

257 

258 @abstractmethod 

259 def repair(self, path: str) -> None: 

260 pass 

261 

262 @abstractmethod 

263 def validateAndRoundImageSize(self, size: int) -> int: 

264 pass 

265 

266 @abstractmethod 

267 def getKeyHash(self, path: str) -> Optional[str]: 

268 pass 

269 

270 @abstractmethod 

271 def setKey(self, path: str, key_hash: str) -> None: 

272 pass 

273 

274 # The availability of coalesceOnline and cancelCoalesceOnline are dependent on isCoalesceableOnRemote() returning True 

275 # If not, both function should raise NotImplementedError 

276 @abstractmethod 

277 def isCoalesceableOnRemote(self) -> bool: 

278 pass 

279 

280 @abstractmethod 

281 def coalesceOnline(self, path: str) -> int: 

282 pass 

283 

284 @abstractmethod 

285 def cancelCoalesceOnline(self, path: str) -> None: 

286 pass 

287 

288 def getParentChain(self, lvName: str, extractUuidFunction: Callable[[str], str], vgName: str) -> Dict[str, str]: 

289 """ 

290 Get the chain of all parents of 'path'. Safe to call for raw VDI's as well. 

291 """ 

292 chain = {} 

293 vdis: Dict[str, CowImageInfo] = {} 

294 retries = 0 

295 while (not vdis): 

296 if retries > 60: 296 ↛ 297line 296 didn't jump to line 297, because the condition on line 296 was never true

297 util.SMlog('ERROR: getAllInfoFromVG returned 0 VDIs after %d retries' % retries) 

298 util.SMlog('ERROR: the image metadata might be corrupted') 

299 break 

300 vdis = self.getAllInfoFromVG(lvName, extractUuidFunction, vgName, True, True) 

301 if (not vdis): 301 ↛ 302line 301 didn't jump to line 302, because the condition on line 301 was never true

302 retries = retries + 1 

303 time.sleep(1) 

304 for uuid, vdi in vdis.items(): 304 ↛ 305line 304 didn't jump to line 305, because the loop on line 304 never started

305 chain[uuid] = vdi.path 

306 #util.SMlog("Parent chain for %s: %s" % (lvName, chain)) 

307 return chain 

308 

309 @staticmethod 

310 def isCowImage(image_format: ImageFormat) -> bool: 

311 return bool(image_format & IMAGE_FORMAT_COW_FLAG) 

312 

313 @staticmethod 

314 def _ioretry(cmd: Sequence[str], text: bool = True) -> Union[str, bytes]: 

315 return util.ioretry( 

316 lambda: util.pread2(cmd, text=text), 

317 errlist=[errno.EIO, errno.EAGAIN] 

318 ) 

319 

320# ------------------------------------------------------------------------------ 

321 

322def getImageFormatFromVdiType(vdi_type: str) -> ImageFormat: 

323 if vdi_type == VdiType.RAW: 

324 return ImageFormat.RAW 

325 if vdi_type == VdiType.VHD: 

326 return ImageFormat.VHD 

327 if vdi_type == VdiType.QCOW2: 327 ↛ 330line 327 didn't jump to line 330, because the condition on line 327 was never false

328 return ImageFormat.QCOW2 

329 

330 assert False, f"Unsupported vdi type: {vdi_type}" 

331 

332def getImageStringFromVdiType(vdi_type: str) -> str: 

333 return IMAGE_FORMAT_TO_STR[getImageFormatFromVdiType(vdi_type)] 

334 

335def getVdiTypeFromImageFormat(image_format: ImageFormat) -> str: 

336 if image_format == ImageFormat.RAW: 336 ↛ 337line 336 didn't jump to line 337, because the condition on line 336 was never true

337 return VdiType.RAW 

338 if image_format == ImageFormat.VHD: 338 ↛ 340line 338 didn't jump to line 340, because the condition on line 338 was never false

339 return VdiType.VHD 

340 if image_format == ImageFormat.QCOW2: 

341 return VdiType.QCOW2 

342 

343 assert False, f"Unsupported image format: {IMAGE_FORMAT_TO_STR[image_format]}" 

344 

345# ------------------------------------------------------------------------------ 

346 

347def getCowUtilFromImageFormat(image_format: ImageFormat) -> CowUtil: 

348 import vhdutil 

349 import qcow2util 

350 

351 if image_format in (ImageFormat.RAW, ImageFormat.VHD): 

352 return vhdutil.VhdUtil() 

353 

354 if image_format == ImageFormat.QCOW2: 354 ↛ 357line 354 didn't jump to line 357, because the condition on line 354 was never false

355 return qcow2util.QCowUtil() 

356 

357 assert False, f"Unsupported image format: {image_format}" 

358 

359def getCowUtil(vdi_type: str) -> CowUtil: 

360 return getCowUtilFromImageFormat(getImageFormatFromVdiType(vdi_type))