Skip to content
This repository was archived by the owner on Jul 30, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat: add DeviceMiniInfo
  • Loading branch information
huangguihua committed May 28, 2024
commit 6a109b18271e8968cdc76ed34fb29783a43623b9
6 changes: 2 additions & 4 deletions tidevice3/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,9 @@ def connect_remote_service_discovery_service(udid: str, tunneld_url: str = None)
try:
resp = requests.get(tunneld_url, timeout=DEFAULT_TIMEOUT)
tunnels: Dict[str, Any] = resp.json()
ipv6_address = tunnels.get("usb_" + udid)
ipv6_address = tunnels.get(udid)
if ipv6_address is None:
ipv6_address = tunnels.get("wifi_" + udid)
if ipv6_address is None:
raise FatalError("tunneld not ready for device", udid)
raise FatalError("tunneld not ready for device", udid)
rsd = EnterableRemoteServiceDiscoveryService(ipv6_address)
return rsd
except requests.RequestException:
Expand Down
73 changes: 33 additions & 40 deletions tidevice3/cli/tunneld.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,26 @@ class Address(NamedTuple):
port: int


def get_connected_devices(wifi: bool) -> list[str]:
class DeviceMiniInfo(NamedTuple):
Udid: str
ConnectionType: str
ProductVersion: str


def get_connected_devices(wifi: bool) -> List[DeviceMiniInfo]:
"""return list of udid"""
try:
usb_devices = list_devices(usb=True, network=False)
devices = ["usb_" + d.Identifier for d in usb_devices if Version(d.ProductVersion) >= Version("17")]
devices = [DeviceMiniInfo(d.Identifier, d.ConnectionType, d.ProductVersion) for d in usb_devices if Version(d.ProductVersion) >= Version("17")]
if wifi:
wifi_devices = list_devices(usb=False, network=True)
devices.extend(["wifi_" + d.Identifier for d in wifi_devices if Version(d.ProductVersion) >= Version("17")])
devices.extend([DeviceMiniInfo(d.Identifier, d.ConnectionType, d.ProductVersion) for d in wifi_devices if Version(d.ProductVersion) >= Version("17")])
except MuxException as e:
logger.error("list_devices failed: %s", e)
return []
return devices


def get_need_lockdown_devices() -> list[str]:
"""return list of udid"""
try:
devices = list_devices(usb=True, network=False)
except MuxException as e:
logger.error("list_devices failed: %s", e)
return []
return [d.Identifier for d in devices if Version(d.ProductVersion) >= Version("17.4")]


def guess_pymobiledevice3_cmd() -> List[str]:
pmd3path = shutil.which("pymobiledevice3")
if not pmd3path:
Expand All @@ -70,24 +66,22 @@ class TunnelError(Exception):


@threadsafe_function
def start_tunnel(pmd3_path: List[str], udid: str) -> Tuple[Address, subprocess.Popen]:
def start_tunnel(pmd3_path: List[str], device: DeviceMiniInfo) -> Tuple[Address, subprocess.Popen]:
"""
Start program, should be killed when the main program quit

Raises:
TunnelError
"""
# cmd = ["bash", "-c", "echo ::1 1234; sleep 10001"]
device_type, _udid = udid.split("_")[0], udid.split("_")[1]
log_prefix = f"[{_udid}]"
log_prefix = f"[{device.Udid}]"
start_tunnel_cmd = "remote"
lockdown_devices = get_need_lockdown_devices()
if device_type == "wifi" and _udid not in lockdown_devices:
cmdargs = pmd3_path + f"{start_tunnel_cmd} start-tunnel --script-mode --udid {_udid} -t wifi".split()
if device.ConnectionType == "Network" and Version(device.ProductVersion) < Version("17.4"):
cmdargs = pmd3_path + f"{start_tunnel_cmd} start-tunnel --script-mode --udid {device.Udid} -t wifi".split()
else:
if _udid in lockdown_devices:
if Version(device.ProductVersion) >= Version("17.4"):
start_tunnel_cmd = "lockdown"
cmdargs = pmd3_path + f"{start_tunnel_cmd} start-tunnel --script-mode --udid {_udid}".split()
cmdargs = pmd3_path + f"{start_tunnel_cmd} start-tunnel --script-mode --udid {device.Udid}".split()
logger.info("%s cmd: %s", log_prefix, shlex.join(cmdargs))
process = subprocess.Popen(
cmdargs, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE
Expand All @@ -111,26 +105,25 @@ def __init__(self):

def update_devices(self, wifi: bool):
current_devices = get_connected_devices(wifi)
current_udids = [d.Udid for d in current_devices]
active_udids = self.active_monitors.keys()

# Start monitors for new devices
for udid in current_devices:
if udid in active_udids:
continue
if udid.replace("wifi", "usb") in active_udids: # skip if device already monitered by usb
for device in current_devices:
if device.Udid in active_udids:
continue
self.active_monitors[udid] = None
self.active_monitors[device.Udid] = None
try:
threading.Thread(name=f"{udid} keeper",
threading.Thread(name=f"{device.Udid} keeper",
target=self._start_tunnel_keeper,
args=(udid,),
args=(device,),
daemon=True).start()
except Exception as e:
logger.error("udid: %s start-tunnel failed: %s", udid, e)
logger.error("udid: %s start-tunnel failed: %s", device, e)

# Stop monitors for disconnected devices
for udid in active_udids:
if udid in current_devices:
if udid in current_udids:
continue
logger.info("udid: %s quit, terminate related process", udid)
process = self.active_monitors[udid]
Expand All @@ -139,23 +132,23 @@ def update_devices(self, wifi: bool):
self.active_monitors.pop(udid, None)
self.addresses.pop(udid, None)

def _start_tunnel_keeper(self, udid: str):
while udid in self.active_monitors:
def _start_tunnel_keeper(self, device: DeviceMiniInfo):
while device.Udid in self.active_monitors:
try:
addr, process = start_tunnel(self.pmd3_cmd, udid)
self.active_monitors[udid] = process
self.addresses[udid] = addr
self._wait_process_exit(process, udid)
addr, process = start_tunnel(self.pmd3_cmd, device)
self.active_monitors[device.Udid] = process
self.addresses[device.Udid] = addr
self._wait_process_exit(process, device)
except TunnelError:
logger.exception("udid: %s start-tunnel failed", udid)
logger.exception("udid: %s start-tunnel failed", device)
time.sleep(3)

def _wait_process_exit(self, process: subprocess.Popen, udid: str):
def _wait_process_exit(self, process: subprocess.Popen, device: DeviceMiniInfo):
while True:
try:
process.wait(1.0)
self.addresses.pop(udid, None)
logger.warning("udid: %s process exit with code: %s", udid, process.returncode)
self.addresses.pop(device.Udid, None)
logger.warning("udid: %s process exit with code: %s", device, process.returncode)
break
except subprocess.TimeoutExpired:
continue
Expand Down