Skip to content

Commit 426379a

Browse files
committed
Merge pull request #6 from rdobson/remotetool
Add an module with CLI support for displaying hardware info collected by the parsers.
2 parents 0054ede + 0186837 commit 426379a

File tree

2 files changed

+203
-24
lines changed

2 files changed

+203
-24
lines changed

hwinfo/tools/inspector.py

Lines changed: 58 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
from argparse import ArgumentParser
44
from prettytable import PrettyTable
55
import paramiko
6+
import subprocess
7+
import os
68

79
from hwinfo.pci import PCIDevice
810
from hwinfo.pci.lspci import *
9-
from hwinfo.host.dmidecode import *
11+
12+
from hwinfo.host import dmidecode
1013

1114
def remote_command(host, username, password, cmd):
1215
client = paramiko.SSHClient()
@@ -18,12 +21,19 @@ def remote_command(host, username, password, cmd):
1821
output = stdout.readlines()
1922
error = stderr.readlines()
2023
if error:
21-
print "stderr: %s" % error
24+
raise Exception("stderr: %s" % error)
2225
client.close()
2326
return ''.join(output)
2427

2528
def local_command(cmd):
26-
return cmd
29+
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
30+
stdout, stderr = process.communicate()
31+
if process.returncode == 0:
32+
return str(stdout).strip()
33+
else:
34+
print "RC: %s" % process.returncode
35+
print stdout
36+
raise Exception("stderr: %s" % str(stderr))
2737

2838
class Host(object):
2939

@@ -38,18 +48,51 @@ def exec_command(self, cmd):
3848
else:
3949
return remote_command(self.host, self.username, self.password, cmd)
4050

51+
def get_lspci_data(self):
52+
return self.exec_command(['lspci', '-nnmm'])
53+
54+
def get_dmidecode_data(self):
55+
return self.exec_command(['dmidecode'])
56+
4157
def get_pci_devices(self):
42-
data = self.exec_command(['lspci', '-nnmm'])
58+
data = self.get_lspci_data()
4359
parser = LspciNNMMParser(data)
4460
devices = parser.parse_items()
4561
return [PCIDevice(device) for device in devices]
4662

4763
def get_info(self):
48-
data = self.exec_command(['dmidecode'])
49-
parser = DmidecodeParser(data)
64+
data = self.get_dmidecode_data()
65+
parser = dmidecode.DmidecodeParser(data)
5066
rec = parser.parse()
5167
return rec
5268

69+
def search_for_file(dirname, filename):
70+
for root, _, files in os.walk(dirname):
71+
if filename in files:
72+
return os.path.join(root, filename)
73+
raise Exception("Could not find '%s' in directory '%s'" % (filename, dirname))
74+
75+
def read_from_file(filename):
76+
fh = open(filename, 'r')
77+
data = fh.read()
78+
fh.close()
79+
return data
80+
81+
class HostFromLogs(Host):
82+
83+
def __init__(self, dirname):
84+
self.dirname = dirname
85+
86+
def _load_from_file(self, filename):
87+
filename = search_for_file(self.dirname, filename)
88+
return read_from_file(filename)
89+
90+
def get_lspci_data(self):
91+
return self._load_from_file('lspci-nnm.out')
92+
93+
def get_dmidecode_data(self):
94+
return self._load_from_file('dmidecode.out')
95+
5396
def pci_filter(devices, types):
5497
res = []
5598
for device in devices:
@@ -71,19 +114,6 @@ def pci_filter_for_gpu(devices):
71114
gpu_types = ['03']
72115
return pci_filter(devices, gpu_types)
73116

74-
def print_lines(lines):
75-
max_len = 0
76-
output = []
77-
for line in lines:
78-
output.append(line)
79-
if len(line) > max_len:
80-
max_len = len(line)
81-
print ""
82-
print "-" * max_len
83-
print '\n'.join(output)
84-
print "-" * max_len
85-
print ""
86-
87117
def rec_to_table(rec):
88118
table = PrettyTable(["Key", "Value"])
89119
table.align['Key'] = 'l'
@@ -115,14 +145,18 @@ def main():
115145
parser = ArgumentParser(prog="hwinfo")
116146

