@@ -76,6 +76,7 @@ impl Daemon {
7676 . route ( "/status" , get ( status_handler) )
7777 . route ( "/command" , post ( command_handler) )
7878 . route ( "/ai-generate" , post ( ai_generate_proxy_handler) )
79+ . route ( "/ai-stream" , get ( ai_stream_ws_handler) )
7980 . route ( "/check-update" , get ( check_update_handler) )
8081 . route ( "/ext" , get ( ws_handler) )
8182 . layer ( cors)
@@ -499,6 +500,26 @@ async fn handle_extension_ws(state: Arc<DaemonState>, socket: WebSocket) {
499500 match msg {
500501 Ok ( Message :: Text ( text) ) => {
501502 debug ! ( len = text. len( ) , "received message from extension" ) ;
503+
504+ // Check if it's an AI generate request
505+ if let Ok ( parsed) = serde_json:: from_str :: < serde_json:: Value > ( text. as_str ( ) ) {
506+ let msg_type = parsed. get ( "type" ) . and_then ( |t| t. as_str ( ) ) . unwrap_or ( "" ) ;
507+ debug ! ( msg_type, "parsed extension message type" ) ;
508+ // Skip known non-command messages
509+ if msg_type == "log" || msg_type == "hello" {
510+ continue ;
511+ }
512+ if msg_type == "ai-generate" {
513+ let stream_id = parsed. get ( "streamId" ) . and_then ( |s| s. as_str ( ) ) . unwrap_or ( "" ) . to_string ( ) ;
514+ let state_clone = state. clone ( ) ;
515+ let body_json = parsed. clone ( ) ;
516+ tokio:: spawn ( async move {
517+ handle_ai_stream_via_ws ( state_clone, stream_id, body_json) . await ;
518+ } ) ;
519+ continue ;
520+ }
521+ }
522+
502523 match serde_json:: from_str :: < DaemonResult > ( & text) {
503524 Ok ( result) => {
504525 let id = result. id . clone ( ) ;
@@ -544,6 +565,194 @@ async fn handle_extension_ws(state: Arc<DaemonState>, socket: WebSocket) {
544565 }
545566}
546567
568+ // ─── AI Stream via existing extension WS ────────────────────────
569+ async fn handle_ai_stream_via_ws ( state : Arc < DaemonState > , stream_id : String , body : serde_json:: Value ) {
570+ // Helper to send message back through extension WS
571+ async fn send_ws ( state : & Arc < DaemonState > , msg : serde_json:: Value ) {
572+ let mut tx = state. extension_tx . lock ( ) . await ;
573+ if let Some ( ref mut sink) = * tx {
574+ let _ = sink. send ( Message :: Text ( msg. to_string ( ) . into ( ) ) ) . await ;
575+ }
576+ }
577+
578+ // Read token
579+ let home = std:: env:: var ( "HOME" )
580+ . or_else ( |_| std:: env:: var ( "USERPROFILE" ) )
581+ . unwrap_or_else ( |_| "." . to_string ( ) ) ;
582+ let config_path = std:: path:: PathBuf :: from ( & home) . join ( ".autocli" ) . join ( "config.json" ) ;
583+ let token = std:: fs:: read_to_string ( & config_path)
584+ . ok ( )
585+ . and_then ( |c| serde_json:: from_str :: < serde_json:: Value > ( & c) . ok ( ) )
586+ . and_then ( |v| v. get ( "autocli-token" ) . and_then ( |t| t. as_str ( ) ) . map ( String :: from) )
587+ . unwrap_or_default ( ) ;
588+
589+ if token. is_empty ( ) {
590+ send_ws ( & state, json ! ( { "type" : "ai-stream-error" , "streamId" : stream_id, "error" : "No token" } ) ) . await ;
591+ return ;
592+ }
593+
594+ let api_base = std:: env:: var ( "AUTOCLI_API_BASE" )
595+ . unwrap_or_else ( |_| "https://www.autocli.ai" . to_string ( ) ) ;
596+ let url = format ! ( "{}/api/ai/extension-generate" , api_base. trim_end_matches( '/' ) ) ;
597+
598+ // Build request body from the message
599+ let request_body = json ! ( {
600+ "captured_data" : body. get( "body" ) . and_then( |b| b. get( "captured_data" ) ) . cloned( ) . unwrap_or( json!( null) ) ,
601+ "stream" : true ,
602+ } ) ;
603+
604+ let client = match reqwest:: Client :: builder ( ) . timeout ( std:: time:: Duration :: from_secs ( 300 ) ) . build ( ) {
605+ Ok ( c) => c,
606+ Err ( e) => {
607+ send_ws ( & state, json ! ( { "type" : "ai-stream-error" , "streamId" : stream_id, "error" : e. to_string( ) } ) ) . await ;
608+ return ;
609+ }
610+ } ;
611+
612+ let mut resp = match client
613+ . post ( & url)
614+ . header ( "Authorization" , format ! ( "Bearer {}" , token) )
615+ . header ( "Content-Type" , "application/json" )
616+ . json ( & request_body)
617+ . send ( )
618+ . await
619+ {
620+ Ok ( r) => r,
621+ Err ( e) => {
622+ send_ws ( & state, json ! ( { "type" : "ai-stream-error" , "streamId" : stream_id, "error" : e. to_string( ) } ) ) . await ;
623+ return ;
624+ }
625+ } ;
626+
627+ if !resp. status ( ) . is_success ( ) {
628+ let status = resp. status ( ) . as_u16 ( ) ;
629+ let body_text = resp. text ( ) . await . unwrap_or_default ( ) ;
630+ send_ws ( & state, json ! ( { "type" : "ai-stream-error" , "streamId" : stream_id, "status" : status, "body" : body_text } ) ) . await ;
631+ return ;
632+ }
633+
634+ // Stream chunks back through WS
635+ let mut all_bytes = Vec :: new ( ) ;
636+ while let Some ( chunk) = resp. chunk ( ) . await . unwrap_or ( None ) {
637+ all_bytes. extend_from_slice ( & chunk) ;
638+ if let Ok ( text) = std:: str:: from_utf8 ( & chunk) {
639+ send_ws ( & state, json ! ( { "type" : "ai-stream-chunk" , "streamId" : stream_id, "data" : text } ) ) . await ;
640+ }
641+ }
642+
643+ send_ws ( & state, json ! ( { "type" : "ai-stream-done" , "streamId" : stream_id } ) ) . await ;
644+
645+ // Post-processing: save + upload
646+ let full_text = String :: from_utf8_lossy ( & all_bytes) . to_string ( ) ;
647+ let yaml_content = extract_yaml_from_response ( & full_text) ;
648+ if !yaml_content. is_empty ( ) {
649+ let _ = save_adapter_locally ( & home, & yaml_content) ;
650+ let _ = upload_adapter_to_server ( & api_base, & token, & yaml_content) . await ;
651+ }
652+ }
653+
654+ // ─── AI Stream WebSocket (standalone endpoint) ──────────────────
655+ /// GET /ai-stream — WebSocket endpoint for streaming AI generation.
656+ /// Client sends request JSON, server forwards to autocli.ai and streams SSE data back via WS.
657+ async fn ai_stream_ws_handler ( ws : WebSocketUpgrade ) -> impl IntoResponse {
658+ ws. on_upgrade ( handle_ai_stream_socket)
659+ }
660+
661+ async fn handle_ai_stream_socket ( mut socket : WebSocket ) {
662+ // Wait for client message with request body
663+ let request_body = match socket. recv ( ) . await {
664+ Some ( Ok ( Message :: Text ( text) ) ) => text,
665+ _ => { let _ = socket. close ( ) . await ; return ; }
666+ } ;
667+
668+ // Read token
669+ let home = std:: env:: var ( "HOME" )
670+ . or_else ( |_| std:: env:: var ( "USERPROFILE" ) )
671+ . unwrap_or_else ( |_| "." . to_string ( ) ) ;
672+ let config_path = std:: path:: PathBuf :: from ( & home) . join ( ".autocli" ) . join ( "config.json" ) ;
673+ let token = std:: fs:: read_to_string ( & config_path)
674+ . ok ( )
675+ . and_then ( |c| serde_json:: from_str :: < serde_json:: Value > ( & c) . ok ( ) )
676+ . and_then ( |v| v. get ( "autocli-token" ) . and_then ( |t| t. as_str ( ) ) . map ( String :: from) )
677+ . unwrap_or_default ( ) ;
678+
679+ if token. is_empty ( ) {
680+ let _ = socket. send ( Message :: Text ( "data: {\" error\" :\" No token configured\" }\n \n " . into ( ) ) ) . await ;
681+ let _ = socket. close ( ) . await ;
682+ return ;
683+ }
684+
685+ let api_base = std:: env:: var ( "AUTOCLI_API_BASE" )
686+ . unwrap_or_else ( |_| "https://www.autocli.ai" . to_string ( ) ) ;
687+ let url = format ! ( "{}/api/ai/extension-generate" , api_base. trim_end_matches( '/' ) ) ;
688+
689+ let client = match reqwest:: Client :: builder ( ) . timeout ( std:: time:: Duration :: from_secs ( 300 ) ) . build ( ) {
690+ Ok ( c) => c,
691+ Err ( _) => { let _ = socket. close ( ) . await ; return ; }
692+ } ;
693+
694+ let mut resp = match client
695+ . post ( & url)
696+ . header ( "Authorization" , format ! ( "Bearer {}" , token) )
697+ . header ( "Content-Type" , "application/json" )
698+ . body ( request_body. to_string ( ) )
699+ . send ( )
700+ . await
701+ {
702+ Ok ( r) => r,
703+ Err ( e) => {
704+ let _ = socket. send ( Message :: Text ( format ! ( "data: {{\" error\" :\" {}\" }}\n \n " , e) . into ( ) ) ) . await ;
705+ let _ = socket. close ( ) . await ;
706+ return ;
707+ }
708+ } ;
709+
710+ if !resp. status ( ) . is_success ( ) {
711+ let status = resp. status ( ) . as_u16 ( ) ;
712+ let body = resp. text ( ) . await . unwrap_or_default ( ) ;
713+ let err_body = body. replace ( '"' , "\\ \" " ) . chars ( ) . take ( 200 ) . collect :: < String > ( ) ;
714+ let _ = socket. send ( Message :: Text ( format ! ( "data: {{\" error\" :\" {}: {}\" }}\n \n status: {}" , status, err_body, status) . into ( ) ) ) . await ;
715+ let _ = socket. close ( ) . await ;
716+
717+ // Don't save/upload on error
718+ return ;
719+ }
720+
721+ // Stream response chunks to WebSocket AND buffer for post-processing
722+ let mut all_bytes = Vec :: new ( ) ;
723+ let mut line_buffer = String :: new ( ) ;
724+
725+ let stream_start = std:: time:: Instant :: now ( ) ;
726+ let mut chunk_count = 0u32 ;
727+ while let Some ( chunk) = resp. chunk ( ) . await . unwrap_or ( None ) {
728+ chunk_count += 1 ;
729+ tracing:: debug!( chunk_count, size = chunk. len( ) , elapsed_ms = stream_start. elapsed( ) . as_millis( ) as u64 , "AI stream chunk received" ) ;
730+ all_bytes. extend_from_slice ( & chunk) ;
731+ if let Ok ( text) = std:: str:: from_utf8 ( & chunk) {
732+ line_buffer. push_str ( text) ;
733+ while let Some ( pos) = line_buffer. find ( '\n' ) {
734+ let line = line_buffer[ ..=pos] . to_string ( ) ;
735+ line_buffer = line_buffer[ pos + 1 ..] . to_string ( ) ;
736+ let _ = socket. send ( Message :: Text ( line. into ( ) ) ) . await ;
737+ }
738+ }
739+ }
740+ tracing:: debug!( total_chunks = chunk_count, total_bytes = all_bytes. len( ) , total_ms = stream_start. elapsed( ) . as_millis( ) as u64 , "AI stream complete" ) ;
741+ if !line_buffer. is_empty ( ) {
742+ let _ = socket. send ( Message :: Text ( line_buffer. into ( ) ) ) . await ;
743+ }
744+
745+ let _ = socket. close ( ) . await ;
746+
747+ // Post-processing: save + upload (same as ai_generate_proxy_handler)
748+ let full_text = String :: from_utf8_lossy ( & all_bytes) . to_string ( ) ;
749+ let yaml_content = extract_yaml_from_response ( & full_text) ;
750+ if !yaml_content. is_empty ( ) {
751+ let _ = save_adapter_locally ( & home, & yaml_content) ;
752+ let _ = upload_adapter_to_server ( & api_base, & token, & yaml_content) . await ;
753+ }
754+ }
755+
547756// ─── Update check ───────────────────────────────────────────────
548757
549758static CACHED_UPDATE : std:: sync:: OnceLock < tokio:: sync:: RwLock < Option < serde_json:: Value > > > = std:: sync:: OnceLock :: new ( ) ;
0 commit comments