1010import ssl
1111import types
1212from email .message import EmailMessage
13- from typing import List , cast
13+ from typing import Callable , Generator , List , cast
1414from urllib .parse import unquote , urlparse
1515
1616import lxml .etree as ET
@@ -168,7 +168,7 @@ def check():
168168 return Response (res , mimetype = "text/plain" , status = 503 if err_count else 200 )
169169
170170
171- def check_clamav ():
171+ def check_clamav () -> str :
172172 clamd_path = config ["config" ].get ("clamd_socket" )
173173 if clamd_path :
174174 test = clamav_sock .ping ()
@@ -177,7 +177,7 @@ def check_clamav():
177177 return "clamav: no ping\n "
178178
179179
180- def check_icap ():
180+ def check_icap () -> str :
181181 icap_host = config ["config" ].get ("icap_host" )
182182 if icap_host :
183183 try :
@@ -187,7 +187,7 @@ def check_icap():
187187 return "icap: failed\n "
188188
189189
190- def phr_service ():
190+ def phr_service () -> Response :
191191 """Scan AV on xop documents for retrieveDocumentSetRequest"""
192192 client_config = get_client_config ()
193193 with request_upstream (client_config ) as upstream :
@@ -199,7 +199,7 @@ def phr_service():
199199
200200 # debugging only - remove after testing
201201 if EICAR in data :
202- fn = f"/app/dump /{ request .path .replace ('/' , '_' )} .xml"
202+ fn = f"/tmp /{ request .path .replace ('/' , '_' )} .xml"
203203 with open (fn , "wb" ) as f :
204204 f .write (data )
205205 logger .error (f"found EICAR signature - see content in file { fn } " )
@@ -209,7 +209,7 @@ def phr_service():
209209 return response
210210
211211
212- def other ():
212+ def other () -> Response :
213213 """Streamed forward without scan"""
214214 client_config = get_client_config ()
215215 upstream = request_upstream (client_config , stream = True )
@@ -223,7 +223,7 @@ def generate():
223223 return response
224224
225225
226- def request_upstream (client_config , warn = True , stream = False ):
226+ def request_upstream (client_config , warn = True , stream = False ) -> Response :
227227 """Request to real Konnektor"""
228228
229229 konn = client_config ["Konnektor" ]
@@ -272,7 +272,7 @@ def request_upstream(client_config, warn=True, stream=False):
272272 abort (502 )
273273
274274
275- def get_client_config ():
275+ def get_client_config () -> configparser . ConfigParser :
276276 request_ip = request .headers .get ("X-real-ip" , request .host .split (":" )[0 ])
277277 port = request .host .split (":" )[1 ] if ":" in request .host else "443"
278278
@@ -444,7 +444,7 @@ def handle_attachment(
444444 msg .attach (att )
445445
446446
447- def get_malicious_content_ids (msg : EmailMessage ):
447+ def get_malicious_content_ids (msg : EmailMessage ) -> Generator [ str , None , None ] :
448448 """Extracting content_ids of malicious attachments"""
449449 for att in msg .iter_attachments ():
450450 att = cast (EmailMessage , att )
@@ -524,7 +524,7 @@ def fix_status(xml_resp, xml_errlist, xml_ns, msg):
524524
525525def build_payload (
526526 msg : EmailMessage , malicious_content_ids : List [str ], res : requests .Response
527- ):
527+ ) -> bytes :
528528 "create payload based on original response with replacing only payoad for malicious_content_ids"
529529
530530 content_type = res .headers ["Content-Type" ]
@@ -576,7 +576,7 @@ def get_content_id(content: bytes):
576576}
577577
578578
579- def get_replacement (mimetype ):
579+ def get_replacement (mimetype ) -> bytes :
580580 """get content for replacements"""
581581 filename = replacement_files .get (mimetype ) or replacement_files .get ("text/plain" )
582582 with open (filename , "rb" ) as f :
@@ -590,7 +590,7 @@ def dump(dict):
590590# File Scanning
591591
592592
593- def get_file_scanner ():
593+ def get_file_scanner () -> Callable [[ bytes ], List [ str | None ]] :
594594 clamd_path = config ["config" ].get ("clamd_socket" )
595595 icap_host = config ["config" ].get ("icap_host" )
596596
@@ -611,13 +611,13 @@ def get_file_scanner():
611611 return scan_file_icap
612612
613613
614- def scan_file_clamav (content ) :
614+ def scan_file_clamav (content : bytes ) -> List [ str | None ] :
615615 "return scan result, do use clamav socket"
616616 scan_res = clamav_sock .instream (io .BytesIO (content ))["stream" ]
617617 return scan_res
618618
619619
620- def scan_file_icap (content ) :
620+ def scan_file_icap (content : bytes ) -> List [ str | None ] :
621621 "return scan result, do use icap"
622622 icap_service = config ["config" ]["icap_service" ]
623623 icap_host = config ["config" ]["icap_host" ]
@@ -650,9 +650,10 @@ def scan_file_icap(content):
650650 first_line = first_block .partition (b"\r \n " )[0 ]
651651 http_response_code = second_block .partition (b"\r \n " )[0 ]
652652
653+ logger .debug (f"check icar { content [:30 ]} - { EICAR in content } " )
653654 if EICAR in content :
654- logger .debug (f"Eicar ICAP REQ \n { req .encode ()} { content } { footer .encode ()} " )
655- logger .debug (f"RESP\n { first_block } { second_block [:500 ]} " )
655+ logger .debug (f"Eicar ICAP REQ \n { req .encode () + content + footer .encode ()} " )
656+ logger .debug (f"RESP\n { first_block + second_block [:500 ]} " )
656657
657658 # debug
658659 return ["OK" , None ]
@@ -677,7 +678,7 @@ def scan_file_icap(content):
677678 return ["FOUND" , "unknown" ]
678679
679680
680- def _open_sock (host , port , tls ) :
681+ def _open_sock (host : str , port : int , tls : bool ) -> socket :
681682 if tls :
682683 with socket .create_connection ((host , port )) as sock :
683684 context = ssl .create_default_context ()
0 commit comments