Skip to content

Commit 661d66e

Browse files
committed
- completed IBeanstalkClient implementation
- added some IBeanstalkClient extension methods for string payloads
1 parent 91e1b70 commit 661d66e

12 files changed

+237
-34
lines changed

Beanstalk.Client/Beanstalk.Client.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
<ItemGroup>
4848
<Compile Include="BeanstalkClient.cs" />
4949
<Compile Include="BeanstalkClientExceptions.cs" />
50+
<Compile Include="BeanstalkDefaults.cs" />
51+
<Compile Include="BeanstalkExtensions.cs" />
5052
<Compile Include="Protocol\ISocket.cs" />
5153
<Compile Include="Protocol\IWatchedTubeClient.cs" />
5254
<Compile Include="IWatchedTubeCollection.cs" />
@@ -63,6 +65,7 @@
6365
<Compile Include="PutResponse.cs" />
6466
<Compile Include="ReleaseStatus.cs" />
6567
<Compile Include="ServerStats.cs" />
68+
<Compile Include="Protocol\StatsBase.cs" />
6669
<Compile Include="TubeStats.cs" />
6770
<Compile Include="Protocol\Request.cs" />
6871
<Compile Include="Protocol\RequestData.cs" />

Beanstalk.Client/BeanstalkClient.cs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@ namespace Droog.Beanstalk.Client {
2929

3030
// TODO: should query the current and watched tubes at start-up, especially once there are connection pools
3131
public class BeanstalkClient : IBeanstalkClient, IWatchedTubeClient {
32+
3233
public static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(10);
3334

3435
private readonly Func<ISocket> _socketFactory;
36+
private readonly byte[] _buffer = new byte[16 * 1024];
37+
private readonly TubeCollectionProxy _watchedTubes;
38+
private readonly BeanstalkDefaults _defaults = new BeanstalkDefaults();
3539
private ISocket _socket;
3640
private bool _isDisposed;
37-
private readonly byte[] _buffer = new byte[16 * 1024];
3841
private string _currentTube = "default";
39-
private readonly TubeCollectionProxy _watchedTubes;
4042

4143
public BeanstalkClient(IPAddress address, int port)
4244
: this(address, port, DefaultConnectTimeout) {
@@ -127,6 +129,10 @@ public void Connect() {
127129
VerifyConnection();
128130
}
129131

132+
public BeanstalkDefaults Defaults {
133+
get { return _defaults; }
134+
}
135+
130136
public PutResponse Put(uint priority, TimeSpan delay, TimeSpan timeToRun, Stream request, long length) {
131137
var response = Exec(Request.Create(RequestCommand.Put)
132138
.AppendArgument(priority)
@@ -183,39 +189,51 @@ public ReleaseStatus Release(uint jobId, uint priority, TimeSpan delay) {
183189
}
184190

185191
public bool Bury(uint jobId, uint priority) {
186-
throw new NotImplementedException();
192+
var response = Exec(Request.Create(RequestCommand.Bury)
193+
.AppendArgument(jobId)
194+
.AppendArgument(priority)
195+
.ExpectStatuses(ResponseStatus.Buried | ResponseStatus.NotFound));
196+
return response.Status == ResponseStatus.Buried;
187197
}
188198

189199
public bool Touch(uint jobId) {
190-
throw new NotImplementedException();
200+
var response = Exec(Request.Create(RequestCommand.Touch)
201+
.AppendArgument(jobId)
202+
.ExpectStatuses(ResponseStatus.Touched | ResponseStatus.NotFound));
203+
return response.Status == ResponseStatus.Touched;
191204
}
192205

193-
public PeekResponse Peek(uint jobId) {
194-
throw new NotImplementedException();
206+
public Job Peek(uint jobId) {
207+
return Peek(Request.Create(RequestCommand.Peek).AppendArgument(jobId));
195208
}
196209

197-
public PeekResponse PeekReady() {
198-
throw new NotImplementedException();
210+
public Job PeekReady() {
211+
return Peek(Request.Create(RequestCommand.PeekReady));
199212
}
200213

201-
public PeekResponse PeekDelayed() {
202-
throw new NotImplementedException();
214+
public Job PeekDelayed() {
215+
return Peek(Request.Create(RequestCommand.PeekDelayed));
203216
}
204217

205-
public PeekResponse PeekBuried() {
206-
throw new NotImplementedException();
218+
public Job PeekBuried() {
219+
return Peek(Request.Create(RequestCommand.PeekBuried));
207220
}
208221

209222
public uint Kick(uint bound) {
210-
throw new NotImplementedException();
223+
var response = Exec(Request.Create(RequestCommand.Touch)
224+
.AppendArgument(bound)
225+
.ExpectStatuses(ResponseStatus.Kicked));
226+
return uint.Parse(response.Arguments[0]);
211227
}
212228

213229
public JobStats GetJobStats(uint jobId) {
214-
throw new NotImplementedException();
230+
var response = Exec(Request.Create(RequestCommand.StatsJob).ExpectStatuses(ResponseStatus.Ok));
231+
return new JobStats(MicroYaml.ParseDictionary(response));
215232
}
216233

217234
public TubeStats GetTubeStats(string tube) {
218-
throw new NotImplementedException();
235+
var response = Exec(Request.Create(RequestCommand.StatsTube).ExpectStatuses(ResponseStatus.Ok));
236+
return new TubeStats(MicroYaml.ParseDictionary(response));
219237
}
220238

221239
public ServerStats GetServerStats() {
@@ -224,7 +242,8 @@ public ServerStats GetServerStats() {
224242
}
225243

226244
public IEnumerable<string> GetTubes() {
227-
throw new NotImplementedException();
245+
var response = Exec(Request.Create(RequestCommand.ListTubes).ExpectStatuses(ResponseStatus.Ok));
246+
return MicroYaml.ParseList(response);
228247
}
229248

230249
public void Close() {
@@ -253,6 +272,13 @@ IEnumerable<string> IWatchedTubeClient.ListWatchedTubes() {
253272
return MicroYaml.ParseList(response);
254273
}
255274

275+
private Job Peek(Request request) {
276+
var response = Exec(request.ExpectStatuses(ResponseStatus.Found | ResponseStatus.NotFound));
277+
return response.Status == ResponseStatus.NotFound
278+
? null
279+
: new Job(uint.Parse(response.Arguments[0]), response.Data, long.Parse(response.Arguments[1]));
280+
}
281+
256282
private Response Exec(Request request) {
257283
VerifyConnection();
258284
_socket.SendRequest(request, _buffer);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* libBeanstalk.NET
3+
* Copyright (C) 2010 Arne F. Claassen
4+
* geekblog [at] claassen [dot] net
5+
* http://github.com/sdether/libBeanstalk.NET
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
using System;
21+
22+
namespace Droog.Beanstalk.Client {
23+
public class BeanstalkDefaults {
24+
public uint Priority = ushort.MaxValue;
25+
public TimeSpan Delay = TimeSpan.Zero;
26+
public TimeSpan TimeToRun = TimeSpan.FromMinutes(10);
27+
}
28+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* libBeanstalk.NET
3+
* Copyright (C) 2010 Arne F. Claassen
4+
* geekblog [at] claassen [dot] net
5+
* http://github.com/sdether/libBeanstalk.NET
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
using System;
21+
using System.IO;
22+
using System.Text;
23+
24+
namespace Droog.Beanstalk.Client {
25+
public static class BeanstalkExtensions {
26+
27+
public static PutResponse Put(this IBeanstalkClient client, string data) {
28+
using(var stream = new MemoryStream(Encoding.UTF8.GetBytes(data))) {
29+
return client.Put(client.Defaults.Priority, client.Defaults.Delay, client.Defaults.TimeToRun, stream, stream.Length);
30+
}
31+
}
32+
33+
public static PutResponse Put(this IBeanstalkClient client, string data, uint priority) {
34+
using(var stream = new MemoryStream(Encoding.UTF8.GetBytes(data))) {
35+
return client.Put(priority, client.Defaults.Delay, client.Defaults.TimeToRun, stream, stream.Length);
36+
}
37+
}
38+
39+
public static PutResponse Put(this IBeanstalkClient client, string data, uint priority, TimeSpan delay) {
40+
using(var stream = new MemoryStream(Encoding.UTF8.GetBytes(data))) {
41+
return client.Put(priority, delay, client.Defaults.TimeToRun, stream, stream.Length);
42+
}
43+
}
44+
45+
public static PutResponse Put(this IBeanstalkClient client, string data, uint priority, TimeSpan delay, TimeSpan timeToRun) {
46+
using(var stream = new MemoryStream(Encoding.UTF8.GetBytes(data))) {
47+
return client.Put(priority, delay, timeToRun, stream, stream.Length);
48+
}
49+
}
50+
51+
public static Job<string> Reserve(this IBeanstalkClient client) {
52+
var job = client.Reserve();
53+
using(var reader = new StreamReader(job.Data)) {
54+
return new Job<string>(job.JobId, reader.ReadToEnd());
55+
}
56+
}
57+
58+
public static Job<string> Reserve(this IBeanstalkClient client, TimeSpan timeout) {
59+
var job = client.Reserve(timeout);
60+
if(job == null) {
61+
return null;
62+
}
63+
using(var reader = new StreamReader(job.Data)) {
64+
return new Job<string>(job.JobId, reader.ReadToEnd());
65+
}
66+
}
67+
}
68+
}

Beanstalk.Client/IBeanstalkClient.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public interface IBeanstalkClient : IDisposable
3535
// Consumer related
3636
string CurrentTube { get; set; }
3737

38+
BeanstalkDefaults Defaults { get; }
39+
3840
PutResponse Put(uint priority, TimeSpan delay, TimeSpan timeToRun, Stream request, long length);
3941

4042
// Producer related
@@ -48,10 +50,10 @@ public interface IBeanstalkClient : IDisposable
4850
bool Touch(uint jobId);
4951

5052
// Misc
51-
PeekResponse Peek(uint jobId);
52-
PeekResponse PeekReady();
53-
PeekResponse PeekDelayed();
54-
PeekResponse PeekBuried();
53+
Job Peek(uint jobId);
54+
Job PeekReady();
55+
Job PeekDelayed();
56+
Job PeekBuried();
5557
uint Kick(uint bound);
5658
JobStats GetJobStats(uint jobId);
5759
TubeStats GetTubeStats(string tube);

Beanstalk.Client/Job.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
using System.IO;
21+
using System.Text;
2122

2223
namespace Droog.Beanstalk.Client {
2324
public class Job {
@@ -35,4 +36,19 @@ public Job(uint jobId, Stream data, long length) {
3536
public Stream Data { get { return _data; } }
3637
public long DataLength { get { return _length; } }
3738
}
39+
40+
public class Job<T> {
41+
private readonly uint _jobId;
42+
private readonly T _data;
43+
44+
public Job(uint jobId, T data) {
45+
_jobId = jobId;
46+
_data = data;
47+
}
48+
49+
public uint JobId { get { return _jobId; } }
50+
public T Data { get { return _data; } }
51+
}
52+
53+
3854
}

Beanstalk.Client/JobStats.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
* limitations under the License.
1818
*/
1919

20+
using System.Collections.Generic;
21+
using Droog.Beanstalk.Client.Protocol;
22+
2023
namespace Droog.Beanstalk.Client {
21-
public class JobStats {}
24+
public class JobStats : StatsBase {
25+
public JobStats(IDictionary<string, string> dictionary) : base(dictionary) { }
26+
}
2227
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* libBeanstalk.NET
3+
* Copyright (C) 2010 Arne F. Claassen
4+
* geekblog [at] claassen [dot] net
5+
* http://github.com/sdether/libBeanstalk.NET
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
using System.Collections.Generic;
20+
21+
namespace Droog.Beanstalk.Client.Protocol {
22+
public abstract class StatsBase {
23+
protected readonly IDictionary<string, string> _dictionary;
24+
25+
protected StatsBase(IDictionary<string, string> dictionary) { _dictionary = dictionary; }
26+
27+
public string this[string key] {
28+
get {
29+
string value;
30+
_dictionary.TryGetValue(key, out value);
31+
return value;
32+
}
33+
}
34+
}
35+
}

Beanstalk.Client/ServerStats.cs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,13 @@
1818
*/
1919

2020
using System.Collections.Generic;
21+
using Droog.Beanstalk.Client.Protocol;
2122

2223
namespace Droog.Beanstalk.Client {
23-
public class ServerStats {
24-
private readonly IDictionary<string, string> _dictionary;
24+
public class ServerStats : StatsBase {
2525

26-
public ServerStats(IDictionary<string, string> dictionary) {
27-
_dictionary = dictionary;
28-
}
29-
30-
public string this[string key] {
31-
get {
32-
string value;
33-
_dictionary.TryGetValue(key, out value);
34-
return value;
35-
}
26+
public ServerStats(IDictionary<string, string> dictionary)
27+
: base(dictionary) {
3628
}
3729
}
3830
}

Beanstalk.Client/TubeStats.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
* limitations under the License.
1818
*/
1919

20+
using System.Collections.Generic;
21+
using Droog.Beanstalk.Client.Protocol;
22+
2023
namespace Droog.Beanstalk.Client {
21-
public class TubeStats {}
24+
public class TubeStats : StatsBase {
25+
public TubeStats(IDictionary<string, string> dictionary) : base(dictionary) { }
26+
}
2227
}

0 commit comments

Comments
 (0)