The first edition of this code was written back when Python 2 had not yet been deprecated in favor of Python 3. The difference between strings and bytes was not very clear back then. Also, PEP 8 violations abound in the original version.
'''Module for Directory and File Serialization.
This module provides two classes that implement the
DFS (Directory and File Serialization) file format.'''
__version__ = '1.0'
import os as _os
import sys as _sys
################################################################################
# Human-readable description of the DFS record layout. Each record starts
# with a single header byte whose bits are listed most significant first;
# the fields named after "Header" follow it in the stream in the order shown.
FORMAT_DOCUMENTATION = '''\
Directory
Header
0,aaa,b,c,dd
0 = Directory
a = Pointer Size (bytes)
b = Name Size (bytes)
c = Content Flag
d = Type Code
00 = End
01 = Error
10 = Error
11 = Real
Pointer
Name Size
Name
File
Header
1,aaa,b,ccc
1 = File
a = Pointer Size (bytes)
b = Name Size (bytes)
c = Data Size (bytes)
Pointer
Name Size
Name
Data Size
Data'''
################################################################################
# Writes a directory tree or a single file to a file-like object using the
# DFS record layout described in FORMAT_DOCUMENTATION.
# NOTE: Python 2 only -- uses the "raise Class, 'message'" statement form and
# treats str as a byte string.
class Acquire:

    'Acquire(destination) -> Acquire'

    # Largest chunk (1 MiB) copied from a source file in one read.
    BUFF_SIZE = 2 ** 20

    def __init__(self, destination):
        'Initialize the Acquire object.'
        self.__destination = destination
        # Remember where the archive itself lives (when the object exposes a
        # name) so that it can be recognised among the sources later on.
        self.__destination_path = _os.path.abspath(destination.name) if hasattr(destination, 'name') else None
        # Becomes True once the first archive has been started.
        self.__archive = False

    def acquire(self, source):
        'Save source to destination.'
        # Raises ValueError when source is the destination file itself or is
        # neither a directory nor a regular file.
        source = _os.path.abspath(source)
        # Parent pointers are numbered from zero again for every archive.
        self.__pointer = 0
        if self.__archive:
            # A zero byte separates this archive from the previous one.
            self.__destination.write('\0')
        else:
            self.__archive = True
        if _os.path.isdir(source):
            # The top-level record always points at parent number zero.
            self.__dir(source, '\0')
        elif _os.path.isfile(source):
            if source == self.__destination_path:
                raise ValueError, 'Source cannot be destination.'
            self.__file(source, '\0')
        else:
            raise ValueError, 'Source must be directory or file.'

    def __dir(self, source, pointer):
        'Private class method.'
        # Writes one directory record, then recurses into the entries.
        # "pointer" is the encoded index of the parent directory.
        name = _os.path.basename(source)
        name_size = _str(len(name))
        try:
            dir_list = _os.listdir(source)
        except:
            # NOTE(review): a bare except also swallows KeyboardInterrupt and
            # SystemExit; a directory that cannot be listed is stored as empty.
            dir_list = ()
        if dir_list:
            # Only directories with entries receive a pointer number.
            self.__pointer += 1
            # Header bits: pointer size - 1 (bits 4-6), name size length - 1
            # (bit 3), then 7 = content flag (bit 2) plus type code 11.
            self.__destination.write(chr((len(pointer) - 1 << 4) + (len(name_size) - 1 << 3) + 7) + pointer + name_size + name)
        else:
            # Same header with the content flag clear: 3 = type code 11 only.
            self.__destination.write(chr((len(pointer) - 1 << 4) + (len(name_size) - 1 << 3) + 3) + pointer + name_size + name)
        # From here on "pointer" identifies this directory for its children.
        pointer = _str(self.__pointer)
        for name in dir_list:
            source_name = _os.path.join(source, name)
            if source_name == self.__destination_path:
                # Never copy the archive into itself.
                continue
            elif _os.path.isdir(source_name):
                self.__dir(source_name, pointer)
            elif _os.path.isfile(source_name):
                self.__file(source_name, pointer)

    def __file(self, source, pointer):
        'Private class method.'
        # Writes one file record: header, parent pointer, name, data size
        # and finally the data itself.
        name = _os.path.basename(source)
        name_size = _str(len(name))
        try:
            source = open(source, 'rb')
            # Seek to the end to learn the size, then rewind for copying.
            source.seek(0, 2)
            data_size = _str(source.tell())
            source.seek(0, 0)
        except:
            # NOTE(review): bare except; a file that cannot be opened or
            # measured is recorded with a data size of zero. If open()
            # succeeded but a later call failed, the handle is not closed.
            source = None
            data_size = '\0'
        # 127 + len(data_size) equals 128 + (len(data_size) - 1): the file
        # bit (bit 7) plus the data size length - 1 in the low three bits.
        self.__destination.write(chr(127 + (len(pointer) - 1 << 4) + (len(name_size) - 1 << 3) + len(data_size)) + pointer + name_size + name + data_size)
        if source:
            data_size = _int(data_size)
            try:
                while data_size:
                    buff = source.read(min(self.BUFF_SIZE, data_size))
                    if buff:
                        self.__destination.write(buff)
                        data_size -= len(buff)
                    else:
                        # The file shrank after its size was recorded.
                        raise IOError, 'File changed size while open.'
                # Detect growth: the read position must be the end of file.
                position = source.tell()
                source.seek(0, 2)
                if position != source.tell():
                    raise IOError, 'File changed size while open.'
            finally:
                source.close()
