From 9ab782dcc935a92a305c6e5c24e396ebd65342ad Mon Sep 17 00:00:00 2001
From: vangef
Date: Fri, 10 Mar 2023 12:50:14 +0000
Subject: [PATCH] added inspect_gradebook & code restructure for 'inspect by hash' feature

---
 inspect_gradebook.py   | 16 ++++++++
 inspect_submissions.py | 13 +++----
 utils/inspector.py     | 86 ++++++++++++++++++++++++++++++------------
 3 files changed, 83 insertions(+), 32 deletions(-)
 create mode 100644 inspect_gradebook.py

diff --git a/inspect_gradebook.py b/inspect_gradebook.py
new file mode 100644
index 0000000..d1b4a07
--- /dev/null
+++ b/inspect_gradebook.py
@@ -0,0 +1,16 @@
+import os, sys
+from utils.inspector import generate_hashes_gradebook, generate_duplicate_hashes_gradebook
+
+
+def main():
+    gradebook_dir_name = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else exit(f'\nNo gradebook dir name given. Provide the name as an argument.\n\nUsage: python {sys.argv[0]} [gradebook dir name]\nExample: python {sys.argv[0]} AssignmentX\n')
+
+    gradebook_dir_path = os.path.join('BB_gradebooks', gradebook_dir_name)
+    # generate CSV file with hashes for all files in gradebook & return path to CSV file for finding duplicate hashes
+    hashes_csv_file_path = generate_hashes_gradebook(gradebook_dir_path)
+    # generate CSV file with files having duplicate hashes
+    generate_duplicate_hashes_gradebook(hashes_csv_file_path)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/inspect_submissions.py b/inspect_submissions.py
index 41ad77a..9c8c492 100644
--- a/inspect_submissions.py
+++ b/inspect_submissions.py
@@ -1,16 +1,15 @@
 import os, sys
-from utils.inspector import hash_submissions, inspect_for_duplicate_hashes
+from utils.inspector import generate_hashes_submissions, generate_duplicate_hashes_submissions
 
-CSV_DIR = os.path.join(os.getcwd(), 'csv')
 
 def main():
     submissions_dir_name = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else exit(f'\nNo submissions dir name given. Provide the name as an argument.\n\nUsage: python {sys.argv[0]} [submissions dir name]\nExample: python {sys.argv[0]} AssignmentX\n')
+
     submissions_dir_path = os.path.join('BB_submissions', submissions_dir_name)
-    if not os.path.isdir(submissions_dir_path):
-        exit(f'Directory {submissions_dir_path} does not exist.\nMake sure "{submissions_dir_name}" exists in "BB_submissions".\n')
-    else:
-        hashes_csv_file_path = hash_submissions(submissions_dir_path) # generate CSV file with hashes for all files (except for any 'excluded') & return path to CSV file for finding duplicate/suspicious hashes
-        inspect_for_duplicate_hashes(hashes_csv_file_path) # generate CSV file with files having duplicate/suspicious hashes
+    # generate CSV file with hashes for all files in submissions (except for any 'excluded') & return path to CSV file for finding duplicate hashes
+    hashes_csv_file_path = generate_hashes_submissions(submissions_dir_path)
+    # generate CSV file with files having duplicate hashes
+    generate_duplicate_hashes_submissions(hashes_csv_file_path)
 
 
 if __name__ == '__main__':
diff --git a/utils/inspector.py b/utils/inspector.py
index ab9d716..51575dd 100644
--- a/utils/inspector.py
+++ b/utils/inspector.py
@@ -3,8 +3,9 @@ from datetime import datetime
 import csv
 import hashlib
 import pandas as pd
+from functools import partial
 
-CSV_DIR = os.path.join(os.getcwd(), 'csv')
+from utils.settings import CSV_DIR
 
 
 def load_excluded_filenames(submissions_dir_name: str) -> list[str]: # helper function for hashing all files
@@ -38,50 +39,85 @@ def get_hashes_in_dir(dir_path: str, excluded_filenames: list = []) -> list: #
     return hash_list
 
 
-def hash_submissions(submissions_dir_path: str) -> str: # main function for hashing all files
-    os.makedirs(CSV_DIR, exist_ok=True)
-    submissions_dir_name = os.path.abspath(submissions_dir_path).split(os.path.sep)[-1] # get name of submission/assignment by separating path and use rightmost part
-    excluded_filenames = load_excluded_filenames(submissions_dir_name)
+def generate_hashes_gradebook(gradebook_dir_path: str) -> str: # main function for hashing all files in gradebook
+    gradebook_dir_name = os.path.abspath(gradebook_dir_path).split(os.path.sep)[-1] # get name of gradebook by separating path and use rightmost part
+    if not os.path.isdir(gradebook_dir_path):
+        exit(f'Directory {gradebook_dir_path} does not exist.\nMake sure "{gradebook_dir_name}" exists in "BB_gradebooks".\n')
+
+    dicts_with_hashes_list = get_hashes_in_dir(gradebook_dir_path)
+    for hash_dict in dicts_with_hashes_list:
+        student_id = hash_dict['filename'].split('_attempt_')[0].split('_')[-1]
+        del hash_dict['filepath']
+        hash_dict.update({'Student ID': student_id})
 
-    csv_file_name = f'{submissions_dir_name}_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
+    os.makedirs(CSV_DIR, exist_ok=True)
+    csv_file_name = f'{gradebook_dir_name}_gradebook_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
     csv_file_path = os.path.join(CSV_DIR, csv_file_name)
+
+    with open(csv_file_path, 'w', newline='') as csvfile: # open the output CSV file for writing
+        fieldnames = ['Student ID', 'filename', 'sha256 hash']
+        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+        writer.writeheader()
+        writer.writerows(dicts_with_hashes_list)
+    print(f'[INFO] Created CSV file with all files & hashes in gradebook: {gradebook_dir_name}\nCSV file: {csv_file_path}')
+    return csv_file_path
+
+
+def generate_hashes_submissions(submissions_dir_path: str) -> str: # main function for hashing all files in submissions
+    submissions_dir_name = os.path.abspath(submissions_dir_path).split(os.path.sep)[-1] # get name of submission/assignment by separating path and use rightmost part
+    if not os.path.isdir(submissions_dir_path):
+        exit(f'Directory {submissions_dir_path} does not exist.\nMake sure "{submissions_dir_name}" exists in "BB_submissions".\n')
+
+    excluded_filenames = load_excluded_filenames(submissions_dir_name)
+    dicts_with_hashes_list = []
+    for student_dir_name in os.listdir(submissions_dir_path): # loop through each student dir to get hashes for all files per student
+        student_dir_path = os.path.join(submissions_dir_path, student_dir_name)
+        student_dicts_with_hashes_list = get_hashes_in_dir(student_dir_path, excluded_filenames) # dicts with hashes for all student files - except for 'excluded' file names
+        student_dicts_list = []
+        for hash_dict in student_dicts_with_hashes_list:
+            hash_dict.update({'Student ID': student_dir_name}) # update hash records with student id
+            student_dicts_list.append(hash_dict) # append file dict to student list of dicts for csv export
+
+        dicts_with_hashes_list.append(student_dicts_list) # append student hashes to main list with all submissions
+
+    os.makedirs(CSV_DIR, exist_ok=True)
+    csv_file_name = f'{submissions_dir_name}_submissions_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
+    csv_file_path = os.path.join(CSV_DIR, csv_file_name)
+
     with open(csv_file_path, 'w', newline='') as csvfile: # open the output CSV file for writing
         fieldnames = ['Student ID', 'filepath', 'filename', 'sha256 hash']
         writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
         writer.writeheader()
-
-        for student_dir_name in os.listdir(submissions_dir_path): # loop through each student dir to get hashes for all files per student
-            student_dir_path = os.path.join(submissions_dir_path, student_dir_name)
-            hashes_dict = get_hashes_in_dir(student_dir_path, excluded_filenames) # dict with hashes for all student files - except for 'excluded' file names
-            for d in hashes_dict:
-                d.update({'Student ID': student_dir_name}) # update hash records with student id
-            writer.writerows(hashes_dict)
-    print(f'[INFO] Created CSV file with all files & hashes in {submissions_dir_name}\nCSV file: {csv_file_path}')
+        for student_dict in dicts_with_hashes_list:
+            writer.writerows(student_dict)
+    print(f'[INFO] Created CSV file with all files & hashes for submissions in: {submissions_dir_name}\nCSV file: {csv_file_path}')
     return csv_file_path
 
 
-def inspect_for_duplicate_hashes(hashes_csv_file_path: str): # main function for finding duplicate / suspicious hashes
+def generate_duplicate_hashes_generic(hashes_csv_file_path: str, drop_columns: list[str]):
     csv = pd.read_csv(hashes_csv_file_path)
     df = pd.DataFrame(csv) # df with all files and their hashes
-    drop_columns = ['filepath', 'filename'] # only need to keep 'student id' and 'sha256 hash' for groupby later
     df_clean = df.drop(columns=drop_columns) # clear not needed columns
 
     duplicate_hash = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :] # all files with duplicate hash - incl. files from the same student id
 
     # agg() for 'Student ID' True if more than 1 in groupby (= files with the same hash by multiple student ids)
     # False if unique (= files from the same student id with the same hash)
     hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique())>1)
     # list with duplicate hashes - only if different student id (doesn't include files from same student id)
-    suspicious_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list()
+    duplicate_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list()
 
-    files_with_suspicious_hash = df[df['sha256 hash'].isin(suspicious_hashes_list)] # df with all files with duplicate/suspicious hash, excludes files from the same student id
-    df_suspicious = files_with_suspicious_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv
+    files_with_duplicate_hash = df[df['sha256 hash'].isin(duplicate_hashes_list)] # df with all files with duplicate hash, excludes files from the same student id
+    df_duplicate = files_with_duplicate_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv
 
+    gradebook_or_submissions_str = os.path.basename(hashes_csv_file_path).split('_file_hashes_')[0].split('_')[-1] # 'gradebook' or 'submissions', depending on which file hashes csv is read
+    assignment_name = os.path.basename(hashes_csv_file_path).split(f'_{gradebook_or_submissions_str}_')[0]
+    csv_out = hashes_csv_file_path.rsplit('_', 1)[0].replace('file_hashes', 'duplicate_') + datetime.now().strftime("%Y%m%d-%H%M%S") + '.csv'
     try:
-        submissions_dir_name = os.path.basename(hashes_csv_file_path).split('_file_hashes_')[0]
-        csv_out = hashes_csv_file_path.rsplit('_', 1)[0].replace('file_hashes', 'suspicious_') + datetime.now().strftime("%Y%m%d-%H%M%S") + '.csv'
-        df_suspicious.to_csv(csv_out, index=False)
-        print(f'[INFO] Created CSV file with duplicate/suspicious hashes in {submissions_dir_name}\nCSV file: {csv_out}')
+        df_duplicate.to_csv(csv_out, index=False)
+        print(f'[INFO] Created CSV file with duplicate hashes in {gradebook_or_submissions_str}: {assignment_name}\nCSV file: {csv_out}')
     except Exception as e:
-        exit(f'[ERROR] Something went wrong while trying to save csv file with suspicious hashes\nError message: {e}')
+        exit(f'[ERROR] Something went wrong while trying to save csv file with duplicate hashes\nError message: {e}')
+
+
+# partials for generate_duplicate_hashes_generic(), setting the appropriate drop_columns for gradebook / submissions
+generate_duplicate_hashes_gradebook = partial(generate_duplicate_hashes_generic, drop_columns=['filename'])
+generate_duplicate_hashes_submissions = partial(generate_duplicate_hashes_generic, drop_columns=['filepath', 'filename'])
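
Note: the patch replaces the locally defined CSV_DIR with an import from utils.settings, but utils/settings.py itself is not part of this diff. A minimal sketch of what that module presumably provides, assuming it simply centralises the value previously hard-coded in inspect_submissions.py and utils/inspector.py:

    import os

    # Shared output directory for the generated CSV reports.
    # Assumption: keeps the old default of a 'csv' folder under the working directory.
    CSV_DIR = os.path.join(os.getcwd(), 'csv')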
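
The deduplication logic itself is unchanged by the restructure: generate_duplicate_hashes_generic() flags a hash only when it is shared across more than one Student ID, so identical files re-submitted by the same student are ignored. A self-contained sketch of that groupby/agg step, using made-up IDs and hashes:

    import pandas as pd

    # Toy version of the generated file-hashes CSV (values are hypothetical).
    df = pd.DataFrame({
        'Student ID':  ['s1', 's1', 's2', 's3'],
        'filename':    ['a.py', 'a_copy.py', 'b.py', 'c.py'],
        'sha256 hash': ['h1', 'h1', 'h1', 'h2'],
    })

    df_clean = df.drop(columns=['filename'])
    # Keep rows whose hash occurs more than once, same-student repeats included.
    dup = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :]
    # True where a hash is shared by more than one distinct Student ID.
    flags = dup.groupby('sha256 hash').agg(lambda x: len(x.unique()) > 1)
    print(flags[flags['Student ID']].index.to_list())  # ['h1'] - h2 is unique, and
                                                       # s1's self-duplicate alone
                                                       # would not flag h1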