Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Copyright (C) 2020 Vates SAS - ronan.abhamon@vates.fr 

4# 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16# 

17 

18from sm_typing import ( 

19 Any, 

20 Dict, 

21 List, 

22 override, 

23) 

24 

25import json 

26import linstor 

27import os.path 

28import re 

29import shutil 

30import socket 

31import stat 

32import time 

33import util 

34import uuid 

35from datetime import datetime 

36from pathlib import Path 

37import contextlib 

38 

39# Persistent prefix to add to RAW persistent volumes. 

40PERSISTENT_PREFIX = 'xcp-persistent-' 

41 

42# Contains the data of the "/var/lib/linstor" directory. 

43DATABASE_VOLUME_NAME = PERSISTENT_PREFIX + 'database' 

44DATABASE_SIZE = 1 << 30 # 1GB. 

45DATABASE_PATH = '/var/lib/linstor' 

46DATABASE_MKFS = 'mkfs.ext4' 

47DATABASE_BACKUP_DIR_MAIN = Path(DATABASE_PATH) 

48DATABASE_BACKUP_DIR_SPARE = Path('/var/lib/linstor.d/db-backups') 

49DATABASE_BACKUP_NAME_FORMAT = "linstor_database_backup-{}-{}" 

50DATABASE_BACKUP_NAME_LATEST = "linstor_database_backup-latest.zip" 

51DATABASE_BACKUP_RETENTION = 10 

52DATABASE_BACKUP_DATE_FORMAT = "%Y%m%d_%H%M%S" 

53LINSTOR_SATELLITE_PORT = 3366 

54 

55REG_DRBDADM_PRIMARY = re.compile("([^\\s]+)\\s+role:Primary") 

56REG_DRBDSETUP_IP = re.compile('[^\\s]+\\s+(.*):.*$') 

57 

58DRBD_BY_RES_PATH = '/dev/drbd/by-res/' 

59 

60PLUGIN = 'linstor-manager' 

61 

62 

63# ============================================================================== 

64 

65def get_local_volume_openers(resource_name, volume): 

66 if not resource_name or volume is None: 

67 raise Exception('Cannot get DRBD openers without resource name and/or volume.') 

68 

69 path = '/sys/kernel/debug/drbd/resources/{}/volumes/{}/openers'.format( 

70 resource_name, volume 

71 ) 

72 

73 with open(path, 'r') as openers: 

74 # Not a big cost, so read all lines directly. 

75 lines = openers.readlines() 

76 

77 result = {} 

78 

79 opener_re = re.compile('(.*)\\s+([0-9]+)\\s+([0-9]+)') 

80 for line in lines: 

81 match = opener_re.match(line) 

82 assert match 

83 

84 groups = match.groups() 

85 process_name = groups[0] 

86 pid = groups[1] 

87 open_duration_ms = groups[2] 

88 result[pid] = { 

89 'process-name': process_name, 

90 'open-duration': open_duration_ms 

91 } 

92 

93 return json.dumps(result) 

94 

95def get_all_volume_openers(resource_name, volume): 

96 PLUGIN_CMD = 'getDrbdOpeners' 

97 

98 volume = str(volume) 

99 openers = {} 

100 

101 session = util.get_localAPI_session() 

102 

103 hosts = session.xenapi.host.get_all_records() 

104 for host_ref, host_record in hosts.items(): 

105 node_name = host_record['hostname'] 

106 try: 

107 if not session.xenapi.host_metrics.get_record( 

108 host_record['metrics'] 

109 )['live']: 

110 # Ensure we call plugin on online hosts only. 

111 continue 

112 

113 openers[node_name] = json.loads( 

114 session.xenapi.host.call_plugin(host_ref, PLUGIN, PLUGIN_CMD, { 

115 'resourceName': resource_name, 

116 'volume': volume 

117 }) 

118 ) 

119 except Exception as e: 

120 util.SMlog('Failed to get openers of `{}` on `{}`: {}'.format( 

121 resource_name, node_name, e 

122 )) 

123 

124 return openers 

125 

126 

127# ============================================================================== 

128 

129def round_up(value, divisor): 

130 assert divisor 

131 divisor = int(divisor) 

