cloudbase-init/cloudbaseinit/utils/windows/disk.py

406 lines
12 KiB
Python

# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import ctypes
from ctypes import windll
from ctypes import wintypes
import random
import re
import six
import winioctlcon
from cloudbaseinit import exception
kernel32 = windll.kernel32
rpcrt4 = windll.rpcrt4
class Win32_DiskGeometry(ctypes.Structure):
FixedMedia = 12
_fields_ = [
('Cylinders', wintypes.LARGE_INTEGER),
('MediaType', wintypes.DWORD),
('TracksPerCylinder', wintypes.DWORD),
('SectorsPerTrack', wintypes.DWORD),
('BytesPerSector', wintypes.DWORD)
]
class Win32_DRIVE_LAYOUT_INFORMATION_MBR(ctypes.Structure):
_fields_ = [
('Signature', wintypes.ULONG)
]
class GUID(ctypes.Structure):
_fields_ = [
("data1", wintypes.DWORD),
("data2", wintypes.WORD),
("data3", wintypes.WORD),
("data4", wintypes.BYTE * 8)
]
def __init__(self, dw=0, w1=0, w2=0, b1=0, b2=0, b3=0, b4=0, b5=0, b6=0,
b7=0, b8=0):
self.data1 = dw
self.data2 = w1
self.data3 = w2
self.data4[0] = b1
self.data4[1] = b2
self.data4[2] = b3
self.data4[3] = b4
self.data4[4] = b5
self.data4[5] = b6
self.data4[6] = b7
self.data4[7] = b8
class Win32_DRIVE_LAYOUT_INFORMATION_GPT(ctypes.Structure):
_fields_ = [
('DiskId', GUID),
('StartingUsableOffset', wintypes.LARGE_INTEGER),
('UsableLength', wintypes.LARGE_INTEGER),
('MaxPartitionCount', wintypes.ULONG)
]
class DRIVE_FORMAT(ctypes.Union):
_fields_ = [
('Mbr', Win32_DRIVE_LAYOUT_INFORMATION_MBR),
('Gpt', Win32_DRIVE_LAYOUT_INFORMATION_GPT)
]
class Win32_PARTITION_INFORMATION_MBR(ctypes.Structure):
_fields_ = [
('PartitionType', wintypes.BYTE),
('BootIndicator', wintypes.BOOLEAN),
('RecognizedPartition', wintypes.BOOLEAN),
('HiddenSectors', wintypes.DWORD)
]
class Win32_PARTITION_INFORMATION_GPT(ctypes.Structure):
_fields_ = [
('PartitionType', GUID),
('PartitionId', GUID),
('Attributes', wintypes.ULARGE_INTEGER),
('Name', wintypes.WCHAR * 36)
]
class PARTITION_INFORMATION(ctypes.Union):
_fields_ = [
('Mbr', Win32_PARTITION_INFORMATION_MBR),
('Gpt', Win32_PARTITION_INFORMATION_GPT)
]
class Win32_PARTITION_INFORMATION_EX(ctypes.Structure):
_anonymous_ = ('PartitionInformation',)
_fields_ = [
('PartitionStyle', wintypes.DWORD),
('StartingOffset', wintypes.LARGE_INTEGER),
('PartitionLength', wintypes.LARGE_INTEGER),
('PartitionNumber', wintypes.DWORD),
('RewritePartition', wintypes.BOOLEAN),
('PartitionInformation', PARTITION_INFORMATION)
]
class Win32_DRIVE_LAYOUT_INFORMATION_EX(ctypes.Structure):
_anonymous_ = ('DriveFormat',)
_fields_ = [
('PartitionStyle', wintypes.DWORD),
('PartitionCount', wintypes.DWORD),
('DriveFormat', DRIVE_FORMAT),
('PartitionEntry', Win32_PARTITION_INFORMATION_EX * 128)
]
@six.add_metaclass(abc.ABCMeta)
class BaseDevice(object):
"""Base class for devices like disks and partitions.
It has common methods for getting physical disk geometry,
opening/closing the device and also seeking through it
for reading certain amounts of bytes.
"""
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
FILE_SHARE_READ = 1
FILE_SHARE_WRITE = 2
OPEN_EXISTING = 3
FILE_ATTRIBUTE_READONLY = 1
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
FILE_BEGIN = 0
INVALID_SET_FILE_POINTER = 0xFFFFFFFF
def __init__(self, path, allow_write=False):
self._path = path
self._handle = None
self._sector_size = None
self._disk_size = None
self._allow_write = allow_write
self.fixed = None
def __repr__(self):
return "<{}: {}>".format(self.__class__.__name__, self._path)
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _get_geometry(self):
"""Get details about the disk size bounds."""
geom = Win32_DiskGeometry()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY,
0,
0,
ctypes.byref(geom),
ctypes.sizeof(geom),
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get disk geometry: %r")
_sector_size = geom.BytesPerSector
_disk_size = (geom.Cylinders * geom.TracksPerCylinder *
geom.SectorsPerTrack * geom.BytesPerSector)
fixed = geom.MediaType == Win32_DiskGeometry.FixedMedia
return _sector_size, _disk_size, fixed
def _seek(self, offset):
high = wintypes.DWORD(offset >> 32)
low = wintypes.DWORD(offset & 0xFFFFFFFF)
ret_val = kernel32.SetFilePointer(self._handle, low,
ctypes.byref(high),
self.FILE_BEGIN)
if ret_val == self.INVALID_SET_FILE_POINTER:
raise exception.WindowsCloudbaseInitException(
"Seek error: %r")
def _read(self, size):
buff = ctypes.create_string_buffer(size)
bytes_read = wintypes.DWORD()
ret_val = kernel32.ReadFile(self._handle, buff, size,
ctypes.byref(bytes_read), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Read exception: %r")
return buff.raw[:bytes_read.value] # all bytes without the null byte
def open(self):
access = self.GENERIC_READ
share_mode = self.FILE_SHARE_READ
if self._allow_write:
access |= self.GENERIC_WRITE
share_mode |= self.FILE_SHARE_WRITE
attributes = 0
else:
attributes = self.FILE_ATTRIBUTE_READONLY
handle = kernel32.CreateFileW(
ctypes.c_wchar_p(self._path),
access,
share_mode,
0,
self.OPEN_EXISTING,
attributes,
0)
if handle == self.INVALID_HANDLE_VALUE:
raise exception.WindowsCloudbaseInitException(
'Cannot open file: %r')
self._handle = handle
self._sector_size, self._disk_size, self.fixed =\
self._get_geometry()
def close(self):
if self._handle:
kernel32.CloseHandle(self._handle)
self._handle = None
def seek(self, offset):
"""Drive geometry safe seek.
Seek for a given offset and return the valid set one.
"""
safe_offset = int(offset / self._sector_size) * self._sector_size
self._seek(safe_offset)
return safe_offset
def read(self, size, skip=0):
"""Drive geometry safe read.
Read and extract exactly the requested content.
"""
# Compute a size to fit both of the bytes we need to skip and
# also the minimum read size.
total = size + skip
safe_size = ((int(total / self._sector_size) +
bool(total % self._sector_size)) * self._sector_size)
content = self._read(safe_size)
return content[skip:total]
@abc.abstractmethod
def size(self):
"""Returns the size in bytes of the actual opened device."""
class Disk(BaseDevice):
"""Disk class with seek/read support.
It also has the capability of obtaining partition objects.
"""
PARTITION_ENTRY_UNUSED = 0
PARTITION_STYLE_MBR = 0
PARTITION_STYLE_GPT = 1
def _get_layout(self):
layout = Win32_DRIVE_LAYOUT_INFORMATION_EX()
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle,
winioctlcon.IOCTL_DISK_GET_DRIVE_LAYOUT_EX,
0,
0,
ctypes.byref(layout),
ctypes.sizeof(layout),
ctypes.byref(bytes_returned),
0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot get disk layout: %r")
return layout
@staticmethod
def _create_guid():
guid = GUID()
ret_val = rpcrt4.UuidCreate(ctypes.byref(guid))
if ret_val:
raise exception.CloudbaseInitException(
"UuidCreate failed: %r" % ret_val)
return guid
def set_unique_id(self, unique_id=None):
layout = self._get_layout()
if layout.PartitionStyle == self.PARTITION_STYLE_MBR:
if not unique_id:
unique_id = random.randint(-2147483648, 2147483647)
layout.Mbr.Signature = unique_id
elif layout.PartitionStyle == self.PARTITION_STYLE_GPT:
if not unique_id:
unique_id = self._create_guid()
layout.Gpt.DiskId = unique_id
else:
raise exception.InvalidStateException(
"A unique id can be set on MBR or GPT partitions only")
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle, winioctlcon.IOCTL_DISK_SET_DRIVE_LAYOUT_EX,
ctypes.byref(layout), ctypes.sizeof(layout), 0, 0,
ctypes.byref(bytes_returned), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot set disk layout: %r")
ret_val = kernel32.DeviceIoControl(
self._handle, winioctlcon.IOCTL_DISK_UPDATE_PROPERTIES, 0, 0, 0, 0,
ctypes.byref(bytes_returned), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot update cached disk properties: %r")
def _get_partition_indexes(self, layout):
partition_style = layout.PartitionStyle
if partition_style not in (self.PARTITION_STYLE_MBR,
self.PARTITION_STYLE_GPT):
raise exception.CloudbaseInitException(
"Invalid partition style %r" % partition_style)
# If is GPT, then the count reflects the actual number of partitions
# but if is MBR, then the number of partitions is a multiple of 4
# and just the indexes for the used partitions must be saved.
partition_indexes = []
if partition_style == self.PARTITION_STYLE_GPT:
partition_indexes.extend(range(layout.PartitionCount))
else:
for idx in range(layout.PartitionCount):
if (layout.PartitionEntry[idx].Mbr.PartitionType !=
self.PARTITION_ENTRY_UNUSED):
partition_indexes.append(idx)
return partition_indexes
def partitions(self):
"""Return a list of partition objects available on disk."""
layout = self._get_layout()
partition_indexes = self._get_partition_indexes(layout)
# Create and return the partition objects containing their sizes.
partitions = []
disk_index = re.search(r"(disk|drive)(\d+)", self._path,
re.I | re.M).group(2)
for partition_index in partition_indexes:
path = r'\\?\GLOBALROOT\Device\Harddisk{}\Partition{}'.format(
disk_index, partition_index + 1)
size = layout.PartitionEntry[partition_index].PartitionLength
partition = Partition(path, size)
partitions.append(partition)
return partitions
@property
def size(self):
return self._disk_size
class Partition(BaseDevice):
"""Partition class with seek/read support."""
def __init__(self, path, size):
super(Partition, self).__init__(path)
self._partition_size = size
@property
def size(self):
return self._partition_size