Skip to content

Commit 9bc6604

Browse files
committed
Failure retries + recovery.
1 parent cc21716 commit 9bc6604

File tree

6 files changed

+85
-20
lines changed

6 files changed

+85
-20
lines changed

examples/alias_operations.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
'nodes': [{
88
'host': 'localhost',
99
'port': '8108',
10-
'protocol': 'http',
11-
'api_key': 'abcd'
10+
'protocol': 'http'
1211
}],
1312
'timeout_seconds': 2
1413
})

examples/collection_operations.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@
1111
'nodes': [{
1212
'host': 'localhost',
1313
'port': '8108',
14-
'protocol': 'http',
15-
'api_key': 'abcd'
14+
'protocol': 'http'
1615
}],
1716
'timeout_seconds': 2
1817
})

examples/curation_operations.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
'nodes': [{
88
'host': 'localhost',
99
'port': '8108',
10-
'protocol': 'http',
11-
'api_key': 'abcd'
10+
'protocol': 'http'
1211
}],
1312
'timeout_seconds': 2
1413
})

examples/indexing_in_loop.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import os
2+
import sys
3+
import json
4+
5+
curr_dir = os.path.dirname(os.path.realpath(__file__))
6+
sys.path.insert(1, os.path.abspath(os.path.join(curr_dir, os.pardir)))
7+
8+
import typesense
9+
10+
client = typesense.Client({
11+
'api_key': 'abcd',
12+
'nodes': [
13+
{
14+
'host': 'localhost',
15+
'port': '8108',
16+
'protocol': 'http'
17+
},
18+
{
19+
'host': 'localhost',
20+
'port': '7108',
21+
'protocol': 'http'
22+
},
23+
{
24+
'host': 'localhost',
25+
'port': '6108',
26+
'protocol': 'http'
27+
}
28+
],
29+
'timeout_seconds': 2
30+
})
31+
32+
schema = {
33+
"name": "books",
34+
"fields": [
35+
{"name": "title", "type": "string"},
36+
{"name": "authors", "type": "string[]"},
37+
{"name": "authors_facet", "type": "string[]", "facet": True},
38+
{"name": "publication_year", "type": "int32"},
39+
{"name": "publication_year_facet", "type": "string", "facet": True},
40+
{"name": "ratings_count", "type": "int32"},
41+
{"name": "average_rating", "type": "float"},
42+
{"name": "image_url", "type": "string"}
43+
],
44+
"default_sorting_field": "ratings_count"
45+
}
46+
47+
create_response = client.collections.create(schema)
48+
49+
print(create_response)
50+
51+
with open('/tmp/books.jsonl') as infile:
52+
for json_line in infile:
53+
book_document = json.loads(json_line)
54+
client.collections['books'].documents.create(book_document)

typesense/api_call.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from .exceptions import (ObjectAlreadyExists,
55
ObjectNotFound, ObjectUnprocessable,
66
RequestMalformed, RequestUnauthorized,
7-
ServerError, TypesenseClientError)
7+
ServerError, ServiceUnavailable, TypesenseClientError)
88

99

1010
class ApiCall(object):
@@ -15,27 +15,31 @@ def __init__(self, config):
1515
self.config = config
1616
self.nodes = self.config.nodes
1717
self.node_index = 0
18-
self.failed_nodes = []
19-
self.last_fail_check_ts = int(time.time())
18+
self.last_health_check_ts = int(time.time())
2019

2120
def _check_failed_node(self):
2221
current_epoch_ts = int(time.time())
23-
check_failed_node = ((current_epoch_ts - self.last_fail_check_ts) > ApiCall.CHECK_FAILED_NODE_INTERVAL_S)
24-
if check_failed_node:
25-
self.last_fail_check_ts = current_epoch_ts
22+
check_node = ((current_epoch_ts - self.last_health_check_ts) > ApiCall.CHECK_FAILED_NODE_INTERVAL_S)
23+
if check_node:
24+
self.last_health_check_ts = current_epoch_ts
2625

27-
return check_failed_node
26+
return check_node
2827

2928
# Returns a healthy host from the pool in a round-robin fashion.
3029
# Might return an unhealthy host periodically to check for recovery.
3130
def get_node(self):
32-
num_times = 0
33-
while num_times < 3:
34-
num_times += 1
31+
i = 0
32+
while i < len(self.nodes):
33+
i += 1
3534
self.node_index = (self.node_index + 1) % len(self.nodes)
3635
if self.nodes[self.node_index].healthy or self._check_failed_node():
3736
return self.nodes[self.node_index]
3837

38+
# None of the nodes are marked healthy, but some of them could have become healthy since last health check.
39+
# So we will just return the next node.
40+
self.node_index = (self.node_index + 1) % len(self.nodes)
41+
return self.nodes[self.node_index]
42+
3943
@staticmethod
4044
def get_exception(http_code):
4145
if http_code == 400:
@@ -50,6 +54,8 @@ def get_exception(http_code):
5054
return ObjectUnprocessable
5155
elif http_code == 500:
5256
return ServerError
57+
elif http_code == 503:
58+
return ServiceUnavailable
5359
else:
5460
return TypesenseClientError
5561

@@ -59,16 +65,20 @@ def make_request(self, fn, method, endpoint, as_json, **kwargs):
5965
while num_tries < self.config.num_retries:
6066
num_tries += 1
6167
node = self.get_node()
68+
node.healthy = False
6269

6370
try:
6471
url = node.url() + endpoint
65-
print('URL is: ' + url)
6672
r = fn(url, headers={ApiCall.API_KEY_HEADER_NAME: self.config.api_key}, **kwargs)
73+
74+
if 0 < r.status_code < 500:
75+
node.healthy = True
76+
6777
if (method != 'post' and r.status_code != 200) or (method == 'post' and r.status_code != 201):
6878
error_message = r.json().get('message', 'API error.')
79+
# print('error_message: ' + error_message)
6980
raise ApiCall.get_exception(r.status_code)(error_message)
7081

71-
node.healthy = True
7282
return r.json() if as_json else r.text
7383
except requests.exceptions.Timeout:
7484
pass
@@ -79,10 +89,10 @@ def make_request(self, fn, method, endpoint, as_json, **kwargs):
7989
except Exception as e:
8090
raise e
8191

82-
node.healthy = False
92+
# print('Failed, retrying after sleep: ' + node.port)
8393
time.sleep(self.config.retry_interval_seconds)
8494

85-
raise TypesenseClientError('All hosts are bad.')
95+
raise TypesenseClientError('Retries exceeded.')
8696

8797
def get(self, endpoint, params=None, as_json=True):
8898
params = params or {}

typesense/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,7 @@ class ObjectUnprocessable(TypesenseClientError):
3333

3434
class ServerError(TypesenseClientError):
3535
pass
36+
37+
38+
class ServiceUnavailable(TypesenseClientError):
39+
pass

0 commit comments

Comments
 (0)