Open Access Handles The Queue
October 9, 2013 Jon Paris
Note: The code accompanying this article is available for download here.
In my previous tip I introduced you to an RPG Open Access (OA) handler that facilitated writing to a data queue using conventional WRITE operations. As you saw, from the programmer’s perspective they were writing to a disk file. The only difference was that the F-spec included the HANDLER keyword to instruct RPG that the handler was to perform all of the actual I/O operations. This time I am going to describe its companion handler–one that reads from data queues.
When we write a program that will process a disk file sequentially we are normally operating on the basis that there are a finite number of records in the file. When we have processed all of them the program will normally either terminate or move on to processing another file. But what exactly constitutes end-of-file when reading a data queue, where additional items may be constantly added?
In the past I have developed a number of applications that used data queues, and they tended to fall into one of three basic categories.
So how to provide for these three scenarios within our handler?
The first category is relatively easy to implement. We simply need the program that writes to the queue to add a message containing some form of end-of-file marker. We can allow the user of the read handler to specify the value of the marker via the user parameter on the HANDLER keyword. As you’ll see when we study the code, we can easily set up a default value for this.
The second is also easy to implement. Every time the API returns with a message length of zero we simply loop back and retry. The only consideration here is whether we want to control the loop wait time. Why? Because if we were to simply keep retrying the read, the program would be looping continuously and consuming CPU resources while it waited for a message. Since the program will be “woken up” if a message arrives on the queue while the program is in a wait state, it is normally a good idea to specify something other than a wait time of zero. I would normally use a default value of 2 unless very long waits were expected between messages. Again, we can use the user parameter to supply this value to the handler.
In the third case, we want to set end-of-file as soon as the queue is empty. To achieve this we simply set the wait time to zero and then set end of file if no message is found. I wanted to be able to use a single parameter to indicate both this immediate EOF requirement and to set the wait time. Only one problem: the programmer might want to actually use a wait time of zero so some other value would have to be used. I decided to use a negative value (-1) to indicate that this immediate EOF option should be used. I was a little conflicted over this choice since specifying a negative wait time to the QRCVDTAQ API means that you want to wait forever! It seemed to me though that this would not be a problem since a value of 99,999 (the maximum) equates to roughly 28 hours–that’s close to forever in terms of the run time of most programs! However, should you still want to request an infinite wait, you can do so by using any negative value other than -1.
You will see as we study the code how these options are implemented.
One other consideration occurred to me while working on this handler. Perhaps an option could be added to the queue writer handler I described in the previous tip to allow for the specification of an end-of-file marker. If it was supplied, then the close logic would cause the marker to be written to the queue before the program set on LR and returned. When we return to that program in a future tip I will include that feature, but thought I’d mention the idea now in case you wanted to go ahead and add it yourself.
The User Program (TESTRCVQH2)
I’m showing a program that takes advantage of the user parameter to specify the wait time and EOF marker. When we discuss the handler itself, you’ll see how the program determines whether the parameter was actually supplied and how it sets up default values.
Let’s look at the code. As before, the most important thing is what you don’t see–any sign of a data queue being manipulated:
H DftActGrp(*No) Option(*SrcSTmt) FTestDQ1 IF E DISK Handler('RCVDQHAND1' (A) F : myDQParms) F UsrOpn // Prototypes for Data Queue routines etc. /Copy ITJTIPSRC,DQProtoEtc D forever s n Inz(*Off) D count s 5i 0 (B) D myDQParms ds LikeDS(dQReadParms_T) /Free (C) myDQParms.waitTime = 60; myDqParms.eofMarker = '****'; (D) Open TestDQ1; DoU %EOF(TestDQ1); Read TestDQ1; If not %EOF(TestDQ1); Dsply ( 'Char1 => ' + Char1 ); Dsply ( 'PckDec1 => ' + %Char(PckDec1) + ' - Date1 => ' + %Char(Date1) ); Else; Dsply ( 'Hey - I hit EOF!'); *InLr = *On; EndIf; EndDo;
At (A) you can see that the user parameter has been added to the HANDLER keyword. The actual definition of the parameter DS is at (B) and it is defined as being LIKEDS a template that is contained within the /Copy file DQProtoEtc. This is what that template looks like:
d dQReadParms_T ds Template Qualified d waitTime 5p 0 d eofMarker 5a Varying Inz
At (C) the program sets the wait time to 60 seconds and the eof-marker to ‘****’. Notice that this should be done prior to opening the file (D). An alternative would be to pass the DS in as a parameter in which case RPG’s normal automatic open could be used. I chose to do it this way so that the values to be used in the program could be stored in the state information area that is created during the file open.
The DQ Reader Handler (RCVDQHAND1)
Much of the logic, for example validating the existence of the Data Queue, is identical to that in the Write handler from my last tip so I will not be describing it again. In fact, if I were writing this as a production program rather than as a teaching example, I would encapsulate things like the DQ validation logic in a service program. But I digress.
At (E) you can see the definition for the user parameters is BASED on the userArea pointer in the main info parameter and uses the dQReadParms_T template that we looked at earlier. This is followed by the definition for the RPG input buffer (F) data will be copied into this from the data queue message buffer (G) when appropriate.
(E) d dQReadParms ds LikeDS(dQReadParms_T) d Based(info.userArea) (F) d queueEntry s 64512a Based(info.inputBuffer) d waitTime s 5p 0 Inz // Message received into this buffer (entry) // Will be copied to RPG buffer if length is valid (G) d entry s 64512a d entryLength s 5p 0 d status s 7a d errorInfo ds LikeDS(stdErrStruct_T) // State info including working copy of User DQParms (H) d stateInfo DS Based(info.stateInfo) d Qualified d queueOpen n d queueInfo LikeDS(dQInfo_T) (I) d parms LikeDS(dQReadParms_T)
The state information for this handler (H) has been extended to include a copy of the user parameters (I). This was done to ensure that the values present at open time were captured and could not therefore be changed during the currency of the program. It also provides storage that can be used for default values as you will see later.
The first significant addition is the processing of the user area that begins at (J) with a check of the user area pointer. A null value indicates that the extra parameter was not present on the HANDLER keyword and so we set suitable default values. I have chosen to use “<end>” as the end of file marker and a wait time of 10 seconds. You can modify these to suit your own needs.
If the user area was passed then we simply copy the values into the state information area (K). This would be a good point to validate the parameters, but in my case there is really nothing to check.
// Check if user parm was supplied and if not set defaults (J) If info.userArea = *Null; stateInfo.parms.eofMarker = '<eof>'; // Set default EOF marker stateInfo.parms.waitTime = 10; // Set read for 10 second wait Else; (K) stateInfo.parms = dQReadParms; // Copy in user supplied data EndIf;
The real meat of the processing takes place starting at (L) where we enter a loop that will normally only be exited when a queue entry has been read. We will also break out of the loop if the programmer requested an immediate end of file when there were no more records to process.
The choice to use the wait time parameter to also allow the programmer to request an immediate EOF condition when the queue was empty rather than have an additional parameter required the additional logic at (M). A value of -1 would cause the QRCVDTAQ API to wait forever and that’s the exact opposite of what we want. So we set the value on the API call to 0. In all other cases we use the value supplied by the programmer.
(L) DoU entryLength <> 0; // Stay in loop until data arrives on queue // or quit if immediate EOF selected // Set wait time avoiding use of -1 when immediate EOF selected (M) If stateInfo.waitTime = immediateEOF; waitTime = 0; Else; waitTime = stateInfo.waitTime; EndIf; ReceiveFromDQ( info.externalFile.name : info.externalFile.library : entryLength : entry : waitTime);
At (N) we determine whether any data was retrieved from the queue or if the wait time simply expired. If there was no data and the programmer had requested immediate EOF when the queue was empty then we simply set the end of file indicator and return control to RPG. In all other cases, we will simply loop back and retry the DQ read.
When we break out of the loop because we have retrieved an entry from the queue, the first thing we must check is to see if the data matches the EOF marker (O). Since the parameter value is a variable length field, we use its length (%Len) to restrict the comparison to the relevant portion of the entry buffer. If the EOF marker is found then we set end of file and return control to RPG.
// If no data received from the queue check to see if EOF // should be signalled (N) If ( entryLength = 0 ) and ( stateInfo.parms.waitTime = immediateEOF ); info.eof = *On; // Set eof and return control to RPG Return; EndIf; EndDo; // We have data so check if it includes the EOF marker. (O) If %Subst( entry: 1: %Len(stateInfo.parms.eofMarker)) = stateInfo.parms.eofMarker; // Sender indicated EOF info.eof = *On; // Set eof and return control to RPG Return; EndIf; // EOF was not flagged so copy data to read buffer // Regardless of length of received data only copy as // much as the buffer has room for or bad things will happen (P) %Subst(queueEntry: 1: info.inputBufferLen) = entry;
At this point we have a queue entry, and we are not at end of file, so we must return it to the RPG program as a record. We do this by copying the date from the queue entry into the RPG input buffer (P). Notice that we restrict the amount of data that we copy to the size of the space RPG has reserved to hold it, regardless of how long the queue entry is. Why? Because we would run the risk of corrupting memory that RPG has reserved for other purposes, and that could have really nasty consequences.
And that is all there really is to it.
The nice thing about writing a buffer handler like this one is that we neither need to know nor care about the format of the data. It is the responsibility of the user of the handlers to make sure that they specify the same database file to both the program that writes to the queue and the one that reads from it. If you think about it, that is “situation normal”. If we write to a file then that is the one we must read from. And that is the whole point of using Open Access handlers: to do different things by using traditional I/O methods. It is my firm belief that OA has the potential to open up the use of features such as data queues to a wider audience and thereby to simplify the logic of programs that use them. The same approach can be used with delimited (CSV) files, sockets programming, and much more. My only regret is that we did not have this functionality 10 years or more ago.
In the next tip we’ll look at what it takes to extend the handlers to be able to use keyed data queues. In the meantime, please let me know of your own experiments and if there are any other uses of OA you’d like me to explore in this series.
Jon Paris is one of the world’s most knowledgeable experts on programming on the System i platform. Paris cut his teeth on the System/38 way back when, and in 1987 he joined IBM’s Toronto software lab to work on the COBOL compilers for the System/38 and System/36. He also worked on the creation of the COBOL/400 compilers for the original AS/400s back in 1988, and was one of the key developers behind RPG IV and the CODE/400 development tool. In 1998, he left IBM to start his own education and training firm, a job he does to this day with his wife, Susan Gantner–also an expert in System i programming. Paris and Gantner, along with Paul Tuohy and Skip Marchesani, are co-founders of System i Developer, which hosts the new RPG & DB2 Summit conference. Send your questions or comments for Jon to Ted Holt via the IT Jungle Contact page.