################################################################################
# Reads DFS records from a file-like object and recreates the directories
# and files they describe underneath a destination directory.
# NOTE: Python 2 only -- uses the "raise Class, 'message'" statement form and
# treats str as a byte string.
class Release:

    'Release(source) -> Release'

    # Largest chunk (1 MiB) copied into a destination file in one read.
    BUFF_SIZE = 2 ** 20

    def __init__(self, source):
        'Initialize the Release object.'
        self.__source = source
        # Set once a read of the next header byte returns nothing.
        self.__EOF = False

    def release(self, destination):
        'Save source to destination.'
        # Processes records until a zero separator byte or the end of the
        # source is reached.
        if self.__EOF:
            raise EOFError, 'End Of File Found'
        # Index 0 is the destination; directories that have content are
        # appended in the order they are encountered.
        self.__parents = [_os.path.abspath(destination)]
        header = self.__source.read(1)
        # -1 stands for "no more data".
        header = ord(header) if header else -1
        if header == -1:
            self.__EOF = True
            # NOTE(review): raising Warning as an exception is unusual; kept
            # as written.
            raise Warning, 'Irregular File Termination Detected'
        # Continue while the byte is a file header (bit 7 set) or a directory
        # header with a non-zero type code; a zero type code ends the run.
        while header != -1 and (header > 127 or header & 3):
            if header < 128:
                # Only type code 11 is valid for a directory.
                if header & 3 != 3:
                    raise IOError, 'Corrupt Directory Header Found'
                self.__dir(header)
            else:
                self.__file(header)
            header = self.__source.read(1)
            header = ord(header) if header else -1
        if header == -1:
            self.__EOF = True

    def EOF(self):
        'Return the End Of File status.'
        return self.__EOF

    def __dir(self, header):
        'Private class method.'
        # Bits 4-6 hold the pointer size - 1 and bit 3 the name size
        # length - 1; the stream then supplies pointer, name size and name.
        # NOTE(review): the name is joined to the parent without validation.
        path = _os.path.join(self.__parents[_int(self.__read((header >> 4 & 7) + 1))], self.__read(_int(self.__read((header >> 3 & 1) + 1))))
        _os.mkdir(path)
        # Bit 2 is the content flag: only such directories become parents.
        if header >> 2 & 1:
            self.__parents.append(path)

    def __file(self, header):
        'Private class method.'
        # The pointer and name are decoded exactly as in __dir.
        # NOTE(review): the name is joined to the parent without validation.
        destination = open(_os.path.join(self.__parents[_int(self.__read((header >> 4 & 7) + 1))], self.__read(_int(self.__read((header >> 3 & 1) + 1)))), 'wb')
        # The low three bits hold the data size length - 1.
        data_size = _int(self.__read((header & 7) + 1))
        try:
            while data_size:
                buff = self.__source.read(min(self.BUFF_SIZE, data_size))
                if buff:
                    destination.write(buff)
                    data_size -= len(buff)
                else:
                    # The source ended before all announced data arrived.
                    raise IOError, 'End Of File Found'
        finally:
            destination.close()

    def __read(self, size):
        'Private class method.'
        # Returns exactly "size" bytes, looping over short reads; raises
        # IOError on a premature end of the source or when size is zero.
        if size:
            buff = ''
            while size:
                temp = self.__source.read(size)
                if temp:
                    buff += temp
                    size -= len(temp)
                else:
                    raise IOError, 'End Of File Found'
            return buff
        raise IOError, 'Zero Length String Found'
