-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathfuncall_cpu.py
More file actions
74 lines (61 loc) · 2.58 KB
/
funcall_cpu.py
File metadata and controls
74 lines (61 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# funcall_cpu.py - funcall throughput test, modified for resource monitoring (corrected)
import time
import concore
import os
import json
import threading
import psutil
import csv
# --- MONITORING FUNCTION (to be run in a separate thread) ---
def monitor_resources(stop_event, output_list):
    """Sample this process's CPU and memory usage until *stop_event* is set.

    Intended to run in a background thread. Every pass appends one dict to
    *output_list* with the keys ``'cpu_percent'`` and ``'memory_mb'``
    (resident set size divided by 1024 * 1024).

    Args:
        stop_event: Object whose ``is_set()`` turns true when sampling
            should end (the caller passes a ``threading.Event``).
        output_list: List that receives the sample dicts, in order.
    """
    bytes_per_mb = 1024 * 1024
    this_process = psutil.Process(os.getpid())

    while True:
        if stop_event.is_set():
            break
        # cpu_percent() is called with a 0.5 s interval, so this call also
        # paces the loop; a stop request is noticed on the next pass.
        cpu_now = this_process.cpu_percent(interval=0.5)
        rss_bytes = this_process.memory_info().rss
        sample = {'cpu_percent': cpu_now, 'memory_mb': rss_bytes / bytes_per_mb}
        output_list.append(sample)
# Sender side of the funcall throughput/resource test: announces START over a
# ZMQ REQ port, sends numbered data messages for TEST_DURATION seconds (reading
# a reply after every send), announces STOP, and then writes the CPU/memory
# samples gathered by the monitor thread to sender_usage.csv.
print("funcall (Sender) using ZMQ REQ socket for Throughput & Resource Test.")

TEST_DURATION = 10  # length of the data-sending loop, in seconds
RECEIVER_HOST = "192.168.0.109"  # Use 127.0.0.1 for local, or the receiver's IP
message_count = 0  # number of data messages sent so far
resource_records = []  # samples appended by monitor_resources()

# --- Start Monitoring ---
stop_monitoring = threading.Event()
monitor_thread = threading.Thread(
    target=monitor_resources,
    args=(stop_monitoring, resource_records),
)
monitor_thread.start()

# The monitor thread is non-daemon and only stops once stop_monitoring is set.
# Running the ZMQ work inside try/finally guarantees that it is stopped (and
# the ZMQ resources released) even when setup or an exchange raises, or the
# run is interrupted with Ctrl-C; otherwise the process could never exit.
try:
    # --- Main Throughput Test Logic (CORRECTED) ---
    # NOTE(review): PORT_NAME_IN_A and PORT_IN_A are not defined in the visible
    # part of this file; presumably they are supplied by the concore tooling
    # before this script runs - confirm.
    concore.init_zmq_port(
        port_name=PORT_NAME_IN_A,
        port_type="connect",
        address="tcp://" + RECEIVER_HOST + ":" + PORT_IN_A,
        socket_type_str="REQ",
    )
    print(f"Sender starting. Will send data for {TEST_DURATION} seconds.")

    # Send START signal and wait for reply
    start_signal = json.dumps({"type": "control", "value": "START"})
    concore.write(PORT_NAME_IN_A, "stream", start_signal)
    concore.read(PORT_NAME_IN_A, "reply", "{}")  # Wait for acknowledgment
    time.sleep(1)

    start_time = time.perf_counter()
    # Loop for the test duration. Each message is followed by a read of the
    # reply before the next send, and the 1 s pause caps the rate at roughly
    # one message per second.
    while (time.perf_counter() - start_time) < TEST_DURATION:
        data_message = json.dumps({"type": "data", "value": message_count})
        concore.write(PORT_NAME_IN_A, "stream", data_message)
        concore.read(PORT_NAME_IN_A, "reply", "{}")  # Wait for acknowledgment
        message_count += 1
        time.sleep(1)

    # Send STOP signal and wait for reply
    stop_signal = json.dumps({"type": "control", "value": "STOP"})
    concore.write(PORT_NAME_IN_A, "stream", stop_signal)
    concore.read(PORT_NAME_IN_A, "reply", "{}")  # Wait for acknowledgment
    print(f"Sender finished. Sent approximately {message_count} messages.")
finally:
    # --- Stop Monitoring ---
    # Stop and join the sampler first so it always terminates, then release
    # the ZMQ resources (same order as the original success path).
    stop_monitoring.set()
    monitor_thread.join()
    concore.terminate_zmq()

# --- Save Results (only reached when the test completed without error) ---
if resource_records:
    with open('sender_usage.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['cpu_percent', 'memory_mb'])
        writer.writeheader()
        writer.writerows(resource_records)
    print("Sender resource usage saved to sender_usage.csv")