@@ -16,9 +16,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import List
+import string
SECTOR_SIZE = 512
DIRENTRY_SIZE = 32
+ALLOWED_FILE_CHARS = set("!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase)
class MBR:
@@ -265,7 +267,7 @@ def write_fat_entry(self, cluster: int, value: int):
+ self.fats[fat_offset + 2 :]
)
self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
-
+
def flush_fats(self):
"""
Write the FATs back to the disk.
@@ -293,7 +295,7 @@ def next_cluster(self, cluster: int) -> int | None:
raise Exception("Invalid FAT entry")
else:
return fat_entry
-
+
def next_free_cluster(self) -> int:
"""
Find the next free cluster.
@@ -338,6 +340,67 @@ def read_directory(self, cluster: int) -> List[FatDirectoryEntry]:
cluster = self.next_cluster(cluster)
return entries
+ def add_direntry(self, cluster: int | None, name: str, ext: str, attributes: int):
+ """
+ Add a new directory entry to the given cluster.
+ If the cluster is `None`, then it will be added to the root directory.
+ """
+
+ def find_free_entry(data: bytes):
+ for i in range(0, len(data), DIRENTRY_SIZE):
+ entry = data[i : i + DIRENTRY_SIZE]
+ if entry[0] == 0 or entry[0] == 0xE5:
+ return i
+ return None
+
+ assert len(name) <= 8, "Name must be 8 characters or less"
+ assert len(ext) <= 3, "Ext must be 3 characters or less"
+ assert attributes % 0x15 != 0x15, "Invalid attributes"
+
+ # initial dummy data
+ new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
+ new_entry.name = name.ljust(8, " ")
+ new_entry.ext = ext.ljust(3, " ")
+ new_entry.attributes = attributes
+ new_entry.reserved = 0
+ new_entry.create_time_tenth = 0
+ new_entry.create_time = 0
+ new_entry.create_date = 0
+ new_entry.last_access_date = 0
+ new_entry.last_mod_time = 0
+ new_entry.last_mod_date = 0
+ new_entry.cluster = self.next_free_cluster()
+ new_entry.size_bytes = 0
+
+ # mark as EOF
+ self.write_fat_entry(new_entry.cluster, 0xFFFF)
+
+ if cluster is None:
+ for i in range(self.boot_sector.root_dir_size()):
+ sector_data = self.read_sectors(
+ self.boot_sector.root_dir_start() + i, 1
+ )
+ offset = find_free_entry(sector_data)
+ if offset is not None:
+ new_entry.sector = self.boot_sector.root_dir_start() + i
+ new_entry.offset = offset
+ self.update_direntry(new_entry)
+ return new_entry
+ else:
+ while cluster is not None:
+ data = self.read_cluster(cluster)
+ offset = find_free_entry(data)
+ if offset is not None:
+ new_entry.sector = self.boot_sector.first_sector_of_cluster(
+ cluster
+ ) + (offset // SECTOR_SIZE)
+ new_entry.offset = offset % SECTOR_SIZE
+ self.update_direntry(new_entry)
+ return new_entry
+ cluster = self.next_cluster(cluster)
+
+ raise Exception("No free directory entries")
+
def update_direntry(self, entry: FatDirectoryEntry):
"""
Write the directory entry back to the disk.
@@ -406,9 +469,10 @@ def truncate_file(self, entry: FatDirectoryEntry, new_size: int):
raise Exception(f"{entry.whole_name()} is a directory")
def clusters_from_size(size: int):
- return (size + self.boot_sector.cluster_bytes() - 1) // self.boot_sector.cluster_bytes()
+ return (
+ size + self.boot_sector.cluster_bytes() - 1
+ ) // self.boot_sector.cluster_bytes()
-
# First, allocate new FATs if we need to
required_clusters = clusters_from_size(new_size)
current_clusters = clusters_from_size(entry.size_bytes)
@@ -438,7 +502,7 @@ def clusters_from_size(size: int):
self.write_fat_entry(cluster, new_cluster)
self.write_fat_entry(new_cluster, 0xFFFF)
cluster = new_cluster
-
+
elif required_clusters < current_clusters:
# Truncate the file
cluster = entry.cluster
@@ -464,7 +528,9 @@ def clusters_from_size(size: int):
count += 1
affected_clusters.add(cluster)
cluster = self.next_cluster(cluster)
- assert count == required_clusters, f"Expected {required_clusters} clusters, got {count}"
+ assert (
+ count == required_clusters
+ ), f"Expected {required_clusters} clusters, got {count}"
# update the size
entry.size_bytes = new_size
@@ -505,3 +571,49 @@ def write_file(self, entry: FatDirectoryEntry, data: bytes):
cluster = self.next_cluster(cluster)
assert len(data) == 0, "Data was not written completely, clusters missing"
+
+ def create_file(self, path: str):
+ """
+ Create a new file at the given path.
+ """
+ assert path[0] == "/", "Path must start with /"
+
+ path = path[1:] # remove the leading /
+
+ parts = path.split("/")
+
+ directory_cluster = None
+ directory = self.read_root_directory()
+
+ parts, filename = parts[:-1], parts[-1]
+
+ for i, part in enumerate(parts):
+ current_entry = None
+ for entry in directory:
+ if entry.whole_name() == part:
+ current_entry = entry
+ break
+ if current_entry is None:
+ return None
+
+ if current_entry.attributes & 0x10 == 0:
+ raise Exception(f"{current_entry.whole_name()} is not a directory")
+ else:
+ directory = self.read_directory(current_entry.cluster)
+ directory_cluster = current_entry.cluster
+
+ # add new entry to the directory
+
+ filename, ext = filename.split(".")
+
+ if len(ext) > 3:
+ raise Exception("Ext must be 3 characters or less")
+ if len(filename) > 8:
+ raise Exception("Name must be 8 characters or less")
+
+ for c in filename + ext:
+
+ if c not in ALLOWED_FILE_CHARS:
+ raise Exception("Invalid character in filename")
+
+ return self.add_direntry(directory_cluster, filename, ext, 0)
@@ -323,7 +323,7 @@ class TestVVFatDriver(QMPTestCase):
with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
self.assertEqual(f.read(), new_content)
-
+
def test_write_large_file(self):
"""
Test writing a large file
@@ -342,7 +342,7 @@ class TestVVFatDriver(QMPTestCase):
with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
self.assertEqual(f.read(), new_content)
-
+
def test_truncate_file_change_clusters_less(self):
"""
Test truncating a file by reducing the number of clusters
@@ -359,7 +359,6 @@ class TestVVFatDriver(QMPTestCase):
with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
self.assertEqual(f.read(), b"A")
-
def test_write_file_change_clusters_less(self):
"""
Test truncating a file by reducing the number of clusters
@@ -376,7 +375,7 @@ class TestVVFatDriver(QMPTestCase):
with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
self.assertEqual(f.read(), new_content)
-
+
def test_write_file_change_clusters_more(self):
"""
Test truncating a file by increasing the number of clusters
@@ -392,7 +391,27 @@ class TestVVFatDriver(QMPTestCase):
with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
self.assertEqual(f.read(), new_content)
-
+
+ def test_create_file(self):
+ """
+ Test creating a new file
+ """
+ fat16 = self.init_fat16()
+
+ new_file = fat16.create_file("/NEWFILE.TXT")
+
+ self.assertIsNotNone(new_file)
+ self.assertEqual(new_file.size_bytes, 0)
+
+ new_content = b"Hello, world! New file\n"
+ fat16.write_file(new_file, new_content)
+
+ self.assertEqual(fat16.read_file(new_file), new_content)
+
+ with open(os.path.join(filesystem, "newfile.txt"), "rb") as f:
+ self.assertEqual(f.read(), new_content)
+
+ # TODO: support deleting files
if __name__ == "__main__":
@@ -1,5 +1,5 @@
-.............
+..............
----------------------------------------------------------------------
-Ran 13 tests
+Ran 14 tests
OK
We test the ability to create new files in the filesystem, this is done by adding an entry in the desired directory list. The file will also be created in the host filesystem with matching filename. Signed-off-by: Amjad Alsharafi <amjadsharafi10@gmail.com> --- tests/qemu-iotests/fat16.py | 124 +++++++++++++++++++++++++++-- tests/qemu-iotests/tests/vvfat | 29 +++++-- tests/qemu-iotests/tests/vvfat.out | 4 +- 3 files changed, 144 insertions(+), 13 deletions(-)