################################################################################
def _str(integer):
    'Private module function.'
    # Encode a non-negative integer as a big-endian string holding one
    # character per byte; zero is encoded as a single NUL character.
    # Raises ValueError for negative input: shifting a negative number
    # right never reaches zero, so the loop below would never finish.
    if integer < 0:
        raise ValueError('integer must not be negative')
    if not integer:
        return '\0'
    characters = []
    while integer:
        # Collect the least significant byte first ...
        characters.append(chr(integer & 0xFF))
        integer >>= 8
    # ... then flip the order so the most significant byte leads.
    characters.reverse()
    return ''.join(characters)
def _int(string):
    'Private module function.'
    # Decode a big-endian string (one character per byte) into an integer;
    # an empty string decodes to zero.
    total = 0
    for character in string:
        total = (total << 8) + ord(character)
    return total
################################################################################
# When run as a script, print a content-type header followed by this
# module's own source code (presumably for use as a CGI script -- confirm).
if __name__ == '__main__':
    _sys.stdout.write('Content-Type: text/plain\n\n')
    # file() is a Python 2 built-in; the handle is not closed explicitly.
    _sys.stdout.write(file(_sys.argv[0]).read())
The code has since been rewritten for Python 3 and attempts to take advantage of its many changes. There is now a clear distinction between what should be bytes and what should be strings, and no PEP 8 problems should be found in the new version.
#! /usr/bin/env python3
"""Provide a simple directory and file serialization protocol.
This module implements two classes that can handle the DFS (Directory &
File Serialization) file format. Both classes can deal with file-like
objects and stream directories and files to and from the file system."""
# Import other modules needed for this module to work properly.
import abc
import collections
import enum
import io
import pathlib
# Include supplemental information along with a public API definition.
__author__ = 'Stephen "Zero" Chappell <[email protected]>'
__date__ = '9 February 2017'
__version__ = 3, 0, 0
__all__ = 'Serializer', 'Deserializer'
# Human-readable description of the serialized layout. Every record begins
# with one header byte (bits listed most significant first) and is followed
# by the fields named beneath "Header", in the order shown.
SERIALIZATION_FORMAT = '''\
Directory
Header
0,aaa,b,c,dd (Bit Mapping)
0 = Directory
a = Pointer Length
b = Name Size Length
c = Content Flag
d = Type Code
00 = Separator
01 = Reserved
10 = Reserved
11 = Genuine
Pointer to Parent
Name Size
Name
---------------------------------
File
Header
1,aaa,b,ccc (Bit Mapping)
1 = File
a = Pointer Length
b = Name Size Length
c = Data Size Length
Pointer to Parent
Name Size
Name
Data Size
Data
'''
@enum.unique
class _RecordType(enum.IntEnum):
    """Kinds of record a header byte can introduce."""

    DIRECTORY = 0
    FILE = 1
@enum.unique
class _DirectoryTypeCode(enum.IntEnum):
    """Two-bit type codes that a directory header may carry."""

    SEPARATOR = 0
    RESERVED_A = 1
    RESERVED_B = 2
    GENUINE = 3
# Describe a bit field by its lowest bit (offset) and its size in bits (width).
_BitField = collections.namedtuple('_BitField', 'offset, width')