117147
filter_choices = ['bios', 'nic', 'storage', 'gpu']
118-
parser.add_argument("-f", "--filter", choices=filter_choices)
119-
parser.add_argument("-m", "--machine", default='localhost')
120-
parser.add_argument("-u", "--username")
121-
parser.add_argument("-p", "--password")
148+
parser.add_argument("-f", "--filter", choices=filter_choices, help="Query a specific class.")
149+
parser.add_argument("-m", "--machine", default='localhost', help="Remote host address.")
150+
parser.add_argument("-u", "--username", help="Username for remote host.")
151+
parser.add_argument("-p", "--password", help="Password for remote host.")
152+
parser.add_argument("-l", "--logs", help="Path to the directory with the logfiles.")
122153

123154
args = parser.parse_args()
124155

125-
host = Host(args.machine, args.username, args.password)
156+
if args.logs:
157+
host = HostFromLogs(args.logs)
158+
else:
159+
host = Host(args.machine, args.username, args.password)
126160

127161
options = []
128162

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import unittest
2+
import mock
3+
from mock import patch
4+
from StringIO import StringIO
5+
6+
from hwinfo.tools import inspector
7+
8+
class HostObjectTests(unittest.TestCase):
9+
10+
@patch('hwinfo.tools.inspector.local_command')
11+
def test_local_exec_command(self, local_command):
12+
host = inspector.Host()
13+
host.exec_command('ls')
14+
inspector.local_command.assert_called_once_with('ls')
15+
16+
@patch('hwinfo.tools.inspector.remote_command')
17+
def test_remote_exec_command(self, remote_command):
18+
host = inspector.Host('mymachine', 'root', 'pass')
19+
host.exec_command('ls')
20+
inspector.remote_command.assert_called_once_with('mymachine', 'root', 'pass', 'ls')
21+
22+
@patch('hwinfo.tools.inspector.Host.exec_command')
23+
def test_get_pci_devices(self, exec_command):
24+
host = inspector.Host()
25+
devs = host.get_pci_devices()
26+
exec_command.assert_called_once_with(['lspci', '-nnmm'])
27+
28+
@patch('hwinfo.host.dmidecode.DmidecodeParser')
29+
@patch('hwinfo.tools.inspector.Host.exec_command')
30+
def test_get_info(self, mock_exec_command, mock_dmidecode_parser_cls):
31+
mock_exec_command.return_value = 'blah'
32+
mparser = mock_dmidecode_parser_cls.return_value = mock.Mock()
33+
mparser.parse.return_value = {'key':'value'}
34+
host = inspector.Host()
35+
rec = host.get_info()
36+
self.assertEqual(rec, {'key':'value'})
37+
38+
39+
class RemoteCommandTests(unittest.TestCase):
40+
41+
def setUp(self):
42+
self.stdout = StringIO('')
43+
self.stdin = StringIO('')
44+
self.stderr = StringIO('')
45+
46+
@patch('paramiko.SSHClient')
47+
def test_ssh_connect(self, ssh_client_cls):
48+
client = ssh_client_cls.return_value = mock.Mock()
49+
client.exec_command.return_value = self.stdout, self.stdin, self.stderr
50+
inspector.remote_command('test', 'user', 'pass', 'ls')
51+
client.connect.assert_called_with('test', password='pass', username='user', timeout=10)
52+
53+
@patch('paramiko.SSHClient')
54+
def test_ssh_connect_error(self, ssh_client_cls):
55+
client = ssh_client_cls.return_value = mock.Mock()
56+
client.exec_command.return_value = self.stdout, self.stdin, StringIO("Error")
57+
with self.assertRaises(Exception) as context:
58+
inspector.remote_command('test', 'user', 'pass', 'ls')
59+
self.assertEqual(context.exception.message, "stderr: ['Error']")
60+
61+
class LocalCommandTests(unittest.TestCase):
62+
63+
@patch('subprocess.Popen')
64+
def test_local_call(self, mock_popen_cls):
65+
mprocess =mock_popen_cls.return_value = mock.MagicMock()
66+
mprocess.communicate.return_value = 'test', None
67+
mprocess.returncode = 0
68+
stdout = inspector.local_command("echo 'test'")
69+
self.assertEqual(stdout, 'test')
70+
71+
@patch('subprocess.Popen')
72+
def test_local_call_error(self, mock_popen_cls):
73+
mprocess =mock_popen_cls.return_value = mock.MagicMock()
74+
mprocess.communicate.return_value = 'test', 'my error'
75+
mprocess.returncode = 1
76+
with self.assertRaises(Exception) as context:
77+
stdout = inspector.local_command("echo 'test'")
78+
self.assertEqual(context.exception.message, "stderr: my error")
79+
80+
81+
class PCIFilterTests(unittest.TestCase):
82+
83+
def setUp(self):
84+
device_a = mock.MagicMock()
85+
device_b = mock.MagicMock()
86+
device_c = mock.MagicMock()
87+
device_d = mock.MagicMock()
88+
89+
device_a.get_pci_class.return_value = '0230'
90+
device_b.get_pci_class.return_value = '0340'
91+
device_c.get_pci_class.return_value = '0210'
92+
device_d.get_pci_class.return_value = '0100'
93+
94+
self.devices = [device_a, device_b, device_c, device_d]
95+
96+
def test_pci_filter_match_all(self):
97+
devs = inspector.pci_filter(self.devices, ['0'])
98+
self.assertEqual(len(devs), len(self.devices))
99+
100+
def test_pci_filter_match_two(self):
101+
devs = inspector.pci_filter(self.devices, ['02'])
102+
for dev in devs:
103+
print dev.get_pci_class()
104+
self.assertEqual(len(devs), 2)
105+
106+
def test_pci_filter_match_one(self):
107+
devs = inspector.pci_filter(self.devices, ['023'])
108+
self.assertEqual(len(devs), 1)
109+
self.assertEqual(devs[0].get_pci_class(), '0230')
110+
111+
def test_pci_filter_match_none(self):
112+
devs = inspector.pci_filter(self.devices, ['0234'])
113+
self.assertEqual(devs, [])
114+
115+
def test_pci_filter_for_nics(self):
116+
devs = inspector.pci_filter_for_nics(self.devices)
117+
self.assertEqual(len(devs), 2)
118+
119+
def test_pci_filter_for_storage(self):
120+
devs = inspector.pci_filter_for_storage(self.devices)
121+
self.assertEqual(len(devs), 1)
122+
self.assertEqual(devs[0].get_pci_class(), '0100')
123+
124+
def test_pci_filter_for_gpu(self):
125+
devs = inspector.pci_filter_for_gpu(self.devices)
126+
self.assertEqual(len(devs), 1)
127+
self.assertEqual(devs[0].get_pci_class(), '0340')
128+
129+
130+
class TabulateTests(unittest.TestCase):
131+
132+
@patch('hwinfo.tools.inspector.PrettyTable')
133+
def test_rec_to_table(self, mock_pt_cls):
134+
mock_table = mock_pt_cls.return_value = mock.MagicMock()
135+
rec = {'one': 1, 'two': 2, 'three': 3}
136+
inspector.rec_to_table(rec)
137+
self.assertEqual(mock_table.add_row.call_count, 3)
138+
expected_calls = [
139+
mock.call(['one', 1]),
140+
mock.call(['two', 2]),
141+
mock.call(['three', 3]),
142+
]
143+
mock_table.add_row.assert_has_calls(expected_calls, any_order=True)
144+
145+

0 commit comments

Comments
 (0)