-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshare_files.py
More file actions
201 lines (150 loc) · 7.19 KB
/
share_files.py
File metadata and controls
201 lines (150 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import socket
import threading
import keyboard
import json
from time import sleep
from classes import *
from classes import types as tp
import os
### This is the server file for the file sharing application. It is responsible for handling incoming connections and requests from clients. ###
def config_loader():
    """Load ``config.json`` from the working directory, repairing bad settings.

    If the file does not exist it is created with the default values and
    those defaults are returned. Otherwise the file is parsed and each known
    setting is validated; a missing or invalid setting is reset to its
    default with a warning. The (possibly repaired) configuration is always
    written back to ``config.json``.

    Settings:
        debug_mode: must be a ``bool``; default ``False``.
        fake_sleep: must be a non-negative ``int`` or ``float`` (``bool`` is
            rejected even though it subclasses ``int``); default ``0.5``.

    Returns:
        dict: the validated configuration.
    """
    default_config = {
        "debug_mode": False,
        "fake_sleep": 0.5,
    }

    if not os.path.exists("config.json"):
        prints(tp.WARNING, """The config file was not found. Creating a new one with default values...""")
        with open("config.json", "w") as f:
            f.write(json.dumps(default_config))
        return default_config

    with open("config.json", "r") as f:
        raw_config = f.read()

    # A hand-edited file may be unparsable or hold something other than an
    # object; start from an empty dict so every setting below falls back to
    # its default instead of crashing the server at start-up.
    try:
        config = json.loads(raw_config)
    except json.JSONDecodeError:
        prints(tp.WARNING, """The config file is not valid JSON. Resetting to default values.""")
        config = {}
    if not isinstance(config, dict):
        prints(tp.WARNING, """The config file does not contain a JSON object. Resetting to default values.""")
        config = {}

    try:
        if not isinstance(config["debug_mode"], bool):
            prints(tp.WARNING, """The value for the "Debug mode" config is not boolean. Setting to False.""")
            # NOTE(review): tp.ADEBUG is used without a config here, presumably
            # because debug_mode itself is not trustworthy yet - confirm in classes.
            prints(type=tp.ADEBUG, text=f"debug_mode value & type: {config['debug_mode']}, {config['debug_mode'].__class__}")
            config["debug_mode"] = False
    except KeyError:
        prints(tp.WARNING, """The value for the "Debug Mode" config was not found. Setting to False.""")
        config["debug_mode"] = False

    try:
        fake_sleep = config["fake_sleep"]
    except KeyError:
        prints(tp.WARNING, """The value for the "Fake sleep" config was not found. Setting to 0.5.""")
        config["fake_sleep"] = 0.5
    else:
        # bool is a subclass of int, so it has to be excluded explicitly.
        is_number = isinstance(fake_sleep, (int, float)) and not isinstance(fake_sleep, bool)
        if not is_number:
            prints(tp.WARNING, """The value for the "Fake sleep" config is not a float or integer. Setting to 0.5.""")
            prints(type=tp.DEBUG, text=f"fake_sleep value & type: {config['fake_sleep']}, {config['fake_sleep'].__class__}", config=config)
            config["fake_sleep"] = 0.5
        elif not fake_sleep >= 0:
            # Written as "not >= 0" so NaN is rejected too; time.sleep()
            # raises ValueError for negative or NaN durations.
            prints(tp.WARNING, """The value for the "Fake sleep" config must not be negative. Setting to 0.5.""")
            config["fake_sleep"] = 0.5

    # Persist the validated configuration so repairs survive a restart.
    with open("config.json", "w") as f:
        f.write(json.dumps(config))

    prints(type=tp.DEBUG, text=f"Config loaded successfully with values:\n{config}", config=config)
    return config
def _list_server_files(args):
    """Print the contents of ``server_files``; ``-l`` adds each file's size.

    Args:
        args: lower-cased arguments that followed the list command.
    """
    if not args:
        for file in os.listdir("server_files"):
            print(file)
        return
    if args[0] == "-l":
        for file in os.listdir("server_files"):
            print(file, os.path.getsize(os.path.join("server_files", file)), "bytes")
    else:
        prints(type=tp.WARNING, text="Invalid list command. Type 'help list' for a list of list commands.")


def _show_help(args, config):
    """Print the general help text, or the help for the topic in ``args[0]``.

    Args:
        args: lower-cased arguments that followed ``help``.
        config: configuration dict, forwarded to ``prints`` for the
            debug-only part of the help text.
    """
    if not args:
        print("""Available commands:
start - Start the server
stop - Stop the server
restart - Restart the server
status - Display the status of the server
help - Display this help message
list - List files in server_files directory:
-l - List files in server_files directory with details
whatsmyip - Display your public IP address
exit,quit - Stop the server
clear,cls - Clear the console""")
        prints(type=tp.DEBUG, text="""Available commands:
test_delay: Test by delaying command handler by 3 seconds""", config=config)
    elif args[0] == "list":
        print("""List commands:
list - List files in server_files directory
list -l - List files in server_files directory with details""")
    else:
        prints(tp.WARNING, f"No help available for '{args[0]}'. Type 'help' for a list of commands.")


def command_handler(command: str, server: Server, config: dict):
    """Interpret one line typed at the server console and act on it.

    Matching is case-insensitive and ignores surrounding whitespace. Commands
    without arguments (start, stop, status, ...) must match exactly; commands
    that accept arguments (list, help, restart, clear) are selected by their
    first word. Blank input is ignored, and anything unrecognised produces a
    warning.

    Args:
        command: raw text entered by the operator.
        server: the server instance to control. This function uses its
            ``start``/``stop`` methods and its ``server_loop_thread``,
            ``server_loop_running`` and ``public_ip`` attributes.
        config: configuration dict; ``debug_mode`` and ``fake_sleep`` are read.
    """
    normalized = command.strip().lower()
    tokens = normalized.split()
    if not tokens:
        # Nothing was typed; do not nag the operator with a warning.
        return
    name, args = tokens[0], tokens[1:]

    if normalized == "start":
        if server.server_loop_thread.is_alive():
            prints(tp.WARNING, "Server is already running.")
            return
        prints(tp.IMPORTANT, "Starting server...")
        sleep(1)
        start_server_thread = threading.Thread(target=server.start, name="Main Server Loop")
        start_server_thread.start()
        # NOTE(review): joining assumes server.start() returns once the server
        # loop has been launched - confirm in classes.Server.
        start_server_thread.join()
    elif normalized == "test_delay":
        # Debug-only command; silently ignored when debug mode is off.
        if config["debug_mode"]:
            print("Testing delay of 3 seconds...")
            sleep(3)
            print("Delay test complete.")
    elif normalized == "whatsmyip":
        prints(text=f"Your public IP address is {server.public_ip}.", type=tp.SUCCESS)
    elif normalized == "status":
        if server.server_loop_thread.is_alive():
            prints(type=tp.SUCCESS, text="Server is running.")
        else:
            prints(type=tp.SUCCESS, text="Server is not running.")
    elif normalized in ("stop", "exit", "quit", "bye", "shutdown"):
        prints(tp.IMPORTANT, "Stopping server...")
        prints(tp.IMPORTANT, "This can take up to 10 seconds.")
        server.stop()
    elif name in ("list", "ls", "dir"):
        _list_server_files(args)
    elif name in ("cls", "clear"):
        os.system("cls" if os.name == "nt" else "clear")
    elif name == "restart":
        prints(tp.IMPORTANT, "Restarting server...")
        prints(tp.IMPORTANT, "This can take up to 20 seconds.")
        if server.server_loop_running:
            server.stop()
        start_server_thread = threading.Thread(target=server.start, name="Main Server Loop")
        start_server_thread.start()
        # Bounded wait so a stuck start-up cannot freeze the console forever.
        start_server_thread.join(timeout=30)
        sleep(config["fake_sleep"])
    elif name == "help":
        _show_help(args, config)
    else:
        prints(tp.WARNING, "Invalid command. Type 'help' for a list of commands.")
def main():
    """Start the file-sharing server and run the interactive command console.

    Loads the configuration, makes sure the ``server_files`` directory
    exists, creates a ``Server`` on 0.0.0.0:8081, starts it, and then reads
    commands from standard input until the operator interrupts the program
    (Ctrl-C) or standard input reaches end-of-file (Ctrl-D / closed pipe).
    Errors raised while handling a command are reported and the loop goes on.
    """
    config = config_loader()
    print(f"Config: {config}")

    if not os.path.exists("server_files"):
        os.makedirs("server_files")

    server = Server(["0.0.0.0", 8081])
    server.config = config

    def exit_program():
        """Stop the server and terminate the process with exit status 0."""
        prints(tp.IMPORTANT, "Stopping server...")
        server.stop()
        # SystemExit is raised directly: the exit() helper is injected by the
        # site module and is not guaranteed to exist in every runtime.
        raise SystemExit(0)

    prints(tp.IMPORTANT, "Starting server...")
    start_server_thread = threading.Thread(target=server.start, name="Main Server Loop")
    start_server_thread.start()
    start_server_thread.join()
    sleep(config["fake_sleep"])

    print("Type 'help' for a list of commands.")
    while True:
        try:
            command = input(">> ")
            command_handler(command, server, config)
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin is exhausted; input() would raise it again
            # on every iteration, so treat it as a shutdown request instead of
            # letting the generic handler below spin forever.
            exit_program()
        except Exception as e:
            prints(tp.ERROR, f"Encountered error in Command Loop: {e}")
# Run the server console only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()