class _Common(abc.ABC):
    """Abstract class for supporting Serializer and Deserializer classes.

    It holds the constants describing the header byte together with the
    helpers that convert sizes to and from byte strings and that write to
    streams which may accept only part of a buffer per call.
    """

    # Define a few static attributes for use in derived classes.
    BUFFER_SIZE = 1 << 20
    BYTE_WIDTH = 8
    BYTE_MASK = (1 << BYTE_WIDTH) - 1
    NAME_ENCODING = 'utf_8'  # Set to 'mbcs' for Archive 2.0 compatibility.
    NULL_BYTE = b'\0'

    # Define the bit fields used in header bytes.
    RECORD_TYPE = _BitField(7, 1)
    POINTER_LENGTH = _BitField(4, 3)
    NAME_SIZE_LENGTH = _BitField(3, 1)
    CONTENT_FLAG = _BitField(2, 1)
    DIRECTORY_TYPE_CODE = _BitField(0, 2)
    FILE_DATA_SIZE_LENGTH = _BitField(0, 3)

    @abc.abstractmethod
    def __init__(self, stream):
        """Initialize the _Common instance's attributes.

        stream is the file-like object that records are written to or read
        from; the header byte being built or decoded starts out unset.
        """
        self._stream = stream
        self._header = None

    @classmethod
    def _int_to_bytes(cls, integer):
        """Convert a number into a big-endian byte string of minimal length.

        Zero is encoded as a single null byte. ValueError is raised for
        negative numbers because they have no encoding in this format.
        """
        if integer < 0:
            raise ValueError('integer must not be negative')
        # Round the bit count up to whole bytes but never emit zero bytes.
        length = (integer.bit_length() + 7) // 8 or 1
        return integer.to_bytes(length, 'big')

    @classmethod
    def _bytes_to_int(cls, array):
        """Convert a big-endian byte string of any length into a number.

        An empty byte string converts to zero.
        """
        return int.from_bytes(array, 'big')

    @staticmethod
    def _write(file, buffer):
        """Write buffer to file until it is completely written.

        IOError is raised when the file reports that it accepted nothing,
        since retrying in that situation could spin forever.
        """
        remaining = buffer
        while True:
            written = file.write(remaining)
            if written is None:
                raise IOError('nothing could be written to the file')
            if written == len(remaining):
                return
            if not written:
                raise IOError('nothing could be written to the file')
            # A view skips over the written part without copying the rest.
            remaining = memoryview(remaining)[written:]
