.output chapter2.wd
.bookmark sockets-and-patterns
++ Sockets and Patterns
In [#basics] we took 0MQ for a drive, with some basic examples of the main 0MQ patterns: request-reply, publish-subscribe, and pipeline. In this chapter we're going to get our hands dirty and start to learn how to use these tools in real programs.
We'll cover:
* How to create and work with 0MQ sockets.
* How to send and receive messages on sockets.
* How to build your apps around 0MQ's asynchronous I/O model.
* How to handle multiple sockets in one thread.
* How to handle fatal and non-fatal errors properly.
* How to handle interrupt signals like Ctrl-C.
* How to shutdown a 0MQ application cleanly.
* How to check a 0MQ application for memory leaks.
* How to send and receive multi-part messages.
* How to forward messages across networks.
* How to build a simple message queuing broker.
* How to write multithreaded applications with 0MQ.
* How to use 0MQ to signal between threads.
* How to use 0MQ to coordinate a network of nodes.
* How to create and use message envelopes for publish-subscribe.
* Using the high-water mark (HWM) to protect against memory overflows.
+++ The Socket API
To be perfectly honest, 0MQ does a kind of switch-and-bait on you. We don't apologize for this; it's for your own good and hurts us more than it hurts you. It presents a familiar socket-based API, which requires great effort for us to hide a bunch of message-processing engines. However, the result will slowly fix your world-view about how to design and write distributed software.
Sockets are the de-facto standard API for network programming, as well as being useful for stopping your eyes from falling onto your cheeks. One thing that makes 0MQ especially tasty to developers is that it uses sockets and messages instead of some other arbitrary set of concepts. Kudos to Martin Sustrik for pulling this off. It turns "Message Oriented Middleware", a phrase guaranteed to send the whole room off to Catatonia, into "Extra Spicy Sockets!" which leaves us with a strange craving for pizza, and a desire to know more.
Like a favorite dish, 0MQ sockets are easy to digest. Sockets have a life in four parts, just like BSD sockets:
* Creating and destroying sockets, which go together to form a karmic circle of socket life (see {{zmq-socket[3]}}, {{zmq-close[3]}}).
* Configuring sockets by setting options on them and checking them if necessary (see {{zmq-setsockopt[3]}}, {{zmq-getsockopt[3]}}).
* Plugging sockets onto the network topology by creating 0MQ connections to and from them (see {{zmq-bind[3]}}, {{zmq-connect[3]}}).
* Using the sockets to carry data by writing and receiving messages on them (see {{zmq-msg-send[3]}}, {{zmq-msg-recv[3]}}).
Note that sockets are always void pointers, and messages (which we'll come to very soon) are structures. So in C you pass sockets as-such, but you pass addresses of messages in all functions that work with messages, like {{zmq-msg-send[3]}} and {{zmq-msg-recv[3]}}. As a mnemonic, realize that "in 0MQ all your sockets are belong to us", but messages are things you actually own in your code.
Creating, destroying, and configuring sockets works as you'd expect for any object. But remember that 0MQ is an asynchronous, elastic fabric. This has some impact on how we plug sockets into the network topology, and how we use the sockets after that.
++++ Plugging Sockets Into the Topology
To create a connection between two nodes you use {{zmq-bind[3]}} in one node, and {{zmq-connect[3]}} in the other. As a general rule of thumb, the node which does {{zmq-bind[3]}} is a "server", sitting on a well-known network address, and the node which does {{zmq-connect[3]}} is a "client", with unknown or arbitrary network addresses. Thus we say that we "bind a socket to an endpoint" and "connect a socket to an endpoint", the endpoint being that well-known network address.
0MQ connections are somewhat different from old-fashioned TCP connections. The main notable differences are:
* They go across an arbitrary transport ({{inproc}}, {{ipc}}, {{tcp}}, {{pgm}} or {{epgm}}). See {{zmq-inproc[7]}}, {{zmq-ipc[7]}}, {{zmq-tcp[7]}}, {{zmq-pgm[7]}}, and {{zmq-epgm[7]}}.
* One socket may have many outgoing and many incoming connections.
* There is no {{zmq-accept()}} method. When a socket is bound to an endpoint it automatically starts accepting connections.
* The network connection itself happens in the background, and 0MQ will automatically re-connect if the network connection is broken (e.g. if the peer disappears and then comes back).
* Your application code cannot work with these connections directly; they are encapsulated under the socket.
Many architectures follow some kind of client-server model, where the server is the component that is most static, and the clients are the components that are most dynamic, i.e. they come and go the most. There are sometimes issues of addressing: servers will be visible to clients, but not necessarily vice-versa. So mostly it's obvious which node should be doing {{zmq-bind[3]}} (the server) and which should be doing {{zmq-connect[3]}} (the client). It also depends on the kind of sockets you're using, with some exceptions for unusual network architectures. We'll look at socket types later.
Now, imagine we start the client //before// we start the server. In traditional networking we get a big red Fail flag. But 0MQ lets us start and stop pieces arbitrarily. As soon as the client node does {{zmq-connect[3]}} the connection exists and that node can start to write messages to the socket. At some stage (hopefully before messages queue up so much that they start to get discarded, or the client blocks), the server comes alive, does a {{zmq-bind[3]}} and 0MQ starts to deliver messages.
A server node can bind to many endpoints (that is, a combination of protocol and address) and it can do this using a single socket. This means it will accept connections across different transports:
[[code type="fragment" name="binding"]]
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "inproc://somename");
[[/code]]
With most transports you cannot bind to the same endpoint twice, unlike for example in UDP. The {{ipc}} transport does however let one process bind to an endpoint already used by a first process. It's meant to allow a process to recover after a crash.
Although 0MQ tries to be neutral about which side binds, and which side connects, there are differences. We'll see these in more detail later. The upshot is that you should usually think in terms of "servers" as static parts of your topology, that bind to more-or-less fixed endpoints, and "clients" as dynamic parts that come and go and connect to these endpoints. Then, design your application around this model. The chances that it will "just work" are much better like that.
Sockets have types. The socket type defines the semantics of the socket, its policies for routing messages inwards and outwards, queuing, etc. You can connect certain types of socket together, e.g. a publisher socket and a subscriber socket. Sockets work together in "messaging patterns". We'll look at this in more detail later.
It's the ability to connect sockets in these different ways that gives 0MQ its basic power as a message queuing system. There are layers on top of this, such as proxies, which we'll get to later. But essentially, with 0MQ you define your network architecture by plugging pieces together like a child's construction toy.
++++ Using Sockets to Carry Data
To send and receive messages you use the {{zmq-msg-send[3]}} and {{zmq-msg-recv[3]}} methods. The names are conventional but 0MQ's I/O model is different enough from the TCP model!figref() that you will need time to get your head around it.
[[code type="textdiagram" title="TCP sockets are 1 to 1"]]
+------------+
| |
| Node |
| |
+------------+
| Socket |
\------------/
^
|
1 to 1
|
v
/------------\
| Socket |
+------------+
| |
| Node |
| |
+------------+
[[/code]]
Let's look at the main differences between TCP sockets and 0MQ sockets when it comes to working with data:
* 0MQ sockets carry messages, like UDP, rather than a stream of bytes as TCP does. A 0MQ message is length-specified binary data. We'll come to messages shortly; their design is optimized for performance and so is a little tricky.
* 0MQ sockets do their I/O in a background thread. This means that messages arrive in local input queues, and are sent from local output queues, no matter what your application is busy doing.
* 0MQ sockets have one-to-N routing behavior built-in, according to the socket type.
The {{zmq-msg-send[3]}} method does not actually send the message to the socket connection(s). It queues the message so that the I/O thread can send it asynchronously. It does not block except in some exception cases. So the message is not necessarily sent when {{zmq-msg-send[3]}} returns to your application. If you created a message using {{zmq-msg-init-data[3]}} you cannot reuse the data or free it, otherwise the I/O thread will rapidly find itself writing overwritten or unallocated garbage. This is a common mistake for beginners. We'll see a little later how to properly work with messages.
++++ Unicast Transports
0MQ provides a set of unicast transports ({{inproc}}, {{ipc}}, and {{tcp}}) and multicast transports ({{epgm}}, {{pgm}}). Multicast is an advanced technique that we'll come to later. Don't even start using it unless you know that your fanout ratios will make 1-to-N unicast impossible.
For most common cases, use **{{tcp}}**, which is a //disconnected TCP// transport. It is elastic, portable, and fast enough for most cases. We call this 'disconnected' because 0MQ's {{tcp}} transport doesn't require that the endpoint exists before you connect to it. Clients and servers can connect and bind at any time, can go and come back, and it remains transparent to applications.
The inter-process {{ipc}} transport is disconnected, like {{tcp}}. It has one limitation: it does not yet work on Windows. By convention we use endpoint names with an ".ipc" extension to avoid potential conflict with other file names. On UNIX systems, if you use {{ipc}} endpoints you need to create these with appropriate permissions otherwise they may not be shareable between processes running under different user IDs. You must also make sure all processes can access the files, e.g. by running in the same working directory.
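As a minimal sketch (the endpoint name is just an example, and we assume a context already exists), binding and connecting over {{ipc}} looks like any other transport:
[[code language="C"]]
//  Sketch: ipc endpoints are filesystem paths; the ".ipc" extension is a convention
void *responder = zmq_socket (context, ZMQ_REP);
zmq_bind (responder, "ipc://weather.ipc");      //  Server side

void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "ipc://weather.ipc");   //  Client side
[[/code]]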
The inter-thread transport, **{{inproc}}**, is a connected signaling transport. It is much faster than {{tcp}} or {{ipc}}. This transport has a specific limitation compared to {{ipc}} and {{tcp}}: **the server must issue a bind before any client issues a connect**. This is something future versions of 0MQ may fix, but at present it defines how you use {{inproc}} sockets: we create and bind one socket, start the child threads, and let those threads create and connect the other sockets.
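Here is a minimal sketch of that ordering, using a PAIR socket pair over {{inproc}} (the endpoint name and thread function are illustrative):
[[code language="C"]]
#include <zmq.h>
#include <pthread.h>

static void *
child_thread (void *context) {
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (receiver, "inproc://example");  //  Safe: the parent has already bound
    //  ... receive signals here ...
    zmq_close (receiver);
    return NULL;
}

int main (void) {
    void *context = zmq_ctx_new ();
    void *sender = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (sender, "inproc://example");       //  Bind *before* starting the thread
    pthread_t thread;
    pthread_create (&thread, NULL, child_thread, context);
    //  ... send signals here ...
    pthread_join (thread, NULL);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
[[/code]]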
++++ 0MQ is Not a Neutral Carrier
A common question that newcomers to 0MQ ask (it's one I asked myself) is, "how do I write an XYZ server in 0MQ?" For example, "how do I write an HTTP server in 0MQ?" The implication is that if we use normal sockets to carry HTTP requests and responses, we should be able to use 0MQ sockets to do the same, only much faster and better.
The answer used to be "this is not how it works". 0MQ is not a neutral carrier, it imposes a framing on the transport protocols it uses. This framing is not compatible with existing protocols, which tend to use their own framing. For example, compare an HTTP request!figref(), and a 0MQ request!figref(), both over TCP/IP.
The HTTP request uses CR-LF as its simplest framing delimiter, whereas 0MQ uses a length-specified frame.
[[code type="textdiagram" title="HTTP On the Wire"]]
+----------------+----+----+----+----+
| GET /index.html| 13 | 10 | 13 | 10 |
+----------------+----+----+----+----+
[[/code]]
[[code type="textdiagram" title="0MQ On the Wire"]]
+---+---+---+---+---+---+
| 5 | H | E | L | L | O |
+---+---+---+---+---+---+
[[/code]]
So you could write a HTTP-like protocol using 0MQ, using for example the request-reply socket pattern. But it would not be HTTP.
Since 0MQ/3.3, however, the {{ZMQ_ROUTER_RAW}} socket option lets you read and write TCP bytes without the 0MQ framing. Hardeep Singh contributed this change so that he could connect to Telnet servers from his 0MQ application. This is still, at time of writing, somewhat experimental, but it shows how 0MQ keeps evolving to solve new problems. Maybe the next patch will be yours.
++++ I/O Threads
We said that 0MQ does I/O in a background thread. One I/O thread (for all sockets) is sufficient for all but the most extreme applications. When you create a new context it starts with one I/O thread. The general rule of thumb is to allow one I/O thread per gigabyte of data in or out per second. To raise the number of I/O threads, use the {{zmq-ctx-set[3]}} call //before// creating any sockets:
[[code type="fragment" name="iothreads"]]
int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);
[[/code]]
We've seen that one socket can handle many (dozens, thousands of) connections at once. This has a fundamental impact on how you write applications. A traditional networked application has one process or one thread per remote connection, and that process or thread handles one socket. 0MQ lets you collapse this entire structure into a single process, and then break it up as necessary for scaling.
If you are using 0MQ for inter-thread communications only, i.e. a multithreaded application that does no external socket I/O, you can set the I/O threads to zero. It's not a significant optimization though, more of a curiosity.
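A one-line sketch of that setting:
[[code language="C"]]
//  Sketch: an inproc-only application can run the context with zero I/O threads
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, 0);
[[/code]]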
+++ Messaging Patterns
Underneath the brown paper wrapping of 0MQ's socket API lies the world of messaging patterns. If you have a background in enterprise messaging, or know UDP well, these will be vaguely familiar. But to most 0MQ newcomers they are a surprise, because we're so used to the TCP paradigm where a socket maps one-to-one to another node.
Let's recap briefly what 0MQ does for you. It delivers blobs of data (messages) to nodes, quickly and efficiently. You can map nodes to threads, processes, or machines. 0MQ gives your applications a single socket API to work with, no matter what the actual transport (like in-process, inter-process, TCP, or multicast). It automatically reconnects to peers as they come and go. It queues messages at both sender and receiver, as needed. It manages these queues carefully to ensure processes don't run out of memory, overflowing to disk when appropriate. It handles socket errors. It does all I/O in background threads. It uses lock-free techniques for talking between nodes, so there are never locks, waits, semaphores, or deadlocks.
But cutting through that, it routes and queues messages according to precise recipes called //patterns//. It is these patterns that provide 0MQ's intelligence. They encapsulate our hard-earned experience of the best ways to distribute data and work. 0MQ's patterns are hard-coded but future versions may allow user-definable patterns.
0MQ patterns are implemented by pairs of sockets with matching types. In other words, to understand 0MQ patterns you need to understand socket types and how they work together. Mostly this just takes study, there is little that is obvious at this level.
The built-in core 0MQ patterns are:
* **Request-reply**, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
* **Publish-subscribe**, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
* **Pipeline**, which connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops. This is a parallel task distribution and collection pattern.
We looked at each of these in the first chapter. There's one more pattern that people tend to try to use when they still think of 0MQ in terms of traditional TCP sockets: **Exclusive pair**, which connects two sockets exclusively. This is a pattern you should use only to connect two threads in a process. We'll see an example at the end of this chapter.
The {{zmq-socket[3]}} man page is fairly clear about the patterns, it's worth reading several times until it starts to make sense. These are the socket combinations that are valid for a connect-bind pair (either side can bind):
* PUB and SUB
* REQ and REP
* REQ and ROUTER
* DEALER and REP
* DEALER and ROUTER
* DEALER and DEALER
* ROUTER and ROUTER
* PUSH and PULL
* PAIR and PAIR
You'll also see references to XPUB and XSUB sockets, which we'll come to later (they're like raw versions of PUB and SUB). Any other combination will produce undocumented and unreliable results and future versions of 0MQ will probably return errors if you try them. You can and will of course bridge other socket types //via code//, i.e. read from one socket type and write to another.
++++ High-level Messaging Patterns
These four core patterns are cooked-in to 0MQ. They are part of the 0MQ API, implemented in the core C++ library, and guaranteed to be available in all fine retail stores.
On top, we add //high-level patterns//. We build these high-level patterns on top of 0MQ and implement them in whatever language we're using for our application. They are not part of the core library, do not come with the 0MQ package, and exist in their own space, as part of the 0MQ community. For example the Majordomo pattern, which we explore in [#reliable-request-reply], sits in the GitHub Majordomo project in the ZeroMQ organization.
One of the things we aim to provide in this guide is a set of such high-level patterns, ranging from small (how to handle messages sanely) to large (how to make a reliable publish-subscribe architecture).
++++ Working with Messages
On the wire, 0MQ messages are blobs of any size from zero upwards, fitting in memory. You do your own serialization using protobufs, msgpack, JSON, or whatever else your applications need to speak. It's wise to choose a data representation that is portable and fast, but you can make your own decisions about trade-offs.
In memory, 0MQ messages are {{zmq-msg-t}} structures (or classes depending on your language). Here are the basic ground rules for using 0MQ messages in C:
* You create and pass around {{zmq-msg-t}} objects, not blocks of data.
* To read a message you use {{zmq-msg-init[3]}} to create an empty message, and then you pass that to {{zmq-msg-recv[3]}}.
* To write a message from new data, you use {{zmq-msg-init-size[3]}} to create a message and at the same time allocate a block of data of some size. You then fill that data using {{memcpy}}, and pass the message to {{zmq-msg-send[3]}}.
* To release (not destroy) a message you call {{zmq-msg-close[3]}}. This drops a reference, and eventually 0MQ will destroy the message.
* To access the message content you use {{zmq-msg-data[3]}}. To know how much data the message contains, use {{zmq-msg-size[3]}}.
* Do not use {{zmq-msg-move[3]}}, {{zmq-msg-copy[3]}}, or {{zmq-msg-init-data[3]}} unless you read the man pages and know precisely why you need these.
Here is a typical chunk of code working with messages, which should be familiar if you have been paying attention. This is from the {{zhelpers.h}} file we use in all the examples:
[[code language="C"]]
//  Receive 0MQ string from socket and convert into C string
static char *
s_recv (void *socket) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    int size = zmq_msg_recv (&message, socket, 0);
    if (size == -1)
        return NULL;
    char *string = malloc (size + 1);
    memcpy (string, zmq_msg_data (&message), size);
    zmq_msg_close (&message);
    string [size] = 0;
    return (string);
}

//  Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
    zmq_msg_t message;
    zmq_msg_init_size (&message, strlen (string));
    memcpy (zmq_msg_data (&message), string, strlen (string));
    int size = zmq_msg_send (&message, socket, 0);
    zmq_msg_close (&message);
    return (size);
}
[[/code]]
You can easily extend this code to send and receive blobs of arbitrary length.
NOTE:
After you pass a message to {{zmq-msg-send[3]}}, ØMQ will clear the message, i.e., set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.
If you want to send the same message more than once, create a second message, initialize it using {{zmq-msg-init[3]}} and then use {{zmq-msg-copy[3]}} to create a copy of the first message. This does not copy the data but the reference. You can then send the message twice (or more, if you create more copies) and the message will only be finally destroyed when the last copy is sent or closed.
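Here is a minimal sketch of that technique; the two destination socket names are illustrative:
[[code language="C"]]
//  Sketch: send the same content to two sockets using a copy.
//  zmq_msg_copy duplicates the reference, not the data.
zmq_msg_t original, copy;
zmq_msg_init_size (&original, 5);
memcpy (zmq_msg_data (&original), "HELLO", 5);

zmq_msg_init (&copy);
zmq_msg_copy (&copy, &original);          //  copy now references the same data

zmq_msg_send (&original, socket_one, 0);  //  original is cleared by the send
zmq_msg_send (&copy, socket_two, 0);      //  copy still holds the data
[[/code]]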
0MQ also supports //multi-part// messages, which let you send or receive a list of frames as a single on-the-wire message. This is widely used in real applications and we'll look at that later in this chapter and in [#advanced-request-reply].
Frames (also called "message parts" in the 0MQ reference manual pages) are the basic wire format for 0MQ messages. A frame is a length-specified block of data. The length can be zero upwards. If you've done any TCP programming you'll appreciate why frames are a useful answer to the question "how much data am I supposed to read of this network socket now?"
There is a wire-level [http://rfc.zeromq.org/spec:15 protocol called ZMTP] that defines how 0MQ reads and writes frames on a TCP connection. If you're interested in how this works, the spec is quite short, just a few pages.
Originally, a 0MQ message was one frame, like UDP. We later extended this with "multipart" messages, which are quite simply series of frames with a "more" bit set to one, followed by one with that bit set to zero. The 0MQ API then lets you write messages with a "more" flag, and when you read messages, lets you check if there's "more".
In the low-level 0MQ API and the reference manual, therefore, there's some fuzziness about messages vs. frames. So here's a useful lexicon:
* A message can be one or more parts.
* These parts are also called "frames".
* Each part is a zmq_msg_t object.
* You send and receive each part separately, in the low-level API.
* Higher-level APIs provide wrappers to send entire multi-part messages.
Some other things that are worth knowing about messages:
* You may send zero-length messages, e.g. for sending a signal from one thread to another (see the sketch just after this list).
* 0MQ guarantees to deliver all the parts (one or more) for a message, or none of them.
* 0MQ does not send the message (single, or multi-part) right away but at some indeterminate later time. A multi-part message must therefore fit in memory.
* A message (single, or multi-part) must fit in memory. If you want to send files of arbitrary sizes, you should break them into pieces and send each piece as separate single-part messages.
* You must call {{zmq-msg-close[3]}} when finished with a message, in languages that don't automatically destroy objects when a scope closes.
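As an illustration of the zero-length signal (the socket name here is an assumption):
[[code language="C"]]
//  Sketch: an empty message works as a cheap signal between threads
zmq_send (signal_socket, "", 0, 0);
[[/code]]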
And to be necessarily repetitive, do not use {{zmq-msg-init-data[3]}}, yet. This is a zero-copy method and guaranteed to create trouble for you. There are far more important things to learn about 0MQ before you start to worry about shaving off microseconds.
++++ Handling Multiple Sockets
In all the examples so far, the main loop has been:
# wait for message on socket
# process message
# repeat
What if we want to read from multiple endpoints at the same time? The simplest way is to connect one socket to all the endpoints and get 0MQ to do the fan-in for us. This is legal if the remote endpoints are in the same pattern but it would be wrong to e.g. connect a PULL socket to a PUB endpoint.
To actually read from multiple sockets at once, use {{zmq-poll[3]}}. An even better way might be to wrap {{zmq-poll[3]}} in a framework that turns it into a nice event-driven //reactor//, but it's significantly more work than we want to cover here.
Let's start with a dirty hack, partly for the fun of not doing it right, but mainly because it lets me show you how to do non-blocking socket reads. Here is a simple example of reading from two sockets using non-blocking reads. This rather confused program acts both as a subscriber to weather updates, and a worker for parallel tasks:
[[code type="example" title="Multiple socket reader" name="msreader"]]
[[/code]]
The cost of this approach is some additional latency on the first message (the sleep at the end of the loop, when there are no waiting messages to process). This would be a problem in applications where sub-millisecond latency was vital. Also, you need to check the documentation for nanosleep() or whatever function you use to make sure it does not busy-loop.
You can treat the sockets fairly by reading first from one, then the second rather than prioritizing them as we did in this example.
Now let's see the same little senseless application done right, using {{zmq-poll[3]}}:
[[code type="example" title="Multiple socket poller" name="mspoller"]]
[[/code]]
The items structure has these four members:
[[code language="C"]]
typedef struct {
    void *socket;       //  0MQ socket to poll on
    int fd;             //  OR, native file handle to poll on
    short events;       //  Events to poll on
    short revents;      //  Events returned after poll
} zmq_pollitem_t;
[[/code]]
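As a minimal sketch (the two socket names are illustrative), a loop that waits for input on either socket looks like this:
[[code language="C"]]
//  Sketch: block until at least one of the two sockets has input
zmq_pollitem_t items [] = {
    { receiver, 0, ZMQ_POLLIN, 0 },
    { subscriber, 0, ZMQ_POLLIN, 0 }
};
while (1) {
    zmq_poll (items, 2, -1);        //  -1 = wait indefinitely
    if (items [0].revents & ZMQ_POLLIN) {
        //  Read and process a task from the receiver
    }
    if (items [1].revents & ZMQ_POLLIN) {
        //  Read and process an update from the subscriber
    }
}
[[/code]]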
++++ Multi-part Messages
0MQ lets us compose a message out of several frames, giving us a "multi-part message". Realistic applications use multi-part messages heavily, both for wrapping messages with address information, and for simple serialization. We'll look at reply envelopes later.
What we'll learn now is simply how to safely (but blindly) read and write multi-part messages in any application (like a proxy) that needs to forward messages without inspecting them.
When you work with multi-part messages, each part is a {{zmq-msg}} item. E.g. if you are sending a message with five parts, you must construct, send, and destroy five {{zmq-msg}} items. You can do this in advance (and store the {{zmq-msg}} items in an array or structure), or as you send them, one by one.
Here is how we send the frames in a multi-part message (we receive each frame into a message object):
[[code type="fragment" name="sendmore"]]
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, 0);
[[/code]]
Here is how we receive and process all the parts in a message, be it single part or multi-part:
[[code type="fragment" name="recvmore"]]
while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    //  Process the message frame
    zmq_msg_close (&message);
    int more;
    size_t more_size = sizeof (more);
    zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    if (!more)
        break;      //  Last message frame
}
[[/code]]
Some things to know about multi-part messages:
* When you send a multi-part message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
* If you are using {{zmq-poll[3]}}, when you receive the first part of a message, all the rest has also arrived.
* You will receive all parts of a message, or none at all.
* Each part of a message is a separate {{zmq-msg}} item.
* You will receive all parts of a message whether or not you check the RCVMORE option.
* On sending, 0MQ queues message frames in memory until it receives the final frame from your application, then sends them all.
* There is no way to cancel a partially sent message, except by closing the socket.
++++ Intermediaries and Proxies
0MQ aims for decentralized intelligence but that doesn't mean your network is empty space in the middle. It's filled with message-aware infrastructure and quite often, we build that infrastructure with 0MQ. The 0MQ plumbing can range from tiny pipes to full-blown service-oriented brokers. The messaging industry calls this "intermediation", meaning that the stuff in the middle deals with either side. In 0MQ we call these proxies, queues, forwarders, devices, or brokers, depending on the context.
This pattern is extremely common in the real world and is why our societies and economies are filled with intermediaries who have no other real function than to reduce the complexity and scaling costs of larger networks. Real-world intermediaries are typically called wholesalers, distributors, managers, etc.
++++ The Dynamic Discovery Problem
One of the problems you will hit as you design larger distributed architectures is discovery. That is, how do pieces know about each other? It's especially difficult if pieces come and go, thus we can call this the "dynamic discovery problem".
There are several solutions to dynamic discovery. The simplest is to entirely avoid it by hard-coding (or configuring) the network architecture so discovery is done by hand. That is, when you add a new piece, you reconfigure the network to know about it.
In practice this leads to increasingly fragile and hard-to-manage architectures. Let's say you have one publisher and a hundred subscribers. You connect each subscriber to the publisher by configuring a publisher endpoint in each subscriber. That's easy!figref(). Subscribers are dynamic, the publisher is static. Now say you add more publishers. Suddenly it's not so easy any more. If you continue to connect each subscriber to each publisher, the cost of avoiding dynamic discovery gets higher and higher.
[[code type="textdiagram" title="Small-scale Pub-Sub Network"]]
+-----------+
| |
| Publisher |
| |
+-----------+
| PUB |
\-----------/
bind
tcp://192.168.55.210:5556
|
|
+----------------+----------------+
| | |
| | |
connect connect connect
/------------\ /------------\ /------------\
| SUB | | SUB | | SUB |
+------------+ +------------+ +------------+
| | | | | |
| Subscriber | | Subscriber | | Subscriber |
| | | | | |
+------------+ +------------+ +------------+
[[/code]]
There are quite a few answers to this but the very simplest answer is to add an intermediary, that is, a static point in the network to which all other nodes connect. In classic messaging, this is the job of the "message broker". 0MQ doesn't come with a message broker as such, but it lets us build intermediaries quite easily.
You might wonder, if all networks eventually get large enough to need intermediaries, why don't we simply have a message broker in place for all applications? For beginners, it's a fair compromise. Just always use a star topology, forget about performance, and things will usually work. However message brokers are greedy things; in their role as central intermediaries, they become too complex, too stateful, and eventually a problem.
It's better to think of intermediaries as simple stateless message switches. The best analogy is an HTTP proxy; it's there but doesn't have any special role. Adding a pub-sub proxy solves the dynamic discovery problem in our example. We set the proxy in the "middle" of the network!figref(). The proxy opens an XSUB socket, an XPUB socket, and binds each to well-known IP addresses and ports. Then all other processes connect to the proxy, instead of to each other. It becomes trivial to add more subscribers or publishers.
[[code type="textdiagram" title="Pub-Sub Network with a Proxy"]]
+------------+ +------------+ +------------+
| | | | | |
| Publisher | | Publisher | | Publisher |
| | | | | |
+------------+ +------------+ +------------+
| PUB | | PUB | | PUB |
\------------/ \------------/ \------------/
connect connect connect
| | |
| | |
+----------------+----------------+
|
|
bind
/------------\
| XSUB |
+------------+
| |
| Proxy |
| |
+------------+
| XPUB |
\------------/
bind
|
|
+----------------+----------------+
| | |
| | |
connect connect connect
/------------\ /------------\ /------------\
| SUB | | SUB | | SUB |
+------------+ +------------+ +------------+
| | | | | |
| Subscriber | | Subscriber | | Subscriber |
| | | | | |
+------------+ +------------+ +------------+
[[/code]]
We need XPUB and XSUB sockets because 0MQ does subscription forwarding: SUB sockets actually send subscriptions to PUB sockets as special messages. The proxy has to forward these as well, by reading them from the XPUB socket and writing them to the XSUB socket. This is the main use-case for XSUB and XPUB!figref().
[[code type="textdiagram" title="Extended Publish-Subscribe"]]
+---------+ +---------+ +---------+
| PUB | | PUB | | PUB |
\----+----/ \----+----/ \----+----/
| | |
| | |
+-------------+-------------+
|
|
/-----+-----\
| XSUB |
+-----------+
| code |
+-----------+
| XPUB |
\-----+-----/
|
|
+-------------+-------------+
| | |
| | |
/----+----\ /----+----\ /----+----\
| SUB | | SUB | | SUB |
+---------+ +---------+ +---------+
[[/code]]
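A minimal sketch of such a proxy, written with the built-in {{zmq-proxy}} call that we cover properly later in this chapter (the port numbers are illustrative):
[[code language="C"]]
//  Sketch: a pub-sub proxy that sits between publishers and subscribers
#include <zmq.h>

int main (void) {
    void *context = zmq_ctx_new ();

    //  Publishers connect here
    void *frontend = zmq_socket (context, ZMQ_XSUB);
    zmq_bind (frontend, "tcp://*:5557");

    //  Subscribers connect here
    void *backend = zmq_socket (context, ZMQ_XPUB);
    zmq_bind (backend, "tcp://*:5558");

    //  Shuttle messages and subscriptions between the two sockets
    zmq_proxy (frontend, backend, NULL);

    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}
[[/code]]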
++++ Shared Queue (DEALER and ROUTER sockets)
In the Hello World client-server application we have one client that talks to one service. However in real cases we usually need to allow multiple services as well as multiple clients. This lets us scale up the power of the service (many threads or processes or nodes rather than just one). The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.
There are two ways to connect multiple clients to multiple servers. The brute-force way is to connect each client socket to multiple service endpoints. One client socket can connect to multiple service sockets, and the REQ socket will then distribute requests among these services. Let's say you connect a client socket to three service endpoints, A, B, and C. The client makes requests R1, R2, R3, R4. R1 and R4 go to service A, R2 goes to B, and R3 goes to service C!figref().
[[code type="textdiagram" title="Request Distribution"]]
+-----------+
| |
| Client |
| |
+-----------+
| REQ |
\-----+-----/
|
R1, R2, R3, R4
|
+-------------+-------------+
| | |
R1, R4 R2 R3
| | |
v v v
/---------\ /---------\ /---------\
| REP | | REP | | REP |
+---------+ +---------+ +---------+
| | | | | |
| Service | | Service | | Service |
| A | | B | | C |
| | | | | |
+---------+ +---------+ +---------+
[[/code]]
This design lets you add more clients cheaply. You can also add more services. Each client will distribute its requests to the services. But each client has to know the service topology. If you have 100 clients and then you decide to add three more services, you need to reconfigure and restart 100 clients in order for the clients to know about the three new services.
That's clearly not the kind of thing we want to be doing at 3am when our supercomputing cluster has run out of resources and we desperately need to add a couple of hundred new service nodes. Too many static pieces are like liquid concrete: knowledge is distributed and the more static pieces you have, the more effort it is to change the topology. What we want is something sitting in between clients and services that centralizes all knowledge of the topology. Ideally, we should be able to add and remove services or clients at any time without touching any other part of the topology.
So we'll write a little message queuing broker that gives us this flexibility. The broker binds to two endpoints, a frontend for clients and a backend for services. It then uses {{zmq-poll[3]}} to monitor these two sockets for activity and when it has some, it shuttles messages between its two sockets. It doesn't actually manage any queues explicitly -- 0MQ does that automatically on each socket.
When you use REQ to talk to REP you get a strictly synchronous request-reply dialog. The client sends a request. The service reads the request and sends a reply. The client then reads the reply. If either the client or the service try to do anything else (e.g. sending two requests in a row without waiting for a response) they will get an error.
But our broker has to be non-blocking. Obviously we can use {{zmq-poll[3]}} to wait for activity on either socket, but we can't use REP and REQ.
Luckily there are two sockets called DEALER and ROUTER that let you do non-blocking request-response. You'll see in [#advanced-request-reply] how DEALER and ROUTER sockets let you build all kinds of asynchronous request-reply flows. For now, we're just going to see how DEALER and ROUTER let us extend REQ-REP across an intermediary, that is, our little broker.
In this simple extended request-reply pattern, REQ talks to ROUTER and DEALER talks to REP. In between the DEALER and ROUTER we have to have code (like our broker) that pulls messages off the one socket and shoves them onto the other!figref().
[[code type="textdiagram" title="Extended Request-reply"]]
+---------+ +---------+ +---------+
| REQ | | REQ | | REQ |
\----+----/ \----+----/ \----+----/
| | |
| | |
+-------------+-------------+
|
|
/-----+-----\
| ROUTER |
+-----------+
| code |
+-----------+
| DEALER |
\-----+-----/
|
|
+-------------+-------------+
| | |
| | |
/----+----\ /----+----\ /----+----\
| REP | | REP | | REP |
+---------+ +---------+ +---------+
[[/code]]
The request-reply broker binds to two endpoints, one for clients to connect to (the frontend socket) and one for workers to connect to (the backend). To test this broker, you will want to change your workers so they connect to the backend socket. Here is a client that shows what I mean:
[[code type="example" title="Request-reply client" name="rrclient"]]
[[/code]]
Here is the worker:
[[code type="example" title="Request-reply worker" name="rrworker"]]
[[/code]]
And here is the broker, which properly handles multi-part messages:
[[code type="example" title="Request-reply broker" name="rrbroker"]]
[[/code]]
Using a request-reply broker makes your client-server architectures easier to scale since clients don't see workers, and workers don't see clients. The only static node is the broker in the middle!figref().
[[code type="textdiagram" title="Request-reply Broker"]]
+---------+ +---------+ +---------+
| | | | | |
| Client | | Client | | Client |
| | | | | |
+---------+ +---------+ +---------+
| REQ | | REQ | | REQ |
\---------/ \---------/ \---------/
connect connect connect
| | |
| | |
request request request
| | |
+-------------+-------------+
|
fair-queuing
|
v
bind
/-----------\
| ROUTER |
+-----------+
| |
| Broker |
| |
+-----------+
| DEALER |
\-----------/
bind
|
load balancing
|
+-------------+-------------+
| | |
request request request
| | |
v v v
connect connect connect
/---------\ /---------\ /---------\
| REP | | REP | | REP |
+---------+ +---------+ +---------+
| | | | | |
| Service | | Service | | Service |
| A | | B | | C |
| | | | | |
+---------+ +---------+ +---------+
[[/code]]
++++ 0MQ's Built-in Proxy Function
It turns out that the core loop in the previous section's rrbroker is very useful, and reusable. It lets us build pub-sub forwarders and shared queues and other little intermediaries, with very little effort. 0MQ wraps this up in a single method, {{zmq-proxy[3]}}:
[[code type="fragment" name="proxy"]]
zmq_proxy (frontend, backend, capture);
[[/code]]
The two (or three, if we want to capture data) sockets must be properly connected, bound, and configured. When we call the {{zmq-proxy}} method it's exactly like starting the main loop of rrbroker. Let's rewrite the request-reply broker to call {{zmq-proxy}}, and re-badge this as an expensive-sounding "message queue" (people have charged houses for code that did less):
[[code type="example" title="Message queue broker" name="msgqueue"]]
[[/code]]
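That broker boils down to something like this sketch (the port numbers are illustrative; the {{msgqueue}} example above is the definitive version):
[[code language="C"]]
//  Sketch: the request-reply broker reduced to a zmq_proxy call
#include <zmq.h>

int main (void) {
    void *context = zmq_ctx_new ();

    //  Socket facing clients
    void *frontend = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (frontend, "tcp://*:5559");

    //  Socket facing services
    void *backend = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (backend, "tcp://*:5560");

    //  Start the proxy loop; it only returns when the context is terminated
    zmq_proxy (frontend, backend, NULL);

    zmq_close (frontend);
    zmq_close (backend);
    zmq_ctx_destroy (context);
    return 0;
}
[[/code]]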
If you're like most 0MQ users, at this stage your mind is starting to think, "what kind of evil stuff can I do if I plug random socket types into the proxy?" The short answer is: try it and work out what is happening. In practice you would usually stick to ROUTER/DEALER, XSUB/XPUB, or PULL/PUSH.
++++ Transport Bridging
A frequent request from 0MQ users is "how do I connect my 0MQ network with technology X?" where X is some other networking or messaging technology. The simple answer is to build a "bridge". A bridge is a small application that speaks one protocol at one socket, and converts to/from a second protocol at another socket. A protocol interpreter, if you like. A common bridging problem in 0MQ is to bridge two transports or networks.
As an example, we're going to write a little proxy that sits in between a publisher and a set of subscribers, bridging two networks. The frontend socket (SUB) faces the internal network, where the weather server is sitting, and the backend (PUB) faces subscribers on the external network. It subscribes to the weather service on the frontend socket, and republishes its data on the backend socket!figref().
[[code type="example" title="Weather update proxy" name="wuproxy"]]
[[/code]]
It looks very similar to the earlier proxy example but the key part is that the frontend and backend sockets are on two different networks. We can use this model for example to connect a multicast network ({{pgm}} transport) to a TCP publisher.
[[code type="textdiagram" title="Pub-Sub Forwarder Proxy"]]
+-----------+
| |
| Publisher |
| |
+-----------+
| PUB |
\-----------/
bind
tcp://192.168.55.210:5556
|
|
+----------------+----------------+
| | |
| | |
connect connect |
/------------\ /------------\ connect
| SUB | | SUB | /------------\
+------------+ +------------+ | XSUB |
| | | | +------------+
| Subscriber | | Subscriber | | |
| | | | | Forwarder |
+------------+ +------------+ | |
+------------+
Internal network | XPUB |
---------------------------------\------------/--------
External network bind
tcp://10.1.1.0:8100
|
|
+--------+--------+
| |
| |
connect connect
/------------\ /------------\
| SUB | | SUB |
+------------+ +------------+
| | | |
| Subscriber | | Subscriber |
| | | |
+------------+ +------------+
[[/code]]
+++ Handling Errors and ETERM
0MQ's error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible.
Assertions, which pepper the 0MQ code, are absolutely vital to robust code, they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw to be fixed. In C/C++, assertions stop the application immediately with an error. In other languages you may get exceptions or halts.
When 0MQ detects an external fault it returns an error to the calling code. In some rare cases it drops messages silently, if there is no obvious strategy for recovering from the error.
In most of the C examples we've seen so far there's been no error handling. **Real code should do error handling on every single 0MQ call**. If you're using a language binding other than C, the binding may handle errors for you. In C you do need to do this yourself. There are some simple rules, starting with POSIX conventions:
* Methods that create objects return NULL if they fail.
* Methods that process data may return the number of bytes processed, or -1 on an error or failure.
* Other methods return 0 on success and -1 on an error or failure.
* The error code is provided in {{errno}} or {{zmq-errno[3]}}.
* A descriptive error text for logging is provided by {{zmq-strerror[3]}}.
For example:
[[code type="fragment" name="errorhandling"]]
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}
[[/code]]
There are two main exceptional conditions that you may want to handle as non-fatal:
* When a thread calls {{zmq-msg-recv[3]}} with the {{ZMQ-DONTWAIT}} option and there is no waiting data. 0MQ will return -1 and set {{errno}} to {{EAGAIN}}.
* When a thread calls {{zmq-ctx-destroy[3]}} and other threads are doing blocking work. The {{zmq-ctx-destroy[3]}} call closes the context and all blocking calls exit with -1, and errno set to {{ETERM}}.
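A sketch of handling both cases around a non-blocking receive (the socket and the surrounding loop are assumed):
[[code language="C"]]
//  Sketch: treat EAGAIN and ETERM as non-fatal; anything else is a real error
zmq_msg_t message;
zmq_msg_init (&message);
if (zmq_msg_recv (&message, socket, ZMQ_DONTWAIT) == -1) {
    if (errno == EAGAIN) {
        //  No message waiting right now; try again later
    }
    else
    if (errno == ETERM) {
        //  The context was terminated; clean up and exit this thread
    }
    else
        assert (0);
}
zmq_msg_close (&message);
[[/code]]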
In C/C++, asserts can be removed entirely in optimized code, so don't make the mistake of wrapping the whole 0MQ call in an assert(). It looks neat, then the optimizer removes all the asserts and the calls you want to make, and your application breaks in impressive ways.
Let's see how to shut down a process cleanly. We'll take the parallel pipeline example from the previous section. If we've started a whole lot of workers in the background, we now want to kill them when the batch is finished. Let's do this by sending a kill message to the workers. The best place to do this is the sink, since it really knows when the batch is done.
How do we connect the sink to the workers? The PUSH/PULL sockets are one-way only. The standard 0MQ answer is: create a new socket flow for each type of problem you need to solve. We'll use a publish-subscribe model to send kill messages to the workers!figref():
* The sink creates a PUB socket on a new endpoint.
* Workers connect their input socket to this endpoint.
* When the sink detects the end of the batch it sends a kill to its PUB socket.
* When a worker detects this kill message, it exits.
It doesn't take much new code in the sink:
[[code type="fragment" name="killsignal"]]
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");
...
//  Send kill signal to workers
zmq_msg_t message;
zmq_msg_init_data (&message, "KILL", 5, NULL, NULL);
zmq_msg_send (&message, control, 0);
zmq_msg_close (&message);
[[/code]]
[[code type="textdiagram" title="Parallel Pipeline with Kill Signaling"]]
+-------------+
| |
| Ventilator |
| |
+-------------+
| PUSH |
\------+------/
|
tasks
|
+---------------+---------------+
| | |
| /=--------|-----+=--------|-----+------\
task | task | task | :
| | | | | | |
v v v v v v |
/------+-----\ /------+-----\ /------+-----\ |
| PULL | SUB | | PULL | SUB | | PULL | SUB | |
+------+-----+ +------+-----+ +------+-----+ |
| | | | | | |
| Worker | | Worker | | Worker | |
| | | | | | |
+------------+ +------------+ +------------+ |
| PUSH | | PUSH | | PUSH | |
\-----+------/ \-----+------/ \-----+------/ |
| | | |
result result result |
| | | |
+---------------+---------------+ |
| |
results |
| |
v |
/-------------\ |
| PULL | |
+-------------+ |
| | |
| Sink | |
| | |
+-------------+ |
| PUB | |
\------+------/ |
| |
KILL signal |
| |
\--------------------------/
[[/code]]
Here is the worker process, which manages two sockets (a PULL socket getting tasks, and a SUB socket getting control commands) using the {{zmq-poll[3]}} technique we saw earlier:
[[code type="example" title="Parallel task worker with kill signaling" name="taskwork2"]]
[[/code]]
Here is the modified sink application. When it's finished collecting results it broadcasts a kill message to all workers:
[[code type="example" title="Parallel task sink with kill signaling" name="tasksink2"]]
[[/code]]
+++ Handling Interrupt Signals
Realistic applications need to shut down cleanly when interrupted with Ctrl-C or another signal such as SIGTERM. By default, these simply kill the process, meaning messages won't be flushed, files won't be closed cleanly, etc.
Here is how we handle a signal in various languages:
[[code type="example" title="Handling Ctrl-C cleanly" name="interrupt"]]
[[/code]]
The program provides {{s-catch-signals()}}, which traps Ctrl-C ({{SIGINT}}) and {{SIGTERM}}. When either of these signals arrives, the {{s-catch-signals()}} handler sets the global variable {{s-interrupted}}. Thanks to your signal handler, your application will not die automatically. Instead, you have a chance to clean up and exit gracefully. You now have to explicitly check for an interrupt and handle it properly. Do this by calling {{s-catch-signals()}} (copy this from {{interrupt.c}}) at the start of your main code. This sets up the signal handling. The interrupt will affect 0MQ calls as follows:
* If your code is blocking in {{zmq-msg-recv[3]}}, {{zmq-poll[3]}}, or {{zmq-msg-send[3]}}, when a signal arrives, the call will return with {{EINTR}}.
* Wrappers like {{s-recv()}} return NULL if they are interrupted.
So check for an {{EINTR}} return code, a NULL return, and/or {{s-interrupted}}.
Here is a typical code fragment:
[[code]]
s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);
[[/code]]
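For reference, here is a minimal sketch of what such a handler can look like; the real {{s-catch-signals()}} lives in {{interrupt.c}}, so treat this as an illustration rather than the canonical code:
[[code language="C"]]
#include <signal.h>

//  Sketch: flag set by the handler, checked by the main loop
static int s_interrupted = 0;

static void s_signal_handler (int signal_value)
{
    s_interrupted = 1;
}

static void s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;
    sigemptyset (&action.sa_mask);
    sigaction (SIGINT, &action, NULL);
    sigaction (SIGTERM, &action, NULL);
}
[[/code]]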
If you call {{s-catch-signals()}} and don't test for interrupts, then your application will become immune to Ctrl-C and SIGTERM, which may be useful, but is usually not.
+++ Detecting Memory Leaks
Any long-running application has to manage memory correctly, or eventually it'll use up all available memory and crash. If you use a language that handles this automatically for you, congratulations. If you program in C or C++ or any other language where you're responsible for memory management, here's a short tutorial on using valgrind, which among other things will report on any leaks your programs have.
* To install valgrind, e.g. on Ubuntu or Debian issue:
[[code]]
sudo apt-get install valgrind
[[/code]]
* By default, 0MQ will cause valgrind to complain a lot. To remove these warnings, create a file called {{valgrind.supp}} that contains this:
[[code]]
{
<socketcall-sendto>
Memcheck:Param
socketcall.sendto(msg)
fun:send
...
}
{
<socketcall-sendto>
Memcheck:Param
socketcall.send(msg)
fun:send
...
}
[[/code]]
* Fix your applications to exit cleanly after Ctrl-C. For any application that exits by itself, that's not needed, but for long-running applications, this is essential, otherwise valgrind will complain about all currently allocated memory.
* Build your application with -DDEBUG, if it's not your default setting. That ensures valgrind can tell you exactly where memory is being leaked.
* Finally, run valgrind thus:
[[code]]
valgrind --tool=memcheck --leak-check=full --suppressions=valgrind.supp someprog
[[/code]]
And after fixing any errors it reported, you should get the pleasant message:
[[code]]
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
[[/code]]
+++ Multithreading with 0MQ
0MQ is perhaps the nicest way ever to write multithreaded (MT) applications. Whereas 0MQ sockets require some readjustment if you are used to traditional sockets, 0MQ multithreading will take everything you know about writing MT applications, throw it into a heap in the garden, pour gasoline over it, and set it alight. It's a rare book that deserves burning, but most books on concurrent programming do.
To make utterly perfect MT programs (and I mean that literally) **we don't need mutexes, locks, or any other form of inter-thread communication except messages sent across 0MQ sockets.**
By "perfect" MT programs I mean code that's easy to write and understand, that works with the same design approach in any programming language, and on any operating system, and that scales across any number of CPUs with zero wait states and no point of diminishing returns.
If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming it is: //just don't share state//. It's like two drunkards trying to share a beer. It doesn't matter if they're good buddies. Sooner or later they're going to get into a fight. And the more drunkards you add to the table, the more they fight each other over the beer. The tragic majority of MT applications look like drunken bar fights.
The list of weird problems that you need to fight as you write classic shared-state MT code would be hilarious if it didn't translate directly into stress and risk, as code that seems to work suddenly fails under pressure. A large firm with world-beating experience in buggy code released its list of "11 Likely Problems In Your Multithreaded Code", which covers forgotten synchronization, incorrect granularity, read and write tearing, lock-free reordering, lock convoys, two-step dance, and priority inversion.
Yeah, we also counted seven problems, not eleven. That's not the point though. The point is, do you really want that code running the power grid or stock market to start getting two-step lock convoys at 3pm on a busy Thursday? Who cares what the terms actually mean. This is not what turned us on to programming, fighting ever more complex side-effects with ever more complex hacks.
Some widely used models, despite being the basis for entire industries, are fundamentally broken, and shared state concurrency is one of them. Code that wants to scale without limit does it like the Internet does, by sending messages and sharing nothing except a common contempt for broken programming models.
You should follow some rules to write happy multithreaded code with 0MQ:
* You must not access the same data from multiple threads. Using classic MT techniques such as mutexes is an anti-pattern in 0MQ applications. The only exception to this is a 0MQ context object, which is threadsafe.
* You must create a 0MQ context for your process, and pass that to all threads that you want to connect via {{inproc}} sockets.
* You may treat threads as separate tasks, with their own context, but these threads cannot communicate over {{inproc}}. However they will be easier to break into standalone processes afterwards.
* You must not share 0MQ sockets between threads. 0MQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.
If you need to start more than one proxy in an application, for example, you will want to run each in its own thread. It is easy to make the error of creating the proxy frontend and backend sockets in one thread, and then passing the sockets to the proxy in another thread. This may appear to work but will fail randomly. Remember: //Do not use or close sockets except in the thread that created them.//
If you follow these rules, you can quite easily split threads into separate processes, when you need to. Application logic can sit in threads, processes, nodes: whatever your scale needs.
0MQ uses native OS threads rather than virtual "green" threads. The advantage is that you don't need to learn any new threading API, and that 0MQ threads map cleanly to your operating system. You can use standard tools like Intel's ThreadChecker to see what your application is doing. The disadvantages are that your code, when it for instance starts new threads, won't be portable, and that if you have a huge number of threads (thousands), some operating systems will get stressed.
Let's see how this works in practice. We'll turn our old Hello World server into something more capable. The original server ran in a single thread. If the work per request is low, that's fine: one ØMQ thread can run at full speed on a CPU core, with no waits, doing an awful lot of work. But realistic servers have to do non-trivial work per request. A single core may not be enough when 10,000 clients hit the server all at once. So a realistic server must start multiple worker threads. It then accepts requests as fast as it can, and distributes these to its worker threads. The worker threads grind through the work, and eventually send their replies back.
You can of course do all this using a proxy broker and external worker processes, but often it's easier to start one process that gobbles up sixteen cores than sixteen processes, each gobbling up one core. Further, running workers as threads will cut out a network hop, latency, and network traffic.
The MT version of the Hello World service basically collapses the broker and workers into a single process. We use pthreads because it's the most widespread standard for multithreading:
[[code type="example" title="Multithreaded service" name="mtserver"]]
[[/code]]
All the code should be recognizable to you by now. How it works:
* The server starts a set of worker threads. Each worker thread creates a REP socket and then processes requests on this socket. Worker threads are just like single-threaded servers. The only differences are the transport ({{inproc}} instead of {{tcp}}), and the bind-connect direction.
* The server creates a ROUTER socket to talk to clients and binds this to its external interface (over {{tcp}}).
* The server creates a DEALER socket to talk to the workers and binds this to its internal interface (over {{inproc}}).
* The server starts a proxy that connects the two sockets. The proxy pulls incoming requests fairly from all clients, and distributes those out to workers. It also routes replies back to their origin.
Note that creating threads is not portable in most programming languages. The POSIX library is pthreads, but on Windows you have to use a different API. In our example, the {{pthread-create}} call starts up a new thread running the {{worker-routine}} function we defined. We'll see in [#advanced-request-reply] how to wrap this in a portable API.
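Here is a minimal sketch of that structure, assuming five worker threads and illustrative endpoint names; the full {{mtserver}} example is the definitive version:
[[code language="C"]]
#include <zmq.h>
#include <pthread.h>
#include <unistd.h>

//  Sketch: each worker is an ordinary REP "server", but over inproc
static void *
worker_routine (void *context) {
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");
    while (1) {
        char buffer [10];
        zmq_recv (receiver, buffer, 10, 0);
        sleep (1);                      //  Pretend to do some work
        zmq_send (receiver, "World", 5, 0);
    }
    zmq_close (receiver);
    return NULL;
}

int main (void) {
    void *context = zmq_ctx_new ();

    //  ROUTER socket facing clients over tcp
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");

    //  DEALER socket facing workers over inproc
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");

    //  Start a pool of worker threads
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  Connect the two sockets; this call blocks until the context ends
    zmq_proxy (clients, workers, NULL);

    zmq_close (clients);
    zmq_close (workers);
    zmq_ctx_destroy (context);
    return 0;
}
[[/code]]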
Here the 'work' is just a one-second pause. We could do anything in the workers, including talking to other nodes. This is what the MT server looks like in terms of ØMQ sockets and nodes. Note how the request-reply chain is {{REQ-ROUTER-queue-DEALER-REP}}!figref().
[[code type="textdiagram" title="Multithreaded Server"]]
+------------+
| |
| Client |
| |
+------------+
| REQ |
\---+--------/
| ^
| |
"Hello" "World"
| |
/------------------|--=-|------------------\
| v | :
| /--------+---\ |
| | ROUTER | |
| +------------+ |
| | | |
| | Server | |
| | | |
| +------------+ |
| | | |
| | Queue | |
| | proxy | |
| | | |
| +------------+ |
| | DEALER | |
| \------------/ |
| ^ |
| | |
| +-----------+-----------+ |
| | | | |
| v v v |
| /--------\ /--------\ /--------\ |
| | REP | | REP | | REP | |
| +--------+ +--------+ +--------+ |
| | | | | | | |
| | Worker | | Worker | | Worker | |
| | | | | | | |
| +--------+ +--------+ +--------+ |
| |
\------------------------------------------/
[[/code]]
+++ Signaling between Threads (PAIR sockets)
When you start making multithreaded applications with 0MQ, you'll encounter the question of how to coordinate your threads. Though you might be tempted to insert 'sleep' statements, or use multithreading techniques such as semaphores or mutexes, **the only mechanism that you should use is 0MQ messages**. Remember the story of The Drunkards and the Beer Bottle.
Let's make three threads that signal each other when they are ready!figref(). In this example we use PAIR sockets over the {{inproc}} transport:
[[code type="example" title="Multithreaded relay" name="mtrelay"]]
[[/code]]