132 return ((int(value) + divisor - 1) // divisor) * divisor 

133 

134 

135def round_down(value, divisor): 

136 assert divisor 

137 value = int(value) 

138 return value - (value % int(divisor)) 

139 

140 

141# ============================================================================== 

142 

143def _get_controller_addresses() -> List[str]: 

144 try: 

145 (ret, stdout, stderr) = util.doexec([ 

146 "/usr/sbin/ss", "-tnpH", "state", "established", f"( sport = :{LINSTOR_SATELLITE_PORT} )" 

147 ]) 

148 if ret == 0: 

149 return [ 

150 line.split()[3].rsplit(":", 1)[0] 

151 for line in stdout.splitlines() 

152 ] 

153 util.SMlog(f"Unexpected code {ret}: {stderr}") 

154 except Exception as e: 

155 util.SMlog(f"Unable to get controller addresses: {e}") 

156 return [] 

157 

158def _get_controller_uri() -> str: 

159 # TODO: Check that an IP address from the current pool is returned. 

160 addresses = _get_controller_addresses() 

161 return "linstor://" + addresses[0] if addresses else "" 

162 

163def get_controller_uri(): 

164 retries = 0 

165 while True: 

166 uri = _get_controller_uri() 

167 if uri: 

168 return uri 

169 

170 retries += 1 

171 if retries >= 30: 

172 break 

173 time.sleep(1) 

174 

175 

176def get_controller_node_name(): 

177 PLUGIN_CMD = 'hasControllerRunning' 

178 

179 (ret, stdout, stderr) = util.doexec([ 

180 'drbdadm', 'status', DATABASE_VOLUME_NAME 

181 ]) 

182 

183 if ret == 0: 

184 if stdout.startswith('{} role:Primary'.format(DATABASE_VOLUME_NAME)): 

185 return 'localhost' 

186 

187 res = REG_DRBDADM_PRIMARY.search(stdout) 

188 if res: 

189 return res.groups()[0] 

190 

191 session = util.timeout_call(5, util.get_localAPI_session) 

192 

193 for host_ref, host_record in session.xenapi.host.get_all_records().items(): 

194 node_name = host_record['hostname'] 

195 try: 

196 if not session.xenapi.host_metrics.get_record( 

197 host_record['metrics'] 

198 )['live']: 

199 continue 

200 

201 if util.strtobool(session.xenapi.host.call_plugin( 

202 host_ref, PLUGIN, PLUGIN_CMD, {} 

203 )): 

204 return node_name 

205 except Exception as e: 

206 util.SMlog('Failed to call plugin to get controller on `{}`: {}'.format( 

207 node_name, e 

208 )) 

209 

210 

211def demote_drbd_resource(node_name, resource_name): 

212 PLUGIN_CMD = 'demoteDrbdResource' 

213 

214 session = util.timeout_call(5, util.get_localAPI_session) 

215 

216 for host_ref, host_record in session.xenapi.host.get_all_records().items(): 

217 if host_record['hostname'] != node_name: 

218 continue 

219 

220 try: 

221 session.xenapi.host.call_plugin( 

222 host_ref, PLUGIN, PLUGIN_CMD, {'resource_name': resource_name} 

223 ) 

224 except Exception as e: 

225 util.SMlog('Failed to demote resource `{}` on `{}`: {}'.format( 

226 resource_name, node_name, e 

227 )) 

228 raise Exception( 

229 'Can\'t demote resource `{}`, unable to find node `{}`' 

230 .format(resource_name, node_name) 

231 ) 

232 

233# ============================================================================== 

234 

235class LinstorVolumeManagerError(Exception): 

236 ERR_GENERIC = 0, 

237 ERR_VOLUME_EXISTS = 1, 

238 ERR_VOLUME_NOT_EXISTS = 2, 

239 ERR_VOLUME_DESTROY = 3, 

240 ERR_GROUP_NOT_EXISTS = 4, 

241 ERR_VOLUME_IN_USE = 5 

242 

243 def __init__(self, message, code=ERR_GENERIC): 

244 super(LinstorVolumeManagerError, self).__init__(message) 

245 self._code = code 

246 

247 @property 

248 def code(self): 

249 return self._code 

250 

251 

252# ============================================================================== 

253 

254# Note: 

255# If a storage pool is not accessible after a network change: 

256# linstor node interface modify <NODE> default --ip <IP> 

257 

258 

259class LinstorVolumeManager(object): 

260 """ 

261 API to manager LINSTOR volumes in XCP-ng. 

262 A volume in this context is a physical part of the storage layer. 

263 """ 

264 

265 __slots__ = ( 

266 '_linstor', '_uri', '_logger', '_redundancy', 

267 '_base_group_name', '_group_name', '_ha_group_name', 

268 '_volumes', '_storage_pools', '_storage_pools_time', 

269 '_kv_cache', '_resource_cache', '_volume_info_cache', 

270 '_kv_cache_dirty', '_resource_cache_dirty', '_volume_info_cache_dirty', 

271 '_resources_info_cache', 

272 ) 

273 

274 DEV_ROOT_PATH = DRBD_BY_RES_PATH 

275 

276 # Default sector size. 

277 BLOCK_SIZE = 512 

278 

279 # List of volume properties. 

280 PROP_METADATA = 'metadata' 

281 PROP_NOT_EXISTS = 'not-exists' 

282 PROP_VOLUME_NAME = 'volume-name' 

283 PROP_IS_READONLY_TIMESTAMP = 'readonly-timestamp' 

284 

285 # A volume can only be locked for a limited duration. 

286 # The goal is to give enough time to slaves to execute some actions on 

287 # a device before an UUID update or a coalesce for example. 

288 # Expiration is expressed in seconds. 

289 LOCKED_EXPIRATION_DELAY = 1 * 60 

290 

291 # Used when volume uuid is being updated. 

292 PROP_UPDATING_UUID_SRC = 'updating-uuid-src' 

293 

294 # States of property PROP_NOT_EXISTS. 

295 STATE_EXISTS = '0' 

296 STATE_NOT_EXISTS = '1' 

297 STATE_CREATING = '2' 

298 

299 # Property namespaces. 

300 NAMESPACE_SR = 'xcp/sr' 

301 NAMESPACE_VOLUME = 'xcp/volume' 

302 

303 # Regex to match properties. 

304 REG_PROP = '^([^/]+)/{}$' 

305 

306 REG_METADATA = re.compile(REG_PROP.format(PROP_METADATA)) 

307 REG_NOT_EXISTS = re.compile(REG_PROP.format(PROP_NOT_EXISTS)) 

308 REG_VOLUME_NAME = re.compile(REG_PROP.format(PROP_VOLUME_NAME)) 

309 REG_UPDATING_UUID_SRC = re.compile(REG_PROP.format(PROP_UPDATING_UUID_SRC)) 

310 

311 # Prefixes of SR/VOLUME in the LINSTOR DB. 

312 # A LINSTOR (resource, group, ...) name cannot start with a number. 

313 # So we add a prefix behind our SR/VOLUME uuids. 

314 PREFIX_SR = 'xcp-sr-' 

315 PREFIX_HA = 'xcp-ha-' 

316 PREFIX_VOLUME = 'xcp-volume-' 

317 

318 # Limit request number when storage pool info is asked, we fetch 

319 # the current pool status after N elapsed seconds. 

320 STORAGE_POOLS_FETCH_INTERVAL = 15 

321 

322 @staticmethod 

323 def default_logger(*args): 

324 print(args) 

325 

326 # -------------------------------------------------------------------------- 

327 # API. 

328 # -------------------------------------------------------------------------- 

329 

330 class VolumeInfo(object): 

331 __slots__ = ( 

332 'name', 

333 'allocated_size', # Allocated size, place count is not used. 

334 'virtual_size', # Total virtual available size of this volume 

335 # (i.e. the user size at creation). 

336 'diskful' # Array of nodes that have a diskful volume. 

337 ) 

338 

339 def __init__(self, name): 

340 self.name = name 

341 self.allocated_size = 0 

342 self.virtual_size = 0 

343 self.diskful = [] 

344 

345 @override 

346 def __repr__(self) -> str: 

347 return 'VolumeInfo("{}", {}, {}, {})'.format( 

348 self.name, self.allocated_size, self.virtual_size, 

349 self.diskful 

350 ) 

351 

352 # -------------------------------------------------------------------------- 

353 

354 def __init__( 

355 self, uri, group_name, repair=False, logger=default_logger.__func__, 

356 attempt_count=30 

357 ): 

358 """ 

359 Create a new LinstorVolumeManager object. 

360 :param str uri: URI to communicate with the LINSTOR controller. 

361 :param str group_name: The SR group name to use. 

362 :param bool repair: If true we try to remove bad volumes due to a crash 

363 or unexpected behavior. 

364 :param function logger: Function to log messages. 

365 :param int attempt_count: Number of attempts to join the controller. 

366 """ 

367 

368 self._uri = uri 

369 self._linstor = self._create_linstor_instance( 

370 uri, attempt_count=attempt_count 

371 ) 

372 

373 

374 mismatched_nodes = [ 

375 node for node in self._linstor.node_list().pop().nodes if node.connection_status == "VERSION_MISMATCH" 

376 ] 

377 

378 if mismatched_nodes: 

379 raise LinstorVolumeManagerError( 

380 "Some linstor nodes are not using the same version. " + 

381 f"Incriminated nodes are: {','.join([node.name for node in mismatched_nodes])}" 

382 ) 

383 

384 self._base_group_name = group_name 

385 

386 # Ensure group exists. 

387 group_name = self._build_group_name(group_name) 

388 groups = self._linstor.resource_group_list_raise([group_name]).resource_groups 

389 if not groups: 

390 raise LinstorVolumeManagerError( 

391 'Unable to find `{}` Linstor SR'.format(group_name) 

392 ) 

393 

394 # Ok. ;) 

395 self._logger = logger 

396 self._redundancy = groups[0].select_filter.place_count 

397 self._group_name = group_name 

398 self._ha_group_name = self._build_ha_group_name(self._base_group_name) 

399 self._volumes = set() 

400 self._storage_pools_time = 0 

401 

402 # To increase performance and limit request count to LINSTOR services, 

403 # we use caches. 

404 self._kv_cache = self._create_kv_cache() 

405 self._resource_cache = None 

406 self._resource_cache_dirty = True 

407 self._volume_info_cache = None 

408 self._volume_info_cache_dirty = True 

409 self._resources_info_cache = None 

410 self._build_volumes(repair=repair) 

411 

412 @property 

413 def uri(self) -> str: 

414 return self._uri 

415 

416 @property 

417 def group_name(self): 

418 """ 

419 Give the used group name. 

420 :return: The group name. 

421 :rtype: str 

422 """ 

423 return self._base_group_name 

424 

425 @property 

426 def redundancy(self): 

427 """ 

428 Give the used redundancy. 

429 :return: The redundancy. 

430 :rtype: int 

431 """ 

432 return self._redundancy 

433 

434 @property 

435 def volumes(self): 

436 """ 

437 Give the volumes uuid set. 

438 :return: The volumes uuid set. 

439 :rtype: set(str) 

440 """ 

441 return self._volumes 

442 

443 @property 

444 def max_volume_size_allowed(self): 

445 """ 

446 Give the max volume size currently available in B. 

447 :return: The current size. 

448 :rtype: int 

449 """ 

450 

451 candidates = self._find_best_size_candidates() 

452 if not candidates: 

453 raise LinstorVolumeManagerError( 

454 'Failed to get max volume size allowed' 

455 ) 

456 

457 size = candidates[0].max_volume_size 

458 if size < 0: 

459 raise LinstorVolumeManagerError( 

460 'Invalid max volume size allowed given: {}'.format(size) 

461 ) 

462 return self.round_down_volume_size(size * 1024) 

463 

464 @property 

465 def physical_size(self): 

466 """ 

467 Give the total physical size of the SR. 

468 :return: The physical size. 

469 :rtype: int 

470 """ 

471 return self._compute_size('total_capacity') 

472 

473 @property 

474 def physical_free_size(self): 

475 """ 

476 Give the total free physical size of the SR. 

477 :return: The physical free size. 

478 :rtype: int 

479 """ 

480 return self._compute_size('free_capacity') 

481 

482 @property 

483 def allocated_volume_size(self): 

484 """ 

485 Give the allocated size for all volumes. The place count is not 

486 used here. When thick lvm is used, the size for one volume should 

487 be equal to the virtual volume size. With thin lvm, the size is equal 

488 or lower to the volume size. 

489 :return: The allocated size of all volumes. 

490 :rtype: int 

491 """ 

492 

493 # Paths: /res_name/vol_number/size 

494 sizes = {} 

495 

496 for resource in self._get_resource_cache().resources: 

497 if resource.name not in sizes: 

498 current = sizes[resource.name] = {} 

499 else: 

500 current = sizes[resource.name] 

501 

502 for volume in resource.volumes: 

503 # We ignore diskless pools of the form "DfltDisklessStorPool". 

504 if volume.storage_pool_name != self._group_name: 

505 continue 

506 

507 allocated_size = max(volume.allocated_size, 0) 

508 current_allocated_size = current.get(volume.number) or -1 

509 if allocated_size > current_allocated_size: 

510 current[volume.number] = allocated_size 

511 

512 total_size = 0 

513 for volumes in sizes.values(): 

514 for size in volumes.values(): 

515 total_size += size 

516 

517 return total_size * 1024 

518 

519 def get_min_physical_size(self): 

520 """ 

521 Give the minimum physical size of the SR. 

522 I.e. the size of the smallest disk + the number of pools. 

523 :return: The physical min size. 

524 :rtype: tuple(int, int) 

525 """ 

526 size = None 

527 pool_count = 0 

528 for pool in self._get_storage_pools(force=True): 

529 space = pool.free_space 

530 if space: 

531 pool_count += 1 

532 current_size = space.total_capacity 

533 if current_size < 0: 

534 raise LinstorVolumeManagerError( 

535 'Failed to get pool total_capacity attr of `{}`' 

536 .format(pool.node_name) 

537 ) 

538 if size is None or current_size < size: 

539 size = current_size 

540 return (pool_count, (size or 0) * 1024) 

541 

542 @property 

543 def metadata(self): 

544 """ 

545 Get the metadata of the SR. 

546 :return: Dictionary that contains metadata. 

547 :rtype: dict(str, dict) 

548 """ 

549 

550 sr_properties = self._get_sr_properties() 

551 metadata = sr_properties.get(self.PROP_METADATA) 

552 if metadata is not None: 

553 metadata = json.loads(metadata) 

554 if isinstance(metadata, dict): 

555 return metadata 

556 raise LinstorVolumeManagerError( 

557 'Expected dictionary in SR metadata: {}'.format( 

558 self._group_name 

559 ) 

560 ) 

561 

562 return {} 

563 

564 @metadata.setter 

565 def metadata(self, metadata): 

566 """ 

567 Set the metadata of the SR. 

568 :param dict metadata: Dictionary that contains metadata. 

569 """ 

570 

571 assert isinstance(metadata, dict) 

572 sr_properties = self._get_sr_properties() 

573 sr_properties[self.PROP_METADATA] = json.dumps(metadata) 

574 

575 @property 

576 def disconnected_hosts(self): 

577 """ 

578 Get the list of disconnected hosts. 

579 :return: Set that contains disconnected hosts. 

580 :rtype: set(str) 

581 """ 

582 

583 disconnected_hosts = set() 

584 for pool in self._get_storage_pools(): 

585 for report in pool.reports: 

586 if report.ret_code & linstor.consts.WARN_NOT_CONNECTED == \ 

587 linstor.consts.WARN_NOT_CONNECTED: 

588 disconnected_hosts.add(pool.node_name) 

589 break 

590 return disconnected_hosts 

591 

592 def check_volume_exists(self, volume_uuid): 

593 """ 

594 Check if a volume exists in the SR. 

595 :return: True if volume exists. 

596 :rtype: bool 

597 """ 

598 return volume_uuid in self._volumes 

599 

600 def create_volume( 

601 self, 

602 volume_uuid, 

603 size, 

604 persistent=True, 

605 volume_name=None, 

606 high_availability=False 

607 ): 

608 """ 

609 Create a new volume on the SR. 

610 :param str volume_uuid: The volume uuid to use. 

611 :param int size: volume size in B. 

612 :param bool persistent: If false the volume will be unavailable 

613 on the next constructor call LinstorSR(...). 

614 :param str volume_name: If set, this name is used in the LINSTOR 

615 database instead of a generated name. 

616 :param bool high_availability: If set, the volume is created in 

617 the HA group. 

618 :return: The current device path of the volume. 

619 :rtype: str 

620 """ 

621 

622 self._logger('Creating LINSTOR volume {}...'.format(volume_uuid)) 

623 if not volume_name: 

624 volume_name = self.build_volume_name(util.gen_uuid()) 

625 volume_properties = self._create_volume_with_properties( 

626 volume_uuid, 

627 volume_name, 

628 size, 

629 True, # place_resources 

630 high_availability 

631 ) 

632 

633 # Volume created! Now try to find the device path. 

634 try: 

635 self._logger( 

636 'Find device path of LINSTOR volume {}...'.format(volume_uuid) 

637 ) 

638 device_path = self._find_device_path(volume_uuid, volume_name) 

639 if persistent: 

640 volume_properties[self.PROP_NOT_EXISTS] = self.STATE_EXISTS 

641 self._volumes.add(volume_uuid) 

642 self._logger( 

643 'LINSTOR volume {} created!'.format(volume_uuid) 

644 ) 

645 return device_path 

646 except Exception: 

647 # There is an issue to find the path. 

648 # At this point the volume has just been created, so force flag can be used. 

649 self._destroy_volume(volume_uuid, force=True) 

650 raise 

651 

652 def mark_volume_as_persistent(self, volume_uuid): 

653 """ 

654 Mark volume as persistent if created with persistent=False. 

655 :param str volume_uuid: The volume uuid to mark. 

656 """ 

657 

658 self._ensure_volume_exists(volume_uuid) 

659 

660 # Mark volume as persistent. 

661 volume_properties = self._get_volume_properties(volume_uuid) 

662 volume_properties[self.PROP_NOT_EXISTS] = self.STATE_EXISTS 

663 

664 def destroy_volume(self, volume_uuid): 

665 """ 

666 Destroy a volume. 

667 :param str volume_uuid: The volume uuid to destroy. 

668 """ 

669 

670 self._ensure_volume_exists(volume_uuid) 

671 self.ensure_volume_is_not_locked(volume_uuid) 

672 

673 is_volume_in_use = any(node["in-use"] for node in self.get_resource_info(volume_uuid)["nodes"].values()) 

674 if is_volume_in_use: 

675 raise LinstorVolumeManagerError( 

676 f"Could not destroy volume `{volume_uuid}` as it is currently in use", 

677 LinstorVolumeManagerError.ERR_VOLUME_IN_USE 

678 ) 

679 

680 # Mark volume as destroyed. 

681 volume_properties = self._get_volume_properties(volume_uuid) 

682 volume_properties[self.PROP_NOT_EXISTS] = self.STATE_NOT_EXISTS 

683 

684 try: 

685 self._volumes.remove(volume_uuid) 

686 self._destroy_volume(volume_uuid) 

687 except Exception as e: 

688 raise LinstorVolumeManagerError( 

689 str(e), 

690 LinstorVolumeManagerError.ERR_VOLUME_DESTROY 

691 ) 

692 

693 def lock_volume(self, volume_uuid, locked=True): 

694 """ 

695 Prevent modifications of the volume properties during 

696 "self.LOCKED_EXPIRATION_DELAY" seconds. The SR must be locked 

697 when used. This method is useful to attach/detach correctly a volume on 

698 a slave. Without it the GC can rename a volume, in this case the old 

699 volume path can be used by a slave... 

700 :param str volume_uuid: The volume uuid to protect/unprotect. 

701 :param bool locked: Lock/unlock the volume. 

702 """ 

703 

704 self._ensure_volume_exists(volume_uuid) 

705 

706 self._logger( 

707 '{} volume {} as locked'.format( 

708 'Mark' if locked else 'Unmark', 

709 volume_uuid 

710 ) 

711 ) 

712 

713 volume_properties = self._get_volume_properties(volume_uuid) 

714 if locked: 

715 volume_properties[ 

716 self.PROP_IS_READONLY_TIMESTAMP 

717 ] = str(time.time()) 

718 elif self.PROP_IS_READONLY_TIMESTAMP in volume_properties: 

719 volume_properties.pop(self.PROP_IS_READONLY_TIMESTAMP) 

720 

721 def ensure_volume_is_not_locked(self, volume_uuid, timeout=None): 

722 """ 

723 Ensure a volume is not locked. Wait if necessary. 

724 :param str volume_uuid: The volume uuid to check. 

725 :param int timeout: If the volume is always locked after the expiration 

726 of the timeout, an exception is thrown. 

727 """ 

728 return self.ensure_volume_list_is_not_locked([volume_uuid], timeout) 

729 

730 def ensure_volume_list_is_not_locked(self, volume_uuids, timeout=None): 

731 checked = set() 

732 for volume_uuid in volume_uuids: 

733 if volume_uuid in self._volumes: 

734 checked.add(volume_uuid) 

735 

736 if not checked: 

737 return 

738 

739 waiting = False 

740 

741 volume_properties = self._get_kv_cache() 

742 

743 start = time.time() 

744 while True: 

745 # Can't delete in for loop, use a copy of the list. 

746 remaining = checked.copy() 

747 for volume_uuid in checked: 

748 volume_properties.namespace = \ 

749 self._build_volume_namespace(volume_uuid) 

750 timestamp = volume_properties.get( 

751 self.PROP_IS_READONLY_TIMESTAMP 

752 ) 

753 if timestamp is None: 

754 remaining.remove(volume_uuid) 

755 continue 

756 

757 now = time.time() 

758 if now - float(timestamp) > self.LOCKED_EXPIRATION_DELAY: 

759 self._logger( 

760 'Remove readonly timestamp on {}'.format(volume_uuid) 

761 ) 

762 volume_properties.pop(self.PROP_IS_READONLY_TIMESTAMP) 

763 remaining.remove(volume_uuid) 

764 continue 

765 

766 if not waiting: 

767 self._logger( 

768 'Volume {} is locked, waiting...'.format(volume_uuid) 

769 ) 

770 waiting = True 

771 break 

772 

773 if not remaining: 

774 break 

775 checked = remaining 

776 

777 if timeout is not None and now - start > timeout: 

778 raise LinstorVolumeManagerError( 

779 'volume `{}` is locked and timeout has been reached' 

780 .format(volume_uuid), 

781 LinstorVolumeManagerError.ERR_VOLUME_NOT_EXISTS 

782 ) 

783 

784 # We must wait to use the volume. After that we can modify it 

785 # ONLY if the SR is locked to avoid bad reads on the slaves. 

786 time.sleep(1) 

787 volume_properties = self._create_kv_cache() 

788 

789 if waiting: 

790 self._logger('No volume locked now!') 

791 

792 def remove_volume_if_diskless(self, volume_uuid): 

793 """ 

794 Remove disless path from local node. 

795 :param str volume_uuid: The volume uuid to remove. 

796 """ 

797 

798 self._ensure_volume_exists(volume_uuid) 

799 

800 volume_properties = self._get_volume_properties(volume_uuid) 

801 volume_name = volume_properties.get(self.PROP_VOLUME_NAME) 

802 

803 node_name = socket.gethostname() 

804 

805 for resource in self._get_resource_cache().resources: 

806 if resource.name == volume_name and resource.node_name == node_name: 

807 if linstor.consts.FLAG_TIE_BREAKER in resource.flags: 

808 return 

809 break 

810 

811 result = self._linstor.resource_delete_if_diskless( 

812 node_name=node_name, rsc_name=volume_name 

813 ) 

814 if not linstor.Linstor.all_api_responses_no_error(result): 

815 raise LinstorVolumeManagerError( 

816 'Unable to delete diskless path of `{}` on node `{}`: {}' 

817 .format(volume_name, node_name, ', '.join( 

818 [str(x) for x in result])) 

819 ) 

820 

821 def introduce_volume(self, volume_uuid): 

822 pass # TODO: Implement me. 

823 

824 def resize_volume(self, volume_uuid, new_size): 

825 """ 

826 Resize a volume. 

827 :param str volume_uuid: The volume uuid to resize. 

828 :param int new_size: New size in B. 

829 """ 

830 

831 volume_name = self.get_volume_name(volume_uuid) 

832 self.ensure_volume_is_not_locked(volume_uuid) 

833 new_size = self.round_up_volume_size(new_size) // 1024 

834 

835 # We can't resize anything until DRBD is up to date. 

836 # We wait here for 5min max and raise an easy to understand error for the user. 

837 # 5min is an arbitrary time, it's impossible to get a fit all situation value 

838 # and it's currently impossible to know how much time we have to wait 

839 # This is mostly an issue for thick provisioning, thin isn't affected. 

840 start_time = time.monotonic() 

841 try: 

842 self._linstor.resource_dfn_wait_synced(volume_name, wait_interval=1.0, timeout=60*5) 

843 except linstor.LinstorTimeoutError: 

844 raise LinstorVolumeManagerError( 

845 f"Volume resizing of `{volume_uuid}` from SR `{self._group_name}` is incomplete: timeout reached but it continues in background." 

846 ) 

847 util.SMlog(f"DRBD is up to date, syncing took {time.monotonic() - start_time}s") 

848 

849 result = self._linstor.volume_dfn_modify( 

850 rsc_name=volume_name, 

851 volume_nr=0, 

852 size=new_size 

853 ) 

854 

855 self._mark_resource_cache_as_dirty() 

856 

857 error_str = self._get_error_str(result) 

858 if error_str: 

859 raise LinstorVolumeManagerError( 

860 f"Could not resize volume `{volume_uuid}` from SR `{self._group_name}`: {error_str}" 

861 ) 

862 

863 def get_volume_name(self, volume_uuid): 

864 """ 

865 Get the name of a particular volume. 

866 :param str volume_uuid: The volume uuid of the name to get. 

867 :return: The volume name. 

868 :rtype: str 

869 """ 

870 

871 self._ensure_volume_exists(volume_uuid) 

872 volume_properties = self._get_volume_properties(volume_uuid) 

873 volume_name = volume_properties.get(self.PROP_VOLUME_NAME) 

874 if volume_name: 

875 return volume_name 

876 raise LinstorVolumeManagerError( 

877 'Failed to get volume name of {}'.format(volume_uuid) 

878 ) 

879 

880 def get_volume_size(self, volume_uuid): 

881 """ 

882 Get the size of a particular volume. 

883 :param str volume_uuid: The volume uuid of the size to get. 

884 :return: The volume size. 

885 :rtype: int 

886 """ 

887 

888 volume_name = self.get_volume_name(volume_uuid) 

889 dfns = self._linstor.resource_dfn_list_raise( 

890 query_volume_definitions=True, 

891 filter_by_resource_definitions=[volume_name] 

892 ).resource_definitions 

893 

894 size = dfns[0].volume_definitions[0].size 

895 if size < 0: 

896 raise LinstorVolumeManagerError( 

897 'Failed to get volume size of: {}'.format(volume_uuid) 

898 ) 

899 return size * 1024 

900 

901 def set_auto_promote_timeout(self, volume_uuid, timeout): 

902 """ 

903 Define the blocking time of open calls when a DRBD 

904 is already open on another host. 

905 :param str volume_uuid: The volume uuid to modify. 

906 """ 

907 

908 volume_name = self.get_volume_name(volume_uuid) 

909 result = self._linstor.resource_dfn_modify(volume_name, { 

910 'DrbdOptions/Resource/auto-promote-timeout': timeout 

911 }) 

912 error_str = self._get_error_str(result) 

913 if error_str: 

914 raise LinstorVolumeManagerError( 

915 'Could not change the auto promote timeout of `{}`: {}' 

916 .format(volume_uuid, error_str) 

917 ) 

918 

919 def set_drbd_ha_properties(self, volume_name, enabled=True): 

920 """ 

921 Set or not HA DRBD properties required by drbd-reactor and 

922 by specific volumes. 

923 :param str volume_name: The volume to modify. 

924 :param bool enabled: Enable or disable HA properties. 

925 """ 

926 

927 properties = { 

928 'DrbdOptions/auto-quorum': 'disabled', 

929 'DrbdOptions/Resource/auto-promote': 'no', 

930 'DrbdOptions/Resource/on-no-data-accessible': 'io-error', 

931 'DrbdOptions/Resource/on-no-quorum': 'io-error', 

932 'DrbdOptions/Resource/on-suspended-primary-outdated': 'force-secondary', 

933 'DrbdOptions/Resource/quorum': 'majority' 

934 } 

935 if enabled: 

936 result = self._linstor.resource_dfn_modify(volume_name, properties) 

937 else: 

938 result = self._linstor.resource_dfn_modify(volume_name, {}, delete_props=list(properties.keys())) 

939 

940 error_str = self._get_error_str(result) 

941 if error_str: 

942 raise LinstorVolumeManagerError( 

943 'Could not modify HA DRBD properties on volume `{}`: {}' 

944 .format(volume_name, error_str) 

945 ) 

946 

947 def get_volume_info(self, volume_uuid): 

948 """ 

949 Get the volume info of a particular volume. 

950 :param str volume_uuid: The volume uuid of the volume info to get. 

951 :return: The volume info. 

952 :rtype: VolumeInfo 

953 """ 

954 

955 volume_name = self.get_volume_name(volume_uuid) 

956 return self._get_volumes_info()[volume_name] 

957 

958 def get_device_path(self, volume_uuid): 

959 """ 

960 Get the dev path of a volume, create a diskless if necessary. 

961 :param str volume_uuid: The volume uuid to get the dev path. 

962 :return: The current device path of the volume. 

963 :rtype: str 

964 """ 

965 

966 volume_name = self.get_volume_name(volume_uuid) 

967 return self._find_device_path(volume_uuid, volume_name) 

968 

969 def get_volume_uuid_from_device_path(self, device_path): 

970 """ 

971 Get the volume uuid of a device_path. 

972 :param str device_path: The dev path to find the volume uuid. 

973 :return: The volume uuid of the local device path. 

974 :rtype: str 

975 """ 

976 

977 expected_volume_name = \ 

978 self.get_volume_name_from_device_path(device_path) 

979 

980 volume_names = self.get_volumes_with_name() 

981 for volume_uuid, volume_name in volume_names.items(): 

982 if volume_name == expected_volume_name: 

983 return volume_uuid 

984 

985 raise LinstorVolumeManagerError( 

986 'Unable to find volume uuid from dev path `{}`'.format(device_path) 

987 ) 

988 

989 def get_volume_name_from_device_path(self, device_path): 

990 """ 

991 Get the volume name of a device_path. 

992 :param str device_path: The dev path to find the volume name. 

993 :return: The volume name of the device path. 

994 :rtype: str 

995 """ 

996 

997 # Assume that we have a path like this: 

998 # - "/dev/drbd/by-res/xcp-volume-<UUID>/0" 

999 # - "../xcp-volume-<UUID>/0" 

1000 if device_path.startswith(DRBD_BY_RES_PATH): 

1001 prefix_len = len(DRBD_BY_RES_PATH) 

1002 elif device_path.startswith('../'): 

1003 prefix_len = 3 

1004 else: 

1005 raise LinstorVolumeManagerError('Unexpected device path: `{}`'.format(device_path)) 

1006 

1007 res_name_end = device_path.find('/', prefix_len) 

1008 assert res_name_end != -1 

1009 return device_path[prefix_len:res_name_end] 

1010 

1011 def update_volume_uuid(self, volume_uuid, new_volume_uuid, force=False): 

1012 """ 

1013 Change the uuid of a volume. 

1014 :param str volume_uuid: The volume to modify. 

1015 :param str new_volume_uuid: The new volume uuid to use. 

1016 :param bool force: If true we doesn't check if volume_uuid is in the 

1017 volume list. I.e. the volume can be marked as deleted but the volume 

1018 can still be in the LINSTOR KV store if the deletion has failed. 

1019 In specific cases like "undo" after a failed clone we must rename a bad 

1020 deleted VDI. 

1021 """ 

1022 

1023 self._logger( 

1024 'Trying to update volume UUID {} to {}...' 

1025 .format(volume_uuid, new_volume_uuid) 

1026 ) 

1027 assert volume_uuid != new_volume_uuid, 'can\'t update volume UUID, same value' 

1028 

1029 if not force: 

1030 self._ensure_volume_exists(volume_uuid) 

1031 self.ensure_volume_is_not_locked(volume_uuid) 

1032 

1033 if new_volume_uuid in self._volumes: 

1034 raise LinstorVolumeManagerError( 

1035 'Volume `{}` already exists'.format(new_volume_uuid), 

1036 LinstorVolumeManagerError.ERR_VOLUME_EXISTS 

1037 ) 

1038 

1039 volume_properties = self._get_volume_properties(volume_uuid) 

1040 if volume_properties.get(self.PROP_UPDATING_UUID_SRC): 

1041 raise LinstorVolumeManagerError( 

1042 'Cannot update volume uuid {}: invalid state' 

1043 .format(volume_uuid) 

1044 ) 

1045 

1046 # 1. Copy in temp variables metadata and volume_name. 

1047 metadata = volume_properties.get(self.PROP_METADATA) 

1048 volume_name = volume_properties.get(self.PROP_VOLUME_NAME) 

1049 

1050 # 2. Switch to new volume namespace. 

1051 volume_properties.namespace = self._build_volume_namespace( 

1052 new_volume_uuid 

1053 ) 

1054 

1055 if list(volume_properties.items()): 

1056 raise LinstorVolumeManagerError( 

1057 'Cannot update volume uuid {} to {}: ' 

1058 .format(volume_uuid, new_volume_uuid) + 

1059 'this last one is not empty' 

1060 ) 

1061 

1062 try: 

1063 # 3. Mark new volume properties with PROP_UPDATING_UUID_SRC. 

1064 # If we crash after that, the new properties can be removed 

1065 # properly. 

1066 volume_properties[self.PROP_NOT_EXISTS] = self.STATE_NOT_EXISTS 

1067 volume_properties[self.PROP_UPDATING_UUID_SRC] = volume_uuid 

1068 

1069 # 4. Copy the properties. 

1070 # Note: On new volumes, during clone for example, the metadata 

1071 # may be missing. So we must test it to avoid this error: 

1072 # "None has to be a str/unicode, but is <type 'NoneType'>" 

1073 if metadata: 

1074 volume_properties[self.PROP_METADATA] = metadata 

1075 volume_properties[self.PROP_VOLUME_NAME] = volume_name 

1076 

1077 # 5. Ok! 

1078 volume_properties[self.PROP_NOT_EXISTS] = self.STATE_EXISTS 

1079 except Exception as err: 

1080 try: 

1081 # Clear the new volume properties in case of failure. 

1082 assert volume_properties.namespace == \ 

1083 self._build_volume_namespace(new_volume_uuid) 

1084 volume_properties.clear() 

1085 except Exception as e: 

1086 self._logger( 

1087 'Failed to clear new volume properties: {} (ignoring...)' 

1088 .format(e) 

1089 ) 

1090 raise LinstorVolumeManagerError( 

1091 'Failed to copy volume properties: {}'.format(err) 

1092 ) 

1093 

1094 try: 

1095 # 6. After this point, it's ok we can remove the 

1096 # PROP_UPDATING_UUID_SRC property and clear the src properties 

1097 # without problems. 

1098 

1099 # 7. Switch to old volume namespace. 

1100 volume_properties.namespace = self._build_volume_namespace( 

1101 volume_uuid 

1102 ) 

1103 volume_properties.clear() 

1104 

1105 # 8. Switch a last time to new volume namespace. 

1106 volume_properties.namespace = self._build_volume_namespace( 

1107 new_volume_uuid 

1108 ) 

1109 volume_properties.pop(self.PROP_UPDATING_UUID_SRC) 

1110 except Exception as e: 

1111 raise LinstorVolumeManagerError( 

1112 'Failed to clear volume properties ' 

1113 'after volume uuid update: {}'.format(e) 

1114 ) 

1115 

1116 try: 

1117 self._volumes.remove(volume_uuid) 

1118 except KeyError: 

1119 # Can be missing if we are building the volume set attr AND 

1120 # we are processing a deleted resource. 

1121 assert force 

1122 

1123 self._volumes.add(new_volume_uuid) 

1124 

1125 self._logger( 

1126 'UUID update succeeded of {} to {}! (properties={})' 

1127 .format( 

1128 volume_uuid, new_volume_uuid, 

1129 self._get_filtered_properties(volume_properties) 

1130 ) 

1131 ) 

1132 

1133 def update_volume_name(self, volume_uuid, volume_name): 

1134 """ 

1135 Change the volume name of a volume. 

1136 :param str volume_uuid: The volume to modify. 

1137 :param str volume_name: The volume_name to use. 

1138 """ 

1139 

1140 self._ensure_volume_exists(volume_uuid) 

1141 self.ensure_volume_is_not_locked(volume_uuid) 

1142 if not volume_name.startswith(self.PREFIX_VOLUME): 

1143 raise LinstorVolumeManagerError( 

1144 'Volume name `{}` must be start with `{}`' 

1145 .format(volume_name, self.PREFIX_VOLUME) 

1146 ) 

1147 

1148 if volume_name not in self._fetch_resource_names(): 

1149 raise LinstorVolumeManagerError( 

1150 'Volume `{}` doesn\'t exist'.format(volume_name) 

1151 ) 

1152 

1153 volume_properties = self._get_volume_properties(volume_uuid) 

1154 volume_properties[self.PROP_VOLUME_NAME] = volume_name 

1155 

1156 def get_usage_states(self, volume_uuid): 

1157 """ 

1158 Check if a volume is currently used. 

1159 :param str volume_uuid: The volume uuid to check. 

1160 :return: A dictionary that contains states. 

1161 :rtype: dict(str, bool or None) 

1162 """ 

1163 

1164 states = {} 

1165 

1166 volume_name = self.get_volume_name(volume_uuid) 

1167 for resource_state in self._linstor.resource_list_raise( 

1168 filter_by_resources=[volume_name] 

1169 ).resource_states: 

1170 states[resource_state.node_name] = resource_state.in_use 

1171 

1172 return states 

1173 

1174 def get_volume_openers(self, volume_uuid): 

1175 """ 

1176 Get openers of a volume. 

1177 :param str volume_uuid: The volume uuid to monitor. 

1178 :return: A dictionary that contains openers. 

1179 :rtype: dict(str, obj) 

1180 """ 

1181 return get_all_volume_openers(self.get_volume_name(volume_uuid), '0') 

1182 

1183 def get_volumes_with_name(self): 

1184 """ 

1185 Give a volume dictionary that contains names actually owned. 

1186 :return: A volume/name dict. 

1187 :rtype: dict(str, str) 

1188 """ 

1189 return self._get_volumes_by_property(self.REG_VOLUME_NAME) 

1190 

1191 def get_volumes_with_info(self): 

1192 """ 

1193 Give a volume dictionary that contains VolumeInfos. 

1194 :return: A volume/VolumeInfo dict. 

1195 :rtype: dict(str, VolumeInfo) 

1196 """ 

1197 

1198 volumes = {} 

1199 

1200 volume_names = self.get_volumes_with_name() 

1201 all_volume_info = self._get_volumes_info(volume_names) 

1202 for volume_uuid, volume_name in volume_names.items(): 

1203 if volume_name: 

1204 volume_info = all_volume_info.get(volume_name) 

1205 if volume_info: 

1206 volumes[volume_uuid] = volume_info 

1207 continue 

1208 

1209 # Well I suppose if this volume is not available, 

1210 # LINSTOR has been used directly without using this API. 

1211 volumes[volume_uuid] = self.VolumeInfo('') 

1212 

1213 return volumes 

1214 

1215 def get_volumes_with_metadata(self): 

1216 """ 

1217 Give a volume dictionary that contains metadata. 

1218 :return: A volume/metadata dict. 

1219 :rtype: dict(str, dict) 

1220 """ 

1221 

1222 volumes = {} 

1223 

1224 metadata = self._get_volumes_by_property(self.REG_METADATA) 

1225 for volume_uuid, volume_metadata in metadata.items(): 

1226 if volume_metadata: 

1227 volume_metadata = json.loads(volume_metadata) 

1228 if isinstance(volume_metadata, dict): 

1229 volumes[volume_uuid] = volume_metadata 

1230 continue 

1231 raise LinstorVolumeManagerError( 

1232 'Expected dictionary in volume metadata: {}' 

1233 .format(volume_uuid) 

1234 ) 

1235 

1236 volumes[volume_uuid] = {} 

1237 

1238 return volumes 

1239 

1240 def get_volume_metadata(self, volume_uuid): 

1241 """ 

1242 Get the metadata of a volume. 

1243 :return: Dictionary that contains metadata. 

1244 :rtype: dict 

1245 """ 

1246 

1247 self._ensure_volume_exists(volume_uuid) 

1248 volume_properties = self._get_volume_properties(volume_uuid) 

1249 metadata = volume_properties.get(self.PROP_METADATA) 

1250 if metadata: 

1251 metadata = json.loads(metadata) 

1252 if isinstance(metadata, dict): 

1253 return metadata 

1254 raise LinstorVolumeManagerError( 

1255 'Expected dictionary in volume metadata: {}' 

1256 .format(volume_uuid) 

1257 ) 

1258 return {} 

1259 

1260 def set_volume_metadata(self, volume_uuid, metadata): 

1261 """ 

1262 Set the metadata of a volume. 

1263 :param dict metadata: Dictionary that contains metadata. 

1264 """ 

1265 

1266 self._ensure_volume_exists(volume_uuid) 

1267 self.ensure_volume_is_not_locked(volume_uuid) 

1268 

1269 assert isinstance(metadata, dict) 

1270 volume_properties = self._get_volume_properties(volume_uuid) 

1271 volume_properties[self.PROP_METADATA] = json.dumps(metadata) 

1272 

1273 def update_volume_metadata(self, volume_uuid, metadata): 

1274 """ 

1275 Update the metadata of a volume. It modify only the given keys. 

1276 It doesn't remove unreferenced key instead of set_volume_metadata. 

1277 :param dict metadata: Dictionary that contains metadata. 

1278 """ 

1279 

1280 self._ensure_volume_exists(volume_uuid) 

1281 self.ensure_volume_is_not_locked(volume_uuid) 

1282 

1283 assert isinstance(metadata, dict) 

1284 volume_properties = self._get_volume_properties(volume_uuid) 

1285 

1286 current_metadata = json.loads( 

1287 volume_properties.get(self.PROP_METADATA, '{}') 

1288 ) 

1289 if not isinstance(metadata, dict): 

1290 raise LinstorVolumeManagerError( 

1291 'Expected dictionary in volume metadata: {}' 

1292 .format(volume_uuid) 

1293 ) 

1294 

1295 for key, value in metadata.items(): 

1296 current_metadata[key] = value 

1297 volume_properties[self.PROP_METADATA] = json.dumps(current_metadata) 

1298 

1299 def shallow_clone_volume(self, volume_uuid, clone_uuid, persistent=True): 

1300 """ 

1301 Clone a volume. Do not copy the data, this method creates a new volume 

1302 with the same size. 

1303 :param str volume_uuid: The volume to clone. 

1304 :param str clone_uuid: The cloned volume. 

1305 :param bool persistent: If false the volume will be unavailable 

1306 on the next constructor call LinstorSR(...). 

1307 :return: The current device path of the cloned volume. 

1308 :rtype: str 

1309 """ 

1310 

1311 volume_name = self.get_volume_name(volume_uuid) 

1312 self.ensure_volume_is_not_locked(volume_uuid) 

1313 

1314 # 1. Find ideal nodes + size to use. 

1315 ideal_node_names, size = self._get_volume_node_names_and_size( 

1316 volume_name 

1317 ) 

1318 if size <= 0: 

1319 raise LinstorVolumeManagerError( 

1320 'Invalid size of {} for volume `{}`'.format(size, volume_name) 

1321 ) 

1322 

1323 # 2. Create clone! 

1324 return self.create_volume(clone_uuid, size, persistent) 

1325 

1326 def remove_resourceless_volumes(self): 

1327 """ 

1328 Remove all volumes without valid or non-empty name 

1329 (i.e. without LINSTOR resource). It's different than 

1330 LinstorVolumeManager constructor that takes a `repair` param that 

1331 removes volumes with `PROP_NOT_EXISTS` to 1. 

1332 """ 

1333 

1334 resource_names = self._fetch_resource_names() 

1335 for volume_uuid, volume_name in self.get_volumes_with_name().items(): 

1336 if not volume_name or volume_name not in resource_names: 

1337 # Don't force, we can be sure of what's happening. 

1338 self.destroy_volume(volume_uuid) 

1339 

1340 def destroy(self): 

1341 """ 

1342 Destroy this SR. Object should not be used after that. 

1343 :param bool force: Try to destroy volumes before if true. 

1344 """ 

1345 

1346 # 1. Ensure volume list is empty. No cost. 

1347 if self._volumes: 

1348 raise LinstorVolumeManagerError( 

1349 'Cannot destroy LINSTOR volume manager: ' 

1350 'It exists remaining volumes' 

1351 ) 

1352 

1353 # 2. Fetch ALL resource names. 

1354 # This list may therefore contain volumes created outside 

1355 # the scope of the driver. 

1356 resource_names = self._fetch_resource_names(ignore_deleted=False) 

1357 try: 

1358 resource_names.remove(DATABASE_VOLUME_NAME) 

1359 except KeyError: 

1360 # Really strange to reach that point. 

1361 # Normally we always have the database volume in the list. 

1362 pass 

1363 

1364 # 3. Ensure the resource name list is entirely empty... 

1365 if resource_names: 

1366 raise LinstorVolumeManagerError( 

1367 'Cannot destroy LINSTOR volume manager: ' 

1368 'It exists remaining volumes (created externally or being deleted)' 

1369 ) 

1370 

1371 # 4. Destroying... 

1372 controller_is_running = self._controller_is_running() 

1373 uri = 'linstor://localhost' 

1374 try: 

1375 if controller_is_running: 

1376 self._start_controller(start=False) 

1377 

1378 # 4.1. Umount LINSTOR database. 

1379 self._mount_database_volume( 

1380 self.build_device_path(DATABASE_VOLUME_NAME), 

1381 mount=False, 

1382 force=True 

1383 ) 

1384 

1385 # 4.2. Refresh instance. 

1386 self._start_controller(start=True) 

1387 self._linstor = self._create_linstor_instance( 

1388 uri, keep_uri_unmodified=True 

1389 ) 

1390 

1391 # 4.3. Destroy database volume. 

1392 self._destroy_resource(DATABASE_VOLUME_NAME) 

1393 

1394 # 4.4. Refresh linstor connection. 

1395 # Without we get this error: 

1396 # "Cannot delete resource group 'xcp-sr-linstor_group_thin_device' because it has existing resource definitions.." 

1397 # Because the deletion of the databse was not seen by Linstor for some reason. 

1398 # It seems a simple refresh of the Linstor connection make it aware of the deletion. 

1399 self._linstor.disconnect() 

1400 self._linstor.connect() 

1401 

1402 # 4.5. Destroy remaining drbd nodes on hosts. 

1403 # We check if there is a DRBD node on hosts that could mean blocking when destroying resource groups. 

1404 # It needs to be done locally by each host so we go through the linstor-manager plugin. 

1405 # If we don't do this sometimes, the destroy will fail when trying to destroy the resource groups with: 

1406 # "linstor-manager:destroy error: Failed to destroy SP `xcp-sr-linstor_group_thin_device` on node `r620-s2`: The specified storage pool 'xcp-sr-linstor_group_thin_device' on node 'r620-s2' can not be deleted as volumes / snapshot-volumes are still using it." 

1407 session = util.timeout_call(5, util.get_localAPI_session) 

1408 for host_ref in session.xenapi.host.get_all(): 

1409 try: 

1410 response = session.xenapi.host.call_plugin( 

1411 host_ref, 'linstor-manager', 'destroyDrbdVolumes', {'volume_group': self._group_name} 

1412 ) 

1413 except Exception as e: 

1414 util.SMlog('Calling destroyDrbdVolumes on host {} failed with error {}'.format(host_ref, e)) 

1415 

1416 # 4.6. Destroy group and storage pools. 

1417 self._destroy_resource_group(self._linstor, self._group_name) 

1418 self._destroy_resource_group(self._linstor, self._ha_group_name) 

1419 for pool in self._get_storage_pools(force=True): 

1420 self._destroy_storage_pool( 

1421 self._linstor, pool.name, pool.node_name 

1422 ) 

1423 except Exception as e: 

1424 self._start_controller(start=controller_is_running) 

1425 raise e 

1426 

1427 try: 

1428 self._start_controller(start=False) 

1429 for file in os.listdir(DATABASE_PATH): 

1430 if file != 'lost+found': 

1431 os.remove(DATABASE_PATH + '/' + file) 

1432 except Exception as e: 

1433 util.SMlog( 

1434 'Ignoring failure after LINSTOR SR destruction: {}' 

1435 .format(e) 

1436 ) 

1437 

1438 def find_up_to_date_diskful_nodes(self, volume_uuid): 

1439 """ 

1440 Find all nodes that contain a specific volume using diskful disks. 

1441 The disk must be up to data to be used. 

1442 :param str volume_uuid: The volume to use. 

1443 :return: The available nodes. 

1444 :rtype: tuple(set(str), str) 

1445 """ 

1446 

1447 volume_name = self.get_volume_name(volume_uuid) 

1448 

1449 in_use_by = None 

1450 node_names = set() 

1451 

1452 resource_states = filter( 

1453 lambda resource_state: resource_state.name == volume_name, 

1454 self._get_resource_cache().resource_states 

1455 ) 

1456 

1457 for resource_state in resource_states: 

1458 volume_state = resource_state.volume_states[0] 

1459 if volume_state.disk_state == 'UpToDate': 

1460 node_names.add(resource_state.node_name) 

1461 if resource_state.in_use: 

1462 in_use_by = resource_state.node_name 

1463 

1464 return (node_names, in_use_by) 

1465 

1466 def invalidate_resource_cache(self): 

1467 """ 

1468 If resources are impacted by external commands like vhdutil, 

1469 it's necessary to call this function to invalidate current resource 

1470 cache. 

1471 """ 

1472 self._mark_resource_cache_as_dirty() 

1473 

1474 def has_node(self, node_name): 

1475 """ 

1476 Check if a node exists in the LINSTOR database. 

1477 :rtype: bool 

1478 """ 

1479 result = self._linstor.node_list() 

1480 error_str = self._get_error_str(result) 

1481 if error_str: 

1482 raise LinstorVolumeManagerError( 

1483 'Failed to list nodes using `{}`: {}' 

1484 .format(node_name, error_str) 

1485 ) 

1486 return bool(result[0].node(node_name)) 

1487 

1488 def create_node(self, node_name, ip): 

1489 """ 

1490 Create a new node in the LINSTOR database. 

1491 :param str node_name: Node name to use. 

1492 :param str ip: Host IP to communicate. 

1493 """ 

1494 result = self._linstor.node_create( 

1495 node_name, 

1496 linstor.consts.VAL_NODE_TYPE_CMBD, 

1497 ip 

1498 ) 

1499 errors = self._filter_errors(result) 

1500 if errors: 

1501 error_str = self._get_error_str(errors) 

1502 raise LinstorVolumeManagerError( 

1503 'Failed to create node `{}`: {}'.format(node_name, error_str) 

1504 ) 

1505 

1506 def destroy_node(self, node_name): 

1507 """ 

1508 Destroy a node in the LINSTOR database. 

1509 :param str node_name: Node name to remove. 

1510 """ 

1511 result = self._linstor.node_delete(node_name) 

1512 errors = self._filter_errors(result) 

1513 if errors: 

1514 error_str = self._get_error_str(errors) 

1515 raise LinstorVolumeManagerError( 

1516 'Failed to destroy node `{}`: {}'.format(node_name, error_str) 

1517 ) 

1518 

1519 def create_node_interface(self, node_name, name, ip): 

1520 """ 

1521 Create a new node interface in the LINSTOR database. 

1522 :param str node_name: Node name of the interface to use. 

1523 :param str name: Interface to create. 

1524 :param str ip: IP of the interface. 

1525 """ 

1526 result = self._linstor.netinterface_create(node_name, name, ip) 

1527 errors = self._filter_errors(result) 

1528 if errors: 

1529 error_str = self._get_error_str(errors) 

1530 raise LinstorVolumeManagerError( 

1531 'Failed to create node interface on `{}`: {}'.format(node_name, error_str) 

1532 ) 

1533 

1534 def destroy_node_interface(self, node_name, name): 

1535 """ 

1536 Destroy a node interface in the LINSTOR database. 

1537 :param str node_name: Node name of the interface to remove. 

1538 :param str name: Interface to remove. 

1539 """ 

1540 

1541 if name == 'default': 

1542 raise LinstorVolumeManagerError( 

1543 'Unable to delete the default interface of a node!' 

1544 ) 

1545 

1546 result = self._linstor.netinterface_delete(node_name, name) 

1547 errors = self._filter_errors(result) 

1548 if errors: 

1549 error_str = self._get_error_str(errors) 

1550 raise LinstorVolumeManagerError( 

1551 'Failed to destroy node interface on `{}`: {}'.format(node_name, error_str) 

1552 ) 

1553 

1554 def modify_node_interface(self, node_name, name, ip): 

1555 """ 

1556 Modify a node interface in the LINSTOR database. Create it if necessary. 

1557 :param str node_name: Node name of the interface to use. 

1558 :param str name: Interface to modify or create. 

1559 :param str ip: IP of the interface. 

1560 """ 

1561 result = self._linstor.netinterface_create(node_name, name, ip) 

1562 errors = self._filter_errors(result) 

1563 if not errors: 

1564 return 

1565 

1566 if self._check_errors(errors, [linstor.consts.FAIL_EXISTS_NET_IF]): 

1567 result = self._linstor.netinterface_modify(node_name, name, ip) 

1568 errors = self._filter_errors(result) 

1569 if not errors: 

1570 return 

1571 

1572 error_str = self._get_error_str(errors) 

1573 raise LinstorVolumeManagerError( 

1574 'Unable to modify interface on `{}`: {}'.format(node_name, error_str) 

1575 ) 

1576 

1577 def list_node_interfaces(self, node_name): 

1578 """ 

1579 List all node interfaces. 

1580 :param str node_name: Node name to use to list interfaces. 

1581 :rtype: list 

1582 : 

1583 """ 

1584 result = self._linstor.net_interface_list(node_name) 

1585 if not result: 

1586 raise LinstorVolumeManagerError( 

1587 'Unable to list interfaces on `{}`: no list received'.format(node_name) 

1588 ) 

1589 

1590 interfaces = {} 

1591 for interface in result: 

1592 interface = interface._rest_data 

1593 interfaces[interface['name']] = { 

1594 'address': interface['address'], 

1595 'active': interface['is_active'] 

1596 } 

1597 return interfaces 

1598 

1599 def get_node_preferred_interface(self, node_name): 

1600 """ 

1601 Get the preferred interface used by a node. 

1602 :param str node_name: Node name of the interface to get. 

1603 :rtype: str 

1604 """ 

1605 try: 

1606 nodes = self._linstor.node_list_raise([node_name]).nodes 

1607 if nodes: 

1608 properties = nodes[0].props 

1609 return properties.get('PrefNic', 'default') 

1610 return nodes 

1611 except Exception as e: 

1612 raise LinstorVolumeManagerError( 

1613 'Failed to get preferred interface: `{}`'.format(e) 

1614 ) 

1615 

1616 def set_node_preferred_interface(self, node_name, name): 

1617 """ 

1618 Set the preferred interface to use on a node. 

1619 :param str node_name: Node name of the interface. 

1620 :param str name: Preferred interface to use. 

1621 """ 

1622 result = self._linstor.node_modify(node_name, property_dict={'PrefNic': name}) 

1623 errors = self._filter_errors(result) 

1624 if errors: 

1625 error_str = self._get_error_str(errors) 

1626 raise LinstorVolumeManagerError( 

1627 'Failed to set preferred node interface on `{}`: {}'.format(node_name, error_str) 

1628 ) 

1629 

1630 def get_nodes_info(self): 

1631 """ 

1632 Get all nodes + statuses, used or not by the pool. 

1633 :rtype: dict(str, dict) 

1634 """ 

1635 try: 

1636 nodes = {} 

1637 for node in self._linstor.node_list_raise().nodes: 

1638 nodes[node.name] = node.connection_status 

1639 return nodes 

1640 except Exception as e: 

1641 raise LinstorVolumeManagerError( 

1642 'Failed to get all nodes: `{}`'.format(e) 

1643 ) 

1644 

1645 def get_storage_pools_info(self): 

1646 """ 

1647 Give all storage pools of current group name. 

1648 :rtype: dict(str, list) 

1649 """ 

1650 storage_pools = {} 

1651 for pool in self._get_storage_pools(force=True): 

1652 if pool.node_name not in storage_pools: 

1653 storage_pools[pool.node_name] = [] 

1654 

1655 size = -1 

1656 capacity = -1 

1657 

1658 space = pool.free_space 

1659 if space: 

1660 size = space.free_capacity 

1661 if size < 0: 

1662 size = -1 

1663 else: 

1664 size *= 1024 

1665 capacity = space.total_capacity 

1666 if capacity <= 0: 

1667 capacity = -1 

1668 else: 

1669 capacity *= 1024 

1670 

1671 storage_pools[pool.node_name].append({ 

1672 'name': pool.name, 

1673 'linstor-uuid': pool.uuid, 

1674 'free-size': size, 

1675 'capacity': capacity 

1676 }) 

1677 

1678 return storage_pools 

1679 

1680 def get_resources_info(self): 

1681 """ 

1682 Give all resources of current group name. 

1683 :rtype: dict(str, list) 

1684 """ 

1685 if self._resources_info_cache and not self._resource_cache_dirty: 

1686 return self._resources_info_cache 

1687 

1688 resources = {} 

1689 resource_list = self._get_resource_cache() 

1690 volume_names = self.get_volumes_with_name() 

1691 for resource in resource_list.resources: 

1692 if resource.name not in resources: 

1693 resources[resource.name] = { 'nodes': {}, 'uuid': '' } 

1694 resource_nodes = resources[resource.name]['nodes'] 

1695 

1696 resource_nodes[resource.node_name] = { 

1697 'volumes': [], 

1698 'diskful': linstor.consts.FLAG_DISKLESS not in resource.flags, 

1699 'tie-breaker': linstor.consts.FLAG_TIE_BREAKER in resource.flags 

1700 } 

1701 resource_volumes = resource_nodes[resource.node_name]['volumes'] 

1702 

1703 for volume in resource.volumes: 

1704 # We ignore diskless pools of the form "DfltDisklessStorPool". 

1705 if volume.storage_pool_name != self._group_name: 

1706 continue 

1707 

1708 usable_size = volume.usable_size 

1709 if usable_size < 0: 

1710 usable_size = -1 

1711 else: 

1712 usable_size *= 1024 

1713 

1714 allocated_size = volume.allocated_size 

1715 if allocated_size < 0: 

1716 allocated_size = -1 

1717 else: 

1718 allocated_size *= 1024 

1719 

1720 resource_volumes.append({ 

1721 'storage-pool-name': volume.storage_pool_name, 

1722 'linstor-uuid': volume.uuid, 

1723 'number': volume.number, 

1724 'device-path': volume.device_path, 

1725 'usable-size': usable_size, 

1726 'allocated-size': allocated_size 

1727 }) 

1728 

1729 for resource_state in resource_list.resource_states: 

1730 resource = resources[resource_state.rsc_name]['nodes'][resource_state.node_name] 

1731 resource['in-use'] = resource_state.in_use 

1732 

1733 volumes = resource['volumes'] 

1734 for volume_state in resource_state.volume_states: 

1735 volume = next((x for x in volumes if x['number'] == volume_state.number), None) 

1736 if volume: 

1737 volume['disk-state'] = volume_state.disk_state 

1738 

1739 for volume_uuid, volume_name in volume_names.items(): 

1740 resource = resources.get(volume_name) 

1741 if resource: 

1742 resource['uuid'] = volume_uuid 

1743 

1744 self._resources_info_cache = resources 

1745 return self._resources_info_cache 

1746 

1747 def get_resource_info(self, volume_uuid: str) -> Dict[str, Any]: 

1748 """ 

1749 Give a resource info based on its UUID. 

1750 :param volume_uuid str: volume uuid to search for 

1751 :rtype: dict(str, any) 

1752 """ 

1753 for volume in self.get_resources_info().values(): 

1754 if volume["uuid"] == volume_uuid: 

1755 return volume 

1756 

1757 raise LinstorVolumeManagerError( 

1758 f"Could not find info about volume `{volume_uuid}`", 

1759 LinstorVolumeManagerError.ERR_VOLUME_NOT_EXISTS 

1760 ) 

1761 

1762 def get_database_path(self): 

1763 """ 

1764 Get the database path. 

1765 :return: The current database path. 

1766 :rtype: str 

1767 """ 

1768 return self._request_database_path(self._linstor, activate=True) 

1769 

1770 def database_backup(self, name="", *, delay=0): 

1771 now = datetime.now() 

1772 # Throttling to avoid too many backups of the same kind on a short period 

1773 if delay: 

1774 _, date_latest = self._get_latest_database_backup(name) 

1775 if date_latest and ((now - date_latest).total_seconds() < delay): 

1776 return # No backup for now 

1777 

1778 # Create new backup with link to latest 

1779 filename = DATABASE_BACKUP_NAME_FORMAT.format(now.strftime(DATABASE_BACKUP_DATE_FORMAT), name) 

1780 self._linstor.controller_backupdb(filename) 

1781 # Copy to secondary backup location 

1782 with contextlib.suppress(OSError): 

1783 os.makedirs(DATABASE_BACKUP_DIR_SPARE, mode=0o755, exist_ok=True) 

1784 shutil.copy2( 

1785 (DATABASE_BACKUP_DIR_MAIN / filename).with_suffix(".zip"), 

1786 DATABASE_BACKUP_DIR_SPARE, 

1787 ) 

1788 for directory in (DATABASE_BACKUP_DIR_MAIN, DATABASE_BACKUP_DIR_SPARE): 

1789 # Remove and set latest 

1790 with contextlib.suppress(OSError): 

1791 (directory / DATABASE_BACKUP_NAME_LATEST).unlink() 

1792 os.link(str((directory / filename).with_suffix(".zip")), 

1793 str((directory / DATABASE_BACKUP_NAME_LATEST))) 

1794 # Apply retention 

1795 for old_file, _ in self._get_sorted_database_backup(directory)[DATABASE_BACKUP_RETENTION:]: 

1796 os.unlink(old_file) 

1797 util.SMlog("[database_backup] Created: {}".format(filename)) 

1798 

1799 @classmethod 

1800 def get_all_group_names(cls, base_name): 

1801 """ 

1802 Get all group names. I.e. list of current group + HA. 

1803 :param str base_name: The SR group_name to use. 

1804 :return: List of group names. 

1805 :rtype: list 

1806 """ 

1807 return [cls._build_group_name(base_name), cls._build_ha_group_name(base_name)] 

1808 

1809 @classmethod 

1810 def create_sr(cls, group_name, ips, redundancy, thin_provisioning, logger=default_logger.__func__): 

1811 """ 

1812 Create a new SR on the given nodes. 

1813 :param str group_name: The SR group_name to use. 

1814 :param set(str) ips: Node ips. 

1815 :param int redundancy: How many copy of volumes should we store? 

1816 :param bool thin_provisioning: Use thin or thick provisioning. 

1817 :param function logger: Function to log messages. 

1818 :return: A new LinstorSr instance. 

1819 :rtype: LinstorSr 

1820 """ 

1821 

1822 try: 

1823 cls._start_controller(start=True) 

1824 sr = cls._create_sr(group_name, ips, redundancy, thin_provisioning, logger) 

1825 finally: 

1826 # Controller must be stopped and volume unmounted because 

1827 # it is the role of the drbd-reactor daemon to do the right 

1828 # actions. 

1829 cls._start_controller(start=False) 

1830 cls._mount_volume( 

1831 cls.build_device_path(DATABASE_VOLUME_NAME), 

1832 DATABASE_PATH, 

1833 mount=False 

1834 ) 

1835 return sr 

1836 

1837 @classmethod 

1838 def _create_sr(cls, group_name, ips, redundancy, thin_provisioning, logger=default_logger.__func__): 

1839 # 1. Check if SR already exists. 

1840 uri = 'linstor://localhost' 

1841 

1842 lin = cls._create_linstor_instance(uri, keep_uri_unmodified=True) 

1843 

1844 node_names = list(ips.keys()) 

1845 for node_name, ip in ips.items(): 

1846 while True: 

1847 # Try to create node. 

1848 result = lin.node_create( 

1849 node_name, 

1850 linstor.consts.VAL_NODE_TYPE_CMBD, 

1851 ip 

1852 ) 

1853 

1854 errors = cls._filter_errors(result) 

1855 if cls._check_errors( 

1856 errors, [linstor.consts.FAIL_EXISTS_NODE] 

1857 ): 

1858 # If it already exists, remove, then recreate. 

1859 result = lin.node_delete(node_name) 

1860 error_str = cls._get_error_str(result) 

1861 if error_str: 

1862 raise LinstorVolumeManagerError( 

1863 'Failed to remove old node `{}`: {}' 

1864 .format(node_name, error_str) 

1865 ) 

1866 elif not errors: 

1867 break # Created! 

1868 else: 

1869 raise LinstorVolumeManagerError( 

1870 'Failed to create node `{}` with ip `{}`: {}'.format( 

1871 node_name, ip, cls._get_error_str(errors) 

1872 ) 

1873 ) 

1874 

1875 driver_pool_name = group_name 

1876 base_group_name = group_name 

1877 group_name = cls._build_group_name(group_name) 

1878 storage_pool_name = group_name 

1879 pools = lin.storage_pool_list_raise(filter_by_stor_pools=[storage_pool_name]).storage_pools 

1880 if pools: 

1881 existing_node_names = [pool.node_name for pool in pools] 

1882 raise LinstorVolumeManagerError( 

1883 'Unable to create SR `{}`. It already exists on node(s): {}' 

1884 .format(group_name, existing_node_names) 

1885 ) 

1886 

1887 if lin.resource_group_list_raise( 

1888 cls.get_all_group_names(base_group_name) 

1889 ).resource_groups: 

1890 if not lin.resource_dfn_list_raise().resource_definitions: 

1891 backup_path = cls._create_database_backup_path() 

1892 logger( 

1893 'Group name already exists `{}` without LVs. ' 

1894 'Ignoring and moving the config files in {}'.format(group_name, backup_path) 

1895 ) 

1896 cls._move_files(DATABASE_PATH, backup_path) 

1897 else: 

1898 raise LinstorVolumeManagerError( 

1899 'Unable to create SR `{}`: The group name already exists' 

1900 .format(group_name) 

1901 ) 

1902 

1903 if thin_provisioning: 

1904 driver_pool_parts = driver_pool_name.split('/') 

1905 if not len(driver_pool_parts) == 2: 

1906 raise LinstorVolumeManagerError( 

1907 'Invalid group name using thin provisioning. ' 

1908 'Expected format: \'VG/LV`\'' 

1909 ) 

1910 

1911 # 2. Create storage pool on each node + resource group. 

1912 reg_volume_group_not_found = re.compile( 

1913 ".*Volume group '.*' not found$" 

1914 ) 

1915 

1916 i = 0 

1917 try: 

1918 # 2.a. Create storage pools. 

1919 storage_pool_count = 0 

1920 while i < len(node_names): 

1921 node_name = node_names[i] 

1922 

1923 result = lin.storage_pool_create( 

1924 node_name=node_name, 

1925 storage_pool_name=storage_pool_name, 

1926 storage_driver='LVM_THIN' if thin_provisioning else 'LVM', 

1927 driver_pool_name=driver_pool_name 

1928 ) 

1929 

1930 errors = linstor.Linstor.filter_api_call_response_errors( 

1931 result 

1932 ) 

1933 if errors: 

1934 if len(errors) == 1 and errors[0].is_error( 

1935 linstor.consts.FAIL_STOR_POOL_CONFIGURATION_ERROR 

1936 ) and reg_volume_group_not_found.match(errors[0].message): 

1937 logger( 

1938 'Volume group `{}` not found on `{}`. Ignoring...' 

1939 .format(group_name, node_name) 

1940 ) 

1941 cls._destroy_storage_pool(lin, storage_pool_name, node_name) 

1942 else: 

1943 error_str = cls._get_error_str(result) 

1944 raise LinstorVolumeManagerError( 

1945 'Could not create SP `{}` on node `{}`: {}' 

1946 .format(group_name, node_name, error_str) 

1947 ) 

1948 else: 

1949 storage_pool_count += 1 

1950 i += 1 

1951 

1952 if not storage_pool_count: 

1953 raise LinstorVolumeManagerError( 

1954 'Unable to create SR `{}`: No VG group found'.format( 

1955 group_name, 

1956 ) 

1957 ) 

1958 

1959 # 2.b. Create resource groups. 

1960 ha_group_name = cls._build_ha_group_name(base_group_name) 

1961 cls._create_resource_group( 

1962 lin, 

1963 group_name, 

1964 storage_pool_name, 

1965 redundancy, 

1966 True 

1967 ) 

1968 cls._create_resource_group( 

1969 lin, 

1970 ha_group_name, 

1971 storage_pool_name, 

1972 3, 

1973 True 

1974 ) 

1975 

1976 # 3. Create the LINSTOR database volume and mount it. 

1977 try: 

1978 logger('Creating database volume...') 

1979 volume_path = cls._create_database_volume( 

1980 lin, ha_group_name, storage_pool_name, node_names, redundancy 

1981 ) 

1982 except LinstorVolumeManagerError as e: 

1983 if e.code != LinstorVolumeManagerError.ERR_VOLUME_EXISTS: 

1984 logger('Destroying database volume after creation fail...') 

1985 cls._force_destroy_database_volume(lin, group_name) 

1986 raise 

1987 

1988 try: 

1989 logger('Mounting database volume...') 

1990 

1991 # First we must disable the controller to move safely the 

1992 # LINSTOR config. 

1993 cls._start_controller(start=False) 

1994 

1995 cls._mount_database_volume(volume_path) 

1996 except Exception as e: 

1997 # Ensure we are connected because controller has been 

1998 # restarted during mount call. 

1999 logger('Destroying database volume after mount fail...') 

2000 

2001 try: 

2002 cls._start_controller(start=True) 

2003 except Exception: 

2004 pass 

2005 

2006 lin = cls._create_linstor_instance( 

2007 uri, keep_uri_unmodified=True 

2008 ) 

2009 cls._force_destroy_database_volume(lin, group_name) 

2010 raise e 

2011 

2012 cls._start_controller(start=True) 

2013 lin = cls._create_linstor_instance(uri, keep_uri_unmodified=True) 

2014 

2015 # 4. Remove storage pools/resource/volume group in the case of errors. 

2016 except Exception as e: 

2017 logger('Destroying resource group and storage pools after fail...') 

2018 try: 

2019 cls._destroy_resource_group(lin, group_name) 

2020 cls._destroy_resource_group(lin, ha_group_name) 

2021 except Exception as e2: 

2022 logger('Failed to destroy resource group: {}'.format(e2)) 

2023 pass 

2024 j = 0 

2025 i = min(i, len(node_names) - 1) 

2026 while j <= i: 

2027 try: 

2028 cls._destroy_storage_pool(lin, storage_pool_name, node_names[j]) 

2029 except Exception as e2: 

2030 logger('Failed to destroy resource group: {}'.format(e2)) 

2031 pass 

2032 j += 1 

2033 raise e 

2034 

2035 # 5. Return new instance. 

2036 instance = cls.__new__(cls) 

2037 instance._linstor = lin 

2038 instance._logger = logger 

2039 instance._redundancy = redundancy 

2040 instance._base_group_name = base_group_name 

2041 instance._group_name = group_name 

2042 instance._volumes = set() 

2043 instance._storage_pools_time = 0 

2044 instance._kv_cache = instance._create_kv_cache() 

2045 instance._resource_cache = None 

2046 instance._resource_cache_dirty = True 

2047 instance._volume_info_cache = None 

2048 instance._volume_info_cache_dirty = True 

2049 return instance 

2050 

2051 @classmethod 

2052 def build_device_path(cls, volume_name): 

2053 """ 

2054 Build a device path given a volume name. 

2055 :param str volume_name: The volume name to use. 

2056 :return: A valid or not device path. 

2057 :rtype: str 

2058 """ 

2059 

2060 return '{}{}/0'.format(cls.DEV_ROOT_PATH, volume_name) 

2061 

2062 @classmethod 

2063 def build_volume_name(cls, base_name): 

2064 """ 

2065 Build a volume name given a base name (i.e. a UUID). 

2066 :param str base_name: The volume name to use. 

2067 :return: A valid or not device path. 

2068 :rtype: str 

2069 """ 

2070 return '{}{}'.format(cls.PREFIX_VOLUME, base_name) 

2071 

2072 @classmethod 

2073 def round_up_volume_size(cls, volume_size): 

2074 """ 

2075 Align volume size on higher multiple of BLOCK_SIZE. 

2076 :param int volume_size: The volume size to align. 

2077 :return: An aligned volume size. 

2078 :rtype: int 

2079 """ 

2080 return round_up(volume_size, cls.BLOCK_SIZE) 

2081 

2082 @classmethod 

2083 def round_down_volume_size(cls, volume_size): 

2084 """ 

2085 Align volume size on lower multiple of BLOCK_SIZE. 

2086 :param int volume_size: The volume size to align. 

2087 :return: An aligned volume size. 

2088 :rtype: int 

2089 """ 

2090 return round_down(volume_size, cls.BLOCK_SIZE) 

2091 

2092 # -------------------------------------------------------------------------- 

2093 # Private helpers. 

2094 # -------------------------------------------------------------------------- 

2095 

2096 def _create_kv_cache(self): 

2097 self._kv_cache = self._create_linstor_kv('/') 

2098 self._kv_cache_dirty = False 

2099 return self._kv_cache 

2100 

2101 def _get_kv_cache(self): 

2102 if self._kv_cache_dirty: 

2103 self._kv_cache = self._create_kv_cache() 

2104 return self._kv_cache 

2105 

2106 def _create_resource_cache(self): 

2107 self._resource_cache = self._linstor.resource_list_raise() 

2108 self._resource_cache_dirty = False 

2109 return self._resource_cache 

2110 

2111 def _get_resource_cache(self): 

2112 if self._resource_cache_dirty: 

2113 self._resource_cache = self._create_resource_cache() 

2114 return self._resource_cache 

2115 

2116 def _mark_resource_cache_as_dirty(self): 

2117 self._resource_cache_dirty = True 

2118 self._volume_info_cache_dirty = True 

2119 

2120 # -------------------------------------------------------------------------- 

2121 

2122 def _ensure_volume_exists(self, volume_uuid): 

2123 if volume_uuid not in self._volumes: 

2124 raise LinstorVolumeManagerError( 

2125 'volume `{}` doesn\'t exist'.format(volume_uuid), 

2126 LinstorVolumeManagerError.ERR_VOLUME_NOT_EXISTS 

2127 ) 

2128 

2129 def _find_best_size_candidates(self): 

2130 result = self._linstor.resource_group_qmvs(self._group_name) 

2131 error_str = self._get_error_str(result) 

2132 if error_str: 

2133 raise LinstorVolumeManagerError( 

2134 'Failed to get max volume size allowed of SR `{}`: {}'.format( 

2135 self._group_name, 

2136 error_str 

2137 ) 

2138 ) 

2139 return result[0].candidates 

2140 

2141 def _fetch_resource_names(self, ignore_deleted=True): 

2142 resource_names = set() 

2143 dfns = self._linstor.resource_dfn_list_raise().resource_definitions 

2144 for dfn in dfns: 

2145 if dfn.resource_group_name in self.get_all_group_names(self._base_group_name) and ( 

2146 ignore_deleted or 

2147 linstor.consts.FLAG_DELETE not in dfn.flags 

2148 ): 

2149 resource_names.add(dfn.name) 

2150 return resource_names 

2151 

2152 def _get_volumes_info(self, volume_names=None): 

2153 all_volume_info = {} 

2154 

2155 if not self._volume_info_cache_dirty: 

2156 return self._volume_info_cache 

2157 

2158 # `volume_names` MUST contain all volumes registered in the KV store. 

2159 # It can be provided to the function to avoid double fetching. 

2160 if not volume_names: 

2161 volume_names = self.get_volumes_with_name() 

2162 volume_names = set(volume_names.values()) 

2163 

2164 def process_resource(resource): 

2165 if resource.name not in all_volume_info: 

2166 current = all_volume_info[resource.name] = self.VolumeInfo( 

2167 resource.name 

2168 ) 

2169 else: 

2170 current = all_volume_info[resource.name] 

2171 

2172 if linstor.consts.FLAG_DISKLESS not in resource.flags: 

2173 current.diskful.append(resource.node_name) 

2174 

2175 for volume in resource.volumes: 

2176 # We ignore diskless pools of the form "DfltDisklessStorPool". 

2177 if volume.storage_pool_name != self._group_name: 

2178 continue 

2179 # Only fetch first volume. 

2180 if volume.number != 0: 

2181 continue 

2182 

2183 allocated_size = volume.allocated_size 

2184 if allocated_size > current.allocated_size: 

2185 current.allocated_size = allocated_size 

2186 

2187 usable_size = volume.usable_size 

2188 if usable_size > 0 and ( 

2189 usable_size < current.virtual_size or 

2190 not current.virtual_size 

2191 ): 

2192 current.virtual_size = usable_size 

2193 

2194 try: 

2195 for resource in self._get_resource_cache().resources: 

2196 if resource.name in volume_names: 

2197 process_resource(resource) 

2198 for volume in all_volume_info.values(): 

2199 if volume.allocated_size <= 0: 

2200 raise LinstorVolumeManagerError('Failed to get allocated size of `{}`'.format(resource.name)) 

2201 

2202 if volume.virtual_size <= 0: 

2203 raise LinstorVolumeManagerError('Failed to get usable size of `{}`'.format(volume.name)) 

2204 

2205 volume.allocated_size *= 1024 

2206 volume.virtual_size *= 1024 

2207 except LinstorVolumeManagerError: 

2208 self._mark_resource_cache_as_dirty() 

2209 raise 

2210 

2211 self._volume_info_cache_dirty = False 

2212 self._volume_info_cache = all_volume_info 

2213 

2214 return all_volume_info 

2215 

2216 def _get_volume_node_names_and_size(self, volume_name): 

2217 node_names = set() 

2218 size = -1 

2219 for resource in self._linstor.resource_list_raise( 

2220 filter_by_resources=[volume_name] 

2221 ).resources: 

2222 for volume in resource.volumes: 

2223 # We ignore diskless pools of the form "DfltDisklessStorPool". 

2224 if volume.storage_pool_name != self._group_name: 

2225 continue 

2226 

2227 node_names.add(resource.node_name) 

2228 

2229 usable_size = volume.usable_size 

2230 if usable_size <= 0: 

2231 continue 

2232 

2233 if size < 0: 

2234 size = usable_size 

2235 else: 

2236 size = min(size, usable_size) 

2237 

2238 if size <= 0: 

2239 raise LinstorVolumeManagerError('Failed to get usable size of `{}`'.format(resource.name)) 

2240 

2241 return (node_names, size * 1024) 

2242 

2243 def _compute_size(self, attr): 

2244 capacity = 0 

2245 for pool in self._get_storage_pools(force=True): 

2246 space = pool.free_space 

2247 if space: 

2248 size = getattr(space, attr) 

2249 if size < 0: 

2250 raise LinstorVolumeManagerError( 

2251 'Failed to get pool {} attr of `{}`' 

2252 .format(attr, pool.node_name) 

2253 ) 

2254 capacity += size 

2255 return capacity * 1024 

2256 

2257 def _get_node_names(self): 

2258 node_names = set() 

2259 for pool in self._get_storage_pools(): 

2260 node_names.add(pool.node_name) 

2261 return node_names 

2262 

2263 def _get_storage_pools(self, force=False): 

2264 cur_time = time.time() 

2265 elsaped_time = cur_time - self._storage_pools_time 

2266 

2267 if force or elsaped_time >= self.STORAGE_POOLS_FETCH_INTERVAL: 

2268 self._storage_pools = self._linstor.storage_pool_list_raise( 

2269 filter_by_stor_pools=[self._group_name] 

2270 ).storage_pools 

2271 self._storage_pools_time = time.time() 

2272 

2273 return self._storage_pools 

2274 

2275 def _create_volume( 

2276 self, 

2277 volume_uuid, 

2278 volume_name, 

2279 size, 

2280 place_resources, 

2281 high_availability 

2282 ): 

2283 size = self.round_up_volume_size(size) 

2284 self._mark_resource_cache_as_dirty() 

2285 

2286 group_name = self._ha_group_name if high_availability else self._group_name 

2287 def create_definition(): 

2288 first_attempt = True 

2289 while True: 

2290 try: 

2291 self._check_volume_creation_errors( 

2292 self._linstor.resource_group_spawn( 

2293 rsc_grp_name=group_name, 

2294 rsc_dfn_name=volume_name, 

2295 vlm_sizes=['{}B'.format(size)], 

2296 definitions_only=True 

2297 ), 

2298 volume_uuid, 

2299 self._group_name 

2300 ) 

2301 break 

2302 except LinstorVolumeManagerError as e: 

2303 if ( 

2304 not first_attempt or 

2305 not high_availability or 

2306 e.code != LinstorVolumeManagerError.ERR_GROUP_NOT_EXISTS 

2307 ): 

2308 raise 

2309 

2310 first_attempt = False 

2311 self._create_resource_group( 

2312 self._linstor, 

2313 group_name, 

2314 self._group_name, 

2315 3, 

2316 True 

2317 ) 

2318 

2319 self._configure_volume_peer_slots(self._linstor, volume_name) 

2320 

2321 def clean(): 

2322 try: 

2323 self._destroy_volume(volume_uuid, force=True, preserve_properties=True) 

2324 except Exception as e: 

2325 self._logger( 

2326 'Unable to destroy volume {} after creation fail: {}' 

2327 .format(volume_uuid, e) 

2328 ) 

2329 

2330 def create(): 

2331 try: 

2332 create_definition() 

2333 if place_resources: 

2334 # Basic case when we use the default redundancy of the group. 

2335 self._check_volume_creation_errors( 

2336 self._linstor.resource_auto_place( 

2337 rsc_name=volume_name, 

2338 place_count=self._redundancy, 

2339 diskless_on_remaining=False 

2340 ), 

2341 volume_uuid, 

2342 self._group_name 

2343 ) 

2344 except LinstorVolumeManagerError as e: 

2345 if e.code != LinstorVolumeManagerError.ERR_VOLUME_EXISTS: 

2346 clean() 

2347 raise 

2348 except Exception: 

2349 clean() 

2350 raise 

2351 

2352 util.retry(create, maxretry=5) 

2353 

2354 def _create_volume_with_properties( 

2355 self, 

2356 volume_uuid, 

2357 volume_name, 

2358 size, 

2359 place_resources, 

2360 high_availability 

2361 ): 

2362 if self.check_volume_exists(volume_uuid): 

2363 raise LinstorVolumeManagerError( 

2364 'Could not create volume `{}` from SR `{}`, it already exists' 

2365 .format(volume_uuid, self._group_name) + ' in properties', 

2366 LinstorVolumeManagerError.ERR_VOLUME_EXISTS 

2367 ) 

2368 

2369 if volume_name in self._fetch_resource_names(): 

2370 raise LinstorVolumeManagerError( 

2371 'Could not create volume `{}` from SR `{}`, '.format( 

2372 volume_uuid, self._group_name 

2373 ) + 'resource of the same name already exists in LINSTOR' 

2374 ) 

2375 

2376 # I am paranoid. 

2377 volume_properties = self._get_volume_properties(volume_uuid) 

2378 if (volume_properties.get(self.PROP_NOT_EXISTS) is not None): 

2379 raise LinstorVolumeManagerError( 

2380 'Could not create volume `{}`, '.format(volume_uuid) + 

2381 'properties already exist' 

2382 ) 

2383 

2384 try: 

2385 volume_properties[self.PROP_NOT_EXISTS] = self.STATE_CREATING 

2386 volume_properties[self.PROP_VOLUME_NAME] = volume_name 

2387 

2388 self._create_volume( 

2389 volume_uuid, 

2390 volume_name, 

2391 size, 

2392 place_resources, 

2393 high_availability 

2394 ) 

2395 

2396 assert volume_properties.namespace == \ 

2397 self._build_volume_namespace(volume_uuid) 

2398 return volume_properties 

2399 except LinstorVolumeManagerError as e: 

2400 # Do not destroy existing resource! 

2401 # In theory we can't get this error because we check this event 

2402 # before the `self._create_volume` case. 

2403 # It can only happen if the same volume uuid is used in the same 

2404 # call in another host. 

2405 if e.code != LinstorVolumeManagerError.ERR_VOLUME_EXISTS: 

2406 self._destroy_volume(volume_uuid, force=True) 

2407 raise 

2408 

2409 def _find_device_path(self, volume_uuid, volume_name): 

2410 current_device_path = self._request_device_path( 

2411 volume_uuid, volume_name, activate=True 

2412 ) 

2413 

2414 # We use realpath here to get the /dev/drbd<id> path instead of 

2415 # /dev/drbd/by-res/<resource_name>. 

2416 expected_device_path = self.build_device_path(volume_name) 

2417 util.wait_for_path(expected_device_path, 5) 

2418 

2419 device_realpath = os.path.realpath(expected_device_path) 

2420 if current_device_path != device_realpath: 

2421 raise LinstorVolumeManagerError( 

2422 'Invalid path, current={}, expected={} (realpath={})' 

2423 .format( 

2424 current_device_path, 

2425 expected_device_path, 

2426 device_realpath 

2427 ) 

2428 ) 

2429 return expected_device_path 

2430 

2431 def _request_device_path(self, volume_uuid, volume_name, activate=False): 

2432 node_name = socket.gethostname() 

2433 

2434 resource = next(filter( 

2435 lambda resource: resource.node_name == node_name and 

2436 resource.name == volume_name, 

2437 self._get_resource_cache().resources 

2438 ), None) 

2439 

2440 if not resource: 

2441 if activate: 

2442 self._mark_resource_cache_as_dirty() 

2443 self._activate_device_path( 

2444 self._linstor, node_name, volume_name 

2445 ) 

2446 return self._request_device_path(volume_uuid, volume_name) 

2447 raise LinstorVolumeManagerError( 

2448 'Unable to get dev path for `{}`, no resource found but definition "seems" to exist' 

2449 .format(volume_uuid) 

2450 ) 

2451 

2452 # Contains a path of the /dev/drbd<id> form. 

2453 device_path = resource.volumes[0].device_path 

2454 if not device_path: 

2455 raise LinstorVolumeManagerError('Empty dev path for `{}`!'.format(volume_uuid)) 

2456 return device_path 

2457 

2458 def _destroy_resource(self, resource_name, force=False): 

2459 result = self._linstor.resource_dfn_delete(resource_name) 

2460 error_str = self._get_error_str(result) 

2461 if not error_str: 

2462 self._mark_resource_cache_as_dirty() 

2463 return 

2464 

2465 if not force: 

2466 self._mark_resource_cache_as_dirty() 

2467 raise LinstorVolumeManagerError( 

2468 'Could not destroy resource `{}` from SR `{}`: {}' 

2469 .format(resource_name, self._group_name, error_str) 

2470 ) 

2471 

2472 # If force is used, ensure there is no opener. 

2473 all_openers = get_all_volume_openers(resource_name, '0') 

2474 for openers in all_openers.values(): 

2475 if openers: 

2476 self._mark_resource_cache_as_dirty() 

2477 raise LinstorVolumeManagerError( 

2478 'Could not force destroy resource `{}` from SR `{}`: {} (openers=`{}`)' 

2479 .format(resource_name, self._group_name, error_str, all_openers) 

2480 ) 

2481 

2482 # Maybe the resource is blocked in primary mode. DRBD/LINSTOR issue? 

2483 resource_states = filter( 

2484 lambda resource_state: resource_state.name == resource_name, 

2485 self._get_resource_cache().resource_states 

2486 ) 

2487 

2488 # Mark only after computation of states. 

2489 self._mark_resource_cache_as_dirty() 

2490 

2491 for resource_state in resource_states: 

2492 volume_state = resource_state.volume_states[0] 

2493 if resource_state.in_use: 

2494 demote_drbd_resource(resource_state.node_name, resource_name) 

2495 break 

2496 self._destroy_resource(resource_name) 

2497 

2498 def _destroy_volume(self, volume_uuid, force=False, preserve_properties=False): 

2499 volume_properties = self._get_volume_properties(volume_uuid) 

2500 try: 

2501 volume_name = volume_properties.get(self.PROP_VOLUME_NAME) 

2502 if volume_name in self._fetch_resource_names(): 

2503 self._destroy_resource(volume_name, force) 

2504 

2505 # Assume this call is atomic. 

2506 if not preserve_properties: 

2507 volume_properties.clear() 

2508 except Exception as e: 

2509 raise LinstorVolumeManagerError( 

2510 'Cannot destroy volume `{}`: {}'.format(volume_uuid, e) 

2511 ) 

2512 

2513 def _build_volumes(self, repair): 

2514 properties = self._kv_cache 

2515 resource_names = self._fetch_resource_names() 

2516 

2517 self._volumes = set() 

2518 

2519 updating_uuid_volumes = self._get_volumes_by_property( 

2520 self.REG_UPDATING_UUID_SRC, ignore_inexisting_volumes=False 

2521 ) 

2522 if updating_uuid_volumes and not repair: 

2523 raise LinstorVolumeManagerError( 

2524 'Cannot build LINSTOR volume list: ' 

2525 'It exists invalid "updating uuid volumes", repair is required' 

2526 ) 

2527 

2528 existing_volumes = self._get_volumes_by_property( 

2529 self.REG_NOT_EXISTS, ignore_inexisting_volumes=False 

2530 ) 

2531 for volume_uuid, not_exists in existing_volumes.items(): 

2532 properties.namespace = self._build_volume_namespace(volume_uuid) 

2533 

2534 src_uuid = properties.get(self.PROP_UPDATING_UUID_SRC) 

2535 if src_uuid: 

2536 self._logger( 

2537 'Ignoring volume during manager initialization with prop ' 

2538 ' PROP_UPDATING_UUID_SRC: {} (properties={})' 

2539 .format( 

2540 volume_uuid, 

2541 self._get_filtered_properties(properties) 

2542 ) 

2543 ) 

2544 continue 

2545 

2546 # Insert volume in list if the volume exists. Or if the volume 

2547 # is being created and a slave wants to use it (repair = False). 

2548 # 

2549 # If we are on the master and if repair is True and state is 

2550 # Creating, it's probably a bug or crash: the creation process has 

2551 # been stopped. 

2552 if not_exists == self.STATE_EXISTS or ( 

2553 not repair and not_exists == self.STATE_CREATING 

2554 ): 

2555 self._volumes.add(volume_uuid) 

2556 continue 

2557 

2558 if not repair: 

2559 self._logger( 

2560 'Ignoring bad volume during manager initialization: {} ' 

2561 '(properties={})'.format( 

2562 volume_uuid, 

2563 self._get_filtered_properties(properties) 

2564 ) 

2565 ) 

2566 continue 

2567 

2568 # Remove bad volume. 

2569 try: 

2570 self._logger( 

2571 'Removing bad volume during manager initialization: {} ' 

2572 '(properties={})'.format( 

2573 volume_uuid, 

2574 self._get_filtered_properties(properties) 

2575 ) 

2576 ) 

2577 volume_name = properties.get(self.PROP_VOLUME_NAME) 

2578 

2579 # Little optimization, don't call `self._destroy_volume`, 

2580 # we already have resource name list. 

2581 if volume_name in resource_names: 

2582 self._destroy_resource(volume_name, force=True) 

2583 

2584 # Assume this call is atomic. 

2585 properties.clear() 

2586 except Exception as e: 

2587 # Do not raise, we don't want to block user action. 

2588 self._logger( 

2589 'Cannot clean volume {}: {}'.format(volume_uuid, e) 

2590 ) 

2591 

2592 # The volume can't be removed, maybe it's still in use, 

2593 # in this case rename it with the "DELETED_" prefix. 

2594 # This prefix is mandatory if it exists a snap transaction to 

2595 # rollback because the original VDI UUID can try to be renamed 

2596 # with the UUID we are trying to delete... 

2597 if not volume_uuid.startswith('DELETED_'): 

2598 self.update_volume_uuid( 

2599 volume_uuid, 'DELETED_' + volume_uuid, force=True 

2600 ) 

2601 

2602 for dest_uuid, src_uuid in updating_uuid_volumes.items(): 

2603 dest_namespace = self._build_volume_namespace(dest_uuid) 

2604 

2605 properties.namespace = dest_namespace 

2606 if int(properties.get(self.PROP_NOT_EXISTS)): 

2607 properties.clear() 

2608 continue 

2609 

2610 properties.namespace = self._build_volume_namespace(src_uuid) 

2611 properties.clear() 

2612 

2613 properties.namespace = dest_namespace 

2614 properties.pop(self.PROP_UPDATING_UUID_SRC) 

2615 

2616 if src_uuid in self._volumes: 

2617 self._volumes.remove(src_uuid) 

2618 self._volumes.add(dest_uuid) 

2619 

2620 def _get_sr_properties(self): 

2621 return self._create_linstor_kv(self._build_sr_namespace()) 

2622 

2623 def _get_volumes_by_property( 

2624 self, reg_prop, ignore_inexisting_volumes=True 

2625 ): 

2626 base_properties = self._get_kv_cache() 

2627 base_properties.namespace = self._build_volume_namespace() 

2628 

2629 volume_properties = {} 

2630 for volume_uuid in self._volumes: 

2631 volume_properties[volume_uuid] = '' 

2632 

2633 for key, value in base_properties.items(): 

2634 res = reg_prop.match(key) 

2635 if res: 

2636 volume_uuid = res.groups()[0] 

2637 if not ignore_inexisting_volumes or \ 

2638 volume_uuid in self._volumes: 

2639 volume_properties[volume_uuid] = value 

2640 

2641 return volume_properties 

2642 

2643 def _create_linstor_kv(self, namespace): 

2644 return linstor.KV( 

2645 self._group_name, 

2646 uri=self._linstor.controller_host(), 

2647 namespace=namespace 

2648 ) 

2649 

2650 def _get_volume_properties(self, volume_uuid): 

2651 properties = self._get_kv_cache() 

2652 properties.namespace = self._build_volume_namespace(volume_uuid) 

2653 return properties 

2654 

2655 def _list_database_backup(self, database_backup_dir, name="*"): 

2656 for path in database_backup_dir.glob(DATABASE_BACKUP_NAME_FORMAT.format( 

2657 "20[0-9][0-9][01][0-9][0-3][0-9]_[0-2][0-9][0-5][0-9][0-5][0-9]", name) + ".zip"): 

2658 try: 

2659 yield path, datetime.strptime(path.name.split("-")[1], DATABASE_BACKUP_DATE_FORMAT) 

2660 except (ValueError, IndexError): 

2661 continue 

2662 

2663 def _get_sorted_database_backup(self, database_backup_dir, name="*"): 

2664 return sorted(self._list_database_backup(database_backup_dir, name), 

2665 reverse=True, 

2666 key=lambda p: p[0].stat().st_mtime) 

2667 

2668 def _get_latest_database_backup(self, name="*"): 

2669 return max(self._list_database_backup(DATABASE_BACKUP_DIR_MAIN, name), 

2670 default=(None, None), 

2671 key=lambda p: p[0].stat().st_mtime) 

2672 

2673 @classmethod 

2674 def _build_sr_namespace(cls): 

2675 return '/{}/'.format(cls.NAMESPACE_SR) 

2676 

2677 @classmethod 

2678 def _build_volume_namespace(cls, volume_uuid=None): 

2679 # Return a path to all volumes if `volume_uuid` is not given. 

2680 if volume_uuid is None: 

2681 return '/{}/'.format(cls.NAMESPACE_VOLUME) 

2682 return '/{}/{}/'.format(cls.NAMESPACE_VOLUME, volume_uuid) 

2683 

2684 @classmethod 

2685 def _get_error_str(cls, result): 

2686 return ', '.join([ 

2687 err.message for err in cls._filter_errors(result) 

2688 ]) 

2689 

2690 @classmethod 

2691 def _create_linstor_instance( 

2692 cls, uri, keep_uri_unmodified=False, attempt_count=30 

2693 ): 

2694 retry = False 

2695 

2696 def connect(uri): 

2697 if not uri: 

2698 uri = get_controller_uri() 

2699 if not uri: 

2700 raise LinstorVolumeManagerError( 

2701 'Unable to find controller uri...' 

2702 ) 

2703 instance = linstor.Linstor(uri, keep_alive=True) 

2704 instance.connect() 

2705 return instance 

2706 

2707 try: 

2708 return connect(uri) 

2709 except (linstor.errors.LinstorNetworkError, LinstorVolumeManagerError): 

2710 pass 

2711 

2712 if not keep_uri_unmodified: 

2713 uri = None 

2714 

2715 return util.retry( 

2716 lambda: connect(uri), 

2717 maxretry=attempt_count, 

2718 period=1, 

2719 exceptions=[ 

2720 linstor.errors.LinstorNetworkError, 

2721 LinstorVolumeManagerError 

2722 ] 

2723 ) 

2724 

2725 @classmethod 

2726 def _configure_volume_peer_slots(cls, lin, volume_name): 

2727 result = lin.resource_dfn_modify(volume_name, {}, peer_slots=3) 

2728 error_str = cls._get_error_str(result) 

2729 if error_str: 

2730 raise LinstorVolumeManagerError( 

2731 'Could not configure volume peer slots of {}: {}' 

2732 .format(volume_name, error_str) 

2733 ) 

2734 

2735 @classmethod 

2736 def _activate_device_path(cls, lin, node_name, volume_name): 

2737 result = lin.resource_make_available(node_name, volume_name, diskful=False) 

2738 if linstor.Linstor.all_api_responses_no_error(result): 

2739 return 

2740 errors = linstor.Linstor.filter_api_call_response_errors(result) 

2741 if len(errors) == 1 and errors[0].is_error( 

2742 linstor.consts.FAIL_EXISTS_RSC 

2743 ): 

2744 return 

2745 

2746 raise LinstorVolumeManagerError( 

2747 'Unable to activate device path of `{}` on node `{}`: {}' 

2748 .format(volume_name, node_name, ', '.join( 

2749 [str(x) for x in result])) 

2750 ) 

2751 

2752 @classmethod 

2753 def _request_database_path(cls, lin, activate=False): 

2754 node_name = socket.gethostname() 

2755 

2756 try: 

2757 resource = next(filter( 

2758 lambda resource: resource.node_name == node_name and 

2759 resource.name == DATABASE_VOLUME_NAME, 

2760 lin.resource_list_raise().resources 

2761 ), None) 

2762 except Exception as e: 

2763 raise LinstorVolumeManagerError( 

2764 'Unable to fetch database resource: {}' 

2765 .format(e) 

2766 ) 

2767 

2768 if not resource: 

2769 if activate: 

2770 cls._activate_device_path( 

2771 lin, node_name, DATABASE_VOLUME_NAME 

2772 ) 

2773 return cls._request_database_path( 

2774 DATABASE_VOLUME_NAME, DATABASE_VOLUME_NAME 

2775 ) 

2776 raise LinstorVolumeManagerError( 

2777 'Empty dev path for `{}`, but definition "seems" to exist' 

2778 .format(DATABASE_PATH) 

2779 ) 

2780 # Contains a path of the /dev/drbd<id> form. 

2781 return resource.volumes[0].device_path 

2782 

2783 @classmethod 

2784 def _create_database_volume( 

2785 cls, lin, group_name, storage_pool_name, node_names, redundancy 

2786 ): 

2787 try: 

2788 dfns = lin.resource_dfn_list_raise().resource_definitions 

2789 except Exception as e: 

2790 raise LinstorVolumeManagerError( 

2791 'Unable to get definitions during database creation: {}' 

2792 .format(e) 

2793 ) 

2794 

2795 if dfns: 

2796 raise LinstorVolumeManagerError( 

2797 'Could not create volume `{}` from SR `{}`, '.format( 

2798 DATABASE_VOLUME_NAME, group_name 

2799 ) + 'LINSTOR volume list must be empty.' 

2800 ) 

2801 

2802 # Workaround to use thin lvm. Without this line an error is returned: 

2803 # "Not enough available nodes" 

2804 # I don't understand why but this command protect against this bug. 

2805 try: 

2806 pools = lin.storage_pool_list_raise( 

2807 filter_by_stor_pools=[storage_pool_name] 

2808 ) 

2809 except Exception as e: 

2810 raise LinstorVolumeManagerError( 

2811 'Failed to get storage pool list before database creation: {}' 

2812 .format(e) 

2813 ) 

2814 

2815 # Ensure we have a correct list of storage pools. 

2816 assert pools.storage_pools # We must have at least one storage pool! 

2817 nodes_with_pool = list(map(lambda pool: pool.node_name, pools.storage_pools)) 

2818 for node_name in nodes_with_pool: 

2819 assert node_name in node_names 

2820 util.SMlog('Nodes with storage pool: {}'.format(nodes_with_pool)) 

2821 

2822 # Create the database definition. 

2823 size = cls.round_up_volume_size(DATABASE_SIZE) 

2824 cls._check_volume_creation_errors(lin.resource_group_spawn( 

2825 rsc_grp_name=group_name, 

2826 rsc_dfn_name=DATABASE_VOLUME_NAME, 

2827 vlm_sizes=['{}B'.format(size)], 

2828 definitions_only=True 

2829 ), DATABASE_VOLUME_NAME, group_name) 

2830 cls._configure_volume_peer_slots(lin, DATABASE_VOLUME_NAME) 

2831 

2832 # Create real resources on the first nodes. 

2833 resources = [] 

2834 

2835 diskful_nodes = [] 

2836 diskless_nodes = [] 

2837 for node_name in node_names: 

2838 if node_name in nodes_with_pool: 

2839 diskful_nodes.append(node_name) 

2840 else: 

2841 diskless_nodes.append(node_name) 

2842 

2843 assert diskful_nodes 

2844 for node_name in diskful_nodes[:redundancy]: 

2845 util.SMlog('Create database diskful on {}'.format(node_name)) 

2846 resources.append(linstor.ResourceData( 

2847 node_name=node_name, 

2848 rsc_name=DATABASE_VOLUME_NAME, 

2849 storage_pool=storage_pool_name 

2850 )) 

2851 # Create diskless resources on the remaining set. 

2852 for node_name in diskful_nodes[redundancy:] + diskless_nodes: 

2853 util.SMlog('Create database diskless on {}'.format(node_name)) 

2854 resources.append(linstor.ResourceData( 

2855 node_name=node_name, 

2856 rsc_name=DATABASE_VOLUME_NAME, 

2857 diskless=True 

2858 )) 

2859 

2860 result = lin.resource_create(resources) 

2861 error_str = cls._get_error_str(result) 

2862 if error_str: 

2863 raise LinstorVolumeManagerError( 

2864 'Could not create database volume from SR `{}`: {}'.format( 

2865 group_name, error_str 

2866 ) 

2867 ) 

2868 

2869 # Create database and ensure path exists locally and 

2870 # on replicated devices. 

2871 current_device_path = cls._request_database_path(lin, activate=True) 

2872 

2873 # Ensure diskless paths exist on other hosts. Otherwise PBDs can't be 

2874 # plugged. 

2875 for node_name in node_names: 

2876 cls._activate_device_path(lin, node_name, DATABASE_VOLUME_NAME) 

2877 

2878 # We use realpath here to get the /dev/drbd<id> path instead of 

2879 # /dev/drbd/by-res/<resource_name>. 

2880 expected_device_path = cls.build_device_path(DATABASE_VOLUME_NAME) 

2881 util.wait_for_path(expected_device_path, 5) 

2882 

2883 device_realpath = os.path.realpath(expected_device_path) 

2884 if current_device_path != device_realpath: 

2885 raise LinstorVolumeManagerError( 

2886 'Invalid path, current={}, expected={} (realpath={})' 

2887 .format( 

2888 current_device_path, 

2889 expected_device_path, 

2890 device_realpath 

2891 ) 

2892 ) 

2893 

2894 try: 

2895 util.retry( 

2896 lambda: util.pread2([DATABASE_MKFS, expected_device_path]), 

2897 maxretry=5 

2898 ) 

2899 except Exception as e: 

2900 raise LinstorVolumeManagerError( 

2901 'Failed to execute {} on database volume: {}' 

2902 .format(DATABASE_MKFS, e) 

2903 ) 

2904 

2905 return expected_device_path 

2906 

2907 @classmethod 

2908 def _destroy_database_volume(cls, lin, group_name): 

2909 error_str = cls._get_error_str( 

2910 lin.resource_dfn_delete(DATABASE_VOLUME_NAME) 

2911 ) 

2912 if error_str: 

2913 raise LinstorVolumeManagerError( 

2914 'Could not destroy resource `{}` from SR `{}`: {}' 

2915 .format(DATABASE_VOLUME_NAME, group_name, error_str) 

2916 ) 

2917 

2918 @classmethod 

2919 def _mount_database_volume(cls, volume_path, mount=True, force=False): 

2920 try: 

2921 # 1. Create a backup config folder. 

2922 database_not_empty = bool(os.listdir(DATABASE_PATH)) 

2923 backup_path = cls._create_database_backup_path() 

2924 

2925 # 2. Move the config in the mounted volume. 

2926 if database_not_empty: 

2927 cls._move_files(DATABASE_PATH, backup_path) 

2928 

2929 cls._mount_volume(volume_path, DATABASE_PATH, mount) 

2930 

2931 if database_not_empty: 

2932 cls._move_files(backup_path, DATABASE_PATH, force) 

2933 

2934 # 3. Remove useless backup directory. 

2935 try: 

2936 os.rmdir(backup_path) 

2937 except Exception as e: 

2938 raise LinstorVolumeManagerError( 

2939 'Failed to remove backup path {} of LINSTOR config: {}' 

2940 .format(backup_path, e) 

2941 ) 

2942 except Exception as e: 

2943 def force_exec(fn): 

2944 try: 

2945 fn() 

2946 except Exception: 

2947 pass 

2948 

2949 if mount == cls._is_mounted(DATABASE_PATH): 

2950 force_exec(lambda: cls._move_files( 

2951 DATABASE_PATH, backup_path 

2952 )) 

2953 force_exec(lambda: cls._mount_volume( 

2954 volume_path, DATABASE_PATH, not mount 

2955 )) 

2956 

2957 if mount != cls._is_mounted(DATABASE_PATH): 

2958 force_exec(lambda: cls._move_files( 

2959 backup_path, DATABASE_PATH 

2960 )) 

2961 

2962 force_exec(lambda: os.rmdir(backup_path)) 

2963 raise e 

2964 

2965 @classmethod 

2966 def _force_destroy_database_volume(cls, lin, group_name): 

2967 try: 

2968 cls._destroy_database_volume(lin, group_name) 

2969 except Exception: 

2970 pass 

2971 

2972 @classmethod 

2973 def _destroy_storage_pool(cls, lin, group_name, node_name): 

2974 def destroy(): 

2975 result = lin.storage_pool_delete(node_name, group_name) 

2976 errors = cls._filter_errors(result) 

2977 if cls._check_errors(errors, [ 

2978 linstor.consts.FAIL_NOT_FOUND_STOR_POOL, 

2979 linstor.consts.FAIL_NOT_FOUND_STOR_POOL_DFN 

2980 ]): 

2981 return 

2982 

2983 if errors: 

2984 raise LinstorVolumeManagerError( 

2985 'Failed to destroy SP `{}` on node `{}`: {}'.format( 

2986 group_name, 

2987 node_name, 

2988 cls._get_error_str(errors) 

2989 ) 

2990 ) 

2991 

2992 # We must retry to avoid errors like: 

2993 # "can not be deleted as volumes / snapshot-volumes are still using it" 

2994 # after LINSTOR database volume destruction. 

2995 return util.retry(destroy, maxretry=10) 

2996 

2997 @classmethod 

2998 def _create_resource_group( 

2999 cls, 

3000 lin, 

3001 group_name, 

3002 storage_pool_name, 

3003 redundancy, 

3004 destroy_old_group 

3005 ): 

3006 rg_creation_attempt = 0 

3007 while True: 

3008 result = lin.resource_group_create( 

3009 name=group_name, 

3010 place_count=redundancy, 

3011 storage_pool=storage_pool_name, 

3012 diskless_on_remaining=False 

3013 ) 

3014 error_str = cls._get_error_str(result) 

3015 if not error_str: 

3016 break 

3017 

3018 errors = cls._filter_errors(result) 

3019 if destroy_old_group and cls._check_errors(errors, [ 

3020 linstor.consts.FAIL_EXISTS_RSC_GRP 

3021 ]): 

3022 rg_creation_attempt += 1 

3023 if rg_creation_attempt < 2: 

3024 try: 

3025 cls._destroy_resource_group(lin, group_name) 

3026 except Exception as e: 

3027 error_str = 'Failed to destroy old and empty RG: {}'.format(e) 

3028 else: 

3029 continue 

3030 

3031 raise LinstorVolumeManagerError( 

3032 'Could not create RG `{}`: {}'.format( 

3033 group_name, error_str 

3034 ) 

3035 ) 

3036 

3037 result = lin.volume_group_create(group_name) 

3038 error_str = cls._get_error_str(result) 

3039 if error_str: 

3040 raise LinstorVolumeManagerError( 

3041 'Could not create VG `{}`: {}'.format( 

3042 group_name, error_str 

3043 ) 

3044 ) 

3045 

3046 @classmethod 

3047 def _destroy_resource_group(cls, lin, group_name): 

3048 def destroy(): 

3049 result = lin.resource_group_delete(group_name) 

3050 errors = cls._filter_errors(result) 

3051 if cls._check_errors(errors, [ 

3052 linstor.consts.FAIL_NOT_FOUND_RSC_GRP 

3053 ]): 

3054 return 

3055 

3056 if errors: 

3057 raise LinstorVolumeManagerError( 

3058 'Failed to destroy RG `{}`: {}' 

3059 .format(group_name, cls._get_error_str(errors)) 

3060 ) 

3061 

3062 return util.retry(destroy, maxretry=10) 

3063 

3064 @classmethod 

3065 def _build_group_name(cls, base_name): 

3066 # If thin provisioning is used we have a path like this: 

3067 # `VG/LV`. "/" is not accepted by LINSTOR. 

3068 return '{}{}'.format(cls.PREFIX_SR, base_name.replace('/', '_')) 

3069 

3070 # Used to store important data in a HA context, 

3071 # i.e. a replication count of 3. 

3072 @classmethod 

3073 def _build_ha_group_name(cls, base_name): 

3074 return '{}{}'.format(cls.PREFIX_HA, base_name.replace('/', '_')) 

3075 

3076 @classmethod 

3077 def _check_volume_creation_errors(cls, result, volume_uuid, group_name): 

3078 errors = cls._filter_errors(result) 

3079 if cls._check_errors(errors, [ 

3080 linstor.consts.FAIL_EXISTS_RSC, linstor.consts.FAIL_EXISTS_RSC_DFN 

3081 ]): 

3082 raise LinstorVolumeManagerError( 

3083 'Failed to create volume `{}` from SR `{}`, it already exists' 

3084 .format(volume_uuid, group_name), 

3085 LinstorVolumeManagerError.ERR_VOLUME_EXISTS 

3086 ) 

3087 

3088 if cls._check_errors(errors, [linstor.consts.FAIL_NOT_FOUND_RSC_GRP]): 

3089 raise LinstorVolumeManagerError( 

3090 'Failed to create volume `{}` from SR `{}`, resource group doesn\'t exist' 

3091 .format(volume_uuid, group_name), 

3092 LinstorVolumeManagerError.ERR_GROUP_NOT_EXISTS 

3093 ) 

3094 

3095 if errors: 

3096 raise LinstorVolumeManagerError( 

3097 'Failed to create volume `{}` from SR `{}`: {}'.format( 

3098 volume_uuid, 

3099 group_name, 

3100 cls._get_error_str(errors) 

3101 ) 

3102 ) 

3103 

3104 @classmethod 

3105 def _move_files(cls, src_dir, dest_dir, force=False): 

3106 def listdir(dir): 

3107 ignored = ['lost+found'] 

3108 return [file for file in os.listdir(dir) if file not in ignored] 

3109 

3110 try: 

3111 if not force: 

3112 files = listdir(dest_dir) 

3113 if files: 

3114 raise LinstorVolumeManagerError( 

3115 'Cannot move files from {} to {} because destination ' 

3116 'contains: {}'.format(src_dir, dest_dir, files) 

3117 ) 

3118 except LinstorVolumeManagerError: 

3119 raise 

3120 except Exception as e: 

3121 raise LinstorVolumeManagerError( 

3122 'Cannot list dir {}: {}'.format(dest_dir, e) 

3123 ) 

3124 

3125 try: 

3126 for file in listdir(src_dir): 

3127 try: 

3128 dest_file = os.path.join(dest_dir, file) 

3129 if not force and os.path.exists(dest_file): 

3130 raise LinstorVolumeManagerError( 

3131 'Cannot move {} because it already exists in the ' 

3132 'destination'.format(file) 

3133 ) 

3134 shutil.move(os.path.join(src_dir, file), dest_file) 

3135 except LinstorVolumeManagerError: 

3136 raise 

3137 except Exception as e: 

3138 raise LinstorVolumeManagerError( 

3139 'Cannot move {}: {}'.format(file, e) 

3140 ) 

3141 except Exception as e: 

3142 if not force: 

3143 try: 

3144 cls._move_files(dest_dir, src_dir, force=True) 

3145 except Exception: 

3146 pass 

3147 

3148 raise LinstorVolumeManagerError( 

3149 'Failed to move files from {} to {}: {}'.format( 

3150 src_dir, dest_dir, e 

3151 ) 

3152 ) 

3153 

3154 @staticmethod 

3155 def _create_database_backup_path(): 

3156 path = DATABASE_PATH + '-' + str(uuid.uuid4()) 

3157 try: 

3158 os.mkdir(path) 

3159 return path 

3160 except Exception as e: 

3161 raise LinstorVolumeManagerError( 

3162 'Failed to create backup path {} of LINSTOR config: {}' 

3163 .format(path, e) 

3164 ) 

3165 

3166 @staticmethod 

3167 def _get_filtered_properties(properties): 

3168 return dict(properties.items()) 

3169 

3170 @staticmethod 

3171 def _filter_errors(result): 

3172 return [ 

3173 err for err in result 

3174 if hasattr(err, 'is_error') and err.is_error() 

3175 ] 

3176 

3177 @staticmethod 

3178 def _check_errors(result, codes): 

3179 for err in result: 

3180 for code in codes: 

3181 if err.is_error(code): 

3182 return True 

3183 return False 

3184 

3185 @classmethod 

3186 def _controller_is_running(cls): 

3187 return cls._service_is_running('linstor-controller') 

3188 

3189 @classmethod 

3190 def _start_controller(cls, start=True): 

3191 return cls._start_service('linstor-controller', start) 

3192 

3193 @staticmethod 

3194 def _start_service(name, start=True): 

3195 action = 'start' if start else 'stop' 

3196 (ret, out, err) = util.doexec([ 

3197 'systemctl', action, name 

3198 ]) 

3199 if ret != 0: 

3200 raise LinstorVolumeManagerError( 

3201 'Failed to {} {}: {} {}' 

3202 .format(action, name, out, err) 

3203 ) 

3204 

3205 @staticmethod 

3206 def _service_is_running(name): 

3207 (ret, out, err) = util.doexec([ 

3208 'systemctl', 'is-active', '--quiet', name 

3209 ]) 

3210 return not ret 

3211 

3212 @staticmethod 

3213 def _is_mounted(mountpoint): 

3214 (ret, out, err) = util.doexec(['mountpoint', '-q', mountpoint]) 

3215 return ret == 0 

3216 

3217 @classmethod 

3218 def _mount_volume(cls, volume_path, mountpoint, mount=True): 

3219 if mount: 

3220 try: 

3221 util.pread(['mount', volume_path, mountpoint]) 

3222 except Exception as e: 

3223 raise LinstorVolumeManagerError( 

3224 'Failed to mount volume {} on {}: {}' 

3225 .format(volume_path, mountpoint, e) 

3226 ) 

3227 else: 

3228 try: 

3229 if cls._is_mounted(mountpoint): 

3230 util.pread(['umount', mountpoint]) 

3231 except Exception as e: 

3232 raise LinstorVolumeManagerError( 

3233 'Failed to umount volume {} on {}: {}' 

3234 .format(volume_path, mountpoint, e) 

3235 ) 

3236 

3237 

3238# ============================================================================== 

3239 

3240# Check if a path is a DRBD resource and log the process name/pid 

3241# that opened it. 

3242def log_drbd_openers(path): 

3243 # Ignore if it's not a symlink to DRBD resource. 

3244 if not path.startswith(DRBD_BY_RES_PATH): 

3245 return 

3246 

3247 # Compute resource name. 

3248 res_name_end = path.find('/', len(DRBD_BY_RES_PATH)) 

3249 if res_name_end == -1: 

3250 return 

3251 res_name = path[len(DRBD_BY_RES_PATH):res_name_end] 

3252 

3253 volume_end = path.rfind('/') 

3254 if volume_end == res_name_end: 

3255 return 

3256 volume = path[volume_end + 1:] 

3257 

3258 try: 

3259 # Ensure path is a DRBD. 

3260 drbd_path = os.path.realpath(path) 

3261 stats = os.stat(drbd_path) 

3262 if not stat.S_ISBLK(stats.st_mode) or os.major(stats.st_rdev) != 147: 

3263 return 

3264 

3265 # Find where the device is open. 

3266 (ret, stdout, stderr) = util.doexec(['drbdadm', 'status', res_name]) 

3267 if ret != 0: 

3268 util.SMlog('Failed to execute `drbdadm status` on `{}`: {}'.format( 

3269 res_name, stderr 

3270 )) 

3271 return 

3272 

3273 # Is it a local device? 

3274 if stdout.startswith('{} role:Primary'.format(res_name)): 

3275 util.SMlog( 

3276 'DRBD resource `{}` is open on local host: {}' 

3277 .format(path, get_local_volume_openers(res_name, volume)) 

3278 ) 

3279 return 

3280 

3281 # Is it a remote device? 

3282 util.SMlog( 

3283 'DRBD resource `{}` is open on hosts: {}' 

3284 .format(path, get_all_volume_openers(res_name, volume)) 

3285 ) 

3286 except Exception as e: 

3287 util.SMlog( 

3288 'Got exception while trying to determine where DRBD resource ' + 

3289 '`{}` is open: {}'.format(path, e) 

3290 )