class Serializer(_Common):
    """Serializer(destination) -> Serializer instance

    Stream directories and files from the file system onto destination, a
    binary file-like object. Recording a file calls tell() on the stream,
    and seek() and truncate() to withdraw a partial record, so it must
    support those operations.
    """

    def __init__(self, destination):
        """Initialize the Serializer instance's attributes."""
        super().__init__(destination)
        self._started = False
        self._pointer = None
        # Remember where the archive lives so it is never copied into itself.
        self._destination_path = self._find_path(destination)

    @staticmethod
    def _find_path(stream):
        """Return the resolved path behind stream or None when unknown."""
        name = getattr(stream, 'name', None)
        # Streams opened from a descriptor report an int; that cannot help.
        if not isinstance(name, (str, pathlib.PurePath)):
            return None
        try:
            return pathlib.Path(name).resolve()
        except (OSError, RuntimeError):
            return None

    def run(self, source, keep_zombies=True):
        """Dump the source file or directory contents onto the destination.

        Paths that could not be read ("zombies") are returned in a list.
        With keep_zombies true they are still recorded, as an empty
        directory or a zero-length file; otherwise they are left out.
        ValueError is raised if source is the destination itself or is
        neither a directory nor a file.
        """
        path = pathlib.Path(source).resolve()
        zombies = []
        if path.is_dir():
            self._prime_run()
            self._acquire_dir(path, self.NULL_BYTE, keep_zombies, zombies)
        elif path.is_file():
            if path == self._destination_path:
                raise ValueError('source cannot be the destination')
            self._prime_run()
            self._acquire_file(path, self.NULL_BYTE, keep_zombies, zombies)
        else:
            raise ValueError('source must be a dir or a file')
        return zombies

    def _prime_run(self):
        """Reset some attributes before a serialization run."""
        self._pointer = 0
        if self._started:
            # A null byte separates this run from the one written before it.
            self._write(self._stream, self.NULL_BYTE)
        else:
            self._started = True

    def _acquire_dir(self, source, parent, keep_zombies, zombies):
        """Serialize a directory and, recursively, everything inside it."""
        try:
            paths = tuple(source.iterdir())
        except OSError:
            zombies.append(source)
            if not keep_zombies:
                return
            paths = ()
        self._write_complete_dir_header(source, parent, bool(paths))
        if paths:
            # Only directories with content are numbered as parents.
            self._pointer += 1
            parent = self._int_to_bytes(self._pointer)
            for path in paths:
                if path == self._destination_path:
                    # Skip the archive that is currently being written.
                    continue
                if path.is_dir():
                    self._acquire_dir(path, parent, keep_zombies, zombies)
                elif path.is_file():
                    self._acquire_file(path, parent, keep_zombies, zombies)

    def _write_complete_dir_header(self, source, parent, content):
        """Record all directory information except its contents."""
        name = source.name.encode(self.NAME_ENCODING)
        name_size = self._int_to_bytes(len(name))
        self._write_dir_header_byte(parent, name_size, content)
        self._write(self._stream, parent)
        self._write(self._stream, name_size)
        self._write(self._stream, name)

    def _write_dir_header_byte(self, pointer, name_size, content):
        """Record the directory header byte using the correct format."""
        self._header = 0
        self._set_bits(_RecordType.DIRECTORY, self.RECORD_TYPE)
        # Lengths are stored minus one so that a field of width n bits can
        # describe lengths from 1 through 2 ** n.
        self._set_bits(len(pointer) - 1, self.POINTER_LENGTH)
        self._set_bits(len(name_size) - 1, self.NAME_SIZE_LENGTH)
        self._set_bits(content, self.CONTENT_FLAG)
        self._set_bits(_DirectoryTypeCode.GENUINE, self.DIRECTORY_TYPE_CODE)
        self._write(self._stream, bytes([self._header]))

    def _set_bits(self, integer, bit_field):
        """Help build the header byte while checking certain arguments.

        ValueError is raised if integer needs more bits than the field has.
        """
        if not 0 <= integer < 1 << bit_field.width:
            raise ValueError('integer does not fit in width numbers of bits')
        self._header |= integer << bit_field.offset

    def _acquire_file(self, source, parent, keep_zombies, zombies):
        """Serialize a file.

        If an OSError occurs, or the file changes size while being copied,
        the partial record is removed from the stream and the file is
        listed as a zombie.
        """
        restore_point = self._stream.tell()
        try:
            with source.open('rb') as file:
                file_length = file.seek(0, io.SEEK_END)
                self._write_complete_file_header(source, parent, file_length)
                # Measure again: the header must announce the current size.
                future_data = file.seek(0, io.SEEK_END)
                if future_data != file_length:
                    raise OSError('source changed size after writing header')
                file.seek(0, io.SEEK_SET)
                while future_data:
                    buffer = file.read(min(future_data, self.BUFFER_SIZE))
                    if not buffer:
                        raise OSError('source file ended with remaining data')
                    self._write(self._stream, buffer)
                    future_data -= len(buffer)
                if file.seek(0, io.SEEK_END) != file_length:
                    raise OSError('file changed size during serialization')
        except OSError:
            # Withdraw whatever part of the record was already written.
            self._stream.seek(restore_point, io.SEEK_SET)
            self._stream.truncate()
            zombies.append(source)
            if keep_zombies:
                self._write_complete_file_header(source, parent, 0)

    def _write_complete_file_header(self, source, parent, file_length):
        """Record all file information except its data."""
        name = source.name.encode(self.NAME_ENCODING)
        name_size = self._int_to_bytes(len(name))
        data_size = self._int_to_bytes(file_length)
        self._write_file_header_byte(parent, name_size, data_size)
        self._write(self._stream, parent)
        self._write(self._stream, name_size)
        self._write(self._stream, name)
        self._write(self._stream, data_size)

    def _write_file_header_byte(self, pointer, name_size, data_size):
        """Record the file header byte using the correct format."""
        self._header = 0
        self._set_bits(_RecordType.FILE, self.RECORD_TYPE)
        self._set_bits(len(pointer) - 1, self.POINTER_LENGTH)
        self._set_bits(len(name_size) - 1, self.NAME_SIZE_LENGTH)
        self._set_bits(len(data_size) - 1, self.FILE_DATA_SIZE_LENGTH)
        self._write(self._stream, bytes([self._header]))
class Deserializer(_Common):
    """Deserializer(source) -> Deserializer instance

    Recreate directories and files on the file system from the records
    found on source, a binary file-like object opened for reading.
    """

    def __init__(self, source):
        """Initialize the Deserializer instance's attributes."""
        super().__init__(source)
        self._finished = False
        self._parents = None

    @property
    def finished(self):
        """Check if the object has reached the end of the file yet."""
        return self._finished

    def run(self, destination):
        """Load the source file-like object onto the destination directory.

        One serialized run is restored per call; the call returns at a
        separator byte or at the end of the source. EOFError is raised if
        the end was already reached by an earlier call, and IOError if the
        data is missing, damaged or unsafe to restore.
        """
        if self._finished:
            raise EOFError('end of file was found')
        # Parent pointer zero always refers to the destination itself.
        self._parents = [pathlib.Path(destination).resolve()]
        starting_run = True
        while True:
            byte = self._stream.read(1)
            if not byte:
                self._finished = True
                if starting_run:
                    raise IOError('unexpected file termination detected')
                break
            self._header = byte[0]
            if self._get_bits(self.RECORD_TYPE) == _RecordType.FILE:
                self._release_file()
            else:
                type_code = self._get_bits(self.DIRECTORY_TYPE_CODE)
                if type_code == _DirectoryTypeCode.GENUINE:
                    self._release_dir()
                elif type_code == _DirectoryTypeCode.SEPARATOR:
                    if starting_run:
                        raise IOError('empty record detected')
                    break
                else:
                    raise IOError('reserved directory type code detected')
            starting_run = False

    def _get_bits(self, bit_field):
        """Extract width number of bits from header starting at offset."""
        return self._header >> bit_field.offset & (1 << bit_field.width) - 1

    def _read_path(self):
        """Read a record's parent pointer and name and return its path.

        IOError is raised if the name cannot be decoded, if it is anything
        other than a single plain path component, or if the pointer does
        not refer to a directory created earlier in this run.
        """
        pointer_length = self._get_bits(self.POINTER_LENGTH) + 1
        name_size_length = self._get_bits(self.NAME_SIZE_LENGTH) + 1
        # After decoding the header byte, read and process the remaining data.
        pointer = self._bytes_to_int(self._read(pointer_length))
        name_size = self._bytes_to_int(self._read(name_size_length))
        raw_name = self._read(name_size)
        try:
            name = raw_name.decode(self.NAME_ENCODING)
        except UnicodeDecodeError as error:
            raise IOError('record name could not be decoded') from error
        # The data may come from anywhere, so refuse any name that could
        # place the entry outside of the directory chosen as its parent.
        if (name in {'.', '..'} or '\0' in name or
                pathlib.PurePath(name).name != name):
            raise IOError('unsafe record name detected')
        try:
            parent = self._parents[pointer]
        except IndexError as error:
            raise IOError('invalid parent pointer detected') from error
        return parent / name

    def _release_dir(self):
        """Deserialize a directory."""
        content_flag = bool(self._get_bits(self.CONTENT_FLAG))
        path = self._read_path()
        path.mkdir()
        if content_flag:
            # Only directories with content can be referred to as parents.
            self._parents.append(path)

    def _release_file(self):
        """Deserialize a file."""
        data_size_length = self._get_bits(self.FILE_DATA_SIZE_LENGTH) + 1
        path = self._read_path()
        # Learn the size first so a truncated header creates no empty file.
        future_data = self._bytes_to_int(self._read(data_size_length))
        with path.open('wb') as destination:
            while future_data:
                buffer = self._stream.read(min(future_data, self.BUFFER_SIZE))
                if not buffer:
                    raise IOError('end of file was found')
                self._write(destination, buffer)
                future_data -= len(buffer)

    def _read(self, future_data):
        """Read exactly as many bytes from the source as requested.

        IOError is raised if the source ends early or zero bytes are asked
        for, which only happens when the data is damaged.
        """
        if not future_data:
            raise IOError('request for zero bytes found')
        buffer = bytearray()
        while future_data:
            data = self._stream.read(future_data)
            if not data:
                raise IOError('end of file was found')
            buffer.extend(data)
            future_data -= len(data)
        return bytes(buffer)
Please note that EOFError should only be used when the end-of-file has been reached in an expected manner. Otherwise, most errors should be considered to be IOError in nature. Can you identify any areas to improve the second